Commit 8b7ac8a
Avoid Source Storage Server Being Overloaded by Data Movements with Replica Consistency Check (#12164)
* add ss metrics for fetch key
* bug fix
* revert checkTimeSpanSec
* fix adjustRelocationParallelismForSrc
* code cleanup
* fix replicaComparison
* remove unnecessary counters
* fix large storage server data structure
* address comments
* address comments
* address comments
* code cleanup
* bug fix
* fix bug
1 parent 4ccba1f commit 8b7ac8a

15 files changed: +207 −106 lines changed

fdbclient/BackupAgentBase.actor.cpp

Lines changed: 1 addition & 1 deletion

@@ -567,7 +567,7 @@ ACTOR Future<Void> readCommitted(Database cx,
 	tr.setOption(FDBTransactionOptions::LOCK_AWARE);
 	if (CLIENT_KNOBS->ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS) {
 		tr.setOption(FDBTransactionOptions::ENABLE_REPLICA_CONSISTENCY_CHECK);
-		int64_t requiredReplicas = CLIENT_KNOBS->CONSISTENCY_CHECK_REQUIRED_REPLICAS;
+		int64_t requiredReplicas = CLIENT_KNOBS->BACKUP_CONSISTENCY_CHECK_REQUIRED_REPLICAS;
 		tr.setOption(FDBTransactionOptions::CONSISTENCY_CHECK_REQUIRED_REPLICAS,
 		             StringRef((uint8_t*)&requiredReplicas, sizeof(int64_t)));
 	}
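The option value above is just the knob packed as the raw bytes of an int64_t wrapped in a StringRef. A minimal standalone sketch of that encode/decode round trip outside FDB (the helper names are illustrative, not part of the FDB API):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Pack an int64_t knob value into a byte string, mirroring how the diff
// above wraps the value in StringRef((uint8_t*)&v, sizeof(int64_t)).
std::string packInt64Option(int64_t v) {
    return std::string(reinterpret_cast<const char*>(&v), sizeof(v));
}

// Recover the value on the receiving side (assumes sender and receiver
// agree on byte order, as the in-process option path does).
int64_t unpackInt64Option(const std::string& bytes) {
    assert(bytes.size() == sizeof(int64_t));
    int64_t v;
    std::memcpy(&v, bytes.data(), sizeof(v));
    return v;
}

int main() {
    // -2 is the default of BACKUP_CONSISTENCY_CHECK_REQUIRED_REPLICAS below:
    // per the ClientKnobs comment, "check based on all available storage replicas".
    int64_t requiredReplicas = -2;
    assert(unpackInt64Option(packInt64Option(requiredReplicas)) == -2);
}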

fdbclient/ClientKnobs.cpp

Lines changed: 1 addition & 1 deletion

@@ -200,7 +200,7 @@ void ClientKnobs::initialize(Randomize randomize) {
 	init( BLOB_GRANULE_RESTORE_CHECK_INTERVAL, 10 );
 	init( BACKUP_CONTAINER_LOCAL_ALLOW_RELATIVE_PATH, false );
 	init( ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS, false ); if( randomize && BUGGIFY ) { ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS = true; }
-	init( CONSISTENCY_CHECK_REQUIRED_REPLICAS, -2 ); // Do consistency check based on all available storage replicas
+	init( BACKUP_CONSISTENCY_CHECK_REQUIRED_REPLICAS, -2 ); // Do consistency check based on all available storage replicas
 	init( BULKLOAD_JOB_HISTORY_COUNT_MAX, 10 ); if (randomize && BUGGIFY) BULKLOAD_JOB_HISTORY_COUNT_MAX = deterministicRandom()->randomInt(1, 10);
 	init( BULKLOAD_VERBOSE_LEVEL, 10 );
 	init( S3CLIENT_VERBOSE_LEVEL, 10 );

fdbclient/NativeAPI.actor.cpp

Lines changed: 1 addition & 0 deletions

@@ -4706,6 +4706,7 @@ Future<RangeResultFamily> getRange(Reference<TransactionState> trState,
 	req.tenantInfo = useTenant ? trState->getTenantInfo() : TenantInfo();
 	req.options = trState->readOptions;
 	req.version = trState->readVersion();
+	req.taskID = trState->taskID;

 	trState->cx->getLatestCommitVersions(beginServer.locations, trState, req.ssLatestCommitVersions);

fdbclient/ServerKnobs.cpp

Lines changed: 2 additions & 2 deletions

@@ -177,8 +177,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( FETCH_KEYS_THROTTLE_PRIORITY_THRESHOLD, 0 ); if( randomize && BUGGIFY ) FETCH_KEYS_THROTTLE_PRIORITY_THRESHOLD = 700;

 	init( ENABLE_REPLICA_CONSISTENCY_CHECK_ON_DATA_MOVEMENT, false ); ENABLE_REPLICA_CONSISTENCY_CHECK_ON_DATA_MOVEMENT = isSimulated;
-	init( CONSISTENCY_CHECK_REQUIRED_REPLICAS, 1 );
-
+	init( DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS, 1 );
+	init( ENABLE_CONSERVATIVE_RELOCATION_WHEN_REPLICA_CONSISTENCY_CHECK, false ); if (isSimulated) ENABLE_CONSERVATIVE_RELOCATION_WHEN_REPLICA_CONSISTENCY_CHECK = deterministicRandom()->coinflip();
 	init( PROBABILITY_TEAM_REDUNDANT_DATAMOVE_CHOOSE_TRUE_BEST_DEST, 0.0 ); if (isSimulated) PROBABILITY_TEAM_REDUNDANT_DATAMOVE_CHOOSE_TRUE_BEST_DEST = deterministicRandom()->random01();
 	init( PROBABILITY_TEAM_UNHEALTHY_DATAMOVE_CHOOSE_TRUE_BEST_DEST, 0.0 ); if (isSimulated) PROBABILITY_TEAM_UNHEALTHY_DATAMOVE_CHOOSE_TRUE_BEST_DEST = deterministicRandom()->random01();

fdbclient/include/fdbclient/ClientKnobs.h

Lines changed: 1 addition & 1 deletion

@@ -200,7 +200,7 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ClientKnobs : public KnobsImpl<ClientKno
 	int BLOB_GRANULE_RESTORE_CHECK_INTERVAL;
 	bool BACKUP_CONTAINER_LOCAL_ALLOW_RELATIVE_PATH;
 	bool ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS;
-	int CONSISTENCY_CHECK_REQUIRED_REPLICAS;
+	int BACKUP_CONSISTENCY_CHECK_REQUIRED_REPLICAS;
 	int BULKLOAD_JOB_HISTORY_COUNT_MAX; // the max number of bulk load job history to keep. The oldest job history will
 	                                    // be removed when the count exceeds this value. Set to 0 to disable history.
 	                                    // Do not set the value to a large number, e.g. <= 10.
fdbclient/include/fdbclient/ServerKnobs.h

Lines changed: 5 additions & 2 deletions

@@ -199,8 +199,11 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
 	// STORAGE_FETCH_KEYS_RATE_LIMIT.
 	int FETCH_KEYS_THROTTLE_PRIORITY_THRESHOLD;

-	bool ENABLE_REPLICA_CONSISTENCY_CHECK_ON_DATA_MOVEMENT;
-	int CONSISTENCY_CHECK_REQUIRED_REPLICAS;
+	bool ENABLE_REPLICA_CONSISTENCY_CHECK_ON_DATA_MOVEMENT; // Enable to check replica consistency on data movement
+	int DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS; // The number of extra replicas to check for replica consistency
+	                                                  // on data movement read range requests by fetchKeys
+	bool ENABLE_CONSERVATIVE_RELOCATION_WHEN_REPLICA_CONSISTENCY_CHECK; // Enable to slow down relocation when replica
+	                                                                    // consistency check on data movement is enabled

 	// Probability that a team redundant data move set TrueBest when get destination team
 	double PROBABILITY_TEAM_REDUNDANT_DATAMOVE_CHOOSE_TRUE_BEST_DEST;

fdbclient/include/fdbclient/StorageServerInterface.h

Lines changed: 4 additions & 0 deletions

@@ -391,6 +391,7 @@ struct GetKeyValuesRequest : TimedRequest {
 	VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
 	                                      // to this client, of all storage replicas that
 	                                      // serve the given key
+	Optional<TaskPriority> taskID; // includes the information about read purpose

 	GetKeyValuesRequest() {}

@@ -410,6 +411,7 @@
 		           tenantInfo,
 		           options,
 		           ssLatestCommitVersions,
+		           taskID,
 		           arena);
 	}
 };

@@ -447,6 +449,7 @@ struct GetMappedKeyValuesRequest : TimedRequest {
 	VersionVector ssLatestCommitVersions; // includes the latest commit versions, as known
 	                                      // to this client, of all storage replicas that
 	                                      // serve the given key range
+	Optional<TaskPriority> taskID; // includes the information about read purpose

 	GetMappedKeyValuesRequest() {}

@@ -467,6 +470,7 @@
 		           tenantInfo,
 		           options,
 		           ssLatestCommitVersions,
+		           taskID,
 		           arena);
 	}
 };
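The new taskID field is only documented here as carrying "the information about read purpose". A plausible reading, consistent with the fetch-serving counters added in StorageMetrics.actor.cpp below, is that the storage server uses it to tell data-movement (fetchKeys) reads apart from ordinary client reads. A hedged sketch of that classification, with std::optional standing in for FDB's Optional and a hypothetical two-value subset of flow's TaskPriority enum:

#include <optional>

// Hypothetical stand-in for FDB's TaskPriority (the real enum lives in flow);
// only the distinction relevant to this sketch is modeled.
enum class TaskPriority { DefaultEndpoint, FetchKeys };

// Sketch: classify a request as a fetchKeys read from its taskID, e.g. so the
// serving storage server can account for fetch-driven load separately.
bool isFetchKeysRead(const std::optional<TaskPriority>& taskID) {
    return taskID.has_value() && *taskID == TaskPriority::FetchKeys;
}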

fdbrpc/include/fdbrpc/LoadBalance.actor.h

Lines changed: 62 additions & 23 deletions

@@ -311,32 +311,56 @@ Future<Void> replicaComparison(Req req,
 		}
 	} else if (!srcLB.present() || !srcLB.get().error.present()) {
 		// Verify that the other SS servers in the team have the same data.
-		state std::vector<Future<Optional<ErrorOr<Resp>>>> restOfTeamFutures;
-		restOfTeamFutures.reserve(ssTeam->size() - 1);
+		std::vector<uint64_t> candidates;
+		// candidates includes all healthy SS endpoints in the team except the one we already
+		// have a response from
 		for (int i = 0; i < ssTeam->size(); i++) {
 			RequestStream<Req, P> const* si = &ssTeam->get(i, channel);
-			if (si->getEndpoint().token.first() !=
-			    srcEndpointId) { // don't re-request to SS we already have a response from
-				if (!IFailureMonitor::failureMonitor().getState(si->getEndpoint()).failed) {
-					resetReply(req);
-					restOfTeamFutures.push_back((
-					    requiredReplicas == BEST_EFFORT
-					        ? timeout(si->tryGetReply(req), FLOW_KNOBS->LOAD_BALANCE_FETCH_REPLICA_TIMEOUT)
-					        : timeout(errorOr(si->getReply(req)), FLOW_KNOBS->LOAD_BALANCE_FETCH_REPLICA_TIMEOUT)));
-				} else if (requiredReplicas == ALL_REPLICAS) {
-					TraceEvent(SevWarnAlways, "UnreachableStorageServer")
-					    .detail("SSID", ssTeam->getInterface(i).id());
-					throw unreachable_storage_replica();
-				}
+			if (si->getEndpoint().token.first() == srcEndpointId) {
+				// Don't re-request to SS we already have a response from
+				continue;
+			}
+			if (!IFailureMonitor::failureMonitor().getState(si->getEndpoint()).failed) {
+				candidates.push_back(si->getEndpoint().token.first());
+			} else if (requiredReplicas == ALL_REPLICAS) {
+				TraceEvent(SevWarnAlways, "UnreachableStorageServer").detail("SSID", ssTeam->getInterface(i).id());
+				throw unreachable_storage_replica();
 			}
 		}
+		int numReplicaToRead = candidates.size();
+		if (requiredReplicas != BEST_EFFORT && requiredReplicas != ALL_REPLICAS) {
+			ASSERT(requiredReplicas > 0);
+			numReplicaToRead = std::min((int)candidates.size(), requiredReplicas);
+			if (FLOW_KNOBS->ENABLE_WARNING_READ_CONSISTENCY_CHECK_NOT_ENOUGH_REPLICA &&
+			    candidates.size() < requiredReplicas) {
+				TraceEvent(SevWarn, "ReplicaConsistencyCheckNotEnoughReplica")
+				    .suppressFor(5.0)
+				    .detail("RequiredReplicas", requiredReplicas)
+				    .detail("AvailableReplicas", candidates.size());
+			}
+		}
+		state std::vector<Future<Optional<ErrorOr<Resp>>>> restOfTeamFutures;
+		restOfTeamFutures.reserve(numReplicaToRead);
+		// Randomly select numReplicaToRead SSes to read from
+		deterministicRandom()->randomShuffle(candidates);
+		candidates.erase(candidates.begin() + numReplicaToRead, candidates.end());
+		std::unordered_set<uint64_t> ssToRead(candidates.begin(), candidates.end());

-		if (requiredReplicas == BEST_EFFORT || requiredReplicas == ALL_REPLICAS) {
-			wait(waitForAllReady(restOfTeamFutures));
-		} else {
-			wait(waitForQuorumReplies(&restOfTeamFutures, requiredReplicas));
+		for (int i = 0; i < ssTeam->size(); i++) {
+			RequestStream<Req, P> const* si = &ssTeam->get(i, channel);
+			if (!ssToRead.contains(si->getEndpoint().token.first())) {
+				// Only send requests to the SSes that we randomly selected
+				continue;
+			}
+			resetReply(req);
+			restOfTeamFutures.push_back(
+			    (requiredReplicas == BEST_EFFORT
+			         ? timeout(si->tryGetReply(req), FLOW_KNOBS->LOAD_BALANCE_FETCH_REPLICA_TIMEOUT)
+			         : timeout(errorOr(si->getReply(req)), FLOW_KNOBS->LOAD_BALANCE_FETCH_REPLICA_TIMEOUT)));
 		}

+		wait(waitForAllReady(restOfTeamFutures));
+
 		int numError = 0;
 		int numMismatch = 0;
 		int numFetchReplicaTimeout = 0;
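The heart of this hunk is the new selection policy: rather than fanning the comparison read out to every healthy replica in the team, the requester samples at most requiredReplicas of them, which is what keeps a small source team from being swamped. A self-contained sketch of just that policy, with std::mt19937 standing in for FDB's deterministicRandom() and illustrative sentinel values:

#include <algorithm>
#include <cstdint>
#include <random>
#include <vector>

constexpr int BEST_EFFORT = -1;  // illustrative sentinels; the real values are
constexpr int ALL_REPLICAS = -2; // defined alongside replicaComparison

// Pick which healthy replica endpoints to send the comparison read to.
std::vector<uint64_t> selectReplicasToRead(std::vector<uint64_t> candidates,
                                           int requiredReplicas,
                                           std::mt19937& rng) {
    int numReplicaToRead = (int)candidates.size();
    if (requiredReplicas != BEST_EFFORT && requiredReplicas != ALL_REPLICAS) {
        // Cap the fan-out: read from at most requiredReplicas extra replicas.
        numReplicaToRead = std::min((int)candidates.size(), requiredReplicas);
    }
    // Shuffle first so the extra read load is spread evenly across the team
    // over many requests, instead of always hitting the same servers.
    std::shuffle(candidates.begin(), candidates.end(), rng);
    candidates.resize(numReplicaToRead);
    return candidates;
}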
@@ -480,16 +504,31 @@ struct RequestData : NonCopyable {
 	                  int requiredReplicas) {
 		if (model && (compareReplicas || FLOW_KNOBS->ENABLE_REPLICA_CONSISTENCY_CHECK_ON_READS)) {
 			ASSERT(requestStream != nullptr);
-			int requiredReplicaCount =
-			    compareReplicas ? requiredReplicas : FLOW_KNOBS->CONSISTENCY_CHECK_REQUIRED_REPLICAS;
+			if (compareReplicas) {
+				// In case compareReplicas == true, we may read extra requiredReplicas replica.
+				// The value of compareReplicas is decided by the caller and the knobs.
+				// If the caller is fetchKeys, when ENABLE_REPLICA_CONSISTENCY_CHECK_ON_DATA_MOVEMENT is on,
+				// the value is DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS.
+				// If the caller is backup agents, when ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS is on,
+				// the value is BACKUP_CONSISTENCY_CHECK_REQUIRED_REPLICAS.
+				// Otherwise, the value is 0.
+				return replicaComparison(request,
+				                         response,
+				                         requestStream->getEndpoint().token.first(),
+				                         alternatives,
+				                         channel,
+				                         requiredReplicas);
+			}
+			// In case ENABLE_REPLICA_CONSISTENCY_CHECK_ON_READS is on, we read extra
+			// READ_CONSISTENCY_CHECK_REQUIRED_REPLICAS replica and conduct consistency
+			// check among replica for any read request.
 			return replicaComparison(request,
 			                         response,
 			                         requestStream->getEndpoint().token.first(),
 			                         alternatives,
 			                         channel,
-			                         requiredReplicaCount);
+			                         FLOW_KNOBS->READ_CONSISTENCY_CHECK_REQUIRED_REPLICAS);
 		}
-
 		return Void();
 	}

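Taken together, the comments above reduce the replica-count resolution to a small decision table. A hedged restatement in plain C++ (the struct and enum are illustrative, not FDB types; the knob names match the diff):

// Illustrative summary of how many extra replicas a comparison read uses,
// following the comments in the diff above (not the actual FDB code path).
struct ConsistencyKnobs {
    bool enableOnDataMovement; // ENABLE_REPLICA_CONSISTENCY_CHECK_ON_DATA_MOVEMENT
    bool enableOnBackupReads;  // ENABLE_REPLICA_CONSISTENCY_CHECK_ON_BACKUP_READS
    bool enableOnReads;        // FLOW_KNOBS->ENABLE_REPLICA_CONSISTENCY_CHECK_ON_READS
    int datamoveRequired;      // DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS
    int backupRequired;        // BACKUP_CONSISTENCY_CHECK_REQUIRED_REPLICAS
    int readRequired;          // READ_CONSISTENCY_CHECK_REQUIRED_REPLICAS
};

enum class Caller { FetchKeys, BackupAgent, OtherRead };

// Returns the number of extra replicas to read, or 0 for "no check".
int extraReplicasToCheck(Caller caller, const ConsistencyKnobs& k) {
    if (caller == Caller::FetchKeys && k.enableOnDataMovement)
        return k.datamoveRequired;
    if (caller == Caller::BackupAgent && k.enableOnBackupReads)
        return k.backupRequired;
    if (k.enableOnReads)
        return k.readRequired;
    return 0;
}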
fdbserver/DDRelocationQueue.actor.cpp

Lines changed: 36 additions & 6 deletions

@@ -428,25 +428,55 @@ std::string Busyness::toString() {
 	return result;
 }

+double adjustRelocationParallelismForSrc(double srcParallelism) {
+	double res = srcParallelism;
+	if (SERVER_KNOBS->ENABLE_CONSERVATIVE_RELOCATION_WHEN_REPLICA_CONSISTENCY_CHECK &&
+	    SERVER_KNOBS->ENABLE_REPLICA_CONSISTENCY_CHECK_ON_DATA_MOVEMENT &&
+	    srcParallelism >= 1.0 + SERVER_KNOBS->DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS) {
+		// DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS is the number of extra replicas that the destination
+		// servers will read from the source team.
+		res = res / (1.0 + SERVER_KNOBS->DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS);
+	}
+	ASSERT(res > 0);
+	return res;
+}
+
 // find the "workFactor" for this, were it launched now
 int getSrcWorkFactor(RelocateData const& relocation, int singleRegionTeamSize) {
+	// RELOCATION_PARALLELISM_PER_SOURCE_SERVER is the number of concurrent replications that can be launched on a
+	// single storage server at a time, given the team size is 1 --- only this storage server is available to serve
+	// fetchKey read requests from the dest team.
+	// The real parallelism is adjusted by the number of source servers of a source team that can serve
+	// fetchKey requests.
+	// When ENABLE_REPLICA_CONSISTENCY_CHECK_ON_DATA_MOVEMENT is enabled, the fetchKeys on
+	// destination servers will read from DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS + 1 replicas from the source team
+	// (suppose the team size is large enough). As a result it is possible that the source team can be overloaded by the
+	// fetchKey read requests. This is especially true when the shard split data movements are launched. So, we
+	// introduce ENABLE_CONSERVATIVE_RELOCATION_WHEN_REPLICA_CONSISTENCY_CHECK knob to adjust the relocation parallelism
+	// accordingly. The adjustment is to reduce the relocation parallelism by a factor of
+	// (1 + DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS).
 	if (relocation.bulkLoadTask.present())
 		return 0;
 	else if (relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT ||
 	         relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT)
-		return WORK_FULL_UTILIZATION / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
+		return WORK_FULL_UTILIZATION /
+		       adjustRelocationParallelismForSrc(SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER);
 	else if (relocation.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT)
-		return WORK_FULL_UTILIZATION / 2 / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
+		return WORK_FULL_UTILIZATION /
+		       adjustRelocationParallelismForSrc(2 * SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER);
 	else if (relocation.healthPriority == SERVER_KNOBS->PRIORITY_PERPETUAL_STORAGE_WIGGLE)
 		// we want to set PRIORITY_PERPETUAL_STORAGE_WIGGLE to a reasonably large value
 		// to make this parallelism take effect
-		return WORK_FULL_UTILIZATION / SERVER_KNOBS->WIGGLING_RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
+		return WORK_FULL_UTILIZATION /
+		       adjustRelocationParallelismForSrc(SERVER_KNOBS->WIGGLING_RELOCATION_PARALLELISM_PER_SOURCE_SERVER);
 	else if (relocation.priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD)
-		return WORK_FULL_UTILIZATION / SERVER_KNOBS->MERGE_RELOCATION_PARALLELISM_PER_TEAM;
+		return WORK_FULL_UTILIZATION /
+		       adjustRelocationParallelismForSrc(SERVER_KNOBS->MERGE_RELOCATION_PARALLELISM_PER_TEAM);
 	else { // for now we assume that any message at a lower priority can best be assumed to have a full team left for
 	       // work
-
-		return WORK_FULL_UTILIZATION / singleRegionTeamSize / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
+		return WORK_FULL_UTILIZATION /
+		       adjustRelocationParallelismForSrc(singleRegionTeamSize *
+		                                         SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER);
 	}
 }

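To see the effect of the conservative adjustment: with this commit's default of DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS = 1, each fetchKeys reads from two source replicas, so the per-server relocation parallelism is halved and each relocation's work factor doubles. A small standalone check (all constants are placeholders for illustration; the real values live elsewhere in fdbserver):

#include <cassert>

// Placeholder values; WORK_FULL_UTILIZATION and the parallelism knobs are
// defined elsewhere in fdbserver and may differ.
constexpr int WORK_FULL_UTILIZATION = 10000;
constexpr double RELOCATION_PARALLELISM_PER_SOURCE_SERVER = 2.0;
constexpr int DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS = 1; // commit default
constexpr bool CONSERVATIVE_RELOCATION_ENABLED = true;

double adjustRelocationParallelismForSrc(double srcParallelism) {
    if (CONSERVATIVE_RELOCATION_ENABLED &&
        srcParallelism >= 1.0 + DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS) {
        // Each fetchKeys now reads from (1 + extra) replicas of the source
        // team, so divide the allowed parallelism by that fan-out.
        return srcParallelism / (1.0 + DATAMOVE_CONSISTENCY_CHECK_REQUIRED_REPLICAS);
    }
    return srcParallelism;
}

int main() {
    // Without the check: work factor = 10000 / 2 = 5000 per relocation.
    // With one extra replica read: parallelism 2 -> 1, work factor = 10000,
    // i.e. half as many concurrent relocations per source server.
    double adjusted = adjustRelocationParallelismForSrc(RELOCATION_PARALLELISM_PER_SOURCE_SERVER);
    assert((int)(WORK_FULL_UTILIZATION / adjusted) == 10000);
}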
fdbserver/StorageMetrics.actor.cpp

Lines changed: 2 additions & 1 deletion

@@ -28,7 +28,8 @@ CommonStorageCounters::CommonStorageCounters(const std::string& name,
   : cc(name, id), finishedQueries("FinishedQueries", cc), bytesQueried("BytesQueried", cc),
     bytesFetched("BytesFetched", cc), bytesInput("BytesInput", cc), mutationBytes("MutationBytes", cc),
     kvFetched("KVFetched", cc), mutations("Mutations", cc), setMutations("SetMutations", cc),
-    clearRangeMutations("ClearRangeMutations", cc) {
+    clearRangeMutations("ClearRangeMutations", cc), kvFetchServed("KVFetchServed", cc),
+    kvFetchBytesServed("KVFetchBytesServed", cc), fetchKeyErrors("FetchKeyErrors", cc) {
 	if (metrics) {
 		specialCounter(cc, "BytesStored", [metrics]() { return metrics->byteSample.getEstimate(allKeys); });
 		specialCounter(cc, "BytesReadSampleCount", [metrics]() { return metrics->bytesReadSample.queue.size(); });
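These three counters give the source side visibility into the fetch-driven read load that the rest of the commit throttles. A speculative sketch of how a serving path might bump them (the actual update sites are in the storage server and are not shown in this diff; the Counter struct below is a toy stand-in for FDB's Counter):

#include <cstdint>

// Toy stand-in for FDB's Counter, which accumulates a value that its
// CounterCollection periodically flushes to trace logs.
struct Counter {
    int64_t value = 0;
    void operator+=(int64_t delta) { value += delta; }
    Counter& operator++() { ++value; return *this; }
};

struct FetchServingMetrics {
    Counter kvFetchServed;      // fetch-driven read requests served by this SS (assumed meaning)
    Counter kvFetchBytesServed; // bytes returned to fetchKeys readers (assumed meaning)
    Counter fetchKeyErrors;     // fetch reads that failed on this SS (assumed meaning)
};

// Sketch: record the outcome of a read identified as a fetchKeys read,
// e.g. via the taskID field added to GetKeyValuesRequest above.
void recordFetchServed(FetchServingMetrics& m, int64_t bytes, bool failed) {
    if (failed) {
        ++m.fetchKeyErrors;
        return;
    }
    ++m.kvFetchServed;
    m.kvFetchBytesServed += bytes;
}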
