diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a68a60681..fd2dd997c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -36,7 +36,7 @@ jobs: - uses: actions/setup-java@v3 with: distribution: 'temurin' - java-version: '11' + java-version: '17' - name: Cache Gradle packages uses: actions/cache@v2 diff --git a/.github/workflows/flaky.yaml b/.github/workflows/flaky.yaml new file mode 100644 index 000000000..7671d1c26 --- /dev/null +++ b/.github/workflows/flaky.yaml @@ -0,0 +1,52 @@ +name: Flaky tests + +on: + workflow_dispatch: + + push: + paths-ignore: + - 'readme.md' + +jobs: + flaky_test: + name: flaky_test + runs-on: ubuntu-latest + env: + GRADLE_OPTS: '-Dfile.encoding=utf-8 -Dorg.gradle.daemon=false' + + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + ref: ${{ github.head_ref }} + + - uses: gradle/wrapper-validation-action@v1 + + - uses: actions/setup-java@v3 + with: + distribution: 'temurin' + java-version: '17' + + - name: Cache Gradle packages + uses: actions/cache@v3 + with: + path: | + ~/.gradle/caches + ~/.gradle/wrapper + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} + restore-keys: | + ${{ runner.os }}-gradle- + + - name: Flaky tests + run: ./gradlew clean -Penvironment=integration :envoy-control-tests:flakyTest + + - name: Junit report + uses: mikepenz/action-junit-report@v2 + if: always() + with: + report_paths: '**/build/test-results/test/TEST-*.xml' + + - name: Cleanup Gradle Cache + run: | + rm -f ~/.gradle/caches/modules-2/modules-2.lock + rm -f ~/.gradle/caches/modules-2/gc.properties diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index acd251d94..5ce6489f3 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -21,11 +21,11 @@ jobs: with: fetch-depth: 0 - uses: gradle/wrapper-validation-action@v1 - - name: Set up JDK 11 + - name: Set up JDK 17 uses: actions/setup-java@v3 with: distribution: 'temurin' - java-version: '11' + java-version: '17' - name: Release if: github.ref == 'refs/heads/master' run: ./gradlew release -Prelease.customPassword=${GITHUB_TOKEN} -Prelease.customUsername=${GITHUB_ACTOR} -Prelease.forceVersion=${FORCE_VERSION} diff --git a/.github/workflows/resilence.yaml b/.github/workflows/resilence.yaml index eea3a94aa..4a9d64bb6 100644 --- a/.github/workflows/resilence.yaml +++ b/.github/workflows/resilence.yaml @@ -25,7 +25,7 @@ jobs: - uses: actions/setup-java@v3 with: distribution: 'temurin' - java-version: '11' + java-version: '17' - name: Cache Gradle packages uses: actions/cache@v3 diff --git a/CHANGELOG.md b/CHANGELOG.md index d28b96a19..65e5bed63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,42 @@ Lists all changes with user impact. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). +## [0.20.4] + +### Changed +- Added possibility to add response header for weighted secondary cluster + +## [0.20.4] + +### Changed +- Fix `shouldAuditGlobalSnapshot` property + +## [0.20.3] + +### Changed +- Fixed traffic splitting condition check for cluster configuration + +## [0.20.2] + +### Changed +- Updated property names: secondaryClusterPostfix is changed to secondaryClusterSuffix, +- aggregateClusterPostfix is changed to aggregateClusterSuffix + +## [0.20.1] + +### Changed +- Implemented configuring traffic splitting and fallback using aggregate cluster functionality + +## [0.20.0] + +### Changed +- Spring Boot upgraded to 3.1.2 +- Java upgraded to 17 +- Kotlin upgraded to 1.8.2 +- Gradle upgraded to 8.3 + +### Fixed +- Random port generation for testcontainers ## [0.19.36] diff --git a/build.gradle b/build.gradle index 9b68f1f27..a69874cfb 100644 --- a/build.gradle +++ b/build.gradle @@ -12,13 +12,14 @@ plugins { id 'signing' id 'pl.allegro.tech.build.axion-release' version '1.13.3' - id 'org.jetbrains.kotlin.jvm' version '1.6.10' - id 'org.jetbrains.kotlin.plugin.spring' version '1.6.10' - id 'org.jetbrains.kotlin.plugin.allopen' version '1.6.10' + id 'org.jetbrains.kotlin.jvm' version '1.8.22' + id 'org.jetbrains.kotlin.plugin.spring' version '1.8.22' + id 'org.jetbrains.kotlin.plugin.allopen' version '1.8.22' id "org.jlleitschuh.gradle.ktlint" version "10.2.1" id "org.jlleitschuh.gradle.ktlint-idea" version "10.2.0" id "io.gitlab.arturbosch.detekt" version "1.18.0" id 'io.github.gradle-nexus.publish-plugin' version '1.0.0' + id 'org.springframework.boot' version '3.1.2' apply false } @@ -44,34 +45,25 @@ allprojects { apply plugin: 'kotlin' apply plugin: 'kotlin-spring' + apply plugin: 'io.spring.dependency-management' project.ext.versions = [ - kotlin : '1.6.10', java_controlplane : '1.0.37', - spring_boot : '2.3.4.RELEASE', + spring_boot : '3.1.2', grpc : '1.48.1', - jaxb : '2.3.1', - javaxactivation : '1.2.0', - micrometer : '1.5.5', - dropwizard : '4.1.12.1', ecwid_consul : '1.4.1', - awaitility : '4.0.3', - embedded_consul : '2.0.0', - junit : '5.6.2', - assertj : '3.17.2', - jackson : '2.11.2', toxiproxy : '2.1.3', - testcontainers : '1.16.0', - reactor : '3.3.10.RELEASE', consul_recipes : '0.9.1', - mockito : '3.3.3', cglib : '3.2.9', - logback : '1.2.3', - slf4j : '1.7.30', re2j : '1.3', xxhash : '0.10.1', - okhttp : '4.9.0' ] + + dependencyManagement { + imports { + mavenBom org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES + } + } } @@ -91,7 +83,6 @@ subprojects { apply plugin: 'io.gitlab.arturbosch.detekt' apply plugin: 'signing' - sourceCompatibility = JavaVersion.VERSION_11 [compileJava, compileTestJava]*.options*.encoding = 'UTF-8' ktlint { @@ -167,23 +158,22 @@ subprojects { compile.exclude group: 'log4j', module: 'log4j' } - compileKotlin { - kotlinOptions { - jvmTarget = '11' + java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(17)) } } - compileTestKotlin { - kotlinOptions { - jvmTarget = '11' + kotlin { + jvmToolchain { + languageVersion.set(JavaLanguageVersion.of(17)) } } - dependencies { - testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: versions.junit - testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params', version: versions.junit - testImplementation group: 'org.assertj', name: 'assertj-core', version: versions.assertj - testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: versions.junit + testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api' + testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params' + testImplementation group: 'org.assertj', name: 'assertj-core' + testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine' } detekt { @@ -195,5 +185,5 @@ subprojects { } wrapper { - gradleVersion = '7.1.1' + gradleVersion = '8.3' } diff --git a/docs/development.md b/docs/development.md index 85f89347a..94aff10e0 100644 --- a/docs/development.md +++ b/docs/development.md @@ -16,6 +16,8 @@ Envoy Control is a [Kotlin](https://kotlinlang.org/) application, it requires JD ```./gradlew integrationTest``` * Reliability tests ```./gradlew clean -i -Penvironment=integration :envoy-control-tests:reliabilityTest -DRELIABILITY_FAILURE_DURATION_SECONDS=20``` +* Flaky tests +```./gradlew -Penvironment=integration :envoy-control-tests:flakyTest``` ## Running Lua tests locally (not inside docker) for debugging purposes diff --git a/envoy-control-core/build.gradle b/envoy-control-core/build.gradle index 10726697f..8a7578b8a 100644 --- a/envoy-control-core/build.gradle +++ b/envoy-control-core/build.gradle @@ -1,33 +1,36 @@ +plugins { + id 'org.springframework.boot' apply false +} + dependencies { api project(':envoy-control-services') - implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib', version: versions.kotlin - implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: versions.kotlin - api group: 'com.fasterxml.jackson.module', name: 'jackson-module-afterburner', version: versions.jackson - api group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: versions.jackson - implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect', version: versions.kotlin - api group: 'io.dropwizard.metrics', name: 'metrics-core', version: versions.dropwizard - api group: 'io.micrometer', name: 'micrometer-core', version: versions.micrometer + implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib' + api group: 'com.fasterxml.jackson.module', name: 'jackson-module-afterburner' + api group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin' + implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect' + api group: 'io.dropwizard.metrics', name: 'metrics-core' + api group: 'io.micrometer', name: 'micrometer-core' implementation group: 'com.google.re2j', name: 're2j', version: versions.re2j api group: 'io.envoyproxy.controlplane', name: 'server', version: versions.java_controlplane implementation group: 'io.grpc', name: 'grpc-netty', version: versions.grpc - implementation group: 'io.projectreactor', name: 'reactor-core', version: versions.reactor + implementation group: 'io.projectreactor', name: 'reactor-core' - implementation group: 'org.slf4j', name: 'jcl-over-slf4j', version: versions.slf4j - implementation group: 'ch.qos.logback', name: 'logback-classic', version: versions.logback + implementation group: 'org.slf4j', name: 'jcl-over-slf4j' + implementation group: 'ch.qos.logback', name: 'logback-classic' testImplementation group: 'io.grpc', name: 'grpc-testing', version: versions.grpc - testImplementation group: 'io.projectreactor', name: 'reactor-test', version: versions.reactor - testImplementation group: 'org.mockito', name: 'mockito-core', version: versions.mockito + testImplementation group: 'io.projectreactor', name: 'reactor-test' + testImplementation group: 'org.mockito', name: 'mockito-core' testImplementation group: 'cglib', name: 'cglib-nodep', version: versions.cglib - testImplementation group: 'org.awaitility', name: 'awaitility', version: versions.awaitility + testImplementation group: 'org.awaitility', name: 'awaitility' - testImplementation group: 'org.testcontainers', name: 'testcontainers', version: versions.testcontainers - testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: versions.testcontainers + testImplementation group: 'org.testcontainers', name: 'testcontainers' + testImplementation group: 'org.testcontainers', name: 'junit-jupiter' } tasks.withType(GroovyCompile) { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt index bc02ee3c5..c7fe1136c 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/ControlPlane.kt @@ -337,7 +337,11 @@ class ControlPlane private constructor( } fun withSnapshotChangeAuditor(snapshotChangeAuditor: SnapshotChangeAuditor): ControlPlaneBuilder { - this.snapshotChangeAuditor = snapshotChangeAuditor + if (properties.envoy.snapshot.shouldAuditGlobalSnapshot) { + this.snapshotChangeAuditor = snapshotChangeAuditor + } else { + this.snapshotChangeAuditor = NoopSnapshotChangeAuditor + } return this } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt index 86875d9ff..ae160ef02 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/EnvoySnapshotFactory.kt @@ -15,6 +15,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.Group import pl.allegro.tech.servicemesh.envoycontrol.groups.IncomingRateLimitEndpoint import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.orDefault +import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances @@ -38,6 +39,7 @@ class EnvoySnapshotFactory( companion object { const val DEFAULT_HTTP_PORT = 80 + private val logger by logger() } fun newSnapshot( @@ -111,6 +113,7 @@ class EnvoySnapshotFactory( val removedClusters = previous - current.keys current + removedClusters } + false -> current } } @@ -156,24 +159,26 @@ class EnvoySnapshotFactory( return newSnapshotForGroup } - private fun getDomainRouteSpecifications(group: Group): Map> { + private fun getDomainRouteSpecifications( + group: Group + ): Map> { return group.proxySettings.outgoing.getDomainDependencies().groupBy( { DomainRoutesGrouper(it.getPort(), it.useSsl()) }, { - RouteSpecification( + StandardRouteSpecification( clusterName = it.getClusterName(), routeDomains = listOf(it.getRouteDomain()), - settings = it.settings + settings = it.settings, ) } ) } private fun getDomainPatternRouteSpecifications(group: Group): RouteSpecification { - return RouteSpecification( + return StandardRouteSpecification( clusterName = properties.dynamicForwardProxy.clusterName, routeDomains = group.proxySettings.outgoing.getDomainPatternDependencies().map { it.domainPattern }, - settings = group.proxySettings.outgoing.defaultServiceSettings + settings = group.proxySettings.outgoing.defaultServiceSettings, ) } @@ -182,23 +187,28 @@ class EnvoySnapshotFactory( globalSnapshot: GlobalSnapshot ): Collection { val definedServicesRoutes = group.proxySettings.outgoing.getServiceDependencies().map { - RouteSpecification( + buildRouteSpecification( clusterName = it.service, routeDomains = listOf(it.service) + getServiceWithCustomDomain(it.service), - settings = it.settings + settings = it.settings, + group.serviceName, + globalSnapshot ) } return when (group) { is ServicesGroup -> { definedServicesRoutes } + is AllServicesGroup -> { val servicesNames = group.proxySettings.outgoing.getServiceDependencies().map { it.service }.toSet() val allServicesRoutes = globalSnapshot.allServicesNames.subtract(servicesNames).map { - RouteSpecification( + buildRouteSpecification( clusterName = it, routeDomains = listOf(it) + getServiceWithCustomDomain(it), - settings = group.proxySettings.outgoing.defaultServiceSettings + settings = group.proxySettings.outgoing.defaultServiceSettings, + group.serviceName, + globalSnapshot ) } allServicesRoutes + definedServicesRoutes @@ -206,6 +216,38 @@ class EnvoySnapshotFactory( } } + private fun buildRouteSpecification( + clusterName: String, + routeDomains: List, + settings: DependencySettings, + serviceName: String, + globalSnapshot: GlobalSnapshot, + ): RouteSpecification { + val trafficSplitting = properties.loadBalancing.trafficSplitting + val weights = trafficSplitting.serviceByWeightsProperties[serviceName] + val enabledForDependency = globalSnapshot.endpoints[clusterName]?.endpointsList + ?.any { e -> trafficSplitting.zoneName == e.locality.zone } + ?: false + return if (weights != null && enabledForDependency) { + logger.debug( + "Building traffic splitting route spec, weights: $weights, " + + "serviceName: $serviceName, clusterName: $clusterName, " + ) + WeightRouteSpecification( + clusterName, + routeDomains, + settings, + weights + ) + } else { + StandardRouteSpecification( + clusterName, + routeDomains, + settings + ) + } + } + private fun getServiceWithCustomDomain(it: String): List { return if (properties.egress.domains.isNotEmpty()) { properties.egress.domains.map { domain -> "$it$domain" } @@ -217,7 +259,7 @@ class EnvoySnapshotFactory( private fun getServicesEndpointsForGroup( rateLimitEndpoints: List, globalSnapshot: GlobalSnapshot, - egressRouteSpecifications: Collection + egressRouteSpecifications: List ): List { val egressLoadAssignments = egressRouteSpecifications.mapNotNull { routeSpec -> globalSnapshot.endpoints[routeSpec.clusterName]?.let { endpoints -> @@ -226,27 +268,30 @@ class EnvoySnapshotFactory( // endpointsFactory.filterEndpoints() can use this cache to prevent computing the same // ClusterLoadAssignments many times - it may reduce MEM, CPU and latency if some serviceTags are // commonly used - endpointsFactory.filterEndpoints(endpoints, routeSpec.settings.routingPolicy) + routeSpec.clusterName to endpointsFactory.filterEndpoints(endpoints, routeSpec.settings.routingPolicy) } - } + }.toMap() val rateLimitClusters = if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList() val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] } - - return egressLoadAssignments + rateLimitLoadAssignments + val secondaryLoadAssignments = endpointsFactory.getSecondaryClusterEndpoints( + egressLoadAssignments, + egressRouteSpecifications + ) + return egressLoadAssignments.values.toList() + rateLimitLoadAssignments + secondaryLoadAssignments } private fun newSnapshotForGroup( group: Group, globalSnapshot: GlobalSnapshot ): Snapshot { - // TODO(dj): This is where serious refactoring needs to be done val egressDomainRouteSpecifications = getDomainRouteSpecifications(group) val egressServiceRouteSpecification = getServiceRouteSpecifications(group, globalSnapshot) val egressRouteSpecification = egressServiceRouteSpecification + - egressDomainRouteSpecifications.values.flatten().toSet() + getDomainPatternRouteSpecifications(group) + egressDomainRouteSpecifications.values.flatten().toSet() + + getDomainPatternRouteSpecifications(group) val clusters: List = clustersFactory.getClustersForGroup(group, globalSnapshot) @@ -272,7 +317,6 @@ class EnvoySnapshotFactory( ) ) } - val listeners = if (properties.dynamicListeners.enabled) { listenersFactory.createListeners(group, globalSnapshot) } else { @@ -281,11 +325,12 @@ class EnvoySnapshotFactory( // TODO(dj): endpoints depends on prerequisite of routes -> but only to extract clusterName, // which is present only in services (not domains) so it could be implemented differently. - val endpoints = getServicesEndpointsForGroup(group.proxySettings.incoming.rateLimitEndpoints, globalSnapshot, - egressRouteSpecification) + val endpoints = getServicesEndpointsForGroup( + group.proxySettings.incoming.rateLimitEndpoints, globalSnapshot, + egressRouteSpecification + ) val version = snapshotsVersions.version(group, clusters, endpoints, listeners, routes) - return createSnapshot( clusters = clusters, clustersVersion = version.clusters, @@ -372,8 +417,21 @@ data class ClusterConfiguration( val http2Enabled: Boolean ) -class RouteSpecification( - val clusterName: String, - val routeDomains: List, - val settings: DependencySettings -) +sealed class RouteSpecification { + abstract val clusterName: String + abstract val routeDomains: List + abstract val settings: DependencySettings +} + +data class StandardRouteSpecification( + override val clusterName: String, + override val routeDomains: List, + override val settings: DependencySettings, +) : RouteSpecification() + +data class WeightRouteSpecification( + override val clusterName: String, + override val routeDomains: List, + override val settings: DependencySettings, + val clusterWeights: ZoneWeights, +) : RouteSpecification() diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt index 5e158e8da..5839041c9 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt @@ -36,6 +36,7 @@ class SnapshotProperties { var deltaXdsEnabled = false var retryPolicy = RetryPolicyProperties() var tcpDumpsEnabled: Boolean = true + var shouldAuditGlobalSnapshot: Boolean = true } class MetricsProperties { @@ -143,6 +144,7 @@ class LoadBalancingProperties { var regularMetadataKey = "lb_regular" var localityMetadataKey = "locality" var weights = LoadBalancingWeightsProperties() + var trafficSplitting = TrafficSplittingProperties() var policy = Cluster.LbPolicy.LEAST_REQUEST var useKeysSubsetFallbackPolicy = true var priorities = LoadBalancingPriorityProperties() @@ -154,6 +156,19 @@ class CanaryProperties { var headerValue = "1" } +class TrafficSplittingProperties { + var zoneName = "" + var headerName = "" + var serviceByWeightsProperties: Map = mapOf() + var secondaryClusterSuffix = "secondary" + var aggregateClusterSuffix = "aggregate" +} + +class ZoneWeights { + var main = 100 + var secondary = 0 +} + class LoadBalancingWeightsProperties { var enabled = false } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt index 92ef0be9b..3baea120d 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt @@ -150,7 +150,7 @@ class SnapshotUpdater( private fun updateSnapshotForGroup(group: Group, globalSnapshot: GlobalSnapshot) { try { val groupSnapshot = snapshotFactory.getSnapshotForGroup(group, globalSnapshot) - snapshotTimer(group.serviceName).record { + snapshotTimer(group.serviceName).recordCallable { cache.setSnapshot(group, groupSnapshot) } } catch (e: Throwable) { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt index b53b2c785..b0601c06d 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt @@ -43,6 +43,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings import pl.allegro.tech.servicemesh.envoycontrol.groups.DomainDependency import pl.allegro.tech.servicemesh.envoycontrol.groups.Group +import pl.allegro.tech.servicemesh.envoycontrol.groups.ServiceDependency import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup import pl.allegro.tech.servicemesh.envoycontrol.groups.containsGlobalRateLimits import pl.allegro.tech.servicemesh.envoycontrol.logger @@ -51,8 +52,11 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot import pl.allegro.tech.servicemesh.envoycontrol.snapshot.OAuthProvider import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties import pl.allegro.tech.servicemesh.envoycontrol.snapshot.Threshold +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.TrafficSplittingProperties import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.SanUriMatcherFactory +typealias EnvoyClusterConfig = io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig + class EnvoyClustersFactory( private val properties: SnapshotProperties ) { @@ -75,6 +79,16 @@ class EnvoyClustersFactory( companion object { private val logger by logger() + + @JvmStatic + fun getSecondaryClusterName(serviceName: String, snapshotProperties: SnapshotProperties): String { + return "$serviceName-${snapshotProperties.loadBalancing.trafficSplitting.secondaryClusterSuffix}" + } + + @JvmStatic + fun getAggregateClusterName(serviceName: String, snapshotProperties: SnapshotProperties): String { + return "$serviceName-${snapshotProperties.loadBalancing.trafficSplitting.aggregateClusterSuffix}" + } } fun getClustersForServices( @@ -191,20 +205,26 @@ class EnvoyClustersFactory( globalSnapshot.clusters } - val serviceDependencies = group.proxySettings.outgoing.getServiceDependencies().associateBy { it.service } - + val dependencies = group.proxySettings.outgoing.getServiceDependencies().associateBy { it.service } val clustersForGroup = when (group) { - is ServicesGroup -> serviceDependencies.mapNotNull { - createClusterForGroup(it.value.settings, clusters[it.key]) + is ServicesGroup -> dependencies.flatMap { + createClusters( + group.serviceName, + it.value.settings, + clusters[it.key], + globalSnapshot.endpoints[it.key] + ) } + is AllServicesGroup -> { - globalSnapshot.allServicesNames.mapNotNull { - val dependency = serviceDependencies[it] - if (dependency != null && dependency.settings.timeoutPolicy.connectionIdleTimeout != null) { - createClusterForGroup(dependency.settings, clusters[it]) - } else { - createClusterForGroup(group.proxySettings.outgoing.defaultServiceSettings, clusters[it]) - } + globalSnapshot.allServicesNames.flatMap { + val dependency = dependencies[it] + createClusters( + group.serviceName, + getDependencySettings(dependency, group), + clusters[it], + globalSnapshot.endpoints[it] + ) } } } @@ -215,17 +235,77 @@ class EnvoyClustersFactory( return clustersForGroup } - private fun createClusterForGroup(dependencySettings: DependencySettings, cluster: Cluster?): Cluster? { + private fun getDependencySettings(dependency: ServiceDependency?, group: Group): DependencySettings { + return if (dependency != null && dependency.settings.timeoutPolicy.connectionIdleTimeout != null) { + dependency.settings + } else group.proxySettings.outgoing.defaultServiceSettings + } + + private fun createClusterForGroup( + dependencySettings: DependencySettings, + cluster: Cluster, + clusterName: String? = cluster.name + ): Cluster { + val idleTimeoutPolicy = + dependencySettings.timeoutPolicy.connectionIdleTimeout ?: cluster.commonHttpProtocolOptions.idleTimeout + return Cluster.newBuilder(cluster) + .setCommonHttpProtocolOptions(HttpProtocolOptions.newBuilder().setIdleTimeout(idleTimeoutPolicy)) + .setName(clusterName) + .setEdsClusterConfig( + Cluster.EdsClusterConfig.newBuilder(cluster.edsClusterConfig) + .setServiceName(clusterName) + ) + .build() + } + + private fun createSetOfClustersForGroup( + dependencySettings: DependencySettings, + cluster: Cluster + ): Collection { + val mainCluster = createClusterForGroup(dependencySettings, cluster) + val secondaryCluster = createClusterForGroup( + dependencySettings, + cluster, + getSecondaryClusterName(cluster.name, properties) + ) + val aggregateCluster = + createAggregateCluster(mainCluster.name, linkedSetOf(secondaryCluster.name, mainCluster.name)) + return listOf(mainCluster, secondaryCluster, aggregateCluster) + .onEach { + logger.debug("Created set of cluster configs for traffic splitting: {}", it.toString()) + } + } + + private fun createClusters( + serviceName: String, + dependencySettings: DependencySettings, + cluster: Cluster?, + clusterLoadAssignment: ClusterLoadAssignment? + ): Collection { return cluster?.let { - val idleTimeoutPolicy = - dependencySettings.timeoutPolicy.connectionIdleTimeout ?: cluster.commonHttpProtocolOptions.idleTimeout - Cluster.newBuilder(cluster) - .setCommonHttpProtocolOptions( - HttpProtocolOptions.newBuilder().setIdleTimeout(idleTimeoutPolicy) - ).build() - } + if (enableTrafficSplitting(serviceName, clusterLoadAssignment)) { + createSetOfClustersForGroup(dependencySettings, cluster) + } else { + listOf(createClusterForGroup(dependencySettings, cluster)) + } + } ?: listOf() + } + + private fun enableTrafficSplitting( + serviceName: String, + clusterLoadAssignment: ClusterLoadAssignment? + ): Boolean { + val trafficSplitting = properties.loadBalancing.trafficSplitting + val trafficSplitEnabled = trafficSplitting.serviceByWeightsProperties.containsKey(serviceName) + return trafficSplitEnabled && hasEndpointsInZone(clusterLoadAssignment, trafficSplitting) } + private fun hasEndpointsInZone( + clusterLoadAssignment: ClusterLoadAssignment?, + trafficSplitting: TrafficSplittingProperties + ) = clusterLoadAssignment?.endpointsList + ?.any { e -> trafficSplitting.zoneName == e.locality.zone && e.lbEndpointsCount > 0 } ?: false + private fun shouldAddDynamicForwardProxyCluster(group: Group) = group.proxySettings.outgoing.getDomainPatternDependencies().isNotEmpty() @@ -277,6 +357,25 @@ class EnvoyClustersFactory( } } + private fun createAggregateCluster(clusterName: String, aggregatedClusters: Collection): Cluster { + return Cluster.newBuilder() + .setName(getAggregateClusterName(clusterName, properties)) + .setConnectTimeout(Durations.fromMillis(properties.edsConnectionTimeout.toMillis())) + .setLbPolicy(Cluster.LbPolicy.CLUSTER_PROVIDED) + .setClusterType( + Cluster.CustomClusterType.newBuilder() + .setName("envoy.clusters.aggregate") + .setTypedConfig( + Any.pack( + EnvoyClusterConfig.newBuilder() + .addAllClusters(aggregatedClusters) + .build() + ) + ) + ) + .build() + } + private fun strictDnsCluster( domainDependency: DomainDependency, useTransparentProxy: Boolean @@ -372,6 +471,7 @@ class EnvoyClustersFactory( ADS -> ConfigSource.newBuilder() .setResourceApiVersion(ApiVersion.V3) .setAds(AggregatedConfigSource.newBuilder()) + XDS -> ConfigSource.newBuilder() .setResourceApiVersion(ApiVersion.V3) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactory.kt index 98e371bf8..0472e0718 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactory.kt @@ -17,7 +17,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.Locality import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getSecondaryClusterName import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator typealias EnvoyProxyLocality = io.envoyproxy.envoy.config.core.v3.Locality @@ -74,6 +77,27 @@ class EnvoyEndpointsFactory( } } + fun getSecondaryClusterEndpoints( + clusterLoadAssignments: Map, + egressRouteSpecifications: List + ): List { + return egressRouteSpecifications + .filterIsInstance() + .onEach { logger.debug("Traffic splitting is enabled for cluster: ${it.clusterName}") } + .mapNotNull { routeSpec -> + clusterLoadAssignments[routeSpec.clusterName]?.let { assignment -> + ClusterLoadAssignment.newBuilder(assignment) + .clearEndpoints() + .addAllEndpoints(assignment.endpointsList?.filter { e -> + e.locality.zone == properties.loadBalancing.trafficSplitting.zoneName + }) + .setClusterName(getSecondaryClusterName(routeSpec.clusterName, properties)) + .build() + } + } + .filter { it.endpointsList.isNotEmpty() } + } + private fun filterEndpoints(loadAssignment: ClusterLoadAssignment, tag: String): ClusterLoadAssignment? { var allEndpointMatched = true val filteredEndpoints = loadAssignment.endpointsList.mapNotNull { localityLbEndpoint -> diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyEgressRoutesFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyEgressRoutesFactory.kt index 0d2a39185..021444709 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyEgressRoutesFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyEgressRoutesFactory.kt @@ -20,6 +20,7 @@ import io.envoyproxy.envoy.config.route.v3.RouteAction import io.envoyproxy.envoy.config.route.v3.RouteConfiguration import io.envoyproxy.envoy.config.route.v3.RouteMatch import io.envoyproxy.envoy.config.route.v3.VirtualHost +import io.envoyproxy.envoy.config.route.v3.WeightedCluster import io.envoyproxy.envoy.extensions.retry.host.omit_canary_hosts.v3.OmitCanaryHostsPredicate import io.envoyproxy.envoy.extensions.retry.host.omit_host_metadata.v3.OmitHostMetadataConfig import io.envoyproxy.envoy.extensions.retry.host.previous_hosts.v3.PreviousHostsPredicate @@ -28,14 +29,22 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.ListenersConfig.AddUpstre import pl.allegro.tech.servicemesh.envoycontrol.groups.RateLimitedRetryBackOff import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryBackOff import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryHostPredicate +import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.StandardRouteSpecification +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getAggregateClusterName import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.ServiceTagFilterFactory +import java.lang.Boolean.TRUE import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryPolicy as EnvoyControlRetryPolicy class EnvoyEgressRoutesFactory( private val properties: SnapshotProperties ) { + companion object { + private val logger by logger() + } /** * By default envoy doesn't proxy requests to provided IP address. We created cluster: envoy-original-destination @@ -314,7 +323,7 @@ class EnvoyEgressRoutesFactory( shouldAddRetryPolicy: Boolean = false ): RouteAction.Builder { val routeAction = RouteAction.newBuilder() - .setCluster(routeSpecification.clusterName) + .setCluster(routeSpecification) routeSpecification.settings.timeoutPolicy.let { timeoutPolicy -> timeoutPolicy.idleTimeout?.let { routeAction.setIdleTimeout(it) } @@ -337,6 +346,65 @@ class EnvoyEgressRoutesFactory( return routeAction } + + private fun RouteAction.Builder.setCluster(routeSpec: RouteSpecification): RouteAction.Builder { + return when (routeSpec) { + is WeightRouteSpecification -> { + logger.debug( + "Creating weighted cluster configuration for route spec {}, {}", + routeSpec.clusterName, + routeSpec.clusterWeights + ) + this.setWeightedClusters( + WeightedCluster.newBuilder() + .withClusterWeight(routeSpec.clusterName, routeSpec.clusterWeights.main) + .withClusterWeight( + getAggregateClusterName(routeSpec.clusterName, properties), + routeSpec.clusterWeights.secondary, + true + ) + ) + } + is StandardRouteSpecification -> { + this.setCluster(routeSpec.clusterName) + } + } + } + + private fun WeightedCluster.Builder.withClusterWeight( + clusterName: String, + weight: Int, + withHeader: Boolean = false + ): WeightedCluster.Builder { + val clusters = WeightedCluster.ClusterWeight.newBuilder() + .setName(clusterName) + .setWeight(UInt32Value.of(weight)) + .also { + if (withHeader) { + it.withHeader(properties.loadBalancing.trafficSplitting.headerName) + } + } + return this.addClusters(clusters) + } + + private fun WeightedCluster.ClusterWeight.Builder.withHeader(key: String?): WeightedCluster.ClusterWeight.Builder { + key?.takeIf { it.isNotBlank() } + ?.let { + this.addResponseHeadersToAdd(buildHeader(key)) + } + return this + } + + private fun buildHeader(key: String): HeaderValueOption.Builder { + return HeaderValueOption.newBuilder() + .setHeader( + HeaderValue.newBuilder() + .setKey(key) + .setValue(TRUE.toString()) + ) + .setAppendAction(HeaderValueOption.HeaderAppendAction.OVERWRITE_IF_EXISTS_OR_ADD) + .setKeepEmptyValue(false) + } } class RequestPolicyMapper private constructor() { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyIngressRoutesFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyIngressRoutesFactory.kt index f293b9ad6..40b076417 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyIngressRoutesFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyIngressRoutesFactory.kt @@ -19,8 +19,8 @@ import io.envoyproxy.envoy.config.route.v3.VirtualHost import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher import io.envoyproxy.envoy.type.metadata.v3.MetadataKey import pl.allegro.tech.servicemesh.envoycontrol.groups.ClientWithSelector -import pl.allegro.tech.servicemesh.envoycontrol.groups.IncomingRateLimitEndpoint import pl.allegro.tech.servicemesh.envoycontrol.groups.Group +import pl.allegro.tech.servicemesh.envoycontrol.groups.IncomingRateLimitEndpoint import pl.allegro.tech.servicemesh.envoycontrol.groups.PathMatchingType import pl.allegro.tech.servicemesh.envoycontrol.groups.ProxySettings import pl.allegro.tech.servicemesh.envoycontrol.protocol.HttpMethod diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt index 53709ef07..4a99ad39a 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt @@ -15,7 +15,9 @@ class GlobalStateChanges( private val meterRegistry: MeterRegistry, private val properties: SyncProperties ) { - private val scheduler = Schedulers.newElastic("global-service-changes-combinator") + private val scheduler = Schedulers.newBoundedElastic( + Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator" + ) fun combined(): Flux { val clusterStatesStreams: List> = clusterStateChanges.map { it.stream() } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt index 23c0932da..3782c7952 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt @@ -29,7 +29,7 @@ class RemoteServices( fun getChanges(interval: Long): Flux { val aclFlux: Flux = Flux.create({ sink -> scheduler.scheduleWithFixedDelay({ - meterRegistry.timer("sync-dc.get-multi-cluster-states.time").record { + meterRegistry.timer("sync-dc.get-multi-cluster-states.time").recordCallable { getChanges(sink::next, interval) } }, 0, interval, TimeUnit.SECONDS) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Ports.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Ports.kt new file mode 100644 index 000000000..783efcd08 --- /dev/null +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Ports.kt @@ -0,0 +1,22 @@ +package pl.allegro.tech.servicemesh.envoycontrol.utils + +import java.net.ServerSocket +import pl.allegro.tech.servicemesh.envoycontrol.logger + +object Ports { + private val usedPorts: MutableSet = mutableSetOf() + val logger by logger() + + @Synchronized + fun nextAvailable(): Int { + var randomPort: Int + do { + randomPort = ServerSocket(0).use { it.localPort } + } while (usedPorts.contains(randomPort)) + usedPorts.add(randomPort) + logger.info("Generated port: {}", randomPort) + logger.info("Used ports: {}", usedPorts) + + return randomPort + } +} diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt index b5e326057..4165fe57d 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt @@ -1,12 +1,7 @@ package pl.allegro.tech.servicemesh.envoycontrol -import com.google.protobuf.Duration -import com.google.protobuf.util.Durations import io.envoyproxy.controlplane.cache.SnapshotResources import io.envoyproxy.envoy.config.cluster.v3.Cluster -import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource -import io.envoyproxy.envoy.config.core.v3.ConfigSource -import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions import io.envoyproxy.envoy.config.core.v3.Metadata import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment import io.envoyproxy.envoy.config.listener.v3.Listener @@ -38,18 +33,34 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyEg import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyIngressRoutesFactory import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator import pl.allegro.tech.servicemesh.envoycontrol.snapshot.serviceDependencies +import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME +import pl.allegro.tech.servicemesh.envoycontrol.utils.CURRENT_ZONE +import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS +import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_DISCOVERY_SERVICE_NAME +import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_IDLE_TIMEOUT +import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_SERVICE_NAME +import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_HOST +import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_PORT +import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_HOST +import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_PORT +import pl.allegro.tech.servicemesh.envoycontrol.utils.TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE +import pl.allegro.tech.servicemesh.envoycontrol.utils.createCluster +import pl.allegro.tech.servicemesh.envoycontrol.utils.createClusterConfigurations +import pl.allegro.tech.servicemesh.envoycontrol.utils.createEndpoints class EnvoySnapshotFactoryTest { companion object { - const val INGRESS_HOST = "ingress-host" - const val INGRESS_PORT = 3380 - const val EGRESS_HOST = "egress-host" - const val EGRESS_PORT = 3380 - const val CLUSTER_NAME = "cluster-name" - const val DEFAULT_SERVICE_NAME = "service-name" - const val DEFAULT_DISCOVERY_SERVICE_NAME = "discovery-service-name" - const val DEFAULT_IDLE_TIMEOUT = 100L - const val currentZone = "dc1" + const val MAIN_CLUSTER_NAME = "service-name-2" + const val SECONDARY_CLUSTER_NAME = "service-name-2-secondary" + const val AGGREGATE_CLUSTER_NAME = "service-name-2-aggregate" + const val SERVICE_NAME_2 = "service-name-2" + } + + private val snapshotPropertiesWithWeights = SnapshotProperties().also { + it.loadBalancing.trafficSplitting.serviceByWeightsProperties = mapOf( + DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS + ) + it.loadBalancing.trafficSplitting.zoneName = TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE } @Test @@ -217,6 +228,96 @@ class EnvoySnapshotFactoryTest { assertThat(actualCluster2.commonHttpProtocolOptions.idleTimeout.seconds).isEqualTo(12) } + @Test + fun `should create weighted snapshot clusters`() { + // given + val envoySnapshotFactory = createSnapshotFactory(snapshotPropertiesWithWeights) + val cluster1 = createCluster(snapshotPropertiesWithWeights, clusterName = DEFAULT_SERVICE_NAME) + val cluster2 = + createCluster(snapshotPropertiesWithWeights, clusterName = SERVICE_NAME_2) + val group: Group = createServicesGroup( + dependencies = arrayOf(cluster2.name to null), + snapshotProperties = snapshotPropertiesWithWeights + ) + val globalSnapshot = createGlobalSnapshot(cluster1, cluster2) + + // when + val snapshot = envoySnapshotFactory.getSnapshotForGroup(group, globalSnapshot) + + // then + assertThat(snapshot.clusters().resources()) + .containsKey(MAIN_CLUSTER_NAME) + .containsKey(SECONDARY_CLUSTER_NAME) + .containsKey(AGGREGATE_CLUSTER_NAME) + assertThat(snapshot.endpoints().resources().values) + .anySatisfy { + assertThat(it.clusterName).isEqualTo(MAIN_CLUSTER_NAME) + assertThat(it.endpointsList) + .anyMatch { e -> e.locality.zone == CURRENT_ZONE } + .anyMatch { e -> e.locality.zone == TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE } + } + assertThat(snapshot.endpoints().resources().values) + .anySatisfy { + assertThat(it.clusterName).isEqualTo(SECONDARY_CLUSTER_NAME) + assertThat(it.endpointsList) + .allMatch { e -> e.locality.zone == TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE } + } + } + + @Test + fun `should get regular snapshot clusters when traffic splitting zone condition isn't complied`() { + // given + val defaultProperties = SnapshotProperties().also { + it.dynamicListeners.enabled = false + it.loadBalancing.trafficSplitting.serviceByWeightsProperties = mapOf( + DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS + ) + it.loadBalancing.trafficSplitting.zoneName = "not-matching-dc" + } + val envoySnapshotFactory = createSnapshotFactory(defaultProperties) + val cluster1 = createCluster(defaultProperties, clusterName = DEFAULT_SERVICE_NAME) + val cluster2 = createCluster(defaultProperties, clusterName = SERVICE_NAME_2) + val group: Group = createServicesGroup( + dependencies = arrayOf(cluster2.name to null), + snapshotProperties = defaultProperties + ) + val globalSnapshot = createGlobalSnapshot(cluster1, cluster2) + + // when + val snapshot = envoySnapshotFactory.getSnapshotForGroup(group, globalSnapshot) + + // then + assertThat(snapshot.clusters().resources()) + .containsKey(MAIN_CLUSTER_NAME) + .doesNotContainKey(SECONDARY_CLUSTER_NAME) + .doesNotContainKey(AGGREGATE_CLUSTER_NAME) + } + + @Test + fun `should create weighted snapshot clusters for wildcard dependencies`() { + // given + val envoySnapshotFactory = createSnapshotFactory(snapshotPropertiesWithWeights) + val cluster1 = createCluster(snapshotPropertiesWithWeights, clusterName = DEFAULT_SERVICE_NAME) + val cluster2 = createCluster(snapshotPropertiesWithWeights, clusterName = SERVICE_NAME_2) + val wildcardTimeoutPolicy = outgoingTimeoutPolicy(connectionIdleTimeout = 12) + + val group: Group = createAllServicesGroup( + dependencies = arrayOf("*" to wildcardTimeoutPolicy), + snapshotProperties = snapshotPropertiesWithWeights, + defaultServiceSettings = DependencySettings(), + ) + val globalSnapshot = createGlobalSnapshot(cluster1, cluster2) + + // when + val snapshot = envoySnapshotFactory.getSnapshotForGroup(group, globalSnapshot) + + // then + assertThat(snapshot.clusters().resources()) + .containsKey(MAIN_CLUSTER_NAME) + .containsKey(SECONDARY_CLUSTER_NAME) + .containsKey(AGGREGATE_CLUSTER_NAME) + } + @Test fun `should fetch ratelimit service endpoint if there are global rate limits`() { // given @@ -274,10 +375,14 @@ class EnvoySnapshotFactoryTest { } private fun GlobalSnapshot.withEndpoint(clusterName: String): GlobalSnapshot = copy( - endpoints = SnapshotResources.create(listOf(ClusterLoadAssignment.newBuilder() + endpoints = SnapshotResources.create( + listOf( + ClusterLoadAssignment.newBuilder() .setClusterName(clusterName) .build() - ), "v1").resources()) + ), "v1" + ).resources() + ) private fun createServicesGroup( mode: CommunicationMode = CommunicationMode.XDS, @@ -351,7 +456,7 @@ class EnvoySnapshotFactoryTest { ) val egressRoutesFactory = EnvoyEgressRoutesFactory(properties) val clustersFactory = EnvoyClustersFactory(properties) - val endpointsFactory = EnvoyEndpointsFactory(properties, ServiceTagMetadataGenerator(), currentZone) + val endpointsFactory = EnvoyEndpointsFactory(properties, ServiceTagMetadataGenerator(), CURRENT_ZONE) val envoyHttpFilters = EnvoyHttpFilters.defaultFilters(properties) val listenersFactory = EnvoyListenersFactory(properties, envoyHttpFilters) val snapshotsVersions = SnapshotsVersions() @@ -371,33 +476,21 @@ class EnvoySnapshotFactoryTest { private fun createGlobalSnapshot(vararg clusters: Cluster): GlobalSnapshot { return GlobalSnapshot( - SnapshotResources.create(clusters.toList(), "pl/allegro/tech/servicemesh/envoycontrol/v3").resources(), + SnapshotResources.create(clusters.toList(), "pl/allegro/tech/servicemesh/envoycontrol/v3") + .resources(), clusters.map { it.name }.toSet(), - SnapshotResources.create(emptyList(), "v1").resources(), - emptyMap(), + SnapshotResources.create(createLoadAssignments(clusters.toList()), "v1").resources(), + createClusterConfigurations(), SnapshotResources.create(clusters.toList(), "v3").resources() ) } - private fun createCluster( - defaultProperties: SnapshotProperties, - clusterName: String = CLUSTER_NAME, - serviceName: String = DEFAULT_SERVICE_NAME, - idleTimeout: Long = DEFAULT_IDLE_TIMEOUT - ): Cluster { - return Cluster.newBuilder().setName(clusterName) - .setType(Cluster.DiscoveryType.EDS) - .setConnectTimeout(Durations.fromMillis(defaultProperties.edsConnectionTimeout.toMillis())) - .setEdsClusterConfig( - Cluster.EdsClusterConfig.newBuilder().setEdsConfig( - ConfigSource.newBuilder().setAds(AggregatedConfigSource.newBuilder()) - ).setServiceName(serviceName) - ) - .setLbPolicy(defaultProperties.loadBalancing.policy) - .setCommonHttpProtocolOptions( - HttpProtocolOptions.newBuilder() - .setIdleTimeout(Duration.newBuilder().setSeconds(idleTimeout).build()) - ) - .build() + private fun createLoadAssignments(clusters: List): List { + return clusters.map { + ClusterLoadAssignment.newBuilder() + .setClusterName(it.name) + .addAllEndpoints(createEndpoints()) + .build() + } } } diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataTest.kt index e5ed243ab..6ef2480e1 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataTest.kt @@ -19,6 +19,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.OAuthProvider import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties import java.net.URI import java.time.Duration +import java.util.function.Consumer @Suppress("LargeClass") class NodeMetadataTest { @@ -63,12 +64,15 @@ class NodeMetadataTest { arguments("number", Value.newBuilder().setNumberValue(1.0).build(), 1.0), arguments("not_set", Value.newBuilder().build(), null), arguments("null", Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build(), null), - arguments("list", Value.newBuilder().setListValue( - ListValue.newBuilder() - .addValues(Value.newBuilder().setBoolValue(true).build()).build()) - .build(), listOf(true) + arguments( + "list", Value.newBuilder().setListValue( + ListValue.newBuilder() + .addValues(Value.newBuilder().setBoolValue(true).build()).build() + ) + .build(), listOf(true) ), - arguments("struct", Value.newBuilder().setStructValue( + arguments( + "struct", Value.newBuilder().setStructValue( Struct.newBuilder() .putFields("string", Value.newBuilder().setBoolValue(true).build()) .build() @@ -83,10 +87,12 @@ class NodeMetadataTest { arguments(Value.newBuilder().setNumberValue(1.0).build(), 1.0), arguments(Value.newBuilder().build(), null), arguments(Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build(), null), - arguments(Value.newBuilder().setListValue( - ListValue.newBuilder() - .addValues(Value.newBuilder().setBoolValue(true).build()).build()) - .build(), listOf(true) + arguments( + Value.newBuilder().setListValue( + ListValue.newBuilder() + .addValues(Value.newBuilder().setBoolValue(true).build()).build() + ) + .build(), listOf(true) ) ) } @@ -965,7 +971,7 @@ class NodeMetadataTest { // then assertThat(exception.status.description).isEqualTo( "Timeout definition has number format" + - " but should be in string format and ends with 's'" + " but should be in string format and ends with 's'" ) assertThat(exception.status.code).isEqualTo(Status.Code.INVALID_ARGUMENT) } @@ -981,7 +987,7 @@ class NodeMetadataTest { // then assertThat(exception.status.description).isEqualTo( "Timeout definition has incorrect format: " + - "Invalid duration string: 20" + "Invalid duration string: 20" ) assertThat(exception.status.code).isEqualTo(Status.Code.INVALID_ARGUMENT) } @@ -1216,7 +1222,8 @@ class NodeMetadataTest { withService("lorem") withService("ipsum", routingPolicy = RoutingPolicyInput(autoServiceTag = false)) withService("dolom", routingPolicy = RoutingPolicyInput(fallbackToAnyInstance = false)) - withService("est", routingPolicy = RoutingPolicyInput(serviceTagPreference = listOf("estTag")) + withService( + "est", routingPolicy = RoutingPolicyInput(serviceTagPreference = listOf("estTag")) ) } @@ -1228,30 +1235,30 @@ class NodeMetadataTest { assertThat(dependencies).hasSize(4) val loremDependency = dependencies[0] assertThat(loremDependency.service).isEqualTo("lorem") - assertThat(loremDependency.settings.routingPolicy).satisfies { policy -> + assertThat(loremDependency.settings.routingPolicy).satisfies(Consumer { policy -> assertThat(policy.autoServiceTag).isTrue assertThat(policy.serviceTagPreference).isEqualTo(listOf("preferredGlobalTag", "fallbackGlobalTag")) assertThat(policy.fallbackToAnyInstance).isTrue - } + }) val ipsumDependency = dependencies[1] assertThat(ipsumDependency.service).isEqualTo("ipsum") - assertThat(ipsumDependency.settings.routingPolicy).satisfies { policy -> + assertThat(ipsumDependency.settings.routingPolicy).satisfies(Consumer { policy -> assertThat(policy.autoServiceTag).isFalse - } + }) val dolomDependency = dependencies[2] assertThat(dolomDependency.service).isEqualTo("dolom") - assertThat(dolomDependency.settings.routingPolicy).satisfies { policy -> + assertThat(dolomDependency.settings.routingPolicy).satisfies(Consumer { policy -> assertThat(policy.autoServiceTag).isTrue assertThat(policy.serviceTagPreference).isEqualTo(listOf("preferredGlobalTag", "fallbackGlobalTag")) assertThat(policy.fallbackToAnyInstance).isFalse - } + }) val estDependency = dependencies[3] assertThat(estDependency.service).isEqualTo("est") - assertThat(estDependency.settings.routingPolicy).satisfies { policy -> + assertThat(estDependency.settings.routingPolicy).satisfies(Consumer { policy -> assertThat(policy.autoServiceTag).isTrue assertThat(policy.serviceTagPreference).isEqualTo(listOf("estTag")) assertThat(policy.fallbackToAnyInstance).isTrue - } + }) } @ParameterizedTest @@ -1269,9 +1276,11 @@ class NodeMetadataTest { fun `should parse custom data if it is a struct with value`(name: String, field: Value, expected: Any?) { // given val value = Value.newBuilder() - .setStructValue(Struct.newBuilder() - .putFields(name, field) - .build()) + .setStructValue( + Struct.newBuilder() + .putFields(name, field) + .build() + ) .build() // when @@ -1331,10 +1340,10 @@ class NodeMetadataTest { val jwtFilterProperties = JwtFilterProperties() val oauthProviders = mapOf( "oauth2-mock" to - OAuthProvider( - jwksUri = URI.create("http://localhost:8080/jwks-address/"), - clusterName = "oauth" - ) + OAuthProvider( + jwksUri = URI.create("http://localhost:8080/jwks-address/"), + clusterName = "oauth" + ) ) jwtFilterProperties.providers = oauthProviders snapshotProperties.jwt = jwtFilterProperties diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataValidatorTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataValidatorTest.kt index 76451d45c..59197755e 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataValidatorTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/NodeMetadataValidatorTest.kt @@ -12,6 +12,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.EnabledCommunicationMod import pl.allegro.tech.servicemesh.envoycontrol.snapshot.IncomingPermissionsProperties import pl.allegro.tech.servicemesh.envoycontrol.snapshot.OutgoingPermissionsProperties import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties +import java.util.function.Consumer import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as DiscoveryRequestV3 class NodeMetadataValidatorTest { @@ -221,12 +222,10 @@ class NodeMetadataValidatorTest { // expects assertThatExceptionOfType(ServiceNameNotProvidedException::class.java) .isThrownBy { requireServiceNameValidator.onV3StreamRequest(streamId = 123, request = request) } - .satisfies { - assertThat(it.status.description).isEqualTo( - "Service name has not been provided." - ) + .satisfies(Consumer { + assertThat(it.status.description).isEqualTo("Service name has not been provided.") assertThat(it.status.code).isEqualTo(Status.Code.INVALID_ARGUMENT) - } + }) } @Test @@ -256,12 +255,10 @@ class NodeMetadataValidatorTest { // then assertThatExceptionOfType(RateLimitIncorrectValidationException::class.java) .isThrownBy { validator.onV3StreamRequest(123, request = request) } - .satisfies { - assertThat(it.status.description).isEqualTo( - "Rate limit value: 0/j is incorrect." - ) + .satisfies(Consumer { + assertThat(it.status.description).isEqualTo("Rate limit value: 0/j is incorrect.") assertThat(it.status.code).isEqualTo(Status.Code.INVALID_ARGUMENT) - } + }) } private fun createIncomingPermissions( diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/RoutesAssertions.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/RoutesAssertions.kt index d1162561c..ad89091ee 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/RoutesAssertions.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/RoutesAssertions.kt @@ -12,6 +12,7 @@ import io.envoyproxy.envoy.config.route.v3.RouteAction import io.envoyproxy.envoy.config.route.v3.RouteConfiguration import io.envoyproxy.envoy.config.route.v3.VirtualCluster import io.envoyproxy.envoy.config.route.v3.VirtualHost +import io.envoyproxy.envoy.config.route.v3.WeightedCluster import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher import org.assertj.core.api.Assertions.assertThat import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LocalRetryPolicyProperties @@ -27,6 +28,16 @@ fun RouteConfiguration.hasVirtualHostThat(name: String, condition: VirtualHost.( return this } +fun RouteConfiguration.hasClusterThat(name: String, condition: WeightedCluster.ClusterWeight?.() -> Unit): RouteConfiguration { + condition(this.virtualHostsList + .flatMap { it.routesList } + .map { it.route } + .flatMap { route -> route.weightedClusters.clustersList } + .find { it.name == name } + ) + return this +} + fun RouteConfiguration.hasRequestHeaderToAdd(key: String, value: String): RouteConfiguration { assertThat(this.requestHeadersToAddList).anySatisfy { assertThat(it.header.key).isEqualTo(key) diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactoryTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactoryTest.kt new file mode 100644 index 000000000..2a1d8917a --- /dev/null +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactoryTest.kt @@ -0,0 +1,138 @@ +package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters + +import io.envoyproxy.controlplane.cache.SnapshotResources +import io.envoyproxy.envoy.config.cluster.v3.Cluster +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties +import pl.allegro.tech.servicemesh.envoycontrol.utils.AGGREGATE_CLUSTER_NAME +import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME1 +import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME2 +import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS +import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_SERVICE_NAME +import pl.allegro.tech.servicemesh.envoycontrol.utils.MAIN_CLUSTER_NAME +import pl.allegro.tech.servicemesh.envoycontrol.utils.SECONDARY_CLUSTER_NAME +import pl.allegro.tech.servicemesh.envoycontrol.utils.TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE +import pl.allegro.tech.servicemesh.envoycontrol.utils.createAllServicesGroup +import pl.allegro.tech.servicemesh.envoycontrol.utils.createCluster +import pl.allegro.tech.servicemesh.envoycontrol.utils.createClusterConfigurations +import pl.allegro.tech.servicemesh.envoycontrol.utils.createListenersConfig +import pl.allegro.tech.servicemesh.envoycontrol.utils.createLoadAssignments +import pl.allegro.tech.servicemesh.envoycontrol.utils.createServicesGroup + +internal class EnvoyClustersFactoryTest { + + companion object { + private val factory = EnvoyClustersFactory(SnapshotProperties()) + private val snapshotPropertiesWithWeights = SnapshotProperties().apply { + loadBalancing.trafficSplitting.serviceByWeightsProperties = mapOf( + DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS + ) + loadBalancing.trafficSplitting.zoneName = TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE + } + } + + @Test + fun `should get clusters for group`() { + val snapshotProperties = SnapshotProperties() + val cluster1 = createCluster(snapshotProperties, CLUSTER_NAME1) + val result = factory.getClustersForGroup( + createServicesGroup( + snapshotProperties = snapshotProperties, + dependencies = arrayOf(CLUSTER_NAME1 to null), + ), + createGlobalSnapshot( + cluster1, + createCluster(snapshotProperties, CLUSTER_NAME2), + ) + ) + assertThat(result) + .allSatisfy { + assertThat(it).isEqualTo(cluster1) + } + } + + @Test + fun `should get wildcard clusters for group`() { + val snapshotProperties = SnapshotProperties() + val cluster1 = createCluster(clusterName = CLUSTER_NAME1) + val cluster2 = createCluster(clusterName = CLUSTER_NAME2) + val result = factory.getClustersForGroup( + createAllServicesGroup( + snapshotProperties = snapshotProperties, + defaultServiceSettings = DependencySettings() + ), + createGlobalSnapshot(cluster1, cluster2) + ) + assertThat(result) + .anySatisfy { + assertThat(it).isEqualTo(cluster1) + }.anySatisfy { + assertThat(it).isEqualTo(cluster2) + } + } + + @Test + fun `should get secured eds clusters for group`() { + val snapshotProperties = SnapshotProperties() + val cluster = createCluster(snapshotProperties, CLUSTER_NAME1) + val securedCluster = createCluster(snapshotProperties, CLUSTER_NAME1, idleTimeout = 100) + val result = factory.getClustersForGroup( + createServicesGroup( + snapshotProperties = snapshotProperties, + listenersConfig = createListenersConfig(snapshotProperties, true), + dependencies = arrayOf(CLUSTER_NAME1 to null), + ), + createGlobalSnapshot( + cluster, + securedClusters = listOf(securedCluster) + ) + ) + assertThat(result).allSatisfy { + assertThat(it).isEqualTo(securedCluster) + } + } + + @Test + fun `should get clusters for group with weighted and aggregate clusters`() { + val cluster1 = createCluster(snapshotPropertiesWithWeights, CLUSTER_NAME1) + val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights) + val result = factory.getClustersForGroup( + createServicesGroup( + snapshotProperties = snapshotPropertiesWithWeights, + listenersConfig = createListenersConfig(snapshotPropertiesWithWeights, true), + dependencies = arrayOf(CLUSTER_NAME1 to null), + ), + createGlobalSnapshot(cluster1) + ) + assertThat(result) + .anySatisfy { + assertThat(it.name).isEqualTo(MAIN_CLUSTER_NAME) + assertThat(it.edsClusterConfig).isEqualTo(cluster1.edsClusterConfig) + } + .anySatisfy { + assertThat(it.name).isEqualTo(SECONDARY_CLUSTER_NAME) + } + .anySatisfy { + assertThat(it.name).isEqualTo(AGGREGATE_CLUSTER_NAME) + assertThat(it.clusterType.typedConfig.isInitialized).isTrue() + } + } + + private fun createGlobalSnapshot( + vararg clusters: Cluster, + securedClusters: List = clusters.asList() + ): GlobalSnapshot { + return GlobalSnapshot( + SnapshotResources.create(clusters.toList(), "pl/allegro/tech/servicemesh/envoycontrol/v3") + .resources(), + clusters.map { it.name }.toSet(), + SnapshotResources.create(createLoadAssignments(clusters.toList()), "v1").resources(), + createClusterConfigurations(), + SnapshotResources.create(securedClusters, "v3").resources() + ) + } +} diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactoryTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactoryTest.kt index 1bc564e9d..78fef6122 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactoryTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactoryTest.kt @@ -8,6 +8,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource +import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings import pl.allegro.tech.servicemesh.envoycontrol.groups.RoutingPolicy import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.Locality @@ -18,7 +19,12 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceName import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LoadBalancingPriorityProperties import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LoadBalancingProperties +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.TrafficSplittingProperties +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ZoneWeights +import pl.allegro.tech.servicemesh.envoycontrol.utils.zoneWeights import java.util.concurrent.ConcurrentHashMap import java.util.stream.Stream @@ -51,12 +57,20 @@ internal class EnvoyEndpointsFactoryTest { private val serviceName = "service-one" + private val secondaryClusterName = "service-one-secondary" + + private val serviceName2 = "service-two" + + private val defaultWeights = zoneWeights(50, 50) + + private val defaultZone = "DC1" + private val endpointsFactory = EnvoyEndpointsFactory( SnapshotProperties().apply { routing.serviceTags.enabled = true routing.serviceTags.autoServiceTagEnabled = true }, - currentZone = "DC1" + currentZone = defaultZone ) private val multiClusterStateDC1Local = MultiClusterState( @@ -352,6 +366,122 @@ internal class EnvoyEndpointsFactoryTest { ) } + @Test + fun `should create secondary cluster endpoints`() { + val multiClusterState = MultiClusterState( + listOf( + clusterState(cluster = "DC1"), + clusterState(cluster = "DC2"), + clusterState(cluster = "DC1", serviceName = serviceName2), + clusterState(cluster = "DC2", serviceName = serviceName2), + ) + ) + + val services = setOf(serviceName, serviceName2) + val envoyEndpointsFactory = EnvoyEndpointsFactory( + snapshotPropertiesWithTrafficSplitting( + mapOf(serviceName to defaultWeights) + ), + currentZone = defaultZone + ) + val loadAssignments = envoyEndpointsFactory + .createLoadAssignment(services, multiClusterState) + .associateBy { it.clusterName } + + val result = envoyEndpointsFactory.getSecondaryClusterEndpoints( + loadAssignments, + services.map { it.toRouteSpecification() } + ) + assertThat(result).hasSize(2) + .anySatisfy { x -> assertThat(x.clusterName).isEqualTo(secondaryClusterName) } + .allSatisfy { x -> assertThat(x.endpointsList).allMatch { it.locality.zone == defaultZone } } + } + + @Test + fun `should get empty secondary cluster endpoints for route spec with no weights`() { + val multiClusterState = MultiClusterState( + listOf( + clusterState(cluster = defaultZone), + clusterState(cluster = defaultZone, serviceName = serviceName2), + ) + ) + val services = setOf(serviceName, serviceName2) + val envoyEndpointsFactory = EnvoyEndpointsFactory( + snapshotPropertiesWithTrafficSplitting( + mapOf(serviceName to defaultWeights) + ), + currentZone = defaultZone + ) + val loadAssignments = envoyEndpointsFactory.createLoadAssignment( + services, + multiClusterState + ).associateBy { it.clusterName } + + val result = envoyEndpointsFactory.getSecondaryClusterEndpoints( + loadAssignments, + listOf(serviceName.toRouteSpecification()) + ) + assertThat(result).allSatisfy { x -> + assertThat(x.clusterName) + .isEqualTo(secondaryClusterName) + } + } + + @Test + fun `should get empty secondary cluster endpoints for route spec with no such cluster`() { + val multiClusterState = MultiClusterState( + listOf( + clusterState(cluster = defaultZone), + clusterState(cluster = defaultZone, serviceName = serviceName2), + ) + ) + val services = setOf(serviceName, serviceName2) + val envoyEndpointsFactory = EnvoyEndpointsFactory( + snapshotPropertiesWithTrafficSplitting( + mapOf(serviceName to defaultWeights) + ), + currentZone = defaultZone + ) + val loadAssignments = envoyEndpointsFactory.createLoadAssignment( + services, + multiClusterState + ).associateBy { it.clusterName } + + val result = envoyEndpointsFactory.getSecondaryClusterEndpoints( + loadAssignments, + listOf("some-other-service-name".toRouteSpecification()) + ) + assertThat(result).isEmpty() + } + + @Test + fun `should get empty secondary cluster endpoints when none comply zone condition`() { + val multiClusterState = MultiClusterState( + listOf( + clusterState(cluster = defaultZone), + clusterState(cluster = defaultZone, serviceName = serviceName2), + ) + ) + val services = setOf(serviceName, serviceName2) + val envoyEndpointsFactory = EnvoyEndpointsFactory( + snapshotPropertiesWithTrafficSplitting( + mapOf(serviceName to defaultWeights), + zone = "DC2" + ), + currentZone = defaultZone + ) + val loadAssignments = envoyEndpointsFactory.createLoadAssignment( + services, + multiClusterState + ).associateBy { it.clusterName } + + val result = envoyEndpointsFactory.getSecondaryClusterEndpoints( + loadAssignments, + listOf(serviceName.toRouteSpecification()) + ) + assertThat(result).isEmpty() + } + private fun List.assertHasLoadAssignment(map: Map) { assertThat(this) .isNotEmpty() @@ -364,7 +494,11 @@ internal class EnvoyEndpointsFactoryTest { } } - private fun clusterState(locality: Locality, cluster: String): ClusterState { + private fun clusterState( + locality: Locality = Locality.LOCAL, + cluster: String, + serviceName: String = this.serviceName + ): ClusterState { return ClusterState( ServicesState( serviceNameToInstances = concurrentMapOf( @@ -405,6 +539,21 @@ internal class EnvoyEndpointsFactoryTest { } } + private fun snapshotPropertiesWithTrafficSplitting( + serviceByWeights: Map, + zone: String = defaultZone + ) = + SnapshotProperties().apply { + loadBalancing.trafficSplitting = TrafficSplittingProperties().apply { + zoneName = zone + serviceByWeightsProperties = serviceByWeights + } + } + + private fun String.toRouteSpecification(weights: ZoneWeights = defaultWeights): RouteSpecification { + return WeightRouteSpecification(this, listOf(), DependencySettings(), weights) + } + private fun String.toClusterLoadAssignment(): ClusterLoadAssignment = ClusterLoadAssignment.newBuilder() .also { builder -> JsonFormat.parser().merge(this, builder) } .build() diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/config/LocalReplyConfigFactoryTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/config/LocalReplyConfigFactoryTest.kt index 08141d3b2..3ffa2b247 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/config/LocalReplyConfigFactoryTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/listeners/config/LocalReplyConfigFactoryTest.kt @@ -10,6 +10,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.HeaderMatcher import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LocalReplyMapperProperties import pl.allegro.tech.servicemesh.envoycontrol.snapshot.MatcherAndMapper import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ResponseFormat +import java.util.function.Consumer class LocalReplyConfigFactoryTest { @@ -246,11 +247,11 @@ class LocalReplyConfigFactoryTest { // expects assertThatExceptionOfType(IllegalArgumentException::class.java) .isThrownBy { LocalReplyConfigFactory(properties) } - .satisfies { + .satisfies(Consumer { assertThat(it.message).isEqualTo( "One and only one of: headerMatcher, responseFlagMatcher, statusCodeMatcher has to be defined." ) - } + }) } @Test @@ -272,11 +273,11 @@ class LocalReplyConfigFactoryTest { // expects assertThatExceptionOfType(IllegalArgumentException::class.java) .isThrownBy { LocalReplyConfigFactory(properties) } - .satisfies { + .satisfies(Consumer { assertThat(it.message).isEqualTo( "Only one of: exactMatch, regexMatch can be defined." ) - } + }) } @Test @@ -295,11 +296,11 @@ class LocalReplyConfigFactoryTest { // expects assertThatExceptionOfType(IllegalArgumentException::class.java) .isThrownBy { LocalReplyConfigFactory(properties) } - .satisfies { + .satisfies(Consumer { assertThat(it.message).isEqualTo( "Only one of: jsonFormat, textFormat can be defined." ) - } + }) } @Test @@ -326,11 +327,11 @@ class LocalReplyConfigFactoryTest { // expects assertThatExceptionOfType(IllegalArgumentException::class.java) .isThrownBy { LocalReplyConfigFactory(properties) } - .satisfies { + .satisfies(Consumer { assertThat(it.message).isEqualTo( "Only one of: jsonFormat, textFormat can be defined." ) - } + }) } private val expectedConfigForResponseFlagsMatcher = """mappers { diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyEgressRoutesFactoryTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyEgressRoutesFactoryTest.kt index eebb7f283..2044699df 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyEgressRoutesFactoryTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/routes/EnvoyEgressRoutesFactoryTest.kt @@ -2,10 +2,13 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes import com.google.protobuf.util.Durations import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings import pl.allegro.tech.servicemesh.envoycontrol.groups.Outgoing import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryPolicy +import pl.allegro.tech.servicemesh.envoycontrol.groups.hasClusterThat import pl.allegro.tech.servicemesh.envoycontrol.groups.hasCustomIdleTimeout import pl.allegro.tech.servicemesh.envoycontrol.groups.hasCustomRequestTimeout import pl.allegro.tech.servicemesh.envoycontrol.groups.hasHostRewriteHeader @@ -21,13 +24,15 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.hostRewriteHeaderIsEmpty import pl.allegro.tech.servicemesh.envoycontrol.groups.matchingOnAnyMethod import pl.allegro.tech.servicemesh.envoycontrol.groups.matchingOnMethod import pl.allegro.tech.servicemesh.envoycontrol.groups.matchingOnPrefix -import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.StandardRouteSpecification +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ZoneWeights internal class EnvoyEgressRoutesFactoryTest { val clusters = listOf( - RouteSpecification( + StandardRouteSpecification( clusterName = "srv1", routeDomains = listOf("srv1"), settings = DependencySettings( @@ -42,6 +47,15 @@ internal class EnvoyEgressRoutesFactoryTest { ) ) + val weightedClusters = listOf( + WeightRouteSpecification( + clusterName = "srv1", + routeDomains = listOf("srv1"), + settings = DependencySettings(), + ZoneWeights() + ) + ) + @Test fun `should add client identity header if incoming permissions are enabled`() { // given @@ -171,8 +185,8 @@ internal class EnvoyEgressRoutesFactoryTest { egress.headersToRemove = mutableListOf("x-special-case-header", "x-custom") }) val routesSpecifications = listOf( - RouteSpecification("example_pl_1553", listOf("example.pl:1553"), DependencySettings()), - RouteSpecification("example_com_1553", listOf("example.com:1553"), DependencySettings()) + StandardRouteSpecification("example_pl_1553", listOf("example.pl:1553"), DependencySettings()), + StandardRouteSpecification("example_com_1553", listOf("example.com:1553"), DependencySettings()) ) // when @@ -203,7 +217,7 @@ internal class EnvoyEgressRoutesFactoryTest { val routesFactory = EnvoyEgressRoutesFactory(SnapshotProperties()) val retryPolicy = RetryPolicy(methods = setOf("GET", "POST")) val routesSpecifications = listOf( - RouteSpecification("example", listOf("example.pl:1553"), DependencySettings(retryPolicy = retryPolicy)), + StandardRouteSpecification("example", listOf("example.pl:1553"), DependencySettings(retryPolicy = retryPolicy)), ) val routeConfig = routesFactory.createEgressRouteConfig( @@ -234,7 +248,7 @@ internal class EnvoyEgressRoutesFactoryTest { val routesFactory = EnvoyEgressRoutesFactory(SnapshotProperties()) val retryPolicy = RetryPolicy(numberRetries = 3) val routesSpecifications = listOf( - RouteSpecification("example", listOf("example.pl:1553"), DependencySettings(retryPolicy = retryPolicy)), + StandardRouteSpecification("example", listOf("example.pl:1553"), DependencySettings(retryPolicy = retryPolicy)), ) val routeConfig = routesFactory.createEgressRouteConfig( @@ -259,7 +273,7 @@ internal class EnvoyEgressRoutesFactoryTest { // given val routesFactory = EnvoyEgressRoutesFactory(SnapshotProperties()) val routesSpecifications = listOf( - RouteSpecification("example", listOf("example.pl:1553"), DependencySettings()), + StandardRouteSpecification("example", listOf("example.pl:1553"), DependencySettings()), ) val routeConfig = routesFactory.createEgressRouteConfig( @@ -278,4 +292,47 @@ internal class EnvoyEgressRoutesFactoryTest { defaultRoute.matchingOnPrefix("/") } } + + @Test + fun `should add traffic splitting header for secondary weighted cluster`() { + + val expectedHeaderKey = "test-header" + val routesFactory = EnvoyEgressRoutesFactory(SnapshotProperties().apply { + loadBalancing.trafficSplitting.headerName = expectedHeaderKey + }) + + val routeConfig = routesFactory.createEgressRouteConfig( + "client1", + weightedClusters, + false + ) + + routeConfig + .hasClusterThat("srv1-aggregate") { + assertNotNull(this) + assertNotNull(this!!.responseHeadersToAddList.find { it.header.key == expectedHeaderKey }) + }.hasClusterThat("srv1") { + assertNotNull(this) + assertTrue(this!!.responseHeadersToAddList.isEmpty()) + } + } + + @Test + fun `should not add traffic splitting header if header key is not set`() { + val routesFactory = EnvoyEgressRoutesFactory(SnapshotProperties()) + val routeConfig = routesFactory.createEgressRouteConfig( + "client1", + weightedClusters, + false + ) + + routeConfig + .hasClusterThat("srv1-aggregate") { + assertNotNull(this) + assertTrue(this!!.responseHeadersToAddList.isEmpty()) + }.hasClusterThat("srv1") { + assertNotNull(this) + assertNotNull(this!!.responseHeadersToAddList.isEmpty()) + } + } } diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ClusterOperations.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ClusterOperations.kt new file mode 100644 index 000000000..d261ed874 --- /dev/null +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ClusterOperations.kt @@ -0,0 +1,38 @@ +package pl.allegro.tech.servicemesh.envoycontrol.utils + +import com.google.protobuf.Duration +import com.google.protobuf.util.Durations +import io.envoyproxy.envoy.config.cluster.v3.Cluster +import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource +import io.envoyproxy.envoy.config.core.v3.ConfigSource +import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ClusterConfiguration +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties + +fun createCluster( + defaultProperties: SnapshotProperties = SnapshotProperties(), + clusterName: String = CLUSTER_NAME, + idleTimeout: Long = DEFAULT_IDLE_TIMEOUT +): Cluster { + return Cluster.newBuilder().setName(clusterName) + .setType(Cluster.DiscoveryType.EDS) + .setConnectTimeout(Durations.fromMillis(defaultProperties.edsConnectionTimeout.toMillis())) + .setEdsClusterConfig( + Cluster.EdsClusterConfig.newBuilder() + .setEdsConfig( + ConfigSource.newBuilder().setAds( + AggregatedConfigSource.newBuilder() + ) + ).setServiceName(clusterName) + ) + .setLbPolicy(defaultProperties.loadBalancing.policy) + .setCommonHttpProtocolOptions( + HttpProtocolOptions.newBuilder() + .setIdleTimeout(Duration.newBuilder().setSeconds(idleTimeout).build()) + ) + .build() +} + +fun createClusterConfigurations(vararg clusters: Cluster): Map { + return clusters.associate { it.name to ClusterConfiguration(it.name, false) } +} diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/EndpointsOperations.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/EndpointsOperations.kt new file mode 100644 index 000000000..a47cf5fb6 --- /dev/null +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/EndpointsOperations.kt @@ -0,0 +1,33 @@ +package pl.allegro.tech.servicemesh.envoycontrol.utils + +import io.envoyproxy.envoy.config.cluster.v3.Cluster +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints + +fun createLoadAssignments(clusters: List): List { + return clusters.map { + ClusterLoadAssignment.newBuilder() + .setClusterName(it.name) + .addAllEndpoints(createEndpoints()) + .build() + } +} + +fun createEndpoints(): List = + listOf( + createEndpoint(CURRENT_ZONE), + createEndpoint(TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE) + ) + +fun createEndpoint(zone: String): LocalityLbEndpoints { + return LocalityLbEndpoints.newBuilder() + .setLocality( + io.envoyproxy.envoy.config.core.v3.Locality + .newBuilder() + .setZone(zone) + .build() + ) + .addAllLbEndpoints(listOf(LbEndpoint.getDefaultInstance())) + .build() +} diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/GroupsOperations.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/GroupsOperations.kt new file mode 100644 index 000000000..b38867bc7 --- /dev/null +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/GroupsOperations.kt @@ -0,0 +1,78 @@ +package pl.allegro.tech.servicemesh.envoycontrol.utils + +import pl.allegro.tech.servicemesh.envoycontrol.groups.AccessLogFilterSettings +import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode +import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings +import pl.allegro.tech.servicemesh.envoycontrol.groups.IncomingRateLimitEndpoint +import pl.allegro.tech.servicemesh.envoycontrol.groups.ListenersConfig +import pl.allegro.tech.servicemesh.envoycontrol.groups.Outgoing +import pl.allegro.tech.servicemesh.envoycontrol.groups.ProxySettings +import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup +import pl.allegro.tech.servicemesh.envoycontrol.groups.with +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.serviceDependencies + +fun createServicesGroup( + mode: CommunicationMode = CommunicationMode.XDS, + serviceName: String = DEFAULT_SERVICE_NAME, + discoveryServiceName: String = DEFAULT_DISCOVERY_SERVICE_NAME, + dependencies: Array> = emptyArray(), + rateLimitEndpoints: List = emptyList(), + snapshotProperties: SnapshotProperties, + listenersConfig: ListenersConfig? = createListenersConfig(snapshotProperties) +): ServicesGroup { + return ServicesGroup( + mode, + serviceName, + discoveryServiceName, + ProxySettings().with( + serviceDependencies = serviceDependencies(*dependencies), + rateLimitEndpoints = rateLimitEndpoints + ), + listenersConfig + ) +} + +fun createAllServicesGroup( + mode: CommunicationMode = CommunicationMode.XDS, + serviceName: String = DEFAULT_SERVICE_NAME, + discoveryServiceName: String = DEFAULT_DISCOVERY_SERVICE_NAME, + dependencies: Array> = emptyArray(), + defaultServiceSettings: DependencySettings, + listenersConfigExists: Boolean = true, + snapshotProperties: SnapshotProperties +): AllServicesGroup { + val listenersConfig = when (listenersConfigExists) { + true -> createListenersConfig(snapshotProperties) + false -> null + } + return AllServicesGroup( + mode, + serviceName, + discoveryServiceName, + ProxySettings().with( + serviceDependencies = serviceDependencies(*dependencies), + defaultServiceSettings = defaultServiceSettings + ), + listenersConfig + ) +} + +fun createListenersConfig( + snapshotProperties: SnapshotProperties, + hasStaticSecretsDefined: Boolean = false +) + : ListenersConfig { + return ListenersConfig( + ingressHost = INGRESS_HOST, + ingressPort = INGRESS_PORT, + egressHost = EGRESS_HOST, + egressPort = EGRESS_PORT, + accessLogFilterSettings = AccessLogFilterSettings( + null, + snapshotProperties.dynamicListeners.httpFilters.accessLog.filters + ), + hasStaticSecretsDefined = hasStaticSecretsDefined + ) +} diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/TestData.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/TestData.kt new file mode 100644 index 000000000..fc2ecdaea --- /dev/null +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/TestData.kt @@ -0,0 +1,26 @@ +package pl.allegro.tech.servicemesh.envoycontrol.utils + +import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ZoneWeights + +const val INGRESS_HOST = "ingress-host" +const val INGRESS_PORT = 3380 +const val EGRESS_HOST = "egress-host" +const val EGRESS_PORT = 3380 +const val DEFAULT_IDLE_TIMEOUT = 100L +const val DEFAULT_SERVICE_NAME = "service-name" +const val DEFAULT_DISCOVERY_SERVICE_NAME = "discovery-service-name" +const val CLUSTER_NAME = "cluster-name" +const val CLUSTER_NAME1 = "cluster-1" +const val CLUSTER_NAME2 = "cluster-2" +const val MAIN_CLUSTER_NAME = "cluster-1" +const val SECONDARY_CLUSTER_NAME = "cluster-1-secondary" +const val AGGREGATE_CLUSTER_NAME = "cluster-1-aggregate" +const val TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE = "dc2" +const val CURRENT_ZONE = "dc1" + +val DEFAULT_CLUSTER_WEIGHTS = zoneWeights(50, 50) + +fun zoneWeights(main: Int, secondary: Int) = ZoneWeights().also { + it.main = main + it.secondary = secondary +} diff --git a/envoy-control-runner/build.gradle b/envoy-control-runner/build.gradle index 138b8ffd5..cee970308 100644 --- a/envoy-control-runner/build.gradle +++ b/envoy-control-runner/build.gradle @@ -1,5 +1,6 @@ plugins { id 'application' + id 'org.springframework.boot' apply false } mainClassName = 'pl.allegro.tech.servicemesh.envoycontrol.EnvoyControl' @@ -7,15 +8,13 @@ mainClassName = 'pl.allegro.tech.servicemesh.envoycontrol.EnvoyControl' dependencies { api project(':envoy-control-source-consul') - implementation group: 'org.springframework.boot', name: 'spring-boot-starter', version: versions.spring_boot - api group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: versions.spring_boot - api group: 'org.springframework.boot', name: 'spring-boot-starter-actuator', version: versions.spring_boot - api group: 'org.springframework.boot', name: 'spring-boot-starter-security', version: versions.spring_boot - implementation group: 'io.micrometer', name: 'micrometer-registry-prometheus', version: versions.micrometer + implementation group: 'org.springframework.boot', name: 'spring-boot-starter' + api group: 'org.springframework.boot', name: 'spring-boot-starter-web' + api group: 'org.springframework.boot', name: 'spring-boot-starter-actuator' + api group: 'org.springframework.boot', name: 'spring-boot-starter-security' + implementation group: 'io.micrometer', name: 'micrometer-registry-prometheus' - api group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: versions.kotlin - - implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: versions.jackson + implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin' implementation group: 'net.openhft', name: 'zero-allocation-hashing', version: versions.xxhash } diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/chaos/api/ChaosController.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/chaos/api/ChaosController.kt index d39430ed1..6d49b6c72 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/chaos/api/ChaosController.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/chaos/api/ChaosController.kt @@ -3,11 +3,13 @@ package pl.allegro.tech.servicemesh.envoycontrol.chaos.api import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.http.HttpMethod import org.springframework.http.HttpStatus -import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder import org.springframework.security.config.annotation.web.builders.HttpSecurity -import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter +import org.springframework.security.config.annotation.web.invoke +import org.springframework.security.core.userdetails.User +import org.springframework.security.core.userdetails.UserDetails +import org.springframework.security.provisioning.InMemoryUserDetailsManager +import org.springframework.security.web.SecurityFilterChain import org.springframework.web.bind.annotation.DeleteMapping import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.PathVariable @@ -41,27 +43,36 @@ class ChaosController(val chaosService: ChaosService) { ExperimentsListResponse(chaosService.getExperimentsList().map { it.toResponseObject() }) @Configuration - class SecurityConfig : WebSecurityConfigurerAdapter() { + class SecurityConfig { @Bean @ConfigurationProperties("chaos") fun basicAuthUser() = BasicAuthUser() - override fun configure(auth: AuthenticationManagerBuilder) { - auth.inMemoryAuthentication() - .withUser(basicAuthUser().username) + @Bean + fun userDetailsService(): InMemoryUserDetailsManager { + val user: UserDetails = User.builder() + .username(basicAuthUser().username) .password("{noop}${basicAuthUser().password}") .roles("CHAOS") + .build() + + return InMemoryUserDetailsManager(user) } - override fun configure(http: HttpSecurity) { - http.httpBasic() - .and() - .authorizeRequests() - .antMatchers(HttpMethod.POST, "/chaos/fault/**").hasRole("CHAOS") - .and() - .csrf().disable() - .formLogin().disable() + @Bean + fun filterChain(http: HttpSecurity): SecurityFilterChain? { + http { + httpBasic { } + authorizeHttpRequests { + authorize("/chaos/fault/**", hasRole("CHAOS")) + authorize(anyRequest, permitAll) + } + csrf { disable() } + formLogin { disable() } + } + + return http.build() } } diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/SynchronizationConfig.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/SynchronizationConfig.kt index 23b592e3a..934e40d91 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/SynchronizationConfig.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/SynchronizationConfig.kt @@ -5,7 +5,6 @@ import io.micrometer.core.instrument.MeterRegistry import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.core.task.SimpleAsyncTaskExecutor import org.springframework.http.client.SimpleClientHttpRequestFactory import org.springframework.web.client.RestTemplate import pl.allegro.tech.discovery.consul.recipes.datacenter.ConsulDatacenterReader @@ -29,7 +28,6 @@ class SynchronizationConfig { envoyControlProperties: EnvoyControlProperties ): RestTemplate { val requestFactory = SimpleClientHttpRequestFactory() - requestFactory.setTaskExecutor(SimpleAsyncTaskExecutor()) requestFactory.setConnectTimeout(envoyControlProperties.sync.connectionTimeout.toMillis().toInt()) requestFactory.setReadTimeout(envoyControlProperties.sync.readTimeout.toMillis().toInt()) diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/consul/ConsulWatcherConfig.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/consul/ConsulWatcherConfig.kt index cbada7d2b..d16e79f68 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/consul/ConsulWatcherConfig.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/consul/ConsulWatcherConfig.kt @@ -1,12 +1,11 @@ package pl.allegro.tech.servicemesh.envoycontrol.infrastructure.consul import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.KotlinModule import okhttp3.Dispatcher import okhttp3.Interceptor import okhttp3.OkHttpClient import okhttp3.Response -import okhttp3.internal.Util +import okhttp3.internal.threadFactory import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import pl.allegro.tech.discovery.consul.recipes.ConsulRecipes @@ -86,7 +85,7 @@ open class ConsulWatcherConfig { watcherConfig.dispatcherPoolKeepAliveTime.toMillis(), TimeUnit.MILLISECONDS, SynchronousQueue(), - Util.threadFactory("consul-okhttp-dispatcher", false) + threadFactory("consul-okhttp-dispatcher", false) ) } @@ -106,7 +105,4 @@ open class ConsulWatcherConfig { private val counter = AtomicInteger() override fun newThread(r: Runnable) = Thread(r, "consul-watcher-worker-${counter.getAndIncrement()}") } - - @Bean - fun kotlinModule() = KotlinModule.Builder().build() } diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/consul/JacksonConfig.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/consul/JacksonConfig.kt new file mode 100644 index 000000000..bdca62e9b --- /dev/null +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/consul/JacksonConfig.kt @@ -0,0 +1,12 @@ +package pl.allegro.tech.servicemesh.envoycontrol.infrastructure.consul + +import com.fasterxml.jackson.module.kotlin.KotlinModule +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration + +@Configuration +open class JacksonConfig { + + @Bean + fun kotlinModule() = KotlinModule.Builder().build() +} diff --git a/envoy-control-runner/src/main/resources/application.yaml b/envoy-control-runner/src/main/resources/application.yaml index 46ea03a2c..8fe991e1c 100644 --- a/envoy-control-runner/src/main/resources/application.yaml +++ b/envoy-control-runner/src/main/resources/application.yaml @@ -17,4 +17,4 @@ management: metrics.enabled: true prometheus.enabled: true endpoints.web.exposure.include: "*" - metrics.export.prometheus.enabled: true + prometheus.metrics.export.enabled: true diff --git a/envoy-control-services/build.gradle b/envoy-control-services/build.gradle index 181b56824..aeabef69a 100644 --- a/envoy-control-services/build.gradle +++ b/envoy-control-services/build.gradle @@ -1,4 +1,8 @@ +plugins { + id 'org.springframework.boot' apply false +} + dependencies { - implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib', version: versions.kotlin - api group: 'io.projectreactor', name: 'reactor-core', version: versions.reactor + implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib' + api group: 'io.projectreactor', name: 'reactor-core' } diff --git a/envoy-control-source-consul/build.gradle b/envoy-control-source-consul/build.gradle index 07deb9d65..d15e0c31c 100644 --- a/envoy-control-source-consul/build.gradle +++ b/envoy-control-source-consul/build.gradle @@ -1,17 +1,19 @@ +plugins { + id 'org.springframework.boot' apply false +} + dependencies { api project(':envoy-control-core') - implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib', version: versions.kotlin - implementation group: 'io.projectreactor', name: 'reactor-core', version: versions.reactor + implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib' + implementation group: 'io.projectreactor', name: 'reactor-core' api group: 'pl.allegro.tech.discovery', name: 'consul-recipes', version: versions.consul_recipes api group: 'com.ecwid.consul', name: 'consul-api', version: versions.ecwid_consul - testImplementation group: 'org.mockito', name: 'mockito-core', version: versions.mockito + testImplementation group: 'org.mockito', name: 'mockito-core' testImplementation group: 'cglib', name: 'cglib-nodep', version: versions.cglib - testImplementation(group: 'com.pszymczyk.consul', name: 'embedded-consul', version: versions.embedded_consul) { - exclude group: 'org.apache.httpcomponents', module: 'httpclient' - } - - testImplementation group: 'io.projectreactor', name: 'reactor-test', version: versions.reactor + testImplementation group: 'io.projectreactor', name: 'reactor-test' + testImplementation group: 'org.testcontainers', name: 'testcontainers' + testImplementation project(path: ':envoy-control-tests') } diff --git a/envoy-control-source-consul/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulClusterStateChangesTest.kt b/envoy-control-source-consul/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulClusterStateChangesTest.kt index 3069ef9f5..86c7f803e 100644 --- a/envoy-control-source-consul/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulClusterStateChangesTest.kt +++ b/envoy-control-source-consul/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulClusterStateChangesTest.kt @@ -2,9 +2,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.consul.services import com.ecwid.consul.v1.agent.AgentConsulClient import com.ecwid.consul.v1.agent.model.NewService -import com.pszymczyk.consul.ConsulStarterBuilder -import com.pszymczyk.consul.infrastructure.Ports -import com.pszymczyk.consul.junit.ConsulExtension import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -12,6 +9,7 @@ import org.junit.jupiter.api.extension.RegisterExtension import org.mockito.Mockito import org.mockito.Mockito.verify import pl.allegro.tech.discovery.consul.recipes.ConsulRecipes +import pl.allegro.tech.servicemesh.envoycontrol.config.consul.ConsulExtension import pl.allegro.tech.servicemesh.envoycontrol.server.ReadinessStateHandler import reactor.test.StepVerifier import java.net.URI @@ -21,23 +19,17 @@ import java.util.concurrent.Executors class ConsulClusterStateChangesTest { companion object { - private val consulHttpPort = Ports.nextAvailable() @JvmField @RegisterExtension - val consul = ConsulExtension( - ConsulStarterBuilder.consulStarter() - .withHttpPort(consulHttpPort) - .withConsulVersion("1.11.4") - .build() - ) + val consulExtension = ConsulExtension() } private val watcher = ConsulRecipes .consulRecipes() .build() .consulWatcher(Executors.newFixedThreadPool(10)) - .withAgentUri(URI("http://localhost:${consul.httpPort}")) + .withAgentUri(URI("http://localhost:${consulExtension.server.port}")) .build() private val readinessStateHandler = Mockito.spy(ReadinessStateHandler::class.java) private val serviceWatchPolicy = Mockito.mock(ServiceWatchPolicy::class.java) @@ -46,12 +38,11 @@ class ConsulClusterStateChangesTest { readinessStateHandler = readinessStateHandler, serviceWatchPolicy = serviceWatchPolicy ) - private val client = AgentConsulClient("localhost", consul.httpPort) + private val client = AgentConsulClient("localhost", consulExtension.server.port) @BeforeEach fun reset() { watcher.close() - consul.reset() Mockito.`when`(serviceWatchPolicy.shouldBeWatched(Mockito.anyString(), Mockito.anyList())).thenReturn(true) } diff --git a/envoy-control-tests/build.gradle b/envoy-control-tests/build.gradle index f455842c4..1c0826c56 100644 --- a/envoy-control-tests/build.gradle +++ b/envoy-control-tests/build.gradle @@ -1,27 +1,28 @@ +plugins { + id 'org.springframework.boot' apply false +} + dependencies { implementation project(':envoy-control-runner') - implementation group: 'org.assertj', name: 'assertj-core', version: versions.assertj - implementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: versions.junit - implementation group: 'org.junit.jupiter', name: 'junit-jupiter-params', version: versions.junit - implementation group: 'org.awaitility', name: 'awaitility', version: versions.awaitility - implementation(group: 'com.pszymczyk.consul', name: 'embedded-consul', version: versions.embedded_consul) { - exclude group: 'org.apache.httpcomponents', module: 'httpclient' - } - implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: versions.okhttp + implementation group: 'org.assertj', name: 'assertj-core' + implementation group: 'org.junit.jupiter', name: 'junit-jupiter-api' + implementation group: 'org.junit.jupiter', name: 'junit-jupiter-params' + implementation group: 'org.awaitility', name: 'awaitility' + implementation group: 'com.squareup.okhttp3', name: 'okhttp' - implementation "org.apache.httpcomponents:httpcore:4.4.15" - implementation "org.apache.httpcomponents:httpclient:4.5.5" + implementation group: 'org.apache.httpcomponents.core5', name: 'httpcore5' + implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5' implementation group: 'eu.rekawek.toxiproxy', name: 'toxiproxy-java', version: versions.toxiproxy - runtimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine', version: versions.junit - implementation group: 'org.testcontainers', name: 'junit-jupiter', version: versions.testcontainers - implementation group: 'org.testcontainers', name: 'testcontainers', version: versions.testcontainers + runtimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine' + implementation group: 'org.testcontainers', name: 'junit-jupiter' + implementation group: 'org.testcontainers', name: 'testcontainers' } test { useJUnitPlatform { - excludeTags 'reliability' + excludeTags ('reliability', 'flaky') } maxParallelForks = 1 testClassesDirs = project.sourceSets.main.output.classesDirs @@ -40,6 +41,19 @@ task reliabilityTest(type: Test) { testClassesDirs = project.sourceSets.main.output.classesDirs } +task flakyTest(type: Test) { + systemProperty 'RELIABILITY_FAILURE_DURATION_SECONDS', System.getProperty('RELIABILITY_FAILURE_DURATION_SECONDS', '300') + useJUnitPlatform { + includeTags 'flaky' + } + + testLogging { + events "passed", "skipped", "failed" + exceptionFormat = 'full' + } + testClassesDirs = project.sourceSets.main.output.classesDirs +} + tasks.withType(Test).configureEach { project.findProperty("envoyVersion")?.with { systemProperty("pl.allegro.tech.servicemesh.envoyVersion", it) } } diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EndpointMetadataMergingTests.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EndpointMetadataMergingTests.kt index 98eb5b8cc..352ee037d 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EndpointMetadataMergingTests.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EndpointMetadataMergingTests.kt @@ -1,6 +1,7 @@ package pl.allegro.tech.servicemesh.envoycontrol import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted @@ -10,6 +11,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlExtension import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoServiceExtension +@Tag("flaky") open class EndpointMetadataMergingTests { companion object { diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoyControlSynchronizationTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoyControlSynchronizationTest.kt index d2a1b8d2b..cc163f075 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoyControlSynchronizationTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoyControlSynchronizationTest.kt @@ -78,22 +78,6 @@ internal class EnvoyControlSynchronizationTest { waitServiceOkAndFrom("echo", serviceLocal) } - @Test - fun `latency between service registration in remote dc and being able to access it via envoy should be similar to envoy-control polling interval`() { - // when - val latency = measureRegistrationToAccessLatency( - registerService = { name, target -> registerServiceInRemoteDc(name, target) }, - readinessCheck = { name, target -> waitServiceOkAndFrom(name, target) } - ) - - // then - logger.info("remote dc latency: $latency") - - val tolerance = Duration.ofMillis(400) + stateSampleDuration - val expectedMax = (pollingInterval + tolerance).toMillis() - assertThat(latency.max()).isLessThanOrEqualTo(expectedMax) - } - @Test fun `latency between service registration in local dc and being able to access it via envoy should be less than 0,5s + stateSampleDuration`() { // when diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt index beffcee16..912b5c89b 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt @@ -20,6 +20,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscover import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.RDS import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.SDS import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.UNKNOWN +import java.util.function.Consumer class XdsMetricsDiscoveryServerCallbacksTest : MetricsDiscoveryServerCallbacksTest { companion object { @@ -236,7 +237,7 @@ interface MetricsDiscoveryServerCallbacksTest { expectedGrpcRequestsCounterValues().forEach { (type, condition) -> val counterValue = meterRegistry.counterValue("grpc.requests.$type") println("$type $counterValue") - assertThat(counterValue).satisfies { condition(it) } + assertThat(counterValue).satisfies(Consumer { condition(it) }) } } } diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/SnapshotUpdaterBadConfigTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/SnapshotUpdaterBadConfigTest.kt index 527f7b940..681a50b6d 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/SnapshotUpdaterBadConfigTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/SnapshotUpdaterBadConfigTest.kt @@ -6,6 +6,7 @@ import org.junit.jupiter.api.assertThrows import org.junit.jupiter.api.extension.RegisterExtension import org.testcontainers.containers.ContainerLaunchException import org.testcontainers.containers.Network +import org.testcontainers.containers.startupcheck.IndefiniteWaitOneShotStartupCheckStrategy import pl.allegro.tech.servicemesh.envoycontrol.assertions.isFrom import pl.allegro.tech.servicemesh.envoycontrol.assertions.isOk import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted @@ -15,7 +16,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyContainer import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlExtension import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoServiceExtension -import java.time.Duration class SnapshotUpdaterBadConfigTest { companion object { @@ -72,7 +72,7 @@ class SnapshotUpdaterBadConfigTest { envoyControl.app.grpcPort ) .withNetwork(Network.SHARED) - .withStartupTimeout(Duration.ofSeconds(10)) + .withStartupCheckStrategy(IndefiniteWaitOneShotStartupCheckStrategy()) .start() } diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/assertions/EnvoyAssertions.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/assertions/EnvoyAssertions.kt index ec4356f5c..cb717e2d0 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/assertions/EnvoyAssertions.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/assertions/EnvoyAssertions.kt @@ -5,6 +5,7 @@ import com.fasterxml.jackson.module.kotlin.readValue import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.ObjectAssert import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyContainer +import java.util.function.Consumer private class RbacLog( val protocol: String, @@ -27,7 +28,7 @@ private val mapper = jacksonObjectMapper() fun isRbacAccessLog(log: String) = log.startsWith(RBAC_LOG_PREFIX) -fun ObjectAssert.hasNoRBACDenials(): ObjectAssert = satisfies { +fun ObjectAssert.hasNoRBACDenials(): ObjectAssert = satisfies(Consumer { val admin = it.admin() assertThat(admin.statValue("http.ingress_http.rbac.denied")?.toInt()).isZero() assertThat(admin.statValue("http.ingress_https.rbac.denied")?.toInt()).isZero() @@ -35,7 +36,7 @@ fun ObjectAssert.hasNoRBACDenials(): ObjectAssert.hasOneAccessDenialWithActionBlock( @@ -155,7 +156,7 @@ private fun ObjectAssert.hasOneAccessDenial( protocol: String, logPredicate: RbacLog, shadowDenied: Boolean = true -) = satisfies { +) = satisfies(Consumer { val admin = it.admin() val blockedRequestsCount = admin.statValue("http.ingress_$protocol.rbac.denied")?.toInt() val loggedRequestsCount = admin.statValue("http.ingress_$protocol.rbac.shadow_denied")?.toInt() @@ -172,9 +173,9 @@ private fun ObjectAssert.hasOneAccessDenial( assertThat(it.logRecorder.getRecordedLogs()).filteredOn(::isRbacAccessLog) .hasSize(1).first() .matchesRbacAccessDeniedLog(logPredicate) -} +}) -private fun ObjectAssert.matchesRbacAccessDeniedLog(logPredicate: RbacLog) = satisfies { +private fun ObjectAssert.matchesRbacAccessDeniedLog(logPredicate: RbacLog) = satisfies(Consumer { val parsed = mapper.readValue(it.removePrefix(RBAC_LOG_PREFIX)) // protocol is required because we check metrics assertThat(parsed.protocol).isEqualTo(logPredicate.protocol) @@ -192,7 +193,7 @@ private fun ObjectAssert.matchesRbacAccessDeniedLog(logPredicate: RbacLo assertEqualProperty(parsed, logPredicate, RbacLog::rbacAction) assertEqualProperty(parsed, logPredicate, RbacLog::statusCode) assertEqualProperty(parsed, logPredicate, RbacLog::jwtTokenStatus) -} +}) private fun assertEqualProperty(actual: RbacLog, expected: RbacLog, supplier: RbacLog.() -> T) { expected.supplier()?.let { diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/assertions/HttpsEchoResponseAssertions.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/assertions/HttpsEchoResponseAssertions.kt index adf1cc509..49a1b9b41 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/assertions/HttpsEchoResponseAssertions.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/assertions/HttpsEchoResponseAssertions.kt @@ -4,17 +4,18 @@ import org.assertj.core.api.Assertions import org.assertj.core.api.ObjectAssert import pl.allegro.tech.servicemesh.envoycontrol.config.service.HttpsEchoContainer import pl.allegro.tech.servicemesh.envoycontrol.config.service.HttpsEchoResponse +import java.util.function.Consumer fun ObjectAssert.isOk(): ObjectAssert { matches { it.response.isSuccessful } return this } -fun ObjectAssert.hasSNI(serverName: String): ObjectAssert = satisfies { +fun ObjectAssert.hasSNI(serverName: String): ObjectAssert = satisfies(Consumer { val actualServerName = HttpsEchoResponse.objectMapper.readTree(it.body).at("/connection/servername").textValue() Assertions.assertThat(actualServerName).isEqualTo(serverName) -} +}) -fun ObjectAssert.isFrom(container: HttpsEchoContainer) = satisfies { +fun ObjectAssert.isFrom(container: HttpsEchoContainer) = satisfies(Consumer { Assertions.assertThat(container.containerName()).isEqualTo(it.hostname) -} +}) diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/ClientsFactory.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/ClientsFactory.kt index c122bba73..ba9bfd91e 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/ClientsFactory.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/ClientsFactory.kt @@ -1,9 +1,9 @@ package pl.allegro.tech.servicemesh.envoycontrol.config import okhttp3.OkHttpClient -import org.apache.http.conn.ssl.NoopHostnameVerifier -import org.apache.http.conn.ssl.TrustAllStrategy -import org.apache.http.ssl.SSLContextBuilder +import org.apache.hc.client5.http.ssl.NoopHostnameVerifier +import org.apache.hc.client5.http.ssl.TrustAllStrategy +import org.apache.hc.core5.ssl.SSLContextBuilder import java.security.KeyStore import java.time.Duration import javax.net.ssl.SSLSocketFactory diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulContainer.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulContainer.kt index d6988c116..cf734b212 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulContainer.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulContainer.kt @@ -13,7 +13,7 @@ class ConsulContainer( val internalPort: Int = 8500 ) : GenericContainer( ImageFromDockerfile().withDockerfileFromBuilder { - it.from("consul:1.10.12") + it.from("consul:1.11.11") .run("apk", "add", "iproute2") .cmd(consulConfig.launchCommand()) .expose(internalPort) diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulOperations.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulOperations.kt index 342c7b9a2..b6843d6a6 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulOperations.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulOperations.kt @@ -70,6 +70,21 @@ class ConsulOperations(port: Int) { tags ) + fun registerServiceWithEnvoyOnEgress( + extension: EnvoyExtension, + id: String = UUID.randomUUID().toString(), + name: String, + registerDefaultCheck: Boolean = false, + tags: List = listOf("a") + ) = registerService( + id, + name, + extension.container.ipAddress(), + EnvoyContainer.EGRESS_LISTENER_CONTAINER_PORT, + registerDefaultCheck, + tags + ) + fun deregisterService(id: String) { client.agentServiceDeregister(id) } diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulSetup.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulSetup.kt index 2207d9ce9..c54f823dc 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulSetup.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/consul/ConsulSetup.kt @@ -1,8 +1,8 @@ package pl.allegro.tech.servicemesh.envoycontrol.config.consul -import com.pszymczyk.consul.infrastructure.Ports import org.testcontainers.containers.Network import org.testcontainers.junit.jupiter.Testcontainers +import pl.allegro.tech.servicemesh.envoycontrol.utils.Ports @Testcontainers class ConsulSetup( diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyAdmin.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyAdmin.kt index b7cdf9e57..8be80d7fb 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyAdmin.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyAdmin.kt @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.ObjectNode import com.fasterxml.jackson.module.kotlin.KotlinModule - import okhttp3.MediaType.Companion.toMediaType import okhttp3.OkHttpClient import okhttp3.Request @@ -109,14 +108,15 @@ class EnvoyAdmin( private val client = OkHttpClient.Builder() .build() - private fun get(path: String): Response = - client.newCall( + private fun get(path: String): Response { + return client.newCall( Request.Builder() .get() .url("$address/$path") .build() ) .execute().addToCloseableResponses() + } private fun post(path: String): Response = client.newCall( diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoycontrol/EnvoyControlTestApp.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoycontrol/EnvoyControlTestApp.kt index 2bc55e721..168cd1b0f 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoycontrol/EnvoyControlTestApp.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoycontrol/EnvoyControlTestApp.kt @@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.ObjectNode import com.fasterxml.jackson.module.kotlin.KotlinModule -import com.pszymczyk.consul.infrastructure.Ports import io.micrometer.core.instrument.MeterRegistry import okhttp3.Credentials @@ -22,6 +21,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.HttpResponseCloser. import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState import pl.allegro.tech.servicemesh.envoycontrol.snapshot.debug.Versions +import pl.allegro.tech.servicemesh.envoycontrol.utils.Ports import java.time.Duration interface EnvoyControlTestApp { diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/service/EchoContainer.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/service/EchoContainer.kt index 9c1aa15f1..c704ad8f0 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/service/EchoContainer.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/service/EchoContainer.kt @@ -6,7 +6,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.config.testcontainers.GenericCon import java.util.UUID import java.util.Locale -class EchoContainer : GenericContainer("hashicorp/http-echo:latest"), ServiceContainer { +class EchoContainer : GenericContainer("jxlwqq/http-echo"), ServiceContainer { val response = UUID.randomUUID().toString() @@ -14,7 +14,7 @@ class EchoContainer : GenericContainer("hashicorp/http-echo:lates super.configure() withExposedPorts(PORT) withNetwork(Network.SHARED) - withCommand(String.format(Locale.getDefault(), "-text=%s", response)) + withCommand(String.format(Locale.getDefault(), "--text=%s --addr=:%s", response, PORT)) waitingFor(Wait.forHttp("/").forStatusCode(200)) } diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/permissions/IncomingPermissionsLoggingModeTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/permissions/IncomingPermissionsLoggingModeTest.kt index f943a3811..b1b889281 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/permissions/IncomingPermissionsLoggingModeTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/permissions/IncomingPermissionsLoggingModeTest.kt @@ -365,7 +365,7 @@ class IncomingPermissionsLoggingModeTest { } @Test - @Tag("BAD") + @Tag("flaky") fun `echo2 should allow echo3 to access 'log-unlisted-clients' endpoint over https`() { // when val echo2Response = echo3Envoy.egressOperations.callService("echo2", pathAndQuery = "/log-unlisted-clients") @@ -494,7 +494,7 @@ class IncomingPermissionsLoggingModeTest { } @Test - @Tag("BAD") + @Tag("flaky") fun `echo should NOT allow echo2 to access 'block-unlisted-clients-by-default' endpoint over https`() { // when val echoResponse = @@ -591,7 +591,7 @@ class IncomingPermissionsLoggingModeTest { } @Test - @Tag("BAD") + @Tag("flaky") fun `echo2 should allow echo to access unlisted endpoint over https and log it`() { // when val echo2Response = echoEnvoy.egressOperations.callService("echo2", pathAndQuery = "/unlisted-endpoint") diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/permissions/TlsBasedAuthenticationTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/permissions/TlsBasedAuthenticationTest.kt index a7cc3f8d7..432865c50 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/permissions/TlsBasedAuthenticationTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/permissions/TlsBasedAuthenticationTest.kt @@ -5,7 +5,7 @@ import okhttp3.Request import okhttp3.Response import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Disabled +import org.junit.jupiter.api.Tag import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.RegisterExtension import pl.allegro.tech.servicemesh.envoycontrol.assertions.isForbidden @@ -189,8 +189,8 @@ internal class TlsBasedAuthenticationTest { } } - @Disabled("Flaky test") @Test + @Tag("flaky") fun `should encrypt traffic between selected services even if only one endpoint supports mtls`() { // given 2 endpoints registerEcho2Insecure() diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/reliability/Toxiproxy.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/reliability/Toxiproxy.kt index 8b2b317c6..c684bae1f 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/reliability/Toxiproxy.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/reliability/Toxiproxy.kt @@ -1,6 +1,5 @@ package pl.allegro.tech.servicemesh.envoycontrol.reliability -import com.pszymczyk.consul.infrastructure.Ports import eu.rekawek.toxiproxy.Proxy import org.testcontainers.junit.jupiter.Testcontainers import pl.allegro.tech.servicemesh.envoycontrol.config.BaseEnvoyTest.Companion.consul @@ -8,6 +7,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.config.BaseEnvoyTest.Companion.n import pl.allegro.tech.servicemesh.envoycontrol.config.containers.ToxiproxyContainer import pl.allegro.tech.servicemesh.envoycontrol.config.containers.ToxiproxyContainer.Companion.internalToxiproxyPort import pl.allegro.tech.servicemesh.envoycontrol.config.testcontainers.GenericContainer.Companion.allInterfaces +import pl.allegro.tech.servicemesh.envoycontrol.utils.Ports @Testcontainers internal class Toxiproxy private constructor() { diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/trafficsplitting/TrafficSplitting.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/trafficsplitting/TrafficSplitting.kt new file mode 100644 index 000000000..a49c7f996 --- /dev/null +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/trafficsplitting/TrafficSplitting.kt @@ -0,0 +1,67 @@ +import TrafficSplitting.deltaPercentage +import TrafficSplitting.upstreamServiceName +import org.assertj.core.api.Assertions +import org.assertj.core.data.Percentage +import pl.allegro.tech.servicemesh.envoycontrol.assertions.isFrom +import pl.allegro.tech.servicemesh.envoycontrol.assertions.isOk +import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted +import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.CallStats +import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension +import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoServiceExtension + +internal object TrafficSplitting { + const val upstreamServiceName = "service-1" + const val serviceName = "echo2" + const val deltaPercentage = 20.0 +} + +fun EnvoyExtension.verifyIsReachable(echoServiceExtension: EchoServiceExtension, service: String) { + untilAsserted { + this.egressOperations.callService(service).also { + Assertions.assertThat(it).isOk().isFrom(echoServiceExtension) + } + } +} + +fun CallStats.verifyCallsCountCloseTo(service: EchoServiceExtension, expectedCount: Int): CallStats { + Assertions.assertThat(this.hits(service)).isCloseTo(expectedCount, Percentage.withPercentage(deltaPercentage)) + return this +} + +fun CallStats.verifyCallsCountGreaterThan(service: EchoServiceExtension, hits: Int): CallStats { + Assertions.assertThat(this.hits(service)).isGreaterThan(hits) + return this +} + +fun EnvoyExtension.callUpstreamServiceRepeatedly( + vararg services: EchoServiceExtension, + numberOfCalls: Int = 100, +): CallStats { + val stats = CallStats(services.asList()) + this.egressOperations.callServiceRepeatedly( + service = upstreamServiceName, + stats = stats, + minRepeat = numberOfCalls, + maxRepeat = numberOfCalls, + repeatUntil = { true }, + headers = mapOf() + ) + return stats +} + +fun EnvoyExtension.callUpstreamServiceRepeatedly( + vararg services: EchoServiceExtension, + numberOfCalls: Int = 100, + tag: String? +): CallStats { + val stats = CallStats(services.asList()) + this.egressOperations.callServiceRepeatedly( + service = upstreamServiceName, + stats = stats, + minRepeat = numberOfCalls, + maxRepeat = numberOfCalls, + repeatUntil = { true }, + headers = tag?.let { mapOf("x-service-tag" to it) } ?: emptyMap(), + ) + return stats +} diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/trafficsplitting/WeightedClustersRoutingTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/trafficsplitting/WeightedClustersRoutingTest.kt new file mode 100644 index 000000000..564660fce --- /dev/null +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/trafficsplitting/WeightedClustersRoutingTest.kt @@ -0,0 +1,114 @@ +package pl.allegro.tech.servicemesh.envoycontrol.trafficsplitting + +import TrafficSplitting.serviceName +import TrafficSplitting.upstreamServiceName +import callUpstreamServiceRepeatedly +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension +import pl.allegro.tech.servicemesh.envoycontrol.config.Xds +import pl.allegro.tech.servicemesh.envoycontrol.config.consul.ConsulMultiClusterExtension +import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension +import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlClusteredExtension +import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoServiceExtension +import verifyCallsCountCloseTo +import verifyCallsCountGreaterThan +import verifyIsReachable +import java.time.Duration + +class WeightedClustersRoutingTest { + companion object { + private const val forceTrafficZone = "dc2" + + private val properties = mapOf( + "envoy-control.envoy.snapshot.stateSampleDuration" to Duration.ofSeconds(0), + "envoy-control.sync.enabled" to true, + "envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.zoneName" to forceTrafficZone, + "envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.serviceByWeightsProperties.$serviceName.main" to 90, + "envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.serviceByWeightsProperties.$serviceName.secondary" to 10, + "envoy-control.envoy.snapshot.loadBalancing.priorities.zonePriorities" to mapOf( + "dc1" to mapOf( + "dc1" to 0, + "dc2" to 1 + ), + "dc2" to mapOf( + "dc1" to 1, + "dc2" to 0, + ), + ) + ) + + private val echo2Config = """ + node: + metadata: + proxy_settings: + outgoing: + dependencies: + - service: "service-1" + """.trimIndent() + + private val config = Xds.copy(configOverride = echo2Config, serviceName = "echo2") + + @JvmField + @RegisterExtension + val consul = ConsulMultiClusterExtension() + + @JvmField + @RegisterExtension + val envoyControl = + EnvoyControlClusteredExtension(consul.serverFirst, { properties }, listOf(consul)) + + @JvmField + @RegisterExtension + val envoyControl2 = + EnvoyControlClusteredExtension(consul.serverSecond, { properties }, listOf(consul)) + + @JvmField + @RegisterExtension + val echoServiceDC1 = EchoServiceExtension() + + @JvmField + @RegisterExtension + val upstreamServiceDC1 = EchoServiceExtension() + + @JvmField + @RegisterExtension + val upstreamServiceDC2 = EchoServiceExtension() + + @JvmField + @RegisterExtension + val echoEnvoyDC1 = EnvoyExtension(envoyControl, localService = echoServiceDC1, config) + @JvmField + @RegisterExtension + val echoEnvoyDC2 = EnvoyExtension(envoyControl2) + } + + @Test + fun `should route traffic according to weights`() { + consul.serverFirst.operations.registerServiceWithEnvoyOnEgress(echoEnvoyDC1, name = serviceName) + + consul.serverFirst.operations.registerService(upstreamServiceDC1, name = upstreamServiceName) + echoEnvoyDC1.verifyIsReachable(upstreamServiceDC1, upstreamServiceName) + + consul.serverSecond.operations.registerService(upstreamServiceDC2, name = upstreamServiceName) + echoEnvoyDC1.verifyIsReachable(upstreamServiceDC2, upstreamServiceName) + + echoEnvoyDC1.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2) + .verifyCallsCountCloseTo(upstreamServiceDC1, 90) + .verifyCallsCountGreaterThan(upstreamServiceDC2, 1) + } + + @Test + fun `should route traffic according to weights with service tag`() { + consul.serverFirst.operations.registerServiceWithEnvoyOnEgress(echoEnvoyDC1, name = serviceName) + + consul.serverFirst.operations.registerService(upstreamServiceDC1, name = upstreamServiceName, tags = listOf("tag")) + echoEnvoyDC1.verifyIsReachable(upstreamServiceDC1, upstreamServiceName) + + consul.serverSecond.operations.registerService(upstreamServiceDC2, name = upstreamServiceName, tags = listOf("tag")) + echoEnvoyDC1.verifyIsReachable(upstreamServiceDC2, upstreamServiceName) + + echoEnvoyDC1.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2, tag = "tag") + .verifyCallsCountCloseTo(upstreamServiceDC1, 90) + .verifyCallsCountGreaterThan(upstreamServiceDC2, 1) + } +} diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index e708b1c02..7f93135c4 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 05679dc3c..ac72c34e8 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.1.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip +networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 4f906e0c8..0adc8e1a5 100755 --- a/gradlew +++ b/gradlew @@ -1,7 +1,7 @@ -#!/usr/bin/env sh +#!/bin/sh # -# Copyright 2015 the original author or authors. +# Copyright © 2015-2021 the original authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,67 +17,99 @@ # ############################################################################## -## -## Gradle start up script for UN*X -## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# ############################################################################## # Attempt to set APP_HOME + # Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >/dev/null -APP_HOME="`pwd -P`" -cd "$SAVED" >/dev/null -APP_NAME="Gradle" -APP_BASE_NAME=`basename "$0"` - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. -MAX_FD="maximum" +MAX_FD=maximum warn () { echo "$*" -} +} >&2 die () { echo echo "$*" echo exit 1 -} +} >&2 # OS specific support (must be 'true' or 'false'). cygwin=false msys=false darwin=false nonstop=false -case "`uname`" in - CYGWIN* ) - cygwin=true - ;; - Darwin* ) - darwin=true - ;; - MINGW* ) - msys=true - ;; - NONSTOP* ) - nonstop=true - ;; +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -87,9 +119,9 @@ CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then # IBM's JDK on AIX uses strange locations for the executables - JAVACMD="$JAVA_HOME/jre/sh/java" + JAVACMD=$JAVA_HOME/jre/sh/java else - JAVACMD="$JAVA_HOME/bin/java" + JAVACMD=$JAVA_HOME/bin/java fi if [ ! -x "$JAVACMD" ] ; then die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME @@ -98,88 +130,120 @@ Please set the JAVA_HOME variable in your environment to match the location of your Java installation." fi else - JAVACMD="java" - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then - MAX_FD_LIMIT=`ulimit -H -n` - if [ $? -eq 0 ] ; then - if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then - MAX_FD="$MAX_FD_LIMIT" - fi - ulimit -n $MAX_FD - if [ $? -ne 0 ] ; then - warn "Could not set maximum file descriptor limit: $MAX_FD" - fi - else - warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" - fi +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac fi -# For Darwin, add options to specify how the application appears in the dock -if $darwin; then - GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" -fi +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. # For Cygwin or MSYS, switch paths to Windows format before running java -if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then - APP_HOME=`cygpath --path --mixed "$APP_HOME"` - CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` - - JAVACMD=`cygpath --unix "$JAVACMD"` - - # We build the pattern for arguments to be converted via cygpath - ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` - SEP="" - for dir in $ROOTDIRSRAW ; do - ROOTDIRS="$ROOTDIRS$SEP$dir" - SEP="|" - done - OURCYGPATTERN="(^($ROOTDIRS))" - # Add a user-defined pattern to the cygpath arguments - if [ "$GRADLE_CYGPATTERN" != "" ] ; then - OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" - fi +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + # Now convert the arguments - kludge to limit ourselves to /bin/sh - i=0 - for arg in "$@" ; do - CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` - CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option - - if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition - eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` - else - eval `echo args$i`="\"$arg\"" + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) fi - i=`expr $i + 1` + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg done - case $i in - 0) set -- ;; - 1) set -- "$args0" ;; - 2) set -- "$args0" "$args1" ;; - 3) set -- "$args0" "$args1" "$args2" ;; - 4) set -- "$args0" "$args1" "$args2" "$args3" ;; - 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; - esac fi -# Escape application args -save () { - for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done - echo " " -} -APP_ARGS=`save "$@"` -# Collect all arguments for the java command, following the shell quoting and substitution rules -eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index ac1b06f93..6689b85be 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -14,7 +14,7 @@ @rem limitations under the License. @rem -@if "%DEBUG%" == "" @echo off +@if "%DEBUG%"=="" @echo off @rem ########################################################################## @rem @rem Gradle startup script for Windows @@ -25,7 +25,8 @@ if "%OS%"=="Windows_NT" setlocal set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% @@ -40,7 +41,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto execute +if %ERRORLEVEL% equ 0 goto execute echo. echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. @@ -75,13 +76,15 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar :end @rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd +if %ERRORLEVEL% equ 0 goto mainEnd :fail rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% :mainEnd if "%OS%"=="Windows_NT" endlocal diff --git a/tools/docker-compose.yaml b/tools/docker-compose.yaml index 63606a60e..b7e8183c1 100644 --- a/tools/docker-compose.yaml +++ b/tools/docker-compose.yaml @@ -3,7 +3,7 @@ version: '3' services: consul: container_name: consul - image: consul:1.5.3 + image: hashicorp/consul:1.11.11 ports: - "18500:8500" - "18300:8300" diff --git a/tools/envoy-control/Dockerfile b/tools/envoy-control/Dockerfile index 790daa9f0..71152037f 100644 --- a/tools/envoy-control/Dockerfile +++ b/tools/envoy-control/Dockerfile @@ -1,4 +1,4 @@ -FROM gradle:6.6.1-jdk11 AS builder +FROM gradle:8.3-jdk17 AS builder COPY --chown=gradle:gradle settings.gradle build.gradle gradle.properties /home/gradle/src/ COPY --chown=gradle:gradle envoy-control-core/ /home/gradle/src/envoy-control-core/ COPY --chown=gradle:gradle envoy-control-runner/ /home/gradle/src/envoy-control-runner/ @@ -8,7 +8,7 @@ COPY --chown=gradle:gradle envoy-control-source-consul/ /home/gradle/src/envoy-c WORKDIR /home/gradle/src RUN gradle :envoy-control-runner:assemble --parallel --no-daemon -FROM adoptopenjdk/openjdk11:alpine-jre +FROM eclipse-temurin:17-jre RUN mkdir /tmp/envoy-control-dist /tmp/envoy-control /bin/envoy-control /etc/envoy-control /var/tmp/config COPY --from=builder /home/gradle/src/envoy-control-runner/build/distributions/ /tmp/envoy-control-dist diff --git a/tools/service/Dockerfile b/tools/service/Dockerfile index 2930ea82e..f75d4f753 100644 --- a/tools/service/Dockerfile +++ b/tools/service/Dockerfile @@ -1,4 +1,5 @@ FROM mendhak/http-https-echo +USER root RUN apk add --update \ curl \ && rm -rf /var/cache/apk/*