Skip to content

Commit a722da3

Browse files
committed
Bindings: Move queue declaration settings to the binding
Options for the queue and consumer configuration on the bindings are now moved to the Binding config struct. This enable the user to have different options for different handlers. This aligns more with how AMQP generally. We also default to using qourum-queues when creating bindings. The PrefetchCount configuration is also moved to the binding. This allows for different prefetch counts for different bindings. Exchanges are now declared as a seperate step due to the fact that RabbitMQ comes with all the default exchanges ready to be used by default. Those exchanges have now gotten constants. Most often, users will not need to declare any exchanges at all. The Mandatory flag is now set on the request instead of on the client as a whole. The Immediate flag was removed in RabbitMQ 3 and was never really supported by this library. ResponseWriter no longer has the option to set the Mandatory flag. Mandatory would close the server connection if the client is no longer running and the reply-to queue removed. Connections are now named. Allowing them to be identified in the management ui. The FanoutBinding convenience function is removed. Fanout exchanges are a bit weird and it is probably better to use a topic binding instead.
1 parent d8c223f commit a722da3

22 files changed

+664
-611
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
CURL ?= curl
22
DOCKER_COMPOSE = docker-compose
3-
GOLANGCI_VERSION = v1.58.1
3+
GOLANGCI_VERSION = v1.61.0
44
GOPATH = $(shell go env GOPATH)
55

66
all: lint test ## Run linting and testing

README.md

Lines changed: 22 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
## Description
2323

24-
This is a framework to use [RabbitMQ] as a client/server RPC setup togheter with
25-
the [Go amqp] implementation. The framework can manage a fully funcitonal
24+
This is a framework to use [RabbitMQ] as a client/server RPC setup together with
25+
the [Go amqp] implementation. The framework can manage a fully functional
2626
message queue setup with reconnects, disconnects, graceful shutdown and other
2727
stability mechanisms. By providing this RabbitMQ can be used as transport and
2828
service discovery to quickly get up and running with a micro service
@@ -35,7 +35,7 @@ nomenclature is unique for RabbitMQ some prior experience is preferred.
3535

3636
## Project status
3737

38-
This project has been used in production since october 2018 handling millions of
38+
This project has been used in production since October 2018 handling millions of
3939
requests both as server and client.
4040

4141
## Server
@@ -44,7 +44,7 @@ The server is inspired by the HTTP library where the user maps a [RabbitMQ
4444
binding] to a handler function. A response writer is passed to the handler which
4545
may be used as an `io.Writer` to write the response.
4646

47-
This is an an example of how to get up and running with a server responding to
47+
This is an example of how to get up and running with a server responding to
4848
all messages published to the given routing key.
4949

5050
```go
@@ -67,7 +67,6 @@ type.
6767

6868
```go
6969
server.Bind(DirectBinding("routing_key", handleFunc))
70-
server.Bind(FanoutBinding("fanout-exchange", handleFunc))
7170
server.Bind(TopicBinding("queue-name", "routing_key.#", handleFunc))
7271
server.Bind(HeadersBinding("queue-name", amqp.Table{"x-match": "all", "foo": "bar"}, handleFunc))
7372
```
@@ -76,14 +75,9 @@ If the default variables doesn't result in the desired result you can setup the
7675
binding with the type manually.
7776

7877
```go
79-
customBinding := HandlerBinding{
80-
QueueName: "oh-sweet-queue",
81-
ExchangeName: "my-exchange",
82-
ExchangeType: ExchangeDirect,
83-
RoutingKey: "my-key",
84-
BindHeaders: amqp.Table{},
85-
Handler: handleFunc,
86-
}
78+
customBinding := CreateBinding("oh-sweet-queue", DefaultExchangeNameDirect, handleFunc).
79+
WithPrefetchCount(100).
80+
WithAutoAck(false)
8781

8882
server.Bind(customBinding)
8983
```
@@ -94,18 +88,13 @@ can be changed by calling chainable methods.
9488

9589
```go
9690
server := NewServer("amqp://guest:guest@localhost:5672").
97-
WithConsumeSettings(ConsumeSettings{}).
98-
WithQueueDeclareSettings(QueueDeclareSettings{}).
99-
WithExchangeDeclareSettings(ExchangeDeclareSettings{}).
10091
WithDebugLogger(log.Printf).
10192
WithErrorLogger(log.Printf).
102-
WithDialConfig(amqp.Config{}).
10393
WithTLS(&tls.Config{})
10494
```
10595

106-
QoS is by default set to a prefetch count of `10` and a prefetch size of `0` (no
107-
limit). If you want to change this you can use the
108-
`WithConsumeSettings(settings)` function.
96+
QoS is by default set to a prefetch count of `10`. If you want to change this
97+
you can modify the binding by setting the `PrefetchCount` to something else.
10998

11099
## Client
111100

@@ -142,7 +131,7 @@ methods.
142131
client := NewClient("amqp://guest:guest@localhost:5672").
143132
WithTimeout(5000 * time.Milliseconds)
144133

145-
// Will not connect and may be changed untill this call.
134+
// Will not connect and may be changed until this call.
146135
client.Send(NewRequest().WithRoutingKey("routing_key"))
147136
```
148137

