Skip to content

Commit

Permalink
store update events and process it later
Browse files Browse the repository at this point in the history
Signed-off-by: Huabing Zhao <[email protected]>
  • Loading branch information
zhaohuabing committed Nov 26, 2024
1 parent d3e3c6b commit 841520a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
11 changes: 3 additions & 8 deletions internal/provider/kubernetes/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo
// Check if the Service belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
return false
return true
}

// Merged gateways will have only this label, update status of all Gateways under found GatewayClass.
Expand All @@ -305,7 +304,7 @@ func (r *gatewayAPIReconciler) validateServiceForReconcile(obj client.Object) bo
r.log.Info("no Gateways found under GatewayClass", "name", gcName)
return false
}
return false
return true
}

nsName := utils.NamespacedName(svc)
Expand Down Expand Up @@ -540,7 +539,7 @@ func (r *gatewayAPIReconciler) validateObjectForReconcile(obj client.Object) boo
r.log.Info("no Gateways found under GatewayClass", "name", gcName)
return false
}
return false
return true
}

// There is no need to reconcile the object any further.
Expand Down Expand Up @@ -635,10 +634,6 @@ func (r *gatewayAPIReconciler) updateStatusForGatewaysUnderGatewayClass(ctx cont
return fmt.Errorf("no gateways found for gatewayclass: %s", gatewayClassName)
}

for _, gateway := range gateways.Items {
r.updateStatusForGateway(ctx, &gateway)
}

return nil
}

Expand Down
39 changes: 33 additions & 6 deletions internal/provider/kubernetes/status_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package kubernetes

import (
"context"
"errors"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -58,14 +59,23 @@ type UpdateHandler struct {
client client.Client
sendUpdates chan struct{}
updateChannel chan Update
writer *UpdateWriter
}

func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler {
sendUpdates := make(chan struct{})
updateChannel := make(chan Update, 100)
return &UpdateHandler{
log: log,
client: client,
sendUpdates: make(chan struct{}),
updateChannel: make(chan Update, 100),
sendUpdates: sendUpdates,
updateChannel: updateChannel,
writer: &UpdateWriter{
log: log,
enabled: sendUpdates,
updateChannel: updateChannel,
missedEvents: make(chan Update, 1000),
},
}
}

Expand Down Expand Up @@ -131,6 +141,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error {

// Enable Updaters to start sending updates to this handler.
close(u.sendUpdates)
u.writer.start()

for {
select {
Expand All @@ -147,10 +158,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error {

// Writer retrieves the interface that should be used to write to the UpdateHandler.
func (u *UpdateHandler) Writer() Updater {
return &UpdateWriter{
enabled: u.sendUpdates,
updateChannel: u.updateChannel,
}
return u.writer
}

// Updater describes an interface to send status updates somewhere.
Expand All @@ -160,8 +168,10 @@ type Updater interface {

// UpdateWriter takes status updates and sends these to the UpdateHandler via a channel.
type UpdateWriter struct {
log logr.Logger
enabled <-chan struct{}
updateChannel chan<- Update
missedEvents chan Update
}

// Send sends the given Update off to the update channel for writing by the UpdateHandler.
Expand All @@ -171,9 +181,26 @@ func (u *UpdateWriter) Send(update Update) {
case <-u.enabled:
u.updateChannel <- update
default:
if len(u.missedEvents) < cap(u.missedEvents) {
u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName)
u.missedEvents <- update
} else {
u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName)
}
}
}

// start runs the goroutine to send the events received before the Updater was enabled to the update channel.
func (u *UpdateWriter) start() {
go func() {
for e := range u.missedEvents {
u.log.Info("sending stored status update", "event", e.NamespacedName)
u.updateChannel <- e
}
close(u.missedEvents)
}()
}

// isStatusEqual checks if two objects have equivalent status.
//
// Supported objects:
Expand Down

0 comments on commit 841520a

Please sign in to comment.