From c3a22e2d5b970d31ffbc73748bbff3f87fb9d5cc Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Thu, 21 Nov 2024 21:48:17 +0800 Subject: [PATCH 1/4] Revert "fix: some status updates are discarded by the status updater (#4337)" This reverts commit 14830c7b7a7fa20cd3c5e82625c355485bcbd961. Signed-off-by: Huabing Zhao --- internal/provider/kubernetes/status_updater.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/internal/provider/kubernetes/status_updater.go b/internal/provider/kubernetes/status_updater.go index 24adaedd563..9da708f1b02 100644 --- a/internal/provider/kubernetes/status_updater.go +++ b/internal/provider/kubernetes/status_updater.go @@ -56,6 +56,7 @@ func (m MutatorFunc) Mutate(old client.Object) client.Object { type UpdateHandler struct { log logr.Logger client client.Client + sendUpdates chan struct{} updateChannel chan Update } @@ -63,6 +64,7 @@ func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler { return &UpdateHandler{ log: log, client: client, + sendUpdates: make(chan struct{}), updateChannel: make(chan Update, 100), } } @@ -127,6 +129,9 @@ func (u *UpdateHandler) Start(ctx context.Context) error { u.log.Info("started status update handler") defer u.log.Info("stopped status update handler") + // Enable Updaters to start sending updates to this handler. + close(u.sendUpdates) + for { select { case <-ctx.Done(): @@ -143,6 +148,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Writer retrieves the interface that should be used to write to the UpdateHandler. func (u *UpdateHandler) Writer() Updater { return &UpdateWriter{ + enabled: u.sendUpdates, updateChannel: u.updateChannel, } } @@ -154,13 +160,18 @@ type Updater interface { // UpdateWriter takes status updates and sends these to the UpdateHandler via a channel. type UpdateWriter struct { + enabled <-chan struct{} updateChannel chan<- Update } // Send sends the given Update off to the update channel for writing by the UpdateHandler. func (u *UpdateWriter) Send(update Update) { // Non-blocking receive to see if we should pass along update. - u.updateChannel <- update + select { + case <-u.enabled: + u.updateChannel <- update + default: + } } // isStatusEqual checks if two objects have equivalent status. From bce5a67e33cd9e40f36d4471e8508620f9ca683d Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Tue, 26 Nov 2024 05:51:21 +0000 Subject: [PATCH 2/4] store update events and process it later Signed-off-by: Huabing Zhao --- .../provider/kubernetes/status_updater.go | 39 ++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/internal/provider/kubernetes/status_updater.go b/internal/provider/kubernetes/status_updater.go index 9da708f1b02..fbb5ada36a1 100644 --- a/internal/provider/kubernetes/status_updater.go +++ b/internal/provider/kubernetes/status_updater.go @@ -7,6 +7,7 @@ package kubernetes import ( "context" + "errors" "time" "github.com/go-logr/logr" @@ -58,14 +59,23 @@ type UpdateHandler struct { client client.Client sendUpdates chan struct{} updateChannel chan Update + writer *UpdateWriter } func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler { + sendUpdates := make(chan struct{}) + updateChannel := make(chan Update, 100) return &UpdateHandler{ log: log, client: client, - sendUpdates: make(chan struct{}), - updateChannel: make(chan Update, 100), + sendUpdates: sendUpdates, + updateChannel: updateChannel, + writer: &UpdateWriter{ + log: log, + enabled: sendUpdates, + updateChannel: updateChannel, + missedEvents: make(chan Update, 1000), + }, } } @@ -131,6 +141,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Enable Updaters to start sending updates to this handler. close(u.sendUpdates) + u.writer.start() for { select { @@ -147,10 +158,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Writer retrieves the interface that should be used to write to the UpdateHandler. func (u *UpdateHandler) Writer() Updater { - return &UpdateWriter{ - enabled: u.sendUpdates, - updateChannel: u.updateChannel, - } + return u.writer } // Updater describes an interface to send status updates somewhere. @@ -160,8 +168,10 @@ type Updater interface { // UpdateWriter takes status updates and sends these to the UpdateHandler via a channel. type UpdateWriter struct { + log logr.Logger enabled <-chan struct{} updateChannel chan<- Update + missedEvents chan Update } // Send sends the given Update off to the update channel for writing by the UpdateHandler. @@ -171,9 +181,26 @@ func (u *UpdateWriter) Send(update Update) { case <-u.enabled: u.updateChannel <- update default: + if len(u.missedEvents) < cap(u.missedEvents) { + u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName) + u.missedEvents <- update + } else { + u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName) + } } } +// start runs the goroutine to send the events received before the Updater was enabled to the update channel. +func (u *UpdateWriter) start() { + go func() { + for e := range u.missedEvents { + u.log.Info("sending stored status update", "event", e.NamespacedName) + u.updateChannel <- e + } + close(u.missedEvents) + }() +} + // isStatusEqual checks if two objects have equivalent status. // // Supported objects: From 07c93376b3b145f89748501640cb356cda6dcddb Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Tue, 26 Nov 2024 06:08:26 +0000 Subject: [PATCH 3/4] rename method Signed-off-by: Huabing Zhao --- .../provider/kubernetes/status_updater.go | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/internal/provider/kubernetes/status_updater.go b/internal/provider/kubernetes/status_updater.go index fbb5ada36a1..ee5cbce59d2 100644 --- a/internal/provider/kubernetes/status_updater.go +++ b/internal/provider/kubernetes/status_updater.go @@ -71,10 +71,10 @@ func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler { sendUpdates: sendUpdates, updateChannel: updateChannel, writer: &UpdateWriter{ - log: log, - enabled: sendUpdates, - updateChannel: updateChannel, - missedEvents: make(chan Update, 1000), + log: log, + enabled: sendUpdates, + updateChannel: updateChannel, + eventsBeforeEnabled: make(chan Update, 1000), }, } } @@ -141,7 +141,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Enable Updaters to start sending updates to this handler. close(u.sendUpdates) - u.writer.start() + u.writer.handleEventsReceivedBeforeEnabled() for { select { @@ -171,7 +171,9 @@ type UpdateWriter struct { log logr.Logger enabled <-chan struct{} updateChannel chan<- Update - missedEvents chan Update + // a temporary buffer to store events received before the Updater is enabled. + // These events will be sent to the update channel once the Updater is enabled. + eventsBeforeEnabled chan Update } // Send sends the given Update off to the update channel for writing by the UpdateHandler. @@ -181,23 +183,24 @@ func (u *UpdateWriter) Send(update Update) { case <-u.enabled: u.updateChannel <- update default: - if len(u.missedEvents) < cap(u.missedEvents) { + if len(u.eventsBeforeEnabled) < cap(u.eventsBeforeEnabled) { u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName) - u.missedEvents <- update + u.eventsBeforeEnabled <- update } else { + // If the buffer is full, drop the event to avoid blocking the sender. u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName) } } } -// start runs the goroutine to send the events received before the Updater was enabled to the update channel. -func (u *UpdateWriter) start() { +// handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel. +func (u *UpdateWriter) handleEventsReceivedBeforeEnabled() { go func() { - for e := range u.missedEvents { + for e := range u.eventsBeforeEnabled { u.log.Info("sending stored status update", "event", e.NamespacedName) u.updateChannel <- e } - close(u.missedEvents) + close(u.eventsBeforeEnabled) }() } From 708dfcc6d411e6fb6a836af426e2d5f0e8178bf9 Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Tue, 26 Nov 2024 08:19:25 +0000 Subject: [PATCH 4/4] add release note Signed-off-by: Huabing Zhao --- release-notes/current.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/release-notes/current.yaml b/release-notes/current.yaml index c0f8dfc0e5b..9ad7f7fe90b 100644 --- a/release-notes/current.yaml +++ b/release-notes/current.yaml @@ -20,6 +20,7 @@ bug fixes: | Fixed failed to update SecurityPolicy resources with the `backendRef` field specified Fixed Envoy rejecting TCP Listeners that have no attached TCPRoutes Fixed xDS translation failed when oidc tokenEndpoint and jwt remoteJWKS are specified in the same SecurityPolicy and using the same hostname + Fixed frequent 503 errors when connecting to a Service experiencing high Pod churn # Enhancements that improve performance. performance improvements: |