Skip to content

feat(instrumentation-aws-lambda): support streaming handlers #2970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 178 additions & 52 deletions packages/instrumentation-aws-lambda/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import {
Callback,
Context,
Handler,
StreamifyHandler,
} from 'aws-lambda';

import { AwsLambdaInstrumentationConfig, EventContextExtractor } from './types';
Expand All @@ -69,6 +70,10 @@ const headerGetter: TextMapGetter<APIGatewayProxyEventHeaders> = {
};

export const lambdaMaxInitInMilliseconds = 10_000;
export const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for(
'aws.lambda.runtime.handler.streaming'
);
export const AWS_HANDLER_STREAMING_RESPONSE = 'response';

export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstrumentationConfig> {
private declare _traceForceFlusher?: () => Promise<void>;
Expand All @@ -91,6 +96,19 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
return [];
}

// Provide a temporary awslambda polyfill for CommonJS modules during loading
// This prevents ReferenceError when modules use awslambda.streamifyResponse at load time
if (typeof globalThis.awslambda === 'undefined') {
(globalThis as any).awslambda = {
streamifyResponse: (handler: any) => {
// Add the streaming symbols that the instrumentation looks for
handler[AWS_HANDLER_STREAMING_SYMBOL] =
AWS_HANDLER_STREAMING_RESPONSE;
return handler;
},
};
}

const handler = path.basename(handlerDef);
const moduleRoot = handlerDef.substring(
0,
Expand Down Expand Up @@ -175,12 +193,40 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
}

private _getHandler(handlerLoadStartTime: number) {
return (original: Handler) => {
return (
original: Handler | StreamifyHandler
): Handler | StreamifyHandler => {
if (this._isStreamingHandler(original)) {
const patchedHandler = this._getPatchHandler(
original,
handlerLoadStartTime
);

// Streaming handlers have special symbols that we need to copy over to the patched handler.
for (const symbol of Object.getOwnPropertySymbols(original)) {
(patchedHandler as unknown as Record<symbol, unknown>)[symbol] = (
original as unknown as Record<symbol, unknown>
)[symbol];
}

return patchedHandler;
}

return this._getPatchHandler(original, handlerLoadStartTime);
};
}

private _getPatchHandler(original: Handler, lambdaStartTime: number) {
private _getPatchHandler(original: Handler, lambdaStartTime: number): Handler;

private _getPatchHandler(
original: StreamifyHandler,
lambdaStartTime: number
): StreamifyHandler;

private _getPatchHandler(
original: Handler | StreamifyHandler,
lambdaStartTime: number
): Handler | StreamifyHandler {
diag.debug('patch handler function');
const plugin = this;

Expand Down Expand Up @@ -215,6 +261,43 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
}
}

if (this._isStreamingHandler(original)) {
return function patchedStreamingHandler(
this: never,
// The event can be a user type, it truly is any.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
event: any,
responseStream: Parameters<StreamifyHandler>[1],
context: Context
) {
_onRequest();

const parent = plugin._determineParent(event, context);
const span = plugin._createSpanForRequest(
event,
context,
requestIsColdStart,
parent
);
plugin._applyRequestHook(span, event, context);

return otelContext.with(trace.setSpan(parent, span), () => {
const maybePromise = safeExecuteInTheMiddle(
() => original.apply(this, [event, responseStream, context]),
error => {
if (error != null) {
// Exception thrown synchronously before resolving promise.
plugin._applyResponseHook(span, error);
plugin._endSpan(span, error, () => {});
}
}
) as Promise<{}> | undefined;

return plugin._handlePromiseResult(span, maybePromise);
});
};
}

return function patchedHandler(
this: never,
// The event can be a user type, it truly is any.
Expand All @@ -225,44 +308,16 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
) {
_onRequest();

const config = plugin.getConfig();
const parent = AwsLambdaInstrumentation._determineParent(
const parent = plugin._determineParent(event, context);

const span = plugin._createSpanForRequest(
event,
context,
config.eventContextExtractor ||
AwsLambdaInstrumentation._defaultEventContextExtractor
);

const name = context.functionName;
const span = plugin.tracer.startSpan(
name,
{
kind: SpanKind.SERVER,
attributes: {
[SEMATTRS_FAAS_EXECUTION]: context.awsRequestId,
[SEMRESATTRS_FAAS_ID]: context.invokedFunctionArn,
[SEMRESATTRS_CLOUD_ACCOUNT_ID]:
AwsLambdaInstrumentation._extractAccountId(
context.invokedFunctionArn
),
[ATTR_FAAS_COLDSTART]: requestIsColdStart,
...AwsLambdaInstrumentation._extractOtherEventFields(event),
},
},
requestIsColdStart,
parent
);

const { requestHook } = config;
if (requestHook) {
safeExecuteInTheMiddle(
() => requestHook(span, { event, context }),
e => {
if (e)
diag.error('aws-lambda instrumentation: requestHook error', e);
},
true
);
}
plugin._applyRequestHook(span, event, context);

return otelContext.with(trace.setSpan(parent, span), () => {
// Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling
Expand All @@ -280,27 +335,98 @@ export class AwsLambdaInstrumentation extends InstrumentationBase<AwsLambdaInstr
}
}
) as Promise<{}> | undefined;
if (typeof maybePromise?.then === 'function') {
return maybePromise.then(
value => {
plugin._applyResponseHook(span, null, value);
return new Promise(resolve =>
plugin._endSpan(span, undefined, () => resolve(value))
);
},
(err: Error | string) => {
plugin._applyResponseHook(span, err);
return new Promise((resolve, reject) =>
plugin._endSpan(span, err, () => reject(err))
);
}
);
}
return maybePromise;

return plugin._handlePromiseResult(span, maybePromise);
});
};
}

