Skip to content

Commit 312760a

Browse files
committed
ZOOKEEPER-4883: Rollover leader epoch when zxid counter is exhausted
The rollover procedure: 1. Treats the last proposal of an epoch as the rollover proposal. 2. Requests from the next epoch are proposed normally. 3. Fences the next epoch once the rollover proposal is persisted. 4. Proposals from the next epoch will not be written to disk before the rollover proposal is committed. 5. The leader commits the rollover proposal once it gets quorum ACKs. 6. Blocked new-epoch proposals are logged once the rollover proposal is committed on the corresponding nodes. This results in: 1. No other leader could lead using the next epoch number once the rollover proposal is considered committed. 2. No proposals from the next epoch will be written to disk before the rollover proposal is considered committed. Refs: ZOOKEEPER-1277, ZOOKEEPER-2789, ZOOKEEPER-4870, ZOOKEEPER-4882, ZOOKEEPER-4570 and ZOOKEEPER-4571
1 parent 3d6c0d1 commit 312760a

20 files changed

+1270
-123
lines changed

zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.zookeeper.server.quorum.LearnerHandler;
3535
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
3636
import org.apache.zookeeper.server.util.AuthUtil;
37+
import org.apache.zookeeper.server.util.ZxidUtils;
3738
import org.apache.zookeeper.txn.TxnDigest;
3839
import org.apache.zookeeper.txn.TxnHeader;
3940
import org.slf4j.Logger;
@@ -377,6 +378,10 @@ public boolean isQuorum() {
377378
}
378379
}
379380

