Skip to content

Commit

Permalink
#624 Set flat priority only for services with traffic splitting (#414)
Browse files Browse the repository at this point in the history
* #624 Set flat priority only for services with traffic splitting
  • Loading branch information
nastassia-dailidava authored Apr 24, 2024
1 parent 8e902af commit a7d946e
Show file tree
Hide file tree
Showing 17 changed files with 431 additions and 180 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

Lists all changes with user impact.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

## [0.20.13]
### Changed
- Added setting: "zonesAllowingTrafficSplitting", so changes in a config would be made only for envoys in that zone
- Fixed setting priority for traffic splitting endpoints, they will be duplicated with higher priorities

## [0.20.12]
### Changed
- Added "trackRemaining" flag to enable possibility of tracking additional circuit breaker metrics
Expand Down
3 changes: 2 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ Property
**envoy-control.envoy.snapshot.load-balancing.policy** | load balancing policy used for clusters. [Accepted values](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/cluster/v3/cluster.proto#enum-config-cluster-v3-cluster-lbpolicy) | LEAST_REQUEST
**envoy-control.envoy.snapshot.load-balancing.use-keys-subset-fallback-policy** | KEYS_SUBSET fallback policy is used by default when canary and service-tags are enabled. It is not supported in Envoy <= 1.12.x. Set to false for compatibility with Envoy 1.12.x | true
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.zoneName** | a zone to which traffic will be routed if traffic splitting is enabled | ""
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.weights-by-service-properties.** | a map that maps service name to a map [zoneName: weight] | empty map
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.zones-allowing-traffic-splitting** | a zone from which traffic should be splitted | empty list
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.weights-by-service** | a map that maps service name to a map [zoneName: weight] | empty map

## Routing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class ControlPlane private constructor(
val envoySnapshotFactory = EnvoySnapshotFactory(
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, envoyHttpFilters, currentZone),
egressRoutesFactory = EnvoyEgressRoutesFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties, currentZone),
endpointsFactory = EnvoyEndpointsFactory(
snapshotProperties,
ServiceTagMetadataGenerator(snapshotProperties.routing.serviceTags),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ class EnvoySnapshotFactory(
}
}
}

val rateLimitClusters =
if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList()
val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class TrafficSplittingProperties {
var zoneName = ""
var headerName = ""
var weightsByService: Map<String, ZoneWeights> = mapOf()
var zonesAllowingTrafficSplitting = listOf<String>()
}

