Skip to content

Commit

Permalink
store update events and process it later
Browse files Browse the repository at this point in the history
Signed-off-by: Huabing Zhao <[email protected]>
  • Loading branch information
zhaohuabing committed Nov 26, 2024
1 parent d3e3c6b commit cf28b14
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions internal/provider/kubernetes/status_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package kubernetes

import (
"context"
"errors"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -58,14 +59,23 @@ type UpdateHandler struct {
client client.Client
sendUpdates chan struct{}
updateChannel chan Update
writer *UpdateWriter
}

func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler {
sendUpdates := make(chan struct{})
updateChannel := make(chan Update, 100)
return &UpdateHandler{
log: log,
client: client,
sendUpdates: make(chan struct{}),
updateChannel: make(chan Update, 100),
sendUpdates: sendUpdates,
updateChannel: updateChannel,
writer: &UpdateWriter{
log: log,
enabled: sendUpdates,
updateChannel: updateChannel,
missedEvents: make(chan Update, 1000),
},
}
}

Expand Down Expand Up @@ -131,6 +141,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error {

// Enable Updaters to start sending updates to this handler.
close(u.sendUpdates)
u.writer.start()

for {
select {
Expand All @@ -147,10 +158,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error {

// Writer retrieves the interface that should be used to write to the UpdateHandler.
func (u *UpdateHandler) Writer() Updater {
return &UpdateWriter{
enabled: u.sendUpdates,
updateChannel: u.updateChannel,
}
return u.writer
}

// Updater describes an interface to send status updates somewhere.
Expand All @@ -160,8 +168,10 @@ type Updater interface {

// UpdateWriter takes status updates and sends these to the UpdateHandler via a channel.
type UpdateWriter struct {
log logr.Logger
enabled <-chan struct{}
updateChannel chan<- Update
missedEvents chan Update
}

// Send sends the given Update off to the update channel for writing by the UpdateHandler.
Expand All @@ -171,9 +181,26 @@ func (u *UpdateWriter) Send(update Update) {
case <-u.enabled:
u.updateChannel <- update
default:
if len(u.missedEvents) < cap(u.missedEvents) {
u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName)
u.missedEvents <- update
} else {
u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName)
}
}
}

// start runs the goroutine to send the events received before the Updater was enabled to the update channel.
func (u *UpdateWriter) start() {
go func() {
for e := range u.missedEvents {
u.log.Info("sending stored status update", "event", e.NamespacedName)
u.updateChannel <- e
}
close(u.missedEvents)
}()
}

// isStatusEqual checks if two objects have equivalent status.
//
// Supported objects:
Expand Down

0 comments on commit cf28b14

Please sign in to comment.