Skip to content

Commit 9328b83

Browse files
committed
Fix topology loading
- in burn test, use validateShardStateForTesting to ensure that latest topology update matches the shard state, which simulates book - add an initializeTopologyUnsafe for an implementation to use to _create_ shards during boot so that shards could be initialized without replaying all preceding topologies Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20294
1 parent d9fd50a commit 9328b83

File tree

4 files changed

+56
-47
lines changed

4 files changed

+56
-47
lines changed

accord-core/src/main/java/accord/api/Journal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public interface Journal
4747
// TODO (required): use OnDone instead of Runnable
4848
void saveCommand(int store, CommandUpdate value, Runnable onFlush);
4949

50-
Iterator<TopologyUpdate> replayTopologies(); // reverse iterator
50+
Iterator<TopologyUpdate> replayTopologies();
5151
void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush);
5252

5353
void purge(CommandStores commandStores);

accord-core/src/main/java/accord/local/CommandStores.java

Lines changed: 17 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.Arrays;
23-
import java.util.Iterator;
2423
import java.util.List;
2524
import java.util.Map;
2625
import java.util.Objects;
@@ -133,7 +132,7 @@ protected static class ShardHolder
133132
this.store = store;
134133
}
135134

136-
private ShardHolder(CommandStore store, RangesForEpoch ranges)
135+
public ShardHolder(CommandStore store, RangesForEpoch ranges)
137136
{
138137
this.store = store;
139138
this.ranges = ranges;
@@ -144,7 +143,7 @@ public ShardHolder withStoreUnsafe(CommandStore store)
144143
return new ShardHolder(store, ranges);
145144
}
146145

147-
RangesForEpoch ranges()
146+
public RangesForEpoch ranges()
148147
{
149148
return ranges;
150149
}
@@ -381,43 +380,6 @@ public Ranges removed(long presentIn, long removedByInclusive)
381380
}
382381
}
383382

