From d6872e11a389f387280401b6d743814baf354fe3 Mon Sep 17 00:00:00 2001
From: Tristan Pollitt
Date: Thu, 21 Nov 2024 18:33:07 +0000
Subject: [PATCH] finagle/finagle-zipkin-core: Introduce local tracing
 implementation with duration filter and sampling

This phab introduces `DurationFilteringTracer`, which can be used to
collect traces locally and write them to a file, with a configurable
sample rate and span duration threshold. The implementation converts
Finagle spans to the zipkin2 JSON format so that they can be uploaded
to the Zipkin UI or Grafana for analysis.

For duration filtering, we need to keep sampled spans in memory until
either a parent span completes with a duration greater than the
configured threshold, or the topmost span of the trace is received. I
use a Caffeine cache for this, so that we won't run out of memory if
the sampling rate is set too high or if there are orphaned spans.

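As a minimal usage sketch only (the 500ms threshold, 10% sample rate,
output path, and `someSlowFuture` below are illustrative assumptions,
not part of this change), the tracer can be installed around a block of
traced work in the same way the tests do:

    import com.twitter.conversions.DurationOps._
    import com.twitter.finagle.tracing.Trace
    import com.twitter.finagle.zipkin.core.DurationFilteringTracer

    // Sample 10% of traces and keep only traces containing at least one
    // span that took 500ms or longer; matching spans are appended to the
    // output file as newline-delimited zipkin2 JSON.
    val tracer = new DurationFilteringTracer(
      duration = 500.millis,
      samplingRate = 0.1f,
      outputPath = "/tmp/traces.json")

    Trace.letTracers(Seq(tracer)) {
      Trace.traceLocalFuture("expensive-operation") {
        someSlowFuture() // hypothetical workload returning a Future
      }
    }
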
Differential Revision: https://phabricator.twitter.biz/D1182242
---
 build.sbt                                     |   7 +-
 finagle-zipkin-core/src/main/scala/BUILD      |   2 +
 .../zipkin/core/DurationFilteringTracer.scala | 143 +++++++++++++++++
 .../core/DurationFilteringTracerSpec.scala    | 148 ++++++++++++++++++
 4 files changed, 299 insertions(+), 1 deletion(-)
 create mode 100644 finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracer.scala
 create mode 100644 finagle-zipkin-core/src/test/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracerSpec.scala

diff --git a/build.sbt b/build.sbt
index 9b046cdb416..593087b68f3 100644
--- a/build.sbt
+++ b/build.sbt
@@ -82,6 +82,11 @@ val scroogeLibs = thriftLibs ++ Seq("com.twitter" %% "scrooge-core" % releaseVer
 
 val lz4Lib = "org.lz4" % "lz4-java" % "1.8.0"
 
+val zipkinLibs = Seq(
+  "io.zipkin.java" % "zipkin" % "1.28.1",
+  "io.zipkin.zipkin2" % "zipkin" % "2.22.1"
+)
+
 def util(which: String) =
   "com.twitter" %% ("util-" + which) % releaseVersion excludeAll (ExclusionRule(organization = "junit"),
@@ -482,7 +487,7 @@ lazy val finagleZipkinCore = Project(
     libraryDependencies ++= Seq(
      util("codec"),
      util("core"),
-      util("stats")) ++ scroogeLibs ++ jacksonLibs
+      util("stats")) ++ scroogeLibs ++ jacksonLibs ++ zipkinLibs
   ).dependsOn(finagleCore % "compile->compile;test->test", finagleThrift)
 
 lazy val finagleZipkinScribe = Project(
diff --git a/finagle-zipkin-core/src/main/scala/BUILD b/finagle-zipkin-core/src/main/scala/BUILD
index 5da8bb99959..fe0491bc504 100644
--- a/finagle-zipkin-core/src/main/scala/BUILD
+++ b/finagle-zipkin-core/src/main/scala/BUILD
@@ -12,6 +12,8 @@ scala_library(
         "3rdparty/jvm/com/fasterxml/jackson/core:jackson-core",
         "3rdparty/jvm/com/fasterxml/jackson/core:jackson-databind",
         "3rdparty/jvm/com/fasterxml/jackson/module:jackson-module-scala",
+        "3rdparty/jvm/io/zipkin/java:zipkin-core",
+        "3rdparty/jvm/io/zipkin/java:zipkin2",
         "3rdparty/jvm/org/apache/thrift:libthrift",
         "finagle/finagle-core/src/main",
         "finagle/finagle-thrift/src/main/java",
diff --git a/finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracer.scala b/finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracer.scala
new file mode 100644
index 00000000000..10d6a641059
--- /dev/null
+++ b/finagle-zipkin-core/src/main/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracer.scala
@@ -0,0 +1,143 @@
+package com.twitter.finagle.zipkin.core
+
+import com.github.benmanes.caffeine.cache.Caffeine
+import com.github.benmanes.caffeine.cache.RemovalCause
+import com.twitter.finagle.stats.NullStatsReceiver
+import com.twitter.finagle.stats.StatsReceiver
+import com.twitter.finagle.tracing.Record
+import com.twitter.finagle.tracing.TraceId
+import com.twitter.finagle.zipkin.core.DurationFilteringTracer.BitMask
+import com.twitter.finagle.zipkin.core.DurationFilteringTracer.Multiplier
+import com.twitter.finagle.zipkin.core.DurationFilteringTracer.SomeFalse
+import com.twitter.finagle.zipkin.core.DurationFilteringTracer.SomeTrue
+import com.twitter.finagle.zipkin.core.DurationFilteringTracer.salt
+import com.twitter.util.Duration
+import com.twitter.util.Future
+import java.io.FileOutputStream
+import java.lang.ThreadLocal
+import java.util.concurrent.ConcurrentMap
+import org.apache.thrift.TSerializer
+import scala.util.Random
+import zipkin.internal.ApplyTimestampAndDuration
+import zipkin.Codec
+import zipkin2.codec.SpanBytesDecoder
+import java.lang.{Long => JLong}
+import scala.util.Using
+import zipkin2.codec.SpanBytesEncoder
+
+object DurationFilteringTracer {
+  // Use same sampling params here as in com.twitter.finagle.zipkin.core.Sampler
+  private val Multiplier = (1 << 24).toFloat
+  private val BitMask = (Multiplier - 1).toInt
+  private val salt = new Random().nextInt()
+
+  private val SomeTrue = Some(true)
+  private val SomeFalse = Some(false)
+}
+
+class DurationFilteringTracer(
+  duration: Duration,
+  samplingRate: Float,
+  outputPath: String,
+  maxInFlightTraces: Int = 2000,
+  statsReceiver: StatsReceiver = NullStatsReceiver)
+    extends RawZipkinTracer {
+
+  if (samplingRate < 0 || samplingRate > 1) {
+    throw new IllegalArgumentException(
+      "Sample rate not within the valid range of 0-1, was " + samplingRate
+    )
+  }
+
+  private[this] val persistedSpansCounter = statsReceiver.counter("persistedSpans")
+  private[this] val evictions = statsReceiver.counter("evictions")
+
+  private[this] val thriftSerialiser = ThreadLocal.withInitial(() => new TSerializer())
+
+  // map from TraceID -> spans within that trace
+  private[this] val spanRoots: ConcurrentMap[Long, List[zipkin.Span]] = Caffeine
+    .newBuilder()
+    .asInstanceOf[Caffeine[Long, List[zipkin.Span]]]
+    .maximumSize(maxInFlightTraces)
+    .evictionListener((_: Long, v: Seq[zipkin.Span], _: RemovalCause) => {
+      evictions.incr()
+    })
+    .build[Long, List[zipkin.Span]].asMap()
+
+  // sentinel value that will get set for a trace ID when we've seen at least one span in that trace
+  // with duration >= threshold
+  private[this] val durationThresholdMetSentinel = List[zipkin.Span]()
+
+  val cacheSizeGauge = statsReceiver.addGauge("cacheSize")(spanRoots.size().floatValue())
+
+  override def record(record: Record): Unit = {
+    if (sampleTrace(record.traceId).contains(true)) {
+      super.record(record)
+    }
+  }
+
+  override def sampleTrace(traceId: TraceId): Option[Boolean] = {
+    // Same as in com.twitter.finagle.zipkin.core.Sampler, except here we don't check if
+    // the traceId has already had Some(false) set, since we want to consider all traceIds
+    if (((JLong.hashCode(traceId.traceId.toLong) ^ salt) & BitMask) < samplingRate * Multiplier)
+      SomeTrue
+    else
+      SomeFalse
+  }
+
+  override def getSampleRate: Float = samplingRate
+
+  override def sendSpans(spans: Seq[Span]): Future[Unit] = {
+    spans.map(convertToZipkinSpan).foreach { span =>
+      if (span.duration >= duration.inMicroseconds) {
+        val existingSpansForTrace = spanRoots.put(span.traceId, durationThresholdMetSentinel)
+        persistSpans(span, existingSpansForTrace)
+      } else {
+        val existingSpansForTrace = spanRoots.compute(
+          span.traceId,
+          {
+            case (_, null) => List(span) // this is the first span for the trace
+            case (_, v) if v.eq(durationThresholdMetSentinel) =>
+              durationThresholdMetSentinel // duration threshold has already been met
+            case (_, v) =>
+              v.+:(span) // there are existing spans, but duration threshold not yet met
+          }
+        )
+
+        if (existingSpansForTrace.eq(durationThresholdMetSentinel)) {
+          persistSpans(span, List.empty)
+        }
+      }
+    }
+
+    Future.Done
+  }
+
+  override def isActivelyTracing(traceId: TraceId): Boolean = sampleTrace(traceId).contains(true)
+
+  private[this] def convertToZipkinSpan(span: Span): zipkin.Span = {
+    val serialisedBytes = thriftSerialiser.get().serialize(span.toThrift)
+    val zipkinV1ThriftSpan = zipkin.Codec.THRIFT.readSpan(serialisedBytes)
+    ApplyTimestampAndDuration.apply(zipkinV1ThriftSpan)
+  }
+
+  private[this] def persistSpans(parent: zipkin.Span, children: Seq[zipkin.Span]): Unit = {
+    val spansToPersist = if (children != null) children :+ parent else Seq(parent)
+    persistedSpansCounter.incr(spansToPersist.size)
+    Using(new FileOutputStream(outputPath, true)) { fileOutputStream =>
+      spansToPersist.foreach { span =>
+        val converted = convertV1SpanToV2(span)
+        fileOutputStream.write(
+          SpanBytesEncoder.JSON_V2
+            .encode(converted))
+        fileOutputStream.write('\n')
+      }
+      fileOutputStream.flush()
+    }
+  }
+
+  private[this] def convertV1SpanToV2(span: zipkin.Span): zipkin2.Span = {
+    val spanBytesV1 = Codec.THRIFT.writeSpan(span)
+    SpanBytesDecoder.THRIFT.decodeOne(spanBytesV1)
+  }
+}
diff --git a/finagle-zipkin-core/src/test/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracerSpec.scala b/finagle-zipkin-core/src/test/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracerSpec.scala
new file mode 100644
index 00000000000..056aa85dceb
--- /dev/null
+++ b/finagle-zipkin-core/src/test/scala/com/twitter/finagle/zipkin/core/DurationFilteringTracerSpec.scala
@@ -0,0 +1,148 @@
+package com.twitter.finagle.zipkin.core
+package unit
+
+import org.scalatest.FunSuite
+import com.twitter.conversions.DurationOps._
+import com.twitter.finagle.tracing.Trace
+import com.twitter.finagle.util.DefaultTimer
+import com.twitter.util.Await
+import com.twitter.util.Duration
+import com.twitter.util.Future
+import com.twitter.util.FuturePool
+import com.twitter.util.Futures
+import com.twitter.util.Promise
+import com.twitter.util.Return
+import java.io.File
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.Executors
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.Seconds
+import org.scalatest.time.Span
+import scala.io.Source
+import zipkin2.codec.SpanBytesDecoder
+
+class DurationFilteringTracerSpec extends FunSuite with Eventually {
+
+  implicit val config: PatienceConfig = PatienceConfig(timeout = scaled(Span(5, Seconds)))
+  private val futurePool = FuturePool(Executors.newFixedThreadPool(100))
+  private implicit val timer = DefaultTimer
+
+  test("Test only persists spans with duration greater than 100 milliseconds") {
+    val tempFile = File.createTempFile("traces", ".json")
+    val tracer = new DurationFilteringTracer(100.millis, 1F, tempFile.getPath)
+
+    Trace.letTracers(Seq(tracer)) {
+      val future1 = Trace.traceLocalFuture("test") {
+        taskWithDuration(10.millis)
+      }
+
+      val future2 = Trace.traceLocalFuture("test2") {
+        taskWithDuration(100.millis)
+      }
+
+      Await.ready(Futures.join(future1, future2))
+    }
+
+    eventually {
+      val parsedSpans = parseSpansFromFile(tempFile.getPath)
+      assert(parsedSpans.size == 1)
+      assert(parsedSpans.head.name() == "test2")
+    }
+  }
+
+  test("Test persists all child spans where parent has duration greater than 100 milliseconds") {
+    val tempFile = File.createTempFile("traces", ".json")
+    val tracer = new DurationFilteringTracer(100.millis, 1F, tempFile.getPath)
+
+    Trace.letTracers(Seq(tracer)) {
+      val future1 = Trace.traceLocalFuture("test") {
+        taskWithDuration(10.millis)
+      }
+
+      val future2 = Trace.traceLocalFuture("test2") {
+        taskWithDuration(
+          100.millis,
+          childTask = Futures
+            .join(
+              Trace.traceLocalFuture("test3") {
+                taskWithDuration(10.millis)
+              },
+              Trace.traceLocalFuture("test4") {
+                taskWithDuration(10.millis)
+              }).unit
+        )
+      }
+
+      Await.ready(Futures.join(future1, future2))
+    }
+
+    eventually {
+      val parsedSpans = parseSpansFromFile(tempFile.getPath)
+      assert(parsedSpans.size == 3)
+      assert(parsedSpans.exists(_.name() == "test2"))
+      assert(parsedSpans.exists(_.name() == "test3"))
+      assert(parsedSpans.exists(_.name() == "test4"))
+    }
+  }
+
+  test("Keeps limited number of spans in memory") {
+    val tempFile = File.createTempFile("traces", ".json")
+    val tracer =
+      new DurationFilteringTracer(100.millis, 1F, tempFile.getPath, maxInFlightTraces = 99)
+
+    val latch = new CountDownLatch(100)
+    val barrier = Promise[Unit]()
+
+    val tasks = Trace.letTracers(Seq(tracer)) {
+      for (i <- 0.until(100)) yield {
+        Trace.traceLocalFuture(s"task$i")(futurePool { // top level tasks
+          Trace.traceLocalFuture(s"subtask$i")(Future()) // bottom level tasks
+          latch.countDown()
+          Await.ready(barrier)
+        })
+      }
+    }
+
+    latch.await()
+    Thread.sleep(100) // ensures the tasks' durations exceed the 100ms threshold
+    barrier.update(Return())
+    Await.ready(Future.join(tasks))
+
+    eventually {
+      val parsedSpans = parseSpansFromFile(tempFile.getPath)
+      assert(parsedSpans.size < 200) // all 100 top-level spans plus at most 99 bottom-level spans
+      assert(parsedSpans.count(_.name().contains("subtask")) < 100)
+    }
+  }
+
+  test("Only collects limited number of spans based on sample rate") {
+    val tempFile = File.createTempFile("traces", ".json")
+
+    val tracer = new DurationFilteringTracer(0.millis, 0.1F, tempFile.getPath)
+
+    Trace.letTracers(Seq(tracer)) {
+      for (i <- 0 until 100) {
+        Trace.traceLocalFuture(s"task$i")(Future())
+      }
+    }
+
+    eventually {
+      val parsedSpans = parseSpansFromFile(tempFile.getPath)
+      assert(parsedSpans.size < 30)
+    }
+  }
+
+  private[this] def taskWithDuration(
+    duration: Duration,
+    childTask: => Future[Unit] = Future()
+  ): Future[Unit] = {
+    Future.Unit.delayed(duration).flatMap(_ => childTask)
+  }
+
+  private[this] def parseSpansFromFile(filename: String): Seq[zipkin2.Span] = {
+    Source
+      .fromFile(filename)
+      .getLines()
+      .map(line => SpanBytesDecoder.JSON_V2.decodeOne(line.getBytes()))
+  }.toSeq
+}