diff --git a/internal/provider/kubernetes/status_updater.go b/internal/provider/kubernetes/status_updater.go index 9da708f1b02..fbb5ada36a1 100644 --- a/internal/provider/kubernetes/status_updater.go +++ b/internal/provider/kubernetes/status_updater.go @@ -7,6 +7,7 @@ package kubernetes import ( "context" + "errors" "time" "github.com/go-logr/logr" @@ -58,14 +59,23 @@ type UpdateHandler struct { client client.Client sendUpdates chan struct{} updateChannel chan Update + writer *UpdateWriter } func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler { + sendUpdates := make(chan struct{}) + updateChannel := make(chan Update, 100) return &UpdateHandler{ log: log, client: client, - sendUpdates: make(chan struct{}), - updateChannel: make(chan Update, 100), + sendUpdates: sendUpdates, + updateChannel: updateChannel, + writer: &UpdateWriter{ + log: log, + enabled: sendUpdates, + updateChannel: updateChannel, + missedEvents: make(chan Update, 1000), + }, } } @@ -131,6 +141,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Enable Updaters to start sending updates to this handler. close(u.sendUpdates) + u.writer.start() for { select { @@ -147,10 +158,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error { // Writer retrieves the interface that should be used to write to the UpdateHandler. func (u *UpdateHandler) Writer() Updater { - return &UpdateWriter{ - enabled: u.sendUpdates, - updateChannel: u.updateChannel, - } + return u.writer } // Updater describes an interface to send status updates somewhere. @@ -160,8 +168,10 @@ type Updater interface { // UpdateWriter takes status updates and sends these to the UpdateHandler via a channel. type UpdateWriter struct { + log logr.Logger enabled <-chan struct{} updateChannel chan<- Update + missedEvents chan Update } // Send sends the given Update off to the update channel for writing by the UpdateHandler. @@ -171,9 +181,26 @@ func (u *UpdateWriter) Send(update Update) { case <-u.enabled: u.updateChannel <- update default: + if len(u.missedEvents) < cap(u.missedEvents) { + u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName) + u.missedEvents <- update + } else { + u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName) + } } } +// start runs the goroutine to send the events received before the Updater was enabled to the update channel. +func (u *UpdateWriter) start() { + go func() { + for e := range u.missedEvents { + u.log.Info("sending stored status update", "event", e.NamespacedName) + u.updateChannel <- e + } + close(u.missedEvents) + }() +} + // isStatusEqual checks if two objects have equivalent status. // // Supported objects: