Skip to content

Consumer not reading messages when batching is disabled #271

Open
@chriscameron-vertexinc

Description

@chriscameron-vertexinc

Hello!

I'm using a Key_Shared topic to ensure no two consumers receive messages with the same key.

To avoid collisions when scaling consumers up/down I'm trying to set my receiver queue size to 0. I don't want to have any prefetch messages, I'd like to make sure I'm only ever consuming one message at a time.

When I produce messages to my topic the consumers throw the following exceptions:

[20:18:24 Error]
consumer(0, b4e1c, -1) Closing consumer due to unsupported received batch-message with zero receiver queue size
[20:18:24 Error]
consumer(0, b4e1c, -1) Batch reading exception 42:0:-1
Pulsar.Client.Api.InvalidMessageException: Unsupported Batch message with 0 size receiver queue for [CNR.ExampleSubscriber.API]-[]
   at Pulsar.Client.Api.ZeroQueueConsumerImpl`1.ReceiveIndividualMessagesFromBatch(RawMessage _arg2, FSharpFunc`2 _arg1) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 1699
   at <StartupCode$Pulsar-Client>[email protected](Unit unitVar0) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 728
   at Pulsar.Client.Common.Tools.wrapException[a](FSharpFunc`2 f) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Common\Tools.fs:line 171
[20:18:25 Information]
consumer(0, b4e1c, -1) Message 42:0:-1 was discarded due to BatchDeSerializeError
[20:18:25 Information]
consumer(0, b4e1c, -1) starting close
[20:18:25 Information]
consumer(0, b4e1c, -1) UnackedTracker mailbox has stopped normally
[20:18:25 Information]
consumer(0, b4e1c, -1) GroupingTracker mailbox has stopped normally
[20:18:25 Information]
consumer(0, b4e1c, -1) NegativeTracker mailbox has stopped normally
[20:18:25 Information]
consumer(0, b4e1c, -1) ConnectionHandler ConnectionHandler mailbox has stopped normally
[20:18:25 Error]
Failed to receive message
Pulsar.Client.Api.AlreadyClosedException: Consumer is already closed
   at <StartupCode$Pulsar-Client>[email protected]() in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 1439
   at Pulsar.Client.Common.Tools.reraize[a](Exception ex) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Common\Tools.fs:line 67
   at <StartupCode$Pulsar-Client>.$ConsumerImpl.Pulsar-Client-Api-IConsumer<'T>[email protected]() in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 1712
   at CNR.Common.Eventing.Pulsar.Subscribers.PulsarEventSubscriber`1.ExecuteAsync(CancellationToken stoppingToken) in C:\Workspace\cnr-common-eventing\src\CNR.Common.Eventing.Pulsar\Subscribers\PulsarEventSubscriber.cs:line 76
[20:18:25 Information]
consumer(0, b4e1c, -1) stopped
[20:18:25 Information]
consumer(0, b4e1c, -1) mailbox has stopped normally

Ok, so we can't read batched messages when only handling 1 message at a time.

When I disable batching on the producer side my consumer no-longer receives any messages at all. When I check pulsar-admin I can see my msgInCounter incrementing, but msgOutCounter never goes up.

As a workaround I've had to enable batching on the producer, and set the receive queue size to 1. Is this going to be adequate for my use case?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions