|
| 1 | +package com.twitter.finagle.zipkin.core |
| 2 | + |
| 3 | +import com.github.benmanes.caffeine.cache.Caffeine |
| 4 | +import com.github.benmanes.caffeine.cache.RemovalCause |
| 5 | +import com.twitter.finagle.stats.NullStatsReceiver |
| 6 | +import com.twitter.finagle.stats.StatsReceiver |
| 7 | +import com.twitter.finagle.tracing.Record |
| 8 | +import com.twitter.finagle.tracing.TraceId |
| 9 | +import com.twitter.finagle.zipkin.core.DurationFilteringTracer.BitMask |
| 10 | +import com.twitter.finagle.zipkin.core.DurationFilteringTracer.Multiplier |
| 11 | +import com.twitter.finagle.zipkin.core.DurationFilteringTracer.SomeFalse |
| 12 | +import com.twitter.finagle.zipkin.core.DurationFilteringTracer.SomeTrue |
| 13 | +import com.twitter.finagle.zipkin.core.DurationFilteringTracer.salt |
| 14 | +import com.twitter.util.Duration |
| 15 | +import com.twitter.util.Future |
| 16 | +import java.io.FileOutputStream |
| 17 | +import java.lang.ThreadLocal |
| 18 | +import java.util.concurrent.ConcurrentMap |
| 19 | +import org.apache.thrift.TSerializer |
| 20 | +import scala.util.Random |
| 21 | +import zipkin.internal.ApplyTimestampAndDuration |
| 22 | +import zipkin.Codec |
| 23 | +import zipkin2.codec.SpanBytesDecoder |
| 24 | +import java.lang.{Long => JLong} |
| 25 | +import scala.util.Using |
| 26 | +import zipkin2.codec.SpanBytesEncoder |
| 27 | + |
| 28 | +object DurationFilteringTracer { |
| 29 | + // Use same sampling params here as in com.twitter.finagle.zipkin.core.Sampler |
| 30 | + private val Multiplier = (1 << 24).toFloat |
| 31 | + private val BitMask = (Multiplier - 1).toInt |
| 32 | + private val salt = new Random().nextInt() |
| 33 | + |
| 34 | + private val SomeTrue = Some(true) |
| 35 | + private val SomeFalse = Some(false) |
| 36 | +} |
| 37 | + |
| 38 | +class DurationFilteringTracer( |
| 39 | + duration: Duration, |
| 40 | + samplingRate: Float, |
| 41 | + outputPath: String, |
| 42 | + maxInFlightTraces: Int = 2000, |
| 43 | + statsReceiver: StatsReceiver = NullStatsReceiver) |
| 44 | + extends RawZipkinTracer { |
| 45 | + |
| 46 | + if (samplingRate < 0 || samplingRate > 1) { |
| 47 | + throw new IllegalArgumentException( |
| 48 | + "Sample rate not within the valid range of 0-1, was " + samplingRate |
| 49 | + ) |
| 50 | + } |
| 51 | + |
| 52 | + private[this] val persistedSpansCounter = statsReceiver.counter("persistedSpans") |
| 53 | + private[this] val evictions = statsReceiver.counter("evictions") |
| 54 | + |
| 55 | + private[this] val thriftSerialiser = ThreadLocal.withInitial(() => new TSerializer()) |
| 56 | + |
| 57 | + // map from TraceID -> spans within that trace |
| 58 | + private[this] val spanRoots: ConcurrentMap[Long, List[zipkin.Span]] = Caffeine |
| 59 | + .newBuilder() |
| 60 | + .asInstanceOf[Caffeine[Long, List[zipkin.Span]]] |
| 61 | + .maximumSize(maxInFlightTraces) |
| 62 | + .evictionListener((_: Long, v: Seq[zipkin.Span], _: RemovalCause) => { |
| 63 | + evictions.incr() |
| 64 | + }) |
| 65 | + .build[Long, List[zipkin.Span]].asMap() |
| 66 | + |
| 67 | + // sentinel value that will get set for a trace ID when we've seen at least one span in that trace |
| 68 | + // with duration >= threshold |
| 69 | + private[this] val durationThresholdMetSentinel = List[zipkin.Span]() |
| 70 | + |
| 71 | + val cacheSizeGauge = statsReceiver.addGauge("cacheSize")(spanRoots.size().floatValue()) |
| 72 | + |
| 73 | + override def record(record: Record): Unit = { |
| 74 | + if (sampleTrace(record.traceId).contains(true)) { |
| 75 | + super.record(record) |
| 76 | + } |
| 77 | + } |
| 78 | + |
| 79 | + override def sampleTrace(traceId: TraceId): Option[Boolean] = { |
| 80 | + // Same as in com.twitter.finagle.zipkin.core.Sampler, except here we don't check if |
| 81 | + // the traceId has already had Some(false) set, since we want to consider all traceIds |
| 82 | + if (((JLong.hashCode(traceId.traceId.toLong) ^ salt) & BitMask) < samplingRate * Multiplier) |
| 83 | + SomeTrue |
| 84 | + else |
| 85 | + SomeFalse |
| 86 | + } |
| 87 | + |
| 88 | + override def getSampleRate: Float = samplingRate |
| 89 | + |
| 90 | + override def sendSpans(spans: Seq[Span]): Future[Unit] = { |
| 91 | + spans.map(convertToZipkinSpan).foreach { span => |
| 92 | + if (span.duration >= duration.inMicroseconds) { |
| 93 | + val existingSpansForTrace = spanRoots.put(span.traceId, durationThresholdMetSentinel) |
| 94 | + persistSpans(span, existingSpansForTrace) |
| 95 | + } else { |
| 96 | + val existingSpansForTrace = spanRoots.compute( |
| 97 | + span.traceId, |
| 98 | + { |
| 99 | + case (_, null) => List(span) // this is the first span for the trace |
| 100 | + case (_, v) if v.eq(durationThresholdMetSentinel) => |
| 101 | + durationThresholdMetSentinel // duration threshold has already been met |
| 102 | + case (_, v) => |
| 103 | + v.+:(span) // there are existing spans, but duration threshold not yet met |
| 104 | + } |
| 105 | + ) |
| 106 | + |
| 107 | + if (existingSpansForTrace.eq(durationThresholdMetSentinel)) { |
| 108 | + persistSpans(span, List.empty) |
| 109 | + } |
| 110 | + } |
| 111 | + } |
| 112 | + |
| 113 | + Future.Done |
| 114 | + } |
| 115 | + |
| 116 | + override def isActivelyTracing(traceId: TraceId): Boolean = sampleTrace(traceId).contains(true) |
| 117 | + |
| 118 | + private[this] def convertToZipkinSpan(span: Span): zipkin.Span = { |
| 119 | + val serialisedBytes = thriftSerialiser.get().serialize(span.toThrift) |
| 120 | + val zipkinV1ThriftSpan = zipkin.Codec.THRIFT.readSpan(serialisedBytes) |
| 121 | + ApplyTimestampAndDuration.apply(zipkinV1ThriftSpan) |
| 122 | + } |
| 123 | + |
| 124 | + private[this] def persistSpans(parent: zipkin.Span, children: Seq[zipkin.Span]): Unit = { |
| 125 | + val spansToPersist = if (children != null) children :+ parent else Seq(parent) |
| 126 | + persistedSpansCounter.incr(spansToPersist.size) |
| 127 | + Using(new FileOutputStream(outputPath, true)) { fileOutputStream => |
| 128 | + spansToPersist.foreach { span => |
| 129 | + val converted = convertV1SpanToV2(span) |
| 130 | + fileOutputStream.write( |
| 131 | + SpanBytesEncoder.JSON_V2 |
| 132 | + .encode(converted)) |
| 133 | + fileOutputStream.write('\n') |
| 134 | + } |
| 135 | + fileOutputStream.flush() |
| 136 | + } |
| 137 | + } |
| 138 | + |
| 139 | + private[this] def convertV1SpanToV2(span: zipkin.Span): zipkin2.Span = { |
| 140 | + val spanBytesV1 = Codec.THRIFT.writeSpan(span) |
| 141 | + SpanBytesDecoder.THRIFT.decodeOne(spanBytesV1) |
| 142 | + } |
| 143 | +} |
0 commit comments