-
Notifications
You must be signed in to change notification settings - Fork 590
feat(opentelemetry-sampler-aws-xray): Add Rate Limiter and Sampling Targets Poller Logic #2924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f5c1380
c8a8bb9
496cc32
e2f97f5
39f6360
a2c0165
d9c42de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
// Includes work from: | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
/* | ||
* The RateLimiter keeps track of the current reservoir quota balance available (measured via available time) | ||
* If enough time has elapsed, the RateLimiter will allow quota balance to be consumed/taken (decrease available time) | ||
* A RateLimitingSampler uses this RateLimiter to determine if it should sample or not based on the quota balance available. | ||
*/ | ||
export class RateLimiter { | ||
// Quota assigned to client to dictate maximum quota balance that can be consumed per second. | ||
private quota: number; | ||
private MAX_BALANCE_MILLIS: number; | ||
// Used to measure current quota balance. | ||
private walletFloorMillis: number; | ||
|
||
constructor(quota: number, maxBalanceInSeconds = 1) { | ||
this.MAX_BALANCE_MILLIS = maxBalanceInSeconds * 1000.0; | ||
this.quota = quota; | ||
this.walletFloorMillis = Date.now(); | ||
// current "balance" would be `ceiling - floor` | ||
} | ||
|
||
public take(cost = 1): boolean { | ||
if (this.quota === 0) { | ||
return false; | ||
} | ||
|
||
// assume divide by zero not possible | ||
const costInMillis: number = (cost * 1000.0) / this.quota; | ||
|
||
const walletCeilingMillis: number = Date.now(); | ||
let currentBalanceMillis: number = | ||
walletCeilingMillis - this.walletFloorMillis; | ||
currentBalanceMillis = Math.min( | ||
currentBalanceMillis, | ||
this.MAX_BALANCE_MILLIS | ||
); | ||
const pendingRemainingBalanceMillis: number = | ||
currentBalanceMillis - costInMillis; | ||
if (pendingRemainingBalanceMillis >= 0) { | ||
this.walletFloorMillis = | ||
walletCeilingMillis - pendingRemainingBalanceMillis; | ||
return true; | ||
} | ||
// No changes to the wallet state | ||
return false; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
// Includes work from: | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
import { Attributes, Context, Link, SpanKind } from '@opentelemetry/api'; | ||
import { | ||
Sampler, | ||
SamplingDecision, | ||
SamplingResult, | ||
} from '@opentelemetry/sdk-trace-base'; | ||
import { RateLimiter } from './rate-limiter'; | ||
|
||
export class RateLimitingSampler implements Sampler { | ||
private quota: number; | ||
private reservoir: RateLimiter; | ||
|
||
constructor(quota: number) { | ||
this.quota = quota; | ||
this.reservoir = new RateLimiter(quota); | ||
} | ||
|
||
shouldSample( | ||
context: Context, | ||
traceId: string, | ||
spanName: string, | ||
spanKind: SpanKind, | ||
attributes: Attributes, | ||
links: Link[] | ||
): SamplingResult { | ||
if (this.reservoir.take(1)) { | ||
return { | ||
decision: SamplingDecision.RECORD_AND_SAMPLED, | ||
attributes: attributes, | ||
}; | ||
} | ||
return { decision: SamplingDecision.NOT_RECORD, attributes: attributes }; | ||
} | ||
|
||
public toString(): string { | ||
return `RateLimitingSampler{rate limiting sampling with sampling config of ${this.quota} req/sec and 0% of additional requests}`; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: do we need 'and 0% of additional requests' in the context of rate limiting? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, but I kept this text to be aligned with the Python implementation. |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,11 +36,19 @@ import { FallbackSampler } from './fallback-sampler'; | |
import { | ||
AWSXRayRemoteSamplerConfig, | ||
GetSamplingRulesResponse, | ||
GetSamplingTargetsBody, | ||
GetSamplingTargetsResponse, | ||
SamplingRuleRecord, | ||
SamplingTargetDocument, | ||
TargetMap, | ||
} from './types'; | ||
import { RuleCache } from './rule-cache'; | ||
import { | ||
DEFAULT_TARGET_POLLING_INTERVAL_SECONDS, | ||
RuleCache, | ||
} from './rule-cache'; | ||
|
||
import { SamplingRuleApplier } from './sampling-rule-applier'; | ||
import { PACKAGE_NAME } from './version'; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How this file is generated? I tried 'npm run setup:dev' and 'npm run compile', none of them generate this file and that is causing build failure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is generated by |
||
|
||
// 5 minute default sampling rules polling interval | ||
const DEFAULT_RULES_POLLING_INTERVAL_SECONDS: number = 5 * 60; | ||
|
@@ -94,17 +102,22 @@ export class AWSXRayRemoteSampler implements Sampler { | |
// Not intended for external use, use Parent-based `AWSXRayRemoteSampler` instead. | ||
export class _AWSXRayRemoteSampler implements Sampler { | ||
private rulePollingIntervalMillis: number; | ||
private targetPollingInterval: number; | ||
private awsProxyEndpoint: string; | ||
private ruleCache: RuleCache; | ||
private fallbackSampler: FallbackSampler; | ||
private samplerDiag: DiagLogger; | ||
private rulePoller: NodeJS.Timeout | undefined; | ||
private targetPoller: NodeJS.Timeout | undefined; | ||
private clientId: string; | ||
private rulePollingJitterMillis: number; | ||
private targetPollingJitterMillis: number; | ||
private samplingClient: AWSXRaySamplingClient; | ||
|
||
constructor(samplerConfig: AWSXRayRemoteSamplerConfig) { | ||
this.samplerDiag = diag; | ||
this.samplerDiag = diag.createComponentLogger({ | ||
namespace: PACKAGE_NAME, | ||
}); | ||
|
||
if ( | ||
samplerConfig.pollingInterval == null || | ||
|
@@ -120,6 +133,8 @@ export class _AWSXRayRemoteSampler implements Sampler { | |
} | ||
|
||
this.rulePollingJitterMillis = Math.random() * 5 * 1000; | ||
this.targetPollingInterval = this.getDefaultTargetPollingInterval(); | ||
this.targetPollingJitterMillis = (Math.random() / 10) * 1000; | ||
|
||
this.awsProxyEndpoint = samplerConfig.endpoint | ||
? samplerConfig.endpoint | ||
|
@@ -137,7 +152,12 @@ export class _AWSXRayRemoteSampler implements Sampler { | |
// Start the Sampling Rules poller | ||
this.startSamplingRulesPoller(); | ||
|
||
// TODO: Start the Sampling Targets poller | ||
// Start the Sampling Targets poller where the first poll occurs after the default interval | ||
this.startSamplingTargetsPoller(); | ||
} | ||
|
||
public getDefaultTargetPollingInterval(): number { | ||
return DEFAULT_TARGET_POLLING_INTERVAL_SECONDS; | ||
} | ||
|
||
public shouldSample( | ||
|
@@ -203,6 +223,7 @@ export class _AWSXRayRemoteSampler implements Sampler { | |
|
||
public stopPollers() { | ||
clearInterval(this.rulePoller); | ||
clearInterval(this.targetPoller); | ||
} | ||
|
||
private startSamplingRulesPoller(): void { | ||
|
@@ -216,6 +237,27 @@ export class _AWSXRayRemoteSampler implements Sampler { | |
this.rulePoller.unref(); | ||
} | ||
|
||
private startSamplingTargetsPoller(): void { | ||
// Update sampling targets every targetPollingInterval (usually 10 seconds) | ||
this.targetPoller = setInterval( | ||
() => this.getAndUpdateSamplingTargets(), | ||
this.targetPollingInterval * 1000 + this.targetPollingJitterMillis | ||
); | ||
this.targetPoller.unref(); | ||
} | ||
|
||
private getAndUpdateSamplingTargets(): void { | ||
const requestBody: GetSamplingTargetsBody = { | ||
SamplingStatisticsDocuments: | ||
this.ruleCache.createSamplingStatisticsDocuments(this.clientId), | ||
}; | ||
|
||
this.samplingClient.fetchSamplingTargets( | ||
requestBody, | ||
this.updateSamplingTargets.bind(this) | ||
); | ||
} | ||
|
||
private getAndUpdateSamplingRules(): void { | ||
this.samplingClient.fetchSamplingRules(this.updateSamplingRules.bind(this)); | ||
} | ||
|
@@ -242,6 +284,41 @@ export class _AWSXRayRemoteSampler implements Sampler { | |
} | ||
} | ||
|
||
private updateSamplingTargets( | ||
responseObject: GetSamplingTargetsResponse | ||
): void { | ||
try { | ||
const targetDocuments: TargetMap = {}; | ||
|
||
// Create Target-Name-to-Target-Map from sampling targets response | ||
responseObject.SamplingTargetDocuments.forEach( | ||
(newTarget: SamplingTargetDocument) => { | ||
targetDocuments[newTarget.RuleName] = newTarget; | ||
} | ||
); | ||
|
||
// Update targets in the cache | ||
const [refreshSamplingRules, nextPollingInterval]: [boolean, number] = | ||
this.ruleCache.updateTargets( | ||
targetDocuments, | ||
responseObject.LastRuleModification | ||
); | ||
this.targetPollingInterval = nextPollingInterval; | ||
clearInterval(this.targetPoller); | ||
this.startSamplingTargetsPoller(); | ||
|
||
if (refreshSamplingRules) { | ||
this.samplerDiag.debug( | ||
'Performing out-of-band sampling rule polling to fetch updated rules.' | ||
); | ||
clearInterval(this.rulePoller); | ||
this.startSamplingRulesPoller(); | ||
} | ||
} catch (error: unknown) { | ||
this.samplerDiag.debug('Error occurred when updating Sampling Targets'); | ||
} | ||
} | ||
|
||
private static generateClientId(): string { | ||
const hexChars: string[] = [ | ||
'0', | ||
|
Uh oh!
There was an error while loading. Please reload this page.