Skip to content

Commit 13b26dc

Browse files
authored
Add support for registering consumer at specific Message ID (#7)
1 parent d95817e commit 13b26dc

File tree

4 files changed

+87
-23
lines changed

4 files changed

+87
-23
lines changed

.go-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.12.7
1+
1.14.2

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ addons:
55
apt:
66
packages:
77
- redis-server
8-
go: "1.12.7"
8+
go: "1.14.2"
99
env:
1010
- GO111MODULE=on
1111
install:

consumer.go

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ import (
1414
// and process Messages.
1515
type ConsumerFunc func(*Message) error
1616

17+
type registeredConsumer struct {
18+
fn ConsumerFunc
19+
id string
20+
}
21+
1722
// ConsumerOptions provide options to configure the Consumer.
1823
type ConsumerOptions struct {
1924
// Name sets the name of this consumer. This will be used when fetching from
@@ -62,12 +67,12 @@ type Consumer struct {
6267
// channels.
6368
Errors chan error
6469

65-
options *ConsumerOptions
66-
redis *redis.Client
67-
funcs map[string]ConsumerFunc
68-
streams []string
69-
queue chan *Message
70-
wg *sync.WaitGroup
70+
options *ConsumerOptions
71+
redis *redis.Client
72+
consumers map[string]registeredConsumer
73+
streams []string
74+
queue chan *Message
75+
wg *sync.WaitGroup
7176

7277
stopReclaim chan struct{}
7378
stopPoll chan struct{}
@@ -118,25 +123,43 @@ func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) {
118123
return &Consumer{
119124
Errors: make(chan error),
120125

121-
options: options,
122-
redis: r,
123-
funcs: map[string]ConsumerFunc{},
124-
streams: make([]string, 0),
125-
queue: make(chan *Message, options.BufferSize),
126-
wg: &sync.WaitGroup{},
126+
options: options,
127+
redis: r,
128+
consumers: make(map[string]registeredConsumer),
129+
streams: make([]string, 0),
130+
queue: make(chan *Message, options.BufferSize),
131+
wg: &sync.WaitGroup{},
127132

128133
stopReclaim: make(chan struct{}, 1),
129134
stopPoll: make(chan struct{}, 1),
130135
stopWorkers: make(chan struct{}, options.Concurrency),
131136
}, nil
132137
}
133138

139+
// RegisterWithLastID is the same as Register, except that it also lets you
140+
// specify the oldest message to receive when first creating the consumer group.
141+
// This can be any valid message ID, "0" for all messages in the stream, or "$"
142+
// for only new messages.
143+
//
144+
// If the consumer group already exists the id field is ignored, meaning you'll
145+
// receive unprocessed messages.
146+
func (c *Consumer) RegisterWithLastID(stream string, id string, fn ConsumerFunc) {
147+
if len(id) == 0 {
148+
id = "0"
149+
}
150+
151+
c.consumers[stream] = registeredConsumer{
152+
fn: fn,
153+
id: id,
154+
}
155+
}
156+
134157
// Register takes in a stream name and a ConsumerFunc that will be called when a
135158
// message comes in from that stream. Register must be called at least once
136159
// before Run is called. If the same stream name is passed in twice, the first
137160
// ConsumerFunc is overwritten by the second.
138161
func (c *Consumer) Register(stream string, fn ConsumerFunc) {
139-
c.funcs[stream] = fn
162+
c.RegisterWithLastID(stream, "0", fn)
140163
}
141164

142165
// Run starts all of the worker goroutines and starts processing from the
@@ -146,22 +169,22 @@ func (c *Consumer) Register(stream string, fn ConsumerFunc) {
146169
// creating the consumer group in Redis. Run will block until Shutdown is called
147170
// and all of the in-flight messages have been processed.
148171
func (c *Consumer) Run() {
149-
if len(c.funcs) == 0 {
172+
if len(c.consumers) == 0 {
150173
c.Errors <- errors.New("at least one consumer function needs to be registered")
151174
return
152175
}
153176

154-
for stream := range c.funcs {
177+
for stream, consumer := range c.consumers {
155178
c.streams = append(c.streams, stream)
156-
err := c.redis.XGroupCreateMkStream(stream, c.options.GroupName, "0").Err()
179+
err := c.redis.XGroupCreateMkStream(stream, c.options.GroupName, consumer.id).Err()
157180
// ignoring the BUSYGROUP error makes this a noop
158181
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
159182
c.Errors <- errors.Wrap(err, "error creating consumer group")
160183
return
161184
}
162185
}
163186

164-
for i := 0; i < len(c.funcs); i++ {
187+
for i := 0; i < len(c.consumers); i++ {
165188
c.streams = append(c.streams, ">")
166189
}
167190

@@ -214,7 +237,7 @@ func (c *Consumer) reclaim() {
214237
c.stopPoll <- struct{}{}
215238
return
216239
case <-ticker.C:
217-
for stream := range c.funcs {
240+
for stream := range c.consumers {
218241
start := "-"
219242
end := "+"
220243

@@ -373,7 +396,6 @@ func (c *Consumer) process(msg *Message) (err error) {
373396
err = errors.Errorf("ConsumerFunc panic: %v", r)
374397
}
375398
}()
376-
fn := c.funcs[msg.Stream]
377-
err = fn(msg)
399+
err = c.consumers[msg.Stream].fn(msg)
378400
return
379401
}

consumer_test.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,52 @@ func TestRegister(t *testing.T) {
7777

7878
c.Register(tt.Name(), fn)
7979

80-
assert.Len(tt, c.funcs, 1)
80+
assert.Len(tt, c.consumers, 1)
8181
})
8282
}
8383

84+
func TestRegisterWithLastID(t *testing.T) {
85+
fn := func(msg *Message) error {
86+
return nil
87+
}
88+
89+
tests := []struct {
90+
name string
91+
stream string
92+
id string
93+
want map[string]registeredConsumer
94+
}{
95+
{
96+
name: "custom_id",
97+
id: "42",
98+
want: map[string]registeredConsumer{
99+
"test": {id: "42", fn: fn},
100+
},
101+
},
102+
{
103+
name: "no_id",
104+
id: "",
105+
want: map[string]registeredConsumer{
106+
"test": {id: "0", fn: fn},
107+
},
108+
},
109+
}
110+
111+
for _, tt := range tests {
112+
t.Run(tt.name, func(t *testing.T) {
113+
c, err := NewConsumer()
114+
require.NoError(t, err)
115+
116+
c.RegisterWithLastID("test", tt.id, fn)
117+
118+
assert.Len(t, c.consumers, 1)
119+
assert.Contains(t, c.consumers, "test")
120+
assert.Equal(t, c.consumers["test"].id, tt.want["test"].id)
121+
assert.NotNil(t, c.consumers["test"].fn)
122+
})
123+
}
124+
}
125+
84126
func TestRun(t *testing.T) {
85127
t.Run("sends an error if no ConsumerFuncs are registered", func(tt *testing.T) {
86128
c, err := NewConsumer()

0 commit comments

Comments
 (0)