Skip to content

Commit e6b54c4

Browse files
committed
KG-217. Add a span adapter for the Open Telemetry feature
- Span adapter provides an ability to customize the span behavior for the Open Telemetry that can be used by a specific provider which expects a specific span structure.
1 parent 7f69dba commit e6b54c4

File tree

18 files changed

+436
-177
lines changed

18 files changed

+436
-177
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package ai.koog.agents.features.opentelemetry.extension
2+
3+
import ai.koog.agents.features.opentelemetry.attribute.Attribute
4+
import ai.koog.agents.features.opentelemetry.attribute.CustomAttribute
5+
import ai.koog.agents.features.opentelemetry.event.EventBodyField
6+
import ai.koog.agents.features.opentelemetry.event.GenAIAgentEvent
7+
import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan
8+
9+
internal fun GenAIAgentEvent.toSpanAttributes(span: GenAIAgentSpan, verbose: Boolean = false) {
10+
// Convert event attributes to Span Attributes
11+
attributes.forEach { attribute -> span.addAttribute(attribute) }
12+
13+
// Convert Body Fields to Span Attributes
14+
bodyFields.forEach { bodyField ->
15+
val attribute = bodyField.toGenAIAttribute(verbose)
16+
span.addAttribute(attribute)
17+
}
18+
}
19+
20+
internal fun EventBodyField.toGenAIAttribute(verbose: Boolean): Attribute {
21+
return CustomAttribute(key, value, verbose = verbose)
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package ai.koog.agents.features.opentelemetry.extension
2+
3+
import ai.koog.agents.features.opentelemetry.attribute.Attribute
4+
import ai.koog.agents.features.opentelemetry.attribute.toSdkAttributes
5+
import ai.koog.agents.features.opentelemetry.event.GenAIAgentEvent
6+
import ai.koog.agents.features.opentelemetry.span.SpanEndStatus
7+
import io.opentelemetry.api.trace.Span
8+
import io.opentelemetry.api.trace.SpanBuilder
9+
import io.opentelemetry.api.trace.StatusCode
10+
11+
internal fun Span.setSpanStatus(endStatus: SpanEndStatus? = null) {
12+
val statusCode = endStatus?.code ?: StatusCode.OK
13+
val statusDescription = endStatus?.description ?: ""
14+
this.setStatus(statusCode, statusDescription)
15+
}
16+
17+
internal fun SpanBuilder.setAttributes(attributes: List<Attribute>) {
18+
setAllAttributes(attributes.toSdkAttributes())
19+
}
20+
21+
internal fun Span.setAttributes(attributes: List<Attribute>) {
22+
setAllAttributes(attributes.toSdkAttributes())
23+
}
24+
25+
internal fun Span.setEvents(events: List<GenAIAgentEvent>) {
26+
events.forEach { event ->
27+
// The 'opentelemetry-java' SDK does not have support for event body fields at the moment.
28+
// Pass body fields as attributes until an API is updated.
29+
val attributes = buildList {
30+
addAll(event.attributes)
31+
if (event.bodyFields.isNotEmpty()) {
32+
add(event.bodyFieldsAsAttribute())
33+
}
34+
}
35+
36+
addEvent(event.name, attributes.toSdkAttributes())
37+
}
38+
}

agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetry.kt

Lines changed: 57 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public class OpenTelemetry {
6464
val interceptContext = InterceptContext(this, OpenTelemetry())
6565
val tracer = config.tracer
6666
val spanProcessor = SpanProcessor(tracer)
67+
val spanAdapter = config.spanAdapter
6768

6869
// Stop all unfinished spans on a process finish to report them
6970
Runtime.getRuntime().addShutdownHook(
@@ -106,6 +107,7 @@ public class OpenTelemetry {
106107
strategyName = eventContext.strategy.name
107108
)
108109

110+
spanAdapter?.onBeforeSpanStarted(invokeAgentSpan)
109111
spanProcessor.startSpan(invokeAgentSpan)
110112
}
111113

@@ -123,7 +125,10 @@ public class OpenTelemetry {
123125
agentId = eventContext.agentId,
124126
runId = eventContext.runId
125127
)
126-
spanProcessor.endSpan(spanId = invokeAgentSpanId)
128+
129+
val invokeAgentSpan = spanProcessor.getSpanOrThrow<InvokeAgentSpan>(invokeAgentSpanId)
130+
spanAdapter?.onBeforeSpanFinished(invokeAgentSpan)
131+
spanProcessor.endSpan(span = invokeAgentSpan)
127132
}
128133

129134
pipeline.interceptAgentRunError(interceptContext) { eventContext ->
@@ -141,13 +146,16 @@ public class OpenTelemetry {
141146
runId = eventContext.runId
142147
)
143148

144-
val finishAttributes = buildList {
145-
add(SpanAttributes.Response.FinishReasons(listOf(SpanAttributes.Response.FinishReasonType.Error)))
146-
}
149+
val invokeAgentSpan = spanProcessor.getSpanOrThrow<InvokeAgentSpan>(invokeAgentSpanId)
150+
invokeAgentSpan.addAttribute(
151+
attribute = SpanAttributes.Response.FinishReasons(
152+
listOf(SpanAttributes.Response.FinishReasonType.Error)
153+
)
154+
)
147155

156+
spanAdapter?.onBeforeSpanFinished(invokeAgentSpan)
148157
spanProcessor.endSpan(
149-
spanId = invokeAgentSpanId,
150-
attributes = finishAttributes,
158+
span = invokeAgentSpan,
151159
spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.throwable.message)
152160
)
153161
}
@@ -156,7 +164,10 @@ public class OpenTelemetry {
156164
logger.debug { "Execute OpenTelemetry before agent closed handler" }
157165

158166
val agentSpanId = CreateAgentSpan.createId(agentId = eventContext.agentId)
159-
spanProcessor.endSpan(agentSpanId)
167+
val agentSpan = spanProcessor.getSpanOrThrow<CreateAgentSpan>(agentSpanId)
168+
169+
spanAdapter?.onBeforeSpanFinished(agentSpan)
170+
spanProcessor.endSpan(span = agentSpan)
160171
}
161172

