Skip to content

Commit 4cf0070

Browse files
committedOct 2, 2024
1 parent e2e7228 commit 4cf0070

File tree

16 files changed

+165
-37
lines changed

16 files changed

+165
-37
lines changed
 

‎accord-core/src/main/java/accord/coordinate/Barrier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ private void createSyncPoint()
182182
{
183183
AsyncSyncPoint async = syncPoint.apply(node, route);
184184
coordinateSyncPoint = async.async;
185-
if (!barrierType.global)
185+
if (barrierType.async)
186186
{
187187
Invariants.checkState(barrierType.async);
188188
TxnId txnId = async.txnId;

‎accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import accord.api.RequestTimeouts;
2626
import accord.local.Node;
27+
import accord.utils.ArrayBuffers.BufferList;
2728
import accord.utils.LogGroupTimers;
2829

2930
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -49,7 +50,8 @@ public void cancel()
4950
lock.lock();
5051
try
5152
{
52-
timeouts.remove(this);
53+
if (isInHeap())
54+
timeouts.remove(this);
5355
}
5456
finally
5557
{
@@ -91,16 +93,41 @@ public RegisteredTimeout tryRegister(Timeout timeout, long now, long deadline)
9193
@Override
9294
public void run()
9395
{
94-
lock.lock();
95-
try
96+
try (BufferList<Registered> collect = new BufferList<>())
9697
{
97-
long now = node.elapsed(MILLISECONDS);
98-
// TODO (expected): should we handle reentrancy? Or at least throw an exception?
99-
timeouts.advance(now, this, (s, r) -> r.timeout.timeout());
100-
}
101-
finally
102-
{
103-
lock.unlock();
98+
int i = 0;
99+
try
100+
{
101+
lock.lock();
102+
try
103+
{
104+
long now = node.elapsed(MILLISECONDS);
105+
// TODO (expected): should we handle reentrancy? Or at least throw an exception?
106+
timeouts.advance(now, collect, BufferList::add);
107+
}
108+
finally
109+
{
110+
lock.unlock();
111+
}
112+
113+
while (i < collect.size())
114+
collect.get(i++).timeout.timeout();
115+
}
116+
catch (Throwable t)
117+
{
118+
while (i < collect.size())
119+
{
120+
try
121+
{
122+
collect.get(i++).timeout.timeout();
123+
}
124+
catch (Throwable t2)
125+
{
126+
t.addSuppressed(t2);
127+
}
128+
}
129+
throw t;
130+
}
104131
}
105132
}
106133

‎accord-core/src/main/java/accord/impl/ErasedSafeCommand.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,13 @@ public class ErasedSafeCommand extends SafeCommand
3737
public ErasedSafeCommand(TxnId txnId, SaveStatus saveStatus)
3838
{
3939
super(txnId);
40+
this.erased = erased(txnId, saveStatus);
41+
}
42+
43+
public static Command erased(TxnId txnId, SaveStatus saveStatus)
44+
{
4045
Invariants.checkArgument(saveStatus.compareTo(Erased) >= 0);
41-
this.erased = new Command.Truncated(txnId, saveStatus, saveStatus == ErasedOrVestigial ? NotDurable : UniversalOrInvalidated, StoreParticipants.empty(txnId), null, null, null);
46+
return new Command.Truncated(txnId, saveStatus, saveStatus == ErasedOrVestigial ? NotDurable : UniversalOrInvalidated, StoreParticipants.empty(txnId), null, null, null);
4247
}
4348

4449
@Override

‎accord-core/src/main/java/accord/impl/progresslog/HomeState.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,15 @@ final void runHome(DefaultProgressLog instance, SafeCommandStore safeStore, Safe
132132
{
133133
Invariants.checkState(!isHomeDoneOrUninitialised());
134134
Command command = safeCommand.current();
135+
if (command.hasBeen(Status.Truncated))
136+
{
137+
// TODO (required): validate this better
138+
setHomeDone(instance);
139+
return;
140+
}
135141
Invariants.checkState(!command.hasBeen(Status.Truncated), "Command %s is truncated", command);
136142

143+
Invariants.checkState(command.durability() != null);
137144
// TODO (expected): when invalidated, safer to maintain HomeState until known to be globally invalidated
138145
// TODO (now): validate that we clear HomeState when we receive a Durable reply, to replace the token check logic
139146
Invariants.checkState(!command.durability().isDurableOrInvalidated(), "Command is durable or invalidated, but we have not cleared the ProgressLog");
@@ -149,6 +156,13 @@ static void recoverCallback(SafeCommandStore safeStore, SafeCommand safeCommand,
149156
if (state == null)
150157
return;
151158

159+
Command command = safeCommand.current();
160+
if (command.is(Status.Truncated))
161+
{
162+
state.setHomeDone(instance);
163+
return;
164+
}
165+
152166
CoordinatePhase status = state.phase();
153167
if (status.isAtMostReadyToExecute() && state.homeProgress() == Querying)
154168
{
@@ -159,7 +173,6 @@ static void recoverCallback(SafeCommandStore safeStore, SafeCommand safeCommand,
159173
}
160174
else
161175
{
162-
Command command = safeCommand.current();
163176
ProgressToken token = success.asProgressToken().merge(command);
164177
if (prevProgressToken != null)
165178
token = token.merge(prevProgressToken);

‎accord-core/src/main/java/accord/local/Command.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,18 +197,18 @@ private AbstractCommand(TxnId txnId, SaveStatus status, Durability durability, @
197197
{
198198
this.txnId = txnId;
199199
this.status = validateCommandClass(txnId, status, getClass());
200-
this.durability = durability;
201-
this.participants = participants;
202-
this.promised = promised;
200+
this.durability = Invariants.nonNull(durability);
201+
this.participants = Invariants.nonNull(participants);
202+
this.promised = Invariants.nonNull(promised);
203203
}
204204

205205
private AbstractCommand(CommonAttributes common, SaveStatus status, Ballot promised)
206206
{
207207
this.txnId = common.txnId();
208208
this.status = validateCommandClass(txnId, status, getClass());
209-
this.durability = common.durability();
210-
this.participants = common.participants();
211-
this.promised = promised;
209+
this.durability = Invariants.nonNull(common.durability());
210+
this.participants = Invariants.nonNull(common.participants());
211+
this.promised = Invariants.nonNull(promised);
212212
}
213213

214214
@Override
@@ -1020,6 +1020,8 @@ private Committed(CommonAttributes common, SaveStatus status, Timestamp executeA
10201020
{
10211021
super(common, status, promised, executeAt, accepted);
10221022
this.waitingOn = waitingOn;
1023+
Invariants.nonNull(common.participants());
1024+
Invariants.nonNull(common.route());
10231025
Invariants.checkState(common.route().kind().isFullRoute(), "Expected a full route but given %s", common.route().kind());
10241026
}
10251027

‎accord-core/src/main/java/accord/local/CommandStores.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import accord.primitives.RoutingKeys;
5353
import accord.primitives.Timestamp;
5454
import accord.primitives.TxnId;
55+
import accord.topology.Shard;
5556
import accord.topology.Topology;
5657
import accord.utils.Invariants;
5758
import accord.utils.MapReduce;
@@ -452,8 +453,10 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top
452453
// TODO (desired): only sync affected shards
453454
Ranges ranges = shard.ranges().currentRanges();
454455
// ranges can be empty when ranges are lost or consolidated across epochs.
455-
if (epoch > 1 && startSync && !ranges.isEmpty())
456+
if (epoch > 1 && startSync && requiresSync(ranges, prev.global, newTopology))
457+
{
456458
bootstrapUpdates.add(shard.store.sync(node, ranges, epoch));
459+
}
457460
result.add(shard);
458461
}
459462

@@ -489,6 +492,37 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top
489492
return new TopologyUpdate(new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology), bootstrap);
490493
}
491494

495+
private static boolean requiresSync(Ranges ranges, Topology oldTopology, Topology newTopology)
496+
{
497+
List<Shard> oldShards = oldTopology.foldl(ranges, (oldShard, shards, i) -> {
498+
shards.add(oldShard);
499+
return shards;
500+
}, new ArrayList<>());
501+
502+
List<Shard> newShards = newTopology.foldl(ranges, (newShard, shards, i) -> {
503+
shards.add(newShard);
504+
return shards;
505+
}, new ArrayList<>());
506+
507+
if (oldShards.size() != newShards.size())
508+
return true;
509+
510+
for (int i = 0 ; i < oldShards.size() ; ++i)
511+
{
512+
Shard oldShard = oldShards.get(i);
513+
Shard newShard = newShards.get(i);
514+
if (!oldShard.fastPathElectorate.containsAll(newShard.fastPathElectorate))
515+
return true;
516+
517+
if (!newShard.fastPathElectorate.containsAll(oldShard.fastPathElectorate))
518+
return true;
519+
520+
if (!newShard.nodes.equals(oldShard.nodes))
521+
return true;
522+
}
523+
return false;
524+
}
525+
492526
public <R> R unsafeFoldLeft(R initial, BiFunction<R, CommandStore, R> f)
493527
{
494528
Snapshot snapshot = current;

‎accord-core/src/main/java/accord/local/Commands.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -605,14 +605,12 @@ public static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand safeC
605605
.intersecting(executed.writes().keys);
606606
if (executes == null || executes.isEmpty())
607607
{
608-
// TODO (desirable, performance): Mark no-ops in CFK so we can notify later transactions immediately
609608
logger.trace("{}: applying no-op", txnId);
610609
safeCommand.applied(safeStore);
611610
safeStore.notifyListeners(safeCommand, command);
612611
}
613612
else
614613
{
615-
// TODO (now): we should set applying within apply to avoid applying multiple times
616614
safeCommand.applying(safeStore);
617615
safeStore.notifyListeners(safeCommand, command);
618616
logger.trace("{}: applying", command.txnId());

‎accord-core/src/main/java/accord/local/CommonAttributes.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import accord.primitives.TxnId;
2626
import accord.utils.Invariants;
2727

28+
import static accord.primitives.Status.Durability.NotDurable;
29+
2830
public interface CommonAttributes
2931
{
3032
TxnId txnId();
@@ -51,6 +53,7 @@ public Mutable(TxnId txnId)
5153
{
5254
this.txnId = txnId;
5355
this.participants = StoreParticipants.empty(txnId);
56+
this.durability = NotDurable;
5457
}
5558

5659
public Mutable(CommonAttributes attributes)

‎accord-core/src/main/java/accord/primitives/Timestamp.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,19 +209,28 @@ public Timestamp next()
209209
public int compareTo(@Nonnull Timestamp that)
210210
{
211211
if (this == that) return 0;
212-
int c = Long.compareUnsigned(this.msb, that.msb);
213-
if (c == 0) c = Long.compare(lowHlc(this.lsb), lowHlc(that.lsb));
214-
if (c == 0) c = Long.compare(this.lsb & IDENTITY_FLAGS, that.lsb & IDENTITY_FLAGS);
212+
int c = compareMsb(this.msb, that.msb);
213+
if (c == 0) c = compareLsb(this.lsb, that.lsb);
215214
if (c == 0) c = this.node.compareTo(that.node);
216215
return c;
217216
}
218217

218+
public static int compareMsb(long msbA, long msbB)
219+
{
220+
return Long.compareUnsigned(msbA, msbB);
221+
}
222+
223+
public static int compareLsb(long lsbA, long lsbB)
224+
{
225+
int c = Long.compare(lowHlc(lsbA), lowHlc(lsbB));
226+
return c != 0 ? c : Long.compare(lsbA & IDENTITY_FLAGS, lsbB & IDENTITY_FLAGS);
227+
}
228+
219229
public int compareToWithoutEpoch(@Nonnull Timestamp that)
220230
{
221231
if (this == that) return 0;
222232
int c = Long.compare(highHlc(this.msb), highHlc(that.msb));
223-
if (c == 0) c = Long.compare(lowHlc(this.lsb), lowHlc(that.lsb));
224-
if (c == 0) c = Long.compare(this.lsb & IDENTITY_FLAGS, that.lsb & IDENTITY_FLAGS);
233+
if (c == 0) c = compareLsb(this.lsb, that.lsb);
225234
if (c == 0) c = this.node.compareTo(that.node);
226235
return c;
227236
}

‎accord-core/src/main/java/accord/topology/TopologyManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ private Topologies withSufficientEpochsAtLeast(Unseekables<?> select, long minEp
731731
// An issue was found where a range was removed from a replica and min selection picked the epoch before that,
732732
// which caused a node to get included in the txn that actually lost the range
733733
// See CASSANDRA-18804
734-
while (i < maxi && !select.isEmpty())
734+
while (i < maxi)
735735
{
736736
EpochState epochState = snapshot.epochs[i++];
737737
topologies.add(epochState.global.forSelection(select, false));

‎accord-core/src/main/java/accord/utils/ArrayBuffers.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import accord.primitives.Range;
2424
import accord.primitives.TxnId;
2525

26+
import java.io.Closeable;
2627
import java.lang.reflect.Array;
28+
import java.util.AbstractList;
2729
import java.util.Arrays;
2830
import java.util.function.IntFunction;
2931

@@ -783,4 +785,38 @@ public void realDiscard(int[] buffer, int size)
783785
}
784786
}
785787

788+
public static class BufferList<E> extends AbstractList<E> implements Closeable
789+
{
790+
private static final Object[] EMPTY = new Object[0];
791+
private Object[] buffer = EMPTY;
792+
private int size;
793+
794+
@Override
795+
public E get(int index)
796+
{
797+
return (E) buffer[index];
798+
}
799+
800+
@Override
801+
public int size()
802+
{
803+
return size;
804+
}
805+
806+
@Override
807+
public boolean add(E e)
808+
{
809+
if (size == buffer.length)
810+
buffer = cachedAny().resize(buffer, size, Math.max(8, size * 2));
811+
buffer[size++] = e;
812+
return true;
813+
}
814+
815+
public void close()
816+
{
817+
if (buffer == null) return;
818+
cachedAny().forceDiscard(buffer, size);
819+
buffer = null;
820+
}
821+
}
786822
}

‎accord-core/src/main/java/accord/utils/LogGroupTimers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public Scheduling(Scheduler scheduler, Function<T, Runnable> taskFactory, long s
9595

9696
public void ensureScheduled(long now)
9797
{
98-
now = Math.max(lastNow, now);
98+
lastNow = now = Math.max(lastNow, now);
9999
T next = peekIfSoonerThan(now + preciseDelayThreshold);
100100
long runAt;
101101
if (next == null)

‎accord-core/src/test/java/accord/impl/LocalListenersTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,18 +430,20 @@ static class TestCommand extends Command
430430
final TxnId txnId;
431431
final SaveStatus saveStatus;
432432
final Durability durability;
433+
final StoreParticipants participants;
433434

434435
TestCommand(TxnId txnId, SaveStatus saveStatus, Durability durability)
435436
{
436437
this.txnId = txnId;
437438
this.saveStatus = saveStatus;
438439
this.durability = durability;
440+
this.participants = StoreParticipants.empty(txnId);
439441
}
440442

441443
@Override
442444
public StoreParticipants participants()
443445
{
444-
return null;
446+
return participants;
445447
}
446448

447449
@Override

‎accord-core/src/test/java/accord/local/ImmutableCommandTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ void noConflictWitnessTest()
131131
Assertions.assertNull(command.executeAt());
132132
}
133133
SafeCommandStore safeStore = commands.beginOperation(PreLoadContext.contextFor(txnId, keys.toParticipants()));
134-
StoreParticipants participants = StoreParticipants.update(safeStore, keys.toParticipants(), txnId.epoch(), txnId, txnId.epoch());
134+
StoreParticipants participants = StoreParticipants.update(safeStore, ROUTE, txnId.epoch(), txnId, txnId.epoch());
135135
SafeCommand safeCommand = safeStore.get(txnId, participants);
136136
Commands.preaccept(safeStore, safeCommand, participants, txnId, txnId.epoch(), txn.slice(FULL_RANGES, true), ROUTE);
137137
Command command = safeStore.get(txnId).current();
@@ -161,7 +161,7 @@ void supersedingEpochWitnessTest() throws ExecutionException {
161161
((TestableConfigurationService)node.configService()).reportTopology(TopologyUtils.withEpoch(support.local.get(), 2));
162162
Timestamp expectedTimestamp = Timestamp.fromValues(2, 110, ID1);
163163
getUninterruptibly(commands.execute(context, (Consumer<? super SafeCommandStore>) safeStore -> {
164-
StoreParticipants participants = StoreParticipants.update(safeStore, keys.toParticipants(), txnId.epoch(), txnId, 2);
164+
StoreParticipants participants = StoreParticipants.update(safeStore, ROUTE, txnId.epoch(), txnId, 2);
165165
Commands.preaccept(safeStore, safeStore.get(txnId, participants), participants, txnId, txnId.epoch(), txn.slice(FULL_RANGES, true), ROUTE);
166166
}));
167167
commands.execute(PreLoadContext.contextFor(txnId, txn.keys().toParticipants()), safeStore -> {

0 commit comments

Comments
 (0)