@@ -73,13 +73,15 @@ public CompletableFuture<Void> restoreFrom(String leader, KVRangeSnapshot rangeS
73
73
CompletableFuture <Void > onDone = session .doneFuture ;
74
74
try {
75
75
IKVReseter restorer = range .toReseter (rangeSnapshot );
76
- log .info ("Restoring from snapshot: session={}, leader={} \n {}" , session .id , leader , rangeSnapshot );
76
+ log .info ("Restoring from snapshot: session={}, leader={}, restorer={} \n {}" ,
77
+ session .id , leader , restorer , rangeSnapshot );
77
78
DisposableObserver <KVRangeMessage > observer = messenger .receive ()
78
79
.filter (m -> m .hasSaveSnapshotDataRequest ()
79
80
&& m .getSaveSnapshotDataRequest ().getSessionId ().equals (session .id ))
80
81
.timeout (idleTimeSec , TimeUnit .SECONDS )
81
82
.observeOn (Schedulers .from (executor ))
82
83
.subscribeWith (new DisposableObserver <KVRangeMessage >() {
84
+ private final IKVReseter reseter = restorer ;
83
85
@ Override
84
86
public void onNext (@ NonNull KVRangeMessage m ) {
85
87
SaveSnapshotDataRequest request = m .getSaveSnapshotDataRequest ();
@@ -90,19 +92,19 @@ public void onNext(@NonNull KVRangeMessage m) {
90
92
for (KVPair kv : request .getKvList ()) {
91
93
bytes += kv .getKey ().size ();
92
94
bytes += kv .getValue ().size ();
93
- restorer .put (kv .getKey (), kv .getValue ());
95
+ reseter .put (kv .getKey (), kv .getValue ());
94
96
}
95
97
metricManager .reportRestore (bytes );
96
98
log .debug ("Saved {} bytes snapshot data, send reply to {}: session={}" ,
97
99
bytes , m .getHostStoreId (), session .id );
98
100
if (request .getFlag () == SaveSnapshotDataRequest .Flag .End ) {
99
101
if (!onDone .isCancelled ()) {
100
- restorer .done ();
102
+ reseter .done ();
101
103
dispose ();
102
104
onDone .complete (null );
103
105
log .info ("Restored from snapshot: session={}" , session .id );
104
106
} else {
105
- restorer .abort ();
107
+ reseter .abort ();
106
108
dispose ();
107
109
log .info ("Snapshot restore canceled: session={}" , session .id );
108
110
}
@@ -136,8 +138,8 @@ public void onNext(@NonNull KVRangeMessage m) {
136
138
137
139
@ Override
138
140
public void onError (@ NonNull Throwable e ) {
139
- if (restorer != null ) {
140
- restorer .abort ();
141
+ if (reseter != null ) {
142
+ reseter .abort ();
141
143
} else {
142
144
log .error ("restorer is null" , e );
143
145
}
0 commit comments