From 07c93376b3b145f89748501640cb356cda6dcddb Mon Sep 17 00:00:00 2001 From: Huabing Zhao Date: Tue, 26 Nov 2024 06:08:26 +0000 Subject: [PATCH] rename method Signed-off-by: Huabing Zhao --- .../provider/kubernetes/status_updater.go | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/internal/provider/kubernetes/status_updater.go b/internal/provider/kubernetes/status_updater.go index fbb5ada36a1..ee5cbce59d2 100644 --- a/internal/provider/kubernetes/status_updater.go +++ b/internal/provider/kubernetes/status_updater.go @@ -71,10 +71,10 @@ func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler { sendUpdates: sendUpdates, updateChannel: updateChannel, writer: &UpdateWriter{ - log: log, - enabled: sendUpdates, - updateChannel: updateChannel, - missedEvents: make(chan Update, 1000), + log: log, + enabled: sendUpdates, + updateChannel: updateChannel, + eventsBeforeEnabled: make(chan Update, 1000), }, } } @@ -141,7 +141,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Enable Updaters to start sending updates to this handler. close(u.sendUpdates) - u.writer.start() + u.writer.handleEventsReceivedBeforeEnabled() for { select { @@ -171,7 +171,9 @@ type UpdateWriter struct { log logr.Logger enabled <-chan struct{} updateChannel chan<- Update - missedEvents chan Update + // a temporary buffer to store events received before the Updater is enabled. + // These events will be sent to the update channel once the Updater is enabled. + eventsBeforeEnabled chan Update } // Send sends the given Update off to the update channel for writing by the UpdateHandler. @@ -181,23 +183,24 @@ func (u *UpdateWriter) Send(update Update) { case <-u.enabled: u.updateChannel <- update default: - if len(u.missedEvents) < cap(u.missedEvents) { + if len(u.eventsBeforeEnabled) < cap(u.eventsBeforeEnabled) { u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName) - u.missedEvents <- update + u.eventsBeforeEnabled <- update } else { + // If the buffer is full, drop the event to avoid blocking the sender. u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName) } } } -// start runs the goroutine to send the events received before the Updater was enabled to the update channel. -func (u *UpdateWriter) start() { +// handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel. +func (u *UpdateWriter) handleEventsReceivedBeforeEnabled() { go func() { - for e := range u.missedEvents { + for e := range u.eventsBeforeEnabled { u.log.Info("sending stored status update", "event", e.NamespacedName) u.updateChannel <- e } - close(u.missedEvents) + close(u.eventsBeforeEnabled) }() }