Skip to content

KG-217. Add ability to customize span data after been created on Koog side for a specific client #566

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ internal class ChoiceEvent(
provider: LLMProvider,
private val message: Message.Response,
private val arguments: JsonObject? = null,
val index: Int,
override val verbose: Boolean = false,
) : GenAIAgentEvent {

Expand All @@ -20,7 +21,7 @@ internal class ChoiceEvent(
}

override val bodyFields: List<EventBodyField> = buildList {
add(EventBodyFields.Index(0))
add(EventBodyFields.Index(index))

when (message) {
is Message.Assistant -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ai.koog.agents.features.opentelemetry.extension

import ai.koog.agents.features.opentelemetry.attribute.Attribute
import ai.koog.agents.features.opentelemetry.attribute.CustomAttribute
import ai.koog.agents.features.opentelemetry.event.EventBodyField
import ai.koog.agents.features.opentelemetry.event.GenAIAgentEvent
import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan

internal fun GenAIAgentEvent.toSpanAttributes(span: GenAIAgentSpan, verbose: Boolean = false) {

Check warning on line 9 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/extension/GenAIAgentEventExt.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Method `toSpanAttributes` coverage is below the threshold 50%
// Convert event attributes to Span Attributes
attributes.forEach { attribute -> span.addAttribute(attribute) }

// Convert Body Fields to Span Attributes
bodyFields.forEach { bodyField ->
val attribute = bodyField.toGenAIAttribute(verbose)
span.addAttribute(attribute)
}
}

internal fun EventBodyField.toGenAIAttribute(verbose: Boolean): Attribute {

Check warning on line 20 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/extension/GenAIAgentEventExt.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Method `toGenAIAttribute` coverage is below the threshold 50%
return CustomAttribute(key, value, verbose = verbose)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ai.koog.agents.features.opentelemetry.extension

import ai.koog.agents.features.opentelemetry.attribute.Attribute
import ai.koog.agents.features.opentelemetry.attribute.toSdkAttributes
import ai.koog.agents.features.opentelemetry.event.EventBodyField
import ai.koog.agents.features.opentelemetry.event.GenAIAgentEvent
import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan
import ai.koog.agents.features.opentelemetry.span.SpanEndStatus
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanBuilder
import io.opentelemetry.api.trace.StatusCode

internal fun Span.setSpanStatus(endStatus: SpanEndStatus? = null) {

Check warning on line 13 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/extension/SpanExt.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Method `setSpanStatus` coverage is below the threshold 50%
val statusCode = endStatus?.code ?: StatusCode.OK
val statusDescription = endStatus?.description ?: ""
this.setStatus(statusCode, statusDescription)
}

internal fun SpanBuilder.setAttributes(attributes: List<Attribute>) {
setAllAttributes(attributes.toSdkAttributes())
}

internal fun Span.setAttributes(attributes: List<Attribute>) {
setAllAttributes(attributes.toSdkAttributes())
}

internal fun Span.setEvents(events: List<GenAIAgentEvent>) {
events.forEach { event ->
// The 'opentelemetry-java' SDK does not have support for event body fields at the moment.
// Pass body fields as attributes until an API is updated.
val attributes = buildList {
addAll(event.attributes)
if (event.bodyFields.isNotEmpty()) {
add(event.bodyFieldsAsAttribute())
}
}

addEvent(event.name, attributes.toSdkAttributes())
}
}

internal inline fun <reified TBodyField> GenAIAgentSpan.eventBodyFieldToAttribute(

Check warning on line 42 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/extension/SpanExt.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Method `eventBodyFieldToAttribute` coverage is below the threshold 50%
event: GenAIAgentEvent,
attributeCreate: (TBodyField) -> Attribute
) where TBodyField : EventBodyField {
event.bodyFields.filterIsInstance<TBodyField>().forEach { bodyField ->
val attributeFromEvent = attributeCreate(bodyField)
this.addAttribute(attributeFromEvent)
}

this.removeEvent(event)
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class OpenTelemetry {
val interceptContext = InterceptContext(this, OpenTelemetry())
val tracer = config.tracer
val spanProcessor = SpanProcessor(tracer)
val spanAdapter = config.spanAdapter

// Stop all unfinished spans on a process finish to report them
Runtime.getRuntime().addShutdownHook(
Expand Down Expand Up @@ -106,6 +107,7 @@ public class OpenTelemetry {
strategyName = eventContext.strategy.name
)

spanAdapter?.onBeforeSpanStarted(invokeAgentSpan)
spanProcessor.startSpan(invokeAgentSpan)
}

Expand All @@ -123,7 +125,10 @@ public class OpenTelemetry {
agentId = eventContext.agentId,
runId = eventContext.runId
)
spanProcessor.endSpan(spanId = invokeAgentSpanId)

val invokeAgentSpan = spanProcessor.getSpanOrThrow<InvokeAgentSpan>(invokeAgentSpanId)
spanAdapter?.onBeforeSpanFinished(invokeAgentSpan)
spanProcessor.endSpan(span = invokeAgentSpan)
}

pipeline.interceptAgentRunError(interceptContext) { eventContext ->
Expand All @@ -141,13 +146,16 @@ public class OpenTelemetry {
runId = eventContext.runId
)

val finishAttributes = buildList {
add(SpanAttributes.Response.FinishReasons(listOf(SpanAttributes.Response.FinishReasonType.Error)))
}
val invokeAgentSpan = spanProcessor.getSpanOrThrow<InvokeAgentSpan>(invokeAgentSpanId)
invokeAgentSpan.addAttribute(
attribute = SpanAttributes.Response.FinishReasons(
listOf(SpanAttributes.Response.FinishReasonType.Error)
)
)

spanAdapter?.onBeforeSpanFinished(invokeAgentSpan)
spanProcessor.endSpan(
spanId = invokeAgentSpanId,
attributes = finishAttributes,
span = invokeAgentSpan,
spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.throwable.message)
)
}
Expand All @@ -156,7 +164,10 @@ public class OpenTelemetry {
logger.debug { "Execute OpenTelemetry before agent closed handler" }

val agentSpanId = CreateAgentSpan.createId(agentId = eventContext.agentId)
spanProcessor.endSpan(agentSpanId)
val agentSpan = spanProcessor.getSpanOrThrow<CreateAgentSpan>(agentSpanId)

spanAdapter?.onBeforeSpanFinished(agentSpan)
spanProcessor.endSpan(span = agentSpan)
}

//endregion Agent
Expand All @@ -182,6 +193,7 @@ public class OpenTelemetry {
nodeName = eventContext.node.name,
)

spanAdapter?.onBeforeSpanStarted(nodeExecuteSpan)
spanProcessor.startSpan(nodeExecuteSpan)
}

Expand All @@ -198,7 +210,10 @@ public class OpenTelemetry {
nodeName = eventContext.node.name
)

spanProcessor.endSpan(nodeExecuteSpanId)
val nodeExecuteSpan = spanProcessor.getSpanOrThrow<NodeExecuteSpan>(nodeExecuteSpanId)

spanAdapter?.onBeforeSpanFinished(nodeExecuteSpan)
spanProcessor.endSpan(nodeExecuteSpan)
}

pipeline.interceptNodeExecutionError(interceptContext) { eventContext ->
Expand All @@ -214,8 +229,11 @@ public class OpenTelemetry {
nodeName = eventContext.node.name
)

val nodeExecuteSpan = spanProcessor.getSpanOrThrow<NodeExecuteSpan>(nodeExecuteSpanId)

spanAdapter?.onBeforeSpanFinished(nodeExecuteSpan)
spanProcessor.endSpan(
spanId = nodeExecuteSpanId,
span = nodeExecuteSpan,
spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.throwable.message)
)
}
Expand Down Expand Up @@ -256,15 +274,11 @@ public class OpenTelemetry {
promptId = promptId,
)

// Start span
spanProcessor.startSpan(inferenceSpan)

// Add events to the InferenceSpan after the span is created
val eventsFromMessages = eventContext.prompt.messages.mapNotNull { message ->
when (message) {
is Message.User -> UserMessageEvent(provider, message, verbose = config.isVerbose)
is Message.System -> SystemMessageEvent(provider, message, verbose = config.isVerbose)
is Message.Assistant -> AssistantMessageEvent(provider, message, verbose = config.isVerbose)
is Message.Tool.Result -> {
ToolMessageEvent(
provider = provider,
Expand All @@ -278,6 +292,10 @@ public class OpenTelemetry {
}

inferenceSpan.addEvents(eventsFromMessages)

// Start span
spanAdapter?.onBeforeSpanStarted(inferenceSpan)
spanProcessor.startSpan(inferenceSpan)
}

pipeline.interceptAfterLLMCall(interceptContext) { eventContext ->
Expand Down Expand Up @@ -306,20 +324,21 @@ public class OpenTelemetry {
when (message) {
is Message.Assistant -> add(AssistantMessageEvent(provider, message, verbose = config.isVerbose))
is Message.Tool.Call -> add(
ChoiceEvent(provider, message, arguments = message.contentJson, verbose = config.isVerbose)
ChoiceEvent(provider, message, arguments = message.contentJson, index = 0, verbose = config.isVerbose)
)
}
}

eventContext.moderationResponse?.let { response ->
ModerationResponseEvent(provider, response, config.isVerbose)
add(ModerationResponseEvent(provider, response, config.isVerbose))
}
}

inferenceSpan.addEvents(eventsToAdd)

// Stop InferenceSpan
spanProcessor.endSpan(inferenceSpanId)
spanAdapter?.onBeforeSpanFinished(inferenceSpan)
spanProcessor.endSpan(inferenceSpan)
}

//endregion LLM Call
Expand Down Expand Up @@ -349,6 +368,7 @@ public class OpenTelemetry {
toolArgs = eventContext.toolArgs,
)

spanAdapter?.onBeforeSpanStarted(executeToolSpan)
spanProcessor.startSpan(executeToolSpan)
}

Expand All @@ -368,17 +388,17 @@ public class OpenTelemetry {
toolName = eventContext.tool.name
)

val executeToolSpan = spanProcessor.getSpanOrThrow<ExecuteToolSpan>(executeToolSpanId)

// End the ExecuteToolSpan span
val finishAttributes = buildList {
eventContext.result?.toStringDefault()?.let { result ->
add(SpanAttributes.Tool.OutputValue(result))
}
eventContext.result?.let { result ->
executeToolSpan.addAttribute(
attribute = SpanAttributes.Tool.OutputValue(output = result.toStringDefault())
)
}

spanProcessor.endSpan(
spanId = executeToolSpanId,
attributes = finishAttributes
)
spanAdapter?.onBeforeSpanFinished(span = executeToolSpan)
spanProcessor.endSpan(span = executeToolSpan)
}

pipeline.interceptToolCallFailure(interceptContext) { eventContext ->
Expand All @@ -397,14 +417,15 @@ public class OpenTelemetry {
toolName = eventContext.tool.name
)

val executeToolSpan = spanProcessor.getSpanOrThrow<ExecuteToolSpan>(executeToolSpanId)
executeToolSpan.addAttribute(
attribute = CommonAttributes.Error.Type(eventContext.throwable.message ?: "Unknown tool call error")
)

// End the ExecuteToolSpan span
spanAdapter?.onBeforeSpanFinished(executeToolSpan)
spanProcessor.endSpan(
spanId = executeToolSpanId,
attributes = listOf(
CommonAttributes.Error.Type(
eventContext.throwable.message ?: "Unknown tool call error"
)
),
span = executeToolSpan,
spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.throwable.message)
)
}
Expand All @@ -430,10 +451,15 @@ public class OpenTelemetry {
toolName = toolName
)

val executeToolSpan = spanProcessor.getSpanOrThrow<ExecuteToolSpan>(executeToolSpanId)
executeToolSpan.addAttribute(
attribute = CommonAttributes.Error.Type(eventContext.error)
)

// End the ExecuteToolSpan span
spanAdapter?.onBeforeSpanFinished(executeToolSpan)
spanProcessor.endSpan(
spanId = executeToolSpanId,
attributes = listOf(CommonAttributes.Error.Type(eventContext.error)),
span = executeToolSpan,
spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.error)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import ai.koog.agents.core.feature.config.FeatureConfig
import ai.koog.agents.features.opentelemetry.attribute.addAttributes
import ai.koog.agents.features.opentelemetry.integration.SpanAdapter
import io.github.oshai.kotlinlogging.KotlinLogging
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.common.Attributes
Expand Down Expand Up @@ -68,6 +69,8 @@

private var _verbose: Boolean = false

private var _spanAdapter: SpanAdapter? = null

/**
* Indicates whether verbose telemetry data is enabled.
*
Expand Down Expand Up @@ -122,6 +125,9 @@
public val serviceVersion: String
get() = _serviceVersion

internal val spanAdapter: SpanAdapter?
get() = _spanAdapter

/**
* Sets the service information for the OpenTelemetry configuration.
* This information is used to identify the service in telemetry data.
Expand Down Expand Up @@ -150,7 +156,7 @@
* @param processor A function that takes a SpanExporter and returns the [SpanProcessor].
* This allows defining custom logic for processing spans before they are exported.
*/
public fun addSpanProcessor(processor: (SpanExporter) -> SpanProcessor) {

Check warning on line 159 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryConfig.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Method `addSpanProcessor` coverage is below the threshold 50%
customSpanProcessorsCreator.add(processor)
}

Expand All @@ -164,7 +170,7 @@
* will be added to the resource.
* @param T The type of the values in the attribute map, which must be non-null.
*/
public fun <T> addResourceAttributes(attributes: Map<AttributeKey<T>, T>) where T : Any {

Check warning on line 173 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryConfig.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Method `addResourceAttributes` coverage is below the threshold 50%
customResourceAttributes.putAll(attributes)
}

Expand All @@ -174,7 +180,7 @@
*
* @param sampler The sampler instance to set for the OpenTelemetry configuration.
*/
public fun setSampler(sampler: Sampler) {

Check warning on line 183 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryConfig.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Method `setSampler` coverage is below the threshold 50%
_sampler = sampler
}

Expand Down Expand Up @@ -206,6 +212,18 @@
_sdk = sdk
}

