Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 10acbf9

Browse files
committedJun 13, 2025·
Fix abnormal transmission of session inbox version.
1 parent 1b9499e commit 10acbf9

File tree

3 files changed

+47
-7
lines changed

3 files changed

+47
-7
lines changed
 

‎bifromq-inbox/bifromq-inbox-server/src/main/java/org/apache/bifromq/inbox/server/scheduler/BatchDetachCall.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121

2222
import static java.util.Collections.emptySet;
2323

24+
import java.util.HashMap;
25+
import java.util.HashSet;
26+
import java.util.Map;
27+
import java.util.Queue;
28+
import java.util.Set;
29+
import lombok.extern.slf4j.Slf4j;
2430
import org.apache.bifromq.basekv.client.IMutationPipeline;
2531
import org.apache.bifromq.basekv.client.exception.BadVersionException;
2632
import org.apache.bifromq.basekv.client.exception.TryLaterException;
@@ -37,12 +43,6 @@
3743
import org.apache.bifromq.inbox.storage.proto.InboxServiceRWCoProcInput;
3844
import org.apache.bifromq.inbox.storage.proto.InboxVersion;
3945
import org.apache.bifromq.inbox.storage.proto.Replica;
40-
import java.util.HashMap;
41-
import java.util.HashSet;
42-
import java.util.Map;
43-
import java.util.Queue;
44-
import java.util.Set;
45-
import lombok.extern.slf4j.Slf4j;
4646

4747
@Slf4j
4848
class BatchDetachCall extends BatchMutationCall<DetachRequest, DetachReply> {
@@ -69,10 +69,12 @@ protected RWCoProcInput makeBatch(
6969
BatchDetachRequest.Params.Builder paramsBuilder = BatchDetachRequest.Params.newBuilder()
7070
.setTenantId(request.getClient().getTenantId())
7171
.setInboxId(request.getInboxId())
72-
.setVersion(request.getVersion())
7372
.setExpirySeconds(request.getExpirySeconds())
7473
.setDiscardLWT(request.getDiscardLWT())
7574
.setNow(request.getNow());
75+
if (request.hasVersion()) {
76+
paramsBuilder.setVersion(request.getVersion());
77+
}
7678
reqBuilder.addParams(paramsBuilder.build());
7779
}
7880

‎bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTConnectHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,7 @@ private CompletableFuture<ExpireResult> expireInbox(long reqId,
387387
if (requestClientId.isEmpty()) {
388388
return CompletableFuture.completedFuture(ExpireResult.NOT_FOUND);
389389
}
390+
// detach and expire the latest version immediately
390391
return inboxClient.detach(DetachRequest.newBuilder()
391392
.setReqId(reqId)
392393
.setInboxId(userSessionId)

‎bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/integration/v5/MQTTConnectTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import static org.mockito.Mockito.when;
2525
import static org.testng.Assert.assertEquals;
2626
import static org.testng.Assert.assertFalse;
27+
import static org.testng.Assert.assertNull;
28+
import static org.testng.Assert.assertTrue;
2729

2830
import org.apache.bifromq.mqtt.integration.MQTTTest;
2931
import org.apache.bifromq.mqtt.integration.v5.client.MqttTestClient;
@@ -134,4 +136,39 @@ public void forceTransient() {
134136
IMqttToken token = client.connect(connOpts);
135137
assertEquals(token.getResponseProperties().getSessionExpiryInterval(), 0L);
136138
}
139+
140+
/**
141+
* Test whether it can reconnect when cleanStart=true but the session expiration interval is not 0, when reconnecting.
142+
*/
143+
@Test(groups = "integration")
144+
public void reconnectTest() {
145+
when(authProvider.auth(any(MQTT5AuthData.class)))
146+
.thenReturn(CompletableFuture.completedFuture(MQTT5AuthResult.newBuilder()
147+
.setSuccess(Success.newBuilder()
148+
.setTenantId("tenant")
149+
.setUserId("testUser")
150+
.build()).build()));
151+
when(authProvider.checkPermission(any(), any()))
152+
.thenReturn(CompletableFuture.completedFuture(
153+
CheckResult.newBuilder().setGranted(Granted.newBuilder().build()).build()));
154+
when(settingProvider.provide(eq(Setting.ForceTransient), eq("tenant"))).thenReturn(false);
155+
156+
MqttConnectionOptions connOpts = new MqttConnectionOptions();
157+
connOpts.setCleanStart(true);
158+
connOpts.setSessionExpiryInterval(1800L);
159+
connOpts.setUserName("tenant/testUser");
160+
161+
MqttTestClient client1 = new MqttTestClient(BROKER_URI, "client_id");
162+
IMqttToken token1 = client1.connect(connOpts);
163+
assertTrue(token1.isComplete());
164+
assertNull(token1.getException());
165+
client1.disconnect();
166+
167+
// reconnect
168+
MqttTestClient client2 = new MqttTestClient(BROKER_URI, "client_id");
169+
IMqttToken token2 = client2.connect(connOpts);
170+
assertTrue(token2.isComplete());
171+
assertNull(token2.getException());
172+
}
173+
137174
}

0 commit comments

Comments
 (0)
Please sign in to comment.