From 44e4565f1a657f0d11a1bd577332bd496b64c76e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8F=A9=E8=BD=A9?= Date: Thu, 23 May 2024 18:19:14 +0800 Subject: [PATCH] bugfix: fix vpn link and rule leak --- charts/raven-agent/values.yaml | 2 +- cmd/agent/app/options/options.go | 2 +- pkg/engine/engine.go | 20 +- pkg/engine/proxy.go | 20 +- pkg/engine/tunnel.go | 22 +- pkg/networkengine/routedriver/driver.go | 7 +- pkg/networkengine/routedriver/vxlan/utils.go | 3 +- pkg/networkengine/routedriver/vxlan/vxlan.go | 17 +- pkg/networkengine/util/ipset/ipset.go | 32 +- pkg/networkengine/util/netlink/netlink.go | 11 + pkg/networkengine/util/utils.go | 8 +- .../vpndriver/libreswan/libreswan.go | 299 +++++++++++------- .../vpndriver/libreswan/libreswan_test.go | 6 +- pkg/networkengine/vpndriver/vpnipset.go | 129 ++++++++ .../vpndriver/wireguard/wireguard.go | 255 +++++++-------- pkg/proxyengine/proxyclient/proxyclient.go | 8 +- pkg/proxyengine/proxyserver/interceptor.go | 38 +-- pkg/proxyengine/proxyserver/manageheader.go | 14 +- pkg/proxyengine/proxyserver/proxyserver.go | 14 +- pkg/proxyengine/proxyserver/servers.go | 25 +- pkg/utils/utils.go | 41 --- 21 files changed, 566 insertions(+), 407 deletions(-) create mode 100644 pkg/networkengine/vpndriver/vpnipset.go delete mode 100644 pkg/utils/utils.go diff --git a/charts/raven-agent/values.yaml b/charts/raven-agent/values.yaml index 5b1d467..e794cbe 100644 --- a/charts/raven-agent/values.yaml +++ b/charts/raven-agent/values.yaml @@ -89,4 +89,4 @@ proxy: metricsBindAddr: ":10266" rollingUpdate: - maxUnavailable: 5% \ No newline at end of file + maxUnavailable: 20% \ No newline at end of file diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index f1e2467..016dbd0 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -113,7 +113,7 @@ func (o *AgentOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.MetricsBindAddress, "metric-bind-addr", o.MetricsBindAddress, `Binding address of tunnel metrics. (default ":10265")`) fs.StringVar(&o.HealthProbeAddr, "health-probe-addr", o.HealthProbeAddr, `The address the healthz/readyz endpoint binds to.. (default ":10275")`) fs.BoolVar(&o.SyncRules, "sync-raven-rules", true, "Whether to synchronize raven rules regularly") - fs.DurationVar(&o.SyncPeriod.Duration, "sync-raven-rules-period", 30*time.Minute, "The period for reconciling routes created for nodes by cloud provider. The minimum value is 1 minute and the maximum value is 24 hour") + fs.DurationVar(&o.SyncPeriod.Duration, "sync-raven-rules-period", 10*time.Minute, "The period for reconciling routes created for nodes by cloud provider. The minimum value is 1 minute and the maximum value is 24 hour") fs.StringVar(&o.VPNPort, "vpn-bind-port", o.VPNPort, `Binding port of vpn. (default ":4500")`) fs.BoolVar(&o.NATTraversal, "nat-traversal", o.NATTraversal, `Enable NAT Traversal or not. (default "false")`) diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 42868ca..c744b71 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -2,9 +2,9 @@ package engine import ( "context" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" @@ -59,7 +59,7 @@ func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) { return reconcile.Result{}, nil })) if err != nil { - klog.Errorf(utils.FormatRavenEngine("fail to new controller with manager, error %s", err.Error())) + klog.Errorf("fail to new controller with manager, error %s", err.Error()) return engine, err } engine.client = engine.manager.GetClient() @@ -72,7 +72,7 @@ func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) { } err = engine.tunnel.InitDriver() if err != nil { - klog.Errorf(utils.FormatRavenEngine("fail to init tunnel driver, error %s", err.Error())) + klog.Errorf("fail to init tunnel driver, error %s", err.Error()) return engine, err } @@ -167,7 +167,7 @@ func (e *Engine) cleanup() { if e.option.GetTunnelStatus() { err := e.tunnel.CleanupDriver() if err != nil { - klog.Errorf(utils.FormatRavenEngine("failed to cleanup tunnel driver, error %s", err.Error())) + klog.Errorf("failed to cleanup tunnel driver, error %s", err.Error()) } } if e.option.GetProxyStatus() { @@ -182,18 +182,18 @@ func (e *Engine) handleEventErr(err error, gw *v1beta1.Gateway) { } if e.queue.NumRequeues(gw) < utils.MaxRetries { - klog.Info(utils.FormatRavenEngine("error syncing event %s: %s", gw.GetName(), err.Error())) + klog.Infof("error syncing event %s: %s", gw.GetName(), err.Error()) e.queue.AddRateLimited(gw) return } - klog.Info(utils.FormatRavenEngine("dropping event %s out of the queue: %s", gw.GetName(), err.Error())) + klog.Infof("dropping event %s out of the queue: %s", gw.GetName(), err.Error()) e.queue.Forget(gw) } func (e *Engine) addGateway(evt event.CreateEvent) bool { gw, ok := evt.Object.(*v1beta1.Gateway) if ok { - klog.InfoS(utils.FormatRavenEngine("adding gateway %s", gw.GetName())) + klog.Infof("adding gateway %s", gw.GetName()) e.queue.Add(gw.DeepCopy()) } return ok @@ -206,10 +206,8 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool { if ok1 && ok2 { if oldGw.ResourceVersion != newGw.ResourceVersion { update = true - klog.InfoS(utils.FormatRavenEngine("updating gateway, %s", newGw.GetName())) + klog.Infof("updating gateway, %s", newGw.GetName()) e.queue.Add(newGw.DeepCopy()) - } else { - klog.InfoS(utils.FormatRavenEngine("skip handle update gateway"), klog.KObj(newGw)) } } return update @@ -218,7 +216,7 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool { func (e *Engine) deleteGateway(evt event.DeleteEvent) bool { gw, ok := evt.Object.(*v1beta1.Gateway) if ok { - klog.InfoS(utils.FormatRavenEngine("deleting gateway, %s", gw.GetName())) + klog.Infof("deleting gateway, %s", gw.GetName()) e.queue.Add(gw.DeepCopy()) } return ok diff --git a/pkg/engine/proxy.go b/pkg/engine/proxy.go index c9f999a..8f3f016 100644 --- a/pkg/engine/proxy.go +++ b/pkg/engine/proxy.go @@ -79,7 +79,7 @@ func (p *ProxyEngine) Handler() error { srcAddr := getSrcAddressForProxyServer(p.client, p.nodeName) err = p.startProxyServer() if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to start proxy server, error %s", err.Error())) + klog.Errorf("failed to start proxy server, error %s", err.Error()) return err } p.serverLocalEndpoints = srcAddr @@ -93,7 +93,7 @@ func (p *ProxyEngine) Handler() error { time.Sleep(2 * time.Second) err = p.startProxyServer() if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to start proxy server, error %s", err.Error())) + klog.Errorf("failed to start proxy server, error %s", err.Error()) return err } p.serverLocalEndpoints = srcAddr @@ -106,7 +106,7 @@ func (p *ProxyEngine) Handler() error { case StartType: err = p.startProxyClient() if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to start proxy client, error %s", err.Error())) + klog.Errorf("failed to start proxy client, error %s", err.Error()) return err } case StopType: @@ -114,7 +114,7 @@ func (p *ProxyEngine) Handler() error { case RestartType: dstAddr := getDestAddressForProxyClient(p.client, p.localGateway) if len(dstAddr) < 1 { - klog.Infoln(utils.FormatProxyClient("dest address is empty, will not connected it")) + klog.Infoln("dest address is empty, will not connected it") return nil } if strings.Join(p.clientRemoteEndpoints, ",") != strings.Join(dstAddr, ",") { @@ -122,7 +122,7 @@ func (p *ProxyEngine) Handler() error { time.Sleep(2 * time.Second) err = p.startProxyClient() if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to start proxy client, error %s", err.Error())) + klog.Errorf("failed to start proxy client, error %s", err.Error()) return err } } @@ -133,7 +133,7 @@ func (p *ProxyEngine) Handler() error { } func (p *ProxyEngine) startProxyServer() error { - klog.Infoln(utils.FormatProxyServer("start raven l7 proxy server")) + klog.Infoln("start raven l7 proxy server") if p.localGateway == nil { return fmt.Errorf("unknown gateway for node %s, can not start proxy server", p.nodeName) } @@ -164,7 +164,7 @@ func (p *ProxyEngine) startProxyServer() error { } func (p *ProxyEngine) stopProxyServer() { - klog.Infoln(utils.FormatProxyServer("Stop raven l7 proxy server")) + klog.Infoln("Stop raven l7 proxy server") cancel := p.proxyCtx.GetServerCancelFunc() cancel() p.proxyOption.SetServerStatus(false) @@ -172,11 +172,11 @@ func (p *ProxyEngine) stopProxyServer() { } func (p *ProxyEngine) startProxyClient() error { - klog.Infoln(utils.FormatProxyClient("start raven l7 proxy client")) + klog.Infoln("start raven l7 proxy client") var err error dstAddr := getDestAddressForProxyClient(p.client, p.localGateway) if len(dstAddr) < 1 { - klog.Infoln(utils.FormatProxyClient("dest address is empty, will not connected it")) + klog.Infoln("dest address is empty, will not connected it") return nil } p.clientRemoteEndpoints = dstAddr @@ -202,7 +202,7 @@ func (p *ProxyEngine) startProxyClient() error { } func (p *ProxyEngine) stopProxyClient() { - klog.Infoln(utils.FormatProxyClient("stop raven l7 proxy client")) + klog.Infoln("stop raven l7 proxy client") cancel := p.proxyCtx.GetClientCancelFunc() cancel() p.proxyOption.SetClientStatus(false) diff --git a/pkg/engine/tunnel.go b/pkg/engine/tunnel.go index 6bae19c..2efbefc 100644 --- a/pkg/engine/tunnel.go +++ b/pkg/engine/tunnel.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "net" - "reflect" "strconv" "github.com/EvilSuperstars/go-cidrman" @@ -51,9 +50,8 @@ type TunnelEngine struct { routeDriver routedriver.Driver vpnDriver vpndriver.Driver - nodeInfos map[types.NodeName]*v1beta1.NodeInfo - network *types.Network - lastSeenNetwork *types.Network + nodeInfos map[types.NodeName]*v1beta1.NodeInfo + network *types.Network } func (c *TunnelEngine) InitDriver() error { @@ -74,7 +72,7 @@ func (c *TunnelEngine) InitDriver() error { if err != nil { return fmt.Errorf("fail to initialize vpn driver: %s, %s", c.config.Tunnel.VPNDriver, err) } - klog.Info(utils.FormatTunnel("route driver %s and vpn driver %s are initialized", c.config.Tunnel.RouteDriver, c.config.Tunnel.VPNDriver)) + klog.Infof("route driver %s and vpn driver %s are initialized", c.config.Tunnel.RouteDriver, c.config.Tunnel.VPNDriver) return nil } @@ -105,7 +103,7 @@ func (c *TunnelEngine) Status() bool { func (c *TunnelEngine) Handler() error { if c.config.Tunnel.NATTraversal { if err := c.checkNatCapability(); err != nil { - klog.Errorf(utils.FormatTunnel("fail to check the capability of NAT, error %s", err.Error())) + klog.Errorf("fail to check the capability of NAT, error %s", err.Error()) return err } } @@ -154,26 +152,18 @@ func (c *TunnelEngine) Handler() error { } c.syncGateway(gw) } - if reflect.DeepEqual(c.network, c.lastSeenNetwork) { - klog.Info("network not changed, skip to process") - return nil - } nw := c.network.Copy() klog.InfoS("applying network", "localEndpoint", nw.LocalEndpoint, "remoteEndpoint", nw.RemoteEndpoints) err = c.vpnDriver.Apply(nw, c.routeDriver.MTU) if err != nil { - klog.ErrorS(err, "error apply vpn driver") + klog.Errorf("error apply vpn driver, error %s", err.Error()) return err } err = c.routeDriver.Apply(nw, c.vpnDriver.MTU) if err != nil { - klog.ErrorS(err, "error apply route driver") + klog.Errorf("error apply route driver, error %s", err.Error()) return err } - - // Only update lastSeenNetwork when all operations succeeded. - c.lastSeenNetwork = c.network - return nil } diff --git a/pkg/networkengine/routedriver/driver.go b/pkg/networkengine/routedriver/driver.go index 2a49148..3754a42 100644 --- a/pkg/networkengine/routedriver/driver.go +++ b/pkg/networkengine/routedriver/driver.go @@ -25,7 +25,6 @@ import ( "github.com/openyurtio/raven/cmd/agent/app/config" "github.com/openyurtio/raven/pkg/types" - "github.com/openyurtio/raven/pkg/utils" ) // Driver is the interface for inner gateway routing mechanism. @@ -61,9 +60,9 @@ func RegisterRouteDriver(name string, factory Factory) { driversMutex.Lock() defer driversMutex.Unlock() if _, found := drivers[name]; found { - klog.Fatal(utils.FormatTunnel("route drivers %q was registered twice", name)) + klog.Fatal("route drivers %q was registered twice", name) } - klog.V(1).Info(utils.FormatTunnel("registered route driver %q", name)) + klog.Info("registered route driver %q", name) drivers[name] = factory } @@ -71,7 +70,7 @@ func New(name string, cfg *config.Config) (Driver, error) { driversMutex.Lock() defer driversMutex.Unlock() if _, found := drivers[name]; !found { - klog.Fatal(utils.FormatTunnel("route driver %q not found", name)) + klog.Fatal("route driver %q not found", name) } return drivers[name](cfg) } diff --git a/pkg/networkengine/routedriver/vxlan/utils.go b/pkg/networkengine/routedriver/vxlan/utils.go index f3abf9b..de47e86 100644 --- a/pkg/networkengine/routedriver/vxlan/utils.go +++ b/pkg/networkengine/routedriver/vxlan/utils.go @@ -28,7 +28,6 @@ import ( "k8s.io/klog/v2" netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" - "github.com/openyurtio/raven/pkg/utils" ) const ( @@ -42,7 +41,7 @@ func ensureVxlanLink(vxlan netlink.Vxlan, vtepIP net.IP) (netlink.Link, error) { return link } if _, ok := err.(netlink.LinkNotFoundError); !ok { - klog.Errorf(utils.FormatTunnel("error get vxlan link: %v", err)) + klog.Errorf("error get vxlan link: %v", err) } return nil } diff --git a/pkg/networkengine/routedriver/vxlan/vxlan.go b/pkg/networkengine/routedriver/vxlan/vxlan.go index 0f74483..509b40b 100644 --- a/pkg/networkengine/routedriver/vxlan/vxlan.go +++ b/pkg/networkengine/routedriver/vxlan/vxlan.go @@ -36,7 +36,6 @@ import ( ipsetutil "github.com/openyurtio/raven/pkg/networkengine/util/ipset" iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables" "github.com/openyurtio/raven/pkg/types" - "github.com/openyurtio/raven/pkg/utils" ) const ( @@ -53,7 +52,8 @@ const ( ravenMark = 0x40 - ravenMarkSet = "raven-mark-set" + ravenMarkSet = "raven-mark-set" + ravenMarkSetType = "hash:net" ) var ( @@ -76,11 +76,11 @@ type vxlan struct { func (vx *vxlan) Apply(network *types.Network, vpnDriverMTUFn func() (int, error)) (err error) { if network.LocalEndpoint == nil || len(network.RemoteEndpoints) == 0 { - klog.Info(utils.FormatTunnel("no local gateway or remote gateway is found, cleaning up route setting")) + klog.Info("no local gateway or remote gateway is found, cleaning up route setting") return vx.Cleanup() } if len(network.LocalNodeInfo) == 1 { - klog.Infof(utils.FormatTunnel("only gateway node exist in current gateway, cleaning up route setting")) + klog.Info("only gateway node exist in current gateway, cleaning up route setting") return vx.Cleanup() } @@ -103,8 +103,7 @@ func (vx *vxlan) Apply(network *types.Network, vpnDriverMTUFn func() (int, error // The desired and current ipset entries calculated from given network. // The key is ip set entry var desiredSet, currentSet map[string]*netlink.IPSetEntry - - vx.ipset, err = ipsetutil.New(ravenMarkSet) + vx.ipset, err = ipsetutil.New(ravenMarkSet, ravenMarkSetType, ipsetutil.IpsetWrapperOption{}) if err != nil { return fmt.Errorf("error create ip set: %s", err) } @@ -256,7 +255,7 @@ func (vx *vxlan) Init() (err error) { return err } - vx.ipset, err = ipsetutil.New(ravenMarkSet) + vx.ipset, err = ipsetutil.New(ravenMarkSet, ravenMarkSetType, ipsetutil.IpsetWrapperOption{}) if err != nil { return err } @@ -530,7 +529,7 @@ func (vx *vxlan) calIPSetOnNode(network *types.Network) map[string]*netlink.IPSe CIDR: uint8(ones), Replace: true, } - set[ipsetutil.SetEntryKey(entry)] = entry + set[vx.ipset.Key(entry)] = entry } } return set @@ -569,7 +568,7 @@ func (vx *vxlan) Cleanup() error { } // Clean may be called more than one time, so we should ensure ip set exists - vx.ipset, err = ipsetutil.New(ravenMarkSet) + vx.ipset, err = ipsetutil.New(ravenMarkSet, ravenMarkSetType, ipsetutil.IpsetWrapperOption{}) if err != nil { errList = errList.Append(fmt.Errorf("error ensure ip set %s: %s", ravenMarkSet, err)) } diff --git a/pkg/networkengine/util/ipset/ipset.go b/pkg/networkengine/util/ipset/ipset.go index 66e34e6..ee844eb 100644 --- a/pkg/networkengine/util/ipset/ipset.go +++ b/pkg/networkengine/util/ipset/ipset.go @@ -33,14 +33,26 @@ type IPSetInterface interface { Del(entry *netlink.IPSetEntry) error Flush() error Destroy() error + Key(entry *netlink.IPSetEntry) string } +var DefaultKeyFunc = EntryKey + type ipSetWrapper struct { setName string + setType string + keyFunc func(setEntry *netlink.IPSetEntry) string +} + +type IpsetWrapperOption struct { + KeyFunc func(setEntry *netlink.IPSetEntry) string } -func New(setName string) (IPSetInterface, error) { - err := netlink.IpsetCreate(setName, "hash:net", netlink.IpsetCreateOptions{ +func New(setName, setTypeName string, options IpsetWrapperOption) (IPSetInterface, error) { + if options.KeyFunc == nil { + options.KeyFunc = DefaultKeyFunc + } + err := netlink.IpsetCreate(setName, setTypeName, netlink.IpsetCreateOptions{ Replace: true, }) if err != nil { @@ -50,7 +62,7 @@ func New(setName string) (IPSetInterface, error) { if klog.V(5).Enabled() { klog.V(5).InfoS("netlink.IpsetCreate succeeded", "setName", setName) } - return &ipSetWrapper{setName}, nil + return &ipSetWrapper{setName, setTypeName, options.KeyFunc}, nil } func (i *ipSetWrapper) List() (*netlink.IPSetResult, error) { @@ -72,11 +84,11 @@ func (i *ipSetWrapper) Name() string { func (i *ipSetWrapper) Add(entry *netlink.IPSetEntry) (err error) { err = netlink.IpsetAdd(i.Name(), entry) if err != nil { - klog.ErrorS(err, "error on netlink.IpsetAdd", "setName", i.Name(), "entry", SetEntryKey(entry)) + klog.ErrorS(err, "error on netlink.IpsetAdd", "setName", i.Name(), "entry", i.Key(entry)) return } if klog.V(5).Enabled() { - klog.V(5).InfoS("netlink.IpsetAdd succeeded", "setName", i.Name(), "entry", SetEntryKey(entry)) + klog.V(5).InfoS("netlink.IpsetAdd succeeded", "setName", i.Name(), "entry", i.Key(entry)) } return } @@ -84,11 +96,11 @@ func (i *ipSetWrapper) Add(entry *netlink.IPSetEntry) (err error) { func (i *ipSetWrapper) Del(entry *netlink.IPSetEntry) (err error) { err = netlink.IpsetDel(i.Name(), entry) if err != nil { - klog.ErrorS(err, "error on netlink.IpsetDel", "setName", i.Name(), "entry", SetEntryKey(entry)) + klog.ErrorS(err, "error on netlink.IpsetDel", "setName", i.Name(), "entry", i.Key) return } if klog.V(5).Enabled() { - klog.V(5).InfoS("netlink.IpsetDel succeeded", "setName", i.Name(), "entry", SetEntryKey(entry)) + klog.V(5).InfoS("netlink.IpsetDel succeeded", "setName", i.Name(), "entry", i.Key(entry)) } return } @@ -117,6 +129,10 @@ func (i *ipSetWrapper) Destroy() (err error) { return } -func SetEntryKey(setEntry *netlink.IPSetEntry) string { +func (i *ipSetWrapper) Key(entry *netlink.IPSetEntry) string { + return i.keyFunc(entry) +} + +func EntryKey(setEntry *netlink.IPSetEntry) string { return fmt.Sprintf("%s/%d", setEntry.IP.String(), setEntry.CIDR) } diff --git a/pkg/networkengine/util/netlink/netlink.go b/pkg/networkengine/util/netlink/netlink.go index 2c05366..a503f81 100644 --- a/pkg/networkengine/util/netlink/netlink.go +++ b/pkg/networkengine/util/netlink/netlink.go @@ -41,6 +41,7 @@ var ( RuleDel = ruleDel XfrmPolicyFlush = xfrmPolicyFlush + XfrmStateFlush = xfrmStateFlush NeighAdd = neighAdd NeighReplace = neighReplace @@ -127,6 +128,16 @@ func xfrmPolicyFlush() (err error) { return nil } +func xfrmStateFlush() (err error) { + err = netlink.XfrmStateFlush(0) + if err != nil { + klog.ErrorS(err, "error on netlink.XfrmStateFlush") + return + } + klog.V(5).InfoS("netlink.XfrmStateFlush succeeded") + return nil +} + func ruleListFiltered(family int, filter *netlink.Rule, filterMask uint64) (rules []netlink.Rule, err error) { rules, err = netlink.RuleListFiltered(family, filter, filterMask) if err != nil { diff --git a/pkg/networkengine/util/utils.go b/pkg/networkengine/util/utils.go index 297dcb5..1b9c834 100644 --- a/pkg/networkengine/util/utils.go +++ b/pkg/networkengine/util/utils.go @@ -21,7 +21,6 @@ package networkutil import ( "fmt" - "net" "syscall" "github.com/vdobler/ht/errorlist" @@ -32,11 +31,6 @@ import ( netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" ) -var ( - AllZeroMAC = net.HardwareAddr{0, 0, 0, 0, 0, 0} - AllZeroAddress = "0.0.0.0/0" -) - func NewRavenRule(rulePriority int, routeTableID int) *netlink.Rule { rule := netlink.NewRule() rule.Priority = rulePriority @@ -94,7 +88,7 @@ func ListIPSetOnNode(set ipsetutil.IPSetInterface) (map[string]*netlink.IPSetEnt } ro := make(map[string]*netlink.IPSetEntry) for i := range info.Entries { - ro[ipsetutil.SetEntryKey(&info.Entries[i])] = &info.Entries[i] + ro[set.Key(&info.Entries[i])] = &info.Entries[i] } return ro, nil } diff --git a/pkg/networkengine/vpndriver/libreswan/libreswan.go b/pkg/networkengine/vpndriver/libreswan/libreswan.go index 50395b0..d81b58d 100644 --- a/pkg/networkengine/vpndriver/libreswan/libreswan.go +++ b/pkg/networkengine/vpndriver/libreswan/libreswan.go @@ -17,10 +17,14 @@ package libreswan import ( + "bufio" + "bytes" "fmt" "os" "os/exec" + "regexp" "strconv" + "strings" "syscall" "time" @@ -28,6 +32,8 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/raven/cmd/agent/app/config" + networkutil "github.com/openyurtio/raven/pkg/networkengine/util" + ipsetutil "github.com/openyurtio/raven/pkg/networkengine/util/ipset" iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables" netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" "github.com/openyurtio/raven/pkg/networkengine/vpndriver" @@ -40,12 +46,16 @@ const ( // DriverName specifies name of libreswan VPN backend driver. DriverName = "libreswan" + + IKESAESTABLISHED = "STATE_V2_ESTABLISHED_IKE_SA" + ChILDSAESTABLISHED = "STATE_V2_ESTABLISHED_CHILD_SA" ) var _ vpndriver.Driver = (*libreswan)(nil) // can be modified for testing. var whackCmd = whackCmdFn +var ipsecCmd = ipsecCmdFn var findCentralGw = vpndriver.FindCentralGwFn var enableCreateEdgeConnection = vpndriver.EnableCreateEdgeConnection @@ -58,11 +68,11 @@ const ( ) type libreswan struct { - relayConnections map[string]*vpndriver.Connection - edgeConnections map[string]*vpndriver.Connection + connections map[string]bool nodeName types.NodeName centralGw *types.Endpoint iptables iptablesutil.IPTablesInterface + ipset ipsetutil.IPSetInterface listenPort string keepaliveInterval int keepaliveTimeout int @@ -73,6 +83,11 @@ func (l *libreswan) Init() (err error) { if err != nil { return err } + l.ipset, err = ipsetutil.New(vpndriver.RavenSkipNatSet, vpndriver.RavenSkipNatSetType, ipsetutil.IpsetWrapperOption{}) + if err != nil { + return err + } + // Ensure secrets file _, err = os.Stat(SecretFile) if err == nil { @@ -95,8 +110,7 @@ func (l *libreswan) Init() (err error) { func New(cfg *config.Config) (vpndriver.Driver, error) { return &libreswan{ - relayConnections: make(map[string]*vpndriver.Connection), - edgeConnections: make(map[string]*vpndriver.Connection), + connections: make(map[string]bool), nodeName: types.NodeName(cfg.NodeName), listenPort: cfg.Tunnel.VPNPort, keepaliveInterval: cfg.Tunnel.KeepAliveInterval, @@ -110,12 +124,13 @@ func (l *libreswan) Apply(network *types.Network, routeDriverMTUFn func(*types.N return l.Cleanup() } if network.LocalEndpoint.NodeName != l.nodeName { - klog.Infof(utils.FormatTunnel("the current node is not gateway node, cleaning vpn connections")) + klog.Infof("the current node is not gateway node, cleaning vpn connections") return l.Cleanup() } - if err := l.createConnections(network); err != nil { - return fmt.Errorf("error create VPN tunnels: %v", err) + l.centralGw = findCentralGw(network) + if err := l.ensureConnections(network); err != nil { + return fmt.Errorf("error ensure VPN tunnels: %s", err.Error()) } return nil @@ -176,136 +191,181 @@ func (l *libreswan) getEndpointResolver(network *types.Network) func(centralGw, } } -func (l *libreswan) createConnections(network *types.Network) error { - l.centralGw = findCentralGw(network) +func (l *libreswan) ensureConnections(network *types.Network) error { + defer func() { + // wait connection is established + time.Sleep(5 * time.Second) + }() + + l.connections = currentConnections() + if err := l.deleteUnavailableConn(); err != nil { + return fmt.Errorf("delete unavailabel connections error %s", err.Error()) + } desiredEdgeConns, desiredRelayConns := l.computeDesiredConnections(network) - if len(desiredEdgeConns) == 0 && len(desiredRelayConns) == 0 { - klog.Infof(utils.FormatTunnel("no desired connections, cleaning vpn connections")) - return l.Cleanup() + klog.Infof("desired edge connections: %+v, desired relay connections: %+v", desiredEdgeConns, desiredRelayConns) + + if err := l.deleteUndesiredConn(desiredEdgeConns, desiredRelayConns); err != nil { + return fmt.Errorf("ensure delete undesired connections error %s", err.Error()) } - klog.Infof(utils.FormatTunnel("desired edge connections: %+v, desired relay connections: %+v", desiredEdgeConns, desiredRelayConns)) + if err := l.ensureEdgeConnections(desiredEdgeConns); err != nil { + return fmt.Errorf("ensure delete edge-edge connections error %s", err.Error()) + } - if err := l.createEdgeConnections(desiredEdgeConns); err != nil { - return err + if err := l.ensureRelayConnections(desiredRelayConns); err != nil { + return fmt.Errorf("ensure delete cloud-edge connections error %s", err.Error()) } - if err := l.createRelayConnections(desiredRelayConns); err != nil { - return err + + if err := l.ensureRavenNAT(network); err != nil { + return fmt.Errorf("ensure raven skip nat error %s", err.Error()) } return nil } -func (l *libreswan) createEdgeConnections(desiredEdgeConns map[string]*vpndriver.Connection) error { - if len(desiredEdgeConns) == 0 { - klog.Infof("no desired edge connections") - return nil +func currentConnections() map[string]bool { + connections := make(map[string]bool) + reg := regexp.MustCompile(`"([^"]+)"`) + out, err := ipsecCmd("auto", "--status") + if err != nil { + return connections + } + foundConnectionList := false + scanner := bufio.NewScanner(out) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "Connection list") { + foundConnectionList = true + continue + } + if foundConnectionList { + matches := reg.FindAllStringSubmatch(line, -1) + for _, match := range matches { + if len(match) > 1 { + connections[match[1]] = false + } + } + } } + for k := range connections { + out, err = ipsecCmd("whack", "--showstates") + if err != nil { + continue + } + foundIKESAEstablished := false + foundChildSAEstablished := false + scanner = bufio.NewScanner(out) + for scanner.Scan() { + line := scanner.Text() + if !strings.Contains(line, k) { + continue + } + if strings.Contains(line, IKESAESTABLISHED) { + foundIKESAEstablished = true + } + if strings.Contains(line, ChILDSAESTABLISHED) { + foundChildSAEstablished = true + } + } + if foundIKESAEstablished && foundChildSAEstablished { + connections[k] = true + } + } + return connections +} +func (l *libreswan) deleteUnavailableConn() error { errList := errorlist.List{} - - // remove unwanted connections - for connName := range l.edgeConnections { - if _, ok := desiredEdgeConns[connName]; !ok { + for connName, established := range l.connections { + if !established { err := l.whackDelConnection(connName) if err != nil { errList = errList.Append(err) - klog.ErrorS(err, "error disconnecting endpoint", "connectionName", connName) + klog.ErrorS(err, "error delete unavailable connection", "connectionName", connName) continue } - delete(l.edgeConnections, connName) + delete(l.connections, connName) } } - - // add new connections - for name, connection := range desiredEdgeConns { - err := l.connectToEdgeEndpoint(name, connection) - errList = errList.Append(err) - } - return errList.AsError() } -func (l *libreswan) createRelayConnections(desiredRelayConns map[string]*vpndriver.Connection) error { - if len(desiredRelayConns) == 0 { - klog.Infof("no desired relay connections") - return nil - } - +func (l *libreswan) deleteUndesiredConn(desiredEdgeConns, desiredRelayConns map[string]*vpndriver.Connection) error { errList := errorlist.List{} - - // remove unwanted connections - for connName := range l.relayConnections { - if _, ok := desiredRelayConns[connName]; !ok { + desireConn := make(map[string]struct{}) + for k := range desiredEdgeConns { + desireConn[k] = struct{}{} + } + for k := range desiredRelayConns { + desireConn[k] = struct{}{} + } + for connName := range l.connections { + if _, ok := desireConn[connName]; !ok { err := l.whackDelConnection(connName) if err != nil { errList = errList.Append(err) - klog.ErrorS(err, "error disconnecting endpoint", "connectionName", connName) + klog.ErrorS(err, "error delete undesired connection", "connectionName", connName) continue } - if l.centralGw.NodeName == l.nodeName { - if conn, ok := l.relayConnections[connName]; ok && conn != nil { - err := l.deleteRavenSkipNAT(conn) - if err != nil { - errList = errList.Append(err) - } - } - } - delete(l.relayConnections, connName) + delete(l.connections, connName) } } + return errList.AsError() +} + +func (l *libreswan) ensureEdgeConnections(desiredEdgeConns map[string]*vpndriver.Connection) error { + errList := errorlist.List{} + for name, connection := range desiredEdgeConns { + err := l.connectToEdgeEndpoint(name, connection) + errList = errList.Append(err) + } + return errList.AsError() +} - // add new connections +func (l *libreswan) ensureRelayConnections(desiredRelayConns map[string]*vpndriver.Connection) error { + errList := errorlist.List{} for name, connection := range desiredRelayConns { err := l.connectToEndpoint(name, connection) errList = errList.Append(err) - if l.centralGw.NodeName == l.nodeName { - err = l.ensureRavenSkipNAT(connection) - if err != nil { - errList = errList.Append(err) - } - } } - return errList.AsError() } -func (l *libreswan) ensureRavenSkipNAT(connection *vpndriver.Connection) errorlist.List { - errList := errorlist.List{} - for _, subnet := range l.centralGw.Subnets { - if connection.LocalSubnet == subnet || connection.RemoteSubnet == subnet { - return errList - } +func (l *libreswan) ensureRavenNAT(network *types.Network) error { + if !vpndriver.IsGatewayRole(network, l.nodeName) { + klog.Infof("node %s is not gateway, skip add skip nat", l.nodeName) + return nil } - // for raven skip nat - if err := l.iptables.NewChainIfNotExist(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain); err != nil { - errList = errList.Append(fmt.Errorf("error create %s chain: %s", iptablesutil.RavenPostRoutingChain, err)) + + // The desired and current ipset entries calculated from given network. + // The key is ip set entry + var err error + l.ipset, err = ipsetutil.New(vpndriver.RavenSkipNatSet, vpndriver.RavenSkipNatSetType, ipsetutil.IpsetWrapperOption{KeyFunc: vpndriver.KeyFunc}) + if err != nil { + return fmt.Errorf("error ensure ipset %s, type %s", vpndriver.RavenSkipNatSet, vpndriver.RavenSkipNatSetType) } - if err := l.iptables.InsertIfNotExists(iptablesutil.NatTable, iptablesutil.PostRoutingChain, 1, "-m", "comment", "--comment", "raven traffic should skip NAT", "-j", iptablesutil.RavenPostRoutingChain); err != nil { - errList = errList.Append(fmt.Errorf("error adding chain %s rule: %s", iptablesutil.PostRoutingChain, err)) + currentSet, err := networkutil.ListIPSetOnNode(l.ipset) + if err != nil { + return fmt.Errorf("error listing ip set %s on node: %s", l.ipset.Name(), err.Error()) } - if err := l.iptables.AppendIfNotExists(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain, "-s", connection.LocalSubnet, "-d", connection.RemoteSubnet, "-j", "ACCEPT"); err != nil { - errList = errList.Append(fmt.Errorf("error adding chain %s rule: %s", iptablesutil.RavenPostRoutingChain, err)) + desiredSet := vpndriver.CalIPSetOnNode(network, l.nodeName, l.ipset) + err = networkutil.ApplyIPSet(l.ipset, currentSet, desiredSet) + if err != nil { + return fmt.Errorf("error applying ip set: %s", err) } - return errList -} -func (l *libreswan) deleteRavenSkipNAT(connection *vpndriver.Connection) errorlist.List { - errList := errorlist.List{} - err := l.iptables.NewChainIfNotExist(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain) - if err != nil { - errList = errList.Append(fmt.Errorf("error create %s chain: %s", iptablesutil.PostRoutingChain, err)) + // for raven skip nat + if err = l.iptables.NewChainIfNotExist(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain); err != nil { + return fmt.Errorf("error create %s chain: %s", iptablesutil.RavenPostRoutingChain, err) } - for _, subnet := range l.centralGw.Subnets { - if connection.LocalSubnet == subnet || connection.RemoteSubnet == subnet { - return errList - } + if err = l.iptables.InsertIfNotExists(iptablesutil.NatTable, iptablesutil.PostRoutingChain, 1, "-m", "comment", "--comment", "raven traffic should skip NAT", "-j", iptablesutil.RavenPostRoutingChain); err != nil { + return fmt.Errorf("error adding chain %s rule: %s", iptablesutil.PostRoutingChain, err) } - err = l.iptables.DeleteIfExists(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain, "-s", connection.LocalSubnet, "-d", connection.RemoteSubnet, "-j", "ACCEPT") - if err != nil { - errList = errList.Append(fmt.Errorf("error deleting %s chain rule: %s", iptablesutil.RavenPostRoutingChain, err)) + if err = l.iptables.AppendIfNotExists(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain, "-m", "set", "--match-set", vpndriver.RavenSkipNatSet, "src,dst", "-j", "ACCEPT"); err != nil { + return fmt.Errorf("error adding chain %s rule: %s", iptablesutil.RavenPostRoutingChain, err) } - return errList + + return nil } func (l *libreswan) computeDesiredConnections(network *types.Network) (map[string]*vpndriver.Connection, map[string]*vpndriver.Connection) { @@ -456,6 +516,24 @@ func whackCmdFn(args ...string) error { return nil } +func ipsecCmdFn(args ...string) (*bytes.Buffer, error) { + var err error + var output bytes.Buffer + for i := 0; i < 5; i++ { + cmd := exec.Command("ipsec", args...) + cmd.Stdout = &output + err = cmd.Run() + if err == nil { + break + } + time.Sleep(1 * time.Second) + } + if err != nil { + return nil, fmt.Errorf("error ipsec with %v, error %s", args, err.Error()) + } + return &output, nil +} + func (l *libreswan) whackDelConnection(conn string) error { return whackCmd("--delete", "--name", conn) } @@ -466,43 +544,36 @@ func connectionName(localID, remoteID, leftSubnet, rightSubnet string) string { func (l *libreswan) Cleanup() error { errList := errorlist.List{} - for name := range l.relayConnections { + connections := currentConnections() + for name := range connections { if err := l.whackDelConnection(name); err != nil { errList = errList.Append(err) klog.ErrorS(err, "fail to delete connection", "connectionName", name) } - if l.centralGw != nil && l.centralGw.NodeName == l.nodeName { - if conn, ok := l.relayConnections[name]; ok && conn != nil { - err := l.deleteRavenSkipNAT(conn) - if err != nil { - errList = errList.Append(err) - } - } - } } - for name := range l.edgeConnections { - if err := l.whackDelConnection(name); err != nil { - errList = errList.Append(err) - klog.ErrorS(err, "fail to delete connection", "connectionName", name) - } - } - l.relayConnections = make(map[string]*vpndriver.Connection) - l.edgeConnections = make(map[string]*vpndriver.Connection) err := netlinkutil.XfrmPolicyFlush() errList = errList.Append(err) + err = netlinkutil.XfrmStateFlush() + errList = errList.Append(err) + + err = vpndriver.CleanupRavenSkipNATIPSet() + if err != nil { + errList = errList.Append(fmt.Errorf("error cleanup ipset %s, %s", vpndriver.RavenSkipNatSet, err.Error())) + } err = l.iptables.NewChainIfNotExist(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain) if err != nil { - errList = errList.Append(fmt.Errorf("error create %s chain: %s", iptablesutil.PostRoutingChain, err)) + errList = errList.Append(fmt.Errorf("error create %s chain: %s", iptablesutil.PostRoutingChain, err.Error())) } err = l.iptables.DeleteIfExists(iptablesutil.NatTable, iptablesutil.PostRoutingChain, "-m", "comment", "--comment", "raven traffic should skip NAT", "-j", iptablesutil.RavenPostRoutingChain) if err != nil { - errList = errList.Append(fmt.Errorf("error deleting %s chain rule: %s", iptablesutil.PostRoutingChain, err)) + errList = errList.Append(fmt.Errorf("error deleting %s chain rule: %s", iptablesutil.PostRoutingChain, err.Error())) } err = l.iptables.ClearAndDeleteChain(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain) if err != nil { - errList = errList.Append(fmt.Errorf("error deleting %s chain %s", iptablesutil.RavenPostRoutingChain, err)) + errList = errList.Append(fmt.Errorf("error deleting %s chain %s", iptablesutil.RavenPostRoutingChain, err.Error())) } + return errList.AsError() } @@ -542,8 +613,7 @@ func (l *libreswan) runPluto() error { func (l *libreswan) connectToEndpoint(name string, connection *vpndriver.Connection) errorlist.List { errList := errorlist.List{} - if _, ok := l.relayConnections[name]; ok { - klog.InfoS("skipping connect because connection already exists", "connectionName", name) + if _, ok := l.connections[name]; ok { return errList } err := l.whackConnectToEndpoint(name, connection) @@ -552,14 +622,12 @@ func (l *libreswan) connectToEndpoint(name string, connection *vpndriver.Connect klog.ErrorS(err, "error connect connection", "connectionName", name) return errList } - l.relayConnections[name] = connection return errList } func (l *libreswan) connectToEdgeEndpoint(name string, connection *vpndriver.Connection) errorlist.List { errList := errorlist.List{} - if _, ok := l.edgeConnections[name]; ok { - klog.InfoS("skipping connect because connection already exists", "connectionName", name) + if _, ok := l.connections[name]; ok { return errList } err := l.whackConnectToEdgeEndpoint(name, connection) @@ -568,6 +636,5 @@ func (l *libreswan) connectToEdgeEndpoint(name string, connection *vpndriver.Con klog.ErrorS(err, "error connect connection", "connectionName", name) return errList } - l.edgeConnections[name] = connection return errList } diff --git a/pkg/networkengine/vpndriver/libreswan/libreswan_test.go b/pkg/networkengine/vpndriver/libreswan/libreswan_test.go index 529963d..0d7e22e 100644 --- a/pkg/networkengine/vpndriver/libreswan/libreswan_test.go +++ b/pkg/networkengine/vpndriver/libreswan/libreswan_test.go @@ -25,7 +25,6 @@ import ( iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables" netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" - "github.com/openyurtio/raven/pkg/networkengine/vpndriver" "github.com/openyurtio/raven/pkg/types" ) @@ -372,9 +371,8 @@ func TestLibreswan_Apply(t *testing.T) { whackCmd = w.whackCmd a := assert.New(t) l := &libreswan{ - relayConnections: make(map[string]*vpndriver.Connection), - edgeConnections: make(map[string]*vpndriver.Connection), - nodeName: types.NodeName(v.nodeName), + connections: make(map[string]bool), + nodeName: types.NodeName(v.nodeName), } var err error l.iptables, err = iptablesutil.New() diff --git a/pkg/networkengine/vpndriver/vpnipset.go b/pkg/networkengine/vpndriver/vpnipset.go new file mode 100644 index 0000000..ee58bd7 --- /dev/null +++ b/pkg/networkengine/vpndriver/vpnipset.go @@ -0,0 +1,129 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vpndriver + +import ( + "fmt" + "net" + + "k8s.io/klog/v2" + + "github.com/EvilSuperstars/go-cidrman" + ipsetutil "github.com/openyurtio/raven/pkg/networkengine/util/ipset" + "github.com/openyurtio/raven/pkg/types" + "github.com/vdobler/ht/errorlist" + "github.com/vishvananda/netlink" +) + +const ( + RavenSkipNatSet = "raven-skip-nat-set" + RavenSkipNatSetType = "hash:net,net" +) + +var KeyFunc = func(entry *netlink.IPSetEntry) string { + return fmt.Sprintf("%s/%d-%s/%d", entry.IP.String(), entry.CIDR, entry.IP2.String(), entry.CIDR2) +} + +func IsGatewayRole(network *types.Network, nodeName types.NodeName) bool { + return network != nil && + network.LocalEndpoint != nil && + network.LocalEndpoint.NodeName == nodeName +} + +func IsCentreGatewayRole(centralGw *types.Endpoint, localNodeName types.NodeName) bool { + return centralGw != nil && centralGw.NodeName == localNodeName +} + +func CalIPSetOnNode(network *types.Network, nodeName types.NodeName, ipset ipsetutil.IPSetInterface) map[string]*netlink.IPSetEntry { + set := make(map[string]*netlink.IPSetEntry) + subnets := make([]string, 0) + for _, v := range network.RemoteNodeInfo { + nodeInfo := network.RemoteNodeInfo[types.NodeName(v.NodeName)] + if nodeInfo == nil { + klog.Errorf("node %s not found in RemoteNodeInfo", v.NodeName) + continue + } + subnets = append(subnets, nodeInfo.Subnets...) + } + var err error + subnets, err = cidrman.MergeCIDRs(subnets) + if err != nil { + return set + } + if IsCentreGatewayRole(FindCentralGwFn(network), nodeName) { + subnets = append(subnets, network.LocalEndpoint.Subnets...) + for _, srcCIDR := range subnets { + _, ipNet, err := net.ParseCIDR(srcCIDR) + if err != nil { + klog.Errorf("parse node subnet %s error %s", srcCIDR, err.Error()) + continue + } + ones, _ := ipNet.Mask.Size() + entry := &netlink.IPSetEntry{ + IP: ipNet.IP, + CIDR: uint8(ones), + IP2: ipNet.IP, + CIDR2: uint8(ones), + Replace: true, + } + set[ipset.Key(entry)] = entry + } + } else { + for _, localCIDR := range network.LocalEndpoint.Subnets { + _, localIPNet, err := net.ParseCIDR(localCIDR) + if err != nil { + klog.Errorf("parse node subnet %s error %s", localCIDR, err.Error()) + continue + } + localOnes, _ := localIPNet.Mask.Size() + for _, remoteCIDR := range subnets { + _, remoteIPNet, err := net.ParseCIDR(remoteCIDR) + if err != nil { + klog.Errorf("parse node subnet %s error %s", remoteCIDR, err.Error()) + continue + } + remoteOnes, _ := remoteIPNet.Mask.Size() + entry := &netlink.IPSetEntry{ + IP: localIPNet.IP, + CIDR: uint8(localOnes), + IP2: remoteIPNet.IP, + CIDR2: uint8(remoteOnes), + Replace: true, + } + set[ipset.Key(entry)] = entry + } + } + } + return set +} + +func CleanupRavenSkipNATIPSet() error { + errList := errorlist.List{} + ipset, err := ipsetutil.New(RavenSkipNatSet, RavenSkipNatSetType, ipsetutil.IpsetWrapperOption{}) + if err != nil { + errList = errList.Append(fmt.Errorf("error ensure ip set %s: %s", RavenSkipNatSet, err)) + } + err = ipset.Flush() + if err != nil { + errList = errList.Append(fmt.Errorf("error flushing ipset: %s", err)) + } + err = ipset.Destroy() + if err != nil { + errList = errList.Append(fmt.Errorf("error destroying ipset: %s", err)) + } + return errList.AsError() +} diff --git a/pkg/networkengine/vpndriver/wireguard/wireguard.go b/pkg/networkengine/vpndriver/wireguard/wireguard.go index e17b24c..e681e36 100644 --- a/pkg/networkengine/vpndriver/wireguard/wireguard.go +++ b/pkg/networkengine/vpndriver/wireguard/wireguard.go @@ -26,7 +26,6 @@ import ( "strconv" "time" - "github.com/openyurtio/api/raven/v1beta1" "github.com/pkg/errors" "github.com/vdobler/ht/errorlist" "github.com/vishvananda/netlink" @@ -36,8 +35,10 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/cmd/agent/app/config" networkutil "github.com/openyurtio/raven/pkg/networkengine/util" + ipsetutil "github.com/openyurtio/raven/pkg/networkengine/util/ipset" iptablesutil "github.com/openyurtio/raven/pkg/networkengine/util/iptables" "github.com/openyurtio/raven/pkg/networkengine/vpndriver" "github.com/openyurtio/raven/pkg/types" @@ -61,6 +62,9 @@ const ( DeviceName = "raven-wg0" // DefaultListenPort specifies port of WireGuard listened. DefaultListenPort = 4500 + + ravenSkipNatSet = "raven-skip-nat-set" + ravenSkipNatSetType = "hash:net,net" ) var findCentralGw = vpndriver.FindCentralGwFn @@ -78,10 +82,10 @@ type wireguard struct { psk wgtypes.Key wgLink netlink.Link - relayConnections map[string]*vpndriver.Connection - edgeConnections map[string]*vpndriver.Connection iptables iptablesutil.IPTablesInterface + ipset ipsetutil.IPSetInterface nodeName types.NodeName + centralGw *types.Endpoint ravenClient client.Client listenPort int keepaliveInterval int @@ -93,8 +97,6 @@ func New(cfg *config.Config) (vpndriver.Driver, error) { port = DefaultListenPort } return &wireguard{ - relayConnections: make(map[string]*vpndriver.Connection), - edgeConnections: make(map[string]*vpndriver.Connection), nodeName: types.NodeName(cfg.NodeName), ravenClient: cfg.Manager.GetClient(), listenPort: port, @@ -217,56 +219,78 @@ func (w *wireguard) ensureWgLink(network *types.Network, routeDriverMTUFn func(* return nil } -func (w *wireguard) createConnections(network *types.Network) error { +func (w *wireguard) ensureConnections(network *types.Network) error { desiredEdgeConns, desiredRelayConns, centralAllowedIPs := w.computeDesiredConnections(network) if len(desiredEdgeConns) == 0 && len(desiredRelayConns) == 0 { klog.Infof("no desired connections, cleaning vpn connections") return w.Cleanup() } - klog.Infof("desired edge connections: %+v, desired relay connections: %+v", desiredEdgeConns, desiredRelayConns) - centralGw := findCentralGw(network) - if err := w.createEdgeConnections(desiredEdgeConns); err != nil { - return err + var err error + + peers := w.currentPeers() + klog.Infof("current peers: %v", peers) + + if err = w.deleteUndesiredPeers(peers, desiredEdgeConns, desiredRelayConns); err != nil { + return fmt.Errorf("ensure edge-edge peers error %s", err.Error()) } - if err := w.createRelayConnections(desiredRelayConns, centralAllowedIPs, centralGw); err != nil { - return err + + if err = w.ensureEdgePeers(desiredEdgeConns); err != nil { + return fmt.Errorf("ensure edge-edge peers error %s", err.Error()) + } + if err = w.ensureRelayPeers(desiredRelayConns, centralAllowedIPs); err != nil { + return fmt.Errorf("ensure cloud-edge peers error %s", err.Error()) + } + + if err = w.ensureRavenNAT(network); err != nil { + return fmt.Errorf("ensure raven skip nat error %s", err.Error()) } return nil } -func (w *wireguard) createEdgeConnections(desiredEdgeConns map[string]*vpndriver.Connection) error { - if len(desiredEdgeConns) == 0 { - klog.Infof("no desired edge connections") - return nil +func (w *wireguard) currentPeers() map[string]wgtypes.Peer { + set := make(map[string]wgtypes.Peer) + dev, err := w.wgClient.Device(DeviceName) + if err != nil { + klog.Errorf("can not found wireguard device %s, error %s", DeviceName, err.Error()) + return set } + for _, peer := range dev.Peers { + set[peer.PublicKey.String()] = peer + } + return set +} - for connName, connection := range w.edgeConnections { - if _, ok := desiredEdgeConns[connName]; !ok { - remoteKey := keyFromEndpoint(connection.RemoteEndpoint) - if err := w.removePeer(remoteKey); err == nil { - delete(w.edgeConnections, connName) - } +func (w *wireguard) deleteUndesiredPeers(currentConns map[string]wgtypes.Peer, desiredEdgeConns, desiredRelayConns map[string]*vpndriver.Connection) error { + errList := errorlist.List{} + desiredPeers := make(map[string]struct{}) + for _, connection := range desiredEdgeConns { + desiredPeers[keyFromEndpoint(connection.RemoteEndpoint).String()] = struct{}{} + } + for _, connection := range desiredRelayConns { + desiredPeers[keyFromEndpoint(connection.RemoteEndpoint).String()] = struct{}{} + } + var err error + for key, peer := range currentConns { + if _, ok := desiredPeers[key]; !ok { + err = w.removePeer(&peer.PublicKey) + errList = errList.Append(err) } } + return errList.AsError() +} +func (w *wireguard) ensureEdgePeers(desiredEdgeConns map[string]*vpndriver.Connection) error { + if len(desiredEdgeConns) == 0 { + klog.Infof("no desired edge connections") + return nil + } peerConfigs := make([]wgtypes.PeerConfig, 0) - for name, newConn := range desiredEdgeConns { - newKey := keyFromEndpoint(newConn.RemoteEndpoint) - - if oldConn, ok := w.edgeConnections[name]; ok { - oldKey := keyFromEndpoint(oldConn.RemoteEndpoint) - if oldKey.String() != newKey.String() { - if err := w.removePeer(oldKey); err == nil { - delete(w.edgeConnections, name) - } - } - } - + for _, newConn := range desiredEdgeConns { klog.InfoS("create edge-to-edge connection", "c", newConn) - + newKey := keyFromEndpoint(newConn.RemoteEndpoint) allowedIPs := parseSubnets(newConn.RemoteEndpoint.Subnets) ka := time.Duration(w.keepaliveInterval) var remotePort int @@ -284,59 +308,29 @@ func (w *wireguard) createEdgeConnections(desiredEdgeConns map[string]*vpndriver IP: net.ParseIP(newConn.RemoteEndpoint.PublicIP), Port: remotePort, }, - PersistentKeepaliveInterval: &ka, ReplaceAllowedIPs: true, AllowedIPs: allowedIPs, }) } - - if err := w.wgClient.ConfigureDevice(DeviceName, wgtypes.Config{ + return w.wgClient.ConfigureDevice(DeviceName, wgtypes.Config{ ReplacePeers: true, Peers: peerConfigs, - }); err != nil { - return fmt.Errorf("error add peers: %v", err) - } - - w.edgeConnections = desiredEdgeConns - - return nil + }) } -func (w *wireguard) createRelayConnections(desiredRelayConns map[string]*vpndriver.Connection, centralAllowedIPs []string, centralGw *types.Endpoint) error { +func (w *wireguard) ensureRelayPeers(desiredRelayConns map[string]*vpndriver.Connection, centralAllowedIPs []string) error { if len(desiredRelayConns) == 0 { klog.Infof("no desired relay connections") return nil } - - // delete unwanted connections - for connName, connection := range w.relayConnections { - if _, ok := desiredRelayConns[connName]; !ok { - remoteKey := keyFromEndpoint(connection.RemoteEndpoint) - if err := w.removePeer(remoteKey); err == nil { - delete(w.relayConnections, connName) - } - } - } - // add or update connections peerConfigs := make([]wgtypes.PeerConfig, 0) - for name, newConn := range desiredRelayConns { - newKey := keyFromEndpoint(newConn.RemoteEndpoint) - - if oldConn, ok := w.relayConnections[name]; ok { - oldKey := keyFromEndpoint(oldConn.RemoteEndpoint) - if oldKey.String() != newKey.String() { - if err := w.removePeer(oldKey); err == nil { - delete(w.relayConnections, name) - } - } - } - + for _, newConn := range desiredRelayConns { klog.InfoS("create connection", "c", newConn) - + newKey := keyFromEndpoint(newConn.RemoteEndpoint) allowedIPs := parseSubnets(newConn.RemoteEndpoint.Subnets) - if newConn.RemoteEndpoint.NodeName == centralGw.NodeName { + if w.centralGw != nil && newConn.RemoteEndpoint.NodeName == w.centralGw.NodeName { allowedIPs = append(allowedIPs, parseSubnets(centralAllowedIPs)...) } @@ -357,16 +351,10 @@ func (w *wireguard) createRelayConnections(desiredRelayConns map[string]*vpndriv }) } - if err := w.wgClient.ConfigureDevice(DeviceName, wgtypes.Config{ + return w.wgClient.ConfigureDevice(DeviceName, wgtypes.Config{ ReplacePeers: false, Peers: peerConfigs, - }); err != nil { - return fmt.Errorf("error add peers: %v", err) - } - - w.relayConnections = desiredRelayConns - - return nil + }) } func (w *wireguard) Apply(network *types.Network, routeDriverMTUFn func(*types.Network) (int, error)) error { @@ -378,7 +366,7 @@ func (w *wireguard) Apply(network *types.Network, routeDriverMTUFn func(*types.N klog.Infof("the current node is not gateway node, cleaning vpn connections") return w.Cleanup() } - + w.centralGw = findCentralGw(network) if _, ok := network.LocalEndpoint.Config[PublicKey]; !ok || network.LocalEndpoint.Config[PublicKey] != w.privateKey.PublicKey().String() { err := w.configGatewayPublicKey(string(network.LocalEndpoint.GatewayName), string(network.LocalEndpoint.NodeName)) if err != nil { @@ -387,24 +375,17 @@ func (w *wireguard) Apply(network *types.Network, routeDriverMTUFn func(*types.N return errors.New("retry to config public key") } - centralGw := findCentralGw(network) - if centralGw.NodeName == w.nodeName { - if err := w.ensureRavenSkipNAT(); err != nil { - return fmt.Errorf("error ensure raven skip nat: %s", err) - } - } - if err := w.ensureWgLink(network, routeDriverMTUFn); err != nil { - return fmt.Errorf("fail to ensure wireguar link: %v", err) + return fmt.Errorf("fail to ensure wireguar link: %s", err.Error()) } // 3. Config device route and rules currentRoutes, err := networkutil.ListRoutesOnNode(wgRouteTableID) if err != nil { - return fmt.Errorf("error listing wireguard routes on node: %s", err) + return fmt.Errorf("error listing wireguard routes on node: %s", err.Error()) } currentRules, err := networkutil.ListRulesOnNode(wgRouteTableID) if err != nil { - return fmt.Errorf("error listing wireguard rules on node: %s", err) + return fmt.Errorf("error listing wireguard rules on node: %s", err.Error()) } desiredRoutes := w.calWgRoutes(network) @@ -412,15 +393,52 @@ func (w *wireguard) Apply(network *types.Network, routeDriverMTUFn func(*types.N err = networkutil.ApplyRoutes(currentRoutes, desiredRoutes) if err != nil { - return fmt.Errorf("error applying wireguard routes: %s", err) + return fmt.Errorf("error applying wireguard routes: %s", err.Error()) } err = networkutil.ApplyRules(currentRules, desiredRules) if err != nil { - return fmt.Errorf("error applying wireguard rules: %s", err) + return fmt.Errorf("error applying wireguard rules: %s", err.Error()) + } + + if err = w.ensureConnections(network); err != nil { + return fmt.Errorf("error ensure VPN tunnels: %s", err.Error()) } - if err := w.createConnections(network); err != nil { - return fmt.Errorf("error create VPN tunnels: %v", err) + return nil +} + +func (w *wireguard) ensureRavenNAT(network *types.Network) error { + if !vpndriver.IsGatewayRole(network, w.nodeName) { + klog.Infof("node %s is not gateway, skip add skip nat", w.nodeName) + return nil + } + + // The desired and current ipset entries calculated from given network. + // The key is ip set entry + var err error + w.ipset, err = ipsetutil.New(ravenSkipNatSet, ravenSkipNatSetType, ipsetutil.IpsetWrapperOption{KeyFunc: vpndriver.KeyFunc}) + if err != nil { + return fmt.Errorf("error new ipset %s, type %s", vpndriver.RavenSkipNatSet, vpndriver.RavenSkipNatSetType) + } + currentSet, err := networkutil.ListIPSetOnNode(w.ipset) + if err != nil { + return fmt.Errorf("error listing ip set %s on node: %s", w.ipset.Name(), err.Error()) + } + desiredSet := vpndriver.CalIPSetOnNode(network, w.nodeName, w.ipset) + err = networkutil.ApplyIPSet(w.ipset, currentSet, desiredSet) + if err != nil { + return fmt.Errorf("error applying ip set: %s", err) + } + + // for raven skip nat + if err = w.iptables.NewChainIfNotExist(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain); err != nil { + return fmt.Errorf("error create %s chain: %s", iptablesutil.RavenPostRoutingChain, err) + } + if err = w.iptables.InsertIfNotExists(iptablesutil.NatTable, iptablesutil.PostRoutingChain, 1, "-m", "comment", "--comment", "raven traffic should skip NAT", "-o", DeviceName, "-j", iptablesutil.RavenPostRoutingChain); err != nil { + return fmt.Errorf("error adding chain %s rule: %s", iptablesutil.PostRoutingChain, err) + } + if err = w.iptables.AppendIfNotExists(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain, "-m", "set", "--match-set", vpndriver.RavenSkipNatSet, "src,dst", "-j", "ACCEPT"); err != nil { + return fmt.Errorf("error adding chain %s rule: %s", iptablesutil.RavenPostRoutingChain, err) } return nil @@ -457,12 +475,24 @@ func (w *wireguard) Cleanup() error { errList = errList.Append(fmt.Errorf("error delete existing wireguard device %q: %v", DeviceName, err)) } - if err = w.deleteRavenSkipNAT(); err != nil { - errList = errList.Append(err) + err = vpndriver.CleanupRavenSkipNATIPSet() + if err != nil { + errList = errList.Append(fmt.Errorf("error cleanup ipset %s, %s", vpndriver.RavenSkipNatSet, err.Error())) + } + + err = w.iptables.NewChainIfNotExist(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain) + if err != nil { + errList = errList.Append(fmt.Errorf("error create %s chain: %s", iptablesutil.PostRoutingChain, err)) + } + err = w.iptables.DeleteIfExists(iptablesutil.NatTable, iptablesutil.PostRoutingChain, "-m", "comment", "--comment", "raven traffic should skip NAT", "-o", DeviceName, "-j", iptablesutil.RavenPostRoutingChain) + if err != nil { + errList = errList.Append(fmt.Errorf("error deleting %s chain rule: %s", iptablesutil.PostRoutingChain, err)) + } + err = w.iptables.ClearAndDeleteChain(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain) + if err != nil { + errList = errList.Append(fmt.Errorf("error deleting %s chain %s", iptablesutil.RavenPostRoutingChain, err)) } - w.relayConnections = make(map[string]*vpndriver.Connection) - w.edgeConnections = make(map[string]*vpndriver.Connection) return errList.AsError() } @@ -604,32 +634,3 @@ func parseSubnets(subnets []string) []net.IPNet { } return nets } - -func (w *wireguard) ensureRavenSkipNAT() error { - // for raven skip nat - if err := w.iptables.NewChainIfNotExist(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain); err != nil { - return fmt.Errorf("error create %s chain: %s", iptablesutil.RavenPostRoutingChain, err) - } - if err := w.iptables.InsertIfNotExists(iptablesutil.NatTable, iptablesutil.PostRoutingChain, 1, "-m", "comment", "--comment", "raven traffic should skip NAT", "-o", "raven-wg0", "-j", iptablesutil.RavenPostRoutingChain); err != nil { - return fmt.Errorf("error adding chain %s rule: %s", iptablesutil.PostRoutingChain, err) - } - if err := w.iptables.AppendIfNotExists(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain, "-j", "ACCEPT"); err != nil { - return fmt.Errorf("error adding chain %s rule: %s", iptablesutil.RavenPostRoutingChain, err) - } - - return nil -} - -func (w *wireguard) deleteRavenSkipNAT() error { - if err := w.iptables.NewChainIfNotExist(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain); err != nil { - return fmt.Errorf("error create %s chain: %s", iptablesutil.PostRoutingChain, err) - } - if err := w.iptables.DeleteIfExists(iptablesutil.NatTable, iptablesutil.PostRoutingChain, "-m", "comment", "--comment", "raven traffic should skip NAT", "-o", "raven-wg0", "-j", iptablesutil.RavenPostRoutingChain); err != nil { - return fmt.Errorf("error deleting %s chain rule: %s", iptablesutil.PostRoutingChain, err) - } - if err := w.iptables.ClearAndDeleteChain(iptablesutil.NatTable, iptablesutil.RavenPostRoutingChain); err != nil { - return fmt.Errorf("error deleting %s chain %s", iptablesutil.RavenPostRoutingChain, err) - } - - return nil -} diff --git a/pkg/proxyengine/proxyclient/proxyclient.go b/pkg/proxyengine/proxyclient/proxyclient.go index 6639b10..61601a7 100644 --- a/pkg/proxyengine/proxyclient/proxyclient.go +++ b/pkg/proxyengine/proxyclient/proxyclient.go @@ -74,7 +74,7 @@ func (c *ProxyClient) Start(ctx context.Context) error { } clientCertManager, err := factory.NewCertManagerFactory(c.client).New(certMgrCfg) if err != nil { - klog.Errorf(utils.FormatProxyClient("failed to new cert manager factory for proxy client %s, error %s", c.name, err.Error())) + klog.Errorf("failed to new cert manager factory for proxy client %s, error %s", c.name, err.Error()) return fmt.Errorf("failed to new cert manager factory for proxy client %s, error %s", c.name, err.Error()) } clientCertManager.Start() @@ -83,13 +83,13 @@ func (c *ProxyClient) Start(ctx context.Context) error { if clientCertManager.Current() != nil { return true, nil } - klog.Infof(utils.FormatProxyClient("certificate %s not signed, waiting...", certMgrCfg.CommonName)) + klog.Infof("certificate %s not signed, waiting...", certMgrCfg.CommonName) return false, nil }, ctx.Done()) for addr := range c.servers { tlsCfg, err := certmanager.GenTLSConfigUseCertMgrAndCA(clientCertManager, addr, utils.RavenCAFile) if err != nil { - klog.Error(utils.FormatProxyClient("failed to generate TLS Config")) + klog.Error("failed to generate TLS Config") return fmt.Errorf("failed to generate TLS Config") } c.servers[addr] = tlsCfg @@ -105,7 +105,7 @@ func (c *ProxyClient) run(stopCh <-chan struct{}) { for addr, cert := range c.servers { client := c.NewClient(addr, cert, stopCh) client.Serve() - klog.Infof(utils.FormatProxyClient("start serving grpc request redirected from %s", addr)) + klog.Infof("start serving grpc request redirected from %s", addr) } } diff --git a/pkg/proxyengine/proxyserver/interceptor.go b/pkg/proxyengine/proxyserver/interceptor.go index 03b9dac..b1e397f 100644 --- a/pkg/proxyengine/proxyserver/interceptor.go +++ b/pkg/proxyengine/proxyserver/interceptor.go @@ -158,10 +158,10 @@ func (c *Interceptor) ServeHTTP(w http.ResponseWriter, r *http.Request) { // serverRequest serves the normal requests, e.g., kubectl logs func serveRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) { - klog.Info(utils.FormatProxyServer("interceptor: start serving request %s with header: host %s, proxy mode: %s", - r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey])) - defer klog.Info(utils.FormatProxyServer("interceptor: stop serving request %s with header: host %s, proxy mode: %s", - r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey])) + klog.Infof("interceptor: start serving request %s with header: host %s, proxy mode: %s", + r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey]) + defer klog.Infof("interceptor: stop serving request %s with header: host %s, proxy mode: %s", + r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey]) br := newBufioReader(conn) defer putBufioReader(br) resp, err := http.ReadResponse(br, r) @@ -189,10 +189,10 @@ func serveRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) { ctx := r.Context() select { case <-stopCh: - klog.Info(utils.FormatProxyServer("chunked request(%s) normally exit", r.URL.String())) + klog.Info("chunked request(%s) normally exit", r.URL.String()) case <-ctx.Done(): - klog.Info(utils.FormatProxyServer("chunked request(%s) to agent(%s) closed by cloud client, %v", r.URL.String(), - r.Header.Get(utils.RavenProxyHostHeaderKey), ctx.Err())) + klog.Info("chunked request(%s) to agent(%s) closed by cloud client, %v", r.URL.String(), + r.Header.Get(utils.RavenProxyHostHeaderKey), ctx.Err()) conn.Close() } }(r, conn, stopCh) @@ -203,15 +203,15 @@ func serveRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) { } _, err = io.Copy(writer, resp.Body) if err != nil && !isHTTPCloseError(err) { - klog.ErrorS(err, utils.FormatProxyServer("failed to copy response from proxy server to the frontend")) + klog.ErrorS(err, "failed to copy response from proxy server to the frontend") } } func serveUpgradeRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) { - klog.Info(utils.FormatProxyServer("interceptor: start serving streaming request %s with header: host %s, proxy mode: %s", - r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey])) - defer klog.Info(utils.FormatProxyServer("interceptor: stop serving streaming request %s with header: host %s, proxy mode: %s", - r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey])) + klog.Infof("interceptor: start serving streaming request %s with header: host %s, proxy mode: %s", + r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey]) + defer klog.Infof("interceptor: stop serving streaming request %s with header: host %s, proxy mode: %s", + r.URL.String(), r.Header[utils.RavenProxyHostHeaderKey], r.Header[utils.RavenProxyServerForwardModeHeaderKey]) resp, rawResp, err := getResponse(conn) if err != nil { @@ -233,21 +233,21 @@ func serveUpgradeRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) if resp.StatusCode != http.StatusSwitchingProtocols { deadline := time.Now().Add(10 * time.Second) if err = conn.SetReadDeadline(deadline); err != nil { - klog.Errorf(utils.FormatProxyServer("failed set proxy connect deadline, error %s", err.Error())) + klog.Errorf("failed set proxy connect deadline, error %s", err.Error()) } if err = frontend.SetReadDeadline(deadline); err != nil { - klog.Errorf(utils.FormatProxyServer("failed set frontend connect deadline, error %s", err.Error())) + klog.Errorf("failed set frontend connect deadline, error %s", err.Error()) } err = resp.Write(frontend) if err != nil && !isHTTPCloseError(err) { - klog.Errorf(utils.FormatProxyServer("error proxying un-upgrade response from proxy channel to frontend: %s", err.Error())) + klog.Errorf("error proxying un-upgrade response from proxy channel to frontend: %s", err.Error()) } return } if len(rawResp) > 0 { if _, err = frontend.Write(rawResp); err != nil { - klog.Errorf(utils.FormatProxyServer("error proxying response bytes from tunnel to client: %s", err.Error())) + klog.Errorf("error proxying response bytes from tunnel to client: %s", err.Error()) } } @@ -255,7 +255,7 @@ func serveUpgradeRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) go func() { _, err = io.Copy(conn, frontend) if err != nil && !isHTTPCloseError(err) { - klog.Errorf(utils.FormatProxyServer("error proxying data from frontend to proxy channel: %s", err.Error())) + klog.Errorf("error proxying data from frontend to proxy channel: %s", err.Error()) } close(writerComplete) }() @@ -263,7 +263,7 @@ func serveUpgradeRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) go func() { _, err = io.Copy(frontend, conn) if err != nil && !isHTTPCloseError(err) { - klog.Errorf(utils.FormatProxyServer("error proxying data from proxy channel to frontend: %s", err.Error())) + klog.Errorf("error proxying data from proxy channel to frontend: %s", err.Error()) } close(readerComplete) }() @@ -276,7 +276,7 @@ func serveUpgradeRequest(conn net.Conn, w http.ResponseWriter, r *http.Request) func logAndHTTPError(w http.ResponseWriter, errCode int, format string, i ...interface{}) { errMsg := fmt.Sprintf(format, i...) - klog.Error(utils.FormatProxyServer(errMsg)) + klog.Error(errMsg) http.Error(w, errMsg, errCode) } diff --git a/pkg/proxyengine/proxyserver/manageheader.go b/pkg/proxyengine/proxyserver/manageheader.go index eb7c35b..681e27c 100644 --- a/pkg/proxyengine/proxyserver/manageheader.go +++ b/pkg/proxyengine/proxyserver/manageheader.go @@ -67,21 +67,21 @@ func NewHeaderManager(client client.Client, gatewayName string, isIPv4 bool) Wra func (h *headerManger) Handler(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r == nil { - klog.Errorf(utils.FormatProxyServer("request is nil, skip it")) + klog.Errorf("request is nil, skip it") return } oldHost := r.Host var host, ip, port string var err error if isAPIServerRequest(r) { - klog.Info(utils.FormatProxyServer("request from apiserver with host %s and url %s is processed by header manager", oldHost, r.URL.String())) + klog.Infof("request from apiserver with host %s and url %s is processed by header manager", oldHost, r.URL.String()) host, ip, port, err = h.getAPIServerRequestDestAddress(r) if err != nil { logAndHTTPError(w, http.StatusBadRequest, "request host %s and url %s is invalid, %s", r.Host, r.URL.String(), err.Error()) return } } else { - klog.Info(utils.FormatProxyServer("normal request with host %s and url %s is processed by header manager", oldHost, r.URL.String())) + klog.Infof("normal request with host %s and url %s is processed by header manager", oldHost, r.URL.String()) host, ip, port, err = h.getNormalRequestDestAddress(r) if err != nil { logAndHTTPError(w, http.StatusBadRequest, "request host %s and url %s is invalid, %s", r.Host, r.URL.String(), err.Error()) @@ -117,11 +117,11 @@ func (h *headerManger) Handler(handler http.Handler) http.Handler { metrics.Metrics.IncInFlightRequests(r.Method, r.URL.Path) defer metrics.Metrics.DecInFlightRequests(r.Method, r.URL.Path) - klog.Infoln(utils.FormatProxyServer("start handling request %s %s, req.Host changed from %s to %s, remote address is %s", - r.Method, r.URL.String(), oldHost, r.Host, r.RemoteAddr)) + klog.Infof("start handling request %s %s, req.Host changed from %s to %s, remote address is %s", + r.Method, r.URL.String(), oldHost, r.Host, r.RemoteAddr) start := time.Now() handler.ServeHTTP(w, r) - klog.Infoln(utils.FormatProxyServer("finish handle request %s %s, handle lasts %v", r.Method, r.URL.String(), time.Since(start))) + klog.Infof("finish handle request %s %s, handle lasts %v", r.Method, r.URL.String(), time.Since(start)) }) } @@ -170,7 +170,7 @@ func (h *headerManger) getNormalRequestDestAddress(r *http.Request) (name, ip, p } ipAddress := net.ParseIP(nodeName) if ipAddress != nil { - klog.Warning(utils.FormatProxyServer("raven proxy server not support dest address %s and request.URL is %s", ipAddress, r.URL.String())) + klog.Warningf("raven proxy server not support dest address %s and request.URL is %s", ipAddress, r.URL.String()) return "", "", "", nil } var node v1.Node diff --git a/pkg/proxyengine/proxyserver/proxyserver.go b/pkg/proxyengine/proxyserver/proxyserver.go index a3047fb..76d8f43 100644 --- a/pkg/proxyengine/proxyserver/proxyserver.go +++ b/pkg/proxyengine/proxyserver/proxyserver.go @@ -149,11 +149,11 @@ func (c *ProxyServer) Start(ctx context.Context) error { if serverCertMgr.Current() != nil && proxyCertMgr.Current() != nil { return true, nil } - klog.Infof(utils.FormatProxyServer("certificate %s and %s not signed, waiting...", serverCertCfg.ComponentName, proxyCertCfg.ComponentName)) + klog.Infof("certificate %s and %s not signed, waiting...", serverCertCfg.ComponentName, proxyCertCfg.ComponentName) return false, nil }, ctx.Done()) - klog.Infof(utils.FormatProxyServer("certificate %s and %s ok", serverCertCfg.ComponentName, proxyCertCfg.ComponentName)) + klog.Infof("certificate %s and %s ok", serverCertCfg.ComponentName, proxyCertCfg.ComponentName) c.serverTLSConfig, err = certmanager.GenTLSConfigUseCurrentCertAndCertPool(serverCertMgr.Current, c.rootCert, "server") if err != nil { return err @@ -171,7 +171,7 @@ func (c *ProxyServer) Start(ctx context.Context) error { } func (c *ProxyServer) runServers(ctx context.Context) error { - klog.Info(utils.FormatProxyServer("start proxy server")) + klog.Info("start proxy server") strategy := []anpserver.ProxyStrategy{anpserver.ProxyStrategyDestHost} proxyServer := anpserver.NewProxyServer(c.nodeName, strategy, 1, &anpserver.AgentTokenAuthenticationOptions{}) NewProxies(&anpserver.Tunnel{Server: proxyServer}, c.interceptorUDSFile).Run(ctx) @@ -192,8 +192,8 @@ func (c *ProxyServer) getProxyServerIPsAndDNSName() (dnsName []string, ipAddr [] var svc v1.Service err := c.client.Get(context.TODO(), types.NamespacedName{Namespace: utils.WorkingNamespace, Name: utils.GatewayProxyInternalService}, &svc) if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to get internal service %s/%s to get proxy server IPs and DNSNames, error %s", - svc.GetNamespace(), svc.GetName(), err.Error())) + klog.Errorf("failed to get internal service %s/%s to get proxy server IPs and DNSNames, error %s", + svc.GetNamespace(), svc.GetName(), err.Error()) return } dnsName = append(dnsName, getDefaultDomainsForSvc(svc.GetNamespace(), svc.GetName())...) @@ -209,8 +209,8 @@ func (c *ProxyServer) getProxyServerIPsAndDNSName() (dnsName []string, ipAddr [] }.AsSelector(), }) if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to get public serivce for gateway %s, node %s to get proxy server IPs and DNSNames, error %s", - c.gateway.GetName(), c.nodeName, err.Error())) + klog.Errorf("failed to get public serivce for gateway %s, node %s to get proxy server IPs and DNSNames, error %s", + c.gateway.GetName(), c.nodeName, err.Error()) return } diff --git a/pkg/proxyengine/proxyserver/servers.go b/pkg/proxyengine/proxyserver/servers.go index 3d7b668..48d75cc 100644 --- a/pkg/proxyengine/proxyserver/servers.go +++ b/pkg/proxyengine/proxyserver/servers.go @@ -23,7 +23,6 @@ import ( "net/http" "time" - "github.com/openyurtio/raven/pkg/utils" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" @@ -47,8 +46,8 @@ func NewProxies(handler http.Handler, udsFile string) Server { func (p *proxies) Run(ctx context.Context) { go func(ctx context.Context) { - klog.Info(utils.FormatProxyServer("start listen unix %s", p.udsSockFile)) - defer klog.Info(utils.FormatProxyServer("finish listen unix %s", p.udsSockFile)) + klog.Info("start listen unix %s", p.udsSockFile) + defer klog.Info("finish listen unix %s", p.udsSockFile) server := &http.Server{ Handler: p.handler, ReadTimeout: 10 * time.Second, @@ -85,8 +84,8 @@ func NewMaster(handler http.Handler, tlsCfg *tls.Config, secureAddr, insecureAdd func (m *master) Run(ctx context.Context) { go func(ctx context.Context) { - klog.Info(utils.FormatProxyServer("start handling https request from master at %s", m.secureAddr)) - defer klog.Info(utils.FormatProxyServer("finish handling https request from master at %s", m.secureAddr)) + klog.Info("start handling https request from master at %s", m.secureAddr) + defer klog.Info("finish handling https request from master at %s", m.secureAddr) server := http.Server{ Addr: m.secureAddr, Handler: m.handler, @@ -102,13 +101,13 @@ func (m *master) Run(ctx context.Context) { } }(ctx) if err := server.ListenAndServeTLS("", ""); err != nil { - klog.Errorf(utils.FormatProxyServer("failed to serve https request from master: %s", err.Error())) + klog.Errorf("failed to serve https request from master: %s", err.Error()) } }(ctx) go func(ctx context.Context) { - klog.Infof(utils.FormatProxyServer("start handling https request from master at %s", m.insecureAddr)) - defer klog.Infof(utils.FormatProxyServer("finish handling https request from master at %s", m.insecureAddr)) + klog.Infof("start handling https request from master at %s", m.insecureAddr) + defer klog.Infof("finish handling https request from master at %s", m.insecureAddr) server := http.Server{ Addr: m.insecureAddr, Handler: m.handler, @@ -123,7 +122,7 @@ func (m *master) Run(ctx context.Context) { } }(ctx) if err := server.ListenAndServe(); err != nil { - klog.Errorf(utils.FormatProxyServer("failed to serve https request from master: %s", err.Error())) + klog.Errorf("failed to serve https request from master: %s", err.Error()) } }(ctx) } @@ -140,8 +139,8 @@ func NewAgent(tlsCfg *tls.Config, proxyServer *anpserver.ProxyServer, address st func (c *agent) Run(ctx context.Context) { go func(ctx context.Context) { - klog.Info(utils.FormatProxyServer("start handling grpc request from proxy client at %s", c.address)) - defer klog.Info(utils.FormatProxyServer("finish handling grpc request from proxy client at %s", c.address)) + klog.Info("start handling grpc request from proxy client at %s", c.address) + defer klog.Info("finish handling grpc request from proxy client at %s", c.address) ka := keepalive.ServerParameters{ MaxConnectionIdle: 10 * time.Minute, Time: 10 * time.Second, @@ -151,7 +150,7 @@ func (c *agent) Run(ctx context.Context) { anpagent.RegisterAgentServiceServer(grpcServer, c.proxyServer) listen, err := net.Listen("tcp", c.address) if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to listen to agent on %s: %s", c.address, err.Error())) + klog.Errorf("failed to listen to agent on %s: %s", c.address, err.Error()) return } defer listen.Close() @@ -160,7 +159,7 @@ func (c *agent) Run(ctx context.Context) { grpcServer.Stop() }(ctx) if err := grpcServer.Serve(listen); err != nil { - klog.Errorf(utils.FormatProxyServer("failed to server grpc request from proxy agent server, error %s", err.Error())) + klog.Errorf("failed to server grpc request from proxy agent server, error %s", err.Error()) } }(ctx) } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go deleted file mode 100644 index cad4572..0000000 --- a/pkg/utils/utils.go +++ /dev/null @@ -1,41 +0,0 @@ -/* -Copyright 2023 The OpenYurt Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package utils - -import ( - "fmt" -) - -func FormatProxyServer(format string, args ...interface{}) string { - s := fmt.Sprintf(format, args...) - return fmt.Sprintf("[Proxy Server]: %s", s) -} - -func FormatProxyClient(format string, args ...interface{}) string { - s := fmt.Sprintf(format, args...) - return fmt.Sprintf("[Proxy Client]: %s", s) -} - -func FormatTunnel(format string, args ...interface{}) string { - s := fmt.Sprintf(format, args...) - return fmt.Sprintf("[Tunnel Agent]: %s", s) -} - -func FormatRavenEngine(format string, args ...interface{}) string { - s := fmt.Sprintf(format, args...) - return fmt.Sprintf("[Raven Engine]: %s", s) -}