162173
//endregion Agent
@@ -182,6 +193,7 @@ public class OpenTelemetry {
182193
nodeName = eventContext.node.name,
183194
)
184195

196+
spanAdapter?.onBeforeSpanStarted(nodeExecuteSpan)
185197
spanProcessor.startSpan(nodeExecuteSpan)
186198
}
187199

@@ -198,7 +210,10 @@ public class OpenTelemetry {
198210
nodeName = eventContext.node.name
199211
)
200212

201-
spanProcessor.endSpan(nodeExecuteSpanId)
213+
val nodeExecuteSpan = spanProcessor.getSpanOrThrow<NodeExecuteSpan>(nodeExecuteSpanId)
214+
215+
spanAdapter?.onBeforeSpanFinished(nodeExecuteSpan)
216+
spanProcessor.endSpan(nodeExecuteSpan)
202217
}
203218

204219
pipeline.interceptNodeExecutionError(interceptContext) { eventContext ->
@@ -214,8 +229,11 @@ public class OpenTelemetry {
214229
nodeName = eventContext.node.name
215230
)
216231

232+
val nodeExecuteSpan = spanProcessor.getSpanOrThrow<NodeExecuteSpan>(nodeExecuteSpanId)
233+
234+
spanAdapter?.onBeforeSpanFinished(nodeExecuteSpan)
217235
spanProcessor.endSpan(
218-
spanId = nodeExecuteSpanId,
236+
span = nodeExecuteSpan,
219237
spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.throwable.message)
220238
)
221239
}
@@ -256,9 +274,6 @@ public class OpenTelemetry {
256274
promptId = promptId,
257275
)
258276