@@ -154,10 +143,8 @@ client := NewClient("amqp://guest:guest@localhost:5672").
154143
WithErrorLogger(log.Printf).
155144
WithDialConfig(amqp.Config{}).
156145
WithTLS(&tls.Config{}).
157-
WithQueueDeclareSettings(QueueDeclareSettings{}).
158-
WithConsumeSettings(ConsumeSettings{}).
159-
WithPublishSettings(PublishSettings{}).
160-
WithConfirmMode(true),
146+
WithReplyToConsumerArgs(amqp.Table{}).
147+
WithConfirmMode(false),
161148
WithTimeout(10 * Time.Second)
162149
```
163150

@@ -170,14 +157,12 @@ can read more here](https://www.rabbitmq.com/confirms.html#publisher-confirms)
170157

171158
The client is set in confirm mode by default.
172159

173-
You can use `WithPublishSettings` or `WithConfirmMode` to control this setting.
160+
You can use `WithConfirmMode` to control this setting. It defaults to `true`.
174161

175162
```go
176163
client := NewClient("amqp://guest:guest@localhost:5672").
177164
WithConfirmMode(true)
178165

179-
client := NewClient("amqp://guest:guest@localhost:5672").
180-
WithPublishSettings(true)
181166
```
182167

183168
### Request
@@ -195,8 +180,8 @@ request := NewRequest().
195180
WithExchange("custom.exchange").
196181
WithRoutingKey("routing_key").
197182
WithHeaders(amqp.Headers{}).
198-
WithCorrelationID("custom-correlation-id").
199183
WithTimeout(5 * time.Second).
184+
WithMandatory(true).
200185
WithResponse(true)
201186
```
202187

@@ -219,10 +204,6 @@ if err != nil {
219204
}
220205
```
221206

222-
**Note**: If you request a response when sending to a fanout exchange the
223-
response will be the first one responded from any of the subscribers. There's
224-
currently no way to stream multiple responses for the same request.
225-
226207
### Sender
227208

228209
The client invokes a default `SendFunc` while calling `Send()` where all the
@@ -352,7 +333,7 @@ func myMiddleware(next SendFunc) SendFunc {
352333
client := NewClient("amqp://guest:guest@localhost:5672").
353334
AddMiddleware(myMiddleware)
354335

355-
// Add the middleware to a singlerequest
336+
// Add the middleware to a single request
356337
reuqest := NewRequest().
357338
WithRoutingKey("routing_key").
358339
AddMiddleware(myMiddleware)
@@ -371,38 +352,25 @@ For more examples of client middlewares, see [examples/middleware].
371352
You often want to know when a connection has been established and when it comes
372353
to RabbitMQ also perform some post connection setup. This is enabled by the fact
373354
that both the server and the client holds a list of `OnStarted`. The function
374-
receives the incomming connection, outgoing connection, incomming channel and
355+
receives the incoming connection, outgoing connection, incoming channel and
375356
outgoing channel.
376357

377358
```go
378359
type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)
379360
```
380361

381-
As an example this is a great place to do some initial QoS setup.
382-
383362
```go
384-
server := NewServer("amqp://guest:guest@localhost:5672")
385-
386-
setupQoS(_, _ *amqp.Connection, inChan, _ *amqp.Channel) {
387-
err := inChan.Qos(
388-
10, // Prefetch count
389-
1024, // Prefetch size
390-
true, // Global
391-
)
392-
393-
if err != nil {
394-
panic(err.Error())
395-
}
363+
server := NewServer("amqp://guest:guest@localhost:5672").
364+
OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) {
365+
// Do something after connection here...
366+
})
396367
}
397368

398-
// Setup QoS when the connection is established.
399-
server.OnStarted(setupQoS)
400-
401369
server.ListenAndServe()
402370
```
403371

404372
Both the server and the client follow the recommendations for [RabbitMQ
405-
connections] which means separate connections for incomming and outgoing traffic
373+
connections] which means separate connections for incoming and outgoing traffic
406374
and separate channels for consuming and publishing messages. Because of this the
407375
signature looks the same way for both the server and the client.
408376

@@ -431,7 +399,7 @@ server.ListenAndServe()
431399

432400
## Logging
433401

434-
You can specifiy two optional loggers for debugging and errors or unexpected
402+
You can specify two optional loggers for debugging and errors or unexpected
435403
behaviour. By default only error logging is turned on and is logged via the log
436404
package's standard logging.
437405

benchmark_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,7 @@ func Benchmark(b *testing.B) {
2727
fastClient := NewClient(testURL).
2828
WithErrorLogger(log.Printf).
2929
WithTimeout(3 * time.Minute).
30-
WithPublishSettings(PublishSettings{
31-
Mandatory: true,
32-
Immediate: false,
33-
ConfirmMode: false,
34-
})
30+
WithConfirmMode(false)
3531

3632
defer fastClient.Stop()
3733

0 commit comments

Comments
 (0)