384-
// This method should only be used on node startup.
385-
// "Unsafe" because it relies on user to synchronise and sequence the call properly.
386-
public void restoreShardStateUnsafe(Consumer<Topology> reportTopology)
387-
{
388-
Iterator<Journal.TopologyUpdate> iter = journal.replayTopologies();
389-
// First boot
390-
if (!iter.hasNext())
391-
return;
392-
393-
Journal.TopologyUpdate lastUpdate = null;
394-
while (iter.hasNext())
395-
{
396-
Journal.TopologyUpdate update = iter.next();
397-
reportTopology.accept(update.global);
398-
if (lastUpdate == null || update.global.epoch() > lastUpdate.global.epoch())
399-
lastUpdate = update;
400-
}
401-
402-
ShardHolder[] shards = new ShardHolder[lastUpdate.commandStores.size()];
403-
int i = 0;
404-
for (Map.Entry<Integer, RangesForEpoch> e : lastUpdate.commandStores.entrySet())
405-
{
406-
RangesForEpoch ranges = e.getValue();
407-
CommandStore commandStore = null;
408-
for (ShardHolder shard : current.shards)
409-
{
410-
if (shard.ranges.equals(ranges))
411-
commandStore = shard.store;
412-
}
413-
Invariants.nonNull(commandStore, "Command store should have been reloaded").restore();
414-
ShardHolder shard = new ShardHolder(commandStore, e.getValue());
415-
shards[i++] = shard;
416-
}
417-
418-
loadSnapshot(new Snapshot(shards, lastUpdate.local, lastUpdate.global));
419-
}
420-
421383
protected void loadSnapshot(Snapshot toLoad)
422384
{
423385
current = toLoad;
@@ -428,7 +390,7 @@ protected static class Snapshot extends Journal.TopologyUpdate
428390
public final ShardHolder[] shards;
429391
public final Int2ObjectHashMap<CommandStore> byId;
430392

431-
Snapshot(ShardHolder[] shards, Topology local, Topology global)
393+
public Snapshot(ShardHolder[] shards, Topology local, Topology global)
432394
{
433395
super(asMap(shards), local, global);
434396
this.shards = shards;
@@ -771,6 +733,20 @@ protected <O> AsyncChain<O> mapReduce(PreLoadContext context, MapReduce<? super
771733
return chain == null ? AsyncChains.success(null) : chain;
772734
}
773735

736+
/**
737+
* Initialize topology from snapshot on boot.
738+
*/
739+
public synchronized void initializeTopologyUnsafe(Journal.TopologyUpdate update)
740+
{
741+
Invariants.require(current.global.epoch() == 0);
742+
ShardHolder[] shards = new ShardHolder[update.commandStores.size()];
743+
int i = 0;
744+
for (Map.Entry<Integer, RangesForEpoch> e : update.commandStores.entrySet())
745+
shards[i++] = new ShardHolder(supplier.create(e.getKey(), new EpochUpdateHolder()), e.getValue());
746+
747+
loadSnapshot(new Snapshot(shards, update.local, update.global));
748+
}
749+
774750
public synchronized Supplier<EpochReady> updateTopology(Node node, Topology newTopology, boolean startSync)
775751
{
776752
TopologyUpdate update = updateTopology(node, current, newTopology, startSync);

accord-core/src/test/java/accord/impl/basic/Cluster.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -764,15 +764,23 @@ public static Map<MessageType, Stats> run(Id[] nodes, int[] prefixes, MessageLis
764764
Int2ObjectHashMap<NavigableMap<TxnId, Command>> beforeStores = copyCommands(stores.all());
765765

766766
for (CommandStore store : stores.all())
767-
{
768767
((InMemoryCommandStore) store).clearForTesting();
769-
}
770-
// Re-create all command stores
771-
nodeMap.get(id).commandStores().restoreShardStateUnsafe(t -> {});
772-
stores = nodeMap.get(id).commandStores();
773768

774769
// Replay journal
775770
Journal journal = journalMap.get(id);
771+
Iterator<Journal.TopologyUpdate> iter = journal.replayTopologies();
772+
Journal.TopologyUpdate lastUpdate = null;
773+
while (iter.hasNext())
774+
{
775+
Journal.TopologyUpdate update = iter.next();
776+
Invariants.require(lastUpdate == null || update.global.epoch() > lastUpdate.global.epoch());
777+
lastUpdate = update;
778+
}
779+
780+
if (lastUpdate != null)
781+
((DelayedCommandStores) nodeMap.get(id).commandStores()).validateShardStateForTesting(lastUpdate);
782+
783+
stores = nodeMap.get(id).commandStores();
776784
journal.replay(stores);
777785

778786
// Re-enable safety checks

accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package accord.impl.basic;
2020

2121
import java.util.ArrayList;
22+
import java.util.Arrays;
2223
import java.util.LinkedList;
2324
import java.util.List;
2425
import java.util.Map;
@@ -93,6 +94,30 @@ protected boolean shouldBootstrap(Node node, Topology previous, Topology updated
9394
return contains(previous, prefix);
9495
}
9596

97+
public void validateShardStateForTesting(Journal.TopologyUpdate lastUpdate)
98+
{
99+
ShardHolder[] shards = new ShardHolder[lastUpdate.commandStores.size()];
100+
int i = 0;
101+
for (Map.Entry<Integer, RangesForEpoch> e : lastUpdate.commandStores.entrySet())
102+
{
103+
Snapshot current = current();
104+
RangesForEpoch ranges = e.getValue();
105+
CommandStore commandStore = null;
106+
for (ShardHolder shard : current.shards)
107+
{
108+
if (shard.ranges().equals(ranges))
109+
commandStore = shard.store;
110+
}
111+
Invariants.nonNull(commandStore, "Each set of ranges should have a corresponding command store, but %d did not:(%s)",
112+
ranges, Arrays.toString(shards))
113+
.restore();
114+
ShardHolder shard = new ShardHolder(commandStore, e.getValue());
115+
shards[i++] = shard;
116+
}
117+
118+
loadSnapshot(new Snapshot(shards, lastUpdate.local, lastUpdate.global));
119+
}
120+
96121
protected void loadSnapshot(Snapshot nextSnapshot)
97122
{
98123
Snapshot current = current();

0 commit comments

Comments
 (0)