Skip to content

Commit

Permalink
Support Namespaced Statuses (#447)
Browse files Browse the repository at this point in the history
* Add logs
* WIP
* WIP
* WIP
* more
* remove logs
* update types
* small changes
* update tests
* include POD_NAMESPACE in test
* synchronized
* Include offset so error is propagated
* test changes
* Merge refs/heads/master into sam/support-namespaced-statuses
* use getters
* Merge branch 'sam/support-namespaced-statuses' of github.com:solo-io/solo-kit into sam/support-namespaced-statuses
* Deflake CRUD tests
* Merge branch 'deflake-sk-tests' into sam/support-namespaced-statuses
* Set pod_namespace
* Merge refs/heads/master into sam/support-namespaced-statuses
* update test template and rerun codegen
* update test templates
* Update mock protos
* update tests
* Merge refs/heads/master into sam/support-namespaced-statuses
* revert logging level in v2.reporter
* use constant for env variable name
* expose InputResource status unmarshal as method
* expose unmarshaller
* statusutils
* cleanup protoutils.marshal
* small fixup to statusutils
* update propagator reference
* update comments
* update statusutils
* update interface to inject ns
* more changes
* small cleanup
* fix generated kube tests
* Merge refs/heads/master into sam/support-namespaced-statuses-attempt-2
* Adding changelog file to new location
* Deleting changelog file from old location
* resources.StatusUnmarshaler
* add statusreporter
* StatusReporter -> StatusStore
* one more name change
* inject statusClient into reporter
* small touchup
* inject statusSetter in reconciler
* support multicluster reconciler
* simplify input_resource interface
* fix propagator
* fixup unmarshal
* cleanup import cycle
* add NoOpStatusClient for testing or for easy use
* cleanup single status support
* missing return
  • Loading branch information
sam-heilbron authored Sep 21, 2021
1 parent afbf1e4 commit 33fda1f
Show file tree
Hide file tree
Showing 104 changed files with 1,951 additions and 1,093 deletions.
9 changes: 9 additions & 0 deletions api/v1/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ option (extproto.equal_all) = true;

import "google/protobuf/struct.proto";

/**
 * NamespacedStatuses indicates the Status of a resource according to each controller.
 * A resource may be reported on by controllers running in multiple namespaces; each
 * controller records its own Status keyed by the namespace it runs in.
 * NamespacedStatuses are meant to be read-only by users.
 */
message NamespacedStatuses {
  // Mapping from namespace to the Status written by the controller running in that namespace.
  map<string, Status> statuses = 1;
}

/**
* Status indicates whether a resource has been (in)validated by a reporter in the system.
* Statuses are meant to be read-only by users
Expand Down
5 changes: 5 additions & 0 deletions changelog/v0.23.0/support-namespaced-statuses.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Changelog entry: breaking change adding support for statuses written by
# multiple reporters (namespaced statuses); see the linked issue for the design.
changelog:
  - type: BREAKING_CHANGE
    description: Add support for statuses from multiple reporters.
    issueLink: https://github.com/solo-io/solo-kit/issues/430
    resolvesIssue: false
15 changes: 15 additions & 0 deletions pkg/api/v1/clients/factory/resource_client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"strings"
"time"

"github.com/solo-io/solo-kit/pkg/utils/protoutils"

"github.com/solo-io/solo-kit/pkg/utils/statusutils"

"github.com/hashicorp/consul/api"
vaultapi "github.com/hashicorp/vault/api"
"github.com/solo-io/go-utils/stringutils"
Expand Down Expand Up @@ -83,13 +87,24 @@ func newResourceClient(ctx context.Context, factory ResourceClientFactory, param
}
}

// The POD_NAMESPACE determines which namespace to write statuses to
// This is used for backwards compatibility in case a CRD is unmarshalled
// and it contains a single status.
statusReporterNamespace, err := statusutils.GetStatusReporterNamespaceFromEnv()
if err != nil {
return nil, errors.Wrapf(err, "getting status reporter namespace")
}

inputResourceStatusUnmarshaler := statusutils.NewNamespacedStatusesUnmarshaler(statusReporterNamespace, protoutils.UnmarshalMapToProto)

client := kube.NewResourceClient(
opts.Crd,
crdClient,
opts.SharedCache,
inputResource,
namespaceWhitelist,
opts.ResyncPeriod,
inputResourceStatusUnmarshaler,
)
return wrapper.NewClusterResourceClient(client, opts.Cluster), nil

Expand Down
14 changes: 8 additions & 6 deletions pkg/api/v1/clients/kube/crd/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,19 @@ func (d Crd) KubeResource(resource resources.InputResource) (*v1.Resource, error
return nil, MarshalErr(err, "resource to map")
}

// When marshalling the CRD, we remove information from the resource spec that will be persisted elsewhere.
// For example, namespacedStatuses contains the status set by solo controllers. It is removed from the
// spec and maintained in the CRD status
delete(data, "metadata")
delete(data, "status")
delete(data, "namespacedStatuses")
spec = data

if resource.GetStatus() != nil {
statusProto := resource.GetStatus()
statusMap, err := protoutils.MarshalMapFromProtoWithEnumsAsInts(statusProto)
if namespacedStatusesProto := resource.GetNamespacedStatuses(); namespacedStatusesProto != nil {
namespacedStatusesMap, err := protoutils.MarshalMapFromProtoWithEnumsAsInts(namespacedStatusesProto)
if err != nil {
return nil, MarshalErr(err, "resource status to map")
return nil, MarshalErr(err, "resource namespacedStatuses to map")
}
status = statusMap
status = namespacedStatusesMap
}
}

Expand Down
49 changes: 19 additions & 30 deletions pkg/api/v1/clients/kube/resource_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
"time"

"github.com/solo-io/solo-kit/pkg/api/v1/resources/core"
"github.com/solo-io/solo-kit/pkg/utils/kubeutils"

"github.com/solo-io/go-utils/stringutils"
Expand Down Expand Up @@ -100,13 +99,14 @@ var _ clients.StorageWriteOpts = new(KubeWriteOpts)
// lazy start in list & watch
// register informers in register
type ResourceClient struct {
crd crd.Crd
crdClientset versioned.Interface
resourceName string
resourceType resources.InputResource
sharedCache SharedCache
namespaceWhitelist []string // Will contain at least metaV1.NamespaceAll ("")
resyncPeriod time.Duration
crd crd.Crd
crdClientset versioned.Interface
resourceName string
resourceType resources.InputResource
sharedCache SharedCache
namespaceWhitelist []string // Will contain at least metaV1.NamespaceAll ("")
resyncPeriod time.Duration
resourceStatusUnmarshaler resources.StatusUnmarshaler
}

func NewResourceClient(
Expand All @@ -116,20 +116,22 @@ func NewResourceClient(
resourceType resources.InputResource,
namespaceWhitelist []string,
resyncPeriod time.Duration,
resourceStatusUnmarshaler resources.StatusUnmarshaler,
) *ResourceClient {

typeof := reflect.TypeOf(resourceType)
resourceName := strings.Replace(typeof.String(), "*", "", -1)
resourceName = strings.Replace(resourceName, ".", "", -1)

return &ResourceClient{
crd: crd,
crdClientset: clientset,
resourceName: resourceName,
resourceType: resourceType,
sharedCache: sharedCache,
namespaceWhitelist: namespaceWhitelist,
resyncPeriod: resyncPeriod,
crd: crd,
crdClientset: clientset,
resourceName: resourceName,
resourceType: resourceType,
sharedCache: sharedCache,
namespaceWhitelist: namespaceWhitelist,
resyncPeriod: resyncPeriod,
resourceStatusUnmarshaler: resourceStatusUnmarshaler,
}
}

Expand Down Expand Up @@ -435,7 +437,7 @@ func (rc *ResourceClient) convertCrdToResource(resourceCrd *v1.Resource) (resour
resourceCrd.Name, resourceCrd.Namespace, rc.resourceName)
}
}
if err := customResource.UnmarshalStatus(resourceCrd.Status); err != nil {
if err := customResource.UnmarshalStatus(resourceCrd.Status, rc.resourceStatusUnmarshaler); err != nil {
return nil, errors.Wrapf(err, "unmarshalling crd status on custom resource %v in namespace %v into %v",
resourceCrd.Name, resourceCrd.Namespace, rc.resourceName)
}
Expand All @@ -444,20 +446,7 @@ func (rc *ResourceClient) convertCrdToResource(resourceCrd *v1.Resource) (resour
// Default unmarshalling

if withStatus, ok := resource.(resources.InputResource); ok {
// Always initialize status to empty, before it was empty by default, as it was a non-pointer value.
withStatus.SetStatus(&core.Status{})
updateFunc := func(status *core.Status) error {
if status == nil {
return nil
}
typedStatus := core.Status{}
if err := protoutils.UnmarshalMapToProto(resourceCrd.Status, &typedStatus); err != nil {
return err
}
*status = typedStatus
return nil
}
if err := resources.UpdateStatus(withStatus, updateFunc); err != nil {
if err := rc.resourceStatusUnmarshaler.UnmarshalStatus(resourceCrd.Status, withStatus); err != nil {
return nil, err
}
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/api/v1/clients/kube/resource_client_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
var _ = Describe("Test ResourceClientSharedInformerFactory", func() {

const (
namespace1 = "test-ns-1"
namespace2 = "test-ns-2"
namespace3 = "test-ns-3"
podNamespace = "pod-ns"
namespace1 = "test-ns-1"
namespace2 = "test-ns-2"
namespace3 = "test-ns-3"
)

var (
Expand All @@ -35,9 +36,9 @@ var _ = Describe("Test ResourceClientSharedInformerFactory", func() {
kubeCache = kube.NewKubeCache(ctx).(*kube.ResourceClientSharedInformerFactory)
Expect(len(kubeCache.Informers())).To(BeZero())

client1 = util.MockClientForNamespace(kubeCache, []string{namespace1})
client2 = util.MockClientForNamespace(kubeCache, []string{namespace2})
client123 = util.MockClientForNamespace(kubeCache, []string{namespace1, namespace2, namespace3})
client1 = util.MockClientForNamespace(kubeCache, []string{namespace1}, podNamespace)
client2 = util.MockClientForNamespace(kubeCache, []string{namespace2}, podNamespace)
client123 = util.MockClientForNamespace(kubeCache, []string{namespace1, namespace2, namespace3}, podNamespace)
})

Describe("registering resource clients with the factory", func() {
Expand Down Expand Up @@ -117,7 +118,7 @@ var _ = Describe("Test ResourceClientSharedInformerFactory", func() {
BeforeEach(func() {
clientset = fake.NewSimpleClientset(mocksv1.MockResourceCrd)
// We need the resourceClient so that we can register its resourceType/namespaces with the cache
client := util.ClientForClientsetAndResource(clientset, kubeCache, mocksv1.MockResourceCrd, &mocksv1.MockResource{}, []string{namespace1})
client := util.ClientForClientsetAndResource(clientset, kubeCache, mocksv1.MockResourceCrd, &mocksv1.MockResource{}, []string{namespace1}, podNamespace)
err := kubeCache.Register(client)
Expect(err).NotTo(HaveOccurred())

Expand Down
Loading

0 comments on commit 33fda1f

Please sign in to comment.