class ZoneWeights {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.OAuthProvider
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.Threshold
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.TrafficSplittingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.SanUriMatcherFactory

typealias EnvoyClusterConfig = io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig

class EnvoyClustersFactory(
private val properties: SnapshotProperties
private val properties: SnapshotProperties,
private val currentZone: String
) {
private val httpProtocolOptions: HttpProtocolOptions = HttpProtocolOptions.newBuilder().setIdleTimeout(
Durations.fromMillis(properties.egress.commonHttp.connectionIdleTimeout.toMillis())
Expand Down Expand Up @@ -283,14 +283,18 @@ class EnvoyClustersFactory(
): Boolean {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val trafficSplitEnabled = trafficSplitting.weightsByService.containsKey(serviceName)
return trafficSplitEnabled && hasEndpointsInZone(clusterLoadAssignment, trafficSplitting)
val allowed = clusterLoadAssignment != null &&
hasEndpointsInZone(clusterLoadAssignment, trafficSplitting.zoneName) &&
trafficSplitting.zonesAllowingTrafficSplitting.contains(currentZone)
return trafficSplitEnabled && allowed
}

private fun hasEndpointsInZone(
clusterLoadAssignment: ClusterLoadAssignment?,
trafficSplitting: TrafficSplittingProperties
zoneName: String
) = clusterLoadAssignment?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone && e.lbEndpointsCount > 0 } ?: false
?.any { e -> zoneName == e.locality.zone && e.lbEndpointsCount > 0 }
?: false

private fun shouldAddDynamicForwardProxyCluster(group: Group) =
group.proxySettings.outgoing.getDomainPatternDependencies().isNotEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class EnvoyEndpointsFactory(
) {
companion object {
private val logger by logger()
private const val HIGHEST_PRIORITY = 0
private const val DEFAULT_WEIGHT = 1
}

fun createLoadAssignment(
Expand Down Expand Up @@ -84,22 +86,48 @@ class EnvoyEndpointsFactory(
return if (routeSpec is WeightRouteSpecification) {
ClusterLoadAssignment.newBuilder(loadAssignment)
.clearEndpoints()
.addAllEndpoints(assignWeights(loadAssignment.endpointsList, routeSpec.clusterWeights))
.addAllEndpoints(
assignWeightsAndDuplicateEndpoints(
loadAssignment.endpointsList,
routeSpec.clusterWeights
)
)
.setClusterName(routeSpec.clusterName)
.build()
} else loadAssignment
}

private fun assignWeights(
llbEndpointsList: List<LocalityLbEndpoints>, weights: ZoneWeights
private fun assignWeightsAndDuplicateEndpoints(
llbEndpointsList: List<LocalityLbEndpoints>,
weights: ZoneWeights
): List<LocalityLbEndpoints> {
if (properties.loadBalancing.trafficSplitting.zonesAllowingTrafficSplitting.contains(currentZone)) {
val endpoints = llbEndpointsList
.map {
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(
UInt32Value.of(
weights.weightByZone[it.locality.zone] ?: DEFAULT_WEIGHT
)
)
.build()
} else it
}
return overrideTrafficSplittingZoneEndpointsPriority(endpoints) + endpoints
}
return llbEndpointsList
}

private fun overrideTrafficSplittingZoneEndpointsPriority(
endpoints: List<LocalityLbEndpoints>
): List<LocalityLbEndpoints> {
return endpoints
.filter { properties.loadBalancing.trafficSplitting.zoneName == it.locality.zone }
.map {
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(UInt32Value.of(weights.weightByZone[it.locality.zone] ?: 0))
.build()
} else it
LocalityLbEndpoints.newBuilder(it)
.setPriority(HIGHEST_PRIORITY)
.build()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_DISCOVERY_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_IDLE_TIMEOUT
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_PRIORITY
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.HIGHEST_PRIORITY
import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_PROPERTIES_WITH_WEIGHTS
Expand Down Expand Up @@ -275,13 +277,18 @@ class EnvoySnapshotFactoryTest {
assertThat(it.endpointsList)
.anySatisfy { e ->
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE]
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE] &&
e.priority == DEFAULT_PRIORITY
}.anySatisfy { e ->
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE] &&
e.priority == HIGHEST_PRIORITY
}
.anySatisfy { e ->
e.locality.zone == CURRENT_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[CURRENT_ZONE]
}
.hasSize(2)
.hasSize(3)
}
}

Expand Down Expand Up @@ -313,7 +320,7 @@ class EnvoySnapshotFactoryTest {
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
!e.hasLoadBalancingWeight()
}
.hasSize(2)
.hasSize(3)
}
}

