11
11
//
12
12
//===----------------------------------------------------------------------===//
13
13
14
+ import NIOConcurrencyHelpers
14
15
import NIOCore
15
16
import NIOPosix
16
17
import NIOSSL
17
- import NIOConcurrencyHelpers
18
18
19
- public final class AMQPConnection {
19
+ public final class AMQPConnection : Sendable {
20
20
internal enum ConnectionState {
21
21
case open
22
22
case shuttingDown
23
23
case closed
24
24
}
25
-
25
+
26
26
public var isConnected : Bool {
27
27
// `Channel.isActive` is set to false before the `closeFuture` resolves in cases where the channel might be
28
28
// closed, or closing, before our state has been updated
29
- return self . channel. isActive && self . state. withLockedValue { $0 == . open }
30
- }
31
-
32
- public var closeFuture : NIOCore . EventLoopFuture < Void > {
33
- return self . channel. closeFuture
29
+ return channel. isActive && state. withLockedValue { $0 == . open }
34
30
}
35
31
36
- public var eventLoop : EventLoop { return self . channel. eventLoop }
32
+ public var closeFuture : NIOCore . EventLoopFuture < Void > { connectionHandler. channel. closeFuture }
33
+ public var eventLoop : EventLoop { return connectionHandler. channel. eventLoop }
37
34
38
- private let channel : NIOCore . Channel
39
- private let multiplexer : AMQPConnectionMultiplexHandler
35
+ private let connectionHandler : AMQPConnectionHandler
36
+ private var channel : NIOCore . Channel { connectionHandler . channel }
40
37
41
38
private let state = NIOLockedValueBox ( ConnectionState . open)
42
39
private let channels : NIOLockedValueBox < AMQPChannels >
43
40
44
- init ( channel: NIOCore . Channel , multiplexer: AMQPConnectionMultiplexHandler , channelMax: UInt16 ) {
45
- self . channel = channel
46
- self . multiplexer = multiplexer
47
- self . channels = . init( AMQPChannels ( channelMax: channelMax) )
41
+ init ( connectionHandler: AMQPConnectionHandler , channelMax: UInt16 ) {
42
+ self . connectionHandler = connectionHandler
43
+ channels = . init( AMQPChannels ( channelMax: channelMax) )
48
44
}
49
45
50
46
/// Connect to broker.
@@ -54,20 +50,27 @@ public final class AMQPConnection {
54
50
/// - Returns: EventLoopFuture with AMQP Connection.
55
51
public static func connect( use eventLoop: EventLoop , from config: AMQPConnectionConfiguration ) -> EventLoopFuture < AMQPConnection > {
56
52
let promise = eventLoop. makePromise ( of: AMQPResponse . self)
57
- let multiplexer = AMQPConnectionMultiplexHandler ( eventLoop: eventLoop, config: config. server, onReady: promise)
58
53
59
54
return eventLoop. flatSubmit { ( ) -> EventLoopFuture < AMQPConnection > in
60
- let result = self . boostrapChannel ( use: eventLoop, from: config, with: multiplexer) . flatMap { channel in
55
+ let multiplexer = NIOLoopBound (
56
+ AMQPConnectionMultiplexHandler ( eventLoop: eventLoop, config: config. server, onReady: promise) ,
57
+ eventLoop: eventLoop
58
+ )
59
+ let result = self . boostrapChannel ( use: eventLoop, from: config, with: multiplexer. value) . flatMap { channel in
61
60
promise. futureResult. flatMapThrowing { response in
62
- guard case . connection( let connection) = response, case . connected( let connected) = connection else {
61
+ guard case let . connection( connection) = response, case let . connected( connected) = connection else {
63
62
throw AMQPConnectionError . invalidResponse ( response)
64
63
}
65
64
66
- return AMQPConnection ( channel: channel, multiplexer: multiplexer, channelMax: connected. channelMax)
65
+ return AMQPConnection (
66
+ connectionHandler: . init( channel: channel, multiplexer: multiplexer. value) ,
67
+ channelMax: connected. channelMax
68
+ )
67
69
}
68
70
}
69
71
70
- result. whenFailure { err in multiplexer. failAllResponses ( because: err) }
72
+ // TODO: fix passing around of multiplexer
73
+ result. whenFailure { err in multiplexer. value. failAllResponses ( because: err) }
71
74
return result
72
75
}
73
76
}
@@ -77,24 +80,22 @@ public final class AMQPConnection {
77
80
/// Channel ID is automatically assigned (next free one).
78
81
/// - Returns: EventLoopFuture with AMQP Channel.
79
82
public func openChannel( ) -> EventLoopFuture < AMQPChannel > {
80
- guard self . isConnected else { return self . eventLoop. makeFailedFuture ( AMQPConnectionError . connectionClosed ( ) ) }
83
+ guard isConnected else { return eventLoop. makeFailedFuture ( AMQPConnectionError . connectionClosed ( ) ) }
81
84
82
85
let channelID = channels. withLockedValue { $0. reserveNext ( ) }
83
-
86
+
84
87
guard let channelID = channelID else {
85
- return self . eventLoop. makeFailedFuture ( AMQPConnectionError . tooManyOpenedChannels)
88
+ return eventLoop. makeFailedFuture ( AMQPConnectionError . tooManyOpenedChannels)
86
89
}
87
90
88
- return self . eventLoop. flatSubmit {
89
- let future = self . multiplexer. openChannel ( id: channelID)
91
+ let future = connectionHandler. openChannel ( id: channelID)
90
92
91
- future. whenFailure { _ in self . channels. withLockedValue { $0. remove ( id: channelID) } }
92
-
93
- return future. map { channel in
94
- let amqpChannel = AMQPChannel ( channelID: channelID, eventLoop: self . eventLoop, channel: channel)
95
- self . channels. withLockedValue { $0. add ( channel: amqpChannel) }
96
- return amqpChannel
97
- }
93
+ future. whenFailure { _ in self . channels. withLockedValue { $0. remove ( id: channelID) } }
94
+
95
+ return future. map { channel in
96
+ let amqpChannel = AMQPChannel ( channelID: channelID, eventLoop: self . eventLoop, channel: channel)
97
+ self . channels. withLockedValue { $0. add ( channel: amqpChannel) }
98
+ return amqpChannel
98
99
}
99
100
}
100
101
@@ -109,37 +110,35 @@ public final class AMQPConnection {
109
110
state = . shuttingDown
110
111
return true
111
112
}
112
-
113
+
113
114
return false
114
115
}
115
-
116
- guard shouldClose else { return self . channel. closeFuture }
117
-
118
- return self . eventLoop. flatSubmit {
119
- let result = self . multiplexer. close ( reason: reason, code: code)
120
- . map { ( ) in
121
- return nil as Error ?
116
+
117
+ guard shouldClose else { return closeFuture }
118
+
119
+ let result = connectionHandler. close ( reason: reason, code: code)
120
+ . map { ( ) in
121
+ nil as Error ?
122
+ }
123
+ . recover { $0 }
124
+ . flatMap { result in
125
+ self . channel. close ( ) . map {
126
+ self . state. withLockedValue { $0 = . closed }
127
+ return ( result, nil ) as ( Error ? , Error ? )
122
128
}
123
- . recover { $0 }
124
- . flatMap { result in
125
- self . channel. close ( ) . map {
129
+ . recover { error in
130
+ if case ChannelError . alreadyClosed = error {
126
131
self . state. withLockedValue { $0 = . closed }
127
- return ( result, nil ) as ( Error ? , Error ? )
128
- }
129
- . recover { error in
130
- if case ChannelError . alreadyClosed = error {
131
- self . state. withLockedValue { $0 = . closed }
132
- return ( result, nil )
133
- }
134
-
135
- return ( result, error)
132
+ return ( result, nil )
136
133
}
134
+
135
+ return ( result, error)
137
136
}
138
- return result. flatMapThrowing {
139
- let ( broker, conn) = $0
140
- if ( broker ?? conn) != nil { throw AMQPConnectionError . connectionClose ( broker: broker, connection: conn) }
141
- return ( )
142
137
}
138
+ return result. flatMapThrowing {
139
+ let ( broker, conn) = $0
140
+ if ( broker ?? conn) != nil { throw AMQPConnectionError . connectionClose ( broker: broker, connection: conn) }
141
+ return ( )
143
142
}
144
143
}
145
144
@@ -161,17 +160,16 @@ public final class AMQPConnection {
161
160
channel. pipeline. addHandlers ( [
162
161
MessageToByteHandler ( AMQPFrameEncoder ( ) ) ,
163
162
ByteToMessageHandler ( AMQPFrameDecoder ( ) ) ,
164
- handler
163
+ handler,
165
164
] )
166
165
}
167
166
. connect ( host: config. server. host, port: config. server. port)
168
- . map { channelPromise. succeed ( $0) }
169
- . cascadeFailure ( to: channelPromise)
167
+ . cascade ( to: channelPromise)
170
168
} catch {
171
169
channelPromise. fail ( error)
172
170
}
173
171
174
- return channelPromise. futureResult
172
+ return channelPromise. futureResult
175
173
}
176
174
177
175
private static func boostrapClient(
@@ -182,17 +180,17 @@ public final class AMQPConnection {
182
180
preconditionFailure ( " Cannot create bootstrap for the supplied EventLoop " )
183
181
}
184
182
185
- switch config. connection {
186
- case . plain:
183
+ switch config. connection {
184
+ case . plain:
187
185
return NIOClientTCPBootstrap ( clientBootstrap, tls: NIOInsecureNoTLS ( ) )
188
- case . tls( let tls, let sniServerName) :
186
+ case let . tls( tls, sniServerName) :
189
187
let sslContext = try NIOSSLContext ( configuration: tls ?? TLSConfiguration . clientDefault)
190
188
let tlsProvider = try NIOSSLClientTLSProvider < ClientBootstrap > ( context: sslContext, serverHostname: sniServerName ?? config. server. host)
191
189
let bootstrap = NIOClientTCPBootstrap ( clientBootstrap, tls: tlsProvider)
192
190
return bootstrap. enableTLS ( )
193
- }
191
+ }
194
192
}
195
-
193
+
196
194
deinit {
197
195
if isConnected {
198
196
assertionFailure ( " close() was not called before deinit! " )
0 commit comments