Skip to content

Commit 0165ca3

Browse files
committedMay 28, 2025
Fix for cancellation token registration in case of already canceled token #312
1 parent 582683b commit 0165ca3

File tree

3 files changed

+60
-30
lines changed

3 files changed

+60
-30
lines changed
 

‎src/Pulsar.Client/Internal/ConsumerImpl.fs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -754,18 +754,25 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
754754
if incomingMessages.Count > 0 then
755755
replyWithMessage channel <| dequeueMessage()
756756
else
757+
let mutable synchronouslyCanceled = false
757758
let tokenRegistration =
758759
if cancellationToken.CanBeCanceled then
759-
let rec cancellationTokenRegistration =
760+
let mutable cancellationTokenRegistration = None
761+
cancellationTokenRegistration <-
760762
cancellationToken.Register((fun () ->
761763
Log.Logger.LogDebug("{0} receive cancelled", prefix)
762-
post this.Mb (CancelWaiter(cancellationTokenRegistration, channel))
764+
match cancellationTokenRegistration with
765+
| None -> synchronouslyCanceled <- true
766+
| Some _ -> post this.Mb (CancelWaiter(cancellationTokenRegistration, channel))
763767
), false) |> Some
764768
cancellationTokenRegistration
765769
else
766770
None
767-
waiters.AddLast(struct(tokenRegistration, channel)) |> ignore
768-
Log.Logger.LogDebug("{0} Receive waiting", prefix)
771+
if synchronouslyCanceled then
772+
channel.SetCanceled()
773+
else
774+
waiters.AddLast(struct(tokenRegistration, channel)) |> ignore
775+
Log.Logger.LogDebug("{0} Receive waiting", prefix)
769776

