@@ -62,6 +62,25 @@ bool shouldCreateCheckpoint(const UID& dataMoveId) {
     return (type == DataMoveType::PHYSICAL || type == DataMoveType::PHYSICAL_EXP);
 }
 
+std::unordered_map<std::string, std::string> generateTeamIds(
+    std::unordered_map<std::string, std::vector<std::string>>& dcServerIds) {
+    std::unordered_map<std::string, std::string> dcTeamIds;
+    for (auto& [dc, serverIds] : dcServerIds) {
+        std::sort(serverIds.begin(), serverIds.end());
+        std::string teamId;
+        for (const auto& serverId : serverIds) {
+            if (teamId.size() == 0) {
+                teamId = serverId;
+            } else {
+                teamId += "," + serverId;
+            }
+        }
+        // Use the concatenated server ids as the team id to avoid conflicts.
+        dcTeamIds[dc] = teamId;
+    }
+    return dcTeamIds;
+}
+
 // Unassigns keyrange `range` from server `ssId`, except ranges in `shards`.
 // Note: krmSetRangeCoalescing() doesn't work in this case since each shard is assigned an ID.
 ACTOR Future<Void> unassignServerKeys(Transaction* tr, UID ssId, KeyRange range, std::vector<Shard> shards, UID logId) {
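A quick illustration of what the new generateTeamIds() helper computes (a standalone sketch, not part of the patch; the DC names and short server ids are made up for the example): grouping short server ids by datacenter and feeding the map through the helper yields one deterministic team id per DC, namely the sorted, comma-separated list of those ids.

#include <algorithm>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Copy of the helper from the hunk above, so the example compiles on its own.
std::unordered_map<std::string, std::string> generateTeamIds(
    std::unordered_map<std::string, std::vector<std::string>>& dcServerIds) {
    std::unordered_map<std::string, std::string> dcTeamIds;
    for (auto& [dc, serverIds] : dcServerIds) {
        std::sort(serverIds.begin(), serverIds.end());
        std::string teamId;
        for (const auto& serverId : serverIds) {
            if (teamId.size() == 0) {
                teamId = serverId;
            } else {
                teamId += "," + serverId;
            }
        }
        dcTeamIds[dc] = teamId;
    }
    return dcTeamIds;
}

int main() {
    // Made-up DC names and short server ids, grouped the way the call sites below group them.
    std::unordered_map<std::string, std::vector<std::string>> dcServerIds = {
        { "dc0", { "b1f2", "03aa", "9c4e" } },
        { "dc1", { "77d0", "12b7" } },
    };
    for (const auto& [dc, teamId] : generateTeamIds(dcServerIds)) {
        std::cout << dc << " -> " << teamId << "\n";
    }
    // Prints (map order aside): dc0 -> 03aa,9c4e,b1f2 and dc1 -> 12b7,77d0
    return 0;
}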
@@ -1712,7 +1731,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
                     serverListEntries.push_back(tr.get(serverListKeyFor(servers[s])));
                 }
                 std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
-
+                state std::unordered_map<std::string, std::vector<std::string>> dcServers;
                 for (int s = 0; s < serverListValues.size(); s++) {
                     if (!serverListValues[s].present()) {
                         // Attempt to move onto a server that isn't in serverList (removed or never added to the
@@ -1721,6 +1740,13 @@ ACTOR static Future<Void> startMoveShards(Database occ,
                         // TODO(psm): Mark the data move as 'deleting'.
                         throw move_to_removed_server();
                     }
+                    auto si = decodeServerListValue(serverListValues[s].get());
+                    ASSERT(si.id() == servers[s]);
+                    auto it = dcServers.find(si.locality.describeDcId());
+                    if (it == dcServers.end()) {
+                        dcServers[si.locality.describeDcId()] = std::vector<std::string>();
+                    }
+                    dcServers[si.locality.describeDcId()].push_back(si.id().shortString());
                 }
 
                 currentKeys = KeyRangeRef(begin, keys.end);
@@ -1733,6 +1759,15 @@ ACTOR static Future<Void> startMoveShards(Database occ,
                 state Key endKey = old.back().key;
                 currentKeys = KeyRangeRef(currentKeys.begin, endKey);
 
+                if (ranges.front() != currentKeys) {
+                    TraceEvent("MoveShardsPartialRange")
+                        .detail("ExpectedRange", ranges.front())
+                        .detail("ActualRange", currentKeys)
+                        .detail("DataMoveId", dataMoveId)
+                        .detail("RowLimit", SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT)
+                        .detail("ByteLimit", SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT);
+                }
+
                 // Check that enough servers for each shard are in the correct state
                 state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
                 ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
@@ -1806,6 +1841,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
                             TraceEvent(
                                 SevWarn, "StartMoveShardsCancelConflictingDataMove", relocationIntervalId)
                                 .detail("Range", rangeIntersectKeys)
+                                .detail("CurrentDataMoveRange", ranges[0])
                                 .detail("DataMoveID", dataMoveId.toString())
                                 .detail("ExistingDataMoveID", destId.toString());
                             wait(cleanUpDataMove(occ, destId, lock, startMoveKeysLock, keys, ddEnabledState));
@@ -1868,6 +1904,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
                         dataMove.ranges.clear();
                         dataMove.ranges.push_back(KeyRangeRef(keys.begin, currentKeys.end));
                         dataMove.dest.insert(servers.begin(), servers.end());
+                        dataMove.dcTeamIds = generateTeamIds(dcServers);
                     }
 
                     if (currentKeys.end == keys.end) {
@@ -3348,6 +3385,8 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<Storag
     std::map<Optional<Value>, Tag> dcId_locality;
     std::map<UID, Tag> server_tag;
     int8_t nextLocality = 0;
+    std::unordered_map<std::string, std::vector<std::string>> dcServerIds;
+
     for (auto& s : servers) {
         if (!dcId_locality.contains(s.locality.dcId())) {
             tr.set(arena, tagLocalityListKeyFor(s.locality.dcId()), tagLocalityListValue(nextLocality));
@@ -3357,6 +3396,8 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<Storag
         Tag& t = dcId_locality[s.locality.dcId()];
         server_tag[s.id()] = Tag(t.locality, t.id);
         t.id++;
+
+        dcServerIds[s.locality.describeDcId()].push_back(s.id().shortString());
     }
     std::sort(servers.begin(), servers.end());
 
@@ -3403,11 +3444,16 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<Storag
                             UnassignShard(false));
         ksValue = keyServersValue(serverSrcUID, /*dest=*/ std::vector<UID>(), shardId, UID());
         krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value());
-
         for (auto& s : servers) {
             krmSetPreviouslyEmptyRange(
                 tr, arena, serverKeysPrefixFor(s.id()), allKeys, serverKeysValue(shardId), serverKeysFalse);
         }
+
+        DataMoveMetaData metadata{ shardId, allKeys };
+        metadata.dcTeamIds = generateTeamIds(dcServerIds);
+
+        // Data move metadata will be cleaned up on DD restarts.
+        tr.set(arena, dataMoveKeyFor(shardId), dataMoveValue(metadata));
     } else {
         krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value());
 
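Both call sites build their dc-to-server-id map independently (startMoveShards from decoded serverList entries, seedShardServers while assigning tags), so the scheme leans on the std::sort inside generateTeamIds() to make the team id depend only on membership, not on encounter order. A self-contained check of that property, again with made-up ids and a local re-implementation of the sort-and-join rather than the real helper:

#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Local stand-in for the sort-and-concatenate step inside generateTeamIds().
static std::string teamIdFor(std::vector<std::string> serverIds) {
    std::sort(serverIds.begin(), serverIds.end());
    std::string teamId;
    for (const auto& id : serverIds) {
        teamId += (teamId.empty() ? "" : ",") + id;
    }
    return teamId;
}

int main() {
    // The same hypothetical DC membership, encountered in two different orders.
    std::vector<std::string> seenWhileSeeding = { "9f3a", "12b7", "77c0" };
    std::vector<std::string> seenWhileStartingMove = { "77c0", "9f3a", "12b7" };

    // Sorting before concatenation makes the team id a pure function of membership.
    assert(teamIdFor(seenWhileSeeding) == teamIdFor(seenWhileStartingMove));
    assert(teamIdFor(seenWhileSeeding) == "12b7,77c0,9f3a");
    return 0;
}

Without the sort, the two paths could disagree on the team id for the same set of servers.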