Skip to content

Commit

Permalink
[1.12] Fix Istio Crash When Using Headless Services (#7836)
Browse files Browse the repository at this point in the history
* Fix Istio Crash When Using Headless Services (#7804)

* proposal fix

* add changelog

* move istioIntegration logic back out of eps loop

* continue when setting istioIntegration ep

* fix validation webhook start condition

* add test for headless service curl-ing

* remove focused test

* empty commit

* move headless service setup into beforeEach

* add issue link

* move changelog

* Update changelog/v1.14.0-beta10/fix-istio-integration-headless-service-crash.yaml

Co-authored-by: Kevin Dorosh <[email protected]>

* add comment with issue link for clusterIP deprecation

---------

Co-authored-by: soloio-bulldozer[bot] <48420018+soloio-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: Kevin Dorosh <[email protected]>

* move changelog

---------

Co-authored-by: soloio-bulldozer[bot] <48420018+soloio-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: Kevin Dorosh <[email protected]>
  • Loading branch information
3 people authored Feb 12, 2023
1 parent 9a94ddf commit 1f607ea
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 158 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
changelog:
- type: FIX
description: Use Pod IPs for headless services when the gloo deployment's `ENABLE_ISTIO_INTEGRATION` is `true`.
issueLink: https://github.com/solo-io/gloo/issues/7491
resolvesIssue: false
- type: FIX
description: |
Fix a bug where the validation webhook server on the gloo pod would still start even if the `VALIDATION_MUST_START` environment variable was `false`.
Now it only starts if it's set to `true`, or if it is unset, so it acts as it previously has.
issueLink: https://github.com/solo-io/gloo/issues/7826
resolvesIssue: false
27 changes: 22 additions & 5 deletions projects/gloo/pkg/plugins/kubernetes/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ type Epkey struct {
Name string
Namespace string
UpstreamRef *core.ResourceRef
// While we can use the upstream ref to get the upstream and service, if there are too many upstreams that could be slow.
IsHeadless bool
}

// Returns first matching port in the namespace and boolean value of true if the
Expand All @@ -289,6 +291,15 @@ func findPortForService(services []*kubev1.Service, spec *kubeplugin.UpstreamSpe
return nil, false
}

func getServiceFromUpstreamSpec(spec *kubeplugin.UpstreamSpec, services []*kubev1.Service) *kubev1.Service {
for _, svc := range services {
if svc.Namespace == spec.GetServiceNamespace() && svc.Name == spec.GetServiceName() {
return svc
}
}
return nil
}

func filterEndpoints(
_ context.Context, // do not use for logging! return logging messages as strings and log them after hashing (see https://github.com/solo-io/gloo/issues/3761)
writeNamespace string,
Expand All @@ -313,8 +324,12 @@ func filterEndpoints(
continue
}

svc := getServiceFromUpstreamSpec(spec, services)
// TODO: Investigate possible deprecation of ClusterIPs in newer k8s versions https://github.com/solo-io/gloo/issues/7830
isHeadlessSvc := svc.Spec.ClusterIP == "None"
// Istio uses the service's port for routing requests
if istioIntegrationEnabled {
// Headless services don't have a cluster IP, so we'll resort to pod IP endpoints
if istioIntegrationEnabled && !isHeadlessSvc {
hostname := fmt.Sprintf("%v.%v", spec.GetServiceName(), spec.GetServiceNamespace())
copyRef := *usRef
key := Epkey{
Expand All @@ -323,6 +338,7 @@ func filterEndpoints(
Name: spec.GetServiceName(),
Namespace: spec.GetServiceNamespace(),
UpstreamRef: &copyRef,
IsHeadless: isHeadlessSvc,
}
endpointsMap[key] = append(endpointsMap[key], &copyRef)
continue
Expand All @@ -340,7 +356,7 @@ func filterEndpoints(
continue
}

warnings := processSubsetAddresses(subset, spec, podMap, usRef, port, endpointsMap)
warnings := processSubsetAddresses(subset, spec, podMap, usRef, port, endpointsMap, isHeadlessSvc)
warnsToLog = append(warnsToLog, warnings...)
}
}
Expand All @@ -351,7 +367,7 @@ func filterEndpoints(
return endpoints, warnsToLog, errorsToLog
}

func processSubsetAddresses(subset kubev1.EndpointSubset, spec *kubeplugin.UpstreamSpec, pods *podMap, usRef *core.ResourceRef, port uint32, endpointsMap map[Epkey][]*core.ResourceRef) []string {
func processSubsetAddresses(subset kubev1.EndpointSubset, spec *kubeplugin.UpstreamSpec, pods *podMap, usRef *core.ResourceRef, port uint32, endpointsMap map[Epkey][]*core.ResourceRef, isHeadlessService bool) []string {
var warnings []string
for _, addr := range subset.Addresses {
var podName, podNamespace string
Expand All @@ -378,7 +394,7 @@ func processSubsetAddresses(subset kubev1.EndpointSubset, spec *kubeplugin.Upstr
continue
}
}
key := Epkey{addr.IP, port, podName, podNamespace, usRef}
key := Epkey{addr.IP, port, podName, podNamespace, usRef, isHeadlessService}
copyRef := *usRef
endpointsMap[key] = append(endpointsMap[key], &copyRef)
}
Expand Down Expand Up @@ -426,7 +442,8 @@ func generateFilteredEndpointList(
endpointName := fmt.Sprintf("ep-%v-%v-%x", dnsname, addr.Port, hasher.Sum64())

var ep *v1.Endpoint
if istioIntegrationEnabled {
// While istio integration requires the Service VIP, headless services require the pod IP, as there is no Cluster IP
if istioIntegrationEnabled && !addr.IsHeadless {
// Istio integration requires assigning endpoints the Kub service VIP rather than pod address
service, _ := getServiceForHostname(addr.Address, addr.Name, addr.Namespace, services)
ep = createEndpoint(writeNamespace, endpointName, refs, service.Spec.ClusterIP, addr.Port, service.GetObjectMeta().GetLabels()) // TODO: labels may be nil
Expand Down
120 changes: 62 additions & 58 deletions projects/gloo/pkg/syncer/setup/setup_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,70 +834,74 @@ func RunGlooWithExtensions(opts bootstrap.Opts, extensions Extensions) error {
}
}()

// Start the validation webhook
validationServerErr := make(chan error, 1)
if gwOpts.Validation != nil {
// make sure non-empty WatchNamespaces contains the gloo instance's own namespace if
// ReadGatewaysFromAllNamespaces is false
if !gwOpts.ReadGatewaysFromAllNamespaces && !utils.AllNamespaces(opts.WatchNamespaces) {
foundSelf := false
for _, namespace := range opts.WatchNamespaces {
if gwOpts.GlooNamespace == namespace {
foundSelf = true
break
validationMustStart := os.Getenv("VALIDATION_MUST_START")
// only starting validation server if the env var is true or empty (previously, it always started, so this avoids causing unwanted changes for users)
if validationMustStart == "true" || validationMustStart == "" {
// Start the validation webhook
validationServerErr := make(chan error, 1)
if gwOpts.Validation != nil {
// make sure non-empty WatchNamespaces contains the gloo instance's own namespace if
// ReadGatewaysFromAllNamespaces is false
if !gwOpts.ReadGatewaysFromAllNamespaces && !utils.AllNamespaces(opts.WatchNamespaces) {
foundSelf := false
for _, namespace := range opts.WatchNamespaces {
if gwOpts.GlooNamespace == namespace {
foundSelf = true
break
}
}
if !foundSelf {
return errors.Errorf("The gateway configuration value readGatewaysFromAllNamespaces was set "+
"to false, but the non-empty settings.watchNamespaces "+
"list (%s) did not contain this gloo instance's own namespace: %s.",
strings.Join(opts.WatchNamespaces, ", "), gwOpts.GlooNamespace)
}
}
if !foundSelf {
return errors.Errorf("The gateway configuration value readGatewaysFromAllNamespaces was set "+
"to false, but the non-empty settings.watchNamespaces "+
"list (%s) did not contain this gloo instance's own namespace: %s.",
strings.Join(opts.WatchNamespaces, ", "), gwOpts.GlooNamespace)
}
}

validationWebhook, err := k8sadmission.NewGatewayValidatingWebhook(
k8sadmission.NewWebhookConfig(
watchOpts.Ctx,
gwValidationSyncer,
gwOpts.WatchNamespaces,
gwOpts.Validation.ValidatingWebhookPort,
gwOpts.Validation.ValidatingWebhookCertPath,
gwOpts.Validation.ValidatingWebhookKeyPath,
gwOpts.Validation.AlwaysAcceptResources,
gwOpts.ReadGatewaysFromAllNamespaces,
gwOpts.GlooNamespace,
),
)
if err != nil {
return errors.Wrapf(err, "creating validating webhook")
}

go func() {
// close out validation server when context is cancelled
<-watchOpts.Ctx.Done()
validationWebhook.Close()
}()
go func() {
contextutils.LoggerFrom(watchOpts.Ctx).Infow("starting gateway validation server",
zap.Int("port", gwOpts.Validation.ValidatingWebhookPort),
zap.String("cert", gwOpts.Validation.ValidatingWebhookCertPath),
zap.String("key", gwOpts.Validation.ValidatingWebhookKeyPath),
validationWebhook, err := k8sadmission.NewGatewayValidatingWebhook(
k8sadmission.NewWebhookConfig(
watchOpts.Ctx,
gwValidationSyncer,
gwOpts.WatchNamespaces,
gwOpts.Validation.ValidatingWebhookPort,
gwOpts.Validation.ValidatingWebhookCertPath,
gwOpts.Validation.ValidatingWebhookKeyPath,
gwOpts.Validation.AlwaysAcceptResources,
gwOpts.ReadGatewaysFromAllNamespaces,
gwOpts.GlooNamespace,
),
)
if err := validationWebhook.ListenAndServeTLS("", ""); err != nil && err != http.ErrServerClosed {
select {
case validationServerErr <- err:
default:
logger.DPanicw("failed to start validation webhook server", zap.Error(err))
}
if err != nil {
return errors.Wrapf(err, "creating validating webhook")
}
}()
}

// give the validation server 100ms to start
select {
case err := <-validationServerErr:
return errors.Wrapf(err, "failed to start validation webhook server")
case <-time.After(time.Millisecond * 100):
go func() {
// close out validation server when context is cancelled
<-watchOpts.Ctx.Done()
validationWebhook.Close()
}()
go func() {
contextutils.LoggerFrom(watchOpts.Ctx).Infow("starting gateway validation server",
zap.Int("port", gwOpts.Validation.ValidatingWebhookPort),
zap.String("cert", gwOpts.Validation.ValidatingWebhookCertPath),
zap.String("key", gwOpts.Validation.ValidatingWebhookKeyPath),
)
if err := validationWebhook.ListenAndServeTLS("", ""); err != nil && err != http.ErrServerClosed {
select {
case validationServerErr <- err:
default:
logger.DPanicw("failed to start validation webhook server", zap.Error(err))
}
}
}()
}

// give the validation server 100ms to start
select {
case err := <-validationServerErr:
return errors.Wrapf(err, "failed to start validation webhook server")
case <-time.After(time.Millisecond * 100):
}
}

go func() {
Expand Down
Loading

0 comments on commit 1f607ea

Please sign in to comment.