259-
// Start span
260-
spanProcessor.startSpan(inferenceSpan)
261-
262277
// Add events to the InferenceSpan after the span is created
263278
val eventsFromMessages = eventContext.prompt.messages.mapNotNull { message ->
264279
when (message) {
@@ -278,6 +293,10 @@ public class OpenTelemetry {
278293
}
279294

280295
inferenceSpan.addEvents(eventsFromMessages)
296+
297+
// Start span
298+
spanAdapter?.onBeforeSpanStarted(inferenceSpan)
299+
spanProcessor.startSpan(inferenceSpan)
281300
}
282301

283302
pipeline.interceptAfterLLMCall(interceptContext) { eventContext ->
@@ -312,14 +331,15 @@ public class OpenTelemetry {
312331
}
313332

314333
eventContext.moderationResponse?.let { response ->
315-
ModerationResponseEvent(provider, response, config.isVerbose)
334+
add(ModerationResponseEvent(provider, response, config.isVerbose))
316335
}
317336
}
318337

319338
inferenceSpan.addEvents(eventsToAdd)
320339

321340
// Stop InferenceSpan
322-
spanProcessor.endSpan(inferenceSpanId)
341+
spanAdapter?.onBeforeSpanFinished(inferenceSpan)
342+
spanProcessor.endSpan(inferenceSpan)
323343
}
324344

325345
//endregion LLM Call
@@ -349,6 +369,7 @@ public class OpenTelemetry {
349369
toolArgs = eventContext.toolArgs,
350370
)
351371

372+
spanAdapter?.onBeforeSpanStarted(executeToolSpan)
352373
spanProcessor.startSpan(executeToolSpan)
353374
}
354375

@@ -368,17 +389,17 @@ public class OpenTelemetry {
368389
toolName = eventContext.tool.name
369390
)
370391

392+
val executeToolSpan = spanProcessor.getSpanOrThrow<ExecuteToolSpan>(executeToolSpanId)
393+
371394
// End the ExecuteToolSpan span
372-
val finishAttributes = buildList {
373-
eventContext.result?.toStringDefault()?.let { result ->
374-
add(SpanAttributes.Tool.OutputValue(result))
375-
}
395+
eventContext.result?.let { result ->
396+
executeToolSpan.addAttribute(
397+
attribute = SpanAttributes.Tool.OutputValue(output = result.toStringDefault())
398+
)
376399
}
377400

378-
spanProcessor.endSpan(
379-
spanId = executeToolSpanId,
380-
attributes = finishAttributes
381-
)
401+
spanAdapter?.onBeforeSpanFinished(span = executeToolSpan)
402+
spanProcessor.endSpan(span = executeToolSpan)
382403
}
383404

384405
pipeline.interceptToolCallFailure(interceptContext) { eventContext ->
@@ -397,14 +418,15 @@ public class OpenTelemetry {
397418
toolName = eventContext.tool.name
398419
)
399420

421+
val executeToolSpan = spanProcessor.getSpanOrThrow<ExecuteToolSpan>(executeToolSpanId)
422+
executeToolSpan.addAttribute(
423+
attribute = CommonAttributes.Error.Type(eventContext.throwable.message ?: "Unknown tool call error")
424+
)
425+
400426
// End the ExecuteToolSpan span
427+
spanAdapter?.onBeforeSpanFinished(executeToolSpan)
401428
spanProcessor.endSpan(
402-
spanId = executeToolSpanId,
403-
attributes = listOf(
404-
CommonAttributes.Error.Type(
405-
eventContext.throwable.message ?: "Unknown tool call error"
406-
)
407-
),
429+
span = executeToolSpan,
408430
spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.throwable.message)
409431
)
410432
}
@@ -430,10 +452,15 @@ public class OpenTelemetry {
430452
toolName = toolName
431453
)
432454

455+
val executeToolSpan = spanProcessor.getSpanOrThrow<ExecuteToolSpan>(executeToolSpanId)
456+
executeToolSpan.addAttribute(
457+
attribute = CommonAttributes.Error.Type(eventContext.error)
458+
)
459+
433460
// End the ExecuteToolSpan span
461+
spanAdapter?.onBeforeSpanFinished(executeToolSpan)
434462
spanProcessor.endSpan(
435-
spanId = executeToolSpanId,
436-
attributes = listOf(CommonAttributes.Error.Type(eventContext.error)),
463+
span = executeToolSpan,
437464
spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.error)
438465
)
439466
}

agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryConfig.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ai.koog.agents.features.opentelemetry.feature
22

