Skip to content

Commit

Permalink
add regular sync raven rules
Browse files Browse the repository at this point in the history
  • Loading branch information
珩轩 committed May 24, 2024
1 parent 2c4a314 commit 820f971
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 25 deletions.
2 changes: 2 additions & 0 deletions charts/raven-agent/templates/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ spec:
- --vpn-bind-port={{.Values.vpn.tunnelAddr}}
- --keep-alive-interval={{.Values.vpn.keepAliveInterval}}
- --keep-alive-timeout={{.Values.vpn.keepAliveTimeout}}
- --sync-raven-rules={{.Values.sync.syncRule}}
- --sync-raven-rules-period={{.Values.sync.syncPeriod}}
- --proxy-metric-bind-addr={{.Values.proxy.metricsBindAddr}}
- --proxy-internal-secure-addr={{.Values.proxy.internalSecureAddr}}
- --proxy-internal-insecure-addr={{.Values.proxy.internalInsecureAddr}}
Expand Down
3 changes: 3 additions & 0 deletions charts/raven-agent/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ containerEnv:
secretKeyRef:
key: vpn-connection-psk
name: raven-agent-secret
sync:
syncRule: true
syncPeriod: 30m

vpn:
driver: libreswan
Expand Down
8 changes: 6 additions & 2 deletions cmd/agent/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
package config

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

// Config is the main context object for raven agent
type Config struct {
NodeName string
NodeIP string
NodeName string
NodeIP string
SyncRules bool
SyncPeriod metav1.Duration

MetricsBindAddress string
HealthProbeAddr string

Expand Down
17 changes: 15 additions & 2 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -50,6 +51,8 @@ type AgentOptions struct {
Kubeconfig string
MetricsBindAddress string
HealthProbeAddr string
SyncRules bool
SyncPeriod metav1.Duration
}

type TunnelOptions struct {
Expand Down Expand Up @@ -91,6 +94,12 @@ func (o *AgentOptions) Validate() error {
}
}
}
if o.SyncPeriod.Duration < time.Minute {
o.SyncPeriod.Duration = time.Minute
}
if o.SyncPeriod.Duration > 24*time.Hour {
o.SyncPeriod.Duration = 24 * time.Hour
}
return nil
}

Expand All @@ -103,6 +112,8 @@ func (o *AgentOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.RouteDriver, "route-driver", o.RouteDriver, `The Route driver name. (default "vxlan")`)
fs.StringVar(&o.MetricsBindAddress, "metric-bind-addr", o.MetricsBindAddress, `Binding address of tunnel metrics. (default ":10265")`)
fs.StringVar(&o.HealthProbeAddr, "health-probe-addr", o.HealthProbeAddr, `The address the healthz/readyz endpoint binds to.. (default ":10275")`)
fs.BoolVar(&o.SyncRules, "sync-raven-rules", true, "Whether to synchronize raven rules regularly")
fs.DurationVar(&o.SyncPeriod.Duration, "sync-raven-rules-period", 30*time.Minute, "The period for reconciling routes created for nodes by cloud provider. The minimum value is 1 minute and the maximum value is 24 hour")

fs.StringVar(&o.VPNPort, "vpn-bind-port", o.VPNPort, `Binding port of vpn. (default ":4500")`)
fs.BoolVar(&o.NATTraversal, "nat-traversal", o.NATTraversal, `Enable NAT Traversal or not. (default "false")`)
Expand Down Expand Up @@ -141,8 +152,10 @@ func (o *AgentOptions) Config() (*config.Config, error) {
}
cfg = restclient.AddUserAgent(cfg, "raven-agent-ds")
c := &config.Config{
NodeName: o.NodeName,
NodeIP: o.NodeIP,
NodeName: o.NodeName,
NodeIP: o.NodeIP,
SyncRules: o.SyncRules,
SyncPeriod: o.SyncPeriod,
}
c.KubeConfig = cfg
c.MetricsBindAddress = resolveAddress(c.MetricsBindAddress, resolveLocalHost(), strconv.Itoa(DefaultTunnelMetricsPort))
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var GitCommit string
func main() {
klog.InitFlags(nil)
defer klog.Flush()
rand.Seed(time.Now().UnixNano())
rand.NewSource(time.Now().UnixNano())
klog.Infof("component: %s, git commit: %s\n", "raven-agent-ds", GitCommit)
cmd := app.NewRavenAgentCommand(server.SetupSignalContext())
cmd.Flags().AddGoFlagSet(flag.CommandLine)
Expand Down
57 changes: 38 additions & 19 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -22,26 +23,31 @@ import (
)

type Engine struct {
nodeName string
nodeIP string
context context.Context
manager manager.Manager
client client.Client
option *Option
queue workqueue.RateLimitingInterface
nodeName string
nodeIP string
syncRules bool
syncPeriod metav1.Duration

context context.Context
manager manager.Manager
client client.Client
option *Option
queue workqueue.RateLimitingInterface

tunnel *TunnelEngine
proxy *ProxyEngine
}

func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
engine := &Engine{
nodeName: cfg.NodeName,
nodeIP: cfg.NodeIP,
manager: cfg.Manager,
context: ctx,
option: NewEngineOption(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raven"),
nodeName: cfg.NodeName,
nodeIP: cfg.NodeIP,
syncRules: cfg.SyncRules,
syncPeriod: cfg.SyncPeriod,
manager: cfg.Manager,
context: ctx,
option: NewEngineOption(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raven"),
}
err := ctrl.NewControllerManagedBy(engine.manager).
For(&v1beta1.Gateway{}, builder.WithPredicates(predicate.Funcs{
Expand Down Expand Up @@ -91,6 +97,9 @@ func (e *Engine) Start() {
}
}()
go wait.Until(e.worker, time.Second, e.context.Done())
if e.syncRules {
go wait.Until(e.regularSync, e.syncPeriod.Duration, e.context.Done())
}
<-e.context.Done()
e.cleanup()
}
Expand All @@ -110,19 +119,29 @@ func (e *Engine) processNextWorkItem() bool {
return false
}
defer e.queue.Done(gw)
e.findLocalGateway()
err := e.tunnel.Handler()
err := e.sync()
if err != nil {
e.handleEventErr(err, gw)
}
e.option.SetTunnelStatus(e.tunnel.Status())
return true
}

err = e.proxy.Handler()
func (e *Engine) sync() error {
e.findLocalGateway()
err := e.proxy.Handler()
if err != nil {
e.handleEventErr(err, gw)
return err
}
err = e.tunnel.Handler()
if err != nil {
return err
}
e.option.SetTunnelStatus(e.tunnel.Status())
return nil
}

return true
func (e *Engine) regularSync() {
e.queue.Add(&v1beta1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: "gw-sync"}})
}

func (e *Engine) findLocalGateway() {
Expand Down
1 change: 1 addition & 0 deletions pkg/engine/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (p *ProxyEngine) startProxyClient() error {
err = pc.Start(ctx)
if err != nil {
klog.Errorf("failed to start proxy client, error %s", err.Error())
return err
}
p.proxyOption.SetClientStatus(true)
return nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/networkengine/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ func ApplyRules(current, desired map[string]*netlink.Rule) (err error) {
}
}
// add expect ip rules
for _, v := range desired {
for k, v := range desired {
_, ok := current[k]
if ok {
continue
}
klog.InfoS("adding rule", "src", v.Src, "lookup", v.Table)
err = netlinkutil.RuleAdd(v)
errList = errList.Append(err)
Expand Down

0 comments on commit 820f971

Please sign in to comment.