From 8e36a30b34072dde4aa920156725b317b24f7155 Mon Sep 17 00:00:00 2001
From: Mikhail Bezoyan
Date: Fri, 18 Oct 2024 14:05:23 +0000
Subject: [PATCH] finagle-netty4: Track burstiness of cpu usage by netty threads

Problem

We need to know the burstiness of CPU utilization, pending tasks, and
registered channels within every metric collection interval.

Solution

Add these metrics:

- cpu_util: the distribution of CPU utilization within each one-minute
  collection interval
- pending_tasks: the distribution of the pending-task queue length within
  each one-minute collection interval
- all_sockets: the distribution of registered sockets

Differential Revision: https://phabricator.twitter.biz/D1177719
---
 CHANGELOG.rst                                 |  2 +
 .../EventLoopGroupTrackingRunnable.scala      | 63 ++++++++++++++-----
 .../threading/EventLoopGroupTrackerTest.scala |  2 +-
 3 files changed, 51 insertions(+), 16 deletions(-)

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index e6ae18b8ef..4d02b9560c 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -14,6 +14,8 @@ Runtime Behavior Changes
   Added README in integration tests noting that this must exist for integration tests to run.
   ``PHAB_ID=D1152235``
 * finagle-netty4: `EventLoopGroupTracker` (previously named `EventLoopGroupExecutionDelayTracker`)
   now collects stats cpu_time_ms and active_sockets per netty worker thread.
+* finagle-netty4: `EventLoopGroupTracker` now collects the distribution of cpu utilization by each netty thread
+  and all_sockets instead of active_sockets. ``PHAB_ID=D1177719``
 
 New Features

diff --git a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala
index 26c0ee9a4a..55416540f2 100644
--- a/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala
+++ b/finagle-netty4/src/main/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackingRunnable.scala
@@ -1,5 +1,8 @@
 package com.twitter.finagle.netty4.threading
 
+import com.twitter.finagle.stats.HistogramFormat
+import com.twitter.finagle.stats.MetricBuilder
+import com.twitter.finagle.stats.MetricBuilder.HistogramType
 import com.twitter.finagle.stats.Stat
 import com.twitter.finagle.stats.StatsReceiver
 import com.twitter.logging.Logger
@@ -51,11 +54,35 @@ private[threading] class EventLoopGroupTrackingRunnable(
   private[this] val threadMXBean = ManagementFactory.getThreadMXBean
   private[this] val scopedStatsReceiver = statsReceiver.scope(threadName)
-  private[this] val activeSocketsStat = scopedStatsReceiver.stat("active_sockets")
+  private[this] val pendingTasksStat = scopedStatsReceiver.stat(
+    MetricBuilder(
+      metricType = HistogramType,
+      name = Seq("pending_tasks"),
+      percentiles = Array[Double](0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
+      histogramFormat = HistogramFormat.FullSummary
+    )
+  )
+  private[this] val allSocketsStat = scopedStatsReceiver.stat(
+    MetricBuilder(
+      metricType = HistogramType,
+      name = Seq("all_sockets"),
+      percentiles = Array[Double](0.50),
+      histogramFormat = HistogramFormat.FullSummary
+    )
+  )
   private[this] val cpuTimeCounter = scopedStatsReceiver.counter("cpu_time_ms")
+  private[this] val cpuUtilStat = scopedStatsReceiver.stat(
+    MetricBuilder(
+      metricType = HistogramType,
+      name = Seq("cpu_util"),
+      percentiles = Array[Double](0.25, 0.50, 0.75, 0.90, 0.95, 0.99),
+      histogramFormat = HistogramFormat.FullSummary
+    )
+  )
 
   // Accessed only from within the same netty thread
-  private[this] var prevCPUTimeMs = 0L
+  private[this] var prevCPUTimeNs = 0L
+  private[this] var prevWallTimeNs = 0L
 
   setWatchTask()
 
   executor.scheduleWithFixedDelay(
@@ -71,7 +98,8 @@ private[threading] class EventLoopGroupTrackingRunnable(
       watchTask.get.cancel(false)
     }
 
-    val executionDelay = Time.now - scheduledExecutionTime
+    val now = Time.now
+    val executionDelay = now - scheduledExecutionTime
     if (threadDumpEnabled && executionDelay.inMillis > threadDumpThreshold.inMillis) {
       dumpLogger.warning(
         s"THREAD: $threadName EXECUTION DELAY is greater than ${threadDumpThreshold.inMillis}ms, was ${executionDelay.inMillis}ms"
@@ -79,23 +107,28 @@ private[threading] class EventLoopGroupTrackingRunnable(
     }
     delayStat.add(executionDelay.inMillis)
 
-    scheduledExecutionTime = Time.now.plus(taskTrackingPeriod)
+    scheduledExecutionTime = now.plus(taskTrackingPeriod)
     setWatchTask()
 
-    var numActiveSockets = 0
     // This will be nio event loop or epoll event loop.
-    executor.asInstanceOf[SingleThreadEventLoop].registeredChannelsIterator().forEachRemaining {
-      channel =>
-        if (channel.isActive) {
-          numActiveSockets += 1
-        }
-    }
-    activeSocketsStat.add(numActiveSockets)
+    val loop = executor.asInstanceOf[SingleThreadEventLoop]
+    allSocketsStat.add(loop.registeredChannels())
+    pendingTasksStat.add(loop.pendingTasks())
 
     // `getThreadCPUTime` returns the time in nanoseconds.
-    val currentCPUTimeMs = threadMXBean.getThreadCpuTime(threadId) / 1000000
-    cpuTimeCounter.incr(currentCPUTimeMs - prevCPUTimeMs)
-    prevCPUTimeMs = currentCPUTimeMs
+    val currCPUTimeNs = threadMXBean.getThreadCpuTime(threadId)
+    val cpuTime = currCPUTimeNs - prevCPUTimeNs
+
+    val currWallTimeNs = System.nanoTime()
+    val wallTimeNs = currWallTimeNs - prevWallTimeNs
+    cpuTimeCounter.incr(TimeUnit.NANOSECONDS.toMillis(cpuTime))
+    if (prevWallTimeNs != 0 && wallTimeNs != 0) {
+      cpuUtilStat.add(
+        10000 * cpuTime / wallTimeNs
+      )
+    }
+    prevCPUTimeNs = currCPUTimeNs
+    prevWallTimeNs = currWallTimeNs
   }
 
   private[this] def setWatchTask(): Unit = {

diff --git a/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala b/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala
index e555d7d809..b28ec0fdd6 100644
--- a/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala
+++ b/finagle-netty4/src/test/scala/com/twitter/finagle/netty4/threading/EventLoopGroupTrackerTest.scala
@@ -62,7 +62,7 @@ class EventLoopGroupTrackerTest
         .get(Seq("finagle_thread_delay_tracking_test-1", "cpu_time_ms")).isDefined)
     assert(
       statsReceiver.stats
-        .get(Seq("finagle_thread_delay_tracking_test-1", "active_sockets")).isDefined)
+        .get(Seq("finagle_thread_delay_tracking_test-1", "all_sockets")).isDefined)
 
     // we should have no threads with the name no_threads_expected
     Thread.getAllStackTraces.keySet().asScala.foreach { thread: Thread =>
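
For reference, below is a minimal standalone sketch of the cpu_util arithmetic
introduced above: the thread's CPU-time delta divided by the wall-clock delta,
scaled by 10000, i.e. basis points where 10000 means the thread kept a core
fully busy for the whole interval. The object name, the busy-loop workload, and
the use of getCurrentThreadCpuTime (instead of the per-thread-id lookup the
patch uses) are illustrative assumptions; the real code runs on the tracked
netty event-loop thread and reports through a finagle Stat.

import java.lang.management.ManagementFactory
import java.util.concurrent.TimeUnit

object CpuUtilSketch {

  // ThreadMXBean reports per-thread CPU time in nanoseconds.
  private val threadMXBean = ManagementFactory.getThreadMXBean

  def main(args: Array[String]): Unit = {
    // Snapshot CPU time and wall-clock time at the start of the interval.
    val prevCpuNs = threadMXBean.getCurrentThreadCpuTime
    val prevWallNs = System.nanoTime()

    // Illustrative workload: spin for roughly 100ms of wall time.
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(100)
    var spins = 0L
    while (System.nanoTime() < deadline) { spins += 1 }

    // Deltas over the interval, analogous to `cpuTime` and `wallTimeNs` above.
    val cpuNs = threadMXBean.getCurrentThreadCpuTime - prevCpuNs
    val wallNs = System.nanoTime() - prevWallNs

    // Same scaling as `10000 * cpuTime / wallTimeNs`: utilization in basis
    // points. Guard against a zero wall-time delta, as the patch does.
    if (wallNs != 0) {
      val cpuUtilBps = 10000 * cpuNs / wallNs
      println(
        s"cpu_util=$cpuUtilBps bps " +
          s"(cpu=${TimeUnit.NANOSECONDS.toMillis(cpuNs)}ms, " +
          s"wall=${TimeUnit.NANOSECONDS.toMillis(wallNs)}ms, spins=$spins)")
    }
  }
}

Multiplying by 10000 before the integer division keeps 0.01% resolution in the
recorded value, so a fully busy interval reads as ~10000 and an idle one as ~0.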