Skip to content

Support asynchronous CommandStore message processing #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions accord-core/src/main/java/accord/impl/CommandsForKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import accord.api.Key;
import accord.local.*;
import accord.local.SafeCommandStore.CommandFunction;
import accord.local.SafeCommandStore.SearchFunction;
import accord.local.SafeCommandStore.TestDep;
import accord.local.SafeCommandStore.TestKind;
import accord.primitives.Keys;
Expand Down Expand Up @@ -55,7 +55,7 @@ enum TestTimestamp { BEFORE, AFTER }
<T> T mapReduce(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
TestDep testDep, @Nullable TxnId depId,
@Nullable Status minStatus, @Nullable Status maxStatus,
CommandFunction<T, T> map, T initialValue, T terminalValue);
SearchFunction<T, T> map, T initialValue, T terminalValue);
}

public abstract Key key();
Expand Down
195 changes: 142 additions & 53 deletions accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@

package accord.impl;

import accord.local.*;
import accord.local.SyncCommandStores.SyncCommandStore; // java8 fails compilation if this is in correct position
import accord.local.CommandStore;
import accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState;
import accord.impl.InMemoryCommandStore.SingleThread.AsyncState;
import accord.local.SafeCommandStore;
import accord.local.Command;
import accord.local.CommandListener;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.Status;
import accord.local.SyncCommandStores;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.Key;
import accord.api.ProgressLog;
import accord.impl.InMemoryCommandStore.SingleThread.AsyncState;
import accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState;
import accord.local.CommandStores.RangesForEpochHolder;
import accord.local.CommandStores.RangesForEpoch;
import accord.impl.CommandsForKey.CommandTimeseries;
Expand All @@ -35,7 +42,9 @@
import accord.utils.Invariants;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -65,22 +74,25 @@ public static abstract class State implements SafeCommandStore
private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
private final NavigableMap<RoutableKey, InMemoryCommandsForKey> commandsForKey = new TreeMap<>();
// TODO (find library, efficiency): this is obviously super inefficient, need some range map
private final TreeMap<TxnId, RangeCommand> rangeCommands = new TreeMap<>();
private final TreeMap<RangeAndTxnId, Command> rangeCommands = new TreeMap<>();

static class RangeCommand
static class RangeAndTxnId implements Comparable<RangeAndTxnId>
{
final Command command;
Ranges ranges;
final Range range;
final TxnId txnId;

RangeCommand(Command command)
{
this.command = command;

RangeAndTxnId(Range range, TxnId txnId) {
this.range = range;
this.txnId = txnId;
}

void update(Ranges add)
@Override
public int compareTo(@Nonnull RangeAndTxnId that)
{
if (ranges == null) ranges = add;
else ranges = ranges.with(add);
int c = this.range.compare(that.range);
if (c == 0) c = this.txnId.compareTo(that.txnId);
return c;
}
}

Expand Down Expand Up @@ -202,10 +214,10 @@ private Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
{
Timestamp timestamp = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> Timestamp.max(forKey.max(), prev), Timestamp.NONE, null);
Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
for (RangeCommand command : rangeCommands.values())
for (Map.Entry<RangeAndTxnId, Command> e : rangeCommands.entrySet())
{
if (command.ranges.intersects(sliced))
timestamp = Timestamp.max(timestamp, command.command.executeAt());
if (sliced.intersects(e.getKey().range))
timestamp = Timestamp.max(timestamp, e.getValue().executeAt());
}
return timestamp;
}
Expand Down Expand Up @@ -249,7 +261,104 @@ public void forCommittedInEpoch(Ranges ranges, long epoch, Consumer<Command> con
}

@Override
public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
public <T> Future<T> slowFold(Seekables<?, ?> keysOrRanges, Ranges slice, SlowSearchFunction<T, T> fold, T accumulate, T terminalValue)
{
accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, acc) ->
fold.apply(new SlowSearcher() {
@Override
public <T2> T2 fold(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, SearchFunction<T2, T2> apply, T2 acc, T2 term)
{
CommandTimeseries timeseries;
switch (testTimestamp)
{
default: throw new AssertionError();
case STARTED_AFTER:
case STARTED_BEFORE:
timeseries = forKey.byId();
break;
case EXECUTES_AFTER:
case MAY_EXECUTE_BEFORE:
timeseries = forKey.byExecuteAt();
}
CommandTimeseries.TestTimestamp remapTestTimestamp;
switch (testTimestamp)
{
default: throw new AssertionError();
case STARTED_AFTER:
case EXECUTES_AFTER:
remapTestTimestamp = CommandTimeseries.TestTimestamp.AFTER;
break;
case STARTED_BEFORE:
case MAY_EXECUTE_BEFORE:
remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE;
}
return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, apply, acc, term);
}
}, forKey.key(), acc), accumulate, terminalValue);

