Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit cb9c294

Browse files
authoredAug 21, 2024··
Merge pull request #48 from funcmike/prepare-beta-realease
Prepare beta realease
2 parents 315471c + 5d7af4c commit cb9c294

File tree

3 files changed

+42
-15
lines changed

3 files changed

+42
-15
lines changed
 

‎README.md

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# RabbitMQNIO
22
[<img src="https://img.shields.io/badge/platform-macOS | Linux-brightgreen.svg" alt="Platform macOS | Linux" />](https://swift.org)
3-
[<img src="https://img.shields.io/badge/swift-5.7-brightgreen.svg" alt="Swift 5.7" />](https://swift.org)
3+
[<img src="https://img.shields.io/badge/swift-5.10-brightgreen.svg" alt="Swift 5.10" />](https://swift.org)
44

55

66
A Swift implementation of AMQP 0.9.1 protocol: decoder + encoder (AMQPProtocol) and non-blocking client (AMQPClient).
@@ -13,21 +13,19 @@ Swift-NIO related code is based on other NIO projects like:
1313
* https://gitlab.com/swift-server-community/RediStack
1414

1515
## State of the project
16-
17-
**!!! WARNING !!!** <br>
18-
This project is in alpha stage and still under development - API can change in the near future before 1.0 release. <br>
19-
Please do extensive tests of Your use case before using it on production! <br>
20-
Nevertheless, current client operations are tested and appears to be stable so do not be afraid to use it. <br>
21-
Please report bugs or missing features. <br>
22-
**!!! WARNING !!!**
23-
24-
AMQPProtocol library currently should cover all of AMQP 0.9.1 specification.
25-
26-
AMQPClient library's architecture using NIO Channels is already done and all of AMQP operations (without WebSockets) should be supported.
27-
Current work is focused on testing, finding bugs, API stabilization and code refactoring / polishing (based on Swift Server Side Community feedback).
16+
This project is in beta stage - API still can change before the first stable release.<br>
17+
It's been used in production for more than a year now.<br>
18+
19+
AMQPProtocol library covers all of AMQP 0.9.1 specification.
20+
AMQPClient library's architecture using NIO Channels is already done, and all of the common AMQP operations are working (without WebSockets support).<br>
21+
The main goal of the project is now to release the first stable version. To do this project needs:
22+
* API stabilization.
23+
* Closing the issues (most of them are quality improvements to the project),
24+
* Refactoring swift-nio code with the newest async stuff.
25+
* Writing proper documentation.
26+
* Polishing based on Swift Server Side Community feedback (please use it and report bugs and suggestions!).
2827

2928
## Basic usage
30-
3129
Create a connection and connect to the AMQP broker using connection string.
3230
```swift
3331
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
@@ -149,7 +147,6 @@ do {
149147
```
150148

151149
## Connection recovery patterns.
152-
153150
Handling broker closing channel or connection disconnects.
154151
Connection to AMQP broker is sustained by answering to heartbeat messages, however on network problem or broker restart connection can be broken.
155152
Broker can also close channel or connection on bad command or other error.

‎Sources/AMQPClient/AMQPChannel.swift

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public final class AMQPChannel: Sendable {
4747
/// - code: Any number - might be logged by the server.
4848
/// - Returns: EventLoopFuture waiting for close response.
4949
@discardableResult
50+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
5051
public func close(reason: String = "", code: UInt16 = 200) -> EventLoopFuture<Void> {
5152
return channel.send(payload: .method(.channel(.close(.init(replyCode: code, replyText: reason, classID: 0, methodID: 0)))))
5253
.flatMapThrowing { response in
@@ -76,6 +77,7 @@ public final class AMQPChannel: Sendable {
7677
/// DeliveryTag is 0 when channel is not in confirm mode.
7778
/// DeliveryTag is > 0 (monotonically increasing) when channel is in confirm mode.
7879
@discardableResult
80+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
7981
public func basicPublish(
8082
from body: ByteBuffer,
8183
exchange: String,
@@ -121,6 +123,7 @@ public final class AMQPChannel: Sendable {
121123
/// - queue: Name of the queue.
122124
/// - noAck: Controls whether message will be acked or nacked automatically (true) or manually (false).
123125
/// - Returns: EventLoopFuture with optional message when queue is not empty.
126+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
124127
public func basicGet(queue: String, noAck: Bool = false) -> EventLoopFuture<AMQPResponse.Channel.Message.Get?> {
125128
return channel.send(payload: .method(.basic(.get(.init(reserved1: 0, queue: queue, noAck: noAck)))))
126129
.flatMapThrowing { response in
@@ -141,6 +144,7 @@ public final class AMQPChannel: Sendable {
141144
/// - listener: Callback when Delivery arrives - automatically registered.
142145
/// - Returns: EventLoopFuture with response confirming that broker has accepted consume request.
143146
@discardableResult
147+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
144148
public func basicConsume(
145149
queue: String,
146150
consumerTag: String = "",
@@ -184,6 +188,7 @@ public final class AMQPChannel: Sendable {
184188
/// - Parameters:
185189
/// - consumerTag: Identifer of the consumer.
186190
/// - Returns: EventLoopFuture waiting for cancel response.
191+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
187192
public func basicCancel(consumerTag: String) -> EventLoopFuture<Void> {
188193
let found: Bool
189194

@@ -216,6 +221,7 @@ public final class AMQPChannel: Sendable {
216221
/// - deliveryTag: Number (identifier) of the message..
217222
/// - multiple: Controls whether only this message is acked (false) or additionally all other up to it (true).
218223
/// - Returns: EventLoopFuture that will be resolved when ack is sent.
224+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
219225
public func basicAck(deliveryTag: UInt64, multiple: Bool = false) -> EventLoopFuture<Void> {
220226
return channel.send(payload: .method(.basic(.ack(deliveryTag: deliveryTag, multiple: multiple))))
221227
}
@@ -225,6 +231,7 @@ public final class AMQPChannel: Sendable {
225231
/// - message: Received message.
226232
/// - multiple: Controls whether only this message is acked (false) or additionally all other up to it (true).
227233
/// - Returns: EventLoopFuture that will be resolved when ack is sent.
234+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
228235
public func basicAck(message: AMQPResponse.Channel.Message.Delivery, multiple: Bool = false) -> EventLoopFuture<Void> {
229236
return basicAck(deliveryTag: message.deliveryTag, multiple: multiple)
230237
}
@@ -235,6 +242,7 @@ public final class AMQPChannel: Sendable {
235242
/// - multiple: Controls whether only this message is rejected (false) or additionally all other up to it (true).
236243
/// - requeue: Controls whether to requeue message after reject.
237244
/// - Returns: EventLoopFuture that will be resolved when nack is sent.
245+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
238246
public func basicNack(deliveryTag: UInt64, multiple: Bool = false, requeue: Bool = false) -> EventLoopFuture<Void> {
239247
return channel.send(payload: .method(.basic(.nack(.init(deliveryTag: deliveryTag, multiple: multiple, requeue: requeue)))))
240248
}
@@ -245,6 +253,7 @@ public final class AMQPChannel: Sendable {
245253
/// - multiple: Controls whether only this message is rejected (false) or additionally all other up to it (true).
246254
/// - requeue: Controls whether to requeue message after reject.
247255
/// - Returns: EventLoopFuture that will be resolved when nack is sent.
256+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
248257
public func basicNack(message: AMQPResponse.Channel.Message.Delivery, multiple: Bool = false, requeue: Bool = false) -> EventLoopFuture<Void> {
249258
return basicNack(deliveryTag: message.deliveryTag, multiple: multiple, requeue: requeue)
250259
}
@@ -254,6 +263,7 @@ public final class AMQPChannel: Sendable {
254263
/// - deliveryTag: Number ((identifier) of the message.
255264
/// - requeue: Controls whether to requeue message after reject.
256265
/// - Returns: EventLoopFuture that will be resolved when reject is sent.
266+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
257267
public func basicReject(deliveryTag: UInt64, requeue: Bool = false) -> EventLoopFuture<Void> {
258268
return channel.send(payload: .method(.basic(.reject(deliveryTag: deliveryTag, requeue: requeue))))
259269
}
@@ -263,6 +273,7 @@ public final class AMQPChannel: Sendable {
263273
/// - message: Received Message.
264274
/// - requeue: Controls whether to requeue message after reject.
265275
/// - Returns: EventLoopFuture that will be resolved when reject is sent.
276+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
266277
public func basicReject(message: AMQPResponse.Channel.Message.Delivery, requeue: Bool = false) -> EventLoopFuture<Void> {
267278
return basicReject(deliveryTag: message.deliveryTag, requeue: requeue)
268279
}
@@ -272,6 +283,7 @@ public final class AMQPChannel: Sendable {
272283
/// - Parameters:
273284
/// - requeue: Controls whether to requeue all messages after rejecting them.
274285
/// - Returns: EventLoopFuture waiting for recover response.
286+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
275287
public func basicRecover(requeue: Bool) -> EventLoopFuture<Void> {
276288
return channel.send(payload: .method(.basic(.recover(requeue: requeue))))
277289
.flatMapThrowing { response in
@@ -288,6 +300,7 @@ public final class AMQPChannel: Sendable {
288300
/// - count: Size of the limit.
289301
/// - global: Whether the limit will be shared across all consumers on the channel.
290302
/// - Returns: EventLoopFuture waiting for qos response.
303+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
291304
public func basicQos(count: UInt16, global: Bool = false) -> EventLoopFuture<Void> {
292305
return channel.send(payload: .method(.basic(.qos(prefetchSize: 0, prefetchCount: count, global: global))))
293306
.flatMapThrowing { response in
@@ -305,6 +318,7 @@ public final class AMQPChannel: Sendable {
305318
/// - active: Flow enabled or disabled.
306319
/// - Returns: EventLoopFuture with response confirming that broker has accepted a flow request.
307320
@discardableResult
321+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
308322
public func flow(active: Bool) -> EventLoopFuture<AMQPResponse.Channel.Flowed> {
309323
return channel.send(payload: .method(.channel(.flow(active: active))))
310324
.flatMapThrowing { response in
@@ -326,6 +340,7 @@ public final class AMQPChannel: Sendable {
326340
/// - arguments: Additional arguments (check rabbitmq documentation).
327341
/// - Returns: EventLoopFuture with response confirming that broker has accepted a request.
328342
@discardableResult
343+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
329344
public func queueDeclare(
330345
name: String,
331346
passive: Bool = false,
@@ -357,6 +372,7 @@ public final class AMQPChannel: Sendable {
357372
/// - ifEmpty: If enabled queue will be deleted only when it's empty.
358373
/// - Returns: EventLoopFuture with response confirming that broker has accepted a delete request.
359374
@discardableResult
375+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
360376
public func queueDelete(name: String, ifUnused: Bool = false, ifEmpty: Bool = false) -> EventLoopFuture<AMQPResponse.Channel.Queue.Deleted> {
361377
return channel.send(payload: .method(.queue(.delete(.init(reserved1: 0, queueName: name, ifUnused: ifUnused, ifEmpty: ifEmpty, noWait: false)))))
362378
.flatMapThrowing { response in
@@ -372,6 +388,7 @@ public final class AMQPChannel: Sendable {
372388
/// - name: Name of the queue.
373389
/// - Returns: EventLoopFuture with response confirming that broker has accepted a delete request.
374390
@discardableResult
391+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
375392
public func queuePurge(name: String) -> EventLoopFuture<AMQPResponse.Channel.Queue.Purged> {
376393
return channel.send(payload: .method(.queue(.purge(.init(reserved1: 0, queueName: name, noWait: false)))))
377394
.flatMapThrowing { response in
@@ -389,6 +406,7 @@ public final class AMQPChannel: Sendable {
389406
/// - routingKey: Bind only to messages matching routingKey.
390407
/// - arguments: Bind only to message matching given options.
391408
/// - Returns: EventLoopFuture waiting for bind response.
409+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
392410
public func queueBind(queue: String, exchange: String, routingKey: String = "", args arguments: Table = Table()) -> EventLoopFuture<Void> {
393411
return channel.send(payload: .method(.queue(.bind(.init(reserved1: 0,
394412
queueName: queue,
@@ -411,6 +429,7 @@ public final class AMQPChannel: Sendable {
411429
/// - routingKey: Unbind only from messages matching routingKey.
412430
/// - arguments: Unbind only from messages matching given options.
413431
/// - Returns: EventLoopFuturewaiting for bind response unbind response.
432+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
414433
public func queueUnbind(queue: String, exchange: String, routingKey: String = "", args arguments: Table = Table()) -> EventLoopFuture<Void> {
415434
return channel.send(payload: .method(.queue(.unbind(.init(reserved1: 0,
416435
queueName: queue,
@@ -434,6 +453,7 @@ public final class AMQPChannel: Sendable {
434453
/// - internal: Whether the exchange cannot be directly published to client.
435454
/// - arguments: Additional arguments (check rabbitmq documentation).
436455
/// - Returns: EventLoopFuture waiting for declare response.
456+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
437457
public func exchangeDeclare(
438458
name: String,
439459
type: String,
@@ -465,6 +485,7 @@ public final class AMQPChannel: Sendable {
465485
/// - name: Name of the queue.
466486
/// - ifUnused: If enabled exchange will be deleted only when it's not used.
467487
/// - Returns: EventLoopFuture waiting for delete response.
488+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
468489
public func exchangeDelete(name: String, ifUnused: Bool = false) -> EventLoopFuture<Void> {
469490
return channel.send(payload: .method(.exchange(.delete(.init(reserved1: 0, exchangeName: name, ifUnused: ifUnused, noWait: false)))))
470491
.flatMapThrowing { response in
@@ -482,6 +503,7 @@ public final class AMQPChannel: Sendable {
482503
/// - routingKey: Bind only to messages matching routingKey.
483504
/// - arguments: Bind only to messages matching given options.
484505
/// - Returns: EventLoopFuture waiting for bind response.
506+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
485507
public func exchangeBind(destination: String, source: String, routingKey: String, args arguments: Table = Table()) -> EventLoopFuture<Void> {
486508
return channel.send(payload: .method(.exchange(.bind(.init(reserved1: 0,
487509
destination: destination,
@@ -504,6 +526,7 @@ public final class AMQPChannel: Sendable {
504526
/// - routingKey: Unbind only from messages matching routingKey.
505527
/// - arguments: Unbind only from messages matching given options.
506528
/// - Returns: EventLoopFuture waiting for unbind response.
529+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
507530
public func exchangeUnbind(destination: String, source: String, routingKey: String, args arguments: Table = Table()) -> EventLoopFuture<Void> {
508531
return channel.send(payload: .method(.exchange(.unbind(.init(reserved1: 0,
509532
destination: destination,
@@ -521,6 +544,7 @@ public final class AMQPChannel: Sendable {
521544

522545
/// Set channel in publish confirm mode, each published message will be acked or nacked.
523546
/// - Returns: EventLoopFuture waiting for confirm select response.
547+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
524548
public func confirmSelect() -> EventLoopFuture<Void> {
525549
guard !isConfirmMode.load(ordering: .relaxed) else {
526550
return eventLoop.makeSucceededFuture(())
@@ -540,6 +564,7 @@ public final class AMQPChannel: Sendable {
540564

541565
/// Set channel in transaction mode.
542566
/// - Returns: EventLoopFuture waiting for tx select response.
567+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
543568
public func txSelect() -> EventLoopFuture<Void> {
544569
guard !isTxMode.load(ordering: .relaxed) else {
545570
return eventLoop.makeSucceededFuture(())
@@ -559,6 +584,7 @@ public final class AMQPChannel: Sendable {
559584

560585
/// Commit a transaction.
561586
/// - Returns: EventLoopFuture waiting for commit response.
587+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
562588
public func txCommit() -> EventLoopFuture<Void> {
563589
return channel.send(payload: .method(.tx(.commit)))
564590
.flatMapThrowing { response in
@@ -571,6 +597,7 @@ public final class AMQPChannel: Sendable {
571597

572598
/// Rollback a transaction.
573599
/// - Returns: EventLoopFuture waiting for rollback response.
600+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
574601
public func txRollback() -> EventLoopFuture<Void> {
575602
return channel.send(payload: .method(.tx(.rollback)))
576603
.flatMapThrowing { response in

‎Sources/AMQPClient/AMQPConnection.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public final class AMQPConnection: Sendable {
5050
/// - eventLoop: EventLoop on which to connect.
5151
/// - config: Configuration data.
5252
/// - Returns: EventLoopFuture with AMQP Connection.
53+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
5354
public static func connect(use eventLoop: EventLoop, from config: AMQPConnectionConfiguration) -> EventLoopFuture<AMQPConnection> {
5455
eventLoop.flatSubmit {
5556
self.boostrapChannel(use: eventLoop, from: config).flatMap { connectionHandler in
@@ -68,6 +69,7 @@ public final class AMQPConnection: Sendable {
6869
/// Can be used only when connection is connected.
6970
/// Channel ID is automatically assigned (next free one).
7071
/// - Returns: EventLoopFuture with AMQP Channel.
72+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
7173
public func openChannel() -> EventLoopFuture<AMQPChannel> {
7274
guard isConnected else { return eventLoop.makeFailedFuture(AMQPConnectionError.connectionClosed()) }
7375

@@ -95,6 +97,7 @@ public final class AMQPConnection: Sendable {
9597
/// - reason: Reason that can be logged by broker.
9698
/// - code: Code that can be logged by broker.
9799
/// - Returns: EventLoopFuture that is resolved when connection is closed.
100+
@available(*, deprecated, message: "EventLoopFuture based public API will be removed in first stable release, please use Async API")
98101
public func close(reason: String = "", code: UInt16 = 200) -> EventLoopFuture<Void> {
99102
let shouldClose = state.withLockedValue { state in
100103
if state == .open {

0 commit comments

Comments
 (0)
Please sign in to comment.