Skip to content

Commit 7ee196d

Browse files
committed
ZOOKEEPER-4925: Fix data loss due to propagation of discontinuous committedLog (#2254)
There are two variants of `ZooKeeperServer::processTxn`. Those two variants diverge significantly since ZOOKEEPER-3484. `processTxn(Request request)` pops outstanding change from `outstandingChanges` and adds txn to `committedLog` for follower to sync in addition to what `processTxn(TxnHeader hdr, Record txn)` does. The `Learner` uses `processTxn(TxnHeader hdr, Record txn)` to commit txn to memory after ZOOKEEPER-4394, which means it leaves `committedLog` untouched in `SYNCHRONIZATION` phase. This way, a stale follower will have hole in its `committedLog` after joining cluster. The stale follower will propagate the in memory hole to other stale nodes after becoming leader. This causes data loss. The test case fails on master and 3.9.3, and passes on 3.9.2. So only 3.9.3 is affected. This commit drops `processTxn(TxnHeader hdr, Record txn)` as `processTxn(Request request)` is capable in `SYNCHRONIZATION` phase too. Also, this commit rejects discontinuous proposals in `syncWithLeader` and `committedLog`, so to avoid possible data loss. Refs: ZOOKEEPER-4925, ZOOKEEPER-4394, ZOOKEEPER-3484 Reviewers: li4wang Author: kezhuw Closes #2254 from kezhuw/ZOOKEEPER-4925-fix-data-loss (cherry picked from commit e5dd60b) Signed-off-by: Kezhu Wang <[email protected]>
1 parent 3112398 commit 7ee196d

File tree

11 files changed

+196
-81
lines changed

11 files changed

+196
-81
lines changed

zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,19 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon
7878
this.authInfo = null;
7979
}
8080

81+
public Request(TxnHeader hdr, Record txn, TxnDigest digest) {
82+
this.sessionId = hdr.getClientId();
83+
this.cxid = hdr.getCxid();
84+
this.type = hdr.getType();
85+
this.hdr = hdr;
86+
this.txn = txn;
87+
this.zxid = hdr.getZxid();
88+
this.request = null;
89+
this.cnxn = null;
90+
this.authInfo = null;
91+
this.txnDigest = digest;
92+
}
93+
8194
public final long sessionId;
8295

8396
public final int cxid;

zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,8 @@ public TxnHeader getHeader() {
4747
public TxnDigest getDigest() {
4848
return digest;
4949
}
50+
51+
public Request toRequest() {
52+
return new Request(header, txn, digest);
53+
}
5054
}

zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal;
5959
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
6060
import org.apache.zookeeper.server.util.SerializeUtils;
61+
import org.apache.zookeeper.server.util.ZxidUtils;
6162
import org.apache.zookeeper.txn.TxnDigest;
6263
import org.apache.zookeeper.txn.TxnHeader;
6364
import org.slf4j.Logger;
@@ -82,6 +83,8 @@ public class ZKDatabase {
8283
protected FileTxnSnapLog snapLog;
8384
protected long minCommittedLog, maxCommittedLog;
8485

86+
private final boolean allowDiscontinuousProposals = Boolean.getBoolean("zookeeper.test.allowDiscontinuousProposals");
87+
8588
/**
8689
* Default value is to use snapshot if txnlog size exceeds 1/3 the size of snapshot
8790
*/
@@ -170,8 +173,6 @@ public boolean isInitialized() {
170173
* data structures in zkdatabase.
171174
*/
172175
public void clear() {
173-
minCommittedLog = 0;
174-
maxCommittedLog = 0;
175176
/* to be safe we just create a new
176177
* datatree.
177178
*/
@@ -182,6 +183,8 @@ public void clear() {
182183
try {
183184
lock.lock();
184185
committedLog.clear();
186+
minCommittedLog = 0;
187+
maxCommittedLog = 0;
185188
} finally {
186189
lock.unlock();
187190
}
@@ -320,17 +323,30 @@ public void addCommittedProposal(Request request) {
320323
WriteLock wl = logLock.writeLock();
321324
try {
322325
wl.lock();
323-
if (committedLog.size() > commitLogCount) {
324-
committedLog.remove();
325-
minCommittedLog = committedLog.peek().getZxid();
326-
}
327326
if (committedLog.isEmpty()) {
328327
minCommittedLog = request.zxid;
329328
maxCommittedLog = request.zxid;
329+
} else if (request.zxid <= maxCommittedLog) {
330+
// This could happen if lastProcessedZxid is rewinded and database is re-synced.
331+
// Currently, it only happens in test codes, but it should also be safe for production path.
332+
return;
333+
} else if (!allowDiscontinuousProposals
334+
&& request.zxid != maxCommittedLog + 1
335+
&& ZxidUtils.getEpochFromZxid(request.zxid) <= ZxidUtils.getEpochFromZxid(maxCommittedLog)) {
336+
String msg = String.format(
337+
"Committed proposal cached out of order: 0x%s is not the next proposal of 0x%s",
338+
ZxidUtils.zxidToString(request.zxid),
339+
ZxidUtils.zxidToString(maxCommittedLog));
340+
LOG.error(msg);
341+
throw new IllegalStateException(msg);
330342
}
331343
PureRequestProposal p = new PureRequestProposal(request);
332344
committedLog.add(p);
333345
maxCommittedLog = p.getZxid();
346+
if (committedLog.size() > commitLogCount) {
347+
committedLog.remove();
348+
minCommittedLog = committedLog.peek().getZxid();
349+
}
334350
} finally {
335351
wl.unlock();
336352
}

zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1846,13 +1846,6 @@ private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader r
18461846
cnxn.sendResponse(replyHeader, record, "response");
18471847
}
18481848

1849-
// entry point for quorum/Learner.java
1850-
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
1851-
processTxnForSessionEvents(null, hdr, txn);
1852-
return processTxnInDB(hdr, txn, null);
1853-
}
1854-
1855-
// entry point for FinalRequestProcessor.java
18561849
public ProcessTxnResult processTxn(Request request) {
18571850
TxnHeader hdr = request.getHdr();
18581851
processTxnForSessionEvents(request, hdr, request.getTxn());
@@ -1864,8 +1857,10 @@ public ProcessTxnResult processTxn(Request request) {
18641857
if (!writeRequest && !quorumRequest) {
18651858
return new ProcessTxnResult();
18661859
}
1860+
1861+
ProcessTxnResult rc;
18671862
synchronized (outstandingChanges) {
1868-
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
1863+
rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
18691864

18701865
// request.hdr is set for write requests, which are the only ones
18711866
// that add to outstandingChanges.
@@ -1886,13 +1881,13 @@ public ProcessTxnResult processTxn(Request request) {
18861881
}
18871882
}
18881883
}
1884+
}
18891885

1890-
// do not add non quorum packets to the queue.
1891-
if (quorumRequest) {
1892-
getZKDatabase().addCommittedProposal(request);
1893-
}
1894-
return rc;
1886+
// do not add non quorum packets to the queue.
1887+
if (quorumRequest) {
1888+
getZKDatabase().addCommittedProposal(request);
18951889
}
1890+
return rc;
18961891
}
18971892

18981893
private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.zookeeper.server.util.SerializeUtils;
3636
import org.apache.zookeeper.server.util.ZxidUtils;
3737
import org.apache.zookeeper.txn.SetDataTxn;
38-
import org.apache.zookeeper.txn.TxnDigest;
3938
import org.apache.zookeeper.txn.TxnHeader;
4039

4140
/**
@@ -164,7 +163,6 @@ protected void processPacket(QuorumPacket qp) throws Exception {
164163
TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
165164
TxnHeader hdr = logEntry.getHeader();
166165
Record txn = logEntry.getTxn();
167-
TxnDigest digest = logEntry.getDigest();
168166
if (hdr.getZxid() != lastQueued + 1) {
169167
LOG.warn(
170168
"Got zxid 0x{} expected 0x{}",
@@ -179,7 +177,7 @@ protected void processPacket(QuorumPacket qp) throws Exception {
179177
self.setLastSeenQuorumVerifier(qv, true);
180178
}
181179

182-
fzk.logRequest(hdr, txn, digest);
180+
fzk.logRequest(logEntry.toRequest());
183181
if (hdr != null) {
184182
/*
185183
* Request header is created only by the leader, so this is only set

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.concurrent.ConcurrentLinkedQueue;
2323
import java.util.concurrent.LinkedBlockingQueue;
2424
import javax.management.JMException;
25-
import org.apache.jute.Record;
2625
import org.apache.zookeeper.jmx.MBeanRegistry;
2726
import org.apache.zookeeper.metrics.MetricsContext;
2827
import org.apache.zookeeper.server.ExitCode;
@@ -33,8 +32,6 @@
3332
import org.apache.zookeeper.server.SyncRequestProcessor;
3433
import org.apache.zookeeper.server.ZKDatabase;
3534
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
36-
import org.apache.zookeeper.txn.TxnDigest;
37-
import org.apache.zookeeper.txn.TxnHeader;
3835
import org.apache.zookeeper.util.ServiceUtils;
3936
import org.slf4j.Logger;
4037
import org.slf4j.LoggerFactory;
@@ -79,20 +76,17 @@ protected void setupRequestProcessors() {
7976

8077
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();
8178

82-
public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
83-
final Request request = buildRequestToProcess(hdr, txn, digest);
79+
public void logRequest(Request request) {
80+
if ((request.zxid & 0xffffffffL) != 0) {
81+
pendingTxns.add(request);
82+
}
8483
syncProcessor.processRequest(request);
8584
}
8685

8786
/**
88-
* Build a request for the txn and append it to the transaction log
89-
* @param hdr the txn header
90-
* @param txn the txn
91-
* @param digest the digest of txn
87+
* Append txn request to the transaction log directly without go through request processors.
9288
*/
93-
public void appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
94-
final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
95-
request.setTxnDigest(digest);
89+
public void appendRequest(Request request) throws IOException {
9690
getZKDatabase().append(request);
9791
}
9892

@@ -188,20 +182,4 @@ protected void unregisterMetrics() {
188182
rootContext.unregisterGauge("synced_observers");
189183

190184
}
191-
192-
/**
193-
* Build a request for the txn
194-
* @param hdr the txn header
195-
* @param txn the txn
196-
* @param digest the digest of txn
197-
* @return a request moving through a chain of RequestProcessors
198-
*/
199-
private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) {
200-
final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
201-
request.setTxnDigest(digest);
202-
if ((request.zxid & 0xffffffffL) != 0) {
203-
pendingTxns.add(request);
204-
}
205-
return request;
206-
}
207185
}

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ static class PacketInFlight {
8282
Record rec;
8383
TxnDigest digest;
8484

85+
Request toRequest() {
86+
return new Request(hdr, rec, digest);
87+
}
88+
8589
}
8690

