Skip to content

Commit 66f183f

Browse files
committed
1 parent 3d6c0d1 commit 66f183f

File tree

7 files changed

+136
-2
lines changed

7 files changed

+136
-2
lines changed

zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ static class AuthData {
168168

169169
private final int sessionTimeout;
170170

171+
private final long newSessionTimeout;
172+
171173
private final ZKWatchManager watchManager;
172174

173175
private long sessionId;
@@ -398,6 +400,36 @@ public ClientCnxn(
398400
long sessionId,
399401
byte[] sessionPasswd,
400402
boolean canBeReadOnly
403+
) throws IOException {
404+
this(hostProvider, sessionTimeout, Long.MAX_VALUE, clientConfig, defaultWatcher, clientCnxnSocket, sessionId, sessionPasswd, canBeReadOnly);
405+
}
406+
407+
/**
408+
* Creates a connection object. The actual network connect doesn't get
409+
* established until needed. The start() instance method must be called
410+
* after construction.
411+
*
412+
* @param hostProvider the list of ZooKeeper servers to connect to
413+
* @param sessionTimeout the timeout for connections.
414+
* @param newSessionTimeout the timeout before giving up brand-new session establishment.
415+
* @param clientConfig the client configuration.
416+
* @param defaultWatcher default watcher for this connection
417+
* @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
418+
* @param sessionId session id if re-establishing session
419+
* @param sessionPasswd session passwd if re-establishing session
420+
* @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning
421+
* @throws IOException in cases of broken network
422+
*/
423+
public ClientCnxn(
424+
HostProvider hostProvider,
425+
int sessionTimeout,
426+
long newSessionTimeout,
427+
ZKClientConfig clientConfig,
428+
Watcher defaultWatcher,
429+
ClientCnxnSocket clientCnxnSocket,
430+
long sessionId,
431+
byte[] sessionPasswd,
432+
boolean canBeReadOnly
401433
) throws IOException {
402434
this.hostProvider = hostProvider;
403435
this.sessionTimeout = sessionTimeout;
@@ -413,6 +445,7 @@ public ClientCnxn(
413445
this.connectTimeout = sessionTimeout / hostProvider.size();
414446
this.readTimeout = sessionTimeout * 2 / 3;
415447
this.expirationTimeout = sessionTimeout * 4 / 3;
448+
this.newSessionTimeout = newSessionTimeout == 0 ? expirationTimeout : newSessionTimeout;
416449

417450
this.sendThread = new SendThread(clientCnxnSocket);
418451
this.eventThread = new EventThread();
@@ -1192,7 +1225,12 @@ public void run() {
11921225
to = connectTimeout - clientCnxnSocket.getIdleSend();
11931226
}
11941227

1195-
int expiration = sessionId == 0 ? Integer.MAX_VALUE : expirationTimeout - clientCnxnSocket.getIdleRecv();
1228+
long expiration;
1229+
if (sessionId == 0) {
1230+
expiration = newSessionTimeout - clientCnxnSocket.getIdleRecv();
1231+
} else {
1232+
expiration = expirationTimeout - clientCnxnSocket.getIdleRecv();
1233+
}
11961234
if (expiration <= 0) {
11971235
String warnInfo = String.format(
11981236
"Client session timed out, have not heard from server in %dms for session id 0x%s",

zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,7 @@ public ZooKeeper(
672672
ClientCnxn createConnection(
673673
HostProvider hostProvider,
674674
int sessionTimeout,
675+
long newSessionTimeout,
675676
ZKClientConfig clientConfig,
676677
Watcher defaultWatcher,
677678
ClientCnxnSocket clientCnxnSocket,
@@ -682,6 +683,7 @@ ClientCnxn createConnection(
682683
return new ClientCnxn(
683684
hostProvider,
684685
sessionTimeout,
686+
newSessionTimeout,
685687
clientConfig,
686688
defaultWatcher,
687689
clientCnxnSocket,
@@ -1111,6 +1113,7 @@ public ZooKeeper(ZooKeeperOptions options) throws IOException {
11111113
cnxn = createConnection(
11121114
hostProvider,
11131115
sessionTimeout,
1116+
options.getNewSessionTimeoutMs(),
11141117
this.clientConfig,
11151118
watcher,
11161119
getClientCnxnSocket(),

zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
public class ZooKeeperBuilder {
4040
private final String connectString;
4141
private final Duration sessionTimeout;
42+
private Duration newSessionTimeout = Duration.ofSeconds(Long.MAX_VALUE, 999_999_999L);
4243
private Function<Collection<InetSocketAddress>, HostProvider> hostProvider;
4344
private Watcher defaultWatcher;
4445
private boolean canBeReadOnly = false;
@@ -128,6 +129,21 @@ public ZooKeeperBuilder withSession(long sessionId, byte[] sessionPasswd) {
128129
return this;
129130
}
130131

132+
/**
133+
* Specifies timeout to establish a brand-new session.
134+
*
135+
* @param timeout timeout to get {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} in establishing a
136+
* brand-new session. {@code Duration.ofSeconds(Long.MAX_VALUE, 999_999_999L)}, which is the default,
137+
* means endless retry until connected. {@code Duration.ZERO} means a sensible value deduced from
138+
* specified session timeout, currently, it is approximate {@code sessionTimeout * 4 / 3}.
139+
* @return this
140+
* @since 3.10.0
141+
*/
142+
public ZooKeeperBuilder withNewSessionTimeout(Duration timeout) {
143+
this.newSessionTimeout = timeout;
144+
return this;
145+
}
146+
131147
/**
132148
* Specifies the client config used to construct ZooKeeper instances.
133149
*
@@ -152,6 +168,7 @@ public ZooKeeperOptions toOptions() {
152168
return new ZooKeeperOptions(
153169
connectString,
154170
sessionTimeout,
171+
newSessionTimeout,
155172
defaultWatcher,
156173
hostProvider,
157174
canBeReadOnly,

zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
public class ZooKeeperOptions {
3434
private final String connectString;
3535
private final Duration sessionTimeout;
36+
private final Duration newSessionTimeout;
3637
private final Watcher defaultWatcher;
3738
private final Function<Collection<InetSocketAddress>, HostProvider> hostProvider;
3839
private final boolean canBeReadOnly;
@@ -42,6 +43,7 @@ public class ZooKeeperOptions {
4243

4344
ZooKeeperOptions(String connectString,
4445
Duration sessionTimeout,
46+
Duration newSessionTimeout,
4547
Watcher defaultWatcher,
4648
Function<Collection<InetSocketAddress>, HostProvider> hostProvider,
4749
boolean canBeReadOnly,
@@ -50,6 +52,7 @@ public class ZooKeeperOptions {
5052
ZKClientConfig clientConfig) {
5153
this.connectString = connectString;
5254
this.sessionTimeout = sessionTimeout;
55+
this.newSessionTimeout = newSessionTimeout;
5356
this.hostProvider = hostProvider;
5457
this.defaultWatcher = defaultWatcher;
5558
this.canBeReadOnly = canBeReadOnly;
@@ -66,6 +69,14 @@ public int getSessionTimeoutMs() {
6669
return (int) Long.min(Integer.MAX_VALUE, sessionTimeout.toMillis());
6770
}
6871

72+
public long getNewSessionTimeoutMs() {
73+
try {
74+
return newSessionTimeout.toMillis();
75+
} catch (ArithmeticException ignored) {
76+
return Long.MAX_VALUE;
77+
}
78+
}
79+
6980
public Watcher getDefaultWatcher() {
7081
return defaultWatcher;
7182
}

zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ class CustomClientCnxn extends ClientCnxn {
282282
public CustomClientCnxn(
283283
HostProvider hostProvider,
284284
int sessionTimeout,
285+
long newSessionTimeout,
285286
ZKClientConfig zkClientConfig,
286287
Watcher defaultWatcher,
287288
ClientCnxnSocket clientCnxnSocket,
@@ -292,6 +293,7 @@ public CustomClientCnxn(
292293
super(
293294
hostProvider,
294295
sessionTimeout,
296+
newSessionTimeout,
295297
zkClientConfig,
296298
defaultWatcher,
297299
clientCnxnSocket,
@@ -357,6 +359,7 @@ public boolean isAlive() {
357359
ClientCnxn createConnection(
358360
HostProvider hostProvider,
359361
int sessionTimeout,
362+
long newSessionTimeout,
360363
ZKClientConfig clientConfig,
361364
Watcher defaultWatcher,
362365
ClientCnxnSocket clientCnxnSocket,
@@ -369,6 +372,7 @@ ClientCnxn createConnection(
369372
ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(
370373
hostProvider,
371374
sessionTimeout,
375+
newSessionTimeout,
372376
clientConfig,
373377
defaultWatcher,
374378
clientCnxnSocket,
@@ -378,4 +382,4 @@ ClientCnxn createConnection(
378382
return ClientCnxnSocketFragilityTest.this.cnxn;
379383
}
380384
}
381-
}
385+
}

zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ class CustomClientCnxn extends ClientCnxn {
225225
CustomClientCnxn(
226226
HostProvider hostProvider,
227227
int sessionTimeout,
228+
long newSessionTimeout,
228229
ZKClientConfig clientConfig,
229230
Watcher defaultWatcher,
230231
ClientCnxnSocket clientCnxnSocket,
@@ -235,6 +236,7 @@ class CustomClientCnxn extends ClientCnxn {
235236
super(
236237
hostProvider,
237238
sessionTimeout,
239+
newSessionTimeout,
238240
clientConfig,
239241
defaultWatcher,
240242
clientCnxnSocket,
@@ -286,6 +288,7 @@ public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher
286288
ClientCnxn createConnection(
287289
HostProvider hostProvider,
288290
int sessionTimeout,
291+
long newSessionTimeout,
289292
ZKClientConfig clientConfig,
290293
Watcher defaultWatcher,
291294
ClientCnxnSocket clientCnxnSocket,
@@ -296,6 +299,7 @@ ClientCnxn createConnection(
296299
return new CustomClientCnxn(
297300
hostProvider,
298301
sessionTimeout,
302+
newSessionTimeout,
299303
clientConfig,
300304
defaultWatcher,
301305
clientCnxnSocket,

zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.zookeeper.test;
2020

2121
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.hamcrest.Matchers.greaterThan;
2223
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
2324
import static org.hamcrest.Matchers.lessThan;
2425
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -29,6 +30,7 @@
2930
import java.io.IOException;
3031
import java.net.ServerSocket;
3132
import java.net.Socket;
33+
import java.time.Duration;
3234
import java.util.Arrays;
3335
import java.util.List;
3436
import java.util.concurrent.CompletableFuture;
@@ -42,6 +44,7 @@
4244
import org.apache.zookeeper.Watcher;
4345
import org.apache.zookeeper.ZooDefs;
4446
import org.apache.zookeeper.ZooKeeper;
47+
import org.apache.zookeeper.client.ZooKeeperBuilder;
4548
import org.apache.zookeeper.common.Time;
4649
import org.junit.jupiter.api.BeforeEach;
4750
import org.junit.jupiter.api.Test;
@@ -201,6 +204,60 @@ public void testSessionExpirationWhenNoServerUp() throws Exception {
201204
assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS));
202205
assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null));
203206
}
207+
208+
// when: try to establish a brand-new session using builder
209+
watcher.reset();
210+
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, Duration.ofMillis(sessionTimeout))
211+
.withDefaultWatcher(watcher)
212+
.build()) {
213+
// then: never Expired
214+
assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS));
215+
assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null));
216+
}
217+
218+
// when: try to establish a brand-new session using builder with null newSessionTimeout
219+
watcher.reset();
220+
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, Duration.ofMillis(sessionTimeout))
221+
.withDefaultWatcher(watcher)
222+
.withNewSessionTimeout(null)
223+
.build()) {
224+
// then: never Expired
225+
assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS));
226+
assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null));
227+
}
228+
229+
// when: try to establish a brand-new session using builder with Duration.ZERO newSessionTimeout
230+
watcher.reset();
231+
long start = Time.currentElapsedTime();
232+
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, Duration.ofMillis(sessionTimeout))
233+
.withDefaultWatcher(watcher)
234+
.withNewSessionTimeout(Duration.ZERO)
235+
.build()) {
236+
// then: get Expired after some delay
237+
watcher.expired.join();
238+
long elapsed = Time.currentElapsedTime() - start;
239+
assertThat(elapsed, greaterThan((long) sessionTimeout));
240+
assertThat(elapsed, lessThan(sessionTimeout * 10L));
241+
// then: future request will get SessionExpiredException
242+
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
243+
}
244+
245+
// when: try to establish a brand-new session using builder with custom newSessionTimeout
246+
watcher.reset();
247+
start = Time.currentElapsedTime();
248+
Duration newSessionTimeout = Duration.ofMillis(300);
249+
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, Duration.ofMillis(30000))
250+
.withDefaultWatcher(watcher)
251+
.withNewSessionTimeout(newSessionTimeout)
252+
.build()) {
253+
// then: get Expired after newSessionTimeout
254+
watcher.expired.join();
255+
long elapsed = Time.currentElapsedTime() - start;
256+
assertThat(elapsed, greaterThanOrEqualTo(newSessionTimeout.toMillis()));
257+
assertThat(elapsed, lessThan(newSessionTimeout.toMillis() * 10));
258+
// then: future request will get SessionExpiredException
259+
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
260+
}
204261
}
205262

206263
@Test

0 commit comments

Comments
 (0)