Skip to content

Commit

Permalink
bugfix: fix vpn link and rule leak
Browse files Browse the repository at this point in the history
  • Loading branch information
珩轩 committed Jun 4, 2024
1 parent 820f971 commit 44e4565
Show file tree
Hide file tree
Showing 21 changed files with 566 additions and 407 deletions.
2 changes: 1 addition & 1 deletion charts/raven-agent/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ proxy:
metricsBindAddr: ":10266"

rollingUpdate:
maxUnavailable: 5%
maxUnavailable: 20%
2 changes: 1 addition & 1 deletion cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (o *AgentOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.MetricsBindAddress, "metric-bind-addr", o.MetricsBindAddress, `Binding address of tunnel metrics. (default ":10265")`)
fs.StringVar(&o.HealthProbeAddr, "health-probe-addr", o.HealthProbeAddr, `The address the healthz/readyz endpoint binds to.. (default ":10275")`)
fs.BoolVar(&o.SyncRules, "sync-raven-rules", true, "Whether to synchronize raven rules regularly")
fs.DurationVar(&o.SyncPeriod.Duration, "sync-raven-rules-period", 30*time.Minute, "The period for reconciling routes created for nodes by cloud provider. The minimum value is 1 minute and the maximum value is 24 hour")
fs.DurationVar(&o.SyncPeriod.Duration, "sync-raven-rules-period", 10*time.Minute, "The period for reconciling routes created for nodes by cloud provider. The minimum value is 1 minute and the maximum value is 24 hour")

fs.StringVar(&o.VPNPort, "vpn-bind-port", o.VPNPort, `Binding port of vpn. (default ":4500")`)
fs.BoolVar(&o.NATTraversal, "nat-traversal", o.NATTraversal, `Enable NAT Traversal or not. (default "false")`)
Expand Down
20 changes: 9 additions & 11 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package engine

import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -59,7 +59,7 @@ func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
return reconcile.Result{}, nil
}))
if err != nil {
klog.Errorf(utils.FormatRavenEngine("fail to new controller with manager, error %s", err.Error()))
klog.Errorf("fail to new controller with manager, error %s", err.Error())
return engine, err
}
engine.client = engine.manager.GetClient()
Expand All @@ -72,7 +72,7 @@ func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
}
err = engine.tunnel.InitDriver()
if err != nil {
klog.Errorf(utils.FormatRavenEngine("fail to init tunnel driver, error %s", err.Error()))
klog.Errorf("fail to init tunnel driver, error %s", err.Error())
return engine, err
}

