Skip to content

Commit

Permalink
finagle: TCP_NODELAY and SO_REUSEADDR are now stack params
Browse files Browse the repository at this point in the history
Problem

There is no convinient and finagle-idiomatic way of configuring socket options
for both listeners and transportes.

Solution

Make `TCP_NODELAY` and `SO_REUSEADDR` stack params so users could easily override
them. Keep the defaults unchanged (`TCP_NODELAY=true`, `SO_REUSEADDR=true`).

RB_ID=773824
  • Loading branch information
vkostyukov authored and jenkins committed Dec 3, 2015
1 parent ef6ada9 commit 200f8b5
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 36 deletions.
15 changes: 9 additions & 6 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@ New Features
as an address. ``RB_ID=758862``

* finagle-core: `c.t.f.service.Retries` now supports adding delay between each automatic retry.
This is configured via the Retries.Budget. ``RB_ID=768883``
This is configured via the `Retries.Budget`. ``RB_ID=768883``

* finagle-core: FailureAccrualFactory now uses a FailureAccrualPolicy to determine when to
mark an endpoint dead. The default policy, FailureAccrualPolicy.consecutiveFailures(),
mimicks existing functionality, and FailureAccrualPolicy.successRate() operates on the
exponentially weighted average success rate over a window of requests.``RB_ID=756921``

* finagle-core: Introduce `c.t.f.transport.Transport.Options` to configure transport-level options
(i.e., socket options `TCP_NODELAY` and `SO_REUSEADDR`). ``RB_ID=773824``

* finagle-netty4: Hello World. Introduce a `Listener` for Netty 4.1. This is still considered beta.
``RB_ID=718688``
Expand Down Expand Up @@ -100,11 +108,6 @@ Breaking API Changes
Bug Fixes
~~~~~~~~~

* finagle-core: FailureAccrualFactory now uses a FailureAccrualPolicy to determine when to
mark an endpoint dead. The default policy, FailureAccrualPolicy.consecutiveFailures(),
mimicks existing functionality, and FailureAccrualPolicy.successRate() operates on the
exponentially weighted average success rate over a window of requests.``RB_ID=756921``

* finagle-thrift: `c.t.f.ThriftRichClient` scoped stats label is now threaded
properly through `newServiceIface` ``RB_ID=760157``

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,12 @@ object Netty3Listener {
case Transport.Verbose(true) => Some(ChannelSnooper(label)(logger.log(Level.INFO, _, _)))
case _ => None
}
val Transport.Options(noDelay, reuseAddr) = params[Transport.Options]

