@@ -4,6 +4,12 @@ import com.twitter.finagle.offload.OffloadFilterAdmissionControl
4
4
import com .twitter .finagle .offload .OffloadFuturePool
5
5
import com .twitter .finagle .tracing .Trace
6
6
import com .twitter .finagle ._
7
+ import com .twitter .finagle .stats .DefaultStatsReceiver
8
+ import com .twitter .finagle .stats .HistogramFormat
9
+ import com .twitter .finagle .stats .MetricBuilder
10
+ import com .twitter .finagle .stats .MetricBuilder .HistogramType
11
+ import com .twitter .finagle .stats .MetricUsageHint
12
+ import com .twitter .finagle .stats .StatsReceiver
7
13
import com .twitter .util ._
8
14
import java .util .concurrent .ExecutorService
9
15
import scala .runtime .NonLocalReturnControl
@@ -59,7 +65,18 @@ object OffloadFilter {
59
65
private [finagle] def server [Req , Rep ]: Stackable [ServiceFactory [Req , Rep ]] =
60
66
new ServerModule [Req , Rep ]
61
67
62
- final class Client [Req , Rep ](pool : FuturePool ) extends SimpleFilter [Req , Rep ] {
68
+ final class Client [Req , Rep ](pool : FuturePool , statsReceiver : StatsReceiver )
69
+ extends SimpleFilter [Req , Rep ] {
70
+
71
+ def this (pool : FuturePool ) = this (pool, DefaultStatsReceiver .scope(" client_offload_filter" ))
72
+
73
+ private [this ] val applyTimeNs = statsReceiver.stat(
74
+ MetricBuilder (metricType = HistogramType )
75
+ .withHistogramFormat(HistogramFormat .ShortSummary )
76
+ .withPercentiles(0.99 , 0.999 , 0.9999 )
77
+ .withMetricUsageHints(Set (MetricUsageHint .HighContention ))
78
+ .withName(" apply_time_ns" )
79
+ )
63
80
64
81
def apply (request : Req , service : Service [Req , Rep ]): Future [Rep ] = {
65
82
// What we're trying to achieve is to ensure all continuations spawn out of a future returned
@@ -83,7 +100,11 @@ object OffloadFilter {
83
100
val response = service(request)
84
101
val shifted = Promise .interrupts[Rep ](response)
85
102
response.respond { t =>
86
- pool(shifted.update(t))
103
+ pool {
104
+ val startNs = System .nanoTime()
105
+ shifted.update(t)
106
+ applyTimeNs.add(System .nanoTime() - startNs)
107
+ }
87
108
88
109
val tracing = Trace ()
89
110
if (tracing.isActivelyTracing) {
@@ -178,15 +199,17 @@ object OffloadFilter {
178
199
}
179
200
180
201
private final class ClientModule [Req , Rep ]
181
- extends Stack .Module1 [ Param , ServiceFactory [Req , Rep ]] {
202
+ extends Stack .Module2 [param. Stats , Param , ServiceFactory [Req , Rep ]] {
182
203
183
204
def make (
205
+ statsParam : param.Stats ,
184
206
p : Param ,
185
207
next : ServiceFactory [Req , Rep ]
186
208
): ServiceFactory [Req , Rep ] = {
187
209
p.pool match {
188
210
case Some (pool) =>
189
- new Client (pool).andThen(next)
211
+ val param .Stats (statsReceiver) = statsParam
212
+ new Client (pool, statsReceiver.scope(" offload_filter" )).andThen(next)
190
213
case None => next
191
214
}
192
215
}
0 commit comments