Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit c2ef064

Browse files
committedJun 12, 2025
Fix:
- Do not query local topology when deciding what keys to fetch to avoid TopologyRetiredException - Ensure we propagate information back to the requesting CommandStore by using the store id to ensure it is included - BurnTest topology fetching was broken by earlier patch - Topology callbacks were not being invoked as we were not calling .begin() - Topology mismatch failure during notAccept phase was not being reported due to CoordinatePreAccept already having isDone==true patch by Benedict; reviewed by David Capwell for CASSANDRA-20711
1 parent bf85660 commit c2ef064

26 files changed

+612
-636
lines changed
 

‎accord-core/src/main/java/accord/coordinate/CheckShards.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
*/
4040
public abstract class CheckShards<U extends Participants<?>> extends ReadCoordinator<CheckStatusReply>
4141
{
42-
final U route;
42+
final U query;
4343

4444
/**
4545
* The epoch we want to fetch data from remotely
@@ -54,17 +54,17 @@ public abstract class CheckShards<U extends Participants<?>> extends ReadCoordin
5454
protected boolean truncated;
5555

5656
// srcEpoch is either txnId.epoch() or executeAt.epoch()
57-
protected CheckShards(Node node, TxnId txnId, U route, IncludeInfo includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf)
57+
protected CheckShards(Node node, TxnId txnId, U query, IncludeInfo includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf)
5858
{
59-
this(node, txnId, route, txnId.epoch(), includeInfo, bumpBallot, previouslyKnownToBeInvalidIf);
59+
this(node, txnId, query, txnId.epoch(), includeInfo, bumpBallot, previouslyKnownToBeInvalidIf);
6060
Invariants.require(txnId.isVisible());
6161
}
6262

63-
protected CheckShards(Node node, TxnId txnId, U route, long srcEpoch, IncludeInfo includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf)
63+
protected CheckShards(Node node, TxnId txnId, U query, long srcEpoch, IncludeInfo includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf)
6464
{
65-
super(node, topologyFor(node, txnId, route, srcEpoch), txnId);
65+
super(node, topologyFor(node, txnId, query, srcEpoch), txnId);
6666
this.sourceEpoch = srcEpoch;
67-
this.route = route;
67+
this.query = query;
6868
this.includeInfo = includeInfo;
6969
this.bumpBallot = bumpBallot;
7070
this.previouslyKnownToBeInvalidIf = previouslyKnownToBeInvalidIf;
@@ -79,7 +79,7 @@ private static Topologies topologyFor(Node node, TxnId txnId, Unseekables<?> con
7979
@Override
8080
protected void contact(Id id)
8181
{
82-
Participants<?> unseekables = route.slice(topologies().computeRangesForNode(id));
82+
Participants<?> unseekables = query.slice(topologies().computeRangesForNode(id));
8383
node.send(id, new CheckStatus(txnId, unseekables, sourceEpoch, includeInfo, bumpBallot), this);
8484
}
8585

@@ -119,7 +119,7 @@ protected Action process(Id from, CheckStatusReply reply)
119119
@Override
120120
protected void finishOnExhaustion()
121121
{
122-
if (merged != null && merged.map.hasFullyTruncated(route)) finishOnFailure(new Truncated(txnId, null), false);
122+
if (merged != null && merged.map.hasFullyTruncated(query)) finishOnFailure(new Truncated(txnId, null), false);
123123
else super.finishOnExhaustion();
124124
}
125125
}

‎accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ void onNewEpochTopologyMismatch(TopologyMismatch mismatch)
142142
proposeInvalidate(node, node.uniqueTimestamp(Ballot::fromValues), txnId, route.homeKey(), (outcome, failure) -> {
143143
if (failure != null)
144144
mismatch.addSuppressed(failure);
145-
setFailure(mismatch);
145+
callback.accept(null, mismatch);
146146
});
147147
}
148148

‎accord-core/src/main/java/accord/coordinate/FetchData.java

Lines changed: 30 additions & 194 deletions
Large diffs are not rendered by default.
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package accord.coordinate;
20+
21+
import java.util.function.BiConsumer;
22+
23+
import accord.local.Commands;
24+
import accord.local.Node;
25+
import accord.local.SafeCommand;
26+
import accord.local.SafeCommandStore;
27+
import accord.primitives.Known;
28+
import accord.messages.CheckStatus.CheckStatusOk;
29+
import accord.messages.CheckStatus.IncludeInfo;
30+
import accord.primitives.WithQuorum;
31+
import accord.primitives.Participants;
32+
import accord.primitives.Route;
33+
import accord.primitives.TxnId;
34+
import accord.utils.MapReduceConsume;
35+
36+
import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid;
37+
import static accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback;
38+
import static accord.local.CommandStores.*;
39+
import static accord.primitives.Known.Nothing;
40+
import static accord.primitives.WithQuorum.HasQuorum;
41+
42+
/**
43+
* Find some Route for a txnId using some known participants
44+
*/
45+
public class FetchSomeRoute extends CheckShards<Participants<?>>
46+
{
47+
final LatentStoreSelector reportTo;
48+
final BiConsumer<Route<?>, Throwable> callback;
49+
FetchSomeRoute(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Participants<?> contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback)
50+
{
51+
super(node, txnId, contactable, IncludeInfo.Route, null, invalidIf);
52+
this.reportTo = reportTo;
53+
this.callback = callback;
54+
}
55+
56+
public static void fetchSomeRoute(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Participants<?> unseekables, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback)
57+
{
58+
if (!node.topology().hasEpoch(txnId.epoch()))
59+
{
60+
node.withEpochAtLeast(txnId.epoch(), callback, () -> fetchSomeRoute(node, txnId, invalidIf, unseekables, reportTo, callback));
61+
return;
62+
}
63+
64+
FetchSomeRoute fetchSomeRoute = new FetchSomeRoute(node, txnId, invalidIf, unseekables, reportTo, callback);
65+
fetchSomeRoute.start();
66+
}
67+
68+
69+
public static void fetchSomeRoute(Node node, TxnId txnId, Participants<?> contactable, BiConsumer<Route<?>, Throwable> callback)
70+
{
71+
fetchSomeRoute(node, txnId, contactable, LatentStoreSelector.standard(), callback);
72+
}
73+
74+
public static void fetchSomeRoute(Node node, TxnId txnId, Participants<?> contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback)
75+
{
76+
fetchSomeRoute(node, txnId, NotKnownToBeInvalid, contactable, reportTo, callback);
77+
}
78+
79+
@Override
80+
protected boolean isSufficient(CheckStatusOk ok)
81+
{
82+
return ok.route != null;
83+
}
84+
85+
@Override
86+
protected void onDone(Success success, Throwable failure)
87+
{
88+
if (failure != null) callback.accept(null, failure);
89+
else
90+
{
91+
final Route<?> route = merged == null ? null : merged.route;
92+
if (route == null)
93+
{
94+
Known known = Nothing;
95+
if (merged != null)
96+
known = merged.finish(query, query, query, success.withQuorum, previouslyKnownToBeInvalidIf).knownFor(txnId, query, query);
97+
reportRouteNotFound(node, success.withQuorum, known, txnId, query, reportTo, callback);
98+
}
99+
else
100+
{
101+
StoreSelector selector = reportTo.refine(txnId, null, query);
102+
node.mapReduceConsumeLocal(txnId, selector, new MapReduceConsume<>()
103+
{
104+
@Override
105+
public void accept(Object result, Throwable failure)
106+
{
107+
callback.accept(route, null);
108+
}
109+
110+
@Override
111+
public Object apply(SafeCommandStore safeStore)
112+
{
113+
SafeCommand safeCommand = safeStore.ifInitialised(txnId);
114+
if (safeCommand != null)
115+
Commands.updateRoute(safeStore, safeCommand, route);
116+
return null;
117+
}
118+
119+
@Override
120+
public Object reduce(Object o1, Object o2)
121+
{
122+
return null;
123+
}
124+
});
125+
}
126+
}
127+
}
128+
129+
private static void reportRouteNotFound(Node node, WithQuorum withQuorum, Known found, TxnId txnId, Participants<?> participants, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback)
130+
{
131+
switch (found.outcome())
132+
{
133+
default: throw new AssertionError("Unknown outcome: " + found.outcome());
134+
case Abort:
135+
locallyInvalidateAndCallback(node, txnId, reportTo.refine(txnId, null, participants), participants, null, callback);
136+
break;
137+
138+
case Unknown:
139+
if (withQuorum == HasQuorum && found.canProposeInvalidation())
140+
{
141+
Invalidate.invalidate(node, txnId, participants, false, reportTo, (outcome, throwable) -> callback.accept(null, throwable));
142+
break;
143+
}
144+
case Erased:
145+
case WasApply:
146+
case Apply:
147+
callback.accept(null, null);
148+
}
149+
}
150+
151+
}

‎accord-core/src/main/java/accord/coordinate/FindRoute.java

Lines changed: 0 additions & 81 deletions
This file was deleted.

‎accord-core/src/main/java/accord/coordinate/FindSomeRoute.java

Lines changed: 0 additions & 85 deletions
This file was deleted.

0 commit comments

Comments
 (0)
Please sign in to comment.