33
import ai.koog.agents.core.feature.config.FeatureConfig
44
import ai.koog.agents.features.opentelemetry.attribute.addAttributes
5+
import ai.koog.agents.features.opentelemetry.integrations.SpanAdapter
56
import io.github.oshai.kotlinlogging.KotlinLogging
67
import io.opentelemetry.api.common.AttributeKey
78
import io.opentelemetry.api.common.Attributes
@@ -68,6 +69,8 @@ public class OpenTelemetryConfig : FeatureConfig() {
6869

6970
private var _verbose: Boolean = false
7071

72+
private var _spanAdapter: SpanAdapter? = null
73+
7174
/**
7275
* Indicates whether verbose telemetry data is enabled.
7376
*
@@ -122,6 +125,9 @@ public class OpenTelemetryConfig : FeatureConfig() {
122125
public val serviceVersion: String
123126
get() = _serviceVersion
124127

128+
internal val spanAdapter: SpanAdapter?
129+
get() = _spanAdapter
130+
125131
/**
126132
* Sets the service information for the OpenTelemetry configuration.
127133
* This information is used to identify the service in telemetry data.
@@ -206,6 +212,18 @@ public class OpenTelemetryConfig : FeatureConfig() {
206212
_sdk = sdk
207213
}
208214

215+
/**
216+
* Adds a custom span adapter for post-processing GenAI agent spans.
217+
* The adapter can modify span data, add attributes/events, or perform other
218+
* post-processing logic before spans are completed.
219+
*
220+
* @param adapter The ProcessSpanAdapter implementation that will handle
221+
* post-processing of GenAI agent spans
222+
*/
223+
internal fun addSpanAdapter(adapter: SpanAdapter) {
224+
_spanAdapter = adapter
225+
}
226+
209227
//region Private Methods
210228

211229
private fun initializeOpenTelemetry(): OpenTelemetrySdk {

agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integrations/Langfuse.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public fun OpenTelemetryConfig.addLangfuseExporter(
2626
langfuseSecretKey: String? = null,
2727
timeout: Duration = 10.seconds,
2828
) {
29-
val url = langfuseUrl ?: System.getenv()["LANGFUSE_URL"] ?: "https://cloud.langfuse.com"
29+
val url = langfuseUrl ?: System.getenv()["LANGFUSE_HOST"] ?: "https://cloud.langfuse.com"
3030

3131
logger.debug { "Configured endpoint for Langfuse telemetry: $url" }
3232

@@ -45,6 +45,8 @@ public fun OpenTelemetryConfig.addLangfuseExporter(
4545
.addHeader("Authorization", "Basic $auth")
4646
.build()
4747
)
48+
49+
// TODO: Add span adapter for Langfuse
4850
}
4951

5052
private val logger = KotlinLogging.logger {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package ai.koog.agents.features.opentelemetry.integrations
2+
3+
import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan
4+
5+
/**
6+
* Adapter abstract class for post-processing GenAI agent spans.
7+
*
8+
* This class allows customization of how GenAI agent spans are processed after they are created.
9+
* Implementations can modify span data, add additional attributes or events, or perform any other
10+
* post-processing logic needed before the span is completed.
11+
*
12+
* The abstract class provides a single method called after a span is created but before it is finished.
13+
*/
14+
internal abstract class SpanAdapter {
15+
16+
/**
17+
* Invoked before the specified GenAIAgentSpan is started. This method allows implementations to
18+
* perform any setup or customization required prior to the span being initialized and used.
19+
*
20+
* @param span The GenAI agent span to process
21+
*/
22+
open fun onBeforeSpanStarted(span: GenAIAgentSpan) { }
23+
24+
/**
25+
* Invoked before the specified GenAIAgentSpan is finished. This method allows implementations
26+
* to perform any final processing, modifications, or cleanup tasks required before the span
27+
* is completed.
28+
*
29+
* @param span The GenAI agent span to process before it is finished.
30+
*/
31+
open fun onBeforeSpanFinished(span: GenAIAgentSpan) { }
32+
}

agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integrations/Weave.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public fun OpenTelemetryConfig.addWeaveExporter(
4545
.addHeader("Authorization", "Basic $auth")
4646
.build()
4747
)
48+
49+
// TODO: Add span adapter for Langfuse
4850
}
4951

50-
private val logger = KotlinLogging.logger {}
52+
private val logger = KotlinLogging.logger { }

0 commit comments

Comments
 (0)