Skip to content

Commit

Permalink
Fix: frequent 503 errors when connecting to a Service experiencing hi…
Browse files Browse the repository at this point in the history
…gh Pod churn (envoyproxy#4754)

* Revert "fix: some status updates are discarded by the status updater (envoyproxy#4337)"

This reverts commit 14830c7.

Signed-off-by: Huabing Zhao <[email protected]>

* store update events and process it later

Signed-off-by: Huabing Zhao <[email protected]>

* rename method

Signed-off-by: Huabing Zhao <[email protected]>

* add release note

Signed-off-by: Huabing Zhao <[email protected]>

---------

Signed-off-by: Huabing Zhao <[email protected]>
  • Loading branch information
zhaohuabing committed Nov 27, 2024
1 parent 222a0f2 commit ef5f2c1
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 5 deletions.
51 changes: 46 additions & 5 deletions internal/provider/kubernetes/status_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package kubernetes

import (
"context"
"errors"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -56,14 +57,25 @@ func (m MutatorFunc) Mutate(old client.Object) client.Object {
type UpdateHandler struct {
log logr.Logger
client client.Client
sendUpdates chan struct{}
updateChannel chan Update
writer *UpdateWriter
}

func NewUpdateHandler(log logr.Logger, client client.Client) *UpdateHandler {
sendUpdates := make(chan struct{})
updateChannel := make(chan Update, 100)
return &UpdateHandler{
log: log,
client: client,
updateChannel: make(chan Update, 100),
sendUpdates: sendUpdates,
updateChannel: updateChannel,
writer: &UpdateWriter{
log: log,
enabled: sendUpdates,
updateChannel: updateChannel,
eventsBeforeEnabled: make(chan Update, 1000),
},
}
}

Expand Down Expand Up @@ -127,6 +139,10 @@ func (u *UpdateHandler) Start(ctx context.Context) error {
u.log.Info("started status update handler")
defer u.log.Info("stopped status update handler")

// Enable Updaters to start sending updates to this handler.
close(u.sendUpdates)
u.writer.handleEventsReceivedBeforeEnabled()

for {
select {
case <-ctx.Done():
Expand All @@ -142,9 +158,7 @@ func (u *UpdateHandler) Start(ctx context.Context) error {

// Writer retrieves the interface that should be used to write to the UpdateHandler.
func (u *UpdateHandler) Writer() Updater {
return &UpdateWriter{
updateChannel: u.updateChannel,
}
return u.writer
}

// Updater describes an interface to send status updates somewhere.
Expand All @@ -154,13 +168,40 @@ type Updater interface {

// UpdateWriter takes status updates and sends these to the UpdateHandler via a channel.
type UpdateWriter struct {
log logr.Logger
enabled <-chan struct{}
updateChannel chan<- Update
// a temporary buffer to store events received before the Updater is enabled.
// These events will be sent to the update channel once the Updater is enabled.
eventsBeforeEnabled chan Update
}

// Send sends the given Update off to the update channel for writing by the UpdateHandler.
func (u *UpdateWriter) Send(update Update) {
// Non-blocking receive to see if we should pass along update.
u.updateChannel <- update
select {
case <-u.enabled:
u.updateChannel <- update
default:
if len(u.eventsBeforeEnabled) < cap(u.eventsBeforeEnabled) {
u.log.Info("received a status update while disabled, storing for later", "event", update.NamespacedName)
u.eventsBeforeEnabled <- update
} else {
// If the buffer is full, drop the event to avoid blocking the sender.
u.log.Error(errors.New("dropping status update, buffer full"), "event", update.NamespacedName)
}
}
}

// handleEventsReceivedBeforeEnabled sends the events received before the Updater was enabled to the update channel.
func (u *UpdateWriter) handleEventsReceivedBeforeEnabled() {
go func() {
for e := range u.eventsBeforeEnabled {
u.log.Info("sending stored status update", "event", e.NamespacedName)
u.updateChannel <- e
}
close(u.eventsBeforeEnabled)
}()
}

// isStatusEqual checks if two objects have equivalent status.
Expand Down
1 change: 1 addition & 0 deletions release-notes/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ bug fixes: |
Fixed Envoy rejecting TCP Listeners that have no attached TCPRoutes
Fixed failed to update SecurityPolicy resources with the `backendRef` field specified
Fixed xDS translation failed when oidc tokenEndpoint and jwt remoteJWKS are specified in the same SecurityPolicy and using the same hostname
Fixed frequent 503 errors when connecting to a Service experiencing high Pod churn
# Enhancements that improve performance.
performance improvements: |
Expand Down

0 comments on commit ef5f2c1

Please sign in to comment.