Skip to content

Commit 7e255d3

Browse files
Ivan Gorbachevjenkins
Ivan Gorbachev
authored and
jenkins
committed
finagle-core: Halve Netty tasks in Netty4 pipeline client
Problem The pipelining client is used for the Memcache client. Extra tasks are generated for the Netty event loop during message sending and receiving. Solution When receiving a response from the server, we process the response in the current thread if it is a Netty event loop thread. When sending a message, we do not hand off the message to a Netty thread inside Netty4PushChannelHandle, if the sending is already performed within the Netty event loop. Differential Revision: https://phabricator.twitter.biz/D1193718
1 parent 50265ef commit 7e255d3

File tree

7 files changed

+80
-32
lines changed

7 files changed

+80
-32
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Runtime Behavior Changes
2323
* finagle-core: `Backoff.equalJittered` is now deprecated and falls back to `exponentialJittered`. ``PHAB_ID=D1182535``
2424
* finagle-core: `PipeliningClientPushSession` now collects stats `epoll_queue_delay_ns` and `message_send_latency_ns`.
2525
``PHAB_ID=D1185421``
26+
* finagle-core: Halve Netty tasks in Netty4 pipeline client. ``PHAB_ID=D1193718``
2627

2728
New Features
2829
~~~~~~~~~~

