diff --git a/finagle-core/src/main/scala/com/twitter/finagle/offload/DefaultThreadPoolExecutor.scala b/finagle-core/src/main/scala/com/twitter/finagle/offload/DefaultThreadPoolExecutor.scala index fcdf892510..cff7187c4a 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/offload/DefaultThreadPoolExecutor.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/offload/DefaultThreadPoolExecutor.scala @@ -9,13 +9,16 @@ import java.util.concurrent.RejectedExecutionHandler import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit -private[twitter] class DefaultThreadPoolExecutor(poolSize: Int, stats: StatsReceiver) +private[twitter] class DefaultThreadPoolExecutor( + poolSize: Int, + maxQueueLen: Int, + stats: StatsReceiver) extends ThreadPoolExecutor( poolSize /*corePoolSize*/, poolSize /*maximumPoolSize*/, 0L /*keepAliveTime*/, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue[Runnable]() /*workQueue*/, + new LinkedBlockingQueue[Runnable](maxQueueLen) /*workQueue*/, new NamedPoolThreadFactory("finagle/offload", makeDaemons = true) /*threadFactory*/, new RunsOnNettyThread(stats.counter("not_offloaded_tasks"))) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadFuturePool.scala b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadFuturePool.scala index e74ae46a2a..275c3c566d 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadFuturePool.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadFuturePool.scala @@ -45,10 +45,11 @@ object OffloadFuturePool { lazy val configuredPool: Option[FuturePool] = { val workers = numWorkers.get.orElse(if (auto()) Some(com.twitter.jvm.numProcs().ceil.toInt) else None) + val maxQueueLen = maxQueueLength() workers.map { threads => val stats = FinagleStatsReceiver.scope("offload_pool") - val pool = new OffloadFuturePool(OffloadThreadPool(threads, stats), stats) + val pool = new OffloadFuturePool(OffloadThreadPool(threads, maxQueueLen, stats), stats) // Start sampling the offload delay if the interval isn't Duration.Top. if (statsSampleInterval().isFinite && statsSampleInterval() > Duration.Zero) { diff --git a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPool.scala b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPool.scala index 5778491ac5..92bca80b3a 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPool.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPool.scala @@ -10,21 +10,29 @@ private object OffloadThreadPool { private[this] val logger = Logger.get() /** Construct an `ExecutorService` with the proper thread names and metrics */ - def apply(poolSize: Int, stats: StatsReceiver): ExecutorService = { + def apply(poolSize: Int, maxQueueLen: Int, stats: StatsReceiver): ExecutorService = { LoadService[OffloadThreadPoolFactory]() match { case Seq() => logger.info("Constructing the default OffloadThreadPool executor service") - new DefaultThreadPoolExecutor(poolSize, stats) + new DefaultThreadPoolExecutor( + poolSize = poolSize, + maxQueueLen = maxQueueLen, + stats = stats + ) case Seq(factory) => logger.info(s"Constructing OffloadThreadPool using $factory") - factory.newPool(poolSize, stats) + factory.newPool(poolSize, maxQueueLen, stats) case multiple => logger.error( s"Found multiple `OffloadThreadPoolFactory`s: $multiple. " + s"Using the default implementation.") - new DefaultThreadPoolExecutor(poolSize, stats) + new DefaultThreadPoolExecutor( + poolSize = poolSize, + maxQueueLen = maxQueueLen, + stats = stats + ) } } } diff --git a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPoolFactory.scala b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPoolFactory.scala index 2dd8a013a0..fa3266dd83 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPoolFactory.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/offload/OffloadThreadPoolFactory.scala @@ -14,12 +14,14 @@ abstract class OffloadThreadPoolFactory { /** Construct a new `ExecutorService` * - * @param poolSize The size of the pool as configured by the finagle flags - * `com.twitter.finagle.offload.numWorkers` and `com.twitter.finagle.offload.auto` - * @param stats `StatsReceiver` to use for observability. + * @param poolSize The size of the pool as configured by the finagle flags + * `com.twitter.finagle.offload.numWorkers` and `com.twitter.finagle.offload.auto` + * @param maxQueueLen The maximum length of the queue in the pool as configured by the finagle flag + * `com.twitter.finagle.offload.maxQueueLen` + * @param stats `StatsReceiver` to use for observability. */ - def newPool(poolSize: Int, stats: StatsReceiver): ExecutorService + def newPool(poolSize: Int, maxQueueLen: Int, stats: StatsReceiver): ExecutorService /** Implementors should make the `toString` method meaningful and it will be used in log entries */ override def toString: String diff --git a/finagle-core/src/main/scala/com/twitter/finagle/offload/maxQueueLength.scala b/finagle-core/src/main/scala/com/twitter/finagle/offload/maxQueueLength.scala new file mode 100644 index 0000000000..b234d93a35 --- /dev/null +++ b/finagle-core/src/main/scala/com/twitter/finagle/offload/maxQueueLength.scala @@ -0,0 +1,10 @@ +package com.twitter.finagle.offload + +import com.twitter.app.GlobalFlag + +object maxQueueLength + extends GlobalFlag[Int]( + default = Int.MaxValue, + help = + "Experimental flag. Sets the maximum number of jobs in the thread pool in the offload filter" + )