23
23
import java .util .List ;
24
24
import java .util .Map ;
25
25
26
+ import accord .local .Command ;
26
27
import accord .local .SafeCommandStore ;
27
28
import accord .messages .ReadData ;
28
29
import accord .utils .async .AsyncChain ;
50
51
import accord .utils .async .AsyncChains ;
51
52
import accord .utils .async .AsyncResult ;
52
53
import accord .utils .async .AsyncResults ;
54
+
55
+ import javax .annotation .Nonnull ;
53
56
import javax .annotation .Nullable ;
54
57
58
+ import static accord .messages .ReadEphemeralTxnData .retryInLaterEpoch ;
55
59
import static accord .primitives .SaveStatus .Applied ;
56
60
import static accord .primitives .SaveStatus .TruncatedApply ;
57
61
import static accord .messages .ReadData .CommitOrReadNack .Insufficient ;
@@ -123,13 +127,8 @@ public CommandStore commandStore()
123
127
}
124
128
125
129
protected abstract PartialTxn rangeReadTxn (Ranges ranges );
126
-
127
130
protected abstract void onReadOk (Node .Id from , CommandStore commandStore , Data data , Ranges ranges );
128
-
129
- protected FetchRequest newFetchRequest (long sourceEpoch , TxnId syncId , Ranges ranges , PartialDeps partialDeps , PartialTxn partialTxn )
130
- {
131
- return new FetchRequest (sourceEpoch , syncId , ranges , partialDeps , rangeReadTxn (ranges ));
132
- }
131
+ protected abstract FetchRequest newFetchRequest (long sourceEpoch , TxnId syncId , Ranges ranges , PartialDeps partialDeps , PartialTxn partialTxn );
133
132
134
133
@ Override
135
134
public void contact (Node .Id to , Ranges ranges )
@@ -187,7 +186,7 @@ public void onSuccess(Node.Id from, ReadReply reply)
187
186
}
188
187
189
188
// TODO (now): make sure it works if invoked in either order
190
- inflight .remove (key ).started (ok .maxApplied );
189
+ inflight .remove (key ).started (ok .safeToReadAfter );
191
190
onReadOk (to , commandStore , ok .data , received );
192
191
// received must be invoked after submitting the persistence future, as it triggers onDone
193
192
// which creates a ReducingFuture over {@code persisting}
@@ -237,11 +236,16 @@ void abort(Ranges abort)
237
236
// TODO (expected): implement abort
238
237
}
239
238
240
- public static class FetchRequest extends ReadData
239
+ public static abstract class FetchRequest extends ReadData
241
240
{
241
+ // Note for future: we cannot safely execute on an Erased sync point without more work.
242
+ // Specifically, if the range has partially lost ownership on the recipient, the SyncPoint
243
+ // will not represent a safe point to snapshot from, and we won't have enough information to
244
+ // report the range as unavailable.
242
245
private static final ExecuteOn EXECUTE_ON = new ExecuteOn (Applied , TruncatedApply );
243
246
public final PartialTxn read ;
244
247
public final PartialDeps partialDeps ;
248
+ private transient Timestamp safeToReadAfter ;
245
249
246
250
public FetchRequest (long sourceEpoch , TxnId syncId , Ranges ranges , PartialDeps partialDeps , PartialTxn partialTxn )
247
251
{
@@ -268,22 +272,45 @@ protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp execu
268
272
return read .read (safeStore , executeAt , unavailable );
269
273
}
270
274
275
+ // must be invoked by implementations some time after the read has started OR must override safeToReadAt()
276
+ protected void readStarted (SafeCommandStore safeStore , Ranges unavailable )
277
+ {
278
+ safeToReadAfter = Timestamp .nonNullOrMax (Timestamp .NONE , Timestamp .nonNullOrMax (safeToReadAfter , safeStore .commandStore ().unsafeGetMaxConflicts ().foldl (Timestamp ::nonNullOrMax )));
279
+ }
280
+
281
+ protected Timestamp safeToReadAfter ()
282
+ {
283
+ return safeToReadAfter ;
284
+ }
285
+
271
286
@ Override
272
287
protected void readComplete (CommandStore commandStore , Data result , Ranges unavailable )
273
288
{
274
- Ranges reportUnavailable = unavailable .slice ((Ranges )this .readScope , Minimal );
289
+ Ranges reportUnavailable = unavailable == null ? null : unavailable .slice ((Ranges )this .readScope , Minimal );
275
290
super .readComplete (commandStore , result , reportUnavailable );
276
291
}
277
292
278
293
@ Override
279
294
protected ReadOk constructReadOk (Ranges unavailable , Data data )
280
295
{
281
- return new FetchResponse (unavailable , data , maxApplied ());
296
+ Timestamp safeToReadAfter = safeToReadAfter ();
297
+ Invariants .checkState (data == null || safeToReadAfter != null );
298
+ return new FetchResponse (unavailable , data , safeToReadAfter );
282
299
}
283
300
284
- protected Timestamp maxApplied ()
301
+ @ Override
302
+ protected void read (SafeCommandStore safeStore , Command command )
285
303
{
286
- return null ;
304
+ long retryInLaterEpoch = retryInLaterEpoch (executeAtEpoch , safeStore , command );
305
+ if (retryInLaterEpoch > 0 )
306
+ {
307
+ Ranges unavailable = ((Ranges )readScope ).slice (safeStore .ranges ().allAt (executeAtEpoch ), Minimal );
308
+ readComplete (safeStore .commandStore (), null , unavailable );
309
+ }
310
+ else
311
+ {
312
+ super .read (safeStore , command );
313
+ }
287
314
}
288
315
289
316
@ Override
@@ -295,11 +322,13 @@ public MessageType type()
295
322
296
323
public static class FetchResponse extends ReadOk
297
324
{
298
- public final @ Nullable Timestamp maxApplied ;
299
- public FetchResponse (@ Nullable Ranges unavailable , @ Nullable Data data , @ Nullable Timestamp maxApplied )
325
+ // only null if retryInFutureEpoch is set
326
+ public final @ Nullable Timestamp safeToReadAfter ;
327
+
328
+ public FetchResponse (@ Nullable Ranges unavailable , @ Nullable Data data , @ Nonnull Timestamp safeToReadAfter )
300
329
{
301
330
super (unavailable , data );
302
- this .maxApplied = maxApplied ;
331
+ this .safeToReadAfter = safeToReadAfter ;
303
332
}
304
333
305
334
@ Override
0 commit comments