diff --git a/internal/provider/kubernetes/status_updater.go b/internal/provider/kubernetes/status_updater.go index 24adaedd563..ee5cbce59d2 100644 --- a/internal/provider/kubernetes/status_updater.go +++ b/internal/provider/kubernetes/status_updater.go @@ -7,6 +7,7 @@ package kubernetes import ( "context" + "errors" "time" "github.com/go-logr/logr" @@ -56,14 +57,25 @@ func (m MutatorFunc) Mutate(old client.Object) client.Object { type UpdateHandler struct { log logr.Logger client client.Client + sendUpdates chan struct{} updateChannel chan Update + writer *UpdateWriter } func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler { + sendUpdates := make(chan struct{}) + updateChannel := make(chan Update, 100) return &UpdateHandler{ log: log, client: client, - updateChannel: make(chan Update, 100), + sendUpdates: sendUpdates, + updateChannel: updateChannel, + writer: &UpdateWriter{ + log: log, + enabled: sendUpdates, + updateChannel: updateChannel, + eventsBeforeEnabled: make(chan Update, 1000), + }, } } @@ -127,6 +139,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error { u.log.Info("started status update handler") defer u.log.Info("stopped status update handler") + // Enable Updaters to start sending updates to this handler. + close(u.sendUpdates) + u.writer.handleEventsReceivedBeforeEnabled() + for { select { case <-ctx.Done(): @@ -142,9 +158,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Writer retrieves the interface that should be used to write to the UpdateHandler. func (u *UpdateHandler) Writer() Updater { - return &UpdateWriter{ - updateChannel: u.updateChannel, - } + return u.writer } // Updater describes an interface to send status updates somewhere. @@ -154,13 +168,40 @@ type Updater interface { // UpdateWriter takes status updates and sends these to the UpdateHandler via a channel. type UpdateWriter struct { + log logr.Logger + enabled <-chan struct{} updateChannel chan<- Update + // a temporary buffer to store events received before the Updater is enabled. + // These events will be sent to the update channel once the Updater is enabled. + eventsBeforeEnabled chan Update } // Send sends the given Update off to the update channel for writing by the UpdateHandler. func (u *UpdateWriter) Send(update Update) { // Non-blocking receive to see if we should pass along update. - u.updateChannel <- update + select { + case <-u.enabled: + u.updateChannel <- update + default: + if len(u.eventsBeforeEnabled) < cap(u.eventsBeforeEnabled) { + u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName) + u.eventsBeforeEnabled <- update + } else { + // If the buffer is full, drop the event to avoid blocking the sender. + u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName) + } + } +} + +// handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel. +func (u *UpdateWriter) handleEventsReceivedBeforeEnabled() { + go func() { + for e := range u.eventsBeforeEnabled { + u.log.Info("sending stored status update", "event", e.NamespacedName) + u.updateChannel <- e + } + close(u.eventsBeforeEnabled) + }() } // isStatusEqual checks if two objects have equivalent status. diff --git a/release-notes/current.yaml b/release-notes/current.yaml index eea29e8b78b..7b9d547d839 100644 --- a/release-notes/current.yaml +++ b/release-notes/current.yaml @@ -21,6 +21,7 @@ bug fixes: | Fixed failed to update SecurityPolicy resources with the `backendRef` field specified Fixed Envoy rejecting TCP Listeners that have no attached TCPRoutes Fixed xDS translation failed when oidc tokenEndpoint and jwt remoteJWKS are specified in the same SecurityPolicy and using the same hostname + Fixed frequent 503 errors when connecting to a Service experiencing high Pod churn # Enhancements that improve performance. performance improvements: |