Skip to content

Commit

Permalink
Implemented locality weighted load balancing | Added metric test
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Feb 1, 2024
1 parent 5f9fbc8 commit 8b882e7
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ val AdsWithDisabledEndpointPermissions = EnvoyConfig("envoy/config_ads_disabled_
val AdsWithStaticListeners = EnvoyConfig("envoy/config_ads_static_listeners.yaml")
val AdsWithNoDependencies = EnvoyConfig("envoy/config_ads_no_dependencies.yaml")
val Xds = EnvoyConfig("envoy/config_xds.yaml")
val XdsWithStatConfig = EnvoyConfig("envoy/config_xds_stat_config.yaml")
val RandomConfigFile = listOf(Ads, Xds, DeltaAds).random()
val OAuthEnvoyConfig = EnvoyConfig("envoy/config_oauth.yaml")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class EnvoyAdmin(
return splitedStats[1].trim()
}

fun rawMetricValue(metricName: String): String? = get("stats/prometheus").body?.use {
it.string().lines().find { line -> line.contains(metricName) }
}

fun resetCounters() {
post("reset_counters")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class EnvoyContainer(
private val envoyControl1XdsPort: Int,
private val envoyControl2XdsPort: Int = envoyControl1XdsPort,
private val logLevel: String = "info",
private val zone: String = "",
image: String = DEFAULT_IMAGE
) : SSLGenericContainer<EnvoyContainer>(
dockerfileBuilder = DockerfileBuilder()
Expand Down Expand Up @@ -69,7 +70,7 @@ class EnvoyContainer(
withEnv(ENVOY_UID_ENV_NAME, "0")
withExposedPorts(EGRESS_LISTENER_CONTAINER_PORT, INGRESS_LISTENER_CONTAINER_PORT, ADMIN_PORT)
withPrivilegedMode(true)

val zoneFlag = if(zone != "") "--service-zone $zone" else ""
withCommand(
"/bin/sh", "/usr/local/bin/launch_envoy.sh",
Integer.toString(envoyControl1XdsPort),
Expand All @@ -81,7 +82,8 @@ class EnvoyContainer(
config.privateKey,
config.serviceName,
"--config-yaml", config.configOverride,
"-l", logLevel
"-l", logLevel,
zoneFlag
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import java.time.Duration
class EnvoyExtension(
private val envoyControl: EnvoyControlExtensionBase,
private val localService: ServiceExtension<*>? = null,
private val config: EnvoyConfig = RandomConfigFile
private val config: EnvoyConfig = RandomConfigFile,
zone: String = ""
) : BeforeAllCallback, AfterAllCallback, AfterEachCallback {

companion object {
Expand All @@ -29,7 +30,8 @@ class EnvoyExtension(
val container: EnvoyContainer = EnvoyContainer(
config,
{ localService?.container()?.ipAddress() ?: "127.0.0.1" },
envoyControl.app.grpcPort
envoyControl.app.grpcPort,
zone = zone
).withNetwork(Network.SHARED)

val ingressOperations: IngressOperations = IngressOperations(container)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package pl.allegro.tech.servicemesh.envoycontrol.trafficsplitting
import TrafficSplitting.serviceName
import TrafficSplitting.upstreamServiceName
import callUpstreamServiceRepeatedly
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension
import pl.allegro.tech.servicemesh.envoycontrol.config.Xds
import pl.allegro.tech.servicemesh.envoycontrol.config.XdsWithStatConfig
import pl.allegro.tech.servicemesh.envoycontrol.config.consul.ConsulMultiClusterExtension
import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension
import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlClusteredExtension
Expand All @@ -19,8 +20,6 @@ class WeightedClustersRoutingTest {
private const val forceTrafficZone = "dc2"

private val properties = mapOf(
"pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.endpoints.EnvoyEndpointsFactory" to "DEBUG",
"pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory" to "DEBUG",
"envoy-control.envoy.snapshot.stateSampleDuration" to Duration.ofSeconds(0),
"envoy-control.sync.enabled" to true,
"envoy-control.envoy.snapshot.load-balancing.trafficSplitting.zoneName" to forceTrafficZone,
Expand Down Expand Up @@ -55,7 +54,7 @@ class WeightedClustersRoutingTest {
- service: "service-1"
""".trimIndent()

private val config = Xds.copy(configOverride = echo2Config, serviceName = "echo2")
private val config = XdsWithStatConfig.copy(configOverride = echo2Config, serviceName = "echo2")

@JvmField
@RegisterExtension
Expand Down Expand Up @@ -94,14 +93,15 @@ class WeightedClustersRoutingTest {

@JvmField
@RegisterExtension
val echoEnvoyDC1 = EnvoyExtension(envoyControl, localService = echoServiceDC1, config)
val echoEnvoyDC1 = EnvoyExtension(envoyControl, localService = echoServiceDC1, config, zone = "dc1")

@JvmField
@RegisterExtension
val echoEnvoyDC2 = EnvoyExtension(envoyControl2)
val echoEnvoyDC2 = EnvoyExtension(envoyControl2, zone = "dc2")

@JvmField
@RegisterExtension
val echoEnvoyDC3 = EnvoyExtension(envoyControl3)
val echoEnvoyDC3 = EnvoyExtension(envoyControl3, zone = "dc3")
}

@Test
Expand All @@ -117,21 +117,39 @@ class WeightedClustersRoutingTest {
echoEnvoyDC1.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2)
.verifyCallsCountCloseTo(upstreamServiceDC1, 75)
.verifyCallsCountCloseTo(upstreamServiceDC2, 25)
println("snapshot: " + envoyControl.app.getGlobalSnapshot(false).toString())

assertThat(
echoEnvoyDC1.container.admin().rawMetricValue(
"envoy_cluster_zone_dc1__upstream_rq_xx{envoy_response_code_class=\"2\"," +
"envoy_cluster_name=\"service-1\",destination_dc=\"dc2\"}"
)
).isNotNull()
}

@Test
fun `should route traffic according to weights with service tag`() {
consul.serverFirst.operations.registerServiceWithEnvoyOnEgress(echoEnvoyDC1, name = serviceName)

consul.serverFirst.operations.registerService(upstreamServiceDC1, name = upstreamServiceName, tags = listOf("tag"))
consul.serverFirst.operations.registerService(
upstreamServiceDC1,
name = upstreamServiceName,
tags = listOf("tag")
)
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC1, upstreamServiceName)

consul.serverSecond.operations.registerService(upstreamServiceDC2, name = upstreamServiceName, tags = listOf("tag"))
consul.serverSecond.operations.registerService(
upstreamServiceDC2,
name = upstreamServiceName,
tags = listOf("tag")
)
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC2, upstreamServiceName)

echoEnvoyDC1.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2, tag = "tag")
.verifyCallsCountCloseTo(upstreamServiceDC1, 75)
.verifyCallsCountCloseTo(upstreamServiceDC2, 25)
}

private fun assertMetrics() {
val metric = "cluster.<name>.zone.}"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
admin:
access_log_path: /dev/null
address:
socket_address: { address: 0.0.0.0, port_value: 10000 }
dynamic_resources:
lds_config:
resource_api_version: V3
api_config_source:
api_type: GRPC
transport_api_version: V3
grpc_services:
envoy_grpc:
cluster_name: envoy-control-xds
cds_config:
resource_api_version: V3
api_config_source:
api_type: GRPC
transport_api_version: V3
grpc_services:
envoy_grpc:
cluster_name: envoy-control-xds
stats_config:
stats_tags:
- tag_name: destination_dc
regex: ^cluster\..+.zone\.[a-zA-Z]+-?[a-zA-Z]?[0-9].([a-zA-Z]+-?[a-zA-Z]?[0-9]).*

node:
cluster: test-cluster
id: test-id
metadata:
ingress_host: "0.0.0.0"
ingress_port: 5001
egress_host: "0.0.0.0"
egress_port: 5000
use_remote_address: true
generate_request_id: true
preserve_external_request_id: true
access_log_enabled: false
add_upstream_external_address_header: true
resources_dir: "/etc/envoy/extra"
service_name: "echo2"
proxy_settings:
incoming:
endpoints:
- path: "/endpoint"
clients: ["authorizedClient"]
outgoing:
dependencies:
- service: "service-1"
- service: "service-2"
- service: "service-3"
- service: "service-4"
- service: "service-5"
- service: "echo"
timeoutPolicy:
requestTimeout: "15s"
- service: "consul"
timeoutPolicy:
requestTimeout: "15s"
- service: "proxy1"
- service: "proxy2"
- service: "service-redirect"
handleInternalRedirect: true
- service: "host-rewrite-service"
rewriteHostHeader: true
- domain: "https://my.example.com"
- domain: "https://bad.host.example.com"
- domain: "https://www.example.com"
- domain: "https://www.example-redirect.com"
handleInternalRedirect: true

static_resources:
clusters:
- connect_timeout: 1s
load_assignment:
cluster_name: envoy-control-xds
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: HOST_IP
port_value: HOST_PORT
- endpoint:
address:
socket_address:
address: HOST_IP
port_value: HOST2_PORT
http2_protocol_options: {}
name: envoy-control-xds
- name: envoy-original-destination
type: ORIGINAL_DST
lb_policy: CLUSTER_PROVIDED
original_dst_lb_config:
use_http_header: true
connect_timeout:
seconds: 1
http_protocol_options:
allow_absolute_url: true
- name: local_service
type: STATIC
load_assignment:
cluster_name: local_service
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: LOCAL_SERVICE_IP
port_value: 5678
connect_timeout: 1s
- name: this_admin
type: STATIC
load_assignment:
cluster_name: this_admin
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 10000
connect_timeout:
seconds: 1

0 comments on commit 8b882e7

Please sign in to comment.