Skip to content

Commit 6f6757b

Browse files
AnanyaSingh2121arshadmohammad
authored andcommitted
ZOOKEEPER-4433 : Backport ZOOKEEPER-2872 for branch-3.5
Author: Ananya Singh <[email protected]> Reviewers: Brahma Reddy Battula <[email protected]>, Norbert Kalmar <[email protected]>, Mohammad Arshad <[email protected]> Closes #1790 from AnanyaSingh2121/ZOOKEEPER-4433
1 parent fc1764b commit 6f6757b

File tree

7 files changed

+34
-18
lines changed

7 files changed

+34
-18
lines changed

zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,13 @@ public void loadData() throws IOException, InterruptedException {
338338
takeSnapshot();
339339
}
340340

341-
public void takeSnapshot(){
341+
public void takeSnapshot() {
342+
takeSnapshot(false);
343+
}
344+
345+
public void takeSnapshot(boolean syncSnap){
342346
try {
343-
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
347+
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
344348
} catch (IOException e) {
345349
LOG.error("Severe unrecoverable error, exiting", e);
346350
// This is a severe error that we cannot recover from,

zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.jute.BinaryOutputArchive;
3838
import org.apache.jute.InputArchive;
3939
import org.apache.jute.OutputArchive;
40+
import org.apache.zookeeper.common.AtomicFileOutputStream;
4041
import org.slf4j.Logger;
4142
import org.slf4j.LoggerFactory;
4243
import org.apache.zookeeper.server.DataTree;
@@ -215,20 +216,23 @@ protected void serialize(DataTree dt,Map<Long, Integer> sessions,
215216
* @param dt the datatree to be serialized
216217
* @param sessions the sessions to be serialized
217218
* @param snapShot the file to store snapshot into
219+
* @param fsync sync the file immediately after write
218220
*/
219-
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
221+
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync)
220222
throws IOException {
221223
if (!close) {
222-
try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
223-
CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
224+
try (CheckedOutputStream crcOut =
225+
new CheckedOutputStream(new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) :
226+
new FileOutputStream(snapShot)),
227+
new Adler32())) {
224228
//CheckedOutputStream cout = new CheckedOutputStream()
225229
OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
226230
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
227231
serialize(dt, sessions, oa, header);
228232
long val = crcOut.getChecksum().getValue();
229233
oa.writeLong(val, "val");
230234
oa.writeString("/", "path");
231-
sessOS.flush();
235+
crcOut.flush();
232236
}
233237
} else {
234238
throw new IOException("FileSnap has already been closed");

zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ public long restore(DataTree dt, Map<Long, Integer> sessions,
245245
}
246246
/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
247247
* or use Map on save() */
248-
save(dt, (ConcurrentHashMap<Long, Integer>)sessions);
248+
save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);
249249
/* return a zxid of zero, since we the database is empty */
250250
return 0;
251251
}
@@ -394,16 +394,18 @@ public long getLastLoggedZxid() {
394394
* @param dataTree the datatree to be serialized onto disk
395395
* @param sessionsWithTimeouts the session timeouts to be
396396
* serialized onto disk
397+
* @param syncSnap sync the snapshot immediately after write
397398
* @throws IOException
398399
*/
399400
public void save(DataTree dataTree,
400-
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
401+
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
402+
boolean syncSnap)
401403
throws IOException {
402404
long lastZxid = dataTree.lastProcessedZxid;
403405
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
404406
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
405407
snapshotFile);
406-
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
408+
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
407409

408410
}
409411

zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapShot.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@ long deserialize(DataTree dt, Map<Long, Integer> sessions)
4444
/**
4545
* persist the datatree and the sessions into a persistence storage
4646
* @param dt the datatree to be serialized
47-
* @param sessions
47+
* @param sessions the session timeouts to be serialized
48+
* @param name the object name to store snapshot into
49+
* @param fsync sync the snapshot immediately after write
4850
* @throws IOException
4951
*/
50-
void serialize(DataTree dt, Map<Long, Integer> sessions,
51-
File name)
52+
void serialize(DataTree dt, Map<Long, Integer> sessions,
53+
File name, boolean fsync)
5254
throws IOException;
5355

5456
/**

zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception{
383383
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
384384
// For SNAP and TRUNC the snapshot is needed to save that history
385385
boolean snapshotNeeded = true;
386+
boolean syncSnapshot = false;
386387
readPacket(qp);
387388
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
388389
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
@@ -409,6 +410,9 @@ else if (qp.getType() == Leader.SNAP) {
409410
throw new IOException("Missing signature");
410411
}
411412
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
413+
414+
// immediately persist the latest snapshot when there is txn log gap
415+
syncSnapshot = true;
412416
} else if (qp.getType() == Leader.TRUNC) {
413417
//we need to truncate the log to the lastzxid of the leader
414418
LOG.warn("Truncating log to get in sync with the leader 0x"
@@ -535,7 +539,7 @@ else if (qp.getType() == Leader.SNAP) {
535539
}
536540
}
537541
if (isPreZAB1_0) {
538-
zk.takeSnapshot();
542+
zk.takeSnapshot(syncSnapshot);
539543
self.setCurrentEpoch(newEpoch);
540544
}
541545
self.setZooKeeperServer(zk);
@@ -555,7 +559,7 @@ else if (qp.getType() == Leader.SNAP) {
555559
}
556560

557561
if (snapshotNeeded) {
558-
zk.takeSnapshot();
562+
zk.takeSnapshot(syncSnapshot);
559563
}
560564

561565
self.setCurrentEpoch(newEpoch);

zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversa
366366
Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0));
367367

368368
// Generate snapshot and close files.
369-
snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
369+
snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false);
370370
snapLog.close();
371371

372372
QuorumPeer peer = createQuorumPeer(tmpDir);
@@ -649,7 +649,7 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa,
649649
Assert.assertEquals(1, f.self.getAcceptedEpoch());
650650
Assert.assertEquals(1, f.self.getCurrentEpoch());
651651
//Make sure that we did take the snapshot now
652-
verify(f.zk).takeSnapshot();
652+
verify(f.zk).takeSnapshot(true);
653653
Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
654654

655655
// Make sure the data was recorded in the filesystem ok
@@ -1246,7 +1246,7 @@ public void testInitialAcceptedCurrent() throws Exception {
12461246
FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
12471247
File version2 = new File(tmpDir, "version-2");
12481248
version2.mkdir();
1249-
logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>());
1249+
logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>(), false);
12501250
long zxid = ZxidUtils.makeZxid(3, 3);
12511251
logFactory.append(new Request(1, 1, ZooDefs.OpCode.error,
12521252
new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error),

zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void testTruncationStreamReset() throws Exception {
7777
ZKDatabase zkdb = new ZKDatabase(snaplog);
7878
// make sure to snapshot, so that we have something there when
7979
// truncateLog reloads the db
80-
snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts());
80+
snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false);
8181

8282
for (int i = 1; i <= 100; i++) {
8383
append(zkdb, i);

0 commit comments

Comments
 (0)