381+
public boolean isRollover() {
382+
return isQuorum() && zxid > 0 && ZxidUtils.isLastEpochZxid(zxid);
383+
}
384+
380385
public static String op2String(int op) {
381386
switch (op) {
382387
case OpCode.notification:

zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import java.util.concurrent.ThreadLocalRandom;
3030
import java.util.concurrent.TimeUnit;
3131
import org.apache.zookeeper.common.Time;
32+
import org.apache.zookeeper.server.quorum.ObserverZooKeeperServer;
33+
import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
34+
import org.apache.zookeeper.server.util.ZxidUtils;
3235
import org.slf4j.Logger;
3336
import org.slf4j.LoggerFactory;
3437

@@ -205,15 +208,15 @@ public void run() {
205208
// iff this is a read or a throttled request(which doesn't need to be written to the disk),
206209
// and there are no pending flushes (writes), then just pass this to the next processor
207210
if (nextProcessor != null) {
208-
nextProcessor.processRequest(si);
211+
handover(si);
209212
if (nextProcessor instanceof Flushable) {
210213
((Flushable) nextProcessor).flush();
211214
}
212215
}
213216
continue;
214217
}
215218
toFlush.add(si);
216-
if (shouldFlush()) {
219+
if (si.isRollover() || shouldFlush()) {
217220
flush();
218221
}
219222
ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
@@ -224,6 +227,19 @@ public void run() {
224227
LOG.info("SyncRequestProcessor exited!");
225228
}
226229

230+
private void handover(Request request) throws IOException, RequestProcessorException {
231+
if (request.isRollover() && zks instanceof QuorumZooKeeperServer) {
232+
long nextEpoch = ZxidUtils.getEpochFromZxid(request.zxid) + 1;
233+
// Fence the upcoming epoch in leader election, so that no other peer gets a chance
234+
// to lead the next epoch if this request is considered committed.
235+
((QuorumZooKeeperServer) zks).fenceRolloverEpoch(nextEpoch);
236+
if (zks instanceof ObserverZooKeeperServer) {
237+
((ObserverZooKeeperServer) zks).confirmRolloverEpoch(nextEpoch);
238+
}
239+
}
240+
nextProcessor.processRequest(request);
241+
}
242+
227243
private void flush() throws IOException, RequestProcessorException {
228244
if (this.toFlush.isEmpty()) {
229245
return;
@@ -242,7 +258,7 @@ private void flush() throws IOException, RequestProcessorException {
242258
final Request i = this.toFlush.remove();
243259
long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
244260
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
245-
this.nextProcessor.processRequest(i);
261+
handover(i);
246262
}
247263
if (this.nextProcessor instanceof Flushable) {
248264
((Flushable) this.nextProcessor).flush();

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public class Follower extends Learner {
4848

4949
ObserverMaster om;
5050

51-
Follower(final QuorumPeer self, final FollowerZooKeeperServer zk) {
51+
// VisibleForTesting
52+
public Follower(final QuorumPeer self, final FollowerZooKeeperServer zk) {
5253
this.self = Objects.requireNonNull(self);
5354
this.fzk = Objects.requireNonNull(zk);
5455
this.zk = zk;

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,19 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
4747

4848
private static final Logger LOG = LoggerFactory.getLogger(FollowerZooKeeperServer.class);
4949

50+
// This should be final as it is constructed with no external variables. It is left non-final to allow a
51+
// Mockito spy, which intercepts `this`.
52+
private ParticipantRequestSyncer requestSyncer;
53+
5054
/*
5155
* Pending sync requests
5256
*/ ConcurrentLinkedQueue<Request> pendingSyncs;
5357

5458
/**
5559
* @throws IOException
5660
*/
57-
FollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
61+
// VisibleForTesting
62+
public FollowerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
5863
super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
5964
this.pendingSyncs = new ConcurrentLinkedQueue<>();
6065
}
@@ -72,14 +77,17 @@ protected void setupRequestProcessors() {
7277
((FollowerRequestProcessor) firstProcessor).start();
7378
syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
7479
syncProcessor.start();
80+
requestSyncer = new ParticipantRequestSyncer(this, LOG, this::syncRequest);
7581
}
7682

7783
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();
7884

7985
public void logRequest(Request request) {
80-
if ((request.zxid & 0xffffffffL) != 0) {
81-
pendingTxns.add(request);
82-
}
86+
requestSyncer.syncRequest(request);
87+
}
88+
89+
private void syncRequest(Request request) {
90+
pendingTxns.add(request);
8391
syncProcessor.processRequest(request);
8492
}
8593

@@ -110,6 +118,7 @@ public void commit(long zxid) {
110118
Request request = pendingTxns.remove();
111119
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
112120
commitProcessor.commit(request);
121+
requestSyncer.finishCommit(request.zxid);
113122
}
114123

115124
public synchronized void sync() {

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java

Lines changed: 31 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,14 +1025,13 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol
10251025
// we're sending the designated leader, and if the leader is changing the followers are
10261026
// responsible for closing the connection - this way we are sure that at least a majority of them
10271027
// receive the commit message.
1028-
commitAndActivate(zxid, designatedLeader);
1028+
commitAndActivate(p, designatedLeader);
10291029
informAndActivate(p, designatedLeader);
10301030
} else {
10311031
p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
1032-
commit(zxid);
1032+
commit(p);
10331033
inform(p);
10341034
}
1035-
zk.commitProcessor.commit(p.request);
10361035
if (pendingSyncs.containsKey(zxid)) {
10371036
for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
10381037
sendSync(r);
@@ -1065,16 +1064,7 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA
10651064
LOG.trace("outstanding proposals all");
10661065
}
10671066

1068-
if ((zxid & 0xffffffffL) == 0) {
1069-
/*
1070-
* We no longer process NEWLEADER ack with this method. However,
1071-
* the learner sends an ack back to the leader after it gets
1072-
* UPTODATE, so we just ignore the message.
1073-
*/
1074-
return;
1075-
}
1076-
1077-
if (outstandingProposals.size() == 0) {
1067+
if (outstandingProposals.isEmpty()) {
10781068
LOG.debug("outstanding is 0");
10791069
return;
10801070
}
@@ -1212,25 +1202,30 @@ void sendObserverPacket(QuorumPacket qp) {
12121202
long lastCommitted = -1;
12131203

12141204
/**
1215-
* Create a commit packet and send it to all the members of the quorum
1216-
*
1217-
* @param zxid
1205+
* Commit proposal to all connected followers including itself.
12181206
*/
1219-
public void commit(long zxid) {
1207+
public void commit(Proposal p) {
1208+
long zxid = p.getZxid();
12201209
synchronized (this) {
12211210
lastCommitted = zxid;
12221211
}
1212+
1213+
zk.commit(p.request);
1214+
12231215
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
12241216
sendPacket(qp);
12251217
ServerMetrics.getMetrics().COMMIT_COUNT.add(1);
12261218
}
12271219

12281220
//commit and send some info
1229-
public void commitAndActivate(long zxid, long designatedLeader) {
1221+
public void commitAndActivate(Proposal p, long designatedLeader) {
1222+
long zxid = p.getZxid();
12301223
synchronized (this) {
12311224
lastCommitted = zxid;
12321225
}
12331226

1227+
zk.commit(p.request);
1228+
12341229
byte[] data = new byte[8];
12351230
ByteBuffer buffer = ByteBuffer.wrap(data);
12361231
buffer.putLong(designatedLeader);
@@ -1277,35 +1272,17 @@ public long getEpoch() {
12771272
return ZxidUtils.getEpochFromZxid(lastProposed);
12781273
}
12791274

1280-
@SuppressWarnings("serial")
1281-
public static class XidRolloverException extends Exception {
1282-
1283-
public XidRolloverException(String message) {
1284-
super(message);
1285-
}
1286-
1287-
}
1288-
12891275
/**
12901276
* create a proposal and send it out to all the members
12911277
*
12921278
* @param request
12931279
* @return the proposal that is queued to send to all the members
12941280
*/
1295-
public Proposal propose(Request request) throws XidRolloverException {
1281+
public Proposal propose(Request request) {
12961282
if (request.isThrottled()) {
12971283
LOG.error("Throttled request send as proposal: {}. Exiting.", request);
12981284
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
12991285
}
1300-
/**
1301-
* Address the rollover issue. All lower 32bits set indicate a new leader
1302-
* election. Force a re-election instead. See ZOOKEEPER-1277
1303-
*/
1304-
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
1305-
String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
1306-
shutdown(msg);
1307-
throw new XidRolloverException(msg);
1308-
}
13091286

13101287
byte[] data = request.getSerializeData();
13111288
proposalStats.setLastBufferSize(data.length);
@@ -1331,6 +1308,7 @@ public Proposal propose(Request request) throws XidRolloverException {
13311308
sendPacket(pp);
13321309
}
13331310
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
1311+
zk.logRequest(request);
13341312
return p;
13351313
}
13361314

@@ -1465,6 +1443,22 @@ public void reportLookingSid(long sid) {
14651443
}
14661444
}
14671445

1446+
/**
1447+
* Compared to {@link #getEpochToPropose(long, long)}, this method does not bump `acceptedEpoch`
1448+
* as the rollover txn may not be persisted yet.
1449+
*/
1450+
public void rolloverLeaderEpoch(long newEpoch) {
1451+
synchronized (connectingFollowers) {
1452+
if (waitingForNewEpoch) {
1453+
throw new IllegalStateException("ZAB is still waiting new epoch");
1454+
} else if (newEpoch != epoch + 1) {
1455+
String msg = String.format("can not rollover leader epoch to %s, current epoch is %s", newEpoch, epoch);
1456+
throw new IllegalArgumentException(msg);
1457+
}
1458+
epoch = newEpoch;
1459+
}
1460+
}
1461+
14681462
@Override
14691463
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
14701464
synchronized (connectingFollowers) {

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,11 @@
3333
import org.apache.zookeeper.server.RequestProcessor;
3434
import org.apache.zookeeper.server.ServerCnxn;
3535
import org.apache.zookeeper.server.ServerMetrics;
36+
import org.apache.zookeeper.server.SyncRequestProcessor;
3637
import org.apache.zookeeper.server.ZKDatabase;
3738
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
3841

3942
/**
4043
*
@@ -44,13 +47,18 @@
4447
* FinalRequestProcessor
4548
*/
4649
public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
50+
private static final Logger LOG = LoggerFactory.getLogger(LeaderZooKeeperServer.class);
4751

4852
private ContainerManager containerManager; // guarded by sync
4953

5054
CommitProcessor commitProcessor;
5155

5256
PrepRequestProcessor prepRequestProcessor;
5357

58+
SyncRequestProcessor syncProcessor;
59+
private final ParticipantRequestSyncer requestSyncer =
60+
new ParticipantRequestSyncer(this, LOG, r -> syncProcessor.processRequest(r));
61+
5462
/**
5563
* @throws IOException
5664
*/
@@ -68,8 +76,10 @@ protected void setupRequestProcessors() {
6876
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
6977
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
7078
commitProcessor.start();
79+
AckRequestProcessor ackProcessor = new AckRequestProcessor(getLeader());
80+
syncProcessor = new SyncRequestProcessor(this, ackProcessor);
81+
syncProcessor.start();
7182
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
72-
proposalProcessor.initialize();
7383
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
7484
prepRequestProcessor.start();
7585
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
@@ -159,6 +169,7 @@ protected synchronized void shutdownComponents() {
159169
if (containerManager != null) {
160170
containerManager.stop();
161171
}
172+
syncProcessor.shutdown();
162173
super.shutdownComponents();
163174
}
164175

@@ -169,6 +180,21 @@ public int getGlobalOutstandingLimit() {
169180
return globalOutstandingLimit;
170181
}
171182

183+
@Override
184+
public void confirmRolloverEpoch(long newEpoch) {
185+
getLeader().rolloverLeaderEpoch(newEpoch);
186+
super.confirmRolloverEpoch(newEpoch);
187+
}
188+
189+
public void logRequest(Request request) {
190+
requestSyncer.syncRequest(request);
191+
}
192+
193+
public void commit(Request request) {
194+
commitProcessor.commit(request);
195+
requestSyncer.finishCommit(request.zxid);
196+
}
197+
172198
@Override
173199
public void createSessionTracker() {
174200
sessionTracker = new LeaderSessionTracker(

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,21 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
825825
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
826826
writePacket(ack, true);
827827
zk.startup();
828+
829+
long lastCommittedZxid = zk.getLastProcessedZxid();
830+
long lastCommittedEpoch = ZxidUtils.getEpochFromZxid(lastCommittedZxid);
831+
if (ZxidUtils.isLastEpochZxid(lastCommittedZxid)) {
832+
lastCommittedEpoch += 1;
833+
}
834+
LOG.debug("lastCommittedZxid {}, lastCommittedEpoch {} newEpoch {}",
835+
Long.toHexString(lastCommittedZxid), lastCommittedEpoch, newEpoch);
836+
if (lastCommittedEpoch > newEpoch) {
837+
LOG.info("Switch to new leader epoch {} from {}", lastCommittedEpoch, newEpoch);
838+
newEpoch = lastCommittedEpoch;
839+
self.setAcceptedEpoch(newEpoch);
840+
self.setCurrentEpoch(newEpoch);
841+
}
842+
828843
/*
829844
* Update the election vote here to ensure that all members of the
830845
* ensemble report the same vote to new servers that start up and

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,8 @@ public void run() {
587587
ServerMetrics.getMetrics().SNAP_COUNT.add(1);
588588
}
589589
} else {
590+
LOG.info("Sending diffs last zxid of peer is 0x{}, zxid of leader is 0x{}",
591+
Long.toHexString(peerLastZxid), Long.toHexString(leaderLastZxid));
590592
syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
591593
syncThrottler.beginSync(exemptFromThrottle);
592594
ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());

0 commit comments

Comments
 (0)