Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: decoup gateway status updates from the reconciler #4767

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions internal/gatewayapi/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,11 @@ func (r *Runner) getAllStatuses() *StatusesToDelete {
}

func (r *Runner) deleteStatusKeys(ds *StatusesToDelete) {
for key := range ds.GatewayStatusKeys {
r.ProviderResources.GatewayStatuses.Delete(key)
delete(ds.GatewayStatusKeys, key)
}
// Do not delete the status keys for the Gateway because the Gateway status has also been stored into the ProviderResources
// by the kubernetes provider to update the address and workload status.
//
// TODO: zhaohuabing this is acceptable as the Gateway status typically does not occupy a lot of memory,
// but it's better to move all the status handling to Gateway API translator layer to avoid this.
for key := range ds.HTTPRouteStatusKeys {
r.ProviderResources.HTTPRouteStatuses.Delete(key)
delete(ds.HTTPRouteStatusKeys, key)
Expand Down
2 changes: 0 additions & 2 deletions internal/gatewayapi/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ func TestDeleteStatusKeys(t *testing.T) {
delete(ds.UDPRouteStatusKeys, keys[6])
r.deleteStatusKeys(ds)

require.Equal(t, 0, r.ProviderResources.GatewayStatuses.Len())
require.Equal(t, 0, r.ProviderResources.HTTPRouteStatuses.Len())
require.Equal(t, 0, r.ProviderResources.GRPCRouteStatuses.Len())
require.Equal(t, 0, r.ProviderResources.TLSRouteStatuses.Len())
Expand Down Expand Up @@ -278,7 +277,6 @@ func TestDeleteAllStatusKeys(t *testing.T) {
// Checks that the keys are successfully stored to DeletableStatus and watchable maps
ds := r.getAllStatuses()

require.True(t, ds.GatewayStatusKeys[keys[0]])
require.True(t, ds.HTTPRouteStatusKeys[keys[1]])
require.True(t, ds.GRPCRouteStatusKeys[keys[2]])
require.True(t, ds.TLSRouteStatusKeys[keys[3]])
Expand Down
13 changes: 7 additions & 6 deletions internal/message/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ func (p *ProviderResources) Close() {

// GatewayAPIStatuses contains gateway API resources statuses
type GatewayAPIStatuses struct {
GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus]
HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus]
GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus]
TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus]
TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus]
UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus]
GatewayClassStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayClassStatus]
GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus]
HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus]
GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus]
TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus]
TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus]
UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus]
}

func (s *GatewayAPIStatuses) Close() {
Expand Down
20 changes: 12 additions & 8 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,12 @@
if managedGC.Spec.ParametersRef != nil && managedGC.DeletionTimestamp == nil {
if err := r.processGatewayClassParamsRef(ctx, managedGC, resourceMappings, gwcResource); err != nil {
msg := fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err)
if err := r.updateStatusForGatewayClass(ctx, managedGC, false, string(gwapiv1.GatewayClassReasonInvalidParameters), msg); err != nil {
r.log.Error(err, "unable to update GatewayClass status")
}
gc := status.SetGatewayClassAccepted(
managedGC.DeepCopy(),
false,
string(gwapiv1.GatewayClassReasonInvalidParameters),
msg)
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)

Check warning on line 207 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L202-L207

Added lines #L202 - L207 were not covered by tests
r.log.Error(err, "failed to process parametersRef for gatewayclass", "name", managedGC.Name)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -293,11 +296,12 @@

// process envoy gateway secret refs
r.processEnvoyProxySecretRef(ctx, gwcResource)

if err := r.updateStatusForGatewayClass(ctx, managedGC, true, string(gwapiv1.GatewayClassReasonAccepted), status.MsgValidGatewayClass); err != nil {
r.log.Error(err, "unable to update GatewayClass status")
return reconcile.Result{}, err
}
gc := status.SetGatewayClassAccepted(
managedGC.DeepCopy(),
true,
string(gwapiv1.GatewayClassReasonAccepted),
status.MsgValidGatewayClass)
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)

if len(gwcResource.Gateways) == 0 {
r.log.Info("No gateways found for accepted gatewayclass")
Expand Down
20 changes: 17 additions & 3 deletions internal/provider/kubernetes/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@
// Check if the Service belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
r.updateGatewayStatus(gtw)
return false
}

Expand Down Expand Up @@ -528,7 +528,7 @@
// Check if the obj belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
r.updateGatewayStatus(gtw)
return false
}
}
Expand Down Expand Up @@ -636,12 +636,26 @@
}

for _, gateway := range gateways.Items {
r.updateStatusForGateway(ctx, &gateway)
r.updateGatewayStatus(&gateway)

Check warning on line 639 in internal/provider/kubernetes/predicates.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/predicates.go#L639

Added line #L639 was not covered by tests
}

