diff --git a/finagle-core/src/main/scala/com/twitter/finagle/param/Params.scala b/finagle-core/src/main/scala/com/twitter/finagle/param/Params.scala index dc189a4ae71..d291bef6b4b 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/param/Params.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/param/Params.scala @@ -291,3 +291,15 @@ object ExceptionStatsHandler { lazy val default = ExceptionStatsHandler(StatsFilter.DefaultExceptions) } } + +/** + * A class eligible for configuring a + * [[com.twitter.finagle.stats.HistogramCounterFactory]] throughout finagle servers + * and clients. + */ +private[twitter] case class HistogramCounterFactory( + histogramCounterFactoryOpt: Option[stats.HistogramCounterFactory]) +object HistogramCounterFactory { + implicit val param: Stack.Param[HistogramCounterFactory] = + Stack.Param(HistogramCounterFactory(None)) +} diff --git a/finagle-core/src/main/scala/com/twitter/finagle/service/StatsFilter.scala b/finagle-core/src/main/scala/com/twitter/finagle/service/StatsFilter.scala index 90285ac21ea..d8a915d3187 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/service/StatsFilter.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/service/StatsFilter.scala @@ -45,7 +45,7 @@ object StatsFilter { * Creates a [[com.twitter.finagle.Stackable]] [[com.twitter.finagle.service.StatsFilter]]. */ def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] = - new Stack.Module7[ + new Stack.Module8[ param.Stats, param.ExceptionStatsHandler, param.ResponseClassifier, @@ -53,6 +53,7 @@ object StatsFilter { Now, param.MetricBuilders, param.StandardStats, + param.HistogramCounterFactory, ServiceFactory[Req, Rep] ] { val role: Stack.Role = StatsFilter.role @@ -65,6 +66,7 @@ object StatsFilter { now: Now, metrics: param.MetricBuilders, standardStats: param.StandardStats, + histogramCounterFactory: param.HistogramCounterFactory, next: ServiceFactory[Req, Rep] ): ServiceFactory[Req, Rep] = { val param.Stats(statsReceiver) = _stats @@ -78,7 +80,8 @@ object StatsFilter { _param.unit, now.nowOrDefault(_param.unit), metrics.registry, - standardStats.standardStats + standardStats.standardStats, + histogramCounterFactory.histogramCounterFactoryOpt ).andThen(next) } } @@ -163,6 +166,10 @@ object StatsFilter { * * @param metricsRegistry an optional [MetricBuilderRegistry] set by stack parameter * for injecting metrics and instrumenting top-line expressions + * + * @param histogramCounterFactoryOpt an optional [HistogramCounterFactory] that, if present, will + * be used to record a stat for requests received over a 100ms + * period. This can be used to see "burstiness" of requests. */ class StatsFilter[Req, Rep] private[service] ( statsReceiver: StatsReceiver, @@ -171,7 +178,8 @@ class StatsFilter[Req, Rep] private[service] ( timeUnit: TimeUnit, now: () => Long, metricsRegistry: Option[CoreMetricsRegistry] = None, - standardStats: StandardStats = Disabled) + standardStats: StandardStats = Disabled, + histogramCounterFactoryOpt: Option[HistogramCounterFactory] = None) extends SimpleFilter[Req, Rep] { /** @@ -278,6 +286,15 @@ class StatsFilter[Req, Rep] private[service] ( statsReceiver.addGauge(Descriptions.pending, "pending") { outstandingRequestCount.sum() } + private[this] val requestsHistogramCounterOpt = histogramCounterFactoryOpt match { + case Some(histogramCounterFactory) => + Some( + histogramCounterFactory( + Seq("requests"), + StatsFrequency.HundredMilliSecondly, + statsReceiver)) + case None => None + } private[this] def isIgnorableResponse(rep: Try[Rep]): Boolean = rep match { case Throw(f: FailureFlags[_]) => f.isFlagged(FailureFlags.Ignorable) @@ -302,6 +319,12 @@ class StatsFilter[Req, Rep] private[service] ( stats.recordStats(request, response, duration) case None => // no-op } + + requestsHistogramCounterOpt match { + case Some(requestsHistogramCounter) => + requestsHistogramCounter.incr() + case None => // no-op + } } } } diff --git a/finagle-core/src/test/scala/com/twitter/finagle/service/StatsFilterTest.scala b/finagle-core/src/test/scala/com/twitter/finagle/service/StatsFilterTest.scala index 1e5015349f2..be63f1dade1 100644 --- a/finagle-core/src/test/scala/com/twitter/finagle/service/StatsFilterTest.scala +++ b/finagle-core/src/test/scala/com/twitter/finagle/service/StatsFilterTest.scala @@ -116,6 +116,42 @@ class StatsFilterTest extends AnyFunSuite { } } + test("requests histogram counter stat") { + Time.withCurrentTimeFrozen { tc => + val timer = new MockTimer + val sr = new InMemoryStatsReceiver() + val histogramCounterFactory = new HistogramCounterFactory(timer, () => Time.now.inMillis) + val filter = + new StatsFilter[String, String]( + sr, + ResponseClassifier.Default, + StatsFilter.DefaultExceptions, + TimeUnit.SECONDS, + () => Time.now.inSeconds, + histogramCounterFactoryOpt = Some(histogramCounterFactory) + ) + var promise = new Promise[String] + val svc = filter.andThen(new Service[String, String] { + def apply(request: String): Promise[String] = promise + }) + svc("1") + svc("1") + svc("1") + promise.setValue("done") + tc.advance(100.millis) + timer.tick() + + promise = new Promise[String] + svc("1") + svc("1") + promise.setValue("done") + tc.advance(100.millis) + timer.tick() + + assert(sr.stats(Seq("requests", "hundredMilliSecondly")) == Seq(3, 2)) + } + } + test("report exceptions") { val (promise, receiver, statsService) = getService() diff --git a/finagle-stats-core/src/test/scala/com/twitter/finagle/stats/HistogramCounterTest.scala b/finagle-stats-core/src/test/scala/com/twitter/finagle/stats/HistogramCounterTest.scala new file mode 100644 index 00000000000..12e256257f5 --- /dev/null +++ b/finagle-stats-core/src/test/scala/com/twitter/finagle/stats/HistogramCounterTest.scala @@ -0,0 +1,118 @@ +package com.twitter.finagle.stats + +import com.twitter.conversions.DurationOps._ +import com.twitter.util.MockTimer +import com.twitter.util.Time +import org.scalatest.funsuite.AnyFunSuite + +class HistogramCounterTest extends AnyFunSuite { + + test("Records stat at given frequency") { + Time.withCurrentTimeFrozen { tc => + val timer = new MockTimer() + val statsReceiver = new InMemoryStatsReceiver + val histogramCounterFactory = new HistogramCounterFactory(timer, () => Time.now.inMillis) + val histogramCounter = + histogramCounterFactory( + Seq("foo", "bar"), + StatsFrequency.HundredMilliSecondly, + statsReceiver + ) + histogramCounter.incr(5) + assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")).isEmpty) + histogramCounter.incr() + + tc.advance(100.millis) + timer.tick() + + assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(6)) + + tc.advance(100.millis) + timer.tick() + + assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(6, 0)) + + histogramCounter.incr(2) + + tc.advance(50.millis) + timer.tick() + + assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(6, 0)) + histogramCounter.incr(3) + + tc.advance(50.millis) + timer.tick() + + assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(6, 0, 5)) + } + } + + test("Normalizes recorded stat to the elapsed time since last recording") { + Time.withCurrentTimeFrozen { tc => + val timer = new MockTimer() + val statsReceiver = new InMemoryStatsReceiver + val histogramCounterFactory = new HistogramCounterFactory(timer, () => Time.now.inMillis) + val histogramCounter = + histogramCounterFactory( + Seq("foo", "bar"), + StatsFrequency.HundredMilliSecondly, + statsReceiver + ) + histogramCounter.incr(5) + histogramCounter.incr(10) + assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")).isEmpty) + + // Our task was slow to execute on the timer :( + tc.advance(150.millis) + timer.tick() + + // We have 15 requests in 1.5 windows, so 10 requests in 1 window. + assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(10)) + + histogramCounter.incr(12) + + tc.advance(300.millis) + timer.tick() + + // We have 12 requests in 3 windows, so 4 requests in 1 window. + assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(10, 4)) + } + } + + test("Returns the same histogramCounter object for equivalent names") { + val timer = new MockTimer() + val metrics = Metrics.createDetached() + val statsReceiver = new MetricsStatsReceiver(metrics) + + val histogramCounterFactory = new HistogramCounterFactory(timer, () => Time.now.inMillis) + + val histogramCounter1 = + histogramCounterFactory(Seq("foo", "bar"), StatsFrequency.HundredMilliSecondly, statsReceiver) + val histogramCounter2 = + histogramCounterFactory(Seq("foo", "bar"), StatsFrequency.HundredMilliSecondly, statsReceiver) + val histogramCounter3 = + histogramCounterFactory(Seq("foo/bar"), StatsFrequency.HundredMilliSecondly, statsReceiver) + + val scopedStatsReceiver = statsReceiver.scope("foo") + + val histogramCounter4 = + histogramCounterFactory(Seq("bar"), StatsFrequency.HundredMilliSecondly, scopedStatsReceiver) + + assert(histogramCounter1 eq histogramCounter2) + assert(histogramCounter1 eq histogramCounter3) + assert(histogramCounter1 eq histogramCounter4) + } + + test("Doesn't schedule recording of stats after close is called") { + Time.withCurrentTimeFrozen { tc => + val timer = new MockTimer() + assert(timer.tasks.size == 0) + val histogramCounterFactory = new HistogramCounterFactory(timer, () => Time.now.inMillis) + assert(timer.tasks.size == 1) + histogramCounterFactory.close() + tc.advance(100.millis) + timer.tick() + assert(timer.tasks.size == 0) + } + } +}