From 5f0b41499ef8a8f4d2d67c06cab11d6008fbd781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8F=A9=E8=BD=A9?= Date: Mon, 13 Nov 2023 14:16:39 +0800 Subject: [PATCH] Remove the dependency on the openyurt project and rely on the openyurt api, and optimize raven agent --- charts/raven-agent/Chart.yaml | 4 +- charts/raven-agent/templates/config.yaml | 4 +- charts/raven-agent/templates/daemonset.yaml | 11 +- charts/raven-agent/templates/rbac.yaml | 1 + charts/raven-agent/values.yaml | 45 +-- cmd/agent/app/options/options.go | 32 +- go.mod | 2 +- go.sum | 4 +- pkg/engine/engine.go | 4 +- pkg/engine/proxy.go | 298 +++++++++------ pkg/engine/tunnel.go | 52 ++- pkg/engine/utils.go | 21 +- pkg/metrics/metrics.go | 6 +- pkg/networkengine/routedriver/driver.go | 16 + pkg/networkengine/routedriver/vxlan/vxlan.go | 2 +- .../routedriver/vxlan/vxlan_test.go | 2 +- pkg/networkengine/vpndriver/driver.go | 16 + pkg/networkengine/vpndriver/driver_test.go | 2 +- .../vpndriver/libreswan/libreswan.go | 2 +- .../vpndriver/wireguard/wireguard.go | 2 +- pkg/proxyengine/proxyclient/proxyclient.go | 4 +- pkg/proxyengine/proxyserver/interceptor.go | 2 +- pkg/proxyengine/proxyserver/manageheader.go | 16 +- pkg/proxyengine/proxyserver/proxyserver.go | 105 +++--- pkg/proxyengine/proxyserver/servers.go | 6 +- pkg/tunnelengine/tunnelagent.go | 11 +- pkg/types/network.go | 2 +- pkg/types/network_test.go | 2 +- pkg/utils/certmanager/factory/factory.go | 175 +++++++++ pkg/utils/certmanager/pki.go | 187 ++++++++++ pkg/utils/certmanager/pki_test.go | 347 ++++++++++++++++++ .../certmanager/store/filestore_wrapper.go | 54 +++ .../store/filestore_wrapper_test.go | 105 ++++++ pkg/utils/constants.go | 10 +- pkg/utils/metaserver.go | 22 +- pkg/utils/utils.go | 2 +- 36 files changed, 1268 insertions(+), 308 deletions(-) create mode 100644 pkg/utils/certmanager/factory/factory.go create mode 100644 pkg/utils/certmanager/pki.go create mode 100644 pkg/utils/certmanager/pki_test.go create mode 100644 pkg/utils/certmanager/store/filestore_wrapper.go create mode 100644 pkg/utils/certmanager/store/filestore_wrapper_test.go diff --git a/charts/raven-agent/Chart.yaml b/charts/raven-agent/Chart.yaml index 24574f6..8963737 100644 --- a/charts/raven-agent/Chart.yaml +++ b/charts/raven-agent/Chart.yaml @@ -15,10 +15,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.1 +version: 0.4.0 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "0.2.0" +appVersion: "0.4.0" diff --git a/charts/raven-agent/templates/config.yaml b/charts/raven-agent/templates/config.yaml index 3b1c536..6108bdc 100644 --- a/charts/raven-agent/templates/config.yaml +++ b/charts/raven-agent/templates/config.yaml @@ -1,7 +1,7 @@ apiVersion: v1 data: - enable-l3-tunnel: "true" - enable-l7-proxy: "false" + enable-l3-tunnel: {{.Values.controller.enableTunnel | quote}} + enable-l7-proxy: {{.Values.controller.enableProxy | quote}} kind: ConfigMap metadata: name: raven-cfg diff --git a/charts/raven-agent/templates/daemonset.yaml b/charts/raven-agent/templates/daemonset.yaml index a0d05a5..d12b966 100644 --- a/charts/raven-agent/templates/daemonset.yaml +++ b/charts/raven-agent/templates/daemonset.yaml @@ -27,7 +27,16 @@ spec: {{- toYaml . | nindent 8 }} {{- end }} args: - - --v=4 + - --v=2 + - --vpn-driver={{.Values.vpn.driver}} + - --forward-node-ip={{.Values.vpn.forwardNodeIP}} + - --metric-bind-addr={{.Values.vpn.metricBindAddr}} + - --vpn-bind-port={{.Values.vpn.tunnelAddr}} + - --proxy-metric-bind-addr={{.Values.proxy.metricsBindAddr}} + - --proxy-internal-secure-addr={{.Values.proxy.internalSecureAddr}} + - --proxy-internal-insecure-addr={{.Values.proxy.internalInsecureAddr}} + - --proxy-external-addr={{.Values.proxy.externalAddr}} + hostNetwork: true {{- with .Values.nodeSelector }} nodeSelector: diff --git a/charts/raven-agent/templates/rbac.yaml b/charts/raven-agent/templates/rbac.yaml index 7d5ce61..4430c7e 100644 --- a/charts/raven-agent/templates/rbac.yaml +++ b/charts/raven-agent/templates/rbac.yaml @@ -25,6 +25,7 @@ rules: - configmaps - services - nodes + - pods verbs: - get - list diff --git a/charts/raven-agent/values.yaml b/charts/raven-agent/values.yaml index 92a7a27..fef15f4 100644 --- a/charts/raven-agent/values.yaml +++ b/charts/raven-agent/values.yaml @@ -6,7 +6,7 @@ image: repository: openyurt/raven-agent pullPolicy: IfNotPresent # Overrides the image tag whose default is the chart appVersion. - tag: latest + tag: v0.4.0 imagePullSecrets: [] nameOverride: "" @@ -58,41 +58,7 @@ containerEnv: secretKeyRef: key: vpn-connection-psk name: raven-agent-secret - - name: VPN_DRIVER - valueFrom: - configMapKeyRef: - key: vpn-driver - name: raven-agent-config - - name: FORWARD_NODE_IP - valueFrom: - configMapKeyRef: - key: forward-node-ip - name: raven-agent-config - - name: METRIC_BIND_ADDR - valueFrom: - configMapKeyRef: - key: metric-bind-addr - name: raven-agent-config - - name: VPN_BIND_ADDRESS - valueFrom: - configMapKeyRef: - key: tunnel-bind-addr - name: raven-agent-config - - name: PROXY_SERVER_INTERNAL_SECURE_ADDRESS - valueFrom: - configMapKeyRef: - key: proxy-internal-secure-addr - name: raven-agent-config - - name: PROXY_SERVER_INTERNAL_INSECURE_ADDRESS - valueFrom: - configMapKeyRef: - key: proxy-internal-insecure-addr - name: raven-agent-config - - name: PROXY_SERVER_EXTERNAL_ADDRESS - valueFrom: - configMapKeyRef: - key: proxy-external-addr - name: raven-agent-config + vpn: driver: libreswan forwardNodeIP: true @@ -104,11 +70,16 @@ vpn: psk: OPENYURT-RAVEN-AGENT-VPN-PSK metricBindAddr: ":10265" tunnelAddr: ":4500" + +controller: + enableProxy: true + enableTunnel: true + proxy: externalAddr: ":10262" internalInsecureAddr: ":10264" internalSecureAddr: ":10263" - + metricsBindAddr: ":10266" rollingUpdate: maxUnavailable: 5% \ No newline at end of file diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index 05aa1f9..bfe0e64 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -18,7 +18,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/cmd/agent/app/config" "github.com/openyurtio/raven/pkg/networkengine/routedriver/vxlan" "github.com/openyurtio/raven/pkg/networkengine/vpndriver" @@ -73,30 +73,6 @@ func (o *AgentOptions) Validate() error { return errors.New("either --node-name or $NODE_NAME has to be set") } } - if o.VPNPort == "" { - o.VPNPort = os.Getenv("VPN_BIND_ADDRESS") - if o.VPNPort == "" { - return errors.New("either --vpn-bind-address or $VPN_BIND_PORT has to be set") - } - } - if o.InternalSecureAddress == "" { - o.InternalSecureAddress = os.Getenv("PROXY_SERVER_INTERNAL_SECURE_ADDRESS") - if o.InternalSecureAddress == "" { - return errors.New("either --proxy-internal-secure-address or PROXY_SERVER_INTERNAL_SECURE_ADDRESS has to be set") - } - } - if o.InternalInsecureAddress == "" { - o.InternalInsecureAddress = os.Getenv("PROXY_SERVER_INTERNAL_INSECURE_ADDRESS") - if o.InternalInsecureAddress == "" { - return errors.New("either --proxy-internal-insecure-address or PROXY_SERVER_INTERNAL_INSECURE_ADDRESS has to be set") - } - } - if o.ExternalAddress == "" { - o.ExternalAddress = os.Getenv("PROXY_SERVER_EXTERNAL_ADDRESS") - if o.ExternalAddress == "" { - return errors.New("either --proxy-external-address or $PROXY_SERVER_EXTERNAL_ADDRESS has to be set") - } - } return nil } @@ -154,6 +130,12 @@ func (o *AgentOptions) Config() (*config.Config, error) { InternalInsecureAddress: o.InternalInsecureAddress, InternalSecureAddress: o.InternalSecureAddress, ExternalAddress: o.ExternalAddress, + + ProxyServerCertDNSNames: o.ProxyServerCertDNSNames, + ProxyServerCertIPs: o.ProxyServerCertIPs, + ProxyClientCertDir: o.ProxyClientCertDir, + ProxyServerCertDir: o.ProxyServerCertDir, + InterceptorServerUDSFile: o.InterceptorServerUDSFile, } if err != nil { return nil, fmt.Errorf("failed to create manager: %s", err) diff --git a/go.mod b/go.mod index dacc6f5..33ed13c 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/coreos/go-iptables v0.6.0 github.com/gorilla/mux v1.8.0 github.com/lorenzosaino/go-sysctl v0.3.1 - github.com/openyurtio/openyurt v1.3.1-0.20230920090905-424dcc283167 + github.com/openyurtio/api v0.0.0-20231116122426-dfc46506cdb0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.15.1 github.com/spf13/cobra v1.7.0 diff --git a/go.sum b/go.sum index 7171522..1e47f96 100644 --- a/go.sum +++ b/go.sum @@ -439,10 +439,10 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/openyurtio/api v0.0.0-20231116122426-dfc46506cdb0 h1:zH7jnLVvAKVPr8WleYOhGG5yAiXoUYu2M2nqqk10/yM= +github.com/openyurtio/api v0.0.0-20231116122426-dfc46506cdb0/go.mod h1:IIVOJ8bnLSzqPB3VHBuQ3wELW4gjRKJ2aSyLhaLxdOA= github.com/openyurtio/apiserver-network-proxy v0.1.1-0.20231007082056-cecf4c454651 h1:pBjrj95WZ7NkaIQvvb7hveI5fd8fXxlX7OtKRcciamY= github.com/openyurtio/apiserver-network-proxy v0.1.1-0.20231007082056-cecf4c454651/go.mod h1:yPrw5zKs7BVVCuQUaY4MzSmsYPKXWnZsrjKUWQrZX3w= -github.com/openyurtio/openyurt v1.3.1-0.20230920090905-424dcc283167 h1:gbjh0njlthM0mM9Z6LHURdKpovLXReleT5GUwaksyFE= -github.com/openyurtio/openyurt v1.3.1-0.20230920090905-424dcc283167/go.mod h1:c77rs9s+ztXyyHNf2EB+lMHuDfSWwel6dhP2ZVi5cvM= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 34208e5..4e7bd07 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -16,7 +16,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/cmd/agent/app/config" "github.com/openyurtio/raven/pkg/utils" ) @@ -27,7 +27,7 @@ type Engine struct { context context.Context manager manager.Manager client client.Client - option StatusOption + option *Option tunnelQueue workqueue.RateLimitingInterface tunnelEngine *TunnelEngine diff --git a/pkg/engine/proxy.go b/pkg/engine/proxy.go index 12d5170..c7ca132 100644 --- a/pkg/engine/proxy.go +++ b/pkg/engine/proxy.go @@ -15,8 +15,7 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" - ravenutil "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/utils" + "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/cmd/agent/app/config" "github.com/openyurtio/raven/pkg/proxyengine" "github.com/openyurtio/raven/pkg/proxyengine/proxyclient" @@ -24,31 +23,55 @@ import ( "github.com/openyurtio/raven/pkg/utils" ) +type ActionType string + +const ( + StartType ActionType = "Start" + StopType ActionType = "Stop" + RestartType ActionType = "Restart" + SkipType ActionType = "Skip" +) + +func JudgeType(curr, spec bool) ActionType { + if curr && spec { + return RestartType + } + if curr && !spec { + return StopType + } + if !curr && spec { + return StartType + } + return SkipType +} + type ProxyEngine struct { - nodeName string - nodeIP string - proxyServerAddresses []string - gateway *v1beta1.Gateway - config *config.Config - client client.Client - engineOption StatusOption - proxyOption *proxyOption - ravenContext context.Context - proxyContext ProxyContext - queue workqueue.RateLimitingInterface + nodeName string + nodeIP string + serverLocalEndpoints []string + clientRemoteEndpoints []string + gateway *v1beta1.Gateway + config *config.Config + client client.Client + + ctx context.Context + option *Option + proxyCtx ProxyContext + proxyOption *proxyOption + queue workqueue.RateLimitingInterface } -func newProxyEngine(ctx context.Context, cfg *config.Config, client client.Client, opt StatusOption, queue workqueue.RateLimitingInterface) *ProxyEngine { +func newProxyEngine(ctx context.Context, cfg *config.Config, client client.Client, opt *Option, queue workqueue.RateLimitingInterface) *ProxyEngine { return &ProxyEngine{ - nodeName: cfg.NodeName, - nodeIP: cfg.NodeIP, - config: cfg, - client: client, - engineOption: opt, - ravenContext: ctx, - proxyOption: newProxyOption(), - proxyContext: newProxyContext(ctx), - queue: queue, + nodeName: cfg.NodeName, + nodeIP: cfg.NodeIP, + config: cfg, + client: client, + option: opt, + ctx: ctx, + proxyCtx: newProxyContext(ctx), + proxyOption: newProxyOption(), + queue: queue, } } @@ -74,119 +97,166 @@ func (p *ProxyEngine) processNextWorkItem() bool { } func (p *ProxyEngine) handler(gw *v1beta1.Gateway) error { - curServer := p.proxyOption.GetServerStatus() - curClient := p.proxyOption.GetClientStatus() - specServer, specClient := p.getRole(enableProxy(gw)) + proxyStatus := enableProxy(gw) + p.option.SetProxyStatus(proxyStatus) + specServer, specClient := p.getRole(proxyStatus) var err error p.gateway, err = utils.GetOwnGateway(p.client, p.nodeName) if err != nil { - klog.Errorf("failed get gateway for %s, can not start proxy server", p.nodeName) - return fmt.Errorf("failed get gateway name for %s, can not start proxy server", p.nodeName) + klog.Errorf(utils.FormatProxyServer("failed get gateway for %s, can not start proxy server", p.nodeName)) + return err } - if !curServer && specServer { - klog.Infoln(utils.FormatProxyServer("start raven l7 proxy server")) - if p.gateway == nil { - klog.Errorf("unknown gateway for node %s, can not start proxy server", p.nodeName) - } - pe := &proxyengine.EnginConfig{ - Name: p.nodeName, - IP: p.nodeIP, - GatewayName: p.gateway.Name, - CertDir: p.config.Proxy.ProxyServerCertDir, - MetaAddress: p.config.Proxy.ProxyMetricsAddress, - CertIPs: p.config.Proxy.ProxyServerCertIPs, - CertDNSNames: p.config.Proxy.ProxyServerCertDNSNames, - InterceptorUDSFile: p.config.Proxy.InterceptorServerUDSFile, - InternalSecureAddress: p.config.Proxy.InternalSecureAddress, - InternalInsecureAddress: p.config.Proxy.InternalInsecureAddress, - ExposedAddress: p.config.Proxy.ExternalAddress, - } - ctx := p.proxyContext.GetServerContext() - ps, err := proxyserver.NewProxyServer(pe, p.client, p.config.Manager.GetConfig(), p.gateway.DeepCopy()) + + switch JudgeType(p.proxyOption.GetServerStatus(), specServer) { + case StartType: + err = p.startProxyServer() if err != nil { - klog.Errorf("failed to new proxy server, error %s", err.Error()) + klog.Errorf(utils.FormatProxyServer("failed to start proxy server, error %s", err.Error())) return err } - err = ps.Start(ctx) - if err != nil { - klog.Errorf("failed to start proxy server, error %s", err.Error()) + case StopType: + p.stopProxyServer() + case RestartType: + srcAddr := getSrcAddressForProxyServer(p.client, p.nodeName) + if computeHash(strings.Join(p.serverLocalEndpoints, ",")) != computeHash(strings.Join(srcAddr, ",")) { + p.stopProxyServer() + time.Sleep(time.Second) + err = p.startProxyServer() + if err != nil { + klog.Errorf(utils.FormatProxyServer("failed to start proxy server, error %s", err.Error())) + return err + } + p.serverLocalEndpoints = srcAddr } - p.proxyOption.SetServerStatus(specServer) - } else if curServer && !specServer { - klog.Infoln(utils.FormatProxyServer("Stop raven l7 proxy server")) - cancel := p.proxyContext.GetServerCancelFunc() - cancel() - p.proxyOption.SetServerStatus(specServer) - p.proxyContext.ReloadServerContext(p.ravenContext) + default: + } - if !curClient && specClient { - klog.Infoln(utils.FormatProxyClient("start raven l7 proxy client")) - var err error - dstAddr := getDestAddressForProxyClient(p.client, p.gateway) - if len(dstAddr) < 1 { - klog.Infoln(utils.FormatProxyClient("dest address is empty, will not connected it")) - return nil - } - p.proxyServerAddresses = dstAddr - pe := &proxyengine.EnginConfig{ - Name: p.nodeName, - IP: p.nodeIP, - CertDir: p.config.Proxy.ProxyClientCertDir, - MetaAddress: p.config.Proxy.ProxyMetricsAddress, - } - pc, err := proxyclient.NewProxyClient(pe, p.proxyServerAddresses, p.config.KubeConfig) + switch JudgeType(p.proxyOption.GetClientStatus(), specClient) { + case StartType: + err = p.startProxyClient() if err != nil { - klog.Errorf("failed to new proxy client, error %s", err.Error()) + klog.Errorf(utils.FormatProxyServer("failed to start proxy client, error %s", err.Error())) return err } - ctx := p.proxyContext.GetClientContext() - err = pc.Start(ctx) - if err != nil { - klog.Errorf("failed to start proxy client, error %s", err.Error()) - } - p.proxyOption.SetClientStatus(specClient) - return nil - } else if curClient && !specClient { - klog.Infoln(utils.FormatProxyClient("stop raven l7 proxy client")) - cancel := p.proxyContext.GetClientCancelFunc() - cancel() - p.proxyOption.SetClientStatus(specClient) - p.proxyContext.ReloadClientContext(p.ravenContext) - } else if curClient && specClient { + case StopType: + p.stopProxyClient() + case RestartType: dstAddr := getDestAddressForProxyClient(p.client, p.gateway) if len(dstAddr) < 1 { klog.Infoln(utils.FormatProxyClient("dest address is empty, will not connected it")) return nil } - if computeHash(strings.Join(p.proxyServerAddresses, ",")) != computeHash(strings.Join(dstAddr, ",")) { - klog.Infoln(utils.FormatProxyClient("Update raven l7 proxy client")) - cancel := p.proxyContext.GetClientCancelFunc() - cancel() - time.Sleep(2 * time.Second) - p.proxyContext.ReloadClientContext(p.ravenContext) - p.proxyServerAddresses = dstAddr - pe := &proxyengine.EnginConfig{ - Name: p.nodeName, - IP: p.nodeIP, - CertDir: p.config.Proxy.ProxyClientCertDir, - MetaAddress: p.config.Proxy.ProxyMetricsAddress, - } - pc, err := proxyclient.NewProxyClient(pe, p.proxyServerAddresses, p.config.KubeConfig) + if computeHash(strings.Join(p.clientRemoteEndpoints, ",")) != computeHash(strings.Join(dstAddr, ",")) { + p.stopProxyClient() + time.Sleep(time.Second) + err = p.startProxyClient() if err != nil { - klog.Errorf("failed to new proxy server, error %s", err.Error()) + klog.Errorf(utils.FormatProxyServer("failed to start proxy client, error %s", err.Error())) return err } - ctx := p.proxyContext.GetClientContext() - err = pc.Start(ctx) - if err != nil { - klog.Errorf("failed to start proxy client, error %s", err.Error()) - } } + default: + } return nil } +func (p *ProxyEngine) startProxyServer() error { + klog.Infoln(utils.FormatProxyServer("start raven l7 proxy server")) + if p.gateway == nil { + return fmt.Errorf("unknown gateway for node %s, can not start proxy server", p.nodeName) + } + pe := &proxyengine.EnginConfig{ + Name: p.nodeName, + IP: p.nodeIP, + GatewayName: p.gateway.Name, + CertDir: p.config.Proxy.ProxyServerCertDir, + MetaAddress: p.config.Proxy.ProxyMetricsAddress, + CertIPs: p.config.Proxy.ProxyServerCertIPs, + CertDNSNames: p.config.Proxy.ProxyServerCertDNSNames, + InterceptorUDSFile: p.config.Proxy.InterceptorServerUDSFile, + InternalSecureAddress: p.config.Proxy.InternalSecureAddress, + InternalInsecureAddress: p.config.Proxy.InternalInsecureAddress, + ExposedAddress: p.config.Proxy.ExternalAddress, + } + ctx := p.proxyCtx.GetServerContext() + ps, err := proxyserver.NewProxyServer(pe, p.client, p.config.Manager.GetConfig(), p.gateway.DeepCopy()) + if err != nil { + return fmt.Errorf("failed to new proxy server, error %s", err.Error()) + } + err = ps.Start(ctx) + if err != nil { + return fmt.Errorf("failed to start proxy server, error %s", err.Error()) + } + p.proxyOption.SetServerStatus(true) + return nil +} + +func (p *ProxyEngine) stopProxyServer() { + klog.Infoln(utils.FormatProxyServer("Stop raven l7 proxy server")) + cancel := p.proxyCtx.GetServerCancelFunc() + cancel() + p.proxyOption.SetServerStatus(false) + p.proxyCtx.ReloadServerContext(p.ctx) +} + +func (p *ProxyEngine) startProxyClient() error { + klog.Infoln(utils.FormatProxyClient("start raven l7 proxy client")) + var err error + dstAddr := getDestAddressForProxyClient(p.client, p.gateway) + if len(dstAddr) < 1 { + klog.Infoln(utils.FormatProxyClient("dest address is empty, will not connected it")) + return nil + } + p.clientRemoteEndpoints = dstAddr + pe := &proxyengine.EnginConfig{ + Name: p.nodeName, + IP: p.nodeIP, + CertDir: p.config.Proxy.ProxyClientCertDir, + MetaAddress: p.config.Proxy.ProxyMetricsAddress, + } + pc, err := proxyclient.NewProxyClient(pe, p.clientRemoteEndpoints, p.config.KubeConfig) + if err != nil { + klog.Errorf("failed to new proxy client, error %s", err.Error()) + return err + } + ctx := p.proxyCtx.GetClientContext() + err = pc.Start(ctx) + if err != nil { + klog.Errorf("failed to start proxy client, error %s", err.Error()) + } + p.proxyOption.SetClientStatus(true) + return nil +} + +func (p *ProxyEngine) stopProxyClient() { + klog.Infoln(utils.FormatProxyClient("stop raven l7 proxy client")) + cancel := p.proxyCtx.GetClientCancelFunc() + cancel() + p.proxyOption.SetClientStatus(false) + p.proxyCtx.ReloadClientContext(p.ctx) +} +func getSrcAddressForProxyServer(client client.Client, nodeName string) []string { + srcAddr := make([]string, 0) + var gwList v1beta1.GatewayList + err := client.List(context.TODO(), &gwList) + if err != nil { + return srcAddr + } + for _, gw := range gwList.Items { + if gw.Spec.ExposeType == "" { + continue + } + for _, aep := range gw.Status.ActiveEndpoints { + if aep.NodeName == nodeName && aep.Type == v1beta1.Proxy { + srcAddr = append(srcAddr, aep.PublicIP) + } + } + } + return srcAddr +} + func getDestAddressForProxyClient(client client.Client, ownGateway *v1beta1.Gateway) []string { destAddr := make([]string, 0) var gwList v1beta1.GatewayList @@ -218,7 +288,7 @@ func (p *ProxyEngine) getRole(enableProxy bool) (enableServer, enableClient bool return } var gwList v1beta1.GatewayList - err := p.client.List(p.ravenContext, &gwList) + err := p.client.List(p.ctx, &gwList) if err != nil { return } @@ -250,11 +320,11 @@ func (p *ProxyEngine) getRole(enableProxy bool) (enableServer, enableClient bool func (p *ProxyEngine) stopServers() { if p.proxyOption.GetServerStatus() { - cancelServer := p.proxyContext.GetServerCancelFunc() + cancelServer := p.proxyCtx.GetServerCancelFunc() cancelServer() } if p.proxyOption.GetClientStatus() { - cancelClient := p.proxyContext.GetClientCancelFunc() + cancelClient := p.proxyCtx.GetClientCancelFunc() cancelClient() } } @@ -267,7 +337,7 @@ func enableProxy(gw *v1beta1.Gateway) (enable bool) { enable = false return } - start, ok := aep.Config[ravenutil.RavenEnableProxy] + start, ok := aep.Config[utils.RavenEnableProxy] if !ok { enable = false return diff --git a/pkg/engine/tunnel.go b/pkg/engine/tunnel.go index 9b39558..afade72 100644 --- a/pkg/engine/tunnel.go +++ b/pkg/engine/tunnel.go @@ -2,12 +2,13 @@ package engine import ( "fmt" + "strings" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/cmd/agent/app/config" "github.com/openyurtio/raven/pkg/networkengine/routedriver" "github.com/openyurtio/raven/pkg/networkengine/vpndriver" @@ -19,14 +20,14 @@ type TunnelEngine struct { nodeName string config *config.Config client client.Client - option StatusOption + option *Option queue workqueue.RateLimitingInterface routeDriver routedriver.Driver vpnDriver vpndriver.Driver tunnelHandler *tunnelengine.TunnelHandler } -func newTunnelEngine(cfg *config.Config, client client.Client, opt StatusOption, queue workqueue.RateLimitingInterface) *TunnelEngine { +func newTunnelEngine(cfg *config.Config, client client.Client, opt *Option, queue workqueue.RateLimitingInterface) *TunnelEngine { return &TunnelEngine{nodeName: cfg.NodeName, config: cfg, client: client, option: opt, queue: queue} } @@ -52,11 +53,17 @@ func (t *TunnelEngine) processNextWorkItem() bool { func (t *TunnelEngine) handler(gw *v1beta1.Gateway) error { klog.Info(utils.FormatRavenEngine("update raven l3 tunnel config for gateway %s", gw.GetName())) - err := t.reconcile() + if t.routeDriver == nil || t.vpnDriver == nil { + err := t.initDriver() + if err != nil { + klog.Errorf(utils.FormatRavenEngine("failed to init raven l3 tunnel engine")) + } + } + err := t.tunnelHandler.Handler() if err != nil { - klog.Errorf("failed update tunnel driver, error %s", err.Error()) return err } + t.option.SetTunnelStatus(enableTunnel(gw)) return nil } @@ -97,20 +104,6 @@ func (t *TunnelEngine) clearDriver() error { return nil } -func (t *TunnelEngine) reconcile() error { - if t.routeDriver == nil || t.vpnDriver == nil { - err := t.initDriver() - if err != nil { - klog.Errorf(utils.FormatRavenEngine("failed to init raven l3 tunnel engine")) - } - } - err := t.tunnelHandler.Handler() - if err != nil { - return err - } - return nil -} - func (t *TunnelEngine) handleEventErr(err error, event interface{}) { if err == nil { t.queue.Forget(event) @@ -124,3 +117,24 @@ func (t *TunnelEngine) handleEventErr(err error, event interface{}) { klog.Info(utils.FormatRavenEngine("dropping event %q out of the queue: %v", event, err)) t.queue.Forget(event) } + +func enableTunnel(gw *v1beta1.Gateway) (enable bool) { + enable = false + for _, aep := range gw.Status.ActiveEndpoints { + if aep.Type == v1beta1.Tunnel { + if aep.Config == nil { + enable = false + return + } + start, ok := aep.Config[utils.RavenEnableTunnel] + if !ok { + enable = false + return + } + if strings.ToLower(start) == "true" { + enable = true + } + } + } + return +} diff --git a/pkg/engine/utils.go b/pkg/engine/utils.go index de9e829..7b72dd1 100644 --- a/pkg/engine/utils.go +++ b/pkg/engine/utils.go @@ -5,42 +5,35 @@ import ( "sync" ) -type StatusOption interface { - GetProxyStatus() bool - GetTunnelStatus() bool - SetProxyStatus(status bool) - SetTunnelStatus(status bool) -} - -type engineOption struct { +type Option struct { mu sync.Mutex enableProxy bool enableTunnel bool } -func NewEngineOption() StatusOption { - return &engineOption{enableTunnel: false, enableProxy: false} +func NewEngineOption() *Option { + return &Option{enableTunnel: false, enableProxy: false} } -func (s *engineOption) GetProxyStatus() bool { +func (s *Option) GetProxyStatus() bool { s.mu.Lock() defer s.mu.Unlock() return s.enableProxy } -func (s *engineOption) GetTunnelStatus() bool { +func (s *Option) GetTunnelStatus() bool { s.mu.Lock() defer s.mu.Unlock() return s.enableTunnel } -func (s *engineOption) SetProxyStatus(status bool) { +func (s *Option) SetProxyStatus(status bool) { s.mu.Lock() defer s.mu.Unlock() s.enableProxy = status } -func (s *engineOption) SetTunnelStatus(status bool) { +func (s *Option) SetTunnelStatus(status bool) { s.mu.Lock() defer s.mu.Unlock() s.enableTunnel = status diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index d3534b4..0315668 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -17,15 +17,11 @@ limitations under the License. package metrics import ( - "strings" - "github.com/prometheus/client_golang/prometheus" - - "github.com/openyurtio/openyurt/pkg/projectinfo" ) var ( - namespace = strings.ReplaceAll(projectinfo.GetTunnelName(), "-", "_") + namespace = "raven_agent" subsystem = "server" ) diff --git a/pkg/networkengine/routedriver/driver.go b/pkg/networkengine/routedriver/driver.go index 65cf526..2a49148 100644 --- a/pkg/networkengine/routedriver/driver.go +++ b/pkg/networkengine/routedriver/driver.go @@ -1,3 +1,19 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package routedriver import ( diff --git a/pkg/networkengine/routedriver/vxlan/vxlan.go b/pkg/networkengine/routedriver/vxlan/vxlan.go index 811029b..d071751 100644 --- a/pkg/networkengine/routedriver/vxlan/vxlan.go +++ b/pkg/networkengine/routedriver/vxlan/vxlan.go @@ -28,7 +28,7 @@ import ( "github.com/vishvananda/netlink" "k8s.io/klog/v2" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/cmd/agent/app/config" "github.com/openyurtio/raven/pkg/networkengine/routedriver" networkutil "github.com/openyurtio/raven/pkg/networkengine/util" diff --git a/pkg/networkengine/routedriver/vxlan/vxlan_test.go b/pkg/networkengine/routedriver/vxlan/vxlan_test.go index 37878c0..018b0a8 100644 --- a/pkg/networkengine/routedriver/vxlan/vxlan_test.go +++ b/pkg/networkengine/routedriver/vxlan/vxlan_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/vishvananda/netlink" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven/v1beta1" networkutil "github.com/openyurtio/raven/pkg/networkengine/util" netlinkutil "github.com/openyurtio/raven/pkg/networkengine/util/netlink" "github.com/openyurtio/raven/pkg/types" diff --git a/pkg/networkengine/vpndriver/driver.go b/pkg/networkengine/vpndriver/driver.go index f377025..7eca5e7 100644 --- a/pkg/networkengine/vpndriver/driver.go +++ b/pkg/networkengine/vpndriver/driver.go @@ -1,3 +1,19 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package vpndriver import ( diff --git a/pkg/networkengine/vpndriver/driver_test.go b/pkg/networkengine/vpndriver/driver_test.go index 1ac9f10..4707f83 100644 --- a/pkg/networkengine/vpndriver/driver_test.go +++ b/pkg/networkengine/vpndriver/driver_test.go @@ -4,7 +4,7 @@ import ( "reflect" "testing" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/cmd/agent/app/config" "github.com/openyurtio/raven/pkg/types" diff --git a/pkg/networkengine/vpndriver/libreswan/libreswan.go b/pkg/networkengine/vpndriver/libreswan/libreswan.go index 8a4098a..9db1802 100644 --- a/pkg/networkengine/vpndriver/libreswan/libreswan.go +++ b/pkg/networkengine/vpndriver/libreswan/libreswan.go @@ -319,7 +319,7 @@ func whackCmdFn(args ...string) error { time.Sleep(1 * time.Second) } if err != nil { - return fmt.Errorf("error whacking with %v: %v", args, err) + return fmt.Errorf("error whacking with %v: status code %v, error %s", args, err, string(output)) } return nil } diff --git a/pkg/networkengine/vpndriver/wireguard/wireguard.go b/pkg/networkengine/vpndriver/wireguard/wireguard.go index 6c640ae..9cdadb8 100644 --- a/pkg/networkengine/vpndriver/wireguard/wireguard.go +++ b/pkg/networkengine/vpndriver/wireguard/wireguard.go @@ -26,7 +26,7 @@ import ( "strconv" "time" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven/v1beta1" "github.com/pkg/errors" "github.com/vdobler/ht/errorlist" "github.com/vishvananda/netlink" diff --git a/pkg/proxyengine/proxyclient/proxyclient.go b/pkg/proxyengine/proxyclient/proxyclient.go index cbf73e8..9cb3bf6 100644 --- a/pkg/proxyengine/proxyclient/proxyclient.go +++ b/pkg/proxyengine/proxyclient/proxyclient.go @@ -32,10 +32,10 @@ import ( "k8s.io/klog/v2" anp "sigs.k8s.io/apiserver-network-proxy/pkg/agent" - "github.com/openyurtio/openyurt/pkg/util/certmanager" - "github.com/openyurtio/openyurt/pkg/util/certmanager/factory" "github.com/openyurtio/raven/pkg/proxyengine" "github.com/openyurtio/raven/pkg/utils" + "github.com/openyurtio/raven/pkg/utils/certmanager" + "github.com/openyurtio/raven/pkg/utils/certmanager/factory" ) type ProxyClient struct { diff --git a/pkg/proxyengine/proxyserver/interceptor.go b/pkg/proxyengine/proxyserver/interceptor.go index ad988a3..03b9dac 100644 --- a/pkg/proxyengine/proxyserver/interceptor.go +++ b/pkg/proxyengine/proxyserver/interceptor.go @@ -118,7 +118,7 @@ func NewInterceptor(udsFile string, cfg *tls.Config) http.Handler { return nil, fmt.Errorf("unrecognize the proxy forwarding mode") } if isTLS { - cfg.InsecureSkipVerify = false + cfg.InsecureSkipVerify = true tlsConn := tls.Client(conn, cfg) if err := tlsConn.Handshake(); err != nil { conn.Close() diff --git a/pkg/proxyengine/proxyserver/manageheader.go b/pkg/proxyengine/proxyserver/manageheader.go index e07e4e0..4ecceed 100644 --- a/pkg/proxyengine/proxyserver/manageheader.go +++ b/pkg/proxyengine/proxyserver/manageheader.go @@ -19,22 +19,21 @@ package proxyserver import ( "context" "fmt" + "math/rand" "net" "net/http" "strconv" "strings" "time" - "math/rand" - v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/openyurtio/openyurt/pkg/apis/raven" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven" + "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/pkg/metrics" "github.com/openyurtio/raven/pkg/utils" ) @@ -89,7 +88,7 @@ func (h *headerManger) Handler(handler http.Handler) http.Handler { } } if host == "" || ip == "" || port == "" { - logAndHTTPError(w, http.StatusBadRequest, "request host %s and url %s is invalid, %s", r.Host, r.URL.String(), err.Error()) + logAndHTTPError(w, http.StatusBadRequest, "request host %s and url %s is invalid", r.Host, r.URL.String()) return } // Complete request header information @@ -143,18 +142,19 @@ func (h *headerManger) getAPIServerRequestDestAddress(r *http.Request) (name, ip if err != nil { return "", "", "", err } - name, err = h.getGatewayNodeName(&node) if err != nil { return "", "", "", fmt.Errorf("gateway include node %s, has no active endpoints, error %s", node.Name, err.Error()) } - ip = getNodeIP(&node) if ip == "" { return "", "", "", fmt.Errorf("node %s ip is empty", node.Name) } - port = strconv.Itoa(int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)) + _, port, _ = net.SplitHostPort(r.Header.Get(utils.RavenProxyDestHeaderKey)) + if port == "" { + port = strconv.Itoa(int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)) + } return name, ip, port, nil } diff --git a/pkg/proxyengine/proxyserver/proxyserver.go b/pkg/proxyengine/proxyserver/proxyserver.go index 6359851..a3047fb 100644 --- a/pkg/proxyengine/proxyserver/proxyserver.go +++ b/pkg/proxyengine/proxyserver/proxyserver.go @@ -38,13 +38,12 @@ import ( anpserver "sigs.k8s.io/apiserver-network-proxy/pkg/server" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/openyurtio/openyurt/pkg/apis/raven" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" - "github.com/openyurtio/openyurt/pkg/util/certmanager" - "github.com/openyurtio/openyurt/pkg/util/certmanager/factory" - ravenutils "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/utils" + "github.com/openyurtio/api/raven" + "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/pkg/proxyengine" "github.com/openyurtio/raven/pkg/utils" + "github.com/openyurtio/raven/pkg/utils/certmanager" + "github.com/openyurtio/raven/pkg/utils/certmanager/factory" ) type ProxyServer struct { @@ -110,12 +109,12 @@ func NewProxyServer(cfg *proxyengine.EnginConfig, client client.Client, kubeCfg } func (c *ProxyServer) Start(ctx context.Context) error { - dnsNames, IPs := c.getProxyServerIPsAndDNSName(ctx) + dnsNames, IPs := c.getProxyServerIPsAndDNSName() certFactory := factory.NewCertManagerFactory(c.clientSet) serverCertCfg := &factory.CertManagerConfig{ IPs: append(c.certIPs, IPs...), IPGetter: func() ([]net.IP, error) { - _, ips := c.getProxyServerIPsAndDNSName(ctx) + _, ips := c.getProxyServerIPsAndDNSName() return ips, nil }, DNSNames: append(c.certDNSNames, dnsNames...), @@ -128,10 +127,10 @@ func (c *ProxyServer) Start(ctx context.Context) error { } serverCertMgr, err := certFactory.New(serverCertCfg) if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to new server cert manager factory for proxy server %s, error %s", c.nodeName, err.Error())) return fmt.Errorf("failed to new server cert manager factory for proxy server %s, error %s", c.nodeName, err.Error()) } serverCertMgr.Start() + defer serverCertMgr.Stop() proxyCertCfg := &factory.CertManagerConfig{ CertDir: c.certDir, @@ -142,7 +141,6 @@ func (c *ProxyServer) Start(ctx context.Context) error { } proxyCertMgr, err := certFactory.New(proxyCertCfg) if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to new proxy cert manager factory for proxy server %s, error %s", c.nodeName, err.Error())) return fmt.Errorf("failed to new proxy cert manager factory for proxy server %s, error %s", c.nodeName, err.Error()) } proxyCertMgr.Start() @@ -165,11 +163,14 @@ func (c *ProxyServer) Start(ctx context.Context) error { return err } utils.RunMetaServer(ctx, c.metaAddress) - c.runServers(ctx) + err = c.runServers(ctx) + if err != nil { + return fmt.Errorf("failed to run proxy servers, error %s", err.Error()) + } return nil } -func (c *ProxyServer) runServers(ctx context.Context) { +func (c *ProxyServer) runServers(ctx context.Context) error { klog.Info(utils.FormatProxyServer("start proxy server")) strategy := []anpserver.ProxyStrategy{anpserver.ProxyStrategyDestHost} proxyServer := anpserver.NewProxyServer(c.nodeName, strategy, 1, &anpserver.AgentTokenAuthenticationOptions{}) @@ -178,58 +179,60 @@ func (c *ProxyServer) runServers(ctx context.Context) { headerMgr := NewHeaderManager(c.client, c.gateway.GetName(), utilnet.IsIPv4String(c.nodeIP)) NewMaster(headerMgr.Handler(interceptor), c.serverTLSConfig, c.internalSecureAddress, c.internalInsecureAddress).Run(ctx) NewAgent(c.serverTLSConfig, proxyServer, c.exposedAddress).Run(ctx) + return nil } -func (c *ProxyServer) getProxyServerIPsAndDNSName(ctx context.Context) (dnsName []string, ipAddr []net.IP) { +func (c *ProxyServer) getProxyServerIPsAndDNSName() (dnsName []string, ipAddr []net.IP) { + ipAddr = append(ipAddr, net.ParseIP(c.nodeIP)) ipAddr = append(ipAddr, net.ParseIP(utils.DefaultLoopBackIP4)) + ipAddr = append(ipAddr, c.certIPs...) dnsName = append(dnsName, c.nodeName) - _ = wait.PollImmediateWithContext(ctx, 10*time.Second, 10*time.Minute, func(ctx context.Context) (done bool, err error) { - var svc v1.Service - err = c.client.Get(ctx, types.NamespacedName{Namespace: ravenutils.WorkingNamespace, Name: ravenutils.GatewayProxyInternalService}, &svc) - if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to get internal service %s/%s to get proxy server IPs and DNSNames, error %s", - svc.GetNamespace(), svc.GetName(), err.Error())) - return false, nil - } + + var svc v1.Service + err := c.client.Get(context.TODO(), types.NamespacedName{Namespace: utils.WorkingNamespace, Name: utils.GatewayProxyInternalService}, &svc) + if err != nil { + klog.Errorf(utils.FormatProxyServer("failed to get internal service %s/%s to get proxy server IPs and DNSNames, error %s", + svc.GetNamespace(), svc.GetName(), err.Error())) + return + } + dnsName = append(dnsName, getDefaultDomainsForSvc(svc.GetNamespace(), svc.GetName())...) + if svc.Spec.ClusterIP != "" { + ipAddr = append(ipAddr, net.ParseIP(svc.Spec.ClusterIP)) + } + var svcList v1.ServiceList + err = c.client.List(context.TODO(), &svcList, &client.ListOptions{ + LabelSelector: labels.Set{ + raven.LabelCurrentGateway: c.gateway.GetName(), + utils.LabelCurrentGatewayType: v1beta1.Proxy, + utils.LabelCurrentGatewayEndpoints: c.nodeName, + }.AsSelector(), + }) + if err != nil { + klog.Errorf(utils.FormatProxyServer("failed to get public serivce for gateway %s, node %s to get proxy server IPs and DNSNames, error %s", + c.gateway.GetName(), c.nodeName, err.Error())) + return + } + + for _, svc = range svcList.Items { dnsName = append(dnsName, getDefaultDomainsForSvc(svc.GetNamespace(), svc.GetName())...) + dnsName = append(dnsName, getExternalDNSName(&svc)...) + ipAddr = append(ipAddr, getExternalIPForSvc(&svc)...) if svc.Spec.ClusterIP != "" { ipAddr = append(ipAddr, net.ParseIP(svc.Spec.ClusterIP)) } - var svcList v1.ServiceList - err = c.client.List(ctx, &svcList, &client.ListOptions{ - LabelSelector: labels.Set{ - raven.LabelCurrentGateway: c.gateway.GetName(), - raven.LabelCurrentGatewayType: v1beta1.Proxy, - ravenutils.LabelCurrentGatewayEndpoints: c.nodeName, - }.AsSelector(), - }) - if err != nil { - klog.Errorf(utils.FormatProxyServer("failed to get public service for gateway %s, node %s to get proxy server IPs and DNSNames, error %s", - c.gateway.GetName(), c.nodeName, err.Error())) - return false, nil - } - - for _, svc = range svcList.Items { - dnsName = append(dnsName, getDefaultDomainsForSvc(svc.GetNamespace(), svc.GetName())...) - dnsName = append(dnsName, getExternalDNSName(&svc)...) - ipAddr = append(ipAddr, getExternalIPForSvc(&svc)...) - if svc.Spec.ClusterIP != "" { - ipAddr = append(ipAddr, net.ParseIP(svc.Spec.ClusterIP)) - } - if svc.Status.LoadBalancer.Ingress != nil { - for _, ing := range svc.Status.LoadBalancer.Ingress { - if ing.IP != "" { - ipAddr = append(ipAddr, net.ParseIP(ing.IP)) - } - if ing.Hostname != "" { - dnsName = append(dnsName, ing.Hostname) - } + if svc.Status.LoadBalancer.Ingress != nil { + for _, ing := range svc.Status.LoadBalancer.Ingress { + if ing.IP != "" { + ipAddr = append(ipAddr, net.ParseIP(ing.IP)) + } + if ing.Hostname != "" { + dnsName = append(dnsName, ing.Hostname) } } } - return true, nil - }) + } + klog.V(3).Info("cert address is %v", ipAddr) return } diff --git a/pkg/proxyengine/proxyserver/servers.go b/pkg/proxyengine/proxyserver/servers.go index 52c41e4..3d7b668 100644 --- a/pkg/proxyengine/proxyserver/servers.go +++ b/pkg/proxyengine/proxyserver/servers.go @@ -60,7 +60,7 @@ func (p *proxies) Run(ctx context.Context) { defer listen.Close() go func(ctx context.Context) { <-ctx.Done() - err = server.Shutdown(ctx) + err := server.Shutdown(context.TODO()) if err != nil { klog.Errorf("failed to shutdown proxies server, error %s", err.Error()) } @@ -96,7 +96,7 @@ func (m *master) Run(ctx context.Context) { } go func(ctx context.Context) { <-ctx.Done() - err := server.Shutdown(ctx) + err := server.Shutdown(context.TODO()) if err != nil { klog.Errorf("failed to shutdown master secure server, error %s", err.Error()) } @@ -117,7 +117,7 @@ func (m *master) Run(ctx context.Context) { } go func(ctx context.Context) { <-ctx.Done() - err := server.Shutdown(ctx) + err := server.Shutdown(context.TODO()) if err != nil { klog.Errorf("failed to shutdown master insecure server, error %s", err.Error()) } diff --git a/pkg/tunnelengine/tunnelagent.go b/pkg/tunnelengine/tunnelagent.go index ab87d17..aa5559a 100644 --- a/pkg/tunnelengine/tunnelagent.go +++ b/pkg/tunnelengine/tunnelagent.go @@ -30,9 +30,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/EvilSuperstars/go-cidrman" - "github.com/openyurtio/openyurt/pkg/apis/raven" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" - ravenutils "github.com/openyurtio/openyurt/pkg/yurtmanager/controller/raven/utils" + "github.com/openyurtio/api/raven" + "github.com/openyurtio/api/raven/v1beta1" "github.com/openyurtio/raven/pkg/networkengine/routedriver" "github.com/openyurtio/raven/pkg/networkengine/vpndriver" "github.com/openyurtio/raven/pkg/types" @@ -255,9 +254,9 @@ func (c *TunnelHandler) getLoadBalancerPublicIP(gwName string) (string, error) { var svcList v1.ServiceList err := c.ravenClient.List(context.TODO(), &svcList, &client.ListOptions{ LabelSelector: labels.Set{ - raven.LabelCurrentGateway: gwName, - raven.LabelCurrentGatewayType: v1beta1.Tunnel, - ravenutils.LabelCurrentGatewayEndpoints: c.nodeName, + raven.LabelCurrentGateway: gwName, + utils.LabelCurrentGatewayType: v1beta1.Tunnel, + utils.LabelCurrentGatewayEndpoints: c.nodeName, }.AsSelector(), }) if err != nil { diff --git a/pkg/types/network.go b/pkg/types/network.go index 0c53ea6..cc80538 100644 --- a/pkg/types/network.go +++ b/pkg/types/network.go @@ -17,7 +17,7 @@ package types import ( - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven/v1beta1" ) // GatewayName is the type representing the name of Gateway. diff --git a/pkg/types/network_test.go b/pkg/types/network_test.go index 9efc71a..40892e4 100644 --- a/pkg/types/network_test.go +++ b/pkg/types/network_test.go @@ -20,7 +20,7 @@ import ( "reflect" "testing" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven/v1beta1" ) const ( diff --git a/pkg/utils/certmanager/factory/factory.go b/pkg/utils/certmanager/factory/factory.go new file mode 100644 index 0000000..13a9b01 --- /dev/null +++ b/pkg/utils/certmanager/factory/factory.go @@ -0,0 +1,175 @@ +/* +Copyright 2022 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import ( + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "fmt" + "net" + "reflect" + + certificatesv1 "k8s.io/api/certificates/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/certificate" + "k8s.io/klog/v2" + + "github.com/openyurtio/raven/pkg/utils/certmanager/store" +) + +type IPGetter func() ([]net.IP, error) +type DNSGetter func() ([]string, error) + +// CertManagerConfig specifies the attributes of the created CertManager +type CertManagerConfig struct { + // ComponentName represents the name of the component which will use this CertManager. + ComponentName string + // CommonName is CN of cert. + CommonName string + // CertDir represents the dir of local file system where the cert related files will be stored. + CertDir string + // Organizations is O of cert. + Organizations []string + // DNSNames contain a list of DNS names this component will use. + // Note: + // If DNSGetter is set and it can get dns names with no error returned, + // DNSNames will be ignored and what got from DNSGetter will be used. + DNSNames []string + // DNSGetter can get dns names at runtime. If no error returned when getting dns names, + // these dns names will be used in the cert instead of the DNSNames. + DNSGetter + // IPs contain a list of IP this component will use. + // Note: + // If IPGetter is set and it can get ips with no error returned, + // IPs will be ignored and what got from IPGetter will be used. + IPs []net.IP + // IPGetter can get ips at runtime. If no error returned when getting ips, + // these ips will be used in the cert instead of the IPs. + IPGetter + // SignerName can specified the signer of Kubernetes, which can be one of + // 1. "kubernetes.io/kube-apiserver-client" + // 2. "kubernetes.io/kube-apiserver-client-kubelet" + // 3. "kubernetes.io/kubelet-serving" + // More details can be found at k8s.io/api/certificates/v1 + SignerName string + // ForServerUsage indicates the usage of this cert. + // If set, UsageServerAuth will be used, otherwise UsageClientAuth will be used. + // Additionally, UsageKeyEncipherment and UsageDigitalSignature are always used. + ForServerUsage bool +} + +// CertManagerFactory knows how to create CertManager for OpenYurt Components. +type CertManagerFactory interface { + // New function will create the CertManager as what CertManagerConfig specified. + New(*CertManagerConfig) (certificate.Manager, error) +} + +type factory struct { + clientsetFn certificate.ClientsetFunc + fileStore certificate.FileStore +} + +func NewCertManagerFactory(clientSet kubernetes.Interface) CertManagerFactory { + return &factory{ + clientsetFn: func(current *tls.Certificate) (kubernetes.Interface, error) { + return clientSet, nil + }, + } +} + +func NewCertManagerFactoryWithFnAndStore(clientsetFn certificate.ClientsetFunc, store certificate.FileStore) CertManagerFactory { + return &factory{ + clientsetFn: clientsetFn, + fileStore: store, + } +} + +func (f *factory) New(cfg *CertManagerConfig) (certificate.Manager, error) { + var err error + if IsNil(f.fileStore) { + f.fileStore, err = store.NewFileStoreWrapper(cfg.ComponentName, cfg.CertDir, cfg.CertDir, "", "") + if err != nil { + return nil, fmt.Errorf("failed to initialize the server certificate store: %w", err) + } + } + + ips, dnsNames := cfg.IPs, cfg.DNSNames + getTemplate := func() *x509.CertificateRequest { + if cfg.IPGetter != nil { + newIPs, err := cfg.IPGetter() + if err == nil && len(newIPs) != 0 { + klog.V(4).Infof("cr template of %s uses ips=%#+v", cfg.ComponentName, newIPs) + ips = newIPs + } + if err != nil { + klog.Errorf("failed to get ips for %s when preparing cr template, %v", cfg.ComponentName, err) + return nil + } + } + if cfg.DNSGetter != nil { + newDNSNames, err := cfg.DNSGetter() + if err == nil && len(newDNSNames) != 0 { + klog.V(4).Infof("cr template of %s uses dns names=%#+v", cfg.ComponentName, newDNSNames) + dnsNames = newDNSNames + } + if err != nil { + klog.Errorf("failed to get dns names for %s when preparing cr template, %v", cfg.ComponentName, err) + return nil + } + } + return &x509.CertificateRequest{ + Subject: pkix.Name{ + CommonName: cfg.CommonName, + Organization: cfg.Organizations, + }, + DNSNames: dnsNames, + IPAddresses: ips, + } + } + + usages := []certificatesv1.KeyUsage{ + certificatesv1.UsageKeyEncipherment, + certificatesv1.UsageDigitalSignature, + } + if cfg.ForServerUsage { + usages = append(usages, certificatesv1.UsageServerAuth) + } else { + usages = append(usages, certificatesv1.UsageClientAuth) + } + + return certificate.NewManager(&certificate.Config{ + ClientsetFn: f.clientsetFn, + SignerName: cfg.SignerName, + GetTemplate: getTemplate, + Usages: usages, + CertificateStore: f.fileStore, + Logf: klog.Infof, + }) +} + +func IsNil(i interface{}) bool { + if i == nil { + return true + } + + switch reflect.TypeOf(i).Kind() { + case reflect.Ptr, reflect.Slice, reflect.Array, reflect.Chan, reflect.Map: + return reflect.ValueOf(i).IsNil() + } + return false +} diff --git a/pkg/utils/certmanager/pki.go b/pkg/utils/certmanager/pki.go new file mode 100644 index 0000000..d114c8e --- /dev/null +++ b/pkg/utils/certmanager/pki.go @@ -0,0 +1,187 @@ +/* +Copyright 2020 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package certmanager + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net" + "os" + + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/certificate" +) + +// GenTLSConfigUseCurrentCertAndCertPool generates a TLS configuration +// using the given current certificate and x509 CertPool +func GenTLSConfigUseCurrentCertAndCertPool( + current func() *tls.Certificate, + root *x509.CertPool, + mode string) (*tls.Config, error) { + tlsConfig := &tls.Config{ + // Can't use SSLv3 because of POODLE and BEAST + // Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher + // Can't use TLSv1.1 because of RC4 cipher usage + MinVersion: tls.VersionTLS12, + } + + switch mode { + case "server": + tlsConfig.ClientCAs = root + tlsConfig.ClientAuth = tls.VerifyClientCertIfGiven + if current != nil { + tlsConfig.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) { + cert := current() + if cert == nil { + return &tls.Certificate{Certificate: nil}, nil + } + return cert, nil + } + } + case "client": + tlsConfig.RootCAs = root + if current != nil { + tlsConfig.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) { + cert := current() + if cert == nil { + return &tls.Certificate{Certificate: nil}, nil + } + return cert, nil + } + } + default: + return nil, fmt.Errorf("unsupported cert manager mode(only server or client), %s", mode) + } + + return tlsConfig, nil +} + +// GenRootCertPool generates a x509 CertPool based on the given kubeconfig, +// if the kubeConfig is empty, it will creates the CertPool using the CA file +func GenRootCertPool(kubeConfig, caFile string) (*x509.CertPool, error) { + if kubeConfig != "" { + // kubeconfig is given, generate the clientset based on it + if _, err := os.Stat(kubeConfig); os.IsNotExist(err) { + return nil, err + } + + // load the root ca from the given kubeconfig file + config, err := clientcmd.LoadFromFile(kubeConfig) + if err != nil || config == nil { + return nil, fmt.Errorf("failed to load the kubeconfig file(%s), %w", + kubeConfig, err) + } + + if len(config.CurrentContext) == 0 { + return nil, fmt.Errorf("'current context' is not set in %s", + kubeConfig) + } + + ctx, ok := config.Contexts[config.CurrentContext] + if !ok || ctx == nil { + return nil, fmt.Errorf("'current context(%s)' is not found in %s", + config.CurrentContext, kubeConfig) + } + + cluster, ok := config.Clusters[ctx.Cluster] + if !ok || cluster == nil { + return nil, fmt.Errorf("'cluster(%s)' is not found in %s", + ctx.Cluster, kubeConfig) + } + + if len(cluster.CertificateAuthorityData) == 0 { + return nil, fmt.Errorf("'certificate authority data of the cluster(%s) is not set in %s", + ctx.Cluster, kubeConfig) + } + + rootCertPool := x509.NewCertPool() + rootCertPool.AppendCertsFromPEM(cluster.CertificateAuthorityData) + return rootCertPool, nil + } + + // kubeConfig is missing, generate the cluster root ca based on the given ca file + return GenCertPoolUseCA(caFile) +} + +// GenTLSConfigUseCertMgrAndCA generates a TLS configuration based on the +// given certificate manager and the CA file +func GenTLSConfigUseCertMgrAndCA( + m certificate.Manager, + serverAddr, caFile string) (*tls.Config, error) { + root, err := GenCertPoolUseCA(caFile) + if err != nil { + return nil, err + } + + host, _, err := net.SplitHostPort(serverAddr) + if err != nil { + return nil, err + } + + tlsConfig := &tls.Config{ + // Can't use SSLv3 because of POODLE and BEAST + // Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher + // Can't use TLSv1.1 because of RC4 cipher usage + MinVersion: tls.VersionTLS12, + ServerName: host, + RootCAs: root, + } + + tlsConfig.GetClientCertificate = + func(*tls.CertificateRequestInfo) (*tls.Certificate, error) { + cert := m.Current() + if cert == nil { + return &tls.Certificate{Certificate: nil}, nil + } + return cert, nil + } + tlsConfig.GetCertificate = + func(*tls.ClientHelloInfo) (*tls.Certificate, error) { + cert := m.Current() + if cert == nil { + return &tls.Certificate{Certificate: nil}, nil + } + return cert, nil + } + + return tlsConfig, nil +} + +// GenCertPoolUseCA generates a x509 CertPool based on the given CA file +func GenCertPoolUseCA(caFile string) (*x509.CertPool, error) { + if caFile == "" { + return nil, errors.New("CA file is not set") + } + + if _, err := os.Stat(caFile); err != nil { + if os.IsNotExist(err) { + return nil, fmt.Errorf("CA file(%s) doesn't exist", caFile) + } + return nil, fmt.Errorf("fail to stat the CA file(%s): %w", caFile, err) + } + + caData, err := os.ReadFile(caFile) + if err != nil { + return nil, err + } + + certPool := x509.NewCertPool() + certPool.AppendCertsFromPEM(caData) + return certPool, nil +} diff --git a/pkg/utils/certmanager/pki_test.go b/pkg/utils/certmanager/pki_test.go new file mode 100644 index 0000000..800fac3 --- /dev/null +++ b/pkg/utils/certmanager/pki_test.go @@ -0,0 +1,347 @@ +/* +Copyright 2020 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package certmanager + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "fmt" + "reflect" + "testing" + "time" + + certificatesv1 "k8s.io/api/certificates/v1" + "k8s.io/client-go/util/certificate" +) + +const ( + failed = "\u2717" + succeed = "\u2713" +) + +type certificateData struct { + keyPEM []byte + certificatePEM []byte + certificate *tls.Certificate +} + +func newCertificateData(certificatePEM string, keyPEM string) *certificateData { + certificate, err := tls.X509KeyPair([]byte(certificatePEM), []byte(keyPEM)) + if err != nil { + panic(fmt.Sprintf("Unable to initialize certificate: %v", err)) + } + certs, err := x509.ParseCertificates(certificate.Certificate[0]) + if err != nil { + panic(fmt.Sprintf("Unable to initialize certificate leaf: %v", err)) + } + certificate.Leaf = certs[0] + return &certificateData{ + keyPEM: []byte(keyPEM), + certificatePEM: []byte(certificatePEM), + certificate: &certificate, + } +} + +var storeCertData = newCertificateData(`-----BEGIN CERTIFICATE----- +MIICRzCCAfGgAwIBAgIJALMb7ecMIk3MMA0GCSqGSIb3DQEBCwUAMH4xCzAJBgNV +BAYTAkdCMQ8wDQYDVQQIDAZMb25kb24xDzANBgNVBAcMBkxvbmRvbjEYMBYGA1UE +CgwPR2xvYmFsIFNlY3VyaXR5MRYwFAYDVQQLDA1JVCBEZXBhcnRtZW50MRswGQYD +VQQDDBJ0ZXN0LWNlcnRpZmljYXRlLTAwIBcNMTcwNDI2MjMyNjUyWhgPMjExNzA0 +MDIyMzI2NTJaMH4xCzAJBgNVBAYTAkdCMQ8wDQYDVQQIDAZMb25kb24xDzANBgNV +BAcMBkxvbmRvbjEYMBYGA1UECgwPR2xvYmFsIFNlY3VyaXR5MRYwFAYDVQQLDA1J +VCBEZXBhcnRtZW50MRswGQYDVQQDDBJ0ZXN0LWNlcnRpZmljYXRlLTAwXDANBgkq +hkiG9w0BAQEFAANLADBIAkEAtBMa7NWpv3BVlKTCPGO/LEsguKqWHBtKzweMY2CV +tAL1rQm913huhxF9w+ai76KQ3MHK5IVnLJjYYA5MzP2H5QIDAQABo1AwTjAdBgNV +HQ4EFgQU22iy8aWkNSxv0nBxFxerfsvnZVMwHwYDVR0jBBgwFoAU22iy8aWkNSxv +0nBxFxerfsvnZVMwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAANBAEOefGbV +NcHxklaW06w6OBYJPwpIhCVozC1qdxGX1dg8VkEKzjOzjgqVD30m59OFmSlBmHsl +nkVA6wyOSDYBf3o= +-----END CERTIFICATE-----`, `-----BEGIN RSA PRIVATE KEY----- +MIIBUwIBADANBgkqhkiG9w0BAQEFAASCAT0wggE5AgEAAkEAtBMa7NWpv3BVlKTC +PGO/LEsguKqWHBtKzweMY2CVtAL1rQm913huhxF9w+ai76KQ3MHK5IVnLJjYYA5M +zP2H5QIDAQABAkAS9BfXab3OKpK3bIgNNyp+DQJKrZnTJ4Q+OjsqkpXvNltPJosf +G8GsiKu/vAt4HGqI3eU77NvRI+mL4MnHRmXBAiEA3qM4FAtKSRBbcJzPxxLEUSwg +XSCcosCktbkXvpYrS30CIQDPDxgqlwDEJQ0uKuHkZI38/SPWWqfUmkecwlbpXABK +iQIgZX08DA8VfvcA5/Xj1Zjdey9FVY6POLXen6RPiabE97UCICp6eUW7ht+2jjar +e35EltCRCjoejRHTuN9TC0uCoVipAiAXaJIx/Q47vGwiw6Y8KXsNU6y54gTbOSxX +54LzHNk/+Q== +-----END RSA PRIVATE KEY-----`) +var bootstrapCertData = newCertificateData( + `-----BEGIN CERTIFICATE----- +MIICRzCCAfGgAwIBAgIJANXr+UzRFq4TMA0GCSqGSIb3DQEBCwUAMH4xCzAJBgNV +BAYTAkdCMQ8wDQYDVQQIDAZMb25kb24xDzANBgNVBAcMBkxvbmRvbjEYMBYGA1UE +CgwPR2xvYmFsIFNlY3VyaXR5MRYwFAYDVQQLDA1JVCBEZXBhcnRtZW50MRswGQYD +VQQDDBJ0ZXN0LWNlcnRpZmljYXRlLTEwIBcNMTcwNDI2MjMyNzMyWhgPMjExNzA0 +MDIyMzI3MzJaMH4xCzAJBgNVBAYTAkdCMQ8wDQYDVQQIDAZMb25kb24xDzANBgNV +BAcMBkxvbmRvbjEYMBYGA1UECgwPR2xvYmFsIFNlY3VyaXR5MRYwFAYDVQQLDA1J +VCBEZXBhcnRtZW50MRswGQYDVQQDDBJ0ZXN0LWNlcnRpZmljYXRlLTEwXDANBgkq +hkiG9w0BAQEFAANLADBIAkEAqvbkN4RShH1rL37JFp4fZPnn0JUhVWWsrP8NOomJ +pXdBDUMGWuEQIsZ1Gf9JrCQLu6ooRyHSKRFpAVbMQ3ABJwIDAQABo1AwTjAdBgNV +HQ4EFgQUEGBc6YYheEZ/5MhwqSUYYPYRj2MwHwYDVR0jBBgwFoAUEGBc6YYheEZ/ +5MhwqSUYYPYRj2MwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAANBAIyNmznk +5dgJY52FppEEcfQRdS5k4XFPc22SHPcz77AHf5oWZ1WG9VezOZZPp8NCiFDDlDL8 +yma33a5eMyTjLD8= +-----END CERTIFICATE-----`, `-----BEGIN RSA PRIVATE KEY----- +MIIBVAIBADANBgkqhkiG9w0BAQEFAASCAT4wggE6AgEAAkEAqvbkN4RShH1rL37J +Fp4fZPnn0JUhVWWsrP8NOomJpXdBDUMGWuEQIsZ1Gf9JrCQLu6ooRyHSKRFpAVbM +Q3ABJwIDAQABAkBC2OBpGLMPHN8BJijIUDFkURakBvuOoX+/8MYiYk7QxEmfLCk6 +L6r+GLNFMfXwXcBmXtMKfZKAIKutKf098JaBAiEA10azfqt3G/5owrNA00plSyT6 +ZmHPzY9Uq1p/QTR/uOcCIQDLTkfBkLHm0UKeobbO/fSm6ZflhyBRDINy4FvwmZMt +wQIgYV/tmQJeIh91q3wBepFQOClFykG8CTMoDUol/YyNqUkCIHfp6Rr7fGL3JIMq +QQgf9DCK8SPZqq8DYXjdan0kKBJBAiEAyDb+07o2gpggo8BYUKSaiRCiyXfaq87f +eVqgpBq/QN4= +-----END RSA PRIVATE KEY-----`) +var apiServerCertData = newCertificateData( + `-----BEGIN CERTIFICATE----- +MIICRzCCAfGgAwIBAgIJAIydTIADd+yqMA0GCSqGSIb3DQEBCwUAMH4xCzAJBgNV +BAYTAkdCMQ8wDQYDVQQIDAZMb25kb24xDzANBgNVBAcMBkxvbmRvbjEYMBYGA1UE +CgwPR2xvYmFsIFNlY3VyaXR5MRYwFAYDVQQLDA1JVCBEZXBhcnRtZW50MRswGQYD +VQQDDBJ0ZXN0LWNlcnRpZmljYXRlLTIwIBcNMTcwNDI2MjMyNDU4WhgPMjExNzA0 +MDIyMzI0NThaMH4xCzAJBgNVBAYTAkdCMQ8wDQYDVQQIDAZMb25kb24xDzANBgNV +BAcMBkxvbmRvbjEYMBYGA1UECgwPR2xvYmFsIFNlY3VyaXR5MRYwFAYDVQQLDA1J +VCBEZXBhcnRtZW50MRswGQYDVQQDDBJ0ZXN0LWNlcnRpZmljYXRlLTIwXDANBgkq +hkiG9w0BAQEFAANLADBIAkEAuiRet28DV68Dk4A8eqCaqgXmymamUEjW/DxvIQqH +3lbhtm8BwSnS9wUAajSLSWiq3fci2RbRgaSPjUrnbOHCLQIDAQABo1AwTjAdBgNV +HQ4EFgQU0vhI4OPGEOqT+VAWwxdhVvcmgdIwHwYDVR0jBBgwFoAU0vhI4OPGEOqT ++VAWwxdhVvcmgdIwDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAANBALNeJGDe +nV5cXbp9W1bC12Tc8nnNXn4ypLE2JTQAvyp51zoZ8hQoSnRVx/VCY55Yu+br8gQZ ++tW+O/PoE7B3tuY= +-----END CERTIFICATE-----`, `-----BEGIN RSA PRIVATE KEY----- +MIIBVgIBADANBgkqhkiG9w0BAQEFAASCAUAwggE8AgEAAkEAuiRet28DV68Dk4A8 +eqCaqgXmymamUEjW/DxvIQqH3lbhtm8BwSnS9wUAajSLSWiq3fci2RbRgaSPjUrn +bOHCLQIDAQABAkEArDR1g9IqD3aUImNikDgAngbzqpAokOGyMoxeavzpEaFOgCzi +gi7HF7yHRmZkUt8CzdEvnHSqRjFuaaB0gGA+AQIhAOc8Z1h8ElLRSqaZGgI3jCTp +Izx9HNY//U5NGrXD2+ttAiEAzhOqkqI4+nDab7FpiD7MXI6fO549mEXeVBPvPtsS +OcECIQCIfkpOm+ZBBpO3JXaJynoqK4gGI6ALA/ik6LSUiIlfPQIhAISjd9hlfZME +bDQT1r8Q3Gx+h9LRqQeHgPBQ3F5ylqqBAiBaJ0hkYvrIdWxNlcLqD3065bJpHQ4S +WQkuZUQN1M/Xvg== +-----END RSA PRIVATE KEY-----`) + +type fakeStore struct { + cert *tls.Certificate +} + +func (s *fakeStore) Current() (*tls.Certificate, error) { + if s.cert == nil { + noKeyErr := certificate.NoCertKeyError("") + return nil, &noKeyErr + } + return s.cert, nil +} + +// Accepts the PEM data for the cert/key pair and makes the new cert/key +// pair the 'current' pair, that will be returned by future calls to +// Current(). +func (s *fakeStore) Update(certPEM, keyPEM []byte) (*tls.Certificate, error) { + // In order to make the mocking work, whenever a cert/key pair is passed in + // to be updated in the mock store, assume that the certificate manager + // generated the key, and then asked the mock CertificateSigningRequest API + // to sign it, then the faked API returned a canned response. The canned + // signing response will not match the generated key. In order to make + // things work out, search here for the correct matching key and use that + // instead of the passed in key. That way this file of test code doesn't + // have to implement an actual certificate signing process. + for _, tc := range []*certificateData{storeCertData, bootstrapCertData, apiServerCertData} { + if bytes.Equal(tc.certificatePEM, certPEM) { + keyPEM = tc.keyPEM + } + } + cert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + return nil, err + } + now := time.Now() + s.cert = &cert + s.cert.Leaf = &x509.Certificate{ + NotBefore: now.Add(-24 * time.Hour), + NotAfter: now.Add(24 * time.Hour), + } + return s.cert, nil +} + +func TestGenTLSConfigUseCurrentCertAndCertPool(t *testing.T) { + store := &fakeStore{ + cert: storeCertData.certificate, + } + cm, err := certificate.NewManager(&certificate.Config{ + Template: &x509.CertificateRequest{}, + Usages: []certificatesv1.KeyUsage{}, + CertificateStore: store, + }) + if err != nil { + t.Fatalf("Failed to initialize the certificate manager: %v", err) + } + + tests := []struct { + name string + m certificate.Manager + root *x509.CertPool + mode string + expect error + }{ + { + "server mode", + cm, + &x509.CertPool{}, + "server", + nil, + }, + { + "client mode", + cm, + &x509.CertPool{}, + "client", + nil, + }, + { + "default mode", + cm, + &x509.CertPool{}, + "ran", + fmt.Errorf("unsupported cert manager mode(only server or client), ran"), + }, + } + + for _, tt := range tests { + tf := func(t *testing.T) { + t.Parallel() + t.Logf("\tTestCase: %s", tt.name) + { + _, get := GenTLSConfigUseCurrentCertAndCertPool(tt.m.Current, tt.root, tt.mode) + + if !reflect.DeepEqual(get, tt.expect) { + t.Fatalf("\t%s\texpect %v, but get %v", failed, tt.expect, get) + } + t.Logf("\t%s\texpect %v, get %v", succeed, tt.expect, get) + + } + } + t.Run(tt.name, tf) + } +} + +func TestGenRootCertPool(t *testing.T) { + tests := []struct { + name string + kubeConfig string + caFile string + expect *x509.CertPool + }{ + { + "empty kubeconfig caFile", + "", + "", + nil, + }, + } + + for _, tt := range tests { + tf := func(t *testing.T) { + t.Parallel() + t.Logf("\tTestCase: %s", tt.name) + { + get, _ := GenRootCertPool(tt.kubeConfig, tt.caFile) + + if !reflect.DeepEqual(tt.expect, get) { + t.Fatalf("\t%s\texpect %v, but get %v", failed, tt.expect, get) + } + t.Logf("\t%s\texpect %v, get %v", succeed, tt.expect, get) + + } + } + t.Run(tt.name, tf) + } +} + +func TestGenTLSConfigUseCertMgrAndCA(t *testing.T) { + store := &fakeStore{ + cert: storeCertData.certificate, + } + cm, err := certificate.NewManager(&certificate.Config{ + Template: &x509.CertificateRequest{}, + Usages: []certificatesv1.KeyUsage{}, + CertificateStore: store, + }) + if err != nil { + t.Fatalf("Failed to initialize the certificate manager: %v", err) + } + + tests := []struct { + name string + m certificate.Manager + serverAddr string + caFile string + expect *tls.Config + }{ + { + "empty caFile", + cm, + "127.0.0.1:80", + "", + nil, + }, + } + + for _, tt := range tests { + tf := func(t *testing.T) { + t.Parallel() + t.Logf("\tTestCase: %s", tt.name) + { + get, _ := GenTLSConfigUseCertMgrAndCA(tt.m, tt.serverAddr, tt.caFile) + + if !reflect.DeepEqual(tt.expect, get) { + t.Fatalf("\t%s\texpect %v, but get %v", failed, tt.expect, get) + } + t.Logf("\t%s\texpect %v, get %v", succeed, tt.expect, get) + + } + } + t.Run(tt.name, tf) + } +} + +func TestGenCertPoolUseCA(t *testing.T) { + tests := []struct { + name string + caFile string + expect *x509.CertPool + }{ + { + "empty caFIle", + "", + nil, + }, + } + + for _, tt := range tests { + tf := func(t *testing.T) { + t.Parallel() + t.Logf("\tTestCase: %s", tt.name) + { + get, _ := GenCertPoolUseCA(tt.caFile) + + if !reflect.DeepEqual(tt.expect, get) { + t.Fatalf("\t%s\texpect %v, but get %v", failed, tt.expect, get) + } + t.Logf("\t%s\texpect %v, get %v", succeed, tt.expect, get) + + } + } + t.Run(tt.name, tf) + } +} diff --git a/pkg/utils/certmanager/store/filestore_wrapper.go b/pkg/utils/certmanager/store/filestore_wrapper.go new file mode 100644 index 0000000..542fdde --- /dev/null +++ b/pkg/utils/certmanager/store/filestore_wrapper.go @@ -0,0 +1,54 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "crypto/tls" + + "k8s.io/client-go/util/certificate" + "k8s.io/klog/v2" +) + +// fileStoreWrapper is a wrapper for "k8s.io/client-go/util/certificate#FileStore" +// This wrapper increases tolerance for unexpected situations and is more robust. +type fileStoreWrapper struct { + certificate.FileStore +} + +// NewFileStoreWrapper returns a wrapper for "k8s.io/client-go/util/certificate#FileStore" +// This wrapper increases tolerance for unexpected situations and is more robust. +func NewFileStoreWrapper(pairNamePrefix, certDirectory, keyDirectory, certFile, keyFile string) (certificate.FileStore, error) { + fileStore, err := certificate.NewFileStore(pairNamePrefix, certDirectory, keyDirectory, certFile, keyFile) + if err != nil { + return nil, err + } + return &fileStoreWrapper{ + FileStore: fileStore, + }, nil +} + +func (s *fileStoreWrapper) Current() (*tls.Certificate, error) { + cert, err := s.FileStore.Current() + // If an error occurs, just return the NoCertKeyError. + // The cert-manager will regenerate the related certificates when it receives the NoCertKeyError. + if err != nil { + klog.Warningf("unexpected error occurred when loading the certificate: %v, will regenerate it", err) + noCertKeyErr := certificate.NoCertKeyError("NO_VALID_CERT") + return nil, &noCertKeyErr + } + return cert, nil +} diff --git a/pkg/utils/certmanager/store/filestore_wrapper_test.go b/pkg/utils/certmanager/store/filestore_wrapper_test.go new file mode 100644 index 0000000..b5b7f99 --- /dev/null +++ b/pkg/utils/certmanager/store/filestore_wrapper_test.go @@ -0,0 +1,105 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "reflect" + "testing" + + "k8s.io/client-go/util/certificate" +) + +const ( + failed = "\u2717" + succeed = "\u2713" +) + +var fw, _ = NewFileStoreWrapper("", "", "", "", "") + +func TestNewFileStoreWrapper(t *testing.T) { + tests := []struct { + name string + pairNamePrefix string + certDirectory string + keyDirectory string + certFile string + keyFile string + expect error + }{ + { + "normal", + "", + "", + "", + "", + "", + nil, + }, + } + + for _, tt := range tests { + tf := func(t *testing.T) { + t.Parallel() + t.Logf("\tTestCase: %s", tt.name) + { + _, get := NewFileStoreWrapper( + tt.pairNamePrefix, + tt.certDirectory, + tt.keyDirectory, + tt.certFile, + tt.keyFile) + + if !reflect.DeepEqual(get, tt.expect) { + t.Fatalf("\t%s\texpect %v, but get %v", failed, tt.expect, get) + } + t.Logf("\t%s\texpect %v, get %v", succeed, tt.expect, get) + + } + } + t.Run(tt.name, tf) + } +} + +func TestCurrent(t *testing.T) { + noCertKeyErr := certificate.NoCertKeyError("NO_VALID_CERT") + tests := []struct { + name string + expect error + }{ + { + "normal", + &noCertKeyErr, + }, + } + + for _, tt := range tests { + tf := func(t *testing.T) { + t.Parallel() + t.Logf("\tTestCase: %s", tt.name) + { + _, get := fw.Current() + + if !reflect.DeepEqual(get, tt.expect) { + t.Fatalf("\t%s\texpect %v, but get %v", failed, tt.expect, get) + } + t.Logf("\t%s\texpect %v, get %v", succeed, tt.expect, get) + + } + } + t.Run(tt.name, tf) + } +} diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go index a7b5d0e..a4b59fd 100644 --- a/pkg/utils/constants.go +++ b/pkg/utils/constants.go @@ -43,6 +43,12 @@ const ( RavenProxyServerForwardLocalMode = "Local" RavenProxyServerForwardRemoteMode = "Remote" - WorkingNamespace = "kube-system" - RavenConfigName = "raven-cfg" + WorkingNamespace = "kube-system" + RavenConfigName = "raven-cfg" + RavenEnableProxy = "enable-l7-proxy" + RavenEnableTunnel = "enable-l3-tunnel" + + GatewayProxyInternalService = "x-raven-proxy-internal-svc" + LabelCurrentGatewayEndpoints = "raven.openyurt.io/endpoints-name" + LabelCurrentGatewayType = "raven.openyurt.io/gateway-type" ) diff --git a/pkg/utils/metaserver.go b/pkg/utils/metaserver.go index b5cfb22..d9f24d6 100644 --- a/pkg/utils/metaserver.go +++ b/pkg/utils/metaserver.go @@ -19,11 +19,11 @@ package utils import ( "context" "net/http" + "net/http/pprof" "k8s.io/klog/v2" "github.com/gorilla/mux" - "github.com/openyurtio/openyurt/pkg/util/profile" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -34,7 +34,7 @@ func RunMetaServer(ctx context.Context, addr string) { muxHandler := mux.NewRouter() muxHandler.Handle("/metrics", promhttp.Handler()) // register handler for pprof - profile.Install(muxHandler) + Install(muxHandler) metaServer := &http.Server{ Addr: addr, Handler: muxHandler, @@ -42,7 +42,7 @@ func RunMetaServer(ctx context.Context, addr string) { } go func(ctx context.Context) { <-ctx.Done() - err := metaServer.Shutdown(ctx) + err := metaServer.Shutdown(context.TODO()) if err != nil { klog.Errorf("failed to shutdown meta server, error %s", err.Error()) } @@ -53,3 +53,19 @@ func RunMetaServer(ctx context.Context, addr string) { } }(ctx) } + +// Install adds the Profiling webservice to the given mux. +func Install(c *mux.Router) { + c.HandleFunc("/debug/pprof/profile", pprof.Profile) + c.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + c.HandleFunc("/debug/pprof/trace", pprof.Trace) + c.HandleFunc("/debug/pprof", redirectTo("/debug/pprof/")) + c.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index) +} + +// redirectTo redirects request to a certain destination. +func redirectTo(to string) func(http.ResponseWriter, *http.Request) { + return func(rw http.ResponseWriter, req *http.Request) { + http.Redirect(rw, req, to, http.StatusFound) + } +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index ed4b427..d625de1 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -20,7 +20,7 @@ import ( "context" "fmt" - "github.com/openyurtio/openyurt/pkg/apis/raven/v1beta1" + "github.com/openyurtio/api/raven/v1beta1" "sigs.k8s.io/controller-runtime/pkg/client" )