Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add sync #173

Merged
merged 2 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions charts/raven-agent/templates/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ spec:
- --vpn-bind-port={{.Values.vpn.tunnelAddr}}
- --keep-alive-interval={{.Values.vpn.keepAliveInterval}}
- --keep-alive-timeout={{.Values.vpn.keepAliveTimeout}}
- --sync-raven-rules={{.Values.sync.syncRule}}
- --sync-raven-rules-period={{.Values.sync.syncPeriod}}
- --proxy-metric-bind-addr={{.Values.proxy.metricsBindAddr}}
- --proxy-internal-secure-addr={{.Values.proxy.internalSecureAddr}}
- --proxy-internal-insecure-addr={{.Values.proxy.internalInsecureAddr}}
Expand Down
5 changes: 4 additions & 1 deletion charts/raven-agent/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ containerEnv:
secretKeyRef:
key: vpn-connection-psk
name: raven-agent-secret
sync:
syncRule: true
syncPeriod: 30m

vpn:
driver: libreswan
Expand Down Expand Up @@ -86,4 +89,4 @@ proxy:
metricsBindAddr: ":10266"

rollingUpdate:
maxUnavailable: 5%
maxUnavailable: 20%
8 changes: 6 additions & 2 deletions cmd/agent/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
package config

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

// Config is the main context object for raven agent
type Config struct {
NodeName string
NodeIP string
NodeName string
NodeIP string
SyncRules bool
SyncPeriod metav1.Duration

MetricsBindAddress string
HealthProbeAddr string

Expand Down
17 changes: 15 additions & 2 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -50,6 +51,8 @@ type AgentOptions struct {
Kubeconfig string
MetricsBindAddress string
HealthProbeAddr string
SyncRules bool
SyncPeriod metav1.Duration
}

type TunnelOptions struct {
Expand Down Expand Up @@ -91,6 +94,12 @@ func (o *AgentOptions) Validate() error {
}
}
}
if o.SyncPeriod.Duration < time.Minute {
o.SyncPeriod.Duration = time.Minute
}
if o.SyncPeriod.Duration > 24*time.Hour {
o.SyncPeriod.Duration = 24 * time.Hour
}
return nil
}

Expand All @@ -103,6 +112,8 @@ func (o *AgentOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.RouteDriver, "route-driver", o.RouteDriver, `The Route driver name. (default "vxlan")`)
fs.StringVar(&o.MetricsBindAddress, "metric-bind-addr", o.MetricsBindAddress, `Binding address of tunnel metrics. (default ":10265")`)
fs.StringVar(&o.HealthProbeAddr, "health-probe-addr", o.HealthProbeAddr, `The address the healthz/readyz endpoint binds to.. (default ":10275")`)
fs.BoolVar(&o.SyncRules, "sync-raven-rules", true, "Whether to synchronize raven rules regularly")
fs.DurationVar(&o.SyncPeriod.Duration, "sync-raven-rules-period", 10*time.Minute, "The period for reconciling routes created for nodes by cloud provider. The minimum value is 1 minute and the maximum value is 24 hour")

fs.StringVar(&o.VPNPort, "vpn-bind-port", o.VPNPort, `Binding port of vpn. (default ":4500")`)
fs.BoolVar(&o.NATTraversal, "nat-traversal", o.NATTraversal, `Enable NAT Traversal or not. (default "false")`)
Expand Down Expand Up @@ -141,8 +152,10 @@ func (o *AgentOptions) Config() (*config.Config, error) {
}
cfg = restclient.AddUserAgent(cfg, "raven-agent-ds")
c := &config.Config{
NodeName: o.NodeName,
NodeIP: o.NodeIP,
NodeName: o.NodeName,
NodeIP: o.NodeIP,
SyncRules: o.SyncRules,
SyncPeriod: o.SyncPeriod,
}
c.KubeConfig = cfg
c.MetricsBindAddress = resolveAddress(c.MetricsBindAddress, resolveLocalHost(), strconv.Itoa(DefaultTunnelMetricsPort))
Expand Down
13 changes: 12 additions & 1 deletion cmd/agent/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ package app
import (
"context"
"fmt"
"sync"
"time"

"github.com/lorenzosaino/go-sysctl"
"github.com/spf13/cobra"
"k8s.io/klog/v2"

"github.com/openyurtio/raven/cmd/agent/app/config"
"github.com/openyurtio/raven/cmd/agent/app/options"
ravenengine "github.com/openyurtio/raven/pkg/engine"
"github.com/openyurtio/raven/pkg/features"
"github.com/spf13/cobra"
)

