Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance code quality #172

Merged
merged 1 commit into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cmd/agent/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ func NewRavenAgentCommand(ctx context.Context) *cobra.Command {

// Run starts the raven-agent
func Run(ctx context.Context, cfg *config.CompletedConfig) error {
klog.Info("Start raven agent")
defer klog.Info("Stop raven agent")
if err := disableICMPRedirect(); err != nil {
return err
}
if err := disableICMPRpFilter(); err != nil {
return err
}
engine := ravenengine.NewEngine(ctx, cfg.Config)
engine, err := ravenengine.NewEngine(ctx, cfg.Config)
if err != nil {
return err
}
klog.Info("engine successfully start")
engine.Start()
return nil
}
Expand Down
141 changes: 104 additions & 37 deletions pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,20 @@ type Engine struct {
manager manager.Manager
client client.Client
option *Option
queue workqueue.RateLimitingInterface

tunnelQueue workqueue.RateLimitingInterface
tunnelEngine *TunnelEngine

proxyQueue workqueue.RateLimitingInterface
proxyEngine *ProxyEngine
tunnel *TunnelEngine
proxy *ProxyEngine
}

func NewEngine(ctx context.Context, cfg *config.Config) *Engine {
func NewEngine(ctx context.Context, cfg *config.Config) (*Engine, error) {
engine := &Engine{
nodeName: cfg.NodeName,
nodeIP: cfg.NodeIP,
manager: cfg.Manager,
context: ctx,
option: NewEngineOption(),
tunnelQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tunnel"),
proxyQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Proxy"),
nodeName: cfg.NodeName,
nodeIP: cfg.NodeIP,
manager: cfg.Manager,
context: ctx,
option: NewEngineOption(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "raven"),
}
err := ctrl.NewControllerManagedBy(engine.manager).
For(&v1beta1.Gateway{}, builder.WithPredicates(predicate.Funcs{
Expand All @@ -56,57 +53,129 @@ func NewEngine(ctx context.Context, cfg *config.Config) *Engine {
return reconcile.Result{}, nil
}))
if err != nil {
klog.ErrorS(err, utils.FormatRavenEngine("failed to new raven agent controller with manager"))
klog.Errorf(utils.FormatRavenEngine("fail to new controller with manager, error %s", err.Error()))
return engine, err
}
engine.client = engine.manager.GetClient()
engine.tunnelEngine = newTunnelEngine(cfg, engine.client, engine.option, engine.tunnelQueue)
engine.proxyEngine = newProxyEngine(engine.context, cfg, engine.client, engine.option, engine.proxyQueue)
return engine
engine.tunnel = &TunnelEngine{
nodeName: engine.nodeName,
forwardNodeIP: cfg.Tunnel.ForwardNodeIP,
natTraversal: cfg.Tunnel.NATTraversal,
config: cfg,
ravenClient: engine.client,
}
err = engine.tunnel.InitDriver()
if err != nil {
klog.Errorf(utils.FormatRavenEngine("fail to init tunnel driver, error %s", err.Error()))
return engine, err
}

engine.proxy = &ProxyEngine{
nodeName: engine.nodeName,
nodeIP: engine.nodeIP,
config: cfg,
client: engine.client,
option: engine.option,
ctx: engine.context,
proxyOption: newProxyOption(),
proxyCtx: newProxyContext(ctx),
}
return engine, nil
}

func (e *Engine) Start() {
defer utilruntime.HandleCrash()
klog.Info(utils.FormatRavenEngine("engine successfully start"))
go func() {
if err := e.manager.Start(e.context); err != nil {
klog.ErrorS(err, utils.FormatRavenEngine("failed to start engine controller"))
klog.ErrorS(err, "failed to start engine controller")
}
}()
go wait.Until(e.tunnelEngine.worker, time.Second, e.context.Done())
go wait.Until(e.proxyEngine.worker, time.Second, e.context.Done())
go wait.Until(e.worker, time.Second, e.context.Done())
<-e.context.Done()
e.cleanup()
klog.Info(utils.FormatRavenEngine("engine successfully stop"))
}

func (e *Engine) worker() {
for e.processNextWorkItem() {
}
}

func (e *Engine) processNextWorkItem() bool {
obj, quit := e.queue.Get()
if quit {
return false
}
gw, ok := obj.(*v1beta1.Gateway)
if !ok {
return false
}
defer e.queue.Done(gw)
e.findLocalGateway()
err := e.tunnel.Handler()
if err != nil {
e.handleEventErr(err, gw)
}
e.option.SetTunnelStatus(e.tunnel.Status())

err = e.proxy.Handler()
if err != nil {
e.handleEventErr(err, gw)
}

return true
}

func (e *Engine) findLocalGateway() {
e.tunnel.localGateway = nil
e.proxy.localGateway = nil
var gwList v1beta1.GatewayList
err := e.client.List(context.TODO(), &gwList)
if err != nil {
return
}
for _, gw := range gwList.Items {
for _, node := range gw.Status.Nodes {
if node.NodeName == e.nodeName {
e.tunnel.localGateway = gw.DeepCopy()
e.proxy.localGateway = gw.DeepCopy()
return
}
}
}
}

func (e *Engine) cleanup() {
if e.option.GetTunnelStatus() {
err := e.tunnelEngine.clearDriver()
err := e.tunnel.CleanupDriver()
if err != nil {
klog.Errorf(utils.FormatRavenEngine("failed to cleanup tunnel driver, error %s", err.Error()))
}
}
if e.option.GetProxyStatus() {
e.proxyEngine.stopServers()
e.proxy.stop()
}
}

func (e *Engine) enqueueTunnel(obj *v1beta1.Gateway) {
klog.Info(utils.FormatRavenEngine("enqueue gateway %s to tunnel queue", obj.Name))
e.tunnelQueue.Add(obj)
}
func (e *Engine) handleEventErr(err error, gw *v1beta1.Gateway) {
if err == nil {
e.queue.Forget(gw)
return
}

func (e *Engine) enqueueProxy(obj *v1beta1.Gateway) {
klog.Info(utils.FormatRavenEngine("enqueue gateway %s to proxy queue", obj.Name))
e.proxyQueue.Add(obj)
if e.queue.NumRequeues(gw) < utils.MaxRetries {
klog.Info(utils.FormatRavenEngine("error syncing event %s: %s", gw.GetName(), err.Error()))
e.queue.AddRateLimited(gw)
return
}
klog.Info(utils.FormatRavenEngine("dropping event %s out of the queue: %s", gw.GetName(), err.Error()))
e.queue.Forget(gw)
}

func (e *Engine) addGateway(evt event.CreateEvent) bool {
gw, ok := evt.Object.(*v1beta1.Gateway)
if ok {
klog.InfoS(utils.FormatRavenEngine("adding gateway %s", gw.GetName()))
e.enqueueTunnel(gw.DeepCopy())
e.enqueueProxy(gw.DeepCopy())
e.queue.Add(gw.DeepCopy())
}
return ok
}
Expand All @@ -119,8 +188,7 @@ func (e *Engine) updateGateway(evt event.UpdateEvent) bool {
if oldGw.ResourceVersion != newGw.ResourceVersion {
update = true
klog.InfoS(utils.FormatRavenEngine("updating gateway, %s", newGw.GetName()))
e.enqueueTunnel(newGw.DeepCopy())
e.enqueueProxy(newGw.DeepCopy())
e.queue.Add(newGw.DeepCopy())
} else {
klog.InfoS(utils.FormatRavenEngine("skip handle update gateway"), klog.KObj(newGw))
}
Expand All @@ -132,8 +200,7 @@ func (e *Engine) deleteGateway(evt event.DeleteEvent) bool {
gw, ok := evt.Object.(*v1beta1.Gateway)
if ok {
klog.InfoS(utils.FormatRavenEngine("deleting gateway, %s", gw.GetName()))
e.enqueueTunnel(gw.DeepCopy())
e.enqueueProxy(gw.DeepCopy())
e.queue.Add(gw.DeepCopy())
}
return ok
}
Loading
Loading