/**
* Adds a custom span adapter for post-processing GenAI agent spans.
* The adapter can modify span data, add attributes/events, or perform other
* post-processing logic before spans are completed.
*
* @param adapter The ProcessSpanAdapter implementation that will handle
* post-processing of GenAI agent spans
*/
internal fun addSpanAdapter(adapter: SpanAdapter) {
_spanAdapter = adapter
}

//region Private Methods

private fun initializeOpenTelemetry(): OpenTelemetrySdk {
Expand Down Expand Up @@ -272,7 +290,7 @@
}
}

private fun SdkTracerProviderBuilder.addProcessors(exporter: SpanExporter) {

Check warning on line 293 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryConfig.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Method `addProcessors` coverage is below the threshold 50%
if (customSpanProcessorsCreator.isEmpty()) {
logger.debug {
"No custom span processors configured. Use batch span processor with ${exporter::class.simpleName} as an exporter."
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package ai.koog.agents.features.opentelemetry.integration

import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan

/**
* Adapter abstract class for post-processing GenAI agent spans.
*
* This class allows customization of how GenAI agent spans are processed after they are created.
* Implementations can modify span data, add additional attributes or events, or perform any other
* post-processing logic needed before the span is completed.
*
* The abstract class provides a single method called after a span is created but before it is finished.
*/
internal abstract class SpanAdapter {

Check warning on line 14 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/SpanAdapter.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Class `SpanAdapter` coverage is below the threshold 50%

Check warning on line 14 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/SpanAdapter.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Constructor `SpanAdapter` coverage is below the threshold 50%

/**
* Invoked before the specified GenAIAgentSpan is started. This method allows implementations to
* perform any setup or customization required prior to the span being initialized and used.
*
* @param span The GenAI agent span to process
*/
open fun onBeforeSpanStarted(span: GenAIAgentSpan) { }

Check warning on line 22 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/SpanAdapter.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Method `onBeforeSpanStarted` coverage is below the threshold 50%

/**
* Invoked before the specified GenAIAgentSpan is finished. This method allows implementations
* to perform any final processing, modifications, or cleanup tasks required before the span
* is completed.
*
* @param span The GenAI agent span to process before it is finished.
*/
open fun onBeforeSpanFinished(span: GenAIAgentSpan) { }

Check warning on line 31 in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/SpanAdapter.kt

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Check Kotlin and Java source code coverage

Method `onBeforeSpanFinished` coverage is below the threshold 50%
}
Loading
Loading