Expand Down Expand Up @@ -167,7 +167,7 @@ func (e *Engine) cleanup() {
if e.option.GetTunnelStatus() {
err := e.tunnel.CleanupDriver()
if err != nil {
klog.Errorf(utils.FormatRavenEngine("failed to cleanup tunnel driver, error %s", err.Error()))
klog.Errorf("failed to cleanup tunnel driver, error %s", err.Error())
}
}
if e.option.GetProxyStatus() {
Expand All @@ -182,18 +182,18 @@ func (e *Engine) handleEventErr(err error, gw *v1beta1.Gateway) {
}

if e.queue.NumRequeues(gw) < utils.MaxRetries {
klog.Info(utils.FormatRavenEngine("error syncing event %s: %s", gw.GetName(), err.Error()))
klog.Infof("error syncing event %s: %s", gw.GetName(), err.Error())
e.queue.AddRateLimited(gw)
return
}
klog.Info(utils.FormatRavenEngine("dropping event %s out of the queue: %s", gw.GetName(), err.Error()))
klog.Infof("dropping event %s out of the queue: %s", gw.GetName(), err.Error())
e.queue.Forget(gw)
}

func (e *Engine) addGateway(evt event.CreateEvent) bool {
gw, ok := evt.Object.(*v1beta1.Gateway)
if ok {
klog.InfoS(utils.FormatRavenEngine("adding gateway %s", gw.GetName()))
klog.Infof("adding gateway %s", gw.GetName())
e.queue.Add(gw.DeepCopy())
}
return ok
Expand All @@ -206,10 +206,8 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool {
if ok1 && ok2 {
if oldGw.ResourceVersion != newGw.ResourceVersion {
update = true
klog.InfoS(utils.FormatRavenEngine("updating gateway, %s", newGw.GetName()))
klog.Infof("updating gateway, %s", newGw.GetName())
e.queue.Add(newGw.DeepCopy())
} else {
klog.InfoS(utils.FormatRavenEngine("skip handle update gateway"), klog.KObj(newGw))
}
}
return update
Expand All @@ -218,7 +216,7 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool {
func (e *Engine) deleteGateway(evt event.DeleteEvent) bool {
gw, ok := evt.Object.(*v1beta1.Gateway)
if ok {
klog.InfoS(utils.FormatRavenEngine("deleting gateway, %s", gw.GetName()))
klog.Infof("deleting gateway, %s", gw.GetName())
e.queue.Add(gw.DeepCopy())
}
return ok
Expand Down
20 changes: 10 additions & 10 deletions pkg/engine/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (p *ProxyEngine) Handler() error {
srcAddr := getSrcAddressForProxyServer(p.client, p.nodeName)
err = p.startProxyServer()
if err != nil {
klog.Errorf(utils.FormatProxyServer("failed to start proxy server, error %s", err.Error()))
klog.Errorf("failed to start proxy server, error %s", err.Error())
return err
}
p.serverLocalEndpoints = srcAddr
Expand All @@ -93,7 +93,7 @@ func (p *ProxyEngine) Handler() error {
time.Sleep(2 * time.Second)
err = p.startProxyServer()
if err != nil {
klog.Errorf(utils.FormatProxyServer("failed to start proxy server, error %s", err.Error()))
klog.Errorf("failed to start proxy server, error %s", err.Error())
return err
}
p.serverLocalEndpoints = srcAddr
Expand All @@ -106,23 +106,23 @@ func (p *ProxyEngine) Handler() error {
case StartType:
err = p.startProxyClient()
if err != nil {
klog.Errorf(utils.FormatProxyServer("failed to start proxy client, error %s", err.Error()))
klog.Errorf("failed to start proxy client, error %s", err.Error())
return err
}
case StopType:
p.stopProxyClient()
case RestartType:
dstAddr := getDestAddressForProxyClient(p.client, p.localGateway)
if len(dstAddr) < 1 {
klog.Infoln(utils.FormatProxyClient("dest address is empty, will not connected it"))
klog.Infoln("dest address is empty, will not connected it")
return nil
}
if strings.Join(p.clientRemoteEndpoints, ",") != strings.Join(dstAddr, ",") {
p.stopProxyClient()
time.Sleep(2 * time.Second)
err = p.startProxyClient()
if err != nil {
klog.Errorf(utils.FormatProxyServer("failed to start proxy client, error %s", err.Error()))
klog.Errorf("failed to start proxy client, error %s", err.Error())
return err
}
}
Expand All @@ -133,7 +133,7 @@ func (p *ProxyEngine) Handler() error {
}

func (p *ProxyEngine) startProxyServer() error {
klog.Infoln(utils.FormatProxyServer("start raven l7 proxy server"))
klog.Infoln("start raven l7 proxy server")
if p.localGateway == nil {
return fmt.Errorf("unknown gateway for node %s, can not start proxy server", p.nodeName)
}
Expand Down Expand Up @@ -164,19 +164,19 @@ func (p *ProxyEngine) startProxyServer() error {
}

// stopProxyServer shuts down the raven L7 proxy server owned by this engine.
// It fetches the server cancel function from proxyCtx and invokes it, marks
// the server status as false in proxyOption, and then asks proxyCtx to reload
// the server context from the engine's root context p.ctx (presumably so that
// a later start gets a fresh, un-cancelled context — confirm against
// ReloadServerContext).
func (p *ProxyEngine) stopProxyServer() {
// NOTE(review): this line and the next look like the removed and added forms
// of the same log statement as rendered by the diff view — confirm that only
// one of them exists in the actual file.
klog.Infoln(utils.FormatProxyServer("Stop raven l7 proxy server"))
klog.Infoln("Stop raven l7 proxy server")
cancel := p.proxyCtx.GetServerCancelFunc()
cancel()
p.proxyOption.SetServerStatus(false)
// Re-derive the server context from the root context after cancellation.
p.proxyCtx.ReloadServerContext(p.ctx)
}

func (p *ProxyEngine) startProxyClient() error {
klog.Infoln(utils.FormatProxyClient("start raven l7 proxy client"))
klog.Infoln("start raven l7 proxy client")
var err error
dstAddr := getDestAddressForProxyClient(p.client, p.localGateway)
if len(dstAddr) < 1 {
klog.Infoln(utils.FormatProxyClient("dest address is empty, will not connected it"))
klog.Infoln("dest address is empty, will not connected it")
return nil
}
p.clientRemoteEndpoints = dstAddr
Expand All @@ -202,7 +202,7 @@ func (p *ProxyEngine) startProxyClient() error {
}

func (p *ProxyEngine) stopProxyClient() {
klog.Infoln(utils.FormatProxyClient("stop raven l7 proxy client"))
klog.Infoln("stop raven l7 proxy client")
cancel := p.proxyCtx.GetClientCancelFunc()
cancel()
p.proxyOption.SetClientStatus(false)
Expand Down
22 changes: 6 additions & 16 deletions pkg/engine/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"net"
"reflect"
"strconv"

"github.com/EvilSuperstars/go-cidrman"
Expand Down Expand Up @@ -51,9 +50,8 @@ type TunnelEngine struct {
routeDriver routedriver.Driver
vpnDriver vpndriver.Driver

nodeInfos map[types.NodeName]*v1beta1.NodeInfo
network *types.Network
lastSeenNetwork *types.Network
nodeInfos map[types.NodeName]*v1beta1.NodeInfo
network *types.Network
}

func (c *TunnelEngine) InitDriver() error {
Expand All @@ -74,7 +72,7 @@ func (c *TunnelEngine) InitDriver() error {
if err != nil {
return fmt.Errorf("fail to initialize vpn driver: %s, %s", c.config.Tunnel.VPNDriver, err)
}
klog.Info(utils.FormatTunnel("route driver %s and vpn driver %s are initialized", c.config.Tunnel.RouteDriver, c.config.Tunnel.VPNDriver))
klog.Infof("route driver %s and vpn driver %s are initialized", c.config.Tunnel.RouteDriver, c.config.Tunnel.VPNDriver)
return nil
}

Expand Down Expand Up @@ -105,7 +103,7 @@ func (c *TunnelEngine) Status() bool {
func (c *TunnelEngine) Handler() error {
if c.config.Tunnel.NATTraversal {
if err := c.checkNatCapability(); err != nil {
klog.Errorf(utils.FormatTunnel("fail to check the capability of NAT, error %s", err.Error()))
klog.Errorf("fail to check the capability of NAT, error %s", err.Error())
return err
}
}
Expand Down Expand Up @@ -154,26 +152,18 @@ func (c *TunnelEngine) Handler() error {
}
c.syncGateway(gw)
}
if reflect.DeepEqual(c.network, c.lastSeenNetwork) {
klog.Info("network not changed, skip to process")
return nil
}
nw := c.network.Copy()
klog.InfoS("applying network", "localEndpoint", nw.LocalEndpoint, "remoteEndpoint", nw.RemoteEndpoints)
err = c.vpnDriver.Apply(nw, c.routeDriver.MTU)
if err != nil {
klog.ErrorS(err, "error apply vpn driver")
klog.Errorf("error apply vpn driver, error %s", err.Error())
return err
}
err = c.routeDriver.Apply(nw, c.vpnDriver.MTU)
if err != nil {
klog.ErrorS(err, "error apply route driver")
klog.Errorf("error apply route driver, error %s", err.Error())
return err
}

// Only update lastSeenNetwork when all operations succeeded.
c.lastSeenNetwork = c.network

return nil
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/networkengine/routedriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/openyurtio/raven/cmd/agent/app/config"
"github.com/openyurtio/raven/pkg/types"
"github.com/openyurtio/raven/pkg/utils"
)

// Driver is the interface for inner gateway routing mechanism.
Expand Down Expand Up @@ -61,17 +60,17 @@ func RegisterRouteDriver(name string, factory Factory) {
driversMutex.Lock()
defer driversMutex.Unlock()
if _, found := drivers[name]; found {
klog.Fatal(utils.FormatTunnel("route drivers %q was registered twice", name))
klog.Fatal("route drivers %q was registered twice", name)
}
klog.V(1).Info(utils.FormatTunnel("registered route driver %q", name))
klog.Info("registered route driver %q", name)
drivers[name] = factory
}

// New builds the route driver registered under name, passing cfg to the
// driver's factory.
//
// The registry is guarded by driversMutex for the duration of the lookup and
// the factory call. If no driver was registered under name the process is
// terminated via klog.Fatalf, matching the behaviour of RegisterRouteDriver
// for duplicate registrations.
func New(name string, cfg *config.Config) (Driver, error) {
	driversMutex.Lock()
	defer driversMutex.Unlock()
	factory, found := drivers[name]
	if !found {
		// Fatalf (not Fatal) is required here: Fatal does not interpret the
		// format string, so %q would be logged literally and the driver name
		// merely appended.
		klog.Fatalf("route driver %q not found", name)
	}
	return factory(cfg)
}
3 changes: 1 addition & 2 deletions pkg/networkengine/routedriver/vxlan/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"k8s.io/klog/v2"

netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink"
"github.com/openyurtio/raven/pkg/utils"
)

const (
Expand All @@ -42,7 +41,7 @@ func ensureVxlanLink(vxlan netlink.Vxlan, vtepIP net.IP) (netlink.Link, error) {
return link
}
if _, ok := err.(netlink.LinkNotFoundError); !ok {
klog.Errorf(utils.FormatTunnel("error get vxlan link: %v", err))
klog.Errorf("error get vxlan link: %v", err)
}
return nil
}
Expand Down
17 changes: 8 additions & 9 deletions pkg/networkengine/routedriver/vxlan/vxlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
ipsetutil "github.com/openyurtio/raven/pkg/networkengine/util/ipset"
iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables"
"github.com/openyurtio/raven/pkg/types"
"github.com/openyurtio/raven/pkg/utils"
)

const (
Expand All @@ -53,7 +52,8 @@ const (

ravenMark = 0x40

ravenMarkSet = "raven-mark-set"
ravenMarkSet = "raven-mark-set"
ravenMarkSetType = "hash:net"
)

var (
Expand All @@ -76,11 +76,11 @@ type vxlan struct {

func (vx *vxlan) Apply(network *types.Network, vpnDriverMTUFn func() (int, error)) (err error) {
if network.LocalEndpoint == nil || len(network.RemoteEndpoints) == 0 {
klog.Info(utils.FormatTunnel("no local gateway or remote gateway is found, cleaning up route setting"))
klog.Info("no local gateway or remote gateway is found, cleaning up route setting")
return vx.Cleanup()
}
if len(network.LocalNodeInfo) == 1 {
klog.Infof(utils.FormatTunnel("only gateway node exist in current gateway, cleaning up route setting"))
klog.Info("only gateway node exist in current gateway, cleaning up route setting")
return vx.Cleanup()
}

Expand All @@ -103,8 +103,7 @@ func (vx *vxlan) Apply(network *types.Network, vpnDriverMTUFn func() (int, error
// The desired and current ipset entries calculated from given network.
// The key is ip set entry
var desiredSet, currentSet map[string]*netlink.IPSetEntry

vx.ipset, err = ipsetutil.New(ravenMarkSet)
vx.ipset, err = ipsetutil.New(ravenMarkSet, ravenMarkSetType, ipsetutil.IpsetWrapperOption{})
if err != nil {
return fmt.Errorf("error create ip set: %s", err)
}
Expand Down Expand Up @@ -256,7 +255,7 @@ func (vx *vxlan) Init() (err error) {
return err
}

vx.ipset, err = ipsetutil.New(ravenMarkSet)
vx.ipset, err = ipsetutil.New(ravenMarkSet, ravenMarkSetType, ipsetutil.IpsetWrapperOption{})
if err != nil {
return err
}
Expand Down Expand Up @@ -530,7 +529,7 @@ func (vx *vxlan) calIPSetOnNode(network *types.Network) map[string]*netlink.IPSe
CIDR: uint8(ones),
Replace: true,
}
set[ipsetutil.SetEntryKey(entry)] = entry
set[vx.ipset.Key(entry)] = entry
}
}
return set
Expand Down Expand Up @@ -569,7 +568,7 @@ func (vx *vxlan) Cleanup() error {
}

// Clean may be called more than one time, so we should ensure ip set exists
vx.ipset, err = ipsetutil.New(ravenMarkSet)
vx.ipset, err = ipsetutil.New(ravenMarkSet, ravenMarkSetType, ipsetutil.IpsetWrapperOption{})
if err != nil {
errList = errList.Append(fmt.Errorf("error ensure ip set %s: %s", ravenMarkSet, err))
}
Expand Down
Loading

0 comments on commit 44e4565

Please sign in to comment.