Skip to content

Commit 4a8566a

Browse files
committed
More follow-up to CASSANDRA-19967 and CASSANDRA-19869
1 parent 4cf0070 commit 4a8566a

26 files changed

+322
-246
lines changed

accord-core/src/main/java/accord/impl/InMemoryCommandStore.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,12 @@
5959
import accord.local.CommandStore;
6060
import accord.local.CommandStores.RangesForEpoch;
6161
import accord.local.Commands;
62+
import accord.local.DurableBefore;
6263
import accord.local.KeyHistory;
6364
import accord.local.Node;
6465
import accord.local.NodeTimeService;
6566
import accord.local.PreLoadContext;
67+
import accord.local.RedundantBefore;
6668
import accord.local.RedundantStatus;
6769
import accord.local.RejectBefore;
6870
import accord.local.SafeCommand;
@@ -397,7 +399,7 @@ public void run()
397399
boolean done = command.hasBeen(Truncated);
398400
if (!done)
399401
{
400-
if (redundantBefore().status(txnId, command.route()) == RedundantStatus.PRE_BOOTSTRAP_OR_STALE)
402+
if (unsafeGetRedundantBefore().status(txnId, command.route()) == RedundantStatus.PRE_BOOTSTRAP_OR_STALE)
401403
return;
402404

403405
Route<?> route = command.route().slice(allRanges);
@@ -759,7 +761,7 @@ protected void update(Command prev, Command updated)
759761
return;
760762

761763
Ranges slice = ranges(txnId, updated.executeAtOrTxnId());
762-
slice = commandStore.redundantBefore().removeShardRedundant(txnId, updated.executeAtOrTxnId(), slice);
764+
slice = commandStore.unsafeGetRedundantBefore().removeShardRedundant(txnId, updated.executeAtOrTxnId(), slice);
763765
commandStore.rangeCommands.computeIfAbsent(txnId, ignore -> new RangeCommand(commandStore.commands.get(txnId)))
764766
.update(((AbstractRanges)updated.participants().touches()).toRanges().slice(slice, Minimal));
765767
}
@@ -801,6 +803,18 @@ public void setRangesForEpoch(RangesForEpoch rangesForEpoch)
801803
ranges = rangesForEpoch;
802804
}
803805

