Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 Migrated metrics to prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Sep 27, 2024
1 parent 58be4d6 commit df11439
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.envoyproxy.controlplane.server.callback.SnapshotCollectingCallback
import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
Expand Down Expand Up @@ -221,10 +222,12 @@ class ControlPlane private constructor(
nioEventLoopExecutor
)
)
.bossEventLoopGroup(NioEventLoopGroup(
properties.server.nioBossEventLoopThreadCount,
nioBossEventLoopExecutor
))
.bossEventLoopGroup(
NioEventLoopGroup(
properties.server.nioBossEventLoopThreadCount,
nioBossEventLoopExecutor
)
)
.channelType(NioServerSocketChannel::class.java)
.executor(grpcServerExecutor)
.keepAliveTime(properties.server.netty.keepAliveTime.toMillis(), TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -410,7 +413,12 @@ class ControlPlane private constructor(
}

private fun meterExecutor(executor: ExecutorService, executorServiceName: String) {
ExecutorServiceMetrics(executor, executorServiceName, executorServiceName, emptySet())
ExecutorServiceMetrics(
executor,
executorServiceName,
"envoy-control",
Tags.of("executor", executorServiceName)
)
.bindTo(meterRegistry)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package pl.allegro.tech.servicemesh.envoycontrol.server.callbacks

import com.google.common.net.InetAddresses.increment
import io.envoyproxy.controlplane.cache.Resources
import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import java.util.concurrent.atomic.AtomicInteger

class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) : DiscoveryServerCallbacks {
Expand Down Expand Up @@ -34,9 +36,9 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)
.map { type -> type to AtomicInteger(0) }
.toMap()

meterRegistry.gauge("grpc.all-connections", connections)
meterRegistry.gauge("grpc.connections", Tags.of("connection-type", "all"), connections)
connectionsByType.forEach { (type, typeConnections) ->
meterRegistry.gauge("grpc.connections.${type.name.toLowerCase()}", typeConnections)
meterRegistry.gauge("grpc.connections", Tags.of("connection-type", type.name.lowercase()), typeConnections)
}
}

Expand All @@ -51,15 +53,21 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)
}