Expand Down Expand Up @@ -457,7 +464,7 @@ class EnvoySnapshotFactoryTest {
CURRENT_ZONE
)
val egressRoutesFactory = EnvoyEgressRoutesFactory(properties)
val clustersFactory = EnvoyClustersFactory(properties)
val clustersFactory = EnvoyClustersFactory(properties, CURRENT_ZONE)
val endpointsFactory = EnvoyEndpointsFactory(properties, ServiceTagMetadataGenerator(), CURRENT_ZONE)
val envoyHttpFilters = EnvoyHttpFilters.defaultFilters(properties)
val listenersFactory = EnvoyListenersFactory(properties, envoyHttpFilters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ class SnapshotUpdaterTest {
EnvoySnapshotFactory(
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, currentZone = CURRENT_ZONE),
egressRoutesFactory = EnvoyEgressRoutesFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties, CURRENT_ZONE),
endpointsFactory = EnvoyEndpointsFactory(
snapshotProperties, ServiceTagMetadataGenerator(snapshotProperties.routing.serviceTags), CURRENT_ZONE
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,30 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters
import io.envoyproxy.controlplane.cache.SnapshotResources
import io.envoyproxy.envoy.config.cluster.v3.Cluster
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME1
import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME2
import pl.allegro.tech.servicemesh.envoycontrol.utils.CURRENT_ZONE
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TRAFFIC_SPLITTING_ZONE
import pl.allegro.tech.servicemesh.envoycontrol.utils.createAllServicesGroup
import pl.allegro.tech.servicemesh.envoycontrol.utils.createCluster
import pl.allegro.tech.servicemesh.envoycontrol.utils.createClusterConfigurations
import pl.allegro.tech.servicemesh.envoycontrol.utils.createEndpoints
import pl.allegro.tech.servicemesh.envoycontrol.utils.createListenersConfig
import pl.allegro.tech.servicemesh.envoycontrol.utils.createLoadAssignments
import pl.allegro.tech.servicemesh.envoycontrol.utils.createServicesGroup

internal class EnvoyClustersFactoryTest {

companion object {
private val factory = EnvoyClustersFactory(SnapshotProperties())
private val factory = EnvoyClustersFactory(SnapshotProperties(), CURRENT_ZONE)
private val snapshotPropertiesWithWeights = SnapshotProperties().apply {
loadBalancing.trafficSplitting.weightsByService = mapOf(
DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS
Expand Down Expand Up @@ -96,7 +99,7 @@ internal class EnvoyClustersFactoryTest {
@Test
fun `should get cluster with locality weighted config for group clusters`() {
val cluster1 = createCluster(snapshotPropertiesWithWeights, CLUSTER_NAME1)
val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights)
val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights, CURRENT_ZONE)
val result = factory.getClustersForGroup(
createServicesGroup(
snapshotProperties = snapshotPropertiesWithWeights,
Expand All @@ -113,15 +116,39 @@ internal class EnvoyClustersFactoryTest {
}
}

@Test
fun `should not apply locality weighted config if there are no endpoints in the ts zone`() {
val cluster1 = createCluster(snapshotPropertiesWithWeights, CLUSTER_NAME1)
val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights, CURRENT_ZONE)
val result = factory.getClustersForGroup(
createServicesGroup(
snapshotProperties = snapshotPropertiesWithWeights,
listenersConfig = createListenersConfig(snapshotPropertiesWithWeights, true),
dependencies = arrayOf(CLUSTER_NAME1 to null),
),
createGlobalSnapshot(cluster1, endpoints = null)
)
assertThat(result)
.anySatisfy {
assertThat(it.name).isEqualTo(CLUSTER_NAME1)
assertThat(it.edsClusterConfig).isEqualTo(cluster1.edsClusterConfig)
assertThat(it.commonLbConfig.hasLocalityWeightedLbConfig()).isFalse()
}
}

private fun createGlobalSnapshot(
vararg clusters: Cluster,
securedClusters: List<Cluster> = clusters.asList()
securedClusters: List<Cluster> = clusters.asList(),
endpoints: List<LocalityLbEndpoints>? = createEndpoints()
): GlobalSnapshot {
val clusterLoadAssignment = endpoints
?.let { createLoadAssignments(clusters.toList(), endpoints) }
?: createLoadAssignments(clusters.toList(), listOf())
return GlobalSnapshot(
SnapshotResources.create<Cluster>(clusters.toList(), "pl/allegro/tech/servicemesh/envoycontrol/v3")
.resources(),
clusters.map { it.name }.toSet(),
SnapshotResources.create<ClusterLoadAssignment>(createLoadAssignments(clusters.toList()), "v1").resources(),
SnapshotResources.create<ClusterLoadAssignment>(clusterLoadAssignment, "v1").resources(),
createClusterConfigurations(),
SnapshotResources.create<Cluster>(securedClusters, "v3").resources()
)
Expand Down
Loading

0 comments on commit a7d946e

Please sign in to comment.