Skip to content

Commit c7c76d8

Browse files
committed
using single crdt service in Standalone mode which slightly reduced the running overhead
1 parent 72552ed commit c7c76d8

File tree

1 file changed

+18
-22
lines changed

1 file changed

+18
-22
lines changed

build/build-bifromq-starters/src/main/java/com/baidu/bifromq/starter/StandaloneStarter.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,7 @@ public class StandaloneStarter extends BaseEngineStarter<StandaloneConfig> {
104104
private EventCollectorManager eventCollectorMgr;
105105
private SettingProviderManager settingProviderMgr;
106106
private IAgentHost agentHost;
107-
private ICRDTService clientCrdtService;
108-
private ICRDTService serverCrdtService;
107+
private ICRDTService crdtService;
109108
private IRPCServer sharedIORpcServer;
110109
private IRPCServer sharedBaseKVRpcServer;
111110
private ISessionDictClient sessionDictClient;
@@ -202,10 +201,8 @@ protected void init(StandaloneConfig config) {
202201
agentHost = IAgentHost.newInstance(agentHostOptions);
203202
agentHost.start();
204203
log.debug("Agent host started");
205-
clientCrdtService = ICRDTService.newInstance(CRDTServiceOptions.builder().build());
206-
clientCrdtService.start(agentHost);
207-
serverCrdtService = ICRDTService.newInstance(CRDTServiceOptions.builder().build());
208-
serverCrdtService.start(agentHost);
204+
crdtService = ICRDTService.newInstance(CRDTServiceOptions.builder().build());
205+
crdtService.start(agentHost);
209206
log.debug("CRDT service started");
210207

211208
EventLoopGroup rpcServerBossELG =
@@ -225,14 +222,14 @@ protected void init(StandaloneConfig config) {
225222
.port(config.getRpcServerConfig().getPort())
226223
.bossEventLoopGroup(rpcServerBossELG)
227224
.workerEventLoopGroup(ioRPCWorkerELG)
228-
.crdtService(serverCrdtService)
225+
.crdtService(crdtService)
229226
.executor(rpcServerExecutor);
230227
RPCServerBuilder sharedBaseKVRPCServerBuilder = IRPCServer.newBuilder()
231228
.host(config.getBaseKVServerConfig().getHost())
232229
.port(config.getBaseKVServerConfig().getPort())
233230
.bossEventLoopGroup(rpcServerBossELG)
234231
.workerEventLoopGroup(kvRPCWorkerELG)
235-
.crdtService(serverCrdtService)
232+
.crdtService(crdtService)
236233
.executor(baseKVServerExecutor);
237234
if (config.getRpcServerConfig().isEnableSSL()) {
238235
sharedIORPCServerBuilder.sslContext(buildRPCServerSslContext(config.getRpcServerConfig().getSslConfig()));
@@ -246,18 +243,18 @@ protected void init(StandaloneConfig config) {
246243
SslContext baseKVClientSslContext = config.getBaseKVClientConfig().isEnableSSL()
247244
? buildRPCClientSslContext(config.getBaseKVClientConfig().getSslConfig()) : null;
248245
distClient = IDistClient.newBuilder()
249-
.crdtService(clientCrdtService)
246+
.crdtService(crdtService)
250247
.executor(rpcClientExecutor)
251248
.sslContext(rpcClientSslContext)
252249
.build();
253250
retainClient = IRetainClient.newBuilder()
254-
.crdtService(clientCrdtService)
251+
.crdtService(crdtService)
255252
.executor(rpcClientExecutor)
256253
.sslContext(rpcClientSslContext)
257254
.build();
258255
retainStoreClient = IBaseKVStoreClient.newBuilder()
259256
.clusterId(IRetainStore.CLUSTER_NAME)
260-
.crdtService(clientCrdtService)
257+
.crdtService(crdtService)
261258
.executor(baseKVClientExecutor)
262259
.sslContext(baseKVClientSslContext)
263260
.queryPipelinesPerStore(config
@@ -269,7 +266,7 @@ protected void init(StandaloneConfig config) {
269266
.rpcServerBuilder(sharedBaseKVRPCServerBuilder)
270267
.bootstrap(config.isBootstrap())
271268
.agentHost(agentHost)
272-
.crdtService(serverCrdtService)
269+
.crdtService(crdtService)
273270
.storeClient(retainStoreClient)
274271
.queryExecutor(MoreExecutors.directExecutor())
275272
.tickerThreads(config.getStateStoreConfig().getTickerThreads())
@@ -296,13 +293,13 @@ protected void init(StandaloneConfig config) {
296293
.getWalEngineConfig(), "retain_wal")))
297294
.build();
298295
inboxClient = IInboxClient.newBuilder()
299-
.crdtService(clientCrdtService)
296+
.crdtService(crdtService)
300297
.executor(rpcClientExecutor)
301298
.sslContext(rpcClientSslContext)
302299
.build();
303300
inboxStoreClient = IBaseKVStoreClient.newBuilder()
304301
.clusterId(IInboxStore.CLUSTER_NAME)
305-
.crdtService(clientCrdtService)
302+
.crdtService(crdtService)
306303
.executor(baseKVClientExecutor)
307304
.sslContext(baseKVClientSslContext)
308305
.queryPipelinesPerStore(config.getStateStoreConfig().getInboxStoreConfig()
@@ -312,7 +309,7 @@ protected void init(StandaloneConfig config) {
312309
.rpcServerBuilder(sharedBaseKVRPCServerBuilder)
313310
.bootstrap(config.isBootstrap())
314311
.agentHost(agentHost)
315-
.crdtService(serverCrdtService)
312+
.crdtService(crdtService)
316313
.inboxClient(inboxClient)
317314
.storeClient(inboxStoreClient)
318315
.settingProvider(settingProviderMgr)
@@ -354,7 +351,7 @@ protected void init(StandaloneConfig config) {
354351
.build();
355352
distWorkerClient = IBaseKVStoreClient.newBuilder()
356353
.clusterId(IDistWorker.CLUSTER_NAME)
357-
.crdtService(clientCrdtService)
354+
.crdtService(crdtService)
358355
.executor(baseKVClientExecutor)
359356
.sslContext(baseKVClientSslContext)
360357
.queryPipelinesPerStore(config
@@ -363,15 +360,15 @@ protected void init(StandaloneConfig config) {
363360
.getQueryPipelinePerStore())
364361
.build();
365362
mqttBrokerClient = IMqttBrokerClient.newBuilder()
366-
.crdtService(clientCrdtService)
363+
.crdtService(crdtService)
367364
.executor(rpcClientExecutor)
368365
.sslContext(rpcClientSslContext)
369366
.build();
370367

371368
subBrokerManager = new SubBrokerManager(pluginMgr, mqttBrokerClient, inboxClient);
372369

373370
sessionDictClient = ISessionDictClient.newBuilder()
374-
.crdtService(clientCrdtService)
371+
.crdtService(crdtService)
375372
.executor(rpcClientExecutor)
376373
.sslContext(rpcClientSslContext)
377374
.build();
@@ -390,7 +387,7 @@ protected void init(StandaloneConfig config) {
390387
.rpcServerBuilder(sharedBaseKVRPCServerBuilder)
391388
.bootstrap(config.isBootstrap())
392389
.agentHost(agentHost)
393-
.crdtService(serverCrdtService)
390+
.crdtService(crdtService)
394391
.eventCollector(eventCollectorMgr)
395392
.resourceThrottler(resourceThrottlerMgr)
396393
.distClient(distClient)
@@ -421,7 +418,7 @@ protected void init(StandaloneConfig config) {
421418

422419
distServer = IDistServer.nonStandaloneBuilder()
423420
.rpcServerBuilder(sharedIORPCServerBuilder)
424-
.crdtService(serverCrdtService)
421+
.crdtService(crdtService)
425422
.distWorkerClient(distWorkerClient)
426423
.settingProvider(settingProviderMgr)
427424
.eventCollector(eventCollectorMgr)
@@ -572,8 +569,7 @@ public void stop() {
572569
sessionDictClient.stop();
573570
sessionDictServer.shutdown();
574571

575-
clientCrdtService.stop();
576-
serverCrdtService.stop();
572+
crdtService.stop();
577573

578574
agentHost.shutdown();
579575

0 commit comments

Comments
 (0)