override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) {
meterRegistry.counter("grpc.requests.${StreamType.fromTypeUrl(request.typeUrl).name.toLowerCase()}")
meterRegistry.counter(
"grpc.requests.count",
Tags.of("type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "metric-type", "total")
)
.increment()
}

override fun onV3StreamDeltaRequest(
streamId: Long,
request: V3DeltaDiscoveryRequest
) {
meterRegistry.counter("grpc.requests.${StreamType.fromTypeUrl(request.typeUrl).name.toLowerCase()}.delta")
meterRegistry.counter(
"grpc.requests.count",
Tags.of("type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "metric-type", "delta")
)
.increment()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.envoyproxy.envoy.config.listener.v3.Listener
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.Timer
import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesGroup
import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode
Expand Down Expand Up @@ -67,7 +68,12 @@ class EnvoySnapshotFactory(
endpoints = endpoints,
properties = properties.outgoingPermissions
)
sample.stop(meterRegistry.timer("snapshot-factory.new-snapshot.time"))
sample.stop(
meterRegistry.timer(
"snapshot-factory.seconds",
Tags.of("operation", "new-snapshot", "type", "global")
)
)

return snapshot
}
Expand Down Expand Up @@ -155,7 +161,12 @@ class EnvoySnapshotFactory(
val groupSample = Timer.start(meterRegistry)

val newSnapshotForGroup = newSnapshotForGroup(group, globalSnapshot)
groupSample.stop(meterRegistry.timer("snapshot-factory.get-snapshot-for-group.time"))
groupSample.stop(
meterRegistry.timer(
"snapshot-factory.seconds",
Tags.of("operation", "new-snapshot", "type", "group")
)
)
return newSnapshotForGroup
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot
import io.envoyproxy.controlplane.cache.SnapshotCache
import io.envoyproxy.controlplane.cache.v3.Snapshot
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.Timer
import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.ADS
import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS
Expand Down Expand Up @@ -50,9 +51,12 @@ class SnapshotUpdater(
// step 2: only watches groups. if groups change we use the last services state and update those groups
groups().subscribeOn(globalSnapshotScheduler)
)
.measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2)
.measureBuffer("snapshot.updater.count.total", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name("snapshot-updater-merged").metrics()
.name("snapshot.updater.count.total")
.tag("status", "merged")
.tag("type", "global")
.metrics()
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
.scan { previous: UpdateResult, newUpdate: UpdateResult ->
Expand Down Expand Up @@ -87,28 +91,40 @@ class SnapshotUpdater(
// see GroupChangeWatcher
return onGroupAdded
.publishOn(globalSnapshotScheduler)
.measureBuffer("snapshot-updater-groups-published", meterRegistry)
.measureBuffer("snapshot-updater.count.total", meterRegistry)
.checkpoint("snapshot-updater-groups-published")
.name("snapshot-updater-groups-published").metrics()
.name("snapshot-updater.count.total")
.tag("type", "groups")
.tag("status", "published").metrics()
.map { groups ->
UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups)
}
.onErrorResume { e ->
meterRegistry.counter("snapshot-updater.groups.updates.errors").increment()
meterRegistry.counter(
"snapshot-updater.errors.total",
Tags.of("type", "groups")
)
.increment()
logger.error("Unable to process new group", e)
Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES))
}
}

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name("snapshot-updater-services-sampled").metrics()
.onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry)
.name("snapshot-updater.count.total")
.tag("type", "services")
.tag("status", "sampled")
.metrics()
.onBackpressureLatestMeasured("snapshot-updater.count.total", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot-updater-services-published", meterRegistry)
.measureBuffer("snapshot-updater.count.total", meterRegistry) // todo
.checkpoint("snapshot-updater-services-published")
.name("snapshot-updater-services-published").metrics()
.name("snapshot-updater.count.total")
.tag("type", "services")
.tag("status", "published")
.metrics()
.createClusterConfigurations()
.map { (states, clusters) ->
var lastXdsSnapshot: GlobalSnapshot? = null
Expand All @@ -135,14 +151,19 @@ class SnapshotUpdater(
}
.filter { it != emptyUpdateResult }
.onErrorResume { e ->
meterRegistry.counter("snapshot-updater.services.updates.errors").increment()
meterRegistry.counter(
"snapshot-updater.errors.total",
Tags.of("type", "services")
).increment()
logger.error("Unable to process service changes", e)
Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES))
}
}

private fun snapshotTimer(serviceName: String) = if (properties.metrics.cacheSetSnapshot) {
meterRegistry.timer("snapshot-updater.set-snapshot.$serviceName.time")
meterRegistry.timer(
"simple-cache.duration.seconds", Tags.of("service", serviceName, "operation", "set-snapshot")
)
} else {
noopTimer
}
Expand All @@ -154,12 +175,15 @@ class SnapshotUpdater(
cache.setSnapshot(group, groupSnapshot)
}
} catch (e: Throwable) {
meterRegistry.counter("snapshot-updater.services.${group.serviceName}.updates.errors").increment()
meterRegistry.counter(
"snapshot-updater.errors.total", Tags.of("service", group.serviceName)
).increment()
logger.error("Unable to create snapshot for group ${group.serviceName}", e)
}
}

private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot-updater.update-snapshot-for-groups.time")
private val updateSnapshotForGroupsTimer =
meterRegistry.timer("snapshot-updater.duration.seconds", Tags.of("type", "groups"))