806+
@Override
807+
public void upsertRedundantBefore(RedundantBefore addRedundantBefore)
808+
{
809+
unsafeUpsertRedundantBefore(addRedundantBefore);
810+
}
811+
812+
@Override
813+
public void upsertDurableBefore(DurableBefore addDurableBefore)
814+
{
815+
unsafeUpsertDurableBefore(addDurableBefore);
816+
}
817+
804818
@Override
805819
public NodeTimeService time()
806820
{

accord-core/src/main/java/accord/impl/TimestampsForKeys.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import accord.api.RoutingKey;
2222
import accord.api.VisibleForImplementation;
23-
import accord.local.CommandStore;
23+
import accord.local.SafeCommandStore;
2424
import accord.primitives.RoutingKeys;
2525
import accord.primitives.Timestamp;
2626
import accord.primitives.TxnId;
@@ -38,15 +38,15 @@ public class TimestampsForKeys
3838

3939
private TimestampsForKeys() {}
4040

41-
public static TimestampsForKey updateLastExecutionTimestamps(CommandStore commandStore, SafeTimestampsForKey tfk, TxnId txnId, Timestamp executeAt, boolean isForWriteTxn)
41+
public static TimestampsForKey updateLastExecutionTimestamps(SafeCommandStore safeStore, SafeTimestampsForKey tfk, TxnId txnId, Timestamp executeAt, boolean isForWriteTxn)
4242
{
4343
TimestampsForKey current = tfk.current();
4444

4545
Timestamp lastWrite = current.lastWriteTimestamp();
4646

4747
if (executeAt.compareTo(lastWrite) < 0)
4848
{
49-
if (commandStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId, current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY)
49+
if (safeStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId, current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY)
5050
return current;
5151
throw illegalState("%s is less than the most recent write timestamp %s", executeAt, lastWrite);
5252
}
@@ -59,7 +59,7 @@ public static TimestampsForKey updateLastExecutionTimestamps(CommandStore comman
5959

6060
if (cmp < 0)
6161
{
62-
if (!commandStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable()))
62+
if (!safeStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable()))
6363
return current;
6464
throw illegalState("%s is less than the most recent executed timestamp %s", executeAt, lastExecuted);
6565
}
@@ -83,6 +83,6 @@ public static TimestampsForKey updateLastExecutionTimestamps(CommandStore comman
8383
@VisibleForImplementation
8484
public static <D> TimestampsForKey updateLastExecutionTimestamps(AbstractSafeCommandStore<?,?,?> safeStore, RoutingKey key, TxnId txnId, Timestamp executeAt, boolean isForWriteTxn)
8585
{
86-
return updateLastExecutionTimestamps(safeStore.commandStore(), safeStore.timestampsForKey(key), txnId, executeAt, isForWriteTxn);
86+
return updateLastExecutionTimestamps(safeStore, safeStore.timestampsForKey(key), txnId, executeAt, isForWriteTxn);
8787
}
8888
}

accord-core/src/main/java/accord/local/Cleanup.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,20 @@
5252
public enum Cleanup
5353
{
5454
NO(Uninitialised),
55+
// we don't know if the command has been applied or invalidated as we have incomplete information
56+
// so erase what information we don't need in future to decide this
57+
// TODO (required): tighten up semantics here (and maybe infer more aggressively)
58+
EXPUNGE_PARTIAL(TruncatedApplyWithOutcome),
5559
TRUNCATE_WITH_OUTCOME(TruncatedApplyWithOutcome),
5660
TRUNCATE(TruncatedApply),
5761
INVALIDATE(Invalidated),
5862
VESTIGIAL(ErasedOrVestigial),
5963
ERASE(Erased),
60-
// erase all fields except any participants and committed executeAt
61-
EXPUNGE_PARTIAL(Erased),
6264
// we can stop storing the record entirely
6365
EXPUNGE(Erased);
6466

67+
private static final Cleanup[] VALUES = values();
68+
6569
public final SaveStatus appliesIfNot;
6670

6771
Cleanup(SaveStatus appliesIfNot)
@@ -76,18 +80,19 @@ public final Cleanup filter(SaveStatus saveStatus)
7680

7781
public static Cleanup shouldCleanup(SafeCommandStore safeStore, Command command)
7882
{
79-
return shouldCleanup(safeStore.commandStore(), command, command.participants());
83+
return shouldCleanup(safeStore, command, command.participants());
8084
}
8185

8286
public static Cleanup shouldCleanup(SafeCommandStore safeStore, Command command, @Nonnull StoreParticipants participants)
8387
{
84-
return shouldCleanup(safeStore.commandStore(), command, participants);
88+
return shouldCleanup(command.txnId(), command.saveStatus(), command.durability(), participants,
89+
safeStore.redundantBefore(), safeStore.durableBefore());
8590
}
8691

87-
public static Cleanup shouldCleanup(CommandStore commandStore, Command command, @Nonnull StoreParticipants participants)
92+
public static Cleanup shouldCleanup(Command command, RedundantBefore redundantBefore, DurableBefore durableBefore)
8893
{
89-
return shouldCleanup(command.txnId(), command.saveStatus(), command.durability(), participants,
90-
commandStore.redundantBefore(), commandStore.durableBefore());
94+
return shouldCleanup(command.txnId(), command.saveStatus(), command.durability(), command.participants(),
95+
redundantBefore, durableBefore);
9196
}
9297

9398
public static Cleanup shouldCleanup(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore)
@@ -238,4 +243,9 @@ private static boolean expunge(TxnId txnId, SaveStatus saveStatus, DurableBefore
238243
// then we can safely erase. Revisit as part of rationalising RedundantBefore registers.
239244
return redundantBefore.shardStatus(txnId) == SHARD_REDUNDANT;
240245
}
246+
247+
public static Cleanup forOrdinal(int ordinal)
248+
{
249+
return VALUES[ordinal];
250+
}
241251
}

accord-core/src/main/java/accord/local/Command.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1368,7 +1368,7 @@ public static Update initialise(SafeCommandStore safeStore, TxnId txnId, Timesta
13681368
long maxEpoch = prevEpoch;
13691369
long epoch = rangesForEpoch.epochs[i];
13701370
Ranges ranges = rangesForEpoch.ranges[i];
1371-
ranges = safeStore.commandStore().redundantBefore().removePreBootstrap(txnId, ranges);
1371+
ranges = safeStore.redundantBefore().removePreBootstrap(txnId, ranges);
13721372
if (!ranges.isEmpty())
13731373
{
13741374
Unseekables<?> executionParticipants = participants.route.slice(ranges, Minimal);

0 commit comments

Comments
 (0)