8791
QuorumPeer self;
@@ -535,6 +539,27 @@ protected long registerWithLeader(int pktType) throws IOException {
535539
}
536540
}
537541

542+
long enforceContinuousProposal(long lastQueued, PacketInFlight pif) throws Exception {
543+
if (lastQueued == 0) {
544+
LOG.info("DIFF sync got first proposal 0x{}", Long.toHexString(pif.hdr.getZxid()));
545+
} else if (pif.hdr.getZxid() != lastQueued + 1) {
546+
if (ZxidUtils.getEpochFromZxid(pif.hdr.getZxid()) <= ZxidUtils.getEpochFromZxid(lastQueued)) {
547+
String msg = String.format(
548+
"DIFF sync got proposal 0x%s, last queued 0x%s, expected 0x%s",
549+
Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued),
550+
Long.toHexString(lastQueued + 1));
551+
LOG.error(msg);
552+
throw new Exception(msg);
553+
}
554+
// We can't tell whether it is a data loss. Given that new epoch is rare,
555+
// log at warn should not be too verbose.
556+
LOG.warn("DIFF sync got new epoch proposal 0x{}, last queued 0x{}, expected 0x{}",
557+
Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued),
558+
Long.toHexString(lastQueued + 1));
559+
}
560+
return pif.hdr.getZxid();
561+
}
562+
538563
/**
539564
* Finally, synchronize our history with the Leader (if Follower)
540565
* or the LearnerMaster (if Observer).
@@ -609,6 +634,8 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
609634
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
610635
zk.createSessionTracker();
611636

637+
// TODO: Ideally, this should be lastProcessZxid(a.k.a. QuorumPacket::zxid from above), but currently
638+
// LearnerHandler does not guarantee this. So, let's be conservative and keep it unchange for now.
612639
long lastQueued = 0;
613640

614641
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
@@ -630,13 +657,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
630657
pif.hdr = logEntry.getHeader();
631658
pif.rec = logEntry.getTxn();
632659
pif.digest = logEntry.getDigest();
633-
if (pif.hdr.getZxid() != lastQueued + 1) {
634-
LOG.warn(
635-
"Got zxid 0x{} expected 0x{}",
636-
Long.toHexString(pif.hdr.getZxid()),
637-
Long.toHexString(lastQueued + 1));
638-
}
639-
lastQueued = pif.hdr.getZxid();
660+
lastQueued = enforceContinuousProposal(lastQueued, pif);
640661

641662
if (pif.hdr.getType() == OpCode.reconfig) {
642663
SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
@@ -666,7 +687,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
666687
Long.toHexString(qp.getZxid()),
667688
Long.toHexString(pif.hdr.getZxid()));
668689
} else {
669-
zk.processTxn(pif.hdr, pif.rec);
690+
zk.processTxn(pif.toRequest());
670691
packetsNotLogged.remove();
671692
}
672693
} else {
@@ -696,18 +717,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
696717
packet.rec = logEntry.getTxn();
697718
packet.hdr = logEntry.getHeader();
698719
packet.digest = logEntry.getDigest();
699-
// Log warning message if txn comes out-of-order
700-
if (packet.hdr.getZxid() != lastQueued + 1) {
701-
LOG.warn(
702-
"Got zxid 0x{} expected 0x{}",
703-
Long.toHexString(packet.hdr.getZxid()),
704-
Long.toHexString(lastQueued + 1));
705-
}
706-
lastQueued = packet.hdr.getZxid();
720+
lastQueued = enforceContinuousProposal(lastQueued, packet);
707721
}
708722
if (!writeToTxnLog) {
709723
// Apply to db directly if we haven't taken the snapshot
710-
zk.processTxn(packet.hdr, packet.rec);
724+
zk.processTxn(packet.toRequest());
711725
} else {
712726
packetsNotLogged.add(packet);
713727
packetsCommitted.add(qp.getZxid());
@@ -780,8 +794,9 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
780794
continue;
781795
}
782796
packetsNotLogged.removeFirst();
783-
fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
784-
fzk.processTxn(pif.hdr, pif.rec);
797+
Request request = pif.toRequest();
798+
fzk.appendRequest(request);
799+
fzk.processTxn(request);
785800
}
786801

787802
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
@@ -823,7 +838,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
823838
if (zk instanceof FollowerZooKeeperServer) {
824839
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
825840
for (PacketInFlight p : packetsNotLogged) {
826-
fzk.logRequest(p.hdr, p.rec, p.digest);
841+
fzk.logRequest(p.toRequest());
827842
}
828843
LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size());
829844

@@ -847,8 +862,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
847862
continue;
848863
}
849864
packetsCommitted.remove();
850-
Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
851-
request.setTxnDigest(p.digest);
865+
Request request = p.toRequest();
852866
ozk.commitRequest(request);
853867
}
854868
} else {

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -202,12 +202,8 @@ protected void processPacket(QuorumPacket qp) throws Exception {
202202
case Leader.INFORM:
203203
ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
204204
logEntry = SerializeUtils.deserializeTxn(qp.getData());
205-
hdr = logEntry.getHeader();
206-
txn = logEntry.getTxn();
207-
digest = logEntry.getDigest();
208-
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
205+
Request request = logEntry.toRequest();
209206
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
210-
request.setTxnDigest(digest);
211207
ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
212208
obs.commitRequest(request);
213209
break;
@@ -219,13 +215,10 @@ protected void processPacket(QuorumPacket qp) throws Exception {
219215
byte[] remainingdata = new byte[buffer.remaining()];
220216
buffer.get(remainingdata);
221217
logEntry = SerializeUtils.deserializeTxn(remainingdata);
222-
hdr = logEntry.getHeader();
223218
txn = logEntry.getTxn();
224-
digest = logEntry.getDigest();
225219
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8));
226220

227-
request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
228-
request.setTxnDigest(digest);
221+
request = logEntry.toRequest();
229222
obs = (ObserverZooKeeperServer) zk;
230223

231224
boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);

0 commit comments

Comments
 (0)