val opts = new mutable.HashMap[String, Object]()
opts += "soLinger" -> (0: java.lang.Integer)
opts += "reuseAddress" -> java.lang.Boolean.TRUE
opts += "child.tcpNoDelay" -> java.lang.Boolean.TRUE
opts += "reuseAddress" -> (reuseAddr: java.lang.Boolean)
opts += "child.tcpNoDelay" -> (noDelay: java.lang.Boolean)
for (v <- backlog) opts += "backlog" -> (v: java.lang.Integer)
for (v <- sendBufSize) opts += "child.sendBufferSize" -> (v: java.lang.Integer)
for (v <- recvBufSize) opts += "child.receiveBufferSize" -> (v: java.lang.Integer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ object Netty3Transporter {
case Transport.Verbose(true) => Some(ChannelSnooper(label)(logger.log(Level.INFO, _, _)))
case _ => None
}
val Transport.Options(noDelay, reuseAddr) = params[Transport.Options]

val opts = new mutable.HashMap[String, Object]()
opts += "connectTimeoutMillis" -> ((connectTimeout + compensation).inMilliseconds: java.lang.Long)
opts += "tcpNoDelay" -> java.lang.Boolean.TRUE
opts += "reuseAddress" -> java.lang.Boolean.TRUE
opts += "tcpNoDelay" -> (noDelay: java.lang.Boolean)
opts += "reuseAddress" -> (reuseAddr: java.lang.Boolean)
for (v <- keepAlive) opts += "keepAlive" -> (v: java.lang.Boolean)
for (s <- sendBufSize) opts += "sendBufferSize" -> (s: java.lang.Integer)
for (s <- recvBufSize) opts += "receiveBufferSize" -> (s: java.lang.Integer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,4 @@ object Listener {
object TrafficClass {
implicit val param = Stack.Param(TrafficClass(None))
}

/**
* Configures TCP_NODELAY on the server socket
* @param b `true` disables Nagle's algorithm.
* @note This param only applies to netty's >= 4.
* Netty 3's socket options are still configured by
* the [[com.twitter.finagle.netty3.Netty3Listener]] instance.
*/
case class NoDelay(b: Boolean) {
def mk(): (NoDelay, Stack.Param[NoDelay]) =
(this, NoDelay.param)
}

object NoDelay {
implicit val param = Stack.Param(NoDelay(true))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ object Transport {
* $param the verbosity of a `Transport`. Transport activity is
* written to [[com.twitter.finagle.param.Logger]].
*/
case class Verbose(b: Boolean) {
case class Verbose(enabled: Boolean) {
def mk(): (Verbose, Stack.Param[Verbose]) =
(this, Verbose.param)
}
object Verbose {
implicit val param = Stack.Param(Verbose(false))
implicit val param = Stack.Param(Verbose(enabled = false))
}

/**
Expand All @@ -171,12 +171,31 @@ object Transport {
implicit val param = Stack.Param(TLSServerEngine(None))
}

/**
* $param the options (i.e., socket options) of a `Transport`.
*
* @param noDelay enables or disables `TCP_NODELAY` (Nagle's algorithm)
* option on a transport socket (`noDelay = true` means
* disabled). Default is `true` (disabled).
*
* @param reuseAddr enables or disables `SO_REUSEADDR` option on a
* transport socket. Default is `true`.
*/
case class Options(noDelay: Boolean, reuseAddr: Boolean) {
def mk(): (Options, Stack.Param[Options]) = (this, Options.param)
}

object Options {
implicit val param: Stack.Param[Options] =
Stack.Param(Options(noDelay = true, reuseAddr = true))
}

/**
* Serializes the object stream from a `Transport` into a
* [[com.twitter.io.Writer]].
*
* The serialization function `f` can return `Future.None` to interrupt the
* stream to faciliate using the transport with multiple writers and vice
* stream to facilitate using the transport with multiple writers and vice
* versa.
*
* Both transport and writer are unmanaged, the caller must close when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Netty3TransporterTest extends FunSpec with MockitoSugar {
assert(transporter.channelOptions.get("connectTimeoutMillis").get ==
inputParams[Transporter.ConnectTimeout].howlong.inMilliseconds +
inputParams[LatencyCompensation.Compensation].howlong.inMilliseconds)
assert(transporter.channelSnooper.nonEmpty == inputParams[Transport.Verbose].b)
assert(transporter.channelSnooper.nonEmpty == inputParams[Transport.Verbose].enabled)
}

it("newPipeline handles unresolved InetSocketAddresses") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ private[finagle] case class Netty4Listener[In, Out](
// transport params
private[this] val Transport.Liveness(_, _, keepAlive) = params[Transport.Liveness]
private[this] val Transport.BufferSizes(sendBufSize, recvBufSize) = params[Transport.BufferSizes]
private[this] val Transport.Options(noDelay, reuseAddr) = params[Transport.Options]

// socket params
// listener params
private[this] val Listener.Backlog(backlog) = params[Listener.Backlog]
private[this] val Listener.NoDelay(noDelay) = params[Listener.NoDelay]


/**
* Listen for connections and apply the `serveTransport` callback on connected [[Transport transports]].
Expand Down Expand Up @@ -90,7 +89,7 @@ private[finagle] case class Netty4Listener[In, Out](
bootstrap.childOption[JBool](ChannelOption.TCP_NODELAY, noDelay)
//bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //todo: investigate pooled allocator CSL-2089
//bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
bootstrap.option[JBool](ChannelOption.SO_REUSEADDR, true)
bootstrap.option[JBool](ChannelOption.SO_REUSEADDR, reuseAddr)
bootstrap.option[JInt](ChannelOption.SO_LINGER, 0)
backlog.foreach(bootstrap.option[JInt](ChannelOption.SO_BACKLOG, _))
sendBufSize.foreach(bootstrap.childOption[JInt](ChannelOption.SO_SNDBUF, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ private[finagle] object ChannelRequestStatsHandler {
* A channel stats handler that keeps per-connection request
* statistics. This handler should be after the request codec in the
* stack as it assumes messages are POJOs with request/responses.
* @param statsReceiver
*
* @param statsReceiver the [[StatsReceiver]] to which stats are reported
*/
private[finagle] class ChannelRequestStatsHandler(statsReceiver: StatsReceiver)
extends ChannelInboundHandlerAdapter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private[netty4] class Netty4ChannelInitializer(
(None, None)

val channelSnooper =
if (params[Transport.Verbose].b)
if (params[Transport.Verbose].enabled)
Some(ChannelSnooper(label)(logger.log(Level.INFO, _, _)))
else
None
Expand Down

0 comments on commit 200f8b5

Please sign in to comment.