@@ -539,6 +539,27 @@ protected long registerWithLeader(int pktType) throws IOException {
539
539
}
540
540
}
541
541
542
+ long enforceContinuousProposal (long lastQueued , PacketInFlight pif ) throws Exception {
543
+ if (lastQueued == 0 ) {
544
+ LOG .info ("DIFF sync got first proposal 0x{}" , Long .toHexString (pif .hdr .getZxid ()));
545
+ } else if (pif .hdr .getZxid () != lastQueued + 1 ) {
546
+ if (ZxidUtils .getEpochFromZxid (pif .hdr .getZxid ()) <= ZxidUtils .getEpochFromZxid (lastQueued )) {
547
+ String msg = String .format (
548
+ "DIFF sync got proposal 0x%s, last queued 0x%s, expected 0x%s" ,
549
+ Long .toHexString (pif .hdr .getZxid ()), Long .toHexString (lastQueued ),
550
+ Long .toHexString (lastQueued + 1 ));
551
+ LOG .error (msg );
552
+ throw new Exception (msg );
553
+ }
554
+ // We can't tell whether it is a data loss. Given that new epoch is rare,
555
+ // log at warn should not be too verbose.
556
+ LOG .warn ("DIFF sync got new epoch proposal 0x{}, last queued 0x{}, expected 0x{}" ,
557
+ Long .toHexString (pif .hdr .getZxid ()), Long .toHexString (lastQueued ),
558
+ Long .toHexString (lastQueued + 1 ));
559
+ }
560
+ return pif .hdr .getZxid ();
561
+ }
562
+
542
563
/**
543
564
* Finally, synchronize our history with the Leader (if Follower)
544
565
* or the LearnerMaster (if Observer).
@@ -613,6 +634,8 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
613
634
zk .getZKDatabase ().initConfigInZKDatabase (self .getQuorumVerifier ());
614
635
zk .createSessionTracker ();
615
636
637
+ // TODO: Ideally, this should be lastProcessZxid(a.k.a. QuorumPacket::zxid from above), but currently
638
+ // LearnerHandler does not guarantee this. So, let's be conservative and keep it unchange for now.
616
639
long lastQueued = 0 ;
617
640
618
641
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
@@ -634,23 +657,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
634
657
pif .hdr = logEntry .getHeader ();
635
658
pif .rec = logEntry .getTxn ();
636
659
pif .digest = logEntry .getDigest ();
637
- if (lastQueued == 0 ) {
638
- LOG .info ("DIFF sync got first proposal 0x{}" , Long .toHexString (pif .hdr .getZxid ()));
639
- } else if (pif .hdr .getZxid () != lastQueued + 1 ) {
640
- if (ZxidUtils .getEpochFromZxid (pif .hdr .getZxid ()) <= ZxidUtils .getEpochFromZxid (lastQueued )) {
641
- String msg = String .format ("DIFF sync got proposal 0x%s which is not next of last proposal 0x%s" ,
642
- Long .toHexString (pif .hdr .getZxid ()),
643
- Long .toHexString (lastQueued ));
644
- LOG .error (msg );
645
- throw new Exception (msg );
646
- }
647
- // We can't tell whether it is a data loss. Given that new epoch is rare,
648
- // log at warn should not be too verbose.
649
- LOG .warn ("DIFF sync got new epoch proposal 0x{}, last proposal 0x{}" ,
650
- Long .toHexString (pif .hdr .getZxid ()),
651
- Long .toHexString (lastQueued ));
652
- }
653
- lastQueued = pif .hdr .getZxid ();
660
+ lastQueued = enforceContinuousProposal (lastQueued , pif );
654
661
655
662
if (pif .hdr .getType () == OpCode .reconfig ) {
656
663
SetDataTxn setDataTxn = (SetDataTxn ) pif .rec ;
@@ -710,23 +717,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
710
717
packet .rec = logEntry .getTxn ();
711
718
packet .hdr = logEntry .getHeader ();
712
719
packet .digest = logEntry .getDigest ();
713
- if (lastQueued == 0 ) {
714
- LOG .info ("DIFF sync got first proposal 0x{}" , Long .toHexString (packet .hdr .getZxid ()));
715
- } else if (packet .hdr .getZxid () != lastQueued + 1 ) {
716
- if (ZxidUtils .getEpochFromZxid (packet .hdr .getZxid ()) <= ZxidUtils .getEpochFromZxid (lastQueued )) {
717
- String msg = String .format ("DIFF sync got proposal 0x%s which is not next of last proposal 0x%s" ,
718
- Long .toHexString (packet .hdr .getZxid ()),
719
- Long .toHexString (lastQueued ));
720
- LOG .error (msg );
721
- throw new Exception (msg );
722
- }
723
- // We can't tell whether it is a data loss. Given that new epoch is rare,
724
- // log at warn should not be too verbose.
725
- LOG .warn ("DIFF sync got new epoch proposal 0x{}, last proposal 0x{}" ,
726
- Long .toHexString (packet .hdr .getZxid ()),
727
- Long .toHexString (lastQueued ));
728
- }
729
- lastQueued = packet .hdr .getZxid ();
720
+ lastQueued = enforceContinuousProposal (lastQueued , packet );
730
721
}
731
722
if (!writeToTxnLog ) {
732
723
// Apply to db directly if we haven't taken the snapshot
@@ -803,8 +794,9 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
803
794
continue ;
804
795
}
805
796
packetsNotLogged .removeFirst ();
806
- fzk .appendRequest (pif .toRequest ());
807
- fzk .processTxn (pif .toRequest ());
797
+ Request request = pif .toRequest ();
798
+ fzk .appendRequest (request );
799
+ fzk .processTxn (request );
808
800
}
809
801
810
802
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
0 commit comments