Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
KSmigielski committed Aug 29, 2023
1 parent a3aed17 commit 3449ff3
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode
import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.groups.IncomingRateLimitEndpoint
import pl.allegro.tech.servicemesh.envoycontrol.groups.ListenersConfig
import pl.allegro.tech.servicemesh.envoycontrol.groups.ProxySettings
import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup
import pl.allegro.tech.servicemesh.envoycontrol.groups.orDefault
import pl.allegro.tech.servicemesh.envoycontrol.logger
Expand Down Expand Up @@ -164,22 +166,20 @@ class EnvoySnapshotFactory(
return group.proxySettings.outgoing.getDomainDependencies().groupBy(
{ DomainRoutesGrouper(it.getPort(), it.useSsl()) },
{
RouteSpecification(
StandardRouteSpecification(
clusterName = it.getClusterName(),
routeDomains = listOf(it.getRouteDomain()),
settings = it.settings,
clusterWeights = mapOf()
)
}
)
}

private fun getDomainPatternRouteSpecifications(group: Group): RouteSpecification {
return RouteSpecification(
return StandardRouteSpecification(
clusterName = properties.dynamicForwardProxy.clusterName,
routeDomains = group.proxySettings.outgoing.getDomainPatternDependencies().map { it.domainPattern },
settings = group.proxySettings.outgoing.defaultServiceSettings,
clusterWeights = mapOf()
)
}

Expand All @@ -188,11 +188,12 @@ class EnvoySnapshotFactory(
globalSnapshot: GlobalSnapshot
): Collection<RouteSpecification> {
val definedServicesRoutes = group.proxySettings.outgoing.getServiceDependencies().map {
RouteSpecification(
getTrafficSplittingRouteSpecification(
clusterName = it.service,
routeDomains = listOf(it.service) + getServiceWithCustomDomain(it.service),
settings = it.settings,
clusterWeights = getTrafficSplittingWeights(group.serviceName, globalSnapshot, it.service)
group.serviceName,
globalSnapshot
)
}
return when (group) {
Expand All @@ -202,29 +203,45 @@ class EnvoySnapshotFactory(
is AllServicesGroup -> {
val servicesNames = group.proxySettings.outgoing.getServiceDependencies().map { it.service }.toSet()
val allServicesRoutes = globalSnapshot.allServicesNames.subtract(servicesNames).map {
RouteSpecification(
getTrafficSplittingRouteSpecification(
clusterName = it,
routeDomains = listOf(it) + getServiceWithCustomDomain(it),
settings = group.proxySettings.outgoing.defaultServiceSettings,
clusterWeights = getTrafficSplittingWeights(group.serviceName, globalSnapshot, it)
group.serviceName,
globalSnapshot
)
}
allServicesRoutes + definedServicesRoutes
}
}
}

private fun getTrafficSplittingWeights(
private fun getTrafficSplittingRouteSpecification(
clusterName: String,
routeDomains: List<String>,
settings: DependencySettings,
serviceName: String,
globalSnapshot: GlobalSnapshot,
dependencyServiceName: String
): Map<String, Int> {
): RouteSpecification {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val weights = trafficSplitting.serviceByWeightsProperties[serviceName] ?: mapOf()
val enabledForDependency = globalSnapshot.endpoints[dependencyServiceName]?.endpointsList
val weights = trafficSplitting.serviceByWeightsProperties[serviceName]
val enabledForDependency = globalSnapshot.endpoints[clusterName]?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone }
?: false
return if (enabledForDependency) weights else mapOf()
return if (weights != null && enabledForDependency) {
WeightRouteSpecification(
clusterName,
routeDomains,
settings,
weights
)
} else {
StandardRouteSpecification(
clusterName,
routeDomains,
settings
)
}
}

private fun getServiceWithCustomDomain(it: String): List<String> {
Expand All @@ -238,7 +255,7 @@ class EnvoySnapshotFactory(
private fun getServicesEndpointsForGroup(
rateLimitEndpoints: List<IncomingRateLimitEndpoint>,
globalSnapshot: GlobalSnapshot,
egressRouteSpecifications: Collection<RouteSpecification>
egressRouteSpecifications: List<RouteSpecification>
): List<ClusterLoadAssignment> {
val egressLoadAssignments = egressRouteSpecifications.mapNotNull { routeSpec ->
globalSnapshot.endpoints[routeSpec.clusterName]?.let { endpoints ->
Expand Down Expand Up @@ -396,9 +413,22 @@ data class ClusterConfiguration(
val http2Enabled: Boolean
)

class RouteSpecification(
val clusterName: String,
val routeDomains: List<String>,
val settings: DependencySettings,
val clusterWeights: Map<String, Int> = mapOf()
)
sealed class RouteSpecification {
abstract val clusterName: String
abstract val routeDomains: List<String>
abstract val settings: DependencySettings
}

data class StandardRouteSpecification(
override val clusterName: String,
override val routeDomains: List<String>,
override val settings: DependencySettings,
) : RouteSpecification() {
}

data class WeightRouteSpecification(
override val clusterName: String,
override val routeDomains: List<String>,
override val settings: DependencySettings,
val clusterWeights: ZoneWeights,
) : RouteSpecification()
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ class CanaryProperties {

class TrafficSplittingProperties {
var zoneName = ""
var serviceByWeightsProperties: Map<String, Map<String, Int>> = mapOf()
var serviceByWeightsProperties: Map<String, ZoneWeights> = mapOf()
}

class ZoneWeights {
var main = 100
var secondary = 0
}

class LoadBalancingWeightsProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.OAuthProvider
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.Threshold
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.TrafficSplittingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.SanUriMatcherFactory

typealias EnvoyClusterConfig = io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig
Expand Down Expand Up @@ -211,7 +212,6 @@ class EnvoyClustersFactory(
is ServicesGroup -> dependencies.flatMap {
createClusters(
group.serviceName,
dependencies.keys,
it.value.settings,
clusters[it.key],
globalSnapshot.endpoints[it.key]
Expand All @@ -223,7 +223,6 @@ class EnvoyClustersFactory(
val dependency = dependencies[it]
createClusters(
group.serviceName,
globalSnapshot.allServicesNames,
getDependencySettings(dependency, group),
clusters[it],
globalSnapshot.endpoints[it]
Expand Down Expand Up @@ -275,13 +274,12 @@ class EnvoyClustersFactory(

private fun createClusters(
serviceName: String,
dependencies: Set<String>,
dependencySettings: DependencySettings,
cluster: Cluster?,
clusterLoadAssignment: ClusterLoadAssignment?
): Collection<Cluster> {
return cluster?.let {
if (enableTrafficSplitting(serviceName, cluster.name, dependencies, clusterLoadAssignment)) {
if (enableTrafficSplitting(serviceName, clusterLoadAssignment)) {
logger.debug(
"Creating traffic splitting egress cluster config for ${cluster.name}, service: $serviceName"
)
Expand All @@ -294,18 +292,19 @@ class EnvoyClustersFactory(

private fun enableTrafficSplitting(
serviceName: String,
clusterName: String,
dependencies: Set<String>,
clusterLoadAssignment: ClusterLoadAssignment?
): Boolean {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val trafficSplitEnabled = trafficSplitting.serviceByWeightsProperties.containsKey(serviceName) &&
dependencies.contains(clusterName)
val hasEndpointsInZone = clusterLoadAssignment?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone } ?: false
return trafficSplitEnabled && hasEndpointsInZone
val trafficSplitEnabled = trafficSplitting.serviceByWeightsProperties.containsKey(serviceName)
return trafficSplitEnabled && hasEndpointsInZone(clusterLoadAssignment, trafficSplitting)
}

private fun hasEndpointsInZone(
clusterLoadAssignment: ClusterLoadAssignment?,
trafficSplitting: TrafficSplittingProperties
) = clusterLoadAssignment?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone } ?: false

private fun shouldAddDynamicForwardProxyCluster(group: Group) =
group.proxySettings.outgoing.getDomainPatternDependencies().isNotEmpty()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getSecondaryClusterName
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator

Expand Down Expand Up @@ -78,10 +79,10 @@ class EnvoyEndpointsFactory(

fun getSecondaryClusterEndpoints(
clusterLoadAssignments: Map<String, ClusterLoadAssignment>,
egressRouteSpecifications: Collection<RouteSpecification>
egressRouteSpecifications: List<RouteSpecification>
): List<ClusterLoadAssignment> {
return egressRouteSpecifications
.filter { it.clusterWeights.isNotEmpty() }
.filterIsInstance<WeightRouteSpecification>()
.onEach { logger.debug("Traffic splitting is enabled for cluster: ${it.clusterName}") }
.mapNotNull { routeSpec ->
clusterLoadAssignments[routeSpec.clusterName]?.let { assignment ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryHostPredicate
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.StandardRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getSecondaryClusterName
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.ServiceTagFilterFactory
import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryPolicy as EnvoyControlRetryPolicy
Expand Down Expand Up @@ -345,23 +347,25 @@ class EnvoyEgressRoutesFactory(
}

private fun RouteAction.Builder.setCluster(routeSpec: RouteSpecification): RouteAction.Builder {
val hasWeightsConfig = routeSpec.clusterWeights.keys.containsAll(listOf("main", "secondary"))
return if (!hasWeightsConfig) {
this.setCluster(routeSpec.clusterName)
} else {
logger.debug(
"Creating weighted cluster configuration for route spec {}, {}",
routeSpec.clusterName,
routeSpec.clusterWeights
)
this.setWeightedClusters(
WeightedCluster.newBuilder()
.withClusterWeight(routeSpec.clusterName, routeSpec.clusterWeights["main"]!!)
.withClusterWeight(
getSecondaryClusterName(routeSpec.clusterName),
routeSpec.clusterWeights["secondary"]!!
)
)
return when(routeSpec) {
is WeightRouteSpecification -> {
logger.debug(
"Creating weighted cluster configuration for route spec {}, {}",
routeSpec.clusterName,
routeSpec.clusterWeights
)
this.setWeightedClusters(
WeightedCluster.newBuilder()
.withClusterWeight(routeSpec.clusterName, routeSpec.clusterWeights.main)
.withClusterWeight(
getSecondaryClusterName(routeSpec.clusterName),
routeSpec.clusterWeights.secondary
)
)
}
is StandardRouteSpecification -> {
this.setCluster(routeSpec.clusterName)
}
}
}

Expand Down

0 comments on commit 3449ff3

Please sign in to comment.