From 68184397a22353cbdde74ec32db156d736fda788 Mon Sep 17 00:00:00 2001 From: Jillian Crossley Date: Thu, 17 Oct 2024 15:28:22 +0000 Subject: [PATCH] finagle/finagle-netty4: Add CPU tracking and active socket count for worker threads in EventLoopGroupTracker Problem We'd like to be able to see how much work different netty threads are doing, in order to size the pool correctly and potentially improve work distribution. Solution As an initial step, add stats to EventLoopGroupTracker for the CPU time of each worker thread and the number of active sockets assigned to it. These stats will only be collected if the `TrackWorkerPool.enableTracking` param is set to `true` on the server (false by default). Differential Revision: https://phabricator.twitter.biz/D1176906 --- CHANGELOG.rst | 12 ++++ .../netty4/ListeningServerBuilder.scala | 14 ++--- .../param/TrackWorkerPoolExcutionDelay.scala | 26 ++++---- ...cker.scala => EventLoopGroupTracker.scala} | 13 ++-- ...a => EventLoopGroupTrackingRunnable.scala} | 59 +++++++++++++++---- ....scala => EventLoopGroupTrackerTest.scala} | 34 ++++++----- 6 files changed, 105 insertions(+), 53 deletions(-) rename finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/{EventLoopGroupExecutionDelayTracker.scala => EventLoopGroupTracker.scala} (87%) rename finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/{EventLoopGroupExecutionDelayTrackingRunnable.scala => EventLoopGroupTrackingRunnable.scala} (56%) rename finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/{EventLoopGroupExecutionDelayTrackerTest.scala => EventLoopGroupTrackerTest.scala} (79%) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d021aa55ad..e6ae18b8ef 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,6 +12,8 @@ Runtime Behavior Changes * finagle-mysql: (Testing behaviour change only) Updated mysql version expected by integration tests to 8.0.21. Added README in integration tests noting that this must exist for integration tests to run.
``PHAB_ID=D1152235`` +* finagle-netty4: `EventLoopGroupTracker` (previously named `EventLoopGroupExecutionDelayTracker`) now collects + the `cpu_time_ms` and `active_sockets` stats per netty worker thread. New Features @@ -19,6 +21,16 @@ New Features * finagle-mysql: Added support for LONG_BLOB data type. ``PHAB_ID=D1152247`` + +Breaking API Changes +~~~~~~~~~~~~~~~~~~~~ + +* finagle-netty4: `c.t.f.netty4.threading.EventLoopGroupExecutionDelayTracker` has been renamed to + `EventLoopGroupTracker`, `c.t.f.netty4.threading.EventLoopGroupExecutionDelayTrackingRunnable` has been renamed to + `EventLoopGroupTrackingRunnable`, `c.t.f.netty4.param.TrackWorkerPoolExecutionDelay` has been renamed + to `TrackWorkerPool`. These changes reflect the tracker's new functionality of collecting metrics + and data other than the execution delay (see Runtime Behavior Changes). ``PHAB_ID=D1176906`` + 24.5.0 ------ diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/ListeningServerBuilder.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/ListeningServerBuilder.scala index 4f754b807b..3766369eff 100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/ListeningServerBuilder.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/ListeningServerBuilder.scala @@ -4,7 +4,7 @@ import com.twitter.finagle.ListeningServer import com.twitter.finagle.Stack import com.twitter.finagle.netty4.channel.Netty4FramedServerChannelInitializer import com.twitter.finagle.netty4.channel.Netty4RawServerChannelInitializer -import com.twitter.finagle.netty4.threading.EventLoopGroupExecutionDelayTracker +import com.twitter.finagle.netty4.threading.EventLoopGroupTracker import com.twitter.finagle.param.Stats import com.twitter.finagle.param.Timer import com.twitter.finagle.server.Listener @@ -221,13 +221,13 @@ private[finagle] class ListeningServerBuilder( def boundAddress: SocketAddress = ch.localAddress() - private[this] val workerPoolExecutionDelayTrackingSettings =
params[param.TrackWorkerPoolExecutionDelay] - if (workerPoolExecutionDelayTrackingSettings.enableTracking) { - EventLoopGroupExecutionDelayTracker.track( + private[this] val workerPoolTrackingSettings = + params[param.TrackWorkerPool] + if (workerPoolTrackingSettings.enableTracking) { + EventLoopGroupTracker.track( params[param.WorkerPool].eventLoopGroup, - workerPoolExecutionDelayTrackingSettings.trackingTaskPeriod, - workerPoolExecutionDelayTrackingSettings.threadDumpThreshold, + workerPoolTrackingSettings.trackingTaskPeriod, + workerPoolTrackingSettings.threadDumpThreshold, params[Stats].statsReceiver, s"finagle/netty-4/delayTracking/${boundAddress}", Logger.get("com.twitter.finagle.netty4.Netty4Listener.threadDelay") diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/param/TrackWorkerPoolExcutionDelay.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/param/TrackWorkerPoolExcutionDelay.scala index 50c9b3bb67..0e295c986a 100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/param/TrackWorkerPoolExcutionDelay.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/param/TrackWorkerPoolExcutionDelay.scala @@ -4,31 +4,31 @@ import com.twitter.finagle.Stack import com.twitter.util.Duration /** - * Control for tracking execution delay in the worker threads for a listener. This is intended - * to be enabled for perf tracking, and may impact performance as it adds tracking runnables to - * the event executors. Stats will be written to the stats receiver for the listener under - * workerpool/deviation_ms. When thread dumping is enabled, all logging is done at the warning - * level. + * Control for tracking execution delay, cpu time, and active sockets in the worker threads for a + * listener. This is intended to be enabled for perf tracking, and may impact performance as it adds + * tracking runnables to the event executors. 
Stats will be written to the stats receiver for the + * listener: workerpool/deviation_ms, plus cpu_time_ms and active_sockets scoped by worker thread + * name. When thread dumping is enabled, all logging is done at the warning level. * - * @param enableTracking If true enable thread pause tracking. + * @param enableTracking If true enable thread tracking. * @param trackingTaskPeriod The fixed time scheduling window for the execution delay runnable. * @param threadDumpThreshold If > 0ms, enable stack dumping of threads when they have been delayed for * more than the threshold. Thresholds of < 10ms will not work as * expected as the underlying executors do not use high resolution timers. */ -case class TrackWorkerPoolExecutionDelay( +case class TrackWorkerPool( enableTracking: Boolean, trackingTaskPeriod: Duration, threadDumpThreshold: Duration) { - def mk(): (TrackWorkerPoolExecutionDelay, Stack.Param[TrackWorkerPoolExecutionDelay]) = - (this, TrackWorkerPoolExecutionDelay.trackWorkerPoolExecutionDelayParam) + def mk(): (TrackWorkerPool, Stack.Param[TrackWorkerPool]) = + (this, TrackWorkerPool.trackWorkerPoolParam) } -object TrackWorkerPoolExecutionDelay { - implicit val trackWorkerPoolExecutionDelayParam: Stack.Param[TrackWorkerPoolExecutionDelay] = - Stack.Param[TrackWorkerPoolExecutionDelay]( - TrackWorkerPoolExecutionDelay( +object TrackWorkerPool { + implicit val trackWorkerPoolParam: Stack.Param[TrackWorkerPool] = + Stack.Param[TrackWorkerPool]( + TrackWorkerPool( false, Duration.fromMilliseconds(0), Duration.fromMilliseconds(0) diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTracker.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTracker.scala similarity index 87% rename from finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTracker.scala rename to finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTracker.scala index 62b713c4ed..a2398cf574
100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTracker.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTracker.scala @@ -8,7 +8,7 @@ import io.netty.channel.EventLoopGroup import java.util.concurrent.ScheduledThreadPoolExecutor import scala.reflect.internal.util.WeakHashSet -object EventLoopGroupExecutionDelayTracker { +object EventLoopGroupTracker { private[threading] val trackedEventLoopGroups = new WeakHashSet[EventLoopGroup]() @@ -19,8 +19,8 @@ object EventLoopGroupExecutionDelayTracker { * instrumentation. * * @param nettyEventLoopGroup The netty EventLoopGroup for which thread delays should be captured - * @param injectionPeriod The fixed delay for the runnables added to the EventLoopGroup threads to - * capture thread execution delays. + * @param trackingTaskPeriod The fixed delay for the runnables added to the EventLoopGroup threads to + * capture thread tracking information. * @param dumpThreshold If > 0ms log seen delay for threads and the stack trace for threads at * the when the threads exceed the dumpThreshold delay. * @param statsReceiver The stats receiver under which execution delay stats should be reported. 
@@ -29,7 +29,7 @@ object EventLoopGroupExecutionDelayTracker { */ def track( nettyEventLoopGroup: EventLoopGroup, - injectionPeriod: Duration, + trackingTaskPeriod: Duration, dumpThreshold: Duration, statsReceiver: StatsReceiver, dumpThreadPoolName: String, @@ -50,10 +50,11 @@ object EventLoopGroupExecutionDelayTracker { val stat = statsReceiver.stat("workerpool", "deviation_ms") while (workerIter.hasNext) { val loop = workerIter.next() - new EventLoopGroupExecutionDelayTrackingRunnable( + new EventLoopGroupTrackingRunnable( loop, - injectionPeriod, + trackingTaskPeriod, stat, + statsReceiver, dumpThreshold, dumpThresholdExceededThreadPool, logger diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackingRunnable.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala similarity index 56% rename from finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackingRunnable.scala rename to finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala index e87ca6edad..26c0ee9a4a 100644 --- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackingRunnable.scala +++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala @@ -1,15 +1,23 @@ package com.twitter.finagle.netty4.threading import com.twitter.finagle.stats.Stat +import com.twitter.finagle.stats.StatsReceiver import com.twitter.logging.Logger -import com.twitter.util.{Duration, Time} +import com.twitter.util.Duration +import com.twitter.util.Time +import io.netty.channel.SingleThreadEventLoop import io.netty.util.concurrent.EventExecutor -import java.util.concurrent.{Callable, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit} +import java.lang.management.ManagementFactory +import java.util.concurrent.Callable +import 
java.util.concurrent.ScheduledFuture +import java.util.concurrent.ScheduledThreadPoolExecutor +import java.util.concurrent.TimeUnit -private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( - eventExecutor: EventExecutor, - injectionPeriod: Duration, +private[threading] class EventLoopGroupTrackingRunnable( + executor: EventExecutor, + taskTrackingPeriod: Duration, delayStat: Stat, + statsReceiver: StatsReceiver, threadDumpThreshold: Duration, dumpWatchThreadPool: Option[ScheduledThreadPoolExecutor], dumpLogger: Logger) @@ -22,10 +30,10 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( // the one thread in the executor. This is currently how netty is implemented // but this class will stop working if netty changes their implementation private[this] val executorThread: Thread = { - if (eventExecutor.inEventLoop()) { + if (executor.inEventLoop()) { Thread.currentThread() } else { - eventExecutor + executor .submit(new Callable[Thread] { override def call(): Thread = { Thread.currentThread() @@ -34,15 +42,26 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( } } + private[this] val threadId = executorThread.getId private[this] val threadName: String = executorThread.getName + private[this] var scheduledExecutionTime: Time = Time.now private[this] var watchTask: Option[ScheduledFuture[_]] = None + private[this] val threadMXBean = ManagementFactory.getThreadMXBean + + private[this] val scopedStatsReceiver = statsReceiver.scope(threadName) + private[this] val activeSocketsStat = scopedStatsReceiver.stat("active_sockets") + private[this] val cpuTimeCounter = scopedStatsReceiver.counter("cpu_time_ms") + + // Accessed only from within the same netty thread + private[this] var prevCPUTimeMs = 0L + setWatchTask() - eventExecutor.scheduleWithFixedDelay( + executor.scheduleWithFixedDelay( this, 0, - injectionPeriod.inMillis, + taskTrackingPeriod.inMillis, java.util.concurrent.TimeUnit.MILLISECONDS ) @@ -57,12 +76,26 
@@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( dumpLogger.warning( s"THREAD: $threadName EXECUTION DELAY is greater than ${threadDumpThreshold.inMillis}ms, was ${executionDelay.inMillis}ms" ) - } delayStat.add(executionDelay.inMillis) - scheduledExecutionTime = Time.now.plus(injectionPeriod) + scheduledExecutionTime = Time.now.plus(taskTrackingPeriod) setWatchTask() + + var numActiveSockets = 0 + // The cast assumes the executor is a nio or epoll event loop. + executor.asInstanceOf[SingleThreadEventLoop].registeredChannelsIterator().forEachRemaining { + channel => + if (channel.isActive) { + numActiveSockets += 1 + } + } + activeSocketsStat.add(numActiveSockets) + + // `getThreadCpuTime` returns the time in nanoseconds; convert it to milliseconds. + val currentCPUTimeMs = threadMXBean.getThreadCpuTime(threadId) / 1000000 + cpuTimeCounter.incr(currentCPUTimeMs - prevCPUTimeMs) + prevCPUTimeMs = currentCPUTimeMs } private[this] def setWatchTask(): Unit = { @@ -71,7 +104,7 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( dumpWatchThreadPool.get.schedule( new Runnable { override def run(): Unit = { - var builder = new StringBuilder() + val builder = new StringBuilder() builder .append( s"THREAD: $threadName EXECUTION DELAY exceeded configured dump threshold.
Thread stack trace:\n" @@ -80,7 +113,7 @@ private[threading] class EventLoopGroupExecutionDelayTrackingRunnable( dumpLogger.warning(builder.toString()) } }, - (injectionPeriod + threadDumpThreshold).inMillis, + (taskTrackingPeriod + threadDumpThreshold).inMillis, TimeUnit.MILLISECONDS ) ) diff --git a/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackerTest.scala b/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala similarity index 79% rename from finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackerTest.scala rename to finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala index 634b85d87f..e555d7d809 100644 --- a/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupExecutionDelayTrackerTest.scala +++ b/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala @@ -18,14 +18,14 @@ import org.scalatestplus.mockito.MockitoSugar import scala.collection.JavaConverters._ import org.scalatest.funsuite.AnyFunSuite -class EventLoopGroupExecutionDelayTrackerTest +class EventLoopGroupTrackerTest extends AnyFunSuite with Eventually with IntegrationPatience with MockitoSugar { test( - "EventLoopGroupExecutionDelayTracker with thread dump disabled records stats but no threads created and no logging" + "EventLoopGroupTracker with thread dump disabled records stats but no threads created and no logging" ) { val statsReceiver = new InMemoryStatsReceiver @@ -37,7 +37,7 @@ class EventLoopGroupExecutionDelayTrackerTest val eventLoopGroup = new NioEventLoopGroup(1, executor) - EventLoopGroupExecutionDelayTracker.track( + EventLoopGroupTracker.track( eventLoopGroup, Duration.fromMilliseconds(50), Duration.Zero, @@ -55,8 +55,14 @@ class EventLoopGroupExecutionDelayTrackerTest // Force ourselves to wait Thread.sleep(300) - // we 
should have deviation stats + // we should have deviation, cpu time, and active sockets stats assert(statsReceiver.stats.get(Seq("workerpool", "deviation_ms")).isDefined) + assert( + statsReceiver.counters + .get(Seq("finagle_thread_delay_tracking_test-1", "cpu_time_ms")).isDefined) + assert( + statsReceiver.stats + .get(Seq("finagle_thread_delay_tracking_test-1", "active_sockets")).isDefined) // we should have no threads with the name no_threads_expected Thread.getAllStackTraces.keySet().asScala.foreach { thread: Thread => @@ -68,7 +74,7 @@ class EventLoopGroupExecutionDelayTrackerTest } test( - "EventLoopGroupExecutionDelayTracker with thread dump enabled records stats creates watch threads and logs dumps" + "EventLoopGroupTracker with thread dump enabled records stats creates watch threads and logs dumps" ) { val statsReceiver = new InMemoryStatsReceiver @@ -80,7 +86,7 @@ class EventLoopGroupExecutionDelayTrackerTest val eventLoopGroup = new NioEventLoopGroup(1, executor) - EventLoopGroupExecutionDelayTracker.track( + EventLoopGroupTracker.track( eventLoopGroup, Duration.fromMilliseconds(50), Duration.fromMilliseconds(10), @@ -115,10 +121,10 @@ class EventLoopGroupExecutionDelayTrackerTest } test( - "validate EventLoopGroupExecutionDelayTracker track guards against multiple submissions of the same EventLoopGroup" + "validate EventLoopGroupTracker track guards against multiple submissions of the same EventLoopGroup" ) { // clear our tracking set first as other tests added to the set - EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.clear() + EventLoopGroupTracker.trackedEventLoopGroups.clear() val statsReceiver = new InMemoryStatsReceiver @@ -131,7 +137,7 @@ class EventLoopGroupExecutionDelayTrackerTest val eventLoopGroup = new NioEventLoopGroup(1, executor) val eventLoopGroup2 = new NioEventLoopGroup(1, executor) - EventLoopGroupExecutionDelayTracker.track( + EventLoopGroupTracker.track( eventLoopGroup, Duration.fromMilliseconds(50), Duration.Zero, @@ 
-139,9 +145,9 @@ class EventLoopGroupExecutionDelayTrackerTest "execution_delay_test_pool", mockLogger ) - assert(EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.size == 1) + assert(EventLoopGroupTracker.trackedEventLoopGroups.size == 1) - EventLoopGroupExecutionDelayTracker.track( + EventLoopGroupTracker.track( eventLoopGroup2, Duration.fromMilliseconds(50), Duration.Zero, @@ -149,9 +155,9 @@ class EventLoopGroupExecutionDelayTrackerTest "execution_delay_test_pool", mockLogger ) - assert(EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.size == 2) + assert(EventLoopGroupTracker.trackedEventLoopGroups.size == 2) - EventLoopGroupExecutionDelayTracker.track( + EventLoopGroupTracker.track( eventLoopGroup, Duration.fromMilliseconds(50), Duration.Zero, @@ -159,7 +165,7 @@ class EventLoopGroupExecutionDelayTrackerTest "execution_delay_test_pool", mockLogger ) - assert(EventLoopGroupExecutionDelayTracker.trackedEventLoopGroups.size == 2) + assert(EventLoopGroupTracker.trackedEventLoopGroups.size == 2) } }