// NewRavenAgentCommand creates a new raven agent command
Expand Down Expand Up @@ -70,6 +72,15 @@ func Run(ctx context.Context, cfg *config.CompletedConfig) error {
}
klog.Info("engine successfully start")
engine.Start()
var wg sync.WaitGroup
wg.Add(1)
go func() {
<-ctx.Done()
time.Sleep(time.Second)
engine.Cleanup()
wg.Done()
}()
wg.Wait()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var GitCommit string
func main() {
klog.InitFlags(nil)
defer klog.Flush()
rand.Seed(time.Now().UnixNano())
rand.NewSource(time.Now().UnixNano())
klog.Infof("component: %s, git commit: %s\n", "raven-agent-ds", GitCommit)
cmd := app.NewRavenAgentCommand(server.SetupSignalContext())
cmd.Flags().AddGoFlagSet(flag.CommandLine)
Expand Down
84 changes: 49 additions & 35 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
Expand All @@ -22,26 +23,31 @@ import (
)

type Engine struct {
nodeName string
nodeIP string
context context.Context
manager manager.Manager
client client.Client
option *Option
queue workqueue.RateLimitingInterface
nodeName string
nodeIP string
syncRules bool
syncPeriod metav1.Duration

context context.Context
manager manager.Manager
client client.Client
option *Option
queue workqueue.RateLimitingInterface

tunnel *TunnelEngine
proxy *ProxyEngine
}

func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
engine := &Engine{
nodeName: cfg.NodeName,
nodeIP: cfg.NodeIP,
manager: cfg.Manager,
context: ctx,
option: NewEngineOption(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raven"),
nodeName: cfg.NodeName,
nodeIP: cfg.NodeIP,
syncRules: cfg.SyncRules,
syncPeriod: cfg.SyncPeriod,
manager: cfg.Manager,
context: ctx,
option: NewEngineOption(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raven"),
}
err := ctrl.NewControllerManagedBy(engine.manager).
For(&v1beta1.Gateway{}, builder.WithPredicates(predicate.Funcs{
Expand All @@ -53,7 +59,7 @@ func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
return reconcile.Result{}, nil
}))
if err != nil {
klog.Errorf(utils.FormatRavenEngine("fail to new controller with manager, error %s", err.Error()))
klog.Errorf("fail to new controller with manager, error %s", err.Error())
return engine, err
}
engine.client = engine.manager.GetClient()
Expand All @@ -66,7 +72,7 @@ func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
}
err = engine.tunnel.InitDriver()
if err != nil {
klog.Errorf(utils.FormatRavenEngine("fail to init tunnel driver, error %s", err.Error()))
klog.Errorf("fail to init tunnel driver, error %s", err.Error())
return engine, err
}

Expand All @@ -90,9 +96,12 @@ func (e *Engine) Start() {
klog.ErrorS(err, "failed to start engine controller")
}
}()

go wait.Until(e.worker, time.Second, e.context.Done())
<-e.context.Done()
e.cleanup()

if e.syncRules {
go wait.Until(e.regularSync, e.syncPeriod.Duration, e.context.Done())
}
}

func (e *Engine) worker() {
Expand All @@ -110,19 +119,29 @@ func (e *Engine) processNextWorkItem() bool {
return false
}
defer e.queue.Done(gw)
e.findLocalGateway()
err := e.tunnel.Handler()
err := e.sync()
if err != nil {
e.handleEventErr(err, gw)
}
e.option.SetTunnelStatus(e.tunnel.Status())
return true
}

err = e.proxy.Handler()
func (e *Engine) sync() error {
e.findLocalGateway()
err := e.proxy.Handler()
if err != nil {
e.handleEventErr(err, gw)
return err
}
err = e.tunnel.Handler()
if err != nil {
return err
}
e.option.SetTunnelStatus(e.tunnel.Status())
return nil
}

return true
// regularSync pushes a placeholder Gateway object named "gw-sync" onto the
// engine's work queue. Start runs it through wait.Until every syncPeriod
// when syncRules is enabled.
func (e *Engine) regularSync() {
	syncMeta := metav1.ObjectMeta{Name: "gw-sync"}
	placeholder := &v1beta1.Gateway{ObjectMeta: syncMeta}
	e.queue.Add(placeholder)
}

func (e *Engine) findLocalGateway() {
Expand All @@ -144,12 +163,9 @@ func (e *Engine) findLocalGateway() {
}
}

func (e *Engine) cleanup() {
func (e *Engine) Cleanup() {
if e.option.GetTunnelStatus() {
err := e.tunnel.CleanupDriver()
if err != nil {
klog.Errorf(utils.FormatRavenEngine("failed to cleanup tunnel driver, error %s", err.Error()))
}
e.tunnel.CleanupDriver()
}
if e.option.GetProxyStatus() {
e.proxy.stop()
Expand All @@ -163,18 +179,18 @@ func (e *Engine) handleEventErr(err error, gw *v1beta1.Gateway) {
}

if e.queue.NumRequeues(gw) < utils.MaxRetries {
klog.Info(utils.FormatRavenEngine("error syncing event %s: %s", gw.GetName(), err.Error()))
klog.Infof("error syncing event %s: %s", gw.GetName(), err.Error())
e.queue.AddRateLimited(gw)
return
}
klog.Info(utils.FormatRavenEngine("dropping event %s out of the queue: %s", gw.GetName(), err.Error()))
klog.Infof("dropping event %s out of the queue: %s", gw.GetName(), err.Error())
e.queue.Forget(gw)
}

func (e *Engine) addGateway(evt event.CreateEvent) bool {
gw, ok := evt.Object.(*v1beta1.Gateway)
if ok {
klog.InfoS(utils.FormatRavenEngine("adding gateway %s", gw.GetName()))
klog.Infof("adding gateway %s", gw.GetName())
e.queue.Add(gw.DeepCopy())
}
return ok
Expand All @@ -187,10 +203,8 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool {
if ok1 && ok2 {
if oldGw.ResourceVersion != newGw.ResourceVersion {
update = true
klog.InfoS(utils.FormatRavenEngine("updating gateway, %s", newGw.GetName()))
klog.Infof("updating gateway, %s", newGw.GetName())
e.queue.Add(newGw.DeepCopy())
} else {
klog.InfoS(utils.FormatRavenEngine("skip handle update gateway"), klog.KObj(newGw))
}
}
return update
Expand All @@ -199,7 +213,7 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool {
func (e *Engine) deleteGateway(evt event.DeleteEvent) bool {
gw, ok := evt.Object.(*v1beta1.Gateway)
if ok {
klog.InfoS(utils.FormatRavenEngine("deleting gateway, %s", gw.GetName()))
klog.Infof("deleting gateway, %s", gw.GetName())
e.queue.Add(gw.DeepCopy())
}
return ok
Expand Down
Loading
Loading