Skip to content

feat(opentelemetry-sampler-aws-xray): Add Rate Limiter and Sampling Targets Poller Logic #2924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export class AWSXRaySamplingClient {
this.makeSamplingRequest<GetSamplingTargetsResponse>(
this.samplingTargetsEndpoint,
callback,
this.samplerDiag.debug,
(message: string) => this.samplerDiag.debug(message),
JSON.stringify(requestBody)
);
}
Expand All @@ -56,7 +56,7 @@ export class AWSXRaySamplingClient {
this.makeSamplingRequest<GetSamplingRulesResponse>(
this.getSamplingRulesEndpoint,
callback,
this.samplerDiag.error
(message: string) => this.samplerDiag.error(message)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@
import { Attributes, Context, Link, SpanKind } from '@opentelemetry/api';
import {
Sampler,
SamplingDecision,
SamplingResult,
TraceIdRatioBasedSampler,
} from '@opentelemetry/sdk-trace-base';
import { RateLimitingSampler } from './rate-limiting-sampler';

// FallbackSampler samples 1 req/sec and additional 5% of requests using TraceIdRatioBasedSampler.
export class FallbackSampler implements Sampler {
private fixedRateSampler: TraceIdRatioBasedSampler;
private rateLimitingSampler: RateLimitingSampler;

constructor() {
this.fixedRateSampler = new TraceIdRatioBasedSampler(0.05);
constructor(ratio = 0.05, quota = 1) {
this.fixedRateSampler = new TraceIdRatioBasedSampler(ratio);
this.rateLimitingSampler = new RateLimitingSampler(quota);
}

shouldSample(
Expand All @@ -41,7 +45,19 @@ export class FallbackSampler implements Sampler {
attributes: Attributes,
links: Link[]
): SamplingResult {
// TODO: implement and use Rate Limiting Sampler
const samplingResult: SamplingResult =
this.rateLimitingSampler.shouldSample(
context,
traceId,
spanName,
spanKind,
attributes,
links
);

if (samplingResult.decision !== SamplingDecision.NOT_RECORD) {
return samplingResult;
}

return this.fixedRateSampler.shouldSample(context, traceId);
}
Expand Down
65 changes: 65 additions & 0 deletions incubator/opentelemetry-sampler-aws-xray/src/rate-limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Includes work from:
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

/*
* The RateLimiter keeps track of the current reservoir quota balance available (measured via available time)
* If enough time has elapsed, the RateLimiter will allow quota balance to be consumed/taken (decrease available time)
* A RateLimitingSampler uses this RateLimiter to determine if it should sample or not based on the quota balance available.
*/
export class RateLimiter {
// Quota assigned to client to dictate maximum quota balance that can be consumed per second.
private quota: number;
private MAX_BALANCE_MILLIS: number;
// Used to measure current quota balance.
private walletFloorMillis: number;

constructor(quota: number, maxBalanceInSeconds = 1) {
this.MAX_BALANCE_MILLIS = maxBalanceInSeconds * 1000.0;
this.quota = quota;
this.walletFloorMillis = Date.now();
// current "balance" would be `ceiling - floor`
}

public take(cost = 1): boolean {
if (this.quota === 0) {
return false;
}

// assume divide by zero not possible
const costInMillis: number = (cost * 1000.0) / this.quota;

const walletCeilingMillis: number = Date.now();
let currentBalanceMillis: number =
walletCeilingMillis - this.walletFloorMillis;
currentBalanceMillis = Math.min(
currentBalanceMillis,
this.MAX_BALANCE_MILLIS
);
const pendingRemainingBalanceMillis: number =
currentBalanceMillis - costInMillis;
if (pendingRemainingBalanceMillis >= 0) {
this.walletFloorMillis =
walletCeilingMillis - pendingRemainingBalanceMillis;
return true;
}
// No changes to the wallet state
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Includes work from:
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Attributes, Context, Link, SpanKind } from '@opentelemetry/api';
import {
Sampler,
SamplingDecision,
SamplingResult,
} from '@opentelemetry/sdk-trace-base';
import { RateLimiter } from './rate-limiter';

export class RateLimitingSampler implements Sampler {
private quota: number;
private reservoir: RateLimiter;

constructor(quota: number) {
this.quota = quota;
this.reservoir = new RateLimiter(quota);
}

shouldSample(
context: Context,
traceId: string,
spanName: string,
spanKind: SpanKind,
attributes: Attributes,
links: Link[]
): SamplingResult {
if (this.reservoir.take(1)) {
return {
decision: SamplingDecision.RECORD_AND_SAMPLED,
attributes: attributes,
};
}
return { decision: SamplingDecision.NOT_RECORD, attributes: attributes };
}

public toString(): string {
return `RateLimitingSampler{rate limiting sampling with sampling config of ${this.quota} req/sec and 0% of additional requests}`;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we need 'and 0% of additional requests' in the context of rate limiting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but I kept this text to be aligned with the Python implementation.

}
}
83 changes: 80 additions & 3 deletions incubator/opentelemetry-sampler-aws-xray/src/remote-sampler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,19 @@ import { FallbackSampler } from './fallback-sampler';
import {
AWSXRayRemoteSamplerConfig,
GetSamplingRulesResponse,
GetSamplingTargetsBody,
GetSamplingTargetsResponse,
SamplingRuleRecord,
SamplingTargetDocument,
TargetMap,
} from './types';
import { RuleCache } from './rule-cache';
import {
DEFAULT_TARGET_POLLING_INTERVAL_SECONDS,
RuleCache,
} from './rule-cache';

import { SamplingRuleApplier } from './sampling-rule-applier';
import { PACKAGE_NAME } from './version';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How this file is generated? I tried 'npm run setup:dev' and 'npm run compile', none of them generate this file and that is causing build failure.

Copy link
Contributor Author

@jj22ee jj22ee Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is generated by npm run version:update. However, while this package is in the incubator folder, this command isn't automatically run for this package until it is moved out of this directory.


// 5 minute default sampling rules polling interval
const DEFAULT_RULES_POLLING_INTERVAL_SECONDS: number = 5 * 60;
Expand Down Expand Up @@ -94,17 +102,22 @@ export class AWSXRayRemoteSampler implements Sampler {
// Not intended for external use, use Parent-based `AWSXRayRemoteSampler` instead.
export class _AWSXRayRemoteSampler implements Sampler {
private rulePollingIntervalMillis: number;
private targetPollingInterval: number;
private awsProxyEndpoint: string;
private ruleCache: RuleCache;
private fallbackSampler: FallbackSampler;
private samplerDiag: DiagLogger;
private rulePoller: NodeJS.Timeout | undefined;
private targetPoller: NodeJS.Timeout | undefined;
private clientId: string;
private rulePollingJitterMillis: number;
private targetPollingJitterMillis: number;
private samplingClient: AWSXRaySamplingClient;

constructor(samplerConfig: AWSXRayRemoteSamplerConfig) {
this.samplerDiag = diag;
this.samplerDiag = diag.createComponentLogger({
namespace: PACKAGE_NAME,
});

if (
samplerConfig.pollingInterval == null ||
Expand All @@ -120,6 +133,8 @@ export class _AWSXRayRemoteSampler implements Sampler {
}

this.rulePollingJitterMillis = Math.random() * 5 * 1000;
this.targetPollingInterval = this.getDefaultTargetPollingInterval();
this.targetPollingJitterMillis = (Math.random() / 10) * 1000;

this.awsProxyEndpoint = samplerConfig.endpoint
? samplerConfig.endpoint
Expand All @@ -137,7 +152,12 @@ export class _AWSXRayRemoteSampler implements Sampler {
// Start the Sampling Rules poller
this.startSamplingRulesPoller();

// TODO: Start the Sampling Targets poller
// Start the Sampling Targets poller where the first poll occurs after the default interval
this.startSamplingTargetsPoller();
}

public getDefaultTargetPollingInterval(): number {
return DEFAULT_TARGET_POLLING_INTERVAL_SECONDS;
}

public shouldSample(
Expand Down Expand Up @@ -203,6 +223,7 @@ export class _AWSXRayRemoteSampler implements Sampler {

public stopPollers() {
clearInterval(this.rulePoller);
clearInterval(this.targetPoller);
}

private startSamplingRulesPoller(): void {
Expand All @@ -216,6 +237,27 @@ export class _AWSXRayRemoteSampler implements Sampler {
this.rulePoller.unref();
}

private startSamplingTargetsPoller(): void {
// Update sampling targets every targetPollingInterval (usually 10 seconds)
this.targetPoller = setInterval(
() => this.getAndUpdateSamplingTargets(),
this.targetPollingInterval * 1000 + this.targetPollingJitterMillis
);
this.targetPoller.unref();
}

private getAndUpdateSamplingTargets(): void {
const requestBody: GetSamplingTargetsBody = {
SamplingStatisticsDocuments:
this.ruleCache.createSamplingStatisticsDocuments(this.clientId),
};

this.samplingClient.fetchSamplingTargets(
requestBody,
this.updateSamplingTargets.bind(this)
);
}

private getAndUpdateSamplingRules(): void {
this.samplingClient.fetchSamplingRules(this.updateSamplingRules.bind(this));
}
Expand All @@ -242,6 +284,41 @@ export class _AWSXRayRemoteSampler implements Sampler {
}
}

private updateSamplingTargets(
responseObject: GetSamplingTargetsResponse
): void {
try {
const targetDocuments: TargetMap = {};

// Create Target-Name-to-Target-Map from sampling targets response
responseObject.SamplingTargetDocuments.forEach(
(newTarget: SamplingTargetDocument) => {
targetDocuments[newTarget.RuleName] = newTarget;
}
);

// Update targets in the cache
const [refreshSamplingRules, nextPollingInterval]: [boolean, number] =
this.ruleCache.updateTargets(
targetDocuments,
responseObject.LastRuleModification
);
this.targetPollingInterval = nextPollingInterval;
clearInterval(this.targetPoller);
this.startSamplingTargetsPoller();

if (refreshSamplingRules) {
this.samplerDiag.debug(
'Performing out-of-band sampling rule polling to fetch updated rules.'
);
clearInterval(this.rulePoller);
this.startSamplingRulesPoller();
}
} catch (error: unknown) {
this.samplerDiag.debug('Error occurred when updating Sampling Targets');
}
}

private static generateClientId(): string {
const hexChars: string[] = [
'0',
Expand Down
Loading
Loading