Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 2edc7fb

Browse files
dcapwellbelliottsmith
authored andcommittedNov 22, 2024
Key transaction recovery should witness range transactions
patch by Benedict; reviewed by David for CASSANDRA-20105
1 parent ad6d9c7 commit 2edc7fb

File tree

7 files changed

+64
-38
lines changed

7 files changed

+64
-38
lines changed
 

‎accord-core/src/main/java/accord/impl/CommandsSummary.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,23 @@
2222
import accord.local.SafeCommandStore.TestDep;
2323
import accord.local.SafeCommandStore.TestStartedAt;
2424
import accord.local.SafeCommandStore.TestStatus;
25+
import accord.primitives.Routables;
2526
import accord.primitives.Timestamp;
2627
import accord.primitives.Txn.Kind.Kinds;
2728
import accord.primitives.TxnId;
2829

2930
public interface CommandsSummary
3031
{
31-
<P1, T> T mapReduceFull(TxnId testTxnId,
32+
<P1, T> T mapReduceFull(Routables<?> keysOrRanges,
33+
TxnId testTxnId,
3234
Kinds testKind,
3335
TestStartedAt testStartedAt,
3436
TestDep testDep,
3537
TestStatus testStatus,
3638
CommandFunction<P1, T, T> map, P1 p1, T initialValue);
3739

38-
<P1, T> T mapReduceActive(Timestamp startedBefore,
40+
<P1, T> T mapReduceActive(Routables<?> keysOrRanges,
41+
Timestamp startedBefore,
3942
Kinds testKind,
4043
CommandFunction<P1, T, T> map, P1 p1, T initialValue);
4144
}

‎accord-core/src/main/java/accord/impl/InMemoryCommandStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,7 @@ public TxnInfo(Command command)
862862
public <P1, T> T mapReduceActive(Unseekables<?> keysOrRanges, Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, T accumulate)
863863
{
864864
accumulate = commandStore.mapReduceForKey(this, keysOrRanges, (commands, prev) -> {
865-
return commands.mapReduceActive(startedBefore, testKind, map, p1, prev);
865+
return commands.mapReduceActive(keysOrRanges, startedBefore, testKind, map, p1, prev);
866866
}, accumulate);
867867

868868
return mapReduceRangesInternal(keysOrRanges, startedBefore, null, testKind, STARTED_BEFORE, ANY_DEPS, ANY_STATUS, map, p1, accumulate);
@@ -873,7 +873,7 @@ public <P1, T> T mapReduceActive(Unseekables<?> keysOrRanges, Timestamp startedB
873873
public <P1, T> T mapReduceFull(Unseekables<?> keysOrRanges, TxnId testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate)
874874
{
875875
accumulate = commandStore.mapReduceForKey(this, keysOrRanges, (commands, prev) -> {
876-
return commands.mapReduceFull(testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, prev);
876+
return commands.mapReduceFull(keysOrRanges, testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, prev);
877877
}, accumulate);
878878

879879
return mapReduceRangesInternal(keysOrRanges, testTxnId, testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, accumulate);

‎accord-core/src/main/java/accord/local/cfk/CommandsForKey.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import accord.local.SafeCommandStore.TestDep;
4141
import accord.local.SafeCommandStore.TestStartedAt;
4242
import accord.local.SafeCommandStore.TestStatus;
43+
import accord.primitives.Routables;
4344
import accord.primitives.SaveStatus;
4445
import accord.primitives.Status;
4546
import accord.local.cfk.PostProcess.NotifyUnmanagedResult;
@@ -1052,7 +1053,8 @@ public TxnId blockedOnTxnId(TxnId txnId, @Nullable Timestamp executeAt)
10521053
* commands that do not know any deps will be ignored, as will any with an executeAt prior to the txnId.
10531054
* <p>
10541055
*/
1055-
public <P1, T> T mapReduceFull(TxnId testTxnId,
1056+
public <P1, T> T mapReduceFull(Routables<?> keysOrRanges,
1057+
TxnId testTxnId,
10561058
Kinds testKind,
10571059
TestStartedAt testStartedAt,
10581060
TestDep testDep,
@@ -1141,7 +1143,8 @@ else if (loadingFor == NO_TXNIDS)
11411143
return initialValue;
11421144
}
11431145

1144-
public <P1, T> T mapReduceActive(Timestamp startedBefore,
1146+
public <P1, T> T mapReduceActive(Routables<?> keysOrRanges,
1147+
Timestamp startedBefore,
11451148
Kinds testKind,
11461149
CommandFunction<P1, T, T> map, P1 p1, T initialValue)
11471150
{

‎accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import net.nicoulaj.compilecommand.annotations.Inline;
2525

2626
import static accord.utils.SortedArrays.Search.CEIL;
27+
import static accord.utils.SortedArrays.Search.FLOOR;
2728

2829
public class CheckpointIntervalArray<Ranges, Range, Key>
2930
{
@@ -216,4 +217,34 @@ else if ((ri & BIT29) != 0)
216217
forEachRange.accept(p1, p2, p3, p4, start, end);
217218
return end;
218219
}
220+
221+
@Inline
222+
public <P1, P2, P3, P4> int forEachKey(Key key, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
223+
{
224+
if (accessor.size(ranges) == 0 || minIndex == accessor.size(ranges))
225+
return minIndex;
226+
227+
int end = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), key, (a, b) -> -accessor.compareStartTo(b, a), FLOOR);
228+
if (end < 0) end = -1 - end;
229+
if (end <= minIndex) return minIndex;
230+
231+
int floor = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), key, (a, b) -> -accessor.compareStartTo(b, a), CEIL);
232+
int start = floor;
233+
if (floor < 0)
234+
{
235+
// if there's no precise match on start, step backwards;
236+
// if this range does not overlap us, step forwards again for start
237+
// but retain the floor index for performing scan and checkpoint searches from
238+
// as this contains all ranges that might overlap us (whereas those that end
239+
// after us but before the next range's start would be missed by the next range index)
240+
start = floor = -2 - floor;
241+
if (start < 0)
242+
start = floor = 0;
243+
else if (accessor.compareEndTo(accessor.get(ranges, start), key) < 0)
244+
++start;
245+
}
246+
247+
int bound = accessor.endInclusive(ranges) ? -1 : 0;
248+
return forEach(start, end, floor, key, bound, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
249+
}
219250
}

‎accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ public interface Accessor<Ranges, Range, RoutingKey>
7676
RoutingKey end(Ranges ranges, int index);
7777
RoutingKey end(Range range);
7878
Comparator<RoutingKey> keyComparator();
79+
int compareEndTo(Range range, RoutingKey key);
80+
int compareStartTo(Range range, RoutingKey key);
81+
boolean endInclusive(Ranges ranges);
7982
int binarySearch(Ranges ranges, int from, int to, RoutingKey find, AsymmetricComparator<RoutingKey, Range> comparator, SortedArrays.Search op);
8083
}
8184

‎accord-core/src/main/java/accord/utils/SearchableRangeList.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,9 @@
2222
import accord.primitives.RoutableKey;
2323
import accord.utils.CheckpointIntervalArrayBuilder.Links;
2424
import accord.utils.CheckpointIntervalArrayBuilder.Strategy;
25-
import net.nicoulaj.compilecommand.annotations.Inline;
2625

2726
import static accord.utils.CheckpointIntervalArrayBuilder.Links.LINKS;
2827
import static accord.utils.CheckpointIntervalArrayBuilder.Strategy.ACCURATE;
29-
import static accord.utils.SortedArrays.Search.*;
3028

3129
/**
3230
* Based on CINTIA, the Checkpoint INTerval Array
@@ -85,36 +83,6 @@ public SearchableRangeList(Range[] ranges, int[] lowerBounds, int[] headers, int
8583
super(SearchableRangeListBuilder.RANGE_ACCESSOR, ranges, lowerBounds, headers, checkpointLists, maxScanAndCheckpointMatches);
8684
}
8785

88-
@Inline
89-
public <P1, P2, P3, P4> int forEachKey(RoutableKey key, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
90-
{
91-
if (ranges.length == 0 || minIndex == ranges.length)
92-
return minIndex;
93-
94-
int end = SortedArrays.binarySearch(ranges, minIndex, ranges.length, key, (a, b) -> -b.compareStartTo(a), FLOOR);
95-
if (end < 0) end = -1 - end;
96-
if (end <= minIndex) return minIndex;
97-
98-
int floor = SortedArrays.binarySearch(ranges, minIndex, ranges.length, key, (a, b) -> -b.compareStartTo(a), CEIL);
99-
int start = floor;
100-
if (floor < 0)
101-
{
102-
// if there's no precise match on start, step backwards;
103-
// if this range does not overlap us, step forwards again for start
104-
// but retain the floor index for performing scan and checkpoint searches from
105-
// as this contains all ranges that might overlap us (whereas those that end
106-
// after us but before the next range's start would be missed by the next range index)
107-
start = floor = -2 - floor;
108-
if (start < 0)
109-
start = floor = 0;
110-
else if (ranges[start].compareEndTo(key) < 0)
111-
++start;
112-
}
113-
114-
int bound = ranges[0].endInclusive() ? -1 : 0;
115-
return forEach(start, end, floor, key, bound, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
116-
}
117-
11886
public static SearchableRangeList build(Range[] ranges)
11987
{
12088
if (ranges.length == 0)

‎accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,24 @@ public Comparator<RoutableKey> keyComparator()
7474
return Comparator.naturalOrder();
7575
}
7676

77+
@Override
78+
public int compareEndTo(Range range, RoutableKey key)
79+
{
80+
return range.compareEndTo(key);
81+
}
82+
83+
@Override
84+
public int compareStartTo(Range range, RoutableKey key)
85+
{
86+
return range.compareStartTo(key);
87+
}
88+
89+
@Override
90+
public boolean endInclusive(Range[] ranges)
91+
{
92+
return ranges[0].endInclusive();
93+
}
94+
7795
@Override
7896
public int binarySearch(Range[] ranges, int from, int to, RoutableKey find, AsymmetricComparator<RoutableKey, Range> comparator, SortedArrays.Search op)
7997
{

0 commit comments

Comments
 (0)
Please sign in to comment.