11
11
//
12
12
//===----------------------------------------------------------------------===//
13
13
14
+ import NIOConcurrencyHelpers
14
15
import NIOCore
15
16
import NIOPosix
16
17
import NIOSSL
17
- import NIOConcurrencyHelpers
18
18
19
- public final class AMQPConnection {
19
+ public final class AMQPConnection : Sendable {
20
20
internal enum ConnectionState {
21
21
case open
22
22
case shuttingDown
23
23
case closed
24
24
}
25
-
25
+
26
26
public var isConnected : Bool {
27
27
// `Channel.isActive` is set to false before the `closeFuture` resolves in cases where the channel might be
28
28
// closed, or closing, before our state has been updated
29
- return self . channel. isActive && self . state. withLockedValue { $0 == . open }
29
+ return channel. isActive && state. withLockedValue { $0 == . open }
30
30
}
31
31
32
- public var closeFuture : NIOCore . EventLoopFuture < Void > {
33
- return self . channel. closeFuture
34
- }
35
-
36
- public var eventLoop : EventLoop { return self . channel. eventLoop }
32
+ public var closeFuture : NIOCore . EventLoopFuture < Void > { connectionHandler. channel. closeFuture }
33
+ public var eventLoop : EventLoop { return connectionHandler. channel. eventLoop }
37
34
38
- private let channel : NIOCore . Channel
39
- private let multiplexer : AMQPConnectionMultiplexHandler
35
+ private let connectionHandler : AMQPConnectionHandler
36
+ private var channel : NIOCore . Channel { connectionHandler . channel }
40
37
41
38
private let state = NIOLockedValueBox ( ConnectionState . open)
42
39
private let channels : NIOLockedValueBox < AMQPChannels >
43
40
44
- init ( channel: NIOCore . Channel , multiplexer: AMQPConnectionMultiplexHandler , channelMax: UInt16 ) {
45
- self . channel = channel
46
- self . multiplexer = multiplexer
47
- self . channels = . init( AMQPChannels ( channelMax: channelMax) )
41
+ init ( connectionHandler: AMQPConnectionHandler , channelMax: UInt16 ) {
42
+ self . connectionHandler = connectionHandler
43
+ channels = . init( AMQPChannels ( channelMax: channelMax) )
48
44
}
49
45
50
46
/// Connect to broker.
@@ -53,22 +49,15 @@ public final class AMQPConnection {
53
49
/// - config: Configuration data.
54
50
/// - Returns: EventLoopFuture with AMQP Connection.
55
51
public static func connect( use eventLoop: EventLoop , from config: AMQPConnectionConfiguration ) -> EventLoopFuture < AMQPConnection > {
56
- let promise = eventLoop. makePromise ( of: AMQPResponse . self)
57
- let multiplexer = AMQPConnectionMultiplexHandler ( eventLoop: eventLoop, config: config. server, onReady: promise)
58
-
59
- return eventLoop. flatSubmit { ( ) -> EventLoopFuture < AMQPConnection > in
60
- let result = self . boostrapChannel ( use: eventLoop, from: config, with: multiplexer) . flatMap { channel in
61
- promise. futureResult. flatMapThrowing { response in
62
- guard case . connection( let connection) = response, case . connected( let connected) = connection else {
63
- throw AMQPConnectionError . invalidResponse ( response)
64
- }
65
-
66
- return AMQPConnection ( channel: channel, multiplexer: multiplexer, channelMax: connected. channelMax)
52
+ eventLoop. flatSubmit {
53
+ self . boostrapChannel ( use: eventLoop, from: config) . flatMap { connectionHandler in
54
+ connectionHandler. startConnection ( ) . map {
55
+ AMQPConnection (
56
+ connectionHandler: connectionHandler,
57
+ channelMax: $0. channelMax
58
+ )
67
59
}
68
60
}
69
-
70
- result. whenFailure { err in multiplexer. failAllResponses ( because: err) }
71
- return result
72
61
}
73
62
}
74
63
@@ -77,24 +66,22 @@ public final class AMQPConnection {
77
66
/// Channel ID is automatically assigned (next free one).
78
67
/// - Returns: EventLoopFuture with AMQP Channel.
79
68
public func openChannel( ) -> EventLoopFuture < AMQPChannel > {
80
- guard self . isConnected else { return self . eventLoop. makeFailedFuture ( AMQPConnectionError . connectionClosed ( ) ) }
69
+ guard isConnected else { return eventLoop. makeFailedFuture ( AMQPConnectionError . connectionClosed ( ) ) }
81
70
82
71
let channelID = channels. withLockedValue { $0. reserveNext ( ) }
83
-
72
+
84
73
guard let channelID = channelID else {
85
- return self . eventLoop. makeFailedFuture ( AMQPConnectionError . tooManyOpenedChannels)
74
+ return eventLoop. makeFailedFuture ( AMQPConnectionError . tooManyOpenedChannels)
86
75
}
87
76
88
- return self . eventLoop. flatSubmit {
89
- let future = self . multiplexer. openChannel ( id: channelID)
77
+ let future = connectionHandler. openChannel ( id: channelID)
90
78
91
- future. whenFailure { _ in self . channels. withLockedValue { $0. remove ( id: channelID) } }
92
-
93
- return future. map { channel in
94
- let amqpChannel = AMQPChannel ( channelID: channelID, eventLoop: self . eventLoop, channel: channel)
95
- self . channels. withLockedValue { $0. add ( channel: amqpChannel) }
96
- return amqpChannel
97
- }
79
+ future. whenFailure { _ in self . channels. withLockedValue { $0. remove ( id: channelID) } }
80
+
81
+ return future. map { channel in
82
+ let amqpChannel = AMQPChannel ( channelID: channelID, eventLoop: self . eventLoop, channel: channel)
83
+ self . channels. withLockedValue { $0. add ( channel: amqpChannel) }
84
+ return amqpChannel
98
85
}
99
86
}
100
87
@@ -109,69 +96,64 @@ public final class AMQPConnection {
109
96
state = . shuttingDown
110
97
return true
111
98
}
112
-
99
+
113
100
return false
114
101
}
115
-
116
- guard shouldClose else { return self . channel. closeFuture }
117
-
118
- return self . eventLoop. flatSubmit {
119
- let result = self . multiplexer. close ( reason: reason, code: code)
120
- . map { ( ) in
121
- return nil as Error ?
102
+
103
+ guard shouldClose else { return closeFuture }
104
+
105
+ let result = connectionHandler. close ( reason: reason, code: code)
106
+ . map { ( ) in
107
+ nil as Error ?
108
+ }
109
+ . recover { $0 }
110
+ . flatMap { result in
111
+ self . channel. close ( ) . map {
112
+ self . state. withLockedValue { $0 = . closed }
113
+ return ( result, nil ) as ( Error ? , Error ? )
122
114
}
123
- . recover { $0 }
124
- . flatMap { result in
125
- self . channel. close ( ) . map {
115
+ . recover { error in
116
+ if case ChannelError . alreadyClosed = error {
126
117
self . state. withLockedValue { $0 = . closed }
127
- return ( result, nil ) as ( Error ? , Error ? )
128
- }
129
- . recover { error in
130
- if case ChannelError . alreadyClosed = error {
131
- self . state. withLockedValue { $0 = . closed }
132
- return ( result, nil )
133
- }
134
-
135
- return ( result, error)
118
+ return ( result, nil )
136
119
}
120
+
121
+ return ( result, error)
137
122
}
138
- return result. flatMapThrowing {
139
- let ( broker, conn) = $0
140
- if ( broker ?? conn) != nil { throw AMQPConnectionError . connectionClose ( broker: broker, connection: conn) }
141
- return ( )
142
123
}
124
+ return result. flatMapThrowing {
125
+ let ( broker, conn) = $0
126
+ if ( broker ?? conn) != nil { throw AMQPConnectionError . connectionClose ( broker: broker, connection: conn) }
127
+ return ( )
143
128
}
144
129
}
145
130
146
131
private static func boostrapChannel(
147
132
use eventLoop: EventLoop ,
148
- from config: AMQPConnectionConfiguration ,
149
- with handler: AMQPConnectionMultiplexHandler
150
- ) -> EventLoopFuture < NIOCore . Channel > {
151
- let channelPromise = eventLoop. makePromise ( of: NIOCore . Channel. self)
152
-
133
+ from config: AMQPConnectionConfiguration
134
+ ) -> EventLoopFuture < AMQPConnectionHandler > {
153
135
do {
154
136
let bootstrap = try boostrapClient ( use: eventLoop, from: config)
137
+ let multiplexer = NIOLoopBound (
138
+ AMQPConnectionMultiplexHandler ( eventLoop: eventLoop, config: config. server) ,
139
+ eventLoop: eventLoop)
155
140
156
- bootstrap
141
+ return bootstrap
157
142
. channelOption ( ChannelOptions . socketOption ( . so_reuseaddr) , value: 1 )
158
143
. channelOption ( ChannelOptions . socket ( IPPROTO_TCP, TCP_NODELAY) , value: 1 )
159
144
. connectTimeout ( config. server. timeout)
160
145
. channelInitializer { channel in
161
146
channel. pipeline. addHandlers ( [
162
147
MessageToByteHandler ( AMQPFrameEncoder ( ) ) ,
163
148
ByteToMessageHandler ( AMQPFrameDecoder ( ) ) ,
164
- handler
149
+ multiplexer . value ,
165
150
] )
166
151
}
167
152
. connect ( host: config. server. host, port: config. server. port)
168
- . map { channelPromise. succeed ( $0) }
169
- . cascadeFailure ( to: channelPromise)
153
+ . map { AMQPConnectionHandler ( channel: $0, multiplexer: multiplexer. value) }
170
154
} catch {
171
- channelPromise . fail ( error)
155
+ return eventLoop . makeFailedFuture ( error)
172
156
}
173
-
174
- return channelPromise. futureResult
175
157
}
176
158
177
159
private static func boostrapClient(
@@ -182,17 +164,17 @@ public final class AMQPConnection {
182
164
preconditionFailure ( " Cannot create bootstrap for the supplied EventLoop " )
183
165
}
184
166
185
- switch config. connection {
186
- case . plain:
167
+ switch config. connection {
168
+ case . plain:
187
169
return NIOClientTCPBootstrap ( clientBootstrap, tls: NIOInsecureNoTLS ( ) )
188
- case . tls( let tls, let sniServerName) :
170
+ case let . tls( tls, sniServerName) :
189
171
let sslContext = try NIOSSLContext ( configuration: tls ?? TLSConfiguration . clientDefault)
190
172
let tlsProvider = try NIOSSLClientTLSProvider < ClientBootstrap > ( context: sslContext, serverHostname: sniServerName ?? config. server. host)
191
173
let bootstrap = NIOClientTCPBootstrap ( clientBootstrap, tls: tlsProvider)
192
174
return bootstrap. enableTLS ( )
193
- }
175
+ }
194
176
}
195
-
177
+
196
178
deinit {
197
179
if isConnected {
198
180
assertionFailure ( " close() was not called before deinit! " )
0 commit comments