return nil
}

// updateGatewayStatus triggers a status update for the Gateway.
func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) {
// The status added to GatewayStatuses is solely used to trigger the status updater
// and does not reflect the real changed status.
//
// The status updater will check the Envoy Proxy service to get the addresses of the Gateway,
// and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload.
//
// Since the status does not reflect the actual changed status, we need to delete it first
// to prevent it from being considered unchanged. This ensures that subscribers receive the update event.
r.resources.GatewayStatuses.Delete(utils.NamespacedName(gateway))
r.resources.GatewayStatuses.Store(utils.NamespacedName(gateway), &gateway.Status)
}

func (r *gatewayAPIReconciler) handleNode(obj client.Object) bool {
ctx := context.Background()
node, ok := obj.(*corev1.Node)
Expand Down
3 changes: 3 additions & 0 deletions internal/provider/kubernetes/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/envoyproxy/gateway/internal/gatewayapi/resource"
"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/proxy"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/provider/kubernetes/test"
)

Expand Down Expand Up @@ -854,6 +855,7 @@ func TestValidateServiceForReconcile(t *testing.T) {
classController: egv1a1.GatewayControllerName,
log: logger,
mergeGateways: sets.New[string]("test-mg"),
resources: &message.ProviderResources{},
grpcRouteCRDExists: true,
tcpRouteCRDExists: true,
udpRouteCRDExists: true,
Expand Down Expand Up @@ -972,6 +974,7 @@ func TestValidateObjectForReconcile(t *testing.T) {
classController: egv1a1.GatewayControllerName,
log: logger,
mergeGateways: sets.New[string]("test-mg"),
resources: &message.ProviderResources{},
}

for _, tc := range testCases {
Expand Down
61 changes: 29 additions & 32 deletions internal/provider/kubernetes/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"fmt"
"reflect"

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -28,6 +27,35 @@
// subscribeAndUpdateStatus subscribes to gateway API object status updates and
// writes it into the Kubernetes API Server.
func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context, extensionManagerEnabled bool) {
// GatewayClass object status updater
go func() {
message.HandleSubscription(
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "gatewayclass-status"},
r.resources.GatewayClassStatuses.Subscribe(ctx),
func(update message.Update[types.NamespacedName, *gwapiv1.GatewayClassStatus], errChan chan error) {
// skip delete updates.
if update.Delete {
return
}

Check warning on line 39 in internal/provider/kubernetes/status.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/status.go#L38-L39

Added lines #L38 - L39 were not covered by tests

r.statusUpdater.Send(Update{
NamespacedName: update.Key,
Resource: new(gwapiv1.GatewayClass),
Mutator: MutatorFunc(func(obj client.Object) client.Object {
gc, ok := obj.(*gwapiv1.GatewayClass)
if !ok {
panic(fmt.Sprintf("unsupported object type %T", obj))

Check warning on line 47 in internal/provider/kubernetes/status.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/status.go#L47

Added line #L47 was not covered by tests
}
gcCopy := gc.DeepCopy()
gcCopy.Status = *update.Value
return gcCopy
}),
})
},
)
r.log.Info("gatewayclass status subscriber shutting down")

Check warning on line 56 in internal/provider/kubernetes/status.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/status.go#L56

Added line #L56 was not covered by tests
}()

// Gateway object status updater
go func() {
message.HandleSubscription(
Expand Down Expand Up @@ -564,34 +592,3 @@
}),
})
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about

func (r *gatewayAPIReconciler) updateStatusForGateway(ctx context.Context, gtw *gwapiv1.Gateway) {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect the call to updateStatusForGateway from inside the predicate blocked the reconciler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we deleting update status for gateway ?

func (r *gatewayAPIReconciler) updateStatusForGatewayClass(
ctx context.Context,
gc *gwapiv1.GatewayClass,
accepted bool,
reason,
msg string,
) error {
if r.statusUpdater != nil {
r.statusUpdater.Send(Update{
NamespacedName: types.NamespacedName{Name: gc.Name},
Resource: &gwapiv1.GatewayClass{},
Mutator: MutatorFunc(func(obj client.Object) client.Object {
gc, ok := obj.(*gwapiv1.GatewayClass)
if !ok {
panic(fmt.Sprintf("unsupported object type %T", obj))
}

return status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg)
}),
})
} else {
// this branch makes testing easier by not going through the status.Updater.
duplicate := status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg)

if err := r.client.Status().Update(ctx, duplicate); err != nil && !kerrors.IsNotFound(err) {
return fmt.Errorf("error updating status of gatewayclass %s: %w", duplicate.Name, err)
}
}
return nil
}
Loading