private fun updateSnapshotForGroups(
groups: Collection<Group>,
Expand All @@ -174,10 +198,13 @@ class SnapshotUpdater(
} else if (result.xdsSnapshot != null && group.communicationMode == XDS) {
updateSnapshotForGroup(group, result.xdsSnapshot)
} else {
meterRegistry.counter("snapshot-updater.communication-mode.errors").increment()
logger.error("Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " +
"Handling Envoy with not supported communication mode should have been rejected before." +
" Please report this to EC developers.")
meterRegistry.counter("snapshot-updater.errors.total", Tags.of("type", "communication-mode"))
.increment()
logger.error(
"Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " +
"Handling Envoy with not supported communication mode should have been rejected before." +
" Please report this to EC developers."
)
}
}
return results.then(Mono.fromCallable {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.servicemesh.envoycontrol.synchronization

import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.Locality
Expand Down Expand Up @@ -29,9 +30,10 @@ class RemoteServices(
fun getChanges(interval: Long): Flux<MultiClusterState> {
val aclFlux: Flux<MultiClusterState> = Flux.create({ sink ->
scheduler.scheduleWithFixedDelay({
meterRegistry.timer("sync-dc.get-multi-cluster-states.time").recordCallable {
getChanges(sink::next, interval)
}
meterRegistry.timer("cross-dc-synchronization.seconds", Tags.of("operation", "get-multi-cluster-state"))
.recordCallable {
getChanges(sink::next, interval)
}
}, 0, interval, TimeUnit.SECONDS)
}, FluxSink.OverflowStrategy.LATEST)
return aclFlux.doOnCancel {
Expand Down Expand Up @@ -59,7 +61,10 @@ class RemoteServices(
.thenApply { servicesStateFromCluster(cluster, it) }
.orTimeout(interval, TimeUnit.SECONDS)
.exceptionally {
meterRegistry.counter("cross-dc-synchronization.$cluster.state-fetcher.errors").increment()
meterRegistry.counter(
"cross-dc-synchronization.errors",
Tags.of("cluster", cluster, "operation", "get-cluster-state")
).increment()
logger.warn("Error synchronizing instances ${it.message}", it)
clusterStateCache[cluster]
}
Expand All @@ -70,7 +75,10 @@ class RemoteServices(
val instances = controlPlaneInstanceFetcher.instances(cluster)
cluster to instances
} catch (e: Exception) {
meterRegistry.counter("cross-dc-synchronization.$cluster.instance-fetcher.errors").increment()
meterRegistry.counter(
"cross-dc-synchronization.errors",
Tags.of("cluster", cluster, "operation", "get-cluster-state")
).increment()
logger.warn("Failed fetching instances from $cluster", e)
cluster to emptyList()
}
Expand All @@ -80,7 +88,11 @@ class RemoteServices(
cluster: String,
state: ServicesState
): ClusterState {
meterRegistry.counter("cross-dc-service-update-$cluster").increment()
meterRegistry.counter(
"cross-dc-synchronization",
Tags.of("operation", "service-update", "cluster", cluster)
)
.increment()
val clusterState = ClusterState(
state.removeServicesWithoutInstances(),
Locality.REMOTE,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.allegro.tech.servicemesh.envoycontrol.metrics

import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
Expand All @@ -25,13 +26,26 @@ class ThreadPoolMetricTest {
controlPlane.start()

// then
val allMeterNames = meterRegistry.meters.map { it.id.name }
val requiredMeterNames = listOf("grpc-server-worker", "grpc-worker-event-loop", "snapshot-update", "group-snapshot").flatMap {
listOf("$it.executor.completed", "$it.executor.active", "$it.executor.queued", "$it.executor.pool.size")
val metricNames = listOf("executor.completed", "executor.active", "executor.queued", "executor.pool.size")
.map { "envoy-control.$it" }

val metricMap = listOf(
"grpc-server-worker",
"grpc-worker-event-loop",
"snapshot-update",
"group-snapshot"
).associateWith { metricNames }

assertThat(metricMap.entries).allSatisfy {
assertThat(it.value.all { metricName ->
meterRegistry.meters.any { meter ->
meter.id.name == metricName && meter.id.tags.contains(
Tag.of("executor", it.key)
)
}
}).isTrue()
}

assertThat(allMeterNames).containsAll(requiredMeterNames)

// and
controlPlane.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import io.envoyproxy.envoy.config.listener.v3.Listener
import io.envoyproxy.envoy.config.route.v3.RetryPolicy
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -468,7 +469,8 @@ class SnapshotUpdaterTest {
val snapshot = cache.getSnapshot(servicesGroup)
assertThat(snapshot).isEqualTo(null)
assertThat(
simpleMeterRegistry.find("snapshot-updater.services.example-service.updates.errors")
simpleMeterRegistry.find("snapshot-updater.errors.total")
.tags(Tags.of("service", "example-service"))
.counter()?.count()
).isEqualTo(1.0)
}
Expand Down
Loading

0 comments on commit df11439

Please sign in to comment.