 import accord.api.Scheduler;
 import accord.coordinate.CoordinateGloballyDurable;
-import accord.coordinate.CoordinateShardDurable;
 import accord.coordinate.CoordinationFailed;
 import accord.coordinate.ExecuteSyncPoint.SyncPointErased;
 import accord.local.Node;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncResult;
 
+import static accord.coordinate.CoordinateShardDurable.coordinate;
 import static accord.coordinate.CoordinateSyncPoint.exclusiveSyncPoint;
 import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 
 /**
  * Helper methods and classes to invoke coordination to propagate information about durability.
@@ -85,15 +87,15 @@ public class CoordinateDurabilityScheduling
     private Scheduler.Scheduled scheduled;
 
     /*
-     * In each round at each node wait this amount of time between initiating new CoordinateShardDurable
+     * In each cycle, attempt to split the range into this many pieces; if we fail, we increase the number of pieces
      */
-    private long frequencyMicros = TimeUnit.MILLISECONDS.toMicros(500L);
+    private int targetShardSplits = 64;
 
     /*
      * In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId
      * and coordinating the shard durability
      */
-    private long txnIdLagMicros = TimeUnit.SECONDS.toMicros(1L);
+    private long txnIdLagMicros = TimeUnit.SECONDS.toMicros(5L);
 
     /*
      * In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId
@@ -120,6 +122,10 @@ public class CoordinateDurabilityScheduling
      */
     private long globalCycleTimeMicros = TimeUnit.SECONDS.toMicros(30);
 
+    private long defaultRetryDelayMicros = TimeUnit.SECONDS.toMicros(1);
+    private long maxRetryDelayMicros = TimeUnit.MINUTES.toMicros(1);
+    private int maxNumberOfSplits = 1 << 10;
+
     private Topology currentGlobalTopology;
     private final Map<Range, ShardScheduler> shardSchedulers = new HashMap<>();
     private int globalIndex;
@@ -131,16 +137,15 @@ private class ShardScheduler
     {
         Shard shard;
 
-        // based on ideal number of splits
+        // time to start a new cycle
         int nodeOffset;
 
         int index;
-        int numberOfSplits, desiredNumberOfSplits;
-        boolean defunct;
-        long shardCycleTimeMicros;
+        int numberOfSplits;
         Scheduler.Scheduled scheduled;
-        long rangeStartedAtMicros;
-        long cycleStartedAtMicros = -1;
+        long rangeStartedAtMicros, cycleStartedAtMicros;
+        long retryDelayMicros = defaultRetryDelayMicros;
+        boolean defunct;
 
         private ShardScheduler()
         {
@@ -150,32 +155,80 @@ synchronized void update(Shard shard, int offset)
         {
             this.shard = shard;
             this.nodeOffset = offset;
-            this.shardCycleTimeMicros = Math.max(CoordinateDurabilityScheduling.this.shardCycleTimeMicros, shard.rf() * 3L * frequencyMicros);
-            this.desiredNumberOfSplits = (int) ((shardCycleTimeMicros + frequencyMicros - 1) / frequencyMicros);
-            if (numberOfSplits == 0 || numberOfSplits < desiredNumberOfSplits)
-            {
-                index = offset;
-                numberOfSplits = desiredNumberOfSplits;
-            }
+            if (numberOfSplits == 0 || numberOfSplits < targetShardSplits)
+                numberOfSplits = targetShardSplits;
         }
 
         synchronized void markDefunct()
         {
             defunct = true;
+            logger.info("Discarding defunct shard durability scheduler for {}", shard);
         }
 
-        synchronized void schedule()
+        synchronized void retryCoordinateDurability(Node node, SyncPoint<Range> exclusiveSyncPoint, int nextIndex)
+        {
+            if (defunct)
+                return;
+
+            // TODO (expected): back-off
+            coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint, nextIndex);
+        }
+
+        synchronized void restart()
         {
             if (defunct)
                 return;
 
             long nowMicros = node.elapsed(MICROSECONDS);
-            int cyclePosition = (nodeOffset + (((index * shard.rf()) + numberOfSplits - 1) / numberOfSplits)) % shard.rf();
-            long microsOffset = (cyclePosition * shardCycleTimeMicros) / shard.rf();
+            long microsOffset = (nodeOffset * shardCycleTimeMicros) / shard.rf();
             long scheduleAt = nowMicros - (nowMicros % shardCycleTimeMicros) + microsOffset;
             if (nowMicros > scheduleAt)
                 scheduleAt += shardCycleTimeMicros;
 
+            if (numberOfSplits < targetShardSplits)
+                numberOfSplits = targetShardSplits;
+
+            cycleStartedAtMicros = scheduleAt;
+            scheduleAt(nowMicros, scheduleAt);
+        }
+
+        synchronized void schedule()
+        {
+            if (defunct)
+                return;
+
+            long nowMicros = node.elapsed(MICROSECONDS);
+            long microsOffset = (index * shardCycleTimeMicros) / numberOfSplits;
+            long scheduleAt = cycleStartedAtMicros + microsOffset;
+            if (retryDelayMicros > defaultRetryDelayMicros)
+            {
+                retryDelayMicros = Math.max(defaultRetryDelayMicros, (long)(0.9 * retryDelayMicros));
+            }
+            if (numberOfSplits > targetShardSplits && index % 4 == 0)
+            {
+                index /= 4;
+                numberOfSplits /= 4;
+            }
+            scheduleAt(nowMicros, scheduleAt);
+        }
+
+        synchronized void retry()
+        {
+            if (defunct)
+                return;
+
+            long nowMicros = node.elapsed(MICROSECONDS);
+            long scheduleAt = nowMicros + retryDelayMicros;
+            retryDelayMicros += retryDelayMicros / 2;
+            if (retryDelayMicros > maxRetryDelayMicros)
+            {
+                retryDelayMicros = maxRetryDelayMicros;
+            }
+            scheduleAt(nowMicros, scheduleAt);
+        }
+
+        synchronized void scheduleAt(long nowMicros, long scheduleAt)
+        {
             ShardDistributor distributor = node.commandStores().shardDistributor();
             Range range;
             int nextIndex;
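
Note (not part of the diff): the new restart()/schedule()/retry() methods above implement a slice-and-backoff policy — each cycle is divided into numberOfSplits slices spaced shardCycleTimeMicros/numberOfSplits apart, each failure backs the scheduler off by 50% per attempt up to maxRetryDelayMicros, and each successful slice decays the delay back towards defaultRetryDelayMicros. The standalone sketch below reproduces only that arithmetic; the class name, the 30s example cycle time and the main method are illustrative assumptions, not from the commit.

import java.util.concurrent.TimeUnit;

// Standalone sketch of the slice timing and retry backoff used by ShardScheduler above.
// The constant values mirror the defaults in the patch; everything else is illustrative only.
public class ShardScheduleSketch
{
    static final long shardCycleTimeMicros = TimeUnit.SECONDS.toMicros(30);   // assumed cycle time
    static final int targetShardSplits = 64;
    static final long defaultRetryDelayMicros = TimeUnit.SECONDS.toMicros(1);
    static final long maxRetryDelayMicros = TimeUnit.MINUTES.toMicros(1);

    long retryDelayMicros = defaultRetryDelayMicros;

    // schedule(): slice i of a cycle starts at cycleStartedAt + i * cycleTime / numberOfSplits,
    // and each successful slice decays any accumulated retry delay by ~10%.
    long nextSliceAt(long cycleStartedAtMicros, int index, int numberOfSplits)
    {
        if (retryDelayMicros > defaultRetryDelayMicros)
            retryDelayMicros = Math.max(defaultRetryDelayMicros, (long) (0.9 * retryDelayMicros));
        return cycleStartedAtMicros + (index * shardCycleTimeMicros) / numberOfSplits;
    }

    // retry(): back off by 50% per failure, capped at maxRetryDelayMicros.
    long nextRetryAt(long nowMicros)
    {
        long scheduleAt = nowMicros + retryDelayMicros;
        retryDelayMicros += retryDelayMicros / 2;
        if (retryDelayMicros > maxRetryDelayMicros)
            retryDelayMicros = maxRetryDelayMicros;
        return scheduleAt;
    }

    public static void main(String[] args)
    {
        ShardScheduleSketch s = new ShardScheduleSketch();
        // Six consecutive failures: the delay grows 1s, 1.5s, 2.25s, ... towards the 1 minute cap.
        for (int i = 0; i < 6; i++)
            System.out.printf("retry %d scheduled +%dms out%n", i, TimeUnit.MICROSECONDS.toMillis(s.nextRetryAt(0)));
        // A successful slice then decays the delay back towards the 1s default.
        s.nextSliceAt(0, 1, targetShardSplits);
        System.out.printf("after one success, delay=%dms%n", TimeUnit.MICROSECONDS.toMillis(s.retryDelayMicros));
    }
}
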
@@ -188,11 +241,13 @@ synchronized void schedule()
                 nextIndex = i;
             }
 
-            scheduled = node.scheduler().once(() -> {
+            Runnable schedule = () -> {
                 // TODO (required): allocate stale HLC from a reservation of HLCs for this purpose
                 TxnId syncId = node.nextTxnId(ExclusiveSyncPoint, Domain.Range);
                 startShardSync(syncId, Ranges.of(range), nextIndex);
-            }, scheduleAt - nowMicros, MICROSECONDS);
+            };
+            if (scheduleAt <= nowMicros) schedule.run();
+            else scheduled = node.scheduler().once(schedule, scheduleAt - nowMicros, MICROSECONDS);
         }
 
         /**
@@ -204,6 +259,7 @@ private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex)
             scheduled = node.scheduler().once(() -> node.withEpoch(syncId.epoch(), (ignored, withEpochFailure) -> {
                 if (withEpochFailure != null)
                 {
+                    // don't wait on epoch failure - we aren't the cause of any problems
                     startShardSync(syncId, ranges, nextIndex);
                     Throwable wrapped = CoordinationFailed.wrap(withEpochFailure);
                     logger.trace("Exception waiting for epoch before coordinating exclusive sync point for local shard durability, epoch " + syncId.epoch(), wrapped);
@@ -219,10 +275,13 @@ private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex)
                 {
                     synchronized (ShardScheduler.this)
                     {
-                        index *= 2;
-                        numberOfSplits *= 2;
                         // TODO (required): try to recover or invalidate prior sync point
-                        schedule();
+                        retry();
+                        if (numberOfSplits * 2 <= maxNumberOfSplits)
+                        {
+                            index *= 2;
+                            numberOfSplits *= 2;
+                        }
                         logger.warn("{}: Exception coordinating ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail);
                     }
                 }
@@ -240,68 +299,92 @@ private void coordinateShardDurableAfterExclusiveSyncPoint(Node node, SyncPoint<
             scheduled = node.scheduler().once(() -> {
                 scheduled = null;
                 node.commandStores().any().execute(() -> {
-                    CoordinateShardDurable.coordinate(node, exclusiveSyncPoint)
-                                          .addCallback((success, fail) -> {
-                        if (fail != null && fail.getClass() != SyncPointErased.class)
-                        {
-                            logger.trace("Exception coordinating local shard durability, will retry immediately", fail);
-                            coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint, nextIndex);
-                        }
-                        else
-                        {
-                            synchronized (ShardScheduler.this)
-                            {
-                                index = nextIndex;
-                                if (index >= numberOfSplits)
-                                {
-                                    index = 0;
-                                    long nowMicros = node.elapsed(MICROSECONDS);
-                                    String reportTime = "";
-                                    if (cycleStartedAtMicros > 0)
-                                        reportTime = "in " + MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros) + 's';
-                                    logger.info("Successfully completed one cycle of durability scheduling for shard {}{}", shard.range, reportTime);
-                                    if (numberOfSplits > desiredNumberOfSplits)
-                                        numberOfSplits = Math.max(desiredNumberOfSplits, (int)(numberOfSplits * 0.9));
-                                    cycleStartedAtMicros = nowMicros;
-                                }
-                                else
-                                {
-                                    long nowMicros = node.elapsed(MICROSECONDS);
-                                    logger.debug("Successfully coordinated shard durability for range {} in {}s", shard.range, MICROSECONDS.toSeconds(nowMicros - rangeStartedAtMicros));
-                                }
-
-                                schedule();
-                            }
-                        }
-                    });
+                    coordinate(node, exclusiveSyncPoint)
+                    .addCallback((success, fail) -> {
+                        if (fail != null && fail.getClass() != SyncPointErased.class)
+                        {
+                            logger.debug("Exception coordinating shard durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail);
+                            retryCoordinateDurability(node, exclusiveSyncPoint, nextIndex);
+                        }
+                        else
+                        {
+                            try
+                            {
+                                synchronized (ShardScheduler.this)
+                                {
+                                    int prevIndex = index;
+                                    index = nextIndex;
+                                    if (index >= numberOfSplits)
+                                    {
+                                        index = 0;
+                                        long nowMicros = node.elapsed(MICROSECONDS);
+                                        long timeTakenSeconds = MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros);
+                                        long targetTimeSeconds = MILLISECONDS.toSeconds(shardCycleTimeMicros);
+                                        logger.info("Successfully completed one cycle of durability scheduling for shard {} in {}s (vs {}s target)", shard.range, timeTakenSeconds, targetTimeSeconds);
+                                        restart();
+                                    }
+                                    else
+                                    {
+                                        long nowMicros = node.elapsed(MICROSECONDS);
+                                        int prevRfCycle = (prevIndex * shard.rf()) / numberOfSplits;
+                                        int curRfCycle = (index * shard.rf()) / numberOfSplits;
+                                        if (prevRfCycle != curRfCycle)
+                                        {
+                                            long targetTimeSeconds = MICROSECONDS.toSeconds((index * shardCycleTimeMicros) / numberOfSplits);
+                                            long timeTakenSeconds = MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros);
+                                            logger.info("Successfully completed {}/{} cycle of durability scheduling covering range {}. Completed in {}s (vs {}s target).", curRfCycle, shard.rf(), exclusiveSyncPoint.route.toRanges(), timeTakenSeconds, targetTimeSeconds);
+                                        }
+                                        else if (logger.isTraceEnabled())
+                                        {
+                                            logger.trace("Successfully coordinated shard durability for range {} in {}s", shard.range, MICROSECONDS.toSeconds(nowMicros - rangeStartedAtMicros));
+                                        }
+                                        schedule();
+                                    }
+                                }
+                            }
+                            catch (Throwable t)
+                            {
+                                retry();
+                                logger.error("Unexpected exception handling durability scheduling callback; starting from scratch", t);
+                            }
+                        }
+                    });
                 });
             }, durabilityLagMicros, MICROSECONDS);
         }
-
     }
 
-
     public CoordinateDurabilityScheduling(Node node)
     {
         this.node = node;
     }
 
-    public void setFrequency(int frequency, TimeUnit units)
+    public void setTargetShardSplits(int targetShardSplits)
+    {
+        this.targetShardSplits = targetShardSplits;
+    }
+
+    public void setDefaultRetryDelay(long retryDelay, TimeUnit units)
     {
-        this.frequencyMicros = Ints.saturatedCast(units.toMicros(frequency));
+        this.defaultRetryDelayMicros = units.toMicros(retryDelay);
     }
 
-    public void setTxnIdLag(int txnIdLag, TimeUnit units)
+    public void setMaxRetryDelay(long retryDelay, TimeUnit units)
+    {
+        this.maxRetryDelayMicros = units.toMicros(retryDelay);
+    }
+
+    public void setTxnIdLag(long txnIdLag, TimeUnit units)
     {
         this.txnIdLagMicros = Ints.saturatedCast(units.toMicros(txnIdLag));
     }
 
-    public void setDurabilityLag(int durabilityLag, TimeUnit units)
+    public void setDurabilityLag(long durabilityLag, TimeUnit units)
     {
         this.durabilityLagMicros = Ints.saturatedCast(units.toMicros(durabilityLag));
     }
 
-    public void setShardCycleTime(int shardCycleTime, TimeUnit units)
+    public void setShardCycleTime(long shardCycleTime, TimeUnit units)
     {
         this.shardCycleTimeMicros = Ints.saturatedCast(units.toMicros(shardCycleTime));
     }
@@ -319,7 +402,7 @@ public synchronized void start()
         Invariants.checkState(!stop); // cannot currently restart safely
         long nowMicros = node.elapsed(MICROSECONDS);
         setNextGlobalSyncTime(nowMicros);
-        scheduled = node.scheduler().recurring(this::run, frequencyMicros, MICROSECONDS);
+        scheduled = node.scheduler().recurring(this::run, 1L, MINUTES);
     }
 
     public void stop()
@@ -351,7 +434,6 @@ private void run()
         }
     }
 
-
     private void startGlobalSync()
     {
         try
@@ -369,7 +451,7 @@ private void startGlobalSync()
         }
     }
 
-    private void updateTopology()
+    public synchronized void updateTopology()
     {
         Topology latestGlobal = node.topology().current();
         if (latestGlobal == currentGlobalTopology)
@@ -395,7 +477,10 @@ private void updateTopology()
             shardSchedulers.put(shard.range, scheduler);
             scheduler.update(shard, shard.nodes.find(node.id()));
             if (prevScheduler == null)
-                scheduler.schedule();
+            {
+                logger.info("Starting shard durability scheduler for {}", shard);
+                scheduler.restart();
+            }
         }
         prev.forEach((r, s) -> s.markDefunct());
     }
@@ -432,6 +517,6 @@ private void setNextGlobalSyncTime(long nowMicros)
         if (targetTimeInCurrentRound < nowMicros)
             targetTime += totalRoundDuration;
 
-        nextGlobalSyncTimeMicros = targetTime - nowMicros;
+        nextGlobalSyncTimeMicros = targetTime;
     }
 }
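
Note (not part of the diff): the new "Successfully completed {}/{} cycle" progress log derives which replica's share of the cycle a slice belongs to as (index * rf) / numberOfSplits. The minimal standalone illustration below uses assumed values of rf = 3 and the default numberOfSplits = 64; it is illustrative only and not from the commit.

// Illustrative only: maps slice indices to the rf-cycle reported by the new log message.
public class RfCycleSketch
{
    public static void main(String[] args)
    {
        int rf = 3;                // assumed replication factor
        int numberOfSplits = 64;   // the default targetShardSplits from the patch
        int previous = -1;
        for (int index = 0; index < numberOfSplits; index++)
        {
            int rfCycle = (index * rf) / numberOfSplits;   // same expression as curRfCycle above
            if (rfCycle != previous)                       // boundary where a "{}/{} cycle" line would be logged
                System.out.printf("slice %d starts rf-cycle %d/%d%n", index, rfCycle, rf);
            previous = rfCycle;
        }
    }
}
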