Skip to content

Extensions to the peek logic to get it to work correctly with version vector/unicast #12281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions fdbserver/LogSystemPeekCursor.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,43 @@ static void resetBestServerIfNotAvailable(
}
}

// Version vector/unicast specific: decides whether the peek cursor that is about to be created
// for "currentServer" is allowed to return an empty version range if that tLog is stopped.
//
// Parameters:
//   bestServer         - index of the best server for the tag being peeked; a negative value is
//                        handled as "no best server is in use".
//   currentServer      - index of the server the decision is being made for.
//   end                - end version of the peek; when it equals
//                        std::numeric_limits<Version>::max() the answer is always false.
//   knownLockedTLogIds - if present, the ids of the tLogs that are known to have been locked.
//   bestSet/currentSet - optional log set indices. When "bestSet" is present it must be
//                        non-negative for the answer to be true, and "currentServer" is treated
//                        as the best server only if "currentSet" equals "bestSet".
//
// Returns true if an empty version range may be returned by the server, false otherwise.
// Must only be called when ENABLE_VERSION_VECTOR_TLOG_UNICAST is set.
static bool canReturnEmptyVersionRange(
    int bestServer,
    int currentServer,
    Version end,
    const Optional<std::vector<uint16_t>>& knownLockedTLogIds = Optional<std::vector<uint16_t>>(),
    Optional<int> bestSet = Optional<int>(),
    Optional<int> currentSet = Optional<int>()) {
    ASSERT(SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST);

    // A best set was supplied but it is not a valid one: never return an empty range.
    if (bestSet.present() && bestSet.get() < 0) {
        return false;
    }

    // Peeks without a bounded end version never return an empty range.
    if (end == std::numeric_limits<Version>::max()) {
        return false;
    }

    if (bestServer < 0) {
        // No list of locked tLogs was supplied: the current server belongs to an old epoch,
        // hence it can return an empty version range.
        if (!knownLockedTLogIds.present()) {
            return true;
        }
        // Otherwise a non-buddy server can return an empty version range only if it is known
        // to have been locked.
        const std::vector<uint16_t>& lockedIds = knownLockedTLogIds.get();
        return std::find(lockedIds.begin(), lockedIds.end(), currentServer) != lockedIds.end();
    }

    // A best server exists: only that server (in the best set, when sets are in use) qualifies.
    const bool sameSetAsBest = !bestSet.present() || bestSet.get() == currentSet;
    if (!sameSetAsBest || currentServer != bestServer) {
        return false;
    }

    // The best server is expected to be among the tLogs that are known to have been locked.
    if (knownLockedTLogIds.present()) {
        ASSERT(std::find(knownLockedTLogIds.get().begin(), knownLockedTLogIds.get().end(), currentServer) !=
               knownLockedTLogIds.get().end());
    }
    return true;
}

