From cd0f2155bc12bbff5cddd426dab94436240deeec Mon Sep 17 00:00:00 2001 From: Asya Dailidava <133115055+nastassia-dailidava@users.noreply.github.com> Date: Mon, 21 Oct 2024 18:09:25 +0200 Subject: [PATCH 1/2] Fixed metric names (#438) * allegro-internal/flex-roadmap#819 changed names --- CHANGELOG.md | 4 ++ .../envoycontrol/groups/GroupChangeWatcher.kt | 4 +- .../MetricsDiscoveryServerCallbacks.kt | 6 +- .../envoycontrol/snapshot/SnapshotUpdater.kt | 60 +++++++++---------- .../synchronization/GlobalStateChanges.kt | 12 ++-- .../RemoteClusterStateChanges.kt | 8 +-- .../synchronization/RemoteServices.kt | 13 ++-- .../servicemesh/envoycontrol/utils/Metrics.kt | 20 +++++-- .../envoycontrol/utils/ReactorUtils.kt | 3 +- .../snapshot/SnapshotUpdaterTest.kt | 16 +++-- .../envoycontrol/utils/ReactorUtilsTest.kt | 8 +-- .../infrastructure/ControlPlaneConfig.kt | 28 +++------ .../consul/services/ConsulServiceChanges.kt | 14 ++--- .../MetricsDiscoveryServerCallbacksTest.kt | 5 +- 14 files changed, 97 insertions(+), 104 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25dd586c6..69301cd37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ Lists all changes with user impact. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). +## [0.22.3] +### Changed +- Changed names of some metrics + ## [0.22.2] ### Changed - Migrated metrics to prometheus diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt index 56cd44685..dfba4912c 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt @@ -10,8 +10,8 @@ import io.envoyproxy.controlplane.cache.XdsRequest import io.micrometer.core.instrument.MeterRegistry import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics import pl.allegro.tech.servicemesh.envoycontrol.logger +import pl.allegro.tech.servicemesh.envoycontrol.utils.CHANGE_WATCHER_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_TYPE_TAG import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink @@ -38,7 +38,7 @@ internal class GroupChangeWatcher( return groupsChanged .measureBuffer("group-change-watcher", meterRegistry) .checkpoint("group-change-watcher-emitted") - .name(REACTOR_METRIC) + .name(CHANGE_WATCHER_METRIC) .tag(WATCH_TYPE_TAG, "group") .metrics() .doOnSubscribe { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt index 4121df56a..9acc69af1 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt @@ -2,16 +2,16 @@ package pl.allegro.tech.servicemesh.envoycontrol.server.callbacks import io.envoyproxy.controlplane.cache.Resources import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest -import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Tags -import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG import java.util.concurrent.atomic.AtomicInteger +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) : DiscoveryServerCallbacks { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt index 2ca421461..63bd0e7a3 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt @@ -10,15 +10,17 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS import pl.allegro.tech.servicemesh.envoycontrol.groups.Group import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState -import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.COMMUNICATION_MODE_ERROR_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_ERROR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_GROUP_ERROR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_UPDATE_DURATION_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer @@ -60,12 +62,10 @@ class SnapshotUpdater( // step 2: only watches groups. if groups change we use the last services state and update those groups groups().subscribeOn(globalSnapshotScheduler) ) - .measureBuffer("snapshot-updater", meterRegistry, innerSources = 2) + .measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2) .checkpoint("snapshot-updater-merged") - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "snapshot-updater") - .tag(SNAPSHOT_STATUS_TAG, "merged") - .tag(UPDATE_TRIGGER_TAG, "global") + .name(SNAPSHOT_METRIC) + .tag(CHECKPOINT_TAG, "merged") .metrics() // step 3: group updates don't provide a snapshot, // so we piggyback the last updated snapshot state for use @@ -101,20 +101,17 @@ class SnapshotUpdater( // see GroupChangeWatcher return onGroupAdded .publishOn(globalSnapshotScheduler) - .measureBuffer("snapshot-updater", meterRegistry) + .measureBuffer("snapshot-updater-groups-published", meterRegistry) .checkpoint("snapshot-updater-groups-published") + .name(SNAPSHOT_METRIC) + .tag(CHECKPOINT_TAG, "published") + .metrics() .map { groups -> UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups) } - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "snapshot-updater") - .tag(SNAPSHOT_STATUS_TAG, "published") - .tag(UPDATE_TRIGGER_TAG, "groups") - .metrics() .onErrorResume { e -> meterRegistry.counter( - ERRORS_TOTAL_METRIC, - Tags.of(UPDATE_TRIGGER_TAG, "groups", METRIC_EMITTER_TAG, "snapshot-updater") + SNAPSHOT_ERROR_METRIC, Tags.of(UPDATE_TRIGGER_TAG, "groups") ) .increment() logger.error("Unable to process new group", e) @@ -124,17 +121,16 @@ class SnapshotUpdater( internal fun services(states: Flux): Flux { return states - .name(REACTOR_METRIC) - .tag(UPDATE_TRIGGER_TAG, "services") - .tag(STATUS_TAG, "sampled") - .onBackpressureLatestMeasured("snapshot-updater", meterRegistry) + .name(SERVICES_STATE_METRIC) + .tag(CHECKPOINT_TAG, "sampled") + .metrics() + .onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry) // prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure .publishOn(globalSnapshotScheduler, 1) - .measureBuffer("snapshot-updater", meterRegistry) + .measureBuffer("snapshot-updater-services-published", meterRegistry) .checkpoint("snapshot-updater-services-published") - .name(REACTOR_METRIC) - .tag(UPDATE_TRIGGER_TAG, "services") - .tag(STATUS_TAG, "published") + .name(SERVICES_STATE_METRIC) + .tag(CHECKPOINT_TAG, "published") .metrics() .createClusterConfigurations() .map { (states, clusters) -> @@ -163,8 +159,7 @@ class SnapshotUpdater( .filter { it != emptyUpdateResult } .onErrorResume { e -> meterRegistry.counter( - ERRORS_TOTAL_METRIC, - Tags.of(METRIC_EMITTER_TAG, "snapshot-updater", UPDATE_TRIGGER_TAG, "services") + SNAPSHOT_ERROR_METRIC, Tags.of(UPDATE_TRIGGER_TAG, "services") ).increment() logger.error("Unable to process service changes", e) Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES)) @@ -187,18 +182,17 @@ class SnapshotUpdater( } } catch (e: Throwable) { meterRegistry.counter( - ERRORS_TOTAL_METRIC, + SNAPSHOT_GROUP_ERROR_METRIC, Tags.of( SERVICE_TAG, group.serviceName, - OPERATION_TAG, "create-snapshot", - METRIC_EMITTER_TAG, "snapshot-updater" + OPERATION_TAG, "create-snapshot" ) ).increment() logger.error("Unable to create snapshot for group ${group.serviceName}", e) } } - private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot.update.duration.seconds") + private val updateSnapshotForGroupsTimer = meterRegistry.timer(SNAPSHOT_UPDATE_DURATION_METRIC) private fun updateSnapshotForGroups( groups: Collection, @@ -213,7 +207,7 @@ class SnapshotUpdater( } else if (result.xdsSnapshot != null && group.communicationMode == XDS) { updateSnapshotForGroup(group, result.xdsSnapshot) } else { - meterRegistry.counter(ERRORS_TOTAL_METRIC, Tags.of("type", "communication-mode")).increment() + meterRegistry.counter(COMMUNICATION_MODE_ERROR_METRIC).increment() logger.error( "Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " + "Handling Envoy with not supported communication mode should have been rejected before." + diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt index 912b6f57f..8cd281eee 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt @@ -5,8 +5,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured @@ -47,8 +46,8 @@ class GlobalStateChanges( .logSuppressedError("combineLatest() suppressed exception") .measureBuffer("global-service-changes-combinator", meterRegistry) .checkpoint("global-service-changes-emitted") - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "global-service-changes-combinator") + .name(SERVICES_STATE_METRIC) + .tag(CHECKPOINT_TAG, "combined") .metrics() } @@ -76,12 +75,13 @@ class GlobalStateChanges( .logSuppressedError("combineLatest() suppressed exception") .measureBuffer("global-service-changes-combine-latest", meterRegistry) .checkpoint("global-service-changes-emitted") - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "global-service-changes") + .name(SERVICES_STATE_METRIC) .tag(CHECKPOINT_TAG, "emitted") + .metrics() .onBackpressureLatestMeasured("global-service-changes-backpressure", meterRegistry) .publishOn(scheduler, 1) .checkpoint("global-service-changes-published") + .name(SERVICES_STATE_METRIC) .tag(CHECKPOINT_TAG, "published") .metrics() } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt index ee85877b8..34c1f6818 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteClusterStateChanges.kt @@ -3,8 +3,8 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC import reactor.core.publisher.Flux class RemoteClusterStateChanges( @@ -16,7 +16,7 @@ class RemoteClusterStateChanges( .getChanges(properties.sync.pollingInterval) .startWith(MultiClusterState.empty()) .distinctUntilChanged() - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "cross-dc-synchronisation") + .name(SERVICES_STATE_METRIC) + .tag(CHECKPOINT_TAG, "cross-dc") .metrics() } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt index 092e0a077..050ba1c74 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt @@ -12,9 +12,8 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_CANCELLED_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_SECONDS_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_TOTAL_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_ERRORS_METRIC import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.lang.Integer.max @@ -72,11 +71,10 @@ class RemoteServices( .orTimeout(interval, TimeUnit.SECONDS) .exceptionally { meterRegistry.counter( - ERRORS_TOTAL_METRIC, + SERVICES_STATE_ERRORS_METRIC, Tags.of( CLUSTER_TAG, cluster, - OPERATION_TAG, "get-state", - METRIC_EMITTER_TAG, "cross-dc-synchronization" + OPERATION_TAG, "get-state" ) ).increment() logger.warn("Error synchronizing instances ${it.message}", it) @@ -90,11 +88,10 @@ class RemoteServices( cluster to instances } catch (e: Exception) { meterRegistry.counter( - ERRORS_TOTAL_METRIC, + SERVICES_STATE_ERRORS_METRIC, Tags.of( CLUSTER_TAG, cluster, - OPERATION_TAG, "get-instances", - METRIC_EMITTER_TAG, "cross-dc-synchronization" + OPERATION_TAG, "get-instances" ) ).increment() logger.warn("Failed fetching instances from $cluster", e) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt index 6a99f0fa0..1721c6c6a 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt @@ -5,11 +5,19 @@ import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.noop.NoopTimer val noopTimer = NoopTimer(Meter.Id("", Tags.empty(), null, null, Meter.Type.TIMER)) -const val REACTOR_METRIC = "reactor" -const val ERRORS_TOTAL_METRIC = "errors.total" -const val CONNECTIONS_METRIC = "connections" -const val REQUESTS_METRIC = "requests.total" -const val WATCH_METRIC = "watch" +const val REACTOR_METRIC = "reactor.stream.stats" +const val REACTOR_DISCARDED_METRIC = "reactor.stream.discarded" +const val SERVICES_STATE_METRIC = "services.state" +const val SERVICES_STATE_ERRORS_METRIC = "services.state.errors.total" +const val SNAPSHOT_METRIC = "snapshot" +const val SNAPSHOT_UPDATE_DURATION_METRIC = "snapshot.update.duration.seconds" +const val SNAPSHOT_ERROR_METRIC = "snapshot.errors" +const val SNAPSHOT_GROUP_ERROR_METRIC = "snapshot.group.errors.total" +const val COMMUNICATION_MODE_ERROR_METRIC = "communication.errors.total" +const val CONNECTIONS_METRIC = "connection.stats" +const val REQUESTS_METRIC = "request.stats" +const val WATCH_ERRORS_METRIC = "services.watch.errors.total" +const val WATCH_METRIC = "services.watch" const val ENVOY_CONTROL_WARM_UP_METRIC = "envoy.control.warmup.seconds" const val CROSS_DC_SYNC_METRIC = "cross.dc.synchronization" const val CROSS_DC_SYNC_CANCELLED_METRIC = "$CROSS_DC_SYNC_METRIC.cancelled.total" @@ -19,6 +27,7 @@ const val SIMPLE_CACHE_METRIC = "simple.cache.duration.seconds" const val PROTOBUF_CACHE_METRIC = "protobuf.cache.serialize.time" const val CACHE_GROUP_COUNT_METRIC = "cache.groups.count" const val SNAPSHOT_FACTORY_SECONDS_METRIC = "snapshot.factory.seconds" +const val CHANGE_WATCHER_METRIC = "group.change.watcher" const val CONNECTION_TYPE_TAG = "connection-type" const val STREAM_TYPE_TAG = "stream-type" @@ -27,7 +36,6 @@ const val WATCH_TYPE_TAG = "watch-type" const val DISCOVERY_REQ_TYPE_TAG = "discovery-request-type" const val METRIC_TYPE_TAG = "metric-type" const val METRIC_EMITTER_TAG = "metric-emitter" -const val SNAPSHOT_STATUS_TAG = "snapshot-status" const val UPDATE_TRIGGER_TAG = "update-trigger" const val SERVICE_TAG = "service" const val OPERATION_TAG = "operation" diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt index ab4806a09..0925f6528 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt @@ -52,8 +52,7 @@ fun Flux.measureBuffer( fun Flux.measureDiscardedItems(name: String, meterRegistry: MeterRegistry): Flux = this .doOnDiscard(Any::class.java) { meterRegistry.counter( - REACTOR_METRIC, - METRIC_TYPE_TAG, "discarded-items", + REACTOR_DISCARDED_METRIC, METRIC_EMITTER_TAG, name ).increment() } diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt index 6c0a14939..fd8b9b4bf 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt @@ -57,13 +57,12 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyIn import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.RequestPolicyMapper import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator import pl.allegro.tech.servicemesh.envoycontrol.utils.DirectScheduler +import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelScheduler import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler -import pl.allegro.tech.servicemesh.envoycontrol.utils.any -import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_GROUP_ERROR_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.any import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers @@ -473,8 +472,13 @@ class SnapshotUpdaterTest { val snapshot = cache.getSnapshot(servicesGroup) assertThat(snapshot).isEqualTo(null) assertThat( - simpleMeterRegistry.find(ERRORS_TOTAL_METRIC) - .tags(Tags.of(SERVICE_TAG, "example-service", OPERATION_TAG, "create-snapshot", METRIC_EMITTER_TAG, "snapshot-updater")) + simpleMeterRegistry.find(SNAPSHOT_GROUP_ERROR_METRIC) + .tags( + Tags.of( + SERVICE_TAG, "example-service", + OPERATION_TAG, "create-snapshot" + ) + ) .counter()?.count() ).isEqualTo(1.0) } diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt index c2f43c8f0..d58fd836a 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt @@ -98,10 +98,10 @@ class ReactorUtilsTest { // then assertThat(received.await(2, TimeUnit.SECONDS)).isTrue() - val discardedItemsBeforeBackpressure = meterRegistry.find(REACTOR_METRIC) - .tags(Tags.of(METRIC_TYPE_TAG, "discarded-items", METRIC_EMITTER_TAG, "latest-before")).counter()?.count() - val discardedItemsAfterBackpressure = meterRegistry.find(REACTOR_METRIC) - .tags(Tags.of(METRIC_TYPE_TAG, "discarded-items", METRIC_EMITTER_TAG, "latest")).counter()?.count() + val discardedItemsBeforeBackpressure = meterRegistry.find(REACTOR_DISCARDED_METRIC) + .tags(Tags.of(METRIC_EMITTER_TAG, "latest-before")).counter()?.count() + val discardedItemsAfterBackpressure = meterRegistry.find(REACTOR_DISCARDED_METRIC) + .tags(Tags.of(METRIC_EMITTER_TAG, "latest")).counter()?.count() /** * Published by range: (0..10) diff --git a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt index d54743e20..6b88dc8a3 100644 --- a/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt +++ b/envoy-control-runner/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/infrastructure/ControlPlaneConfig.kt @@ -42,11 +42,9 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.transformers.ServiceIns import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.EnvoyHttpFilters import pl.allegro.tech.servicemesh.envoycontrol.synchronization.GlobalStateChanges import pl.allegro.tech.servicemesh.envoycontrol.utils.CACHE_GROUP_COUNT_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG +import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_ERRORS_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_METRIC -import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_TYPE_TAG import reactor.core.scheduler.Schedulers import java.net.URI @@ -180,26 +178,14 @@ class ControlPlaneConfig { fun controlPlaneMetrics(meterRegistry: MeterRegistry): DefaultEnvoyControlMetrics { return DefaultEnvoyControlMetrics(meterRegistry = meterRegistry).also { - meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "added", WATCH_TYPE_TAG, "service"), it.servicesAdded) - meterRegistry.gauge( - WATCH_METRIC, - Tags.of(STATUS_TAG, "removed", WATCH_TYPE_TAG, "service"), - it.servicesRemoved - ) - meterRegistry.gauge( - WATCH_METRIC, - Tags.of(STATUS_TAG, "instance-changed", WATCH_TYPE_TAG, "service"), - it.instanceChanges - ) - meterRegistry.gauge( - WATCH_METRIC, - Tags.of(STATUS_TAG, "snapshot-changed", WATCH_TYPE_TAG, "service"), - it.snapshotChanges - ) + meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "added"), it.servicesAdded) + meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "removed"), it.servicesRemoved) + meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "instance-changed"), it.instanceChanges) + meterRegistry.gauge(WATCH_METRIC, Tags.of(STATUS_TAG, "snapshot-changed"), it.snapshotChanges) meterRegistry.gauge(CACHE_GROUP_COUNT_METRIC, it.cacheGroupsCount) it.meterRegistry.more().counter( - ERRORS_TOTAL_METRIC, - Tags.of(METRIC_EMITTER_TAG, WATCH_METRIC, WATCH_TYPE_TAG, "service"), + WATCH_ERRORS_METRIC, + listOf(), it.errorWatchingServices ) } diff --git a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt index 39b01cc79..333edfe9f 100644 --- a/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt +++ b/envoy-control-source-consul/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/consul/services/ConsulServiceChanges.kt @@ -13,11 +13,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.server.ReadinessStateHandler import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState +import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.ENVOY_CONTROL_WARM_UP_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.measureDiscardedItems -import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG -import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.time.Duration @@ -57,11 +56,12 @@ class ConsulServiceChanges( ) .measureDiscardedItems("consul-service-changes", metrics.meterRegistry) .checkpoint("consul-service-changes-emitted") - .name(REACTOR_METRIC) - .tag(METRIC_EMITTER_TAG, "consul-service-changes") - .tag(CHECKPOINT_TAG, "emitted") + .name(SERVICES_STATE_METRIC) + .tag(CHECKPOINT_TAG, "consul-emitted") + .metrics() .checkpoint("consul-service-changes-emitted-distinct") - .tag(CHECKPOINT_TAG, "distinct") + .name(SERVICES_STATE_METRIC) + .tag(CHECKPOINT_TAG, "consul-distinct") .metrics() .doOnCancel { logger.warn("Cancelling watching consul service changes") diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt index b7c9fda0a..bd1497e65 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt @@ -20,8 +20,8 @@ import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscover import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.RDS import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.SDS import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.UNKNOWN -import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC +import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG @@ -246,7 +246,8 @@ interface MetricsDiscoveryServerCallbacksTest { ).isNotNull assertThat( meterRegistry.get(metric) - .tags(Tags.of(STREAM_TYPE_TAG, type.name.lowercase(), CONNECTION_TYPE_TAG, "grpc")).gauge().value() + .tags(Tags.of(STREAM_TYPE_TAG, type.name.lowercase(), CONNECTION_TYPE_TAG, "grpc")).gauge() + .value() .toInt() ).isEqualTo(value) } From dfbf7bf743ebaa8406d3c54edabbf1ae5137d301 Mon Sep 17 00:00:00 2001 From: Asya Dailidava <133115055+nastassia-dailidava@users.noreply.github.com> Date: Thu, 24 Oct 2024 14:50:26 +0200 Subject: [PATCH 2/2] Added possibility for configuring priorities per service (#436) * allegro-internal/flex-roadmap#814 Added service-specific zone priorities --- CHANGELOG.md | 3 ++ .../snapshot/SnapshotProperties.kt | 1 + .../endpoints/EnvoyEndpointsFactory.kt | 12 +++-- .../endpoints/EnvoyEndpointsFactoryTest.kt | 45 ++++++++++++++++++- 4 files changed, 57 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69301cd37..f082bf463 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ Lists all changes with user impact. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). +## [0.22.4] +- Added possibility for configuring priorities per service + ## [0.22.3] ### Changed - Changed names of some metrics diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt index 6c64b7405..228e6c188 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotProperties.kt @@ -157,6 +157,7 @@ class LoadBalancingProperties { var policy = Cluster.LbPolicy.LEAST_REQUEST var useKeysSubsetFallbackPolicy = true var priorities = LoadBalancingPriorityProperties() + var servicePriorities: Map = mapOf() } class CanaryProperties { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactory.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactory.kt index b6ab798e3..483911b19 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactory.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactory.kt @@ -188,7 +188,7 @@ class EnvoyEndpointsFactory( ?.map { createLbEndpoint(it, serviceInstances.serviceName, locality) } ?: emptyList()) - .setPriority(toEnvoyPriority(zone, locality)) + .setPriority(toEnvoyPriority(zone, locality, serviceInstances)) .build() } @@ -286,8 +286,14 @@ class EnvoyEndpointsFactory( false -> this } - private fun toEnvoyPriority(zone: String, locality: Locality): Int { - val zonePriorities = properties.loadBalancing.priorities.zonePriorities + private fun toEnvoyPriority(zone: String, locality: Locality, serviceInstances: ServiceInstances?): Int { + var zonePriorities = properties.loadBalancing.priorities.zonePriorities + serviceInstances?.let { + if (properties.loadBalancing.servicePriorities.containsKey(serviceInstances.serviceName)) { + zonePriorities = + properties.loadBalancing.servicePriorities[serviceInstances.serviceName]!!.zonePriorities + } + } return when (zonePriorities.isNotEmpty()) { true -> zonePriorities[currentZone]?.get(zone) ?: toEnvoyPriority(locality) false -> toEnvoyPriority(locality) diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactoryTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactoryTest.kt index de151d627..b3c787a0e 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactoryTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/endpoints/EnvoyEndpointsFactoryTest.kt @@ -406,6 +406,40 @@ internal class EnvoyEndpointsFactoryTest { .anySatisfy { it.hasZoneWithPriority("DC3", 2) } } + @Test + fun `should create load assignment with service zone priorities`() { + val envoyEndpointsFactory = EnvoyEndpointsFactory( + snapshotPropertiesWithPriorities + ( + mapOf("DC2" to mapOf("DC1" to 1, "DC2" to 1, "DC3" to 1)), + mapOf("DC2" to mapOf("DC1" to 2, "DC2" to 2, "DC3" to 2)), + serviceName + ), + currentZone = "DC2" + ) + val loadAssignments = envoyEndpointsFactory.createLoadAssignment(setOf(serviceName), multiClusterStateDC2Local) + loadAssignments.assertHasLoadAssignment( + mapOf("DC1" to 2, "DC2" to 2, "DC3" to 2) + ) + } + + @Test + fun `should create load assignment with global zone priorities when no service config found`() { + val envoyEndpointsFactory = EnvoyEndpointsFactory( + snapshotPropertiesWithPriorities + ( + mapOf("DC2" to mapOf("DC1" to 1, "DC2" to 1, "DC3" to 1)), + mapOf("DC2" to mapOf("DC1" to 2, "DC2" to 2, "DC3" to 2)), + "another-service" + ), + currentZone = "DC2" + ) + val loadAssignments = envoyEndpointsFactory.createLoadAssignment(setOf(serviceName), multiClusterStateDC2Local) + loadAssignments.assertHasLoadAssignment( + mapOf("DC1" to 1, "DC2" to 1, "DC3" to 1) + ) + } + private fun List.assertHasLoadAssignment(map: Map) { assertThat(this) .isNotEmpty() @@ -453,13 +487,22 @@ internal class EnvoyEndpointsFactoryTest { assertThat(this.locality.zone).isEqualTo(zone) } - private fun snapshotPropertiesWithPriorities(priorities: Map>) = + private fun snapshotPropertiesWithPriorities( + priorities: Map>, + servicePriorities: Map> = mapOf(), + serviceName: String? = null + ) = SnapshotProperties().apply { loadBalancing = LoadBalancingProperties() .apply { this.priorities = LoadBalancingPriorityProperties().apply { zonePriorities = priorities } + serviceName?.let { + this.servicePriorities = mapOf(serviceName to LoadBalancingPriorityProperties().apply { + zonePriorities = servicePriorities + }) + } } }