diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/event/ChoiceEvent.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/event/ChoiceEvent.kt index 848a6d323..8af88ce36 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/event/ChoiceEvent.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/event/ChoiceEvent.kt @@ -10,6 +10,7 @@ internal class ChoiceEvent( provider: LLMProvider, private val message: Message.Response, private val arguments: JsonObject? = null, + val index: Int, override val verbose: Boolean = false, ) : GenAIAgentEvent { @@ -20,7 +21,7 @@ internal class ChoiceEvent( } override val bodyFields: List = buildList { - add(EventBodyFields.Index(0)) + add(EventBodyFields.Index(index)) when (message) { is Message.Assistant -> { diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/extension/GenAIAgentEventExt.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/extension/GenAIAgentEventExt.kt new file mode 100644 index 000000000..ac9d50f5c --- /dev/null +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/extension/GenAIAgentEventExt.kt @@ -0,0 +1,22 @@ +package ai.koog.agents.features.opentelemetry.extension + +import ai.koog.agents.features.opentelemetry.attribute.Attribute +import ai.koog.agents.features.opentelemetry.attribute.CustomAttribute +import ai.koog.agents.features.opentelemetry.event.EventBodyField +import ai.koog.agents.features.opentelemetry.event.GenAIAgentEvent +import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan + +internal fun GenAIAgentEvent.toSpanAttributes(span: GenAIAgentSpan, verbose: Boolean = false) { + // Convert event attributes to Span Attributes + attributes.forEach { attribute -> span.addAttribute(attribute) } + + // Convert Body Fields to Span Attributes + bodyFields.forEach { bodyField -> + val attribute = bodyField.toGenAIAttribute(verbose) + span.addAttribute(attribute) + } +} + +internal fun EventBodyField.toGenAIAttribute(verbose: Boolean): Attribute { + return CustomAttribute(key, value, verbose = verbose) +} diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/extension/SpanExt.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/extension/SpanExt.kt new file mode 100644 index 000000000..ef2ba8f3e --- /dev/null +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/extension/SpanExt.kt @@ -0,0 +1,52 @@ +package ai.koog.agents.features.opentelemetry.extension + +import ai.koog.agents.features.opentelemetry.attribute.Attribute +import ai.koog.agents.features.opentelemetry.attribute.toSdkAttributes +import ai.koog.agents.features.opentelemetry.event.EventBodyField +import ai.koog.agents.features.opentelemetry.event.GenAIAgentEvent +import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan +import ai.koog.agents.features.opentelemetry.span.SpanEndStatus +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.SpanBuilder +import io.opentelemetry.api.trace.StatusCode + +internal fun Span.setSpanStatus(endStatus: SpanEndStatus? = null) { + val statusCode = endStatus?.code ?: StatusCode.OK + val statusDescription = endStatus?.description ?: "" + this.setStatus(statusCode, statusDescription) +} + +internal fun SpanBuilder.setAttributes(attributes: List) { + setAllAttributes(attributes.toSdkAttributes()) +} + +internal fun Span.setAttributes(attributes: List) { + setAllAttributes(attributes.toSdkAttributes()) +} + +internal fun Span.setEvents(events: List) { + events.forEach { event -> + // The 'opentelemetry-java' SDK does not have support for event body fields at the moment. + // Pass body fields as attributes until an API is updated. + val attributes = buildList { + addAll(event.attributes) + if (event.bodyFields.isNotEmpty()) { + add(event.bodyFieldsAsAttribute()) + } + } + + addEvent(event.name, attributes.toSdkAttributes()) + } +} + +internal inline fun GenAIAgentSpan.eventBodyFieldToAttribute( + event: GenAIAgentEvent, + attributeCreate: (TBodyField) -> Attribute +) where TBodyField : EventBodyField { + event.bodyFields.filterIsInstance().forEach { bodyField -> + val attributeFromEvent = attributeCreate(bodyField) + this.addAttribute(attributeFromEvent) + } + + this.removeEvent(event) +} diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetry.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetry.kt index 7a5f6ef87..5d7754ffb 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetry.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetry.kt @@ -64,6 +64,7 @@ public class OpenTelemetry { val interceptContext = InterceptContext(this, OpenTelemetry()) val tracer = config.tracer val spanProcessor = SpanProcessor(tracer) + val spanAdapter = config.spanAdapter // Stop all unfinished spans on a process finish to report them Runtime.getRuntime().addShutdownHook( @@ -106,6 +107,7 @@ public class OpenTelemetry { strategyName = eventContext.strategy.name ) + spanAdapter?.onBeforeSpanStarted(invokeAgentSpan) spanProcessor.startSpan(invokeAgentSpan) } @@ -123,7 +125,10 @@ public class OpenTelemetry { agentId = eventContext.agentId, runId = eventContext.runId ) - spanProcessor.endSpan(spanId = invokeAgentSpanId) + + val invokeAgentSpan = spanProcessor.getSpanOrThrow(invokeAgentSpanId) + spanAdapter?.onBeforeSpanFinished(invokeAgentSpan) + spanProcessor.endSpan(span = invokeAgentSpan) } pipeline.interceptAgentRunError(interceptContext) { eventContext -> @@ -141,13 +146,16 @@ public class OpenTelemetry { runId = eventContext.runId ) - val finishAttributes = buildList { - add(SpanAttributes.Response.FinishReasons(listOf(SpanAttributes.Response.FinishReasonType.Error))) - } + val invokeAgentSpan = spanProcessor.getSpanOrThrow(invokeAgentSpanId) + invokeAgentSpan.addAttribute( + attribute = SpanAttributes.Response.FinishReasons( + listOf(SpanAttributes.Response.FinishReasonType.Error) + ) + ) + spanAdapter?.onBeforeSpanFinished(invokeAgentSpan) spanProcessor.endSpan( - spanId = invokeAgentSpanId, - attributes = finishAttributes, + span = invokeAgentSpan, spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.throwable.message) ) } @@ -156,7 +164,10 @@ public class OpenTelemetry { logger.debug { "Execute OpenTelemetry before agent closed handler" } val agentSpanId = CreateAgentSpan.createId(agentId = eventContext.agentId) - spanProcessor.endSpan(agentSpanId) + val agentSpan = spanProcessor.getSpanOrThrow(agentSpanId) + + spanAdapter?.onBeforeSpanFinished(agentSpan) + spanProcessor.endSpan(span = agentSpan) } //endregion Agent @@ -182,6 +193,7 @@ public class OpenTelemetry { nodeName = eventContext.node.name, ) + spanAdapter?.onBeforeSpanStarted(nodeExecuteSpan) spanProcessor.startSpan(nodeExecuteSpan) } @@ -198,7 +210,10 @@ public class OpenTelemetry { nodeName = eventContext.node.name ) - spanProcessor.endSpan(nodeExecuteSpanId) + val nodeExecuteSpan = spanProcessor.getSpanOrThrow(nodeExecuteSpanId) + + spanAdapter?.onBeforeSpanFinished(nodeExecuteSpan) + spanProcessor.endSpan(nodeExecuteSpan) } pipeline.interceptNodeExecutionError(interceptContext) { eventContext -> @@ -214,8 +229,11 @@ public class OpenTelemetry { nodeName = eventContext.node.name ) + val nodeExecuteSpan = spanProcessor.getSpanOrThrow(nodeExecuteSpanId) + + spanAdapter?.onBeforeSpanFinished(nodeExecuteSpan) spanProcessor.endSpan( - spanId = nodeExecuteSpanId, + span = nodeExecuteSpan, spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.throwable.message) ) } @@ -256,15 +274,11 @@ public class OpenTelemetry { promptId = promptId, ) - // Start span - spanProcessor.startSpan(inferenceSpan) - // Add events to the InferenceSpan after the span is created val eventsFromMessages = eventContext.prompt.messages.mapNotNull { message -> when (message) { is Message.User -> UserMessageEvent(provider, message, verbose = config.isVerbose) is Message.System -> SystemMessageEvent(provider, message, verbose = config.isVerbose) - is Message.Assistant -> AssistantMessageEvent(provider, message, verbose = config.isVerbose) is Message.Tool.Result -> { ToolMessageEvent( provider = provider, @@ -278,6 +292,10 @@ public class OpenTelemetry { } inferenceSpan.addEvents(eventsFromMessages) + + // Start span + spanAdapter?.onBeforeSpanStarted(inferenceSpan) + spanProcessor.startSpan(inferenceSpan) } pipeline.interceptAfterLLMCall(interceptContext) { eventContext -> @@ -306,20 +324,21 @@ public class OpenTelemetry { when (message) { is Message.Assistant -> add(AssistantMessageEvent(provider, message, verbose = config.isVerbose)) is Message.Tool.Call -> add( - ChoiceEvent(provider, message, arguments = message.contentJson, verbose = config.isVerbose) + ChoiceEvent(provider, message, arguments = message.contentJson, index = 0, verbose = config.isVerbose) ) } } eventContext.moderationResponse?.let { response -> - ModerationResponseEvent(provider, response, config.isVerbose) + add(ModerationResponseEvent(provider, response, config.isVerbose)) } } inferenceSpan.addEvents(eventsToAdd) // Stop InferenceSpan - spanProcessor.endSpan(inferenceSpanId) + spanAdapter?.onBeforeSpanFinished(inferenceSpan) + spanProcessor.endSpan(inferenceSpan) } //endregion LLM Call @@ -349,6 +368,7 @@ public class OpenTelemetry { toolArgs = eventContext.toolArgs, ) + spanAdapter?.onBeforeSpanStarted(executeToolSpan) spanProcessor.startSpan(executeToolSpan) } @@ -368,17 +388,17 @@ public class OpenTelemetry { toolName = eventContext.tool.name ) + val executeToolSpan = spanProcessor.getSpanOrThrow(executeToolSpanId) + // End the ExecuteToolSpan span - val finishAttributes = buildList { - eventContext.result?.toStringDefault()?.let { result -> - add(SpanAttributes.Tool.OutputValue(result)) - } + eventContext.result?.let { result -> + executeToolSpan.addAttribute( + attribute = SpanAttributes.Tool.OutputValue(output = result.toStringDefault()) + ) } - spanProcessor.endSpan( - spanId = executeToolSpanId, - attributes = finishAttributes - ) + spanAdapter?.onBeforeSpanFinished(span = executeToolSpan) + spanProcessor.endSpan(span = executeToolSpan) } pipeline.interceptToolCallFailure(interceptContext) { eventContext -> @@ -397,14 +417,15 @@ public class OpenTelemetry { toolName = eventContext.tool.name ) + val executeToolSpan = spanProcessor.getSpanOrThrow(executeToolSpanId) + executeToolSpan.addAttribute( + attribute = CommonAttributes.Error.Type(eventContext.throwable.message ?: "Unknown tool call error") + ) + // End the ExecuteToolSpan span + spanAdapter?.onBeforeSpanFinished(executeToolSpan) spanProcessor.endSpan( - spanId = executeToolSpanId, - attributes = listOf( - CommonAttributes.Error.Type( - eventContext.throwable.message ?: "Unknown tool call error" - ) - ), + span = executeToolSpan, spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.throwable.message) ) } @@ -430,10 +451,15 @@ public class OpenTelemetry { toolName = toolName ) + val executeToolSpan = spanProcessor.getSpanOrThrow(executeToolSpanId) + executeToolSpan.addAttribute( + attribute = CommonAttributes.Error.Type(eventContext.error) + ) + // End the ExecuteToolSpan span + spanAdapter?.onBeforeSpanFinished(executeToolSpan) spanProcessor.endSpan( - spanId = executeToolSpanId, - attributes = listOf(CommonAttributes.Error.Type(eventContext.error)), + span = executeToolSpan, spanEndStatus = SpanEndStatus(code = StatusCode.ERROR, description = eventContext.error) ) } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryConfig.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryConfig.kt index 6d49f67b4..5d26a5be8 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryConfig.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryConfig.kt @@ -2,6 +2,7 @@ package ai.koog.agents.features.opentelemetry.feature import ai.koog.agents.core.feature.config.FeatureConfig import ai.koog.agents.features.opentelemetry.attribute.addAttributes +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter import io.github.oshai.kotlinlogging.KotlinLogging import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.Attributes @@ -68,6 +69,8 @@ public class OpenTelemetryConfig : FeatureConfig() { private var _verbose: Boolean = false + private var _spanAdapter: SpanAdapter? = null + /** * Indicates whether verbose telemetry data is enabled. * @@ -122,6 +125,9 @@ public class OpenTelemetryConfig : FeatureConfig() { public val serviceVersion: String get() = _serviceVersion + internal val spanAdapter: SpanAdapter? + get() = _spanAdapter + /** * Sets the service information for the OpenTelemetry configuration. * This information is used to identify the service in telemetry data. @@ -206,6 +212,18 @@ public class OpenTelemetryConfig : FeatureConfig() { _sdk = sdk } + /** + * Adds a custom span adapter for post-processing GenAI agent spans. + * The adapter can modify span data, add attributes/events, or perform other + * post-processing logic before spans are completed. + * + * @param adapter The ProcessSpanAdapter implementation that will handle + * post-processing of GenAI agent spans + */ + internal fun addSpanAdapter(adapter: SpanAdapter) { + _spanAdapter = adapter + } + //region Private Methods private fun initializeOpenTelemetry(): OpenTelemetrySdk { diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/SpanAdapter.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/SpanAdapter.kt new file mode 100644 index 000000000..64a59a2ea --- /dev/null +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/SpanAdapter.kt @@ -0,0 +1,32 @@ +package ai.koog.agents.features.opentelemetry.integration + +import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan + +/** + * Adapter abstract class for post-processing GenAI agent spans. + * + * This class allows customization of how GenAI agent spans are processed after they are created. + * Implementations can modify span data, add additional attributes or events, or perform any other + * post-processing logic needed before the span is completed. + * + * The abstract class provides a single method called after a span is created but before it is finished. + */ +internal abstract class SpanAdapter { + + /** + * Invoked before the specified GenAIAgentSpan is started. This method allows implementations to + * perform any setup or customization required prior to the span being initialized and used. + * + * @param span The GenAI agent span to process + */ + open fun onBeforeSpanStarted(span: GenAIAgentSpan) { } + + /** + * Invoked before the specified GenAIAgentSpan is finished. This method allows implementations + * to perform any final processing, modifications, or cleanup tasks required before the span + * is completed. + * + * @param span The GenAI agent span to process before it is finished. + */ + open fun onBeforeSpanFinished(span: GenAIAgentSpan) { } +} diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integrations/Langfuse.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/langfuse/Langfuse.kt similarity index 86% rename from agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integrations/Langfuse.kt rename to agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/langfuse/Langfuse.kt index 8a4c1b958..76e9d7d6d 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integrations/Langfuse.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/langfuse/Langfuse.kt @@ -1,4 +1,4 @@ -package ai.koog.agents.features.opentelemetry.integrations +package ai.koog.agents.features.opentelemetry.integration.langfuse import ai.koog.agents.features.opentelemetry.feature.OpenTelemetryConfig import io.github.oshai.kotlinlogging.KotlinLogging @@ -11,7 +11,7 @@ import kotlin.time.Duration.Companion.seconds /** * Configure an OpenTelemetry span exporter that sends data to [Langfuse](https://langfuse.com/). * - * @param langfuseUrl the base URL of the Langfuse instance. If not set is retrieved from `LANGFUSE_URL` environment variable. Defaults to [https://cloud.langfuse.com](https://cloud.langfuse.com). + * @param langfuseUrl the base URL of the Langfuse instance. If not set is retrieved from `LANGFUSE_HOST` environment variable. Defaults to [https://cloud.langfuse.com](https://cloud.langfuse.com). * @param langfusePublicKey if not set is retrieved from `LANGFUSE_PUBLIC_KEY` environment variable. * @param langfuseSecretKey if not set is retrieved from `LANGFUSE_SECRET_KEY` environment variable. * @param timeout OpenTelemetry SpanExporter timeout. See [io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder.setTimeout]. @@ -26,7 +26,7 @@ public fun OpenTelemetryConfig.addLangfuseExporter( langfuseSecretKey: String? = null, timeout: Duration = 10.seconds, ) { - val url = langfuseUrl ?: System.getenv()["LANGFUSE_URL"] ?: "https://cloud.langfuse.com" + val url = langfuseUrl ?: System.getenv()["LANGFUSE_HOST"] ?: "https://cloud.langfuse.com" logger.debug { "Configured endpoint for Langfuse telemetry: $url" } @@ -45,6 +45,8 @@ public fun OpenTelemetryConfig.addLangfuseExporter( .addHeader("Authorization", "Basic $auth") .build() ) + + addSpanAdapter(LangfuseSpanAdapter) } private val logger = KotlinLogging.logger {} diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/langfuse/LangfuseSpanAdapter.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/langfuse/LangfuseSpanAdapter.kt new file mode 100644 index 000000000..38204e31a --- /dev/null +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/langfuse/LangfuseSpanAdapter.kt @@ -0,0 +1,62 @@ +package ai.koog.agents.features.opentelemetry.integration.langfuse + +import ai.koog.agents.core.annotation.InternalAgentsApi +import ai.koog.agents.features.opentelemetry.attribute.CustomAttribute +import ai.koog.agents.features.opentelemetry.event.AssistantMessageEvent +import ai.koog.agents.features.opentelemetry.event.ChoiceEvent +import ai.koog.agents.features.opentelemetry.event.EventBodyFields +import ai.koog.agents.features.opentelemetry.event.UserMessageEvent +import ai.koog.agents.features.opentelemetry.extension.eventBodyFieldToAttribute +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter +import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan +import ai.koog.agents.features.opentelemetry.span.InferenceSpan + +@OptIn(InternalAgentsApi::class) +internal object LangfuseSpanAdapter : SpanAdapter() { + + override fun onBeforeSpanFinished(span: GenAIAgentSpan) { + when (span) { + is InferenceSpan -> span.prepareSpanAttributes() + } + } + + //region Private Methods + + private fun InferenceSpan.prepareSpanAttributes() { + this.events.forEach { event -> + when (event) { + is AssistantMessageEvent -> { + this.eventBodyFieldToAttribute(event) { role -> + CustomAttribute("gen_ai.completion.${role.key}", role.value) + } + + this.eventBodyFieldToAttribute(event) { content -> + CustomAttribute("gen_ai.completion.content", content.value) + } + } + + is ChoiceEvent -> { + val index = event.index + + this.eventBodyFieldToAttribute(event) { role -> + CustomAttribute("gen_ai.$index.${role.key}", role.value) + } + + this.eventBodyFieldToAttribute(event) { content -> + CustomAttribute("gen_ai.completion.$index.content", content.value) + } + } + + is UserMessageEvent -> { + this.eventBodyFieldToAttribute(event) { content -> + CustomAttribute("gen_ai.completion.content", content.value) + } + } + + else -> { } + } + } + } + + //endregion Private Methods +} diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integrations/Weave.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/weave/Weave.kt similarity index 94% rename from agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integrations/Weave.kt rename to agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/weave/Weave.kt index 8c4ea877c..893e58975 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integrations/Weave.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/weave/Weave.kt @@ -1,4 +1,4 @@ -package ai.koog.agents.features.opentelemetry.integrations +package ai.koog.agents.features.opentelemetry.integration.weave import ai.koog.agents.features.opentelemetry.feature.OpenTelemetryConfig import io.github.oshai.kotlinlogging.KotlinLogging @@ -45,6 +45,8 @@ public fun OpenTelemetryConfig.addWeaveExporter( .addHeader("Authorization", "Basic $auth") .build() ) + + addSpanAdapter(WeaveSpanAdapter) } -private val logger = KotlinLogging.logger {} +private val logger = KotlinLogging.logger { } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/weave/WeaveSpanAdapter.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/weave/WeaveSpanAdapter.kt new file mode 100644 index 000000000..649c515ac --- /dev/null +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/integration/weave/WeaveSpanAdapter.kt @@ -0,0 +1,62 @@ +package ai.koog.agents.features.opentelemetry.integration.weave + +import ai.koog.agents.core.annotation.InternalAgentsApi +import ai.koog.agents.features.opentelemetry.attribute.CustomAttribute +import ai.koog.agents.features.opentelemetry.event.AssistantMessageEvent +import ai.koog.agents.features.opentelemetry.event.ChoiceEvent +import ai.koog.agents.features.opentelemetry.event.EventBodyFields +import ai.koog.agents.features.opentelemetry.event.UserMessageEvent +import ai.koog.agents.features.opentelemetry.extension.eventBodyFieldToAttribute +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter +import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan +import ai.koog.agents.features.opentelemetry.span.InferenceSpan + +@OptIn(InternalAgentsApi::class) +internal object WeaveSpanAdapter : SpanAdapter() { + + override fun onBeforeSpanFinished(span: GenAIAgentSpan) { + when (span) { + is InferenceSpan -> span.prepareSpanAttributes() + } + } + + //region Private Methods + + private fun InferenceSpan.prepareSpanAttributes() { + this.events.forEach { event -> + when (event) { + is AssistantMessageEvent -> { + this.eventBodyFieldToAttribute(event) { role -> + CustomAttribute("gen_ai.completion.${role.key}", role.value) + } + + this.eventBodyFieldToAttribute(event) { content -> + CustomAttribute("gen_ai.completion.content", content.value) + } + } + + is ChoiceEvent -> { + val index = event.index + + this.eventBodyFieldToAttribute(event) { role -> + CustomAttribute("gen_ai.$index.${role.key}", role.value) + } + + this.eventBodyFieldToAttribute(event) { content -> + CustomAttribute("gen_ai.completion.$index.content", content.value) + } + } + + is UserMessageEvent -> { + this.eventBodyFieldToAttribute(event) { content -> + CustomAttribute("gen_ai.completion.content", content.value) + } + } + + else -> { } + } + } + } + + //endregion Private Methods +} diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/CreateAgentSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/CreateAgentSpan.kt index df82405de..19e652cf1 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/CreateAgentSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/CreateAgentSpan.kt @@ -1,6 +1,5 @@ package ai.koog.agents.features.opentelemetry.span -import ai.koog.agents.features.opentelemetry.attribute.Attribute import ai.koog.agents.features.opentelemetry.attribute.CommonAttributes import ai.koog.agents.features.opentelemetry.attribute.SpanAttributes import ai.koog.prompt.llm.LLModel @@ -37,17 +36,17 @@ internal class CreateAgentSpan( * - server.port (conditional/required) * - server.address (recommended) */ - override val attributes: List = buildList { + init { // gen_ai.operation.name - add(SpanAttributes.Operation.Name(SpanAttributes.Operation.OperationNameType.CREATE_AGENT)) + addAttribute(SpanAttributes.Operation.Name(SpanAttributes.Operation.OperationNameType.CREATE_AGENT)) // gen_ai.system - add(CommonAttributes.System(model.provider)) + addAttribute(CommonAttributes.System(model.provider)) // gen_ai.agent.id - add(SpanAttributes.Agent.Id(agentId)) + addAttribute(SpanAttributes.Agent.Id(agentId)) // gen_ai.request.model - add(SpanAttributes.Request.Model(model)) + addAttribute(SpanAttributes.Request.Model(model)) } } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/ExecuteToolSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/ExecuteToolSpan.kt index 2c76d23bb..e35920263 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/ExecuteToolSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/ExecuteToolSpan.kt @@ -3,7 +3,6 @@ package ai.koog.agents.features.opentelemetry.span import ai.koog.agents.core.tools.Tool import ai.koog.agents.core.tools.ToolArgs import ai.koog.agents.core.tools.ToolResult -import ai.koog.agents.features.opentelemetry.attribute.Attribute import ai.koog.agents.features.opentelemetry.attribute.SpanAttributes import io.opentelemetry.api.trace.SpanKind @@ -12,7 +11,7 @@ import io.opentelemetry.api.trace.SpanKind */ internal class ExecuteToolSpan( parent: NodeExecuteSpan, - private val tool: Tool<*, *>, + tool: Tool<*, *>, private val toolArgs: ToolArgs ) : GenAIAgentSpan(parent) { @@ -38,17 +37,17 @@ internal class ExecuteToolSpan( * - gen_ai.tool.description (recommended) * - gen_ai.tool.name (recommended) */ - override val attributes: List = buildList { + init { // gen_ai.tool.description - add(SpanAttributes.Tool.Description(description = tool.descriptor.description)) + addAttribute(SpanAttributes.Tool.Description(description = tool.descriptor.description)) // gen_ai.tool.name - add(SpanAttributes.Tool.Name(name = tool.name)) + addAttribute(SpanAttributes.Tool.Name(name = tool.name)) - // Tool arguments + // Tool arguments custom attribute @Suppress("UNCHECKED_CAST") (tool as? Tool)?.let { tool -> - add(SpanAttributes.Tool.InputValue(tool.encodeArgsToString(toolArgs))) + addAttribute(SpanAttributes.Tool.InputValue(tool.encodeArgsToString(toolArgs))) } } } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/GenAIAgentSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/GenAIAgentSpan.kt index f8ad6b861..7afee611c 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/GenAIAgentSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/GenAIAgentSpan.kt @@ -1,71 +1,107 @@ package ai.koog.agents.features.opentelemetry.span import ai.koog.agents.features.opentelemetry.attribute.Attribute -import ai.koog.agents.features.opentelemetry.attribute.toSdkAttributes import ai.koog.agents.features.opentelemetry.event.GenAIAgentEvent -import io.github.oshai.kotlinlogging.KotlinLogging import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.SpanKind import io.opentelemetry.context.Context +/** + * Represents an abstract base class for a GenAI agent span in a trace. + * A span represents a logical unit of work or operation within a trace and is + * responsible for managing associated metadata, such as context, attributes, and events. + * + * @property parent The parent span. Null if this span is a root span. + */ internal abstract class GenAIAgentSpan( val parent: GenAIAgentSpan?, ) { - companion object { - private val logger = KotlinLogging.logger { } - } - private var _context: Context? = null private var _span: Span? = null + /** + * Represents the context associated with the current span. The context provides + * metadata and state information required to manage and propagate span information + * effectively within the tracing framework. + */ var context: Context get() = _context ?: error("Context for span '$spanId' is not initialized") set(value) { _context = value } + /** + * Represents the current active span within the `GenAIAgentSpan` context. + * The span is initialized and managed as part of the tracing process. + */ var span: Span get() = _span ?: error("Span '$spanId' is not started") set(value) { _span = value } + /** + * The name of the current span derived by removing the parent span ID prefix (if present) + * from the current span ID and trimming leading dots. Represents a more human-readable + * and simplified identifier for the current trace span. + */ val name: String get() = spanId.removePrefix(parent?.spanId ?: "").trimStart('.') + /** + * Represents the kind of span that is being created or used. + * + * This property identifies the role and context of the span within a trace, + * following predefined categories in OpenTelemetry's `SpanKind` enumeration. + */ open val kind: SpanKind = SpanKind.CLIENT + /** + * The unique identifier for the span, providing a means to track and distinguish spans. + */ abstract val spanId: String - abstract val attributes: List + private val _attributes = mutableListOf() + + /** + * Provides a list of attributes associated with the span. + * These attributes contain metadata and additional information about the span. + */ + val attributes: List + get() = _attributes + + private val _events = mutableListOf() - val events: Set + /** + * Provides access to the list of events associated with this span. + * The events represent specific occurrences or milestones within the context of this span. + */ + val events: List get() = _events - private val _events: MutableSet = mutableSetOf() + fun addAttribute(attribute: Attribute) { + _attributes.add(attribute) + } + + fun addAttributes(attributes: List) { + _attributes.addAll(attributes) + } + + fun removeAttribute(attribute: Attribute): Boolean { + return _attributes.remove(attribute) + } + + fun addEvent(event: GenAIAgentEvent) { + _events.add(event) + } fun addEvents(events: List) { - events.forEach { event -> - logger.debug { "Adding event '${event.name}' to span '$spanId'" } - - if (_events.contains(event)) { - logger.warn { "Event '${event.name}' already added to span '$spanId'" } - return@forEach - } - - // The 'opentelemetry-java' SDK does not have support for event body fields at the moment. - // Pass body fields as attributes until an API is updated. - val attributes = buildList { - addAll(event.attributes) - if (event.bodyFields.isNotEmpty()) { - add(event.bodyFieldsAsAttribute()) - } - } - - span.addEvent(event.name, attributes.toSdkAttributes()) - } _events.addAll(events) } + + fun removeEvent(event: GenAIAgentEvent): Boolean { + return _events.remove(event) + } } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/InferenceSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/InferenceSpan.kt index f044ba2e6..73ae3c768 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/InferenceSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/InferenceSpan.kt @@ -1,6 +1,5 @@ package ai.koog.agents.features.opentelemetry.span -import ai.koog.agents.features.opentelemetry.attribute.Attribute import ai.koog.agents.features.opentelemetry.attribute.CommonAttributes import ai.koog.agents.features.opentelemetry.attribute.SpanAttributes import ai.koog.prompt.llm.LLMProvider @@ -12,10 +11,10 @@ import io.opentelemetry.api.trace.SpanKind */ internal class InferenceSpan( parent: NodeExecuteSpan, - private val provider: LLMProvider, - private val runId: String, - private val model: LLModel, - private val temperature: Double, + provider: LLMProvider, + runId: String, + model: LLModel, + temperature: Double, private val promptId: String, ) : GenAIAgentSpan(parent) { @@ -59,20 +58,20 @@ internal class InferenceSpan( * - gen_ai.usage.output_tokens (recommended) * - server.address (recommended) */ - override val attributes: List = buildList { + init { // gen_ai.operation.name - add(SpanAttributes.Operation.Name(SpanAttributes.Operation.OperationNameType.CHAT)) + addAttribute(SpanAttributes.Operation.Name(SpanAttributes.Operation.OperationNameType.CHAT)) // gen_ai.system - add(CommonAttributes.System(provider)) + addAttribute(CommonAttributes.System(provider)) // gen_ai.conversation.id - add(SpanAttributes.Conversation.Id(runId)) + addAttribute(SpanAttributes.Conversation.Id(runId)) // gen_ai.request.model - add(SpanAttributes.Request.Model(model)) + addAttribute(SpanAttributes.Request.Model(model)) // gen_ai.request.temperature - add(SpanAttributes.Request.Temperature(temperature)) + addAttribute(SpanAttributes.Request.Temperature(temperature)) } } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/InvokeAgentSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/InvokeAgentSpan.kt index b10433905..cfd833eb5 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/InvokeAgentSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/InvokeAgentSpan.kt @@ -1,6 +1,5 @@ package ai.koog.agents.features.opentelemetry.span -import ai.koog.agents.features.opentelemetry.attribute.Attribute import ai.koog.agents.features.opentelemetry.attribute.CommonAttributes import ai.koog.agents.features.opentelemetry.attribute.CustomAttribute import ai.koog.agents.features.opentelemetry.attribute.SpanAttributes @@ -12,10 +11,10 @@ import io.opentelemetry.api.trace.SpanKind */ internal class InvokeAgentSpan( parent: CreateAgentSpan, - private val provider: LLMProvider, - private val runId: String, - private val agentId: String, - private val strategyName: String, + provider: LLMProvider, + runId: String, + agentId: String, + strategyName: String, ) : GenAIAgentSpan(parent) { companion object { @@ -61,20 +60,20 @@ internal class InvokeAgentSpan( * - gen_ai.usage.output_tokens (recommended) * - server.address (recommended) */ - override val attributes: List = buildList { + init { // gen_ai.operation.name - add(SpanAttributes.Operation.Name(SpanAttributes.Operation.OperationNameType.INVOKE_AGENT)) + addAttribute(SpanAttributes.Operation.Name(SpanAttributes.Operation.OperationNameType.INVOKE_AGENT)) // gen_ai.system - add(CommonAttributes.System(provider)) + addAttribute(CommonAttributes.System(provider)) // gen_ai.agent.id - add(SpanAttributes.Agent.Id(agentId)) + addAttribute(SpanAttributes.Agent.Id(agentId)) // gen_ai.conversation.id - add(SpanAttributes.Conversation.Id(runId)) + addAttribute(SpanAttributes.Conversation.Id(runId)) // custom: strategy name - add(CustomAttribute("koog.agent.strategy.name", strategyName)) + addAttribute(CustomAttribute("koog.agent.strategy.name", strategyName)) } } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/NodeExecuteSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/NodeExecuteSpan.kt index de94b4808..f2106ea94 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/NodeExecuteSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/NodeExecuteSpan.kt @@ -1,6 +1,5 @@ package ai.koog.agents.features.opentelemetry.span -import ai.koog.agents.features.opentelemetry.attribute.Attribute import ai.koog.agents.features.opentelemetry.attribute.CustomAttribute import ai.koog.agents.features.opentelemetry.attribute.SpanAttributes import io.opentelemetry.api.trace.SpanKind @@ -12,8 +11,8 @@ import io.opentelemetry.api.trace.SpanKind */ internal class NodeExecuteSpan( parent: InvokeAgentSpan, - private val runId: String, - private val nodeName: String, + runId: String, + nodeName: String, ) : GenAIAgentSpan(parent) { companion object { @@ -34,8 +33,8 @@ internal class NodeExecuteSpan( * Note: Node Execute Span is not defined in the Open Telemetry Semantic Convention. * It is a custom span used to show a structure of Koog events */ - override val attributes: List = buildList { - add(SpanAttributes.Conversation.Id(runId)) - add(CustomAttribute("koog.node.name", nodeName)) + init { + addAttribute(SpanAttributes.Conversation.Id(runId)) + addAttribute(CustomAttribute("koog.node.name", nodeName)) } } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/SpanProcessor.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/SpanProcessor.kt index e698aff27..338d84745 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/SpanProcessor.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/span/SpanProcessor.kt @@ -1,11 +1,10 @@ package ai.koog.agents.features.opentelemetry.span -import ai.koog.agents.features.opentelemetry.attribute.Attribute -import ai.koog.agents.features.opentelemetry.attribute.toSdkAttributes import ai.koog.agents.features.opentelemetry.event.GenAIAgentEvent +import ai.koog.agents.features.opentelemetry.extension.setAttributes +import ai.koog.agents.features.opentelemetry.extension.setEvents +import ai.koog.agents.features.opentelemetry.extension.setSpanStatus import io.github.oshai.kotlinlogging.KotlinLogging -import io.opentelemetry.api.trace.Span -import io.opentelemetry.api.trace.SpanBuilder import io.opentelemetry.api.trace.StatusCode import io.opentelemetry.api.trace.Tracer import io.opentelemetry.context.Context @@ -64,29 +63,22 @@ internal class SpanProcessor(private val tracer: Tracer) { } fun endSpan( - spanId: String, - attributes: List = emptyList(), + span: GenAIAgentSpan, spanEndStatus: SpanEndStatus? = null ) { - logger.debug { "Finishing the span (id: $spanId)" } - - spansLock.write { - val existingSpan = _spans[spanId] - ?: error("Span with id '$spanId' not found. Make sure span was started or was not finished previously") - - logger.debug { "Finishing the span (name: $${existingSpan.name}, id: ${existingSpan.spanId})" } + logger.debug { "Finishing the span (id: ${span.spanId})" } - val spanToFinish = existingSpan.span + val spanToFinish = span.span - spanToFinish.setAttributes(attributes) - spanToFinish.setSpanStatus(spanEndStatus) - spanToFinish.end() + spanToFinish.setAttributes(span.attributes) + spanToFinish.setEvents(span.events) + spanToFinish.setSpanStatus(spanEndStatus) + spanToFinish.end() - val removedSpan = _spans.remove(spanId) - if (removedSpan == null) { - logger.warn { - "Span with id '$spanId' not found. Make sure you do not delete span with same id several times" - } + val removedSpan = _spans.remove(span.spanId) + if (removedSpan == null) { + logger.warn { + "Span with id '${span.spanId}' not found. Make sure you do not delete span with same id several times" } } } @@ -95,25 +87,24 @@ internal class SpanProcessor(private val tracer: Tracer) { return _spans[spanId] as? T } - inline fun getSpanOrThrow(id: String): T where T : GenAIAgentSpan { - val span = _spans[id] ?: error("Span with id: $id not found") + inline fun getSpanOrThrow(spanId: String): T where T : GenAIAgentSpan { + val span = _spans[spanId] ?: error("Span with id: $spanId not found") return span as? T ?: error( - "Span with id <$id> is not of expected type. Expected: <${T::class.simpleName}>, actual: <${span::class.simpleName}>" + "Span with id <$spanId> is not of expected type. Expected: <${T::class.simpleName}>, actual: <${span::class.simpleName}>" ) } - fun endUnfinishedSpans(filter: (spanId: String) -> Boolean = { true }) { - _spans.keys - .filter { spanId -> - val isRequireFinish = filter(spanId) + fun endUnfinishedSpans(filter: (GenAIAgentSpan) -> Boolean = { true }) { + _spans.values + .filter { span -> + val isRequireFinish = filter(span) isRequireFinish } - .forEach { spanId -> - logger.warn { "Force close span with id: $spanId" } + .forEach { span -> + logger.warn { "Force close span with id: ${span.spanId}" } endSpan( - spanId = spanId, - attributes = emptyList(), + span = span, spanEndStatus = SpanEndStatus(StatusCode.UNSET) ) } @@ -123,7 +114,7 @@ internal class SpanProcessor(private val tracer: Tracer) { val agentRunSpanId = InvokeAgentSpan.createId(agentId, runId) val agentSpanId = CreateAgentSpan.createId(agentId) - endUnfinishedSpans(filter = { id -> id != agentSpanId && id != agentRunSpanId }) + endUnfinishedSpans(filter = { span -> span.spanId != agentSpanId && span.spanId != agentRunSpanId }) } //region Private Methods @@ -139,19 +130,5 @@ internal class SpanProcessor(private val tracer: Tracer) { } } - private fun Span.setSpanStatus(endStatus: SpanEndStatus? = null) { - val statusCode = endStatus?.code ?: StatusCode.OK - val statusDescription = endStatus?.description ?: "" - this.setStatus(statusCode, statusDescription) - } - - private fun SpanBuilder.setAttributes(attributes: List) { - setAllAttributes(attributes.toSdkAttributes()) - } - - private fun Span.setAttributes(attributes: List) { - setAllAttributes(attributes.toSdkAttributes()) - } - //endregion Private Methods } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/event/ChoiceEventTest.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/event/ChoiceEventTest.kt index 2af456323..a39ca145d 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/event/ChoiceEventTest.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/event/ChoiceEventTest.kt @@ -24,6 +24,7 @@ class ChoiceEventTest { val choiceEvent = ChoiceEvent( provider = llmProvider, message = expectedMessage, + index = 0, verbose = false, ) @@ -44,6 +45,7 @@ class ChoiceEventTest { val choiceEvent = ChoiceEvent( provider = llmProvider, message = expectedMessage, + index = 0, verbose = true, ) @@ -67,6 +69,7 @@ class ChoiceEventTest { val choiceEvent = ChoiceEvent( provider = MockLLMProvider(), message = expectedMessage, + index = 0, verbose = false, ) @@ -86,6 +89,7 @@ class ChoiceEventTest { val choiceEvent = ChoiceEvent( provider = MockLLMProvider(), message = expectedMessage, + index = 0, verbose = false, ) @@ -106,6 +110,7 @@ class ChoiceEventTest { val choiceEvent = ChoiceEvent( provider = MockLLMProvider(), message = expectedMessage, + index = 0, verbose = false, ) @@ -125,6 +130,7 @@ class ChoiceEventTest { val choiceEvent = ChoiceEvent( provider = MockLLMProvider(), message = expectedMessage, + index = 0, verbose = true, ) @@ -148,6 +154,7 @@ class ChoiceEventTest { val choiceEvent = ChoiceEvent( provider = MockLLMProvider(), message = expectedMessage, + index = 0, verbose = true, ) @@ -177,6 +184,7 @@ class ChoiceEventTest { provider = MockLLMProvider(), message = expectedMessage, arguments = args, + index = 0, verbose = true, ) @@ -203,6 +211,7 @@ class ChoiceEventTest { provider = MockLLMProvider(), message = expectedMessage, arguments = args, + index = 0, verbose = false, ) @@ -224,6 +233,7 @@ class ChoiceEventTest { provider = MockLLMProvider(), message = expectedMessage, arguments = args, + index = 0, verbose = true, ) @@ -246,6 +256,7 @@ class ChoiceEventTest { provider = MockLLMProvider(), message = expectedMessage, arguments = args, + index = 0, verbose = false, ) diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryTest.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryTest.kt index 806e4a547..b0a8d2bfc 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryTest.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryTest.kt @@ -9,9 +9,13 @@ import ai.koog.agents.core.dsl.extension.onAssistantMessage import ai.koog.agents.core.dsl.extension.onToolCall import ai.koog.agents.core.tools.ToolRegistry import ai.koog.agents.features.opentelemetry.OpenTelemetryTestAPI.createAgent +import ai.koog.agents.features.opentelemetry.attribute.CustomAttribute +import ai.koog.agents.features.opentelemetry.attribute.SpanAttributes import ai.koog.agents.features.opentelemetry.attribute.SpanAttributes.Response.FinishReasonType +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter import ai.koog.agents.features.opentelemetry.mock.MockSpanExporter import ai.koog.agents.features.opentelemetry.mock.TestGetWeatherTool +import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan import ai.koog.agents.testing.tools.getMockExecutor import ai.koog.agents.testing.tools.mockLLMAnswer import ai.koog.agents.utils.use @@ -677,6 +681,7 @@ class OpenTelemetryTest { val agent = createAgent( agentId = agentId, strategy = strategy, + systemPrompt = systemPrompt, promptId = promptId, toolRegistry = toolRegistry, promptExecutor = mockExecutor, @@ -742,6 +747,9 @@ class OpenTelemetryTest { "gen_ai.request.temperature" to temperature, ), "events" to mapOf( + "gen_ai.system.message" to mapOf( + "gen_ai.system" to model.provider.id, + ), "gen_ai.assistant.message" to mapOf( "gen_ai.system" to model.provider.id, ), @@ -793,6 +801,9 @@ class OpenTelemetryTest { "gen_ai.request.temperature" to temperature, ), "events" to mapOf( + "gen_ai.system.message" to mapOf( + "gen_ai.system" to model.provider.id, + ), "gen_ai.user.message" to mapOf( "gen_ai.system" to model.provider.id, ), @@ -1099,6 +1110,89 @@ class OpenTelemetryTest { } } + @Test + fun `test span adapter applies custom attribute to invoke agent span`() = runBlocking { + MockSpanExporter().use { mockExporter -> + + val userPrompt = "What's the weather in Paris?" + val agentId = "test-agent-id" + val promptId = "test-prompt-id" + val testClock = Clock.System + val model = OpenAIModels.Chat.GPT4o + + val strategyName = "test-strategy" + + val strategy = strategy(strategyName) { + val nodeSendInput by nodeLLMRequest("test-llm-call") + + edge(nodeStart forwardTo nodeSendInput) + edge(nodeSendInput forwardTo nodeFinish onAssistantMessage { true }) + } + + val mockResponse = "Sunny" + val mockExecutor = getMockExecutor(clock = testClock) { + mockLLMAnswer(mockResponse) onRequestEquals userPrompt + } + + // Custom SpanAdapter that adds a test attribute to each processed span + val customBeforeStartAttribute = CustomAttribute(key = "test.adapter.before.start.key", value = "test-value-before-start") + val customBeforeFinishAttribute = CustomAttribute(key = "test.adapter.before.finish.key", value = "test-value-before-finish") + val adapter = object : SpanAdapter() { + override fun onBeforeSpanStarted(span: GenAIAgentSpan) { + span.addAttribute(customBeforeStartAttribute) + } + + override fun onBeforeSpanFinished(span: GenAIAgentSpan) { + span.addAttribute(customBeforeFinishAttribute) + } + } + + createAgent( + agentId = agentId, + strategy = strategy, + promptId = promptId, + promptExecutor = mockExecutor, + model = model, + clock = testClock, + ) { + install(OpenTelemetry) { + addSpanExporter(mockExporter) + + // Add custom span adapter + addSpanAdapter(adapter) + } + }.use { agent -> + agent.run(userPrompt) + } + + val collectedSpans = mockExporter.collectedSpans + assertTrue(collectedSpans.isNotEmpty(), "Spans should be created during agent execution") + + val actualInvokeAgentSpans = collectedSpans.filter { span -> span.name.startsWith("run.") } + assertEquals(1, actualInvokeAgentSpans.size, "Invoke agent span should be present") + + val expectedInvokeAgentSpans = listOf( + mapOf( + + "run.${mockExporter.lastRunId}" to mapOf( + "attributes" to mapOf( + "koog.agent.strategy.name" to strategyName, + "gen_ai.conversation.id" to mockExporter.lastRunId, + customBeforeStartAttribute.key to customBeforeStartAttribute.value, + customBeforeFinishAttribute.key to customBeforeFinishAttribute.value, + "gen_ai.system" to model.provider.id, + "gen_ai.agent.id" to agentId, + "gen_ai.operation.name" to SpanAttributes.Operation.OperationNameType.INVOKE_AGENT.id, + ), + "events" to emptyMap() + ) + ) + ) + + assertSpans(expectedInvokeAgentSpans, actualInvokeAgentSpans) + } + } + //region Private Methods /** diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/mock/MockGenAIAgentSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/mock/MockGenAIAgentSpan.kt index f3650d13c..abcc0a48e 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/mock/MockGenAIAgentSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/mock/MockGenAIAgentSpan.kt @@ -1,6 +1,5 @@ package ai.koog.agents.features.opentelemetry.mock -import ai.koog.agents.features.opentelemetry.attribute.Attribute import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan import ai.koog.agents.features.opentelemetry.span.SpanEndStatus import io.opentelemetry.api.trace.StatusCode @@ -8,7 +7,6 @@ import io.opentelemetry.api.trace.StatusCode internal class MockGenAIAgentSpan( override val spanId: String, parent: GenAIAgentSpan? = null, - override val attributes: List = emptyList(), private val spanEndStatus: SpanEndStatus? = null, ) : GenAIAgentSpan(parent) { diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/span/GenAIAgentSpanTest.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/span/GenAIAgentSpanTest.kt index b2aee03fe..a0b44a5e2 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/span/GenAIAgentSpanTest.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/span/GenAIAgentSpanTest.kt @@ -118,7 +118,7 @@ class GenAIAgentSpanTest { ) ) - span.addEvents(events) + events.forEach { event -> span.addEvent(event) } // Verify events were added to the internal events set assertEquals(2, span.events.size) @@ -138,11 +138,11 @@ class GenAIAgentSpanTest { ) // Add the same event twice - span.addEvents(listOf(event)) - span.addEvents(listOf(event)) + span.addEvent(event) + span.addEvent(event) - // Verify the event was added only once - assertEquals(1, span.events.size) + // Verify that both event were added + assertEquals(2, span.events.size) assertTrue(span.events.contains(event)) } @@ -158,10 +158,45 @@ class GenAIAgentSpanTest { fields = listOf(EventBodyFields.Content("test content")) ) - span.addEvents(listOf(event)) + span.addEvent(event) // Verify the event was added assertEquals(1, span.events.size) assertTrue(span.events.contains(event)) } + + @Test + fun `addAttributes(list) should add multiple attributes to span`() { + val span = MockGenAIAgentSpan("test.span") + val mockSpan = MockSpan() + span.span = mockSpan + + val attributes = listOf( + MockAttribute("stringKey", "stringValue"), + MockAttribute("numberKey", 123), + MockAttribute("booleanKey", true) + ) + + span.addAttributes(attributes) + + assertEquals(3, span.attributes.size) + assertTrue(span.attributes.containsAll(attributes)) + } + + @Test + fun `addEvents(list) should add multiple events to span`() { + val span = MockGenAIAgentSpan("test.span") + val mockSpan = MockSpan() + span.span = mockSpan + + val events = listOf( + MockGenAIAgentEvent(name = "event1", attributes = listOf(MockAttribute("stringKey", "stringValue"))), + MockGenAIAgentEvent(name = "event2", attributes = listOf(MockAttribute("numberKey", 2))) + ) + + span.addEvents(events) + + assertEquals(2, span.events.size) + assertTrue(span.events.containsAll(events)) + } } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/span/SpanProcessorTest.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/span/SpanProcessorTest.kt index 5779f62fd..8c89e50ad 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/span/SpanProcessorTest.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/span/SpanProcessorTest.kt @@ -136,7 +136,7 @@ class SpanProcessorTest { spanProcessor.startSpan(span) assertEquals(1, spanProcessor.spansCount) - spanProcessor.endSpan(spanId) + spanProcessor.endSpan(span) assertEquals(0, spanProcessor.spansCount) val retrievedSpan = spanProcessor.getSpan(spanId) @@ -144,23 +144,6 @@ class SpanProcessorTest { assertEquals(0, spanProcessor.spansCount) } - @Test - fun `endSpan should throw when span not found`() { - val spanProcessor = SpanProcessor(MockTracer()) - val nonExistentSpanId = "non-existent-span" - assertEquals(0, spanProcessor.spansCount) - - val throwable = assertThrows { - spanProcessor.endSpan(nonExistentSpanId) - } - - assertEquals( - "Span with id '$nonExistentSpanId' not found. Make sure span was started or was not finished previously", - throwable.message - ) - assertEquals(0, spanProcessor.spansCount) - } - @Test fun `endUnfinishedSpans should end spans that match the filter`() { val spanProcessor = SpanProcessor(MockTracer()) @@ -183,7 +166,7 @@ class SpanProcessorTest { assertEquals(3, spanProcessor.spansCount) // End one of the spans - spanProcessor.endSpan(span2.spanId) + spanProcessor.endSpan(span2) assertEquals(2, spanProcessor.spansCount) // Verify initial state @@ -195,7 +178,7 @@ class SpanProcessorTest { assertFalse(span3.isEnded) // End spans that match the filter (only span1) - spanProcessor.endUnfinishedSpans { id -> id == span1Id } + spanProcessor.endUnfinishedSpans { span -> span.spanId == span1Id } // Verify span1 is ended, span2 was already ended, span3 is still not ended assertTrue(span1.isStarted) diff --git a/examples/src/main/kotlin/ai/koog/agents/example/features/langfuse/Langfuse.kt b/examples/src/main/kotlin/ai/koog/agents/example/features/langfuse/Langfuse.kt index 2a7cfd74f..c67cd9267 100644 --- a/examples/src/main/kotlin/ai/koog/agents/example/features/langfuse/Langfuse.kt +++ b/examples/src/main/kotlin/ai/koog/agents/example/features/langfuse/Langfuse.kt @@ -3,7 +3,7 @@ package ai.koog.agents.example.features.langfuse import ai.koog.agents.core.agent.AIAgent import ai.koog.agents.example.ApiKeyService import ai.koog.agents.features.opentelemetry.feature.OpenTelemetry -import ai.koog.agents.features.opentelemetry.integrations.addLangfuseExporter +import ai.koog.agents.features.opentelemetry.integration.langfuse.addLangfuseExporter import ai.koog.prompt.executor.clients.openai.OpenAIModels import ai.koog.prompt.executor.llms.all.simpleOpenAIExecutor import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter diff --git a/examples/src/main/kotlin/ai/koog/agents/example/features/weave/Weave.kt b/examples/src/main/kotlin/ai/koog/agents/example/features/weave/Weave.kt index f01ccf2e2..6c20e3fcb 100644 --- a/examples/src/main/kotlin/ai/koog/agents/example/features/weave/Weave.kt +++ b/examples/src/main/kotlin/ai/koog/agents/example/features/weave/Weave.kt @@ -3,7 +3,7 @@ package ai.koog.agents.example.features.weave import ai.koog.agents.core.agent.AIAgent import ai.koog.agents.example.ApiKeyService import ai.koog.agents.features.opentelemetry.feature.OpenTelemetry -import ai.koog.agents.features.opentelemetry.integrations.addWeaveExporter +import ai.koog.agents.features.opentelemetry.integration.weave.addWeaveExporter import ai.koog.prompt.executor.clients.openai.OpenAIModels import ai.koog.prompt.executor.llms.all.simpleOpenAIExecutor import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter