From 23c13aa3aefee6b6c62e74688cb5154fdb8bc394 Mon Sep 17 00:00:00 2001 From: "nastassia.dailidava" Date: Fri, 18 Oct 2024 20:52:13 +0200 Subject: [PATCH] allegro-internal/flex-roadmap#819 returned reactor metrics --- .../envoycontrol/utils/ReactorUtils.kt | 53 ++++++++++++++++++- .../envoycontrol/utils/ReactorUtilsTest.kt | 2 - 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt index 1cfc0457a..ab4806a09 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt @@ -1,6 +1,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.reactivestreams.Subscription import org.slf4j.LoggerFactory import reactor.core.Disposable @@ -11,6 +12,7 @@ import reactor.core.scheduler.Scheduler import reactor.core.scheduler.Schedulers import java.time.Duration import java.util.concurrent.TimeUnit +import kotlin.streams.asSequence private val logger = LoggerFactory.getLogger("pl.allegro.tech.servicemesh.envoycontrol.utils.ReactorUtils") private val defaultScheduler by lazy { Schedulers.newSingle("reactor-utils-scheduler") } @@ -110,7 +112,12 @@ private fun measureQueueSubscriptionBuffer( name: String, meterRegistry: MeterRegistry ) { - logger.info("subscription $subscription name: $name meterRegistry: $meterRegistry") + meterRegistry.gauge( + REACTOR_METRIC, + Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name), + subscription, + queueSubscriptionBufferExtractor + ) } private fun measureScannableBuffer( @@ -119,7 +126,49 @@ private fun measureScannableBuffer( innerSources: Int, meterRegistry: MeterRegistry ) { - logger.info("scannable $scannable name: $name innerSources: $innerSources meterRegistry: $meterRegistry") + val buffered = scannable.scan(Scannable.Attr.BUFFERED) + if (buffered == null) { + logger.error( + "Cannot register metric $REACTOR_METRIC 'with $METRIC_EMITTER_TAG: $name'. Buffer size not available. " + + "Use measureBuffer() only on supported reactor operators" + ) + return + } + + meterRegistry.gauge( + REACTOR_METRIC, + Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, name), + scannable, + scannableBufferExtractor + ) + + /** + * Special case for FlatMap derived operators like merge(). The main buffer attribute doesn't return actual + * buffer (that is controlled by `prefetch` parameter) size. Instead it returns simply number of connected sources. + * + * To access actual buffer size, we need to extract it from inners(). We don't know how many sources will + * be available, so it must be stated explicitly as innerSources parameter. + */ + for (i in 0 until innerSources) { + meterRegistry.gauge( + REACTOR_METRIC, + Tags.of(METRIC_TYPE_TAG, "buffer-size", METRIC_EMITTER_TAG, "${(name)}_$i"), + scannable, + innerBufferExtractor(i) + ) + } +} + +private val scannableBufferExtractor = { s: Scannable -> s.scan(Scannable.Attr.BUFFERED)?.toDouble() ?: -1.0 } +private fun innerBufferExtractor(index: Int) = { s: Scannable -> + s.inners().asSequence() + .elementAtOrNull(index) + ?.let(scannableBufferExtractor) + ?: -1.0 +} + +private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> -> + s.size.toDouble() } sealed class ParallelizableScheduler diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt index 111a1ee7b..c2f43c8f0 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtilsTest.kt @@ -3,7 +3,6 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.simple.SimpleMeterRegistry import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.fail import org.testcontainers.shaded.org.awaitility.Awaitility @@ -13,7 +12,6 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.function.BiFunction -@Disabled class ReactorUtilsTest { @Test