if (accumulate.equals(terminalValue))
return ImmediateFuture.success(accumulate);

Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
for (Map.Entry<RangeAndTxnId, Command> e : rangeCommands.entrySet())
{
if (!sliced.intersects(e.getKey().range))
continue;

accumulate = fold.apply(new SlowSearcher()
{
@Override
public <T2> T2 fold(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, SearchFunction<T2, T2> fold, T2 accumulate, T2 terminalValue)
{
Command command = e.getValue();
switch (testTimestamp)
{
default: throw new AssertionError();
case STARTED_AFTER:
if (command.txnId().compareTo(timestamp) < 0) return accumulate;
else break;
case STARTED_BEFORE:
if (command.txnId().compareTo(timestamp) > 0) return accumulate;
else break;
case EXECUTES_AFTER:
if (command.executeAt().compareTo(timestamp) < 0) return accumulate;
else break;
case MAY_EXECUTE_BEFORE:
Timestamp compareTo = command.known().executeAt.hasDecidedExecuteAt() ? command.executeAt() : command.txnId();
if (compareTo.compareTo(timestamp) > 0) return accumulate;
else break;
}

if (minStatus != null && command.status().compareTo(minStatus) < 0)
return accumulate;

if (maxStatus != null && command.status().compareTo(maxStatus) > 0)
return accumulate;

if (testKind == Ws && command.txnId().rw().isRead())
return accumulate;

if (testDep != ANY_DEPS)
{
if (!command.known().deps.hasProposedOrDecidedDeps())
return accumulate;

if ((testDep == WITH) == !command.partialDeps().contains(depId))
return accumulate;
}

return fold.apply(e.getKey().range, command.txnId(), command.executeAt(), accumulate);
}
}, e.getKey().range, accumulate);

if (accumulate.equals(terminalValue))
break;
}
return ImmediateFuture.success(accumulate);
}

@Override
public <T> Future<T> fold(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, @Nullable Status minStatus, @Nullable Status maxStatus, SearchFunction<T, T> fold, T accumulate, T terminalValue)
{
accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> {
CommandTimeseries timeseries;
Expand All @@ -276,18 +385,20 @@ public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind test
case MAY_EXECUTE_BEFORE:
remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE;
}
return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, ANY_DEPS, null, minStatus, maxStatus, fold, prev, terminalValue);
}, accumulate, terminalValue);

if (accumulate.equals(terminalValue))
return accumulate;
return ImmediateFuture.success(accumulate);

// TODO (find lib, efficiency): this is super inefficient, need to store Command in something queryable
Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
Map<Range, List<Command>> collect = new TreeMap<>(Range::compare);
for (RangeCommand rangeCommand : rangeCommands.values())
for (Map.Entry<RangeAndTxnId, Command> e : rangeCommands.entrySet())
{
Command command = rangeCommand.command;
if (!sliced.intersects(e.getKey().range))
continue;

Command command = e.getValue();
switch (testTimestamp)
{
default: throw new AssertionError();
Expand Down Expand Up @@ -315,34 +426,13 @@ public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind test
if (testKind == Ws && command.txnId().rw().isRead())
continue;

if (testDep != ANY_DEPS)
{
if (!command.known().deps.hasProposedOrDecidedDeps())
continue;

if ((testDep == WITH) == !command.partialDeps().contains(depId))
continue;
}

if (!rangeCommand.ranges.intersects(sliced))
continue;

Routables.foldl(rangeCommand.ranges, sliced, (r, in, i) -> {
// TODO (easy, efficiency): pass command as a parameter to Fold
List<Command> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
if (list.isEmpty() || list.get(list.size() - 1) != command)
list.add(command);
return in;
}, collect);
}

for (Map.Entry<Range, List<Command>> e : collect.entrySet())
{
for (Command command : e.getValue())
accumulate = map.apply(e.getKey(), command.txnId(), command.executeAt(), accumulate);
// TODO (easy, efficiency): pass command as a parameter to Fold
accumulate = fold.apply(e.getKey().range, command.txnId(), command.executeAt(), accumulate);
if (accumulate.equals(terminalValue))
break;
}

return accumulate;
return ImmediateFuture.success(accumulate);
}

@Override
Expand All @@ -355,8 +445,8 @@ public void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command
forEach(keysOrRanges, slice, forKey -> forKey.register(command));
break;
case Range:
rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(command))
.update((Ranges)keysOrRanges);
for (Range range : (Ranges)keysOrRanges)
rangeCommands.putIfAbsent(new RangeAndTxnId(range, command.txnId()), command);
}
}

Expand All @@ -370,8 +460,7 @@ public void register(Seekable keyOrRange, Ranges slice, Command command)
forEach(keyOrRange, slice, forKey -> forKey.register(command));
break;
case Range:
rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(command))
.update(Ranges.of((Range)keyOrRange));
rangeCommands.putIfAbsent(new RangeAndTxnId((Range)keyOrRange, command.txnId()), command);
}
}

Expand Down Expand Up @@ -447,7 +536,7 @@ private void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey>

public static class Synchronized extends SyncCommandStore
{
public static class SynchronizedState extends State implements SyncCommandStores.SafeSyncCommandStore
public static class SynchronizedState extends InMemoryCommandStore.State implements SyncCommandStores.SafeSyncCommandStore
{
public SynchronizedState(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@

package accord.impl;

import accord.local.*;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.ProgressLog;
import accord.local.AsyncCommandStores;
import accord.local.CommandStore;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
import accord.local.ShardDistributor;
import accord.local.SyncCommandStores;
import accord.primitives.Routables;
import accord.utils.MapReduce;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import accord.api.Key;
import accord.local.Command;
import accord.local.SafeCommandStore.CommandFunction;
import accord.local.SafeCommandStore.SearchFunction;
import accord.local.SafeCommandStore.TestDep;
import accord.local.SafeCommandStore.TestKind;
import accord.local.Status;
Expand Down Expand Up @@ -77,7 +77,7 @@ public boolean isEmpty()
public <T> T mapReduce(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
TestDep testDep, @Nullable TxnId depId,
@Nullable Status minStatus, @Nullable Status maxStatus,
CommandFunction<T, T> map, T initialValue, T terminalValue)
SearchFunction<T, T> map, T initialValue, T terminalValue)
{

for (Command cmd : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values())
Expand Down
Loading