770777
let batchReceive (receiveCallbacks: ReceiveCallbacks<'T>) =
771778
Log.Logger.LogDebug("{0} BatchReceive", prefix)
@@ -778,23 +785,32 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
778785
replyWithBatch channel
779786
else
780787
let batchCts = new CancellationTokenSource()
781-
let registration =
788+
let mutable synchronouslyCanceled = false
789+
let tokenRegistration =
782790
if cancellationToken.CanBeCanceled then
783-
let rec cancellationTokenRegistration =
791+
let mutable cancellationTokenRegistration = None
792+
cancellationTokenRegistration <-
784793
cancellationToken.Register((fun () ->
785794
Log.Logger.LogDebug("{0} batch receive cancelled", prefix)
786-
post this.Mb (CancelBatchWaiter(batchCts, cancellationTokenRegistration, channel))
795+
match cancellationTokenRegistration with
796+
| None -> synchronouslyCanceled <- true
797+
| Some _ -> post this.Mb (CancelBatchWaiter(batchCts, cancellationTokenRegistration, channel))
787798
), false) |> Some
788799
cancellationTokenRegistration
789800
else
790801
None
791-
batchWaiters.AddLast(struct(batchCts, registration, channel)) |> ignore
792-
asyncDelay
793-
consumerConfig.BatchReceivePolicy.Timeout
794-
(fun () ->
795-
if not batchCts.IsCancellationRequested then
796-
post this.Mb SendBatchByTimeout)
797-
Log.Logger.LogDebug("{0} BatchReceive waiting", prefix)
802+
if synchronouslyCanceled then
803+
channel.SetCanceled()
804+
else
805+
batchWaiters.AddLast(struct(batchCts, tokenRegistration, channel)) |> ignore
806+
asyncDelay
807+
consumerConfig.BatchReceivePolicy.Timeout
808+
(fun () ->
809+
if not batchCts.IsCancellationRequested then
810+
post this.Mb SendBatchByTimeout
811+
else
812+
batchCts.Dispose())
813+
Log.Logger.LogDebug("{0} BatchReceive waiting", prefix)
798814

799815
let consumerOperations = {
800816
MessageReceived = fun rawMessage -> post this.Mb (MessageReceived rawMessage)

‎src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -539,18 +539,25 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
539539
if incomingMessages.Count > 0 then
540540
replyWithMessage channel <| dequeueMessage()
541541
else
542+
let mutable synchronouslyCanceled = false
542543
let tokenRegistration =
543544
if cancellationToken.CanBeCanceled then
544-
let rec cancellationTokenRegistration =
545+
let mutable cancellationTokenRegistration = None
546+
cancellationTokenRegistration <-
545547
cancellationToken.Register((fun () ->
546548
Log.Logger.LogDebug("{0} receive cancelled", prefix)
547-
post this.Mb (CancelWaiter(cancellationTokenRegistration, channel))
549+
match cancellationTokenRegistration with
550+
| None -> synchronouslyCanceled <- true
551+
| Some _ -> post this.Mb (CancelWaiter(cancellationTokenRegistration, channel))
548552
), false) |> Some
549553
cancellationTokenRegistration
550554
else
551555
None
552-
waiters.AddLast(struct(tokenRegistration, channel)) |> ignore
553-
Log.Logger.LogDebug("{0} Receive waiting", prefix)
556+
if synchronouslyCanceled then
557+
channel.SetCanceled()
558+
else
559+
waiters.AddLast(struct(tokenRegistration, channel)) |> ignore
560+
Log.Logger.LogDebug("{0} Receive waiting", prefix)
554561

555562
let batchReceive (receiveCallbacks: ReceiveCallbacks<'T>) =
556563
Log.Logger.LogDebug("{0} BatchReceive", prefix)
@@ -563,26 +570,33 @@ type internal MultiTopicsConsumerImpl<'T> (consumerConfig: ConsumerConfiguration
563570
replyWithBatch channel
564571
else
565572
let batchCts = new CancellationTokenSource()
573+
let mutable synchronouslyCanceled = false
566574
let registration =
567575
if cancellationToken.CanBeCanceled then
568-
let rec cancellationTokenRegistration =
576+
let mutable cancellationTokenRegistration = None
577+
cancellationTokenRegistration <-
569578
cancellationToken.Register((fun () ->
570579
Log.Logger.LogDebug("{0} batch receive cancelled", prefix)
571-
post this.Mb (CancelBatchWaiter(batchCts, cancellationTokenRegistration, channel))
580+
match cancellationTokenRegistration with
581+
| None -> synchronouslyCanceled <- true
582+
| Some _ -> post this.Mb (CancelBatchWaiter(batchCts, cancellationTokenRegistration, channel))
572583
), false)
573584
|> Some
574585
cancellationTokenRegistration
575586
else
576587
None
577-
batchWaiters.AddLast(struct(batchCts, registration, channel)) |> ignore
578-
asyncDelay
579-
consumerConfig.BatchReceivePolicy.Timeout
580-
(fun () ->
581-
if not batchCts.IsCancellationRequested then
582-
post this.Mb SendBatchByTimeout
583-
else
584-
batchCts.Dispose())
585-
Log.Logger.LogDebug("{0} BatchReceive waiting", prefix)
588+
if synchronouslyCanceled then
589+
channel.SetCanceled()
590+
else
591+
batchWaiters.AddLast(struct(batchCts, registration, channel)) |> ignore
592+
asyncDelay
593+
consumerConfig.BatchReceivePolicy.Timeout
594+
(fun () ->
595+
if not batchCts.IsCancellationRequested then
596+
post this.Mb SendBatchByTimeout
597+
else
598+
batchCts.Dispose())
599+
Log.Logger.LogDebug("{0} BatchReceive waiting", prefix)
586600

587601
let runPoller (ct: CancellationToken) =
588602
(Task.Run<unit>(fun () ->

‎src/Pulsar.Client/Pulsar.Client.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
<WarningLevel>3</WarningLevel>
3434
<WarningsAsErrors>3239;0025</WarningsAsErrors>
3535
<DebugType>portable</DebugType>
36-
<NoWarn>3186;40</NoWarn>
36+
<NoWarn>3186</NoWarn>
3737
<PackageIcon>pulsar-client-dotnet.png</PackageIcon>
3838
<PackageIconUrl />
3939
</PropertyGroup>

0 commit comments

Comments
 (0)