Skip to content

Commit

Permalink
enhance code quality
Browse files Browse the repository at this point in the history
  • Loading branch information
珩轩 committed May 6, 2024
1 parent ad64b89 commit 4139f39
Show file tree
Hide file tree
Showing 23 changed files with 531 additions and 656 deletions.
8 changes: 4 additions & 4 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (

"github.com/openyurtio/api/raven/v1beta1"
"github.com/openyurtio/raven/cmd/agent/app/config"
"github.com/openyurtio/raven/pkg/networkengine/routedriver/vxlan"
"github.com/openyurtio/raven/pkg/networkengine/vpndriver"
"github.com/openyurtio/raven/pkg/networkengine/vpndriver/libreswan"
"github.com/openyurtio/raven/pkg/networkengine/vpndriver/wireguard"
"github.com/openyurtio/raven/pkg/tunnelengine/routedriver/vxlan"
"github.com/openyurtio/raven/pkg/tunnelengine/vpndriver"
"github.com/openyurtio/raven/pkg/tunnelengine/vpndriver/libreswan"
"github.com/openyurtio/raven/pkg/tunnelengine/vpndriver/wireguard"
"github.com/openyurtio/raven/pkg/utils"
)

Expand Down
8 changes: 5 additions & 3 deletions cmd/agent/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ func NewRavenAgentCommand(ctx context.Context) *cobra.Command {

// Run starts the raven-agent
func Run(ctx context.Context, cfg *config.CompletedConfig) error {
klog.Info("Start raven agent")
defer klog.Info("Stop raven agent")
if err := disableICMPRedirect(); err != nil {
return err
}
if err := disableICMPRpFilter(); err != nil {
return err
}
engine := ravenengine.NewEngine(ctx, cfg.Config)
engine, err := ravenengine.NewEngine(ctx, cfg.Config)
if err != nil {
return err
}
klog.Info("engine successfully start")
engine.Start()
return nil
}
Expand Down
141 changes: 104 additions & 37 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,20 @@ type Engine struct {
manager manager.Manager
client client.Client
option *Option
queue workqueue.RateLimitingInterface

tunnelQueue workqueue.RateLimitingInterface
tunnelEngine *TunnelEngine

proxyQueue workqueue.RateLimitingInterface
proxyEngine *ProxyEngine
tunnel *TunnelEngine
proxy *ProxyEngine
}

func NewEngine(ctx context.Context, cfg *config.Config) *Engine {
func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
engine := &Engine{
nodeName: cfg.NodeName,
nodeIP: cfg.NodeIP,
manager: cfg.Manager,
context: ctx,
option: NewEngineOption(),
tunnelQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tunnel"),
proxyQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Proxy"),
nodeName: cfg.NodeName,
nodeIP: cfg.NodeIP,
manager: cfg.Manager,
context: ctx,
option: NewEngineOption(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raven"),
}
err := ctrl.NewControllerManagedBy(engine.manager).
For(&v1beta1.Gateway{}, builder.WithPredicates(predicate.Funcs{
Expand All @@ -56,57 +53,129 @@ func NewEngine(ctx context.Context, cfg *config.Config) *Engine {
return reconcile.Result{}, nil
}))
if err != nil {
klog.ErrorS(err, utils.FormatRavenEngine("failed to new raven agent controller with manager"))
klog.Errorf(utils.FormatRavenEngine("fail to new controller with manager, error %s", err.Error()))
return engine, err
}
engine.client = engine.manager.GetClient()
engine.tunnelEngine = newTunnelEngine(cfg, engine.client, engine.option, engine.tunnelQueue)
engine.proxyEngine = newProxyEngine(engine.context, cfg, engine.client, engine.option, engine.proxyQueue)
return engine
engine.tunnel = &TunnelEngine{
nodeName: engine.nodeName,
forwardNodeIP: cfg.Tunnel.ForwardNodeIP,
natTraversal: cfg.Tunnel.NATTraversal,
config: cfg,
ravenClient: engine.client,
}
err = engine.tunnel.InitDriver()
if err != nil {
klog.Errorf(utils.FormatRavenEngine("fail to init tunnel driver, error %s", err.Error()))
return engine, err
}

engine.proxy = &ProxyEngine{
nodeName: engine.nodeName,
nodeIP: engine.nodeIP,
config: cfg,
client: engine.client,
option: engine.option,
ctx: engine.context,
proxyOption: newProxyOption(),
proxyCtx: newProxyContext(ctx),
}
return engine, nil
}

func (e *Engine) Start() {
defer utilruntime.HandleCrash()
klog.Info(utils.FormatRavenEngine("engine successfully start"))
go func() {
if err := e.manager.Start(e.context); err != nil {
klog.ErrorS(err, utils.FormatRavenEngine("failed to start engine controller"))
klog.ErrorS(err, "failed to start engine controller")
}
}()
go wait.Until(e.tunnelEngine.worker, time.Second, e.context.Done())
go wait.Until(e.proxyEngine.worker, time.Second, e.context.Done())
go wait.Until(e.worker, time.Second, e.context.Done())
<-e.context.Done()
e.cleanup()
klog.Info(utils.FormatRavenEngine("engine successfully stop"))
}

func (e *Engine) worker() {
for e.processNextWorkItem() {
}
}

func (e *Engine) processNextWorkItem() bool {
obj, quit := e.queue.Get()
if quit {
return false
}
gw, ok := obj.(*v1beta1.Gateway)
if !ok {
return false
}
defer e.queue.Done(gw)
e.findLocalGateway()
err := e.tunnel.Handler()
if err != nil {
e.handleEventErr(err, gw)
}
e.option.SetTunnelStatus(e.tunnel.Status())

err = e.proxy.Handler()
if err != nil {
e.handleEventErr(err, gw)
}

return true
}

func (e *Engine) findLocalGateway() {
e.tunnel.localGateway = nil
e.proxy.localGateway = nil
var gwList v1beta1.GatewayList
err := e.client.List(context.TODO(), &gwList)
if err != nil {
return
}
for _, gw := range gwList.Items {
for _, node := range gw.Status.Nodes {
if node.NodeName == e.nodeName {
e.tunnel.localGateway = gw.DeepCopy()
e.proxy.localGateway = gw.DeepCopy()
return
}
}
}
}

func (e *Engine) cleanup() {
if e.option.GetTunnelStatus() {
err := e.tunnelEngine.clearDriver()
err := e.tunnel.CleanupDriver()
if err != nil {
klog.Errorf(utils.FormatRavenEngine("failed to cleanup tunnel driver, error %s", err.Error()))
}
}
if e.option.GetProxyStatus() {
e.proxyEngine.stopServers()
e.proxy.stop()
}
}

func (e *Engine) enqueueTunnel(obj *v1beta1.Gateway) {
klog.Info(utils.FormatRavenEngine("enqueue gateway %s to tunnel queue", obj.Name))
e.tunnelQueue.Add(obj)
}
func (e *Engine) handleEventErr(err error, gw *v1beta1.Gateway) {
if err == nil {
e.queue.Forget(gw)
return
}

func (e *Engine) enqueueProxy(obj *v1beta1.Gateway) {
klog.Info(utils.FormatRavenEngine("enqueue gateway %s to proxy queue", obj.Name))
e.proxyQueue.Add(obj)
if e.queue.NumRequeues(gw) < utils.MaxRetries {
klog.Info(utils.FormatRavenEngine("error syncing event %s: %s", gw.GetName(), err.Error()))
e.queue.AddRateLimited(gw)
return
}
klog.Info(utils.FormatRavenEngine("dropping event %s out of the queue: %s", gw.GetName(), err.Error()))
e.queue.Forget(gw)
}

func (e *Engine) addGateway(evt event.CreateEvent) bool {
gw, ok := evt.Object.(*v1beta1.Gateway)
if ok {
klog.InfoS(utils.FormatRavenEngine("adding gateway %s", gw.GetName()))
e.enqueueTunnel(gw.DeepCopy())
e.enqueueProxy(gw.DeepCopy())
e.queue.Add(gw.DeepCopy())
}
return ok
}
Expand All @@ -119,8 +188,7 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool {
if oldGw.ResourceVersion != newGw.ResourceVersion {
update = true
klog.InfoS(utils.FormatRavenEngine("updating gateway, %s", newGw.GetName()))
e.enqueueTunnel(newGw.DeepCopy())
e.enqueueProxy(newGw.DeepCopy())
e.queue.Add(newGw.DeepCopy())
} else {
klog.InfoS(utils.FormatRavenEngine("skip handle update gateway"), klog.KObj(newGw))
}
Expand All @@ -132,8 +200,7 @@ func (e *Engine) deleteGateway(evt event.DeleteEvent) bool {
gw, ok := evt.Object.(*v1beta1.Gateway)
if ok {
klog.InfoS(utils.FormatRavenEngine("deleting gateway, %s", gw.GetName()))
e.enqueueTunnel(gw.DeepCopy())
e.enqueueProxy(gw.DeepCopy())
e.queue.Add(gw.DeepCopy())
}
return ok
}
Loading

0 comments on commit 4139f39

Please sign in to comment.