private _createSpanForRequest(
event: any,
context: Context,
requestIsColdStart: boolean,
parent: OtelContext
): Span {
const name = context.functionName;
return this.tracer.startSpan(
name,
{
kind: SpanKind.SERVER,
attributes: {
[SEMATTRS_FAAS_EXECUTION]: context.awsRequestId,
[SEMRESATTRS_FAAS_ID]: context.invokedFunctionArn,
[SEMRESATTRS_CLOUD_ACCOUNT_ID]:
AwsLambdaInstrumentation._extractAccountId(
context.invokedFunctionArn
),
[ATTR_FAAS_COLDSTART]: requestIsColdStart,
...AwsLambdaInstrumentation._extractOtherEventFields(event),
},
},
parent
);
}

private _applyRequestHook(span: Span, event: any, context: Context): void {
const { requestHook } = this.getConfig();
if (requestHook) {
safeExecuteInTheMiddle(
() => requestHook(span, { event, context }),
e => {
if (e) diag.error('aws-lambda instrumentation: requestHook error', e);
},
true
);
}
}

private _handlePromiseResult(
span: Span,
maybePromise: Promise<{}> | undefined
): Promise<{}> | undefined {
if (typeof maybePromise?.then === 'function') {
return maybePromise.then(
value => {
this._applyResponseHook(span, null, value);
return new Promise(resolve =>
this._endSpan(span, undefined, () => resolve(value))
);
},
(err: Error | string) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional that the promise fails silently in this scenario?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not fully sure what you mean 🤔 we return a new promise that rejects with the error after the span is ended, so it shouldn't fail silently? (Also I didn't change this part, just extracted it into a function)

this._applyResponseHook(span, err);
return new Promise((resolve, reject) =>
this._endSpan(span, err, () => reject(err))
);
}
);
}

// Handle synchronous return values by ending the span and applying response hook
this._applyResponseHook(span, null, maybePromise);
this._endSpan(span, undefined, () => {});
return maybePromise;
}

private _determineParent(event: any, context: Context): OtelContext {
const config = this.getConfig();
return AwsLambdaInstrumentation._determineParent(
event,
context,
config.eventContextExtractor ||
AwsLambdaInstrumentation._defaultEventContextExtractor
);
}

private _isStreamingHandler<TEvent, TResult>(
handler: Handler<TEvent, TResult> | StreamifyHandler<TEvent, TResult>
): handler is StreamifyHandler<TEvent, TResult> {
return (
(handler as unknown as Record<symbol, unknown>)[
AWS_HANDLER_STREAMING_SYMBOL
] === AWS_HANDLER_STREAMING_RESPONSE
);
}

override setTracerProvider(tracerProvider: TracerProvider) {
super.setTracerProvider(tracerProvider);
this._traceForceFlusher = this._traceForceFlush(tracerProvider);
Expand Down
4 changes: 2 additions & 2 deletions packages/instrumentation-aws-lambda/src/internal-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Handler } from 'aws-lambda';
import { Handler, StreamifyHandler } from 'aws-lambda';

export type LambdaModule = Record<string, Handler>;
export type LambdaModule = Record<string, Handler | StreamifyHandler>;
Loading