finagle-core/src/main/scala/com/twitter/finagle/pushsession/PipeliningClientPushSession.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class PipeliningClientPushSession[In, Out](
154154
else {
155155
h_queue.offer(p)
156156
h_queueSize += 1
157-
handle.send(request) { _ =>
157+
handle.sendInsideEventLoop(request) { _ =>
158158
messageSendLatency.add(System.nanoTime() - handleStartTime)
159159
}
160160
}

finagle-core/src/main/scala/com/twitter/finagle/pushsession/PushChannelHandle.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.twitter.finagle.pushsession
22

3-
import com.twitter.finagle.{ClientConnection, Status}
3+
import com.twitter.finagle.ClientConnection
4+
import com.twitter.finagle.Status
45
import com.twitter.finagle.ssl.session.SslSessionInfo
5-
import com.twitter.util.{Closable, Try}
6+
import com.twitter.util.Closable
7+
import com.twitter.util.Try
68
import java.util.concurrent.Executor
79

810
/**
@@ -60,6 +62,8 @@ trait PushChannelHandle[In, Out] extends Closable with ClientConnection {
6062
*/
6163
def send(message: Out)(onComplete: Try[Unit] => Unit): Unit
6264

65+
def sendInsideEventLoop(message: Out)(onComplete: Try[Unit] => Unit): Unit
66+
6367
/**
6468
* Write a message to the underlying IO pipeline.
6569
*

finagle-core/src/main/scala/com/twitter/finagle/pushsession/PushChannelHandleProxy.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package com.twitter.finagle.pushsession
22

33
import com.twitter.finagle.Status
44
import com.twitter.finagle.ssl.session.SslSessionInfo
5-
import com.twitter.util.{Future, Time, Try}
5+
import com.twitter.util.Future
6+
import com.twitter.util.Time
7+
import com.twitter.util.Try
68
import java.net.SocketAddress
79
import java.util.concurrent.Executor
810

@@ -25,6 +27,10 @@ abstract class PushChannelHandleProxy[In, Out](underlying: PushChannelHandle[In,
2527
def send(message: Out)(onComplete: (Try[Unit]) => Unit): Unit =
2628
underlying.send(message)(onComplete)
2729

30+
def sendInsideEventLoop(message: Out)(onComplete: Try[Unit] => Unit): Unit = {
31+
underlying.sendInsideEventLoop(message)(onComplete)
32+
}
33+
2834
def sendAndForget(message: Out): Unit = underlying.sendAndForget(message)
2935

3036
def sendAndForget(messages: Iterable[Out]): Unit = underlying.sendAndForget(messages)

finagle-core/src/test/scala/com/twitter/finagle/pushsession/utils/MockChannelHandle.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
package com.twitter.finagle.pushsession.utils
22

33
import com.twitter.finagle.Status
4-
import com.twitter.finagle.pushsession.{PushChannelHandle, PushSession}
5-
import com.twitter.finagle.ssl.session.{NullSslSessionInfo, SslSessionInfo}
6-
import com.twitter.util.{Future, Promise, Return, Time, Try}
7-
import java.net.{InetSocketAddress, SocketAddress}
4+
import com.twitter.finagle.pushsession.PushChannelHandle
5+
import com.twitter.finagle.pushsession.PushSession
6+
import com.twitter.finagle.ssl.session.NullSslSessionInfo
7+
import com.twitter.finagle.ssl.session.SslSessionInfo
8+
import com.twitter.util.Future
9+
import com.twitter.util.Promise
10+
import com.twitter.util.Return
11+
import com.twitter.util.Time
12+
import com.twitter.util.Try
13+
import java.net.InetSocketAddress
14+
import java.net.SocketAddress
815
import scala.collection.mutable
916

1017
class MockChannelHandle[In, Out](var currentSession: PushSession[In, Out])
@@ -81,4 +88,8 @@ class MockChannelHandle[In, Out](var currentSession: PushSession[In, Out])
8188
closedCalled = true
8289
onClose
8390
}
91+
92+
override def sendInsideEventLoop(message: Out)(onComplete: Try[Unit] => Unit): Unit = {
93+
pendingWrites += SendOne(message, onComplete)
94+
}
8495
}

finagle-mux/src/test/scala/com/twitter/finagle/mux/QueueChannelHandle.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
11
package com.twitter.finagle.mux
22

33
import com.twitter.concurrent.AsyncQueue
4-
import com.twitter.finagle.{ChannelClosedException, Status}
5-
import com.twitter.finagle.pushsession.{PushChannelHandle, PushSession, SentinelSession}
6-
import com.twitter.finagle.ssl.session.{NullSslSessionInfo, SslSessionInfo}
4+
import com.twitter.finagle.ChannelClosedException
5+
import com.twitter.finagle.Status
6+
import com.twitter.finagle.pushsession.PushChannelHandle
7+
import com.twitter.finagle.pushsession.PushSession
8+
import com.twitter.finagle.pushsession.SentinelSession
9+
import com.twitter.finagle.ssl.session.NullSslSessionInfo
10+
import com.twitter.finagle.ssl.session.SslSessionInfo
711
import com.twitter.finagle.util.Updater
8-
import com.twitter.util.{Future, Promise, Return, Throw, Time, Try}
12+
import com.twitter.util.Future
13+
import com.twitter.util.Promise
14+
import com.twitter.util.Return
15+
import com.twitter.util.Throw
16+
import com.twitter.util.Time
17+
import com.twitter.util.Try
918
import java.net.SocketAddress
1019
import java.util.concurrent.Executor
1120
import scala.util.control.NonFatal
@@ -73,6 +82,9 @@ private[mux] class QueueChannelHandle[In, Out](destinationQueue: AsyncQueue[Out]
7382
def send(message: Out)(onComplete: Try[Unit] => Unit): Unit =
7483
send(message :: Nil)(onComplete)
7584

85+
def sendInsideEventLoop(message: Out)(onComplete: Try[Unit] => Unit): Unit =
86+
send(message :: Nil)(onComplete)
87+
7688
def send(messages: Iterable[Out])(onComplete: Try[Unit] => Unit): Unit = {
7789
serialExecutor.execute(new Runnable {
7890
def run(): Unit = {

finagle-netty4/src/main/scala/com/twitter/finagle/netty4/pushsession/Netty4PushChannelHandle.scala

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
11
package com.twitter.finagle.netty4.pushsession
22

3-
import com.twitter.finagle.{
4-
ChannelClosedException,
5-
ChannelException,
6-
Status,
7-
UnknownChannelException
8-
}
9-
import com.twitter.finagle.pushsession.{PushChannelHandle, PushSession}
10-
import com.twitter.finagle.ssl.session.{NullSslSessionInfo, SslSessionInfo, UsingSslSessionInfo}
3+
import com.twitter.finagle.ChannelClosedException
4+
import com.twitter.finagle.ChannelException
5+
import com.twitter.finagle.Status
6+
import com.twitter.finagle.UnknownChannelException
7+
import com.twitter.finagle.pushsession.PushChannelHandle
8+
import com.twitter.finagle.pushsession.PushSession
9+
import com.twitter.finagle.ssl.session.NullSslSessionInfo
10+
import com.twitter.finagle.ssl.session.SslSessionInfo
11+
import com.twitter.finagle.ssl.session.UsingSslSessionInfo
1112
import com.twitter.finagle.stats.StatsReceiver
1213
import com.twitter.logging.Logger
1314
import com.twitter.util._
1415
import io.netty.buffer.ByteBuf
15-
import io.netty.channel.{
16-
Channel,
17-
ChannelHandlerContext,
18-
ChannelInboundHandlerAdapter,
19-
ChannelPipeline,
20-
EventLoop
21-
}
16+
import io.netty.channel.Channel
17+
import io.netty.channel.ChannelHandlerContext
18+
import io.netty.channel.ChannelInboundHandlerAdapter
19+
import io.netty.channel.ChannelPipeline
20+
import io.netty.channel.EventLoop
2221
import io.netty.handler.ssl.SslHandler
2322
import io.netty.util
2423
import io.netty.util.concurrent.GenericFutureListener
@@ -142,12 +141,23 @@ private final class Netty4PushChannelHandle[In, Out] private (
142141
}
143142

144143
// See note above about the scheduling of send messages
145-
def send(message: Out)(continuation: (Try[Unit]) => Unit): Unit = {
144+
def send(message: Out)(onComplete: (Try[Unit]) => Unit): Unit = {
146145
safeExecutor.safeExecute(new SafeRunnable {
147-
def tryRun(): Unit = handleWriteAndFlush(message, continuation)
146+
def tryRun(): Unit = handleWriteAndFlush(message, onComplete)
148147
})
149148
}
150149

150+
def sendInsideEventLoop(message: Out)(onComplete: Try[Unit] => Unit): Unit = {
151+
if (!ch.eventLoop().inEventLoop()) {
152+
throw new IllegalStateException(
153+
s"Expected to be called from within the `Channel`s " +
154+
s"associated `EventLoop` (${ch.eventLoop}), instead called " +
155+
s"from thread ${Thread.currentThread}")
156+
}
157+
158+
handleWriteAndFlush(message, onComplete)
159+
}
160+
151161
// See note above about the scheduling of send messages
152162
def sendAndForget(message: Out): Unit =
153163
safeExecutor.safeExecute(new SafeRunnable {
@@ -268,9 +278,13 @@ private final class Netty4PushChannelHandle[In, Out] private (
268278

269279
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
270280
val m = msg.asInstanceOf[In]
271-
safeExecutor.safeExecute(new SafeRunnable {
272-
def tryRun(): Unit = session.receive(m)
273-
})
281+
if (!ctx.channel().eventLoop().inEventLoop()) {
282+
safeExecutor.safeExecute(new SafeRunnable {
283+
def tryRun(): Unit = session.receive(m)
284+
})
285+
} else {
286+
session.receive(m)
287+
}
274288
}
275289

276290
override def channelInactive(ctx: ChannelHandlerContext): Unit =

0 commit comments

Comments
 (0)