ILogSystem::MergedPeekCursor::MergedPeekCursor(std::vector<Reference<ILogSystem::IPeekCursor>> const& serverCursors,
Version begin)
: serverCursors(serverCursors), tag(invalidTag), bestServer(-1), currentCursor(0), readQuorum(serverCursors.size()),
Expand All @@ -603,7 +640,8 @@ ILogSystem::MergedPeekCursor::MergedPeekCursor(
bool parallelGetMore,
std::vector<LocalityData> const& tLogLocalities,
Reference<IReplicationPolicy> const tLogPolicy,
int tLogReplicationFactor)
int tLogReplicationFactor,
const Optional<std::vector<uint16_t>>& knownLockedTLogIds)
: tag(tag), bestServer(bestServerLogId), currentCursor(0), readQuorum(readQuorum), messageVersion(begin),
hasNextMessage(false), randomID(deterministicRandom()->randomUniqueID()),
tLogReplicationFactor(tLogReplicationFactor) {
Expand All @@ -622,7 +660,7 @@ ILogSystem::MergedPeekCursor::MergedPeekCursor(
for (int i = 0; i < logServers.size(); i++) {
bool returnEmptyIfStopped =
((SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && end != std::numeric_limits<Version>::max())
? (bestServer < 0 || (i == bestServer))
? canReturnEmptyVersionRange(bestServer, i /*currentServer*/, end, knownLockedTLogIds)
: false);
auto cursor = makeReference<ILogSystem::ServerPeekCursor>(
logServers[i], tag, begin, end, bestServer >= 0, parallelGetMore, returnEmptyIfStopped);
Expand Down Expand Up @@ -883,7 +921,8 @@ ILogSystem::SetPeekCursor::SetPeekCursor(std::vector<Reference<LogSet>> const& l
Tag tag,
Version begin,
Version end,
bool parallelGetMore)
bool parallelGetMore,
const Optional<std::vector<uint16_t>>& knownLockedTLogIds)
: logSets(logSets), tag(tag), bestSet(bestSet), bestServer(bestServerLogId), currentSet(bestSet), currentCursor(0),
messageVersion(begin), hasNextMessage(false), useBestSet(true), randomID(deterministicRandom()->randomUniqueID()),
end(end) {
Expand All @@ -895,8 +934,10 @@ ILogSystem::SetPeekCursor::SetPeekCursor(std::vector<Reference<LogSet>> const& l
for (int i = 0; i < logSets.size(); i++) {
for (int j = 0; j < logSets[i]->logServers.size(); j++) {
bool returnEmptyIfStopped =
((SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && end != std::numeric_limits<Version>::max())
? (bestSet < 0 || bestServer < 0 || (i == bestSet && j == bestServer))
((SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && bestSet >= 0 &&
end != std::numeric_limits<Version>::max())
? canReturnEmptyVersionRange(
bestServer, j /*currentServer*/, end, knownLockedTLogIds, bestSet, i /* currentSet */)
: false);
auto cursor = makeReference<ILogSystem::ServerPeekCursor>(
logSets[i]->logServers[j], tag, begin, end, true, parallelGetMore, returnEmptyIfStopped);
Expand Down Expand Up @@ -1051,7 +1092,7 @@ void ILogSystem::SetPeekCursor::updateMessage(int logIdx, bool usePolicy) {
c->advanceTo(messageVersion);
if (start <= messageVersion && messageVersion < c->version()) {
advancedPast = true;
CODE_PROBE(true, "Merge peek cursor with logIdx advanced past desired sequence");
CODE_PROBE(true, "Set peek cursor with logIdx advanced past desired sequence");
}
}
}
Expand Down
56 changes: 41 additions & 15 deletions fdbserver/TagPartitionedLogSystem.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,14 +687,11 @@ Future<Version> TagPartitionedLogSystem::push(const ILogSystem::PushVersionSet&
return minVersionWhenReady(waitForAll(quorumResults), allReplies);
}

// Version vector/unicast specific: Logic to get the peeking to work correctly during recovery.
// If the best server is not known to have been locked/stopped then is not guaranteed to have
// received all versions that are relevant to a tag(s) that it is buddy of, hence do not treat
// such a server as the best server. This is so the peek logic will not peek exclusively from
// this server, and hence will correctly fetch all versions that are relevant to the tag(s) that
// it is buddy of. Note that this reset logic get invoked only in the context of the peeks that
// get done during recovery, and the best server should always be available for peeking after
// recovery is done.
// Version vector/unicast specific: If the best server is not known to have been locked/stopped,
// then it is not guaranteed to have received all versions that are relevant to the tag(s) that it
// is a buddy of; hence, do not treat such a server as the best server. Note that this reset logic
// is invoked only in the context of peeks that are done during recovery, and the best server
// should always be available for peeking after recovery is done.
void TagPartitionedLogSystem::resetBestServerIfNotLocked(
int bestSet,
int& bestServer,
Expand Down Expand Up @@ -752,11 +749,14 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekAll(UID dbgid,
.detail("End", end)
.detail("BestLogs", localSets[bestSet]->logServerString());
int bestServer = localSets[bestSet]->bestLocationFor(tag);
Optional<std::vector<uint16_t>> bestKnownLockedTLogIds;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
resetBestServerIfNotLocked(bestSetIdx, bestServer, end, knownLockedTLogIds);
ASSERT_WE_THINK(knownLockedTLogIds.contains(bestSetIdx));
bestKnownLockedTLogIds = knownLockedTLogIds[bestSetIdx];
}
return makeReference<ILogSystem::SetPeekCursor>(
localSets, bestSet, bestServer, tag, begin, end, parallelGetMore);
localSets, bestSet, bestServer, tag, begin, end, parallelGetMore, bestKnownLockedTLogIds);
} else {
std::vector<Reference<ILogSystem::IPeekCursor>> cursors;
std::vector<LogMessageVersion> epochEnds;
Expand All @@ -767,8 +767,15 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekAll(UID dbgid,
.detail("Begin", begin)
.detail("End", end)
.detail("BestLogs", localSets[bestSet]->logServerString());
int bestServer = localSets[bestSet]->bestLocationFor(tag);
Optional<std::vector<uint16_t>> bestKnownLockedTLogIds;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
resetBestServerIfNotLocked(bestSetIdx, bestServer, end, knownLockedTLogIds);
ASSERT_WE_THINK(knownLockedTLogIds.contains(bestSetIdx));
bestKnownLockedTLogIds = knownLockedTLogIds[bestSetIdx];
}
cursors.push_back(makeReference<ILogSystem::SetPeekCursor>(
localSets, bestSet, localSets[bestSet]->bestLocationFor(tag), tag, lastBegin, end, parallelGetMore));
localSets, bestSet, bestServer, tag, lastBegin, end, parallelGetMore, bestKnownLockedTLogIds));
}
for (int i = 0; begin < lastBegin; i++) {
if (i == oldLogData.size()) {
Expand Down Expand Up @@ -1053,8 +1060,15 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekLocal(UID dbgid,
.detail("BestSetStart", tLogs[bestSet]->startVersion)
.detail("LogId", tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor(tag)]->get().id());
if (useMergePeekCursors) {
int bestServer = tLogs[bestSet]->bestLocationFor(tag);
Optional<std::vector<uint16_t>> bestKnownLockedTLogIds;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
resetBestServerIfNotLocked(bestSet, bestServer, end, knownLockedTLogIds);
ASSERT_WE_THINK(knownLockedTLogIds.contains(bestSet));
bestKnownLockedTLogIds = knownLockedTLogIds[bestSet];
}
return makeReference<ILogSystem::MergedPeekCursor>(tLogs[bestSet]->logServers,
tLogs[bestSet]->bestLocationFor(tag),
bestServer,
tLogs[bestSet]->logServers.size() + 1 -
tLogs[bestSet]->tLogReplicationFactor,
tag,
Expand All @@ -1063,7 +1077,8 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekLocal(UID dbgid,
true,
tLogs[bestSet]->tLogLocalities,
tLogs[bestSet]->tLogPolicy,
tLogs[bestSet]->tLogReplicationFactor);
tLogs[bestSet]->tLogReplicationFactor,
bestKnownLockedTLogIds);
} else {
return makeReference<ILogSystem::ServerPeekCursor>(
tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor(tag)], tag, begin, end, false, false);
Expand All @@ -1081,8 +1096,15 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekLocal(UID dbgid,
.detail("BestSetStart", tLogs[bestSet]->startVersion)
.detail("LogId", tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor(tag)]->get().id());
if (useMergePeekCursors) {
int bestServer = tLogs[bestSet]->bestLocationFor(tag);
Optional<std::vector<uint16_t>> bestKnownLockedTLogIds;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
resetBestServerIfNotLocked(bestSet, bestServer, end, knownLockedTLogIds);
ASSERT_WE_THINK(knownLockedTLogIds.contains(bestSet));
bestKnownLockedTLogIds = knownLockedTLogIds[bestSet];
}
cursors.push_back(makeReference<ILogSystem::MergedPeekCursor>(tLogs[bestSet]->logServers,
tLogs[bestSet]->bestLocationFor(tag),
bestServer,
tLogs[bestSet]->logServers.size() + 1 -
tLogs[bestSet]->tLogReplicationFactor,
tag,
Expand All @@ -1091,7 +1113,8 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekLocal(UID dbgid,
true,
tLogs[bestSet]->tLogLocalities,
tLogs[bestSet]->tLogPolicy,
tLogs[bestSet]->tLogReplicationFactor));
tLogs[bestSet]->tLogReplicationFactor,
bestKnownLockedTLogIds));
} else {
cursors.push_back(makeReference<ILogSystem::ServerPeekCursor>(
tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor(tag)],
Expand Down Expand Up @@ -1367,15 +1390,18 @@ Reference<ILogSystem::IPeekCursor> TagPartitionedLogSystem::peekLogRouter(
}

int bestServer = localSets[bestSet]->bestLocationFor(tag);
Optional<std::vector<uint16_t>> bestKnownStoppedTLogIds;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
resetBestServerIfNotLocked(bestSetIdx, bestServer, end, knownStoppedTLogIds);
ASSERT_WE_THINK(knownStoppedTLogIds.get().contains(bestSetIdx));
bestKnownStoppedTLogIds = knownStoppedTLogIds.get().at(bestSetIdx);
}

TraceEvent("TLogPeekLogRouterSets", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
// FIXME: do this merge on one of the logs in the other data center to avoid sending multiple copies
// across the WAN
return makeReference<ILogSystem::SetPeekCursor>(
localSets, bestSet, bestServer, tag, begin, end.get(), true);
localSets, bestSet, bestServer, tag, begin, end.get(), true, bestKnownStoppedTLogIds);
} else {
int bestPrimarySet = -1;
int bestSatelliteSet = -1;
Expand Down
6 changes: 4 additions & 2 deletions fdbserver/include/fdbserver/LogSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ struct ILogSystem {
bool parallelGetMore,
std::vector<LocalityData> const& tLogLocalities,
Reference<IReplicationPolicy> const tLogPolicy,
int tLogReplicationFactor);
int tLogReplicationFactor,
const Optional<std::vector<uint16_t>>& knownLockedTLogIds = Optional<std::vector<uint16_t>>());
MergedPeekCursor(std::vector<Reference<IPeekCursor>> const& serverCursors,
LogMessageVersion const& messageVersion,
int bestServer,
Expand Down Expand Up @@ -355,7 +356,8 @@ struct ILogSystem {
Tag tag,
Version begin,
Version end,
bool parallelGetMore);
bool parallelGetMore,
const Optional<std::vector<uint16_t>>& knownLockedTLogIds = Optional<std::vector<uint16_t>>());
SetPeekCursor(std::vector<Reference<LogSet>> const& logSets,
std::vector<std::vector<Reference<IPeekCursor>>> const& serverCursors,
LogMessageVersion const& messageVersion,
Expand Down