@@ -42,7 +42,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
42
42
// NOTE: this can be extended to keep some state of the open request so a response can be verified against its request
43
43
var pendingRequests : Deque < EventLoopPromise < AMQPResponse > >
44
44
weak var eventHandler : AMQPChannelHandler ?
45
- var nextMessage : ( frame: Frame . Method . Basic , properties: Properties ? ) ?
45
+ var nextMessage : ( frame: Frame . Method . Basic , properties: Properties ? , bodySize : UInt64 ? , prevBody : ByteBuffer ? ) ?
46
46
47
47
init ( initialResponsePromise: EventLoopPromise < AMQPResponse > ) {
48
48
pendingRequests = . init( [ initialResponsePromise] )
@@ -53,6 +53,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
53
53
private var context : ChannelHandlerContext !
54
54
private var channels : [ Frame . ChannelID : ChannelState ] = [ : ]
55
55
private var channelMax : UInt16 = 0
56
+ private var frameMax : UInt32 = 0
56
57
private var state : State = . unblocked
57
58
58
59
private let config : AMQPConnectionConfiguration . Server
@@ -131,14 +132,15 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
131
132
context. writeAndFlush ( wrapOutboundOut ( . frame( startOk) ) , promise: nil )
132
133
case let . tune( channelMax, frameMax, heartbeat) :
133
134
self . channelMax = channelMax
135
+ self . frameMax = frameMax
134
136
135
137
let tuneOk = Frame ( channelID: frame. channelID,
136
138
payload: . method( . connection( . tuneOk( channelMax: channelMax, frameMax: frameMax, heartbeat: heartbeat) ) ) )
137
139
let open = Frame ( channelID: frame. channelID, payload: . method( . connection( . open( . init( vhost: config. vhost) ) ) ) )
138
140
139
141
context. writeAndFlush ( wrapOutboundOut ( . bulk( [ tuneOk, open] ) ) , promise: nil )
140
142
case . openOk:
141
- channel. fulfilNextPendingRequest ( with: . connection( . connected( . init( channelMax: channelMax) ) ) )
143
+ channel. fulfilNextPendingRequest ( with: . connection( . connected( . init( channelMax: channelMax, frameMax : frameMax ) ) ) )
142
144
case let . close( close) :
143
145
let closeOk = Frame ( channelID: frame. channelID, payload: . method( . connection( . closeOk) ) )
144
146
context. writeAndFlush ( wrapOutboundOut ( . frame( closeOk) ) , promise: nil )
@@ -205,7 +207,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
205
207
channel. fulfilNextPendingRequest ( with: . channel( . message( . get( ) ) ) )
206
208
case . deliver, . getOk, . return:
207
209
// TODO: wrap this away more nicely, assert message must be nil
208
- channel. nextMessage = ( frame: basic, nil )
210
+ channel. nextMessage = ( frame: basic, nil , nil , nil )
209
211
case . recoverOk:
210
212
channel. fulfilNextPendingRequest ( with: . channel( . basic( . recovered) ) )
211
213
case let . consumeOk( consumerTag) :
@@ -265,12 +267,34 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
265
267
}
266
268
case let . header( header) :
267
269
channel. nextMessage? . properties = header. properties
270
+ channel. nextMessage? . bodySize = header. bodySize
268
271
case let . body( body) :
269
- guard let msg = channel. nextMessage, let properties = msg. properties else {
272
+ guard let msg = channel. nextMessage, let properties = msg. properties, let bodySize = msg . bodySize else {
270
273
// TODO: take down channel
271
274
return
272
275
}
273
276
277
+ let prevSize = msg. prevBody? . readableBytes ?? 0
278
+ if ( prevSize + body. readableBytes < bodySize) {
279
+ if var prevBody = msg. prevBody {
280
+ prevBody. writeImmutableBuffer ( body)
281
+ channel. nextMessage? . prevBody = prevBody
282
+ } else {
283
+ channel. nextMessage? . prevBody = body
284
+ }
285
+
286
+ return
287
+ }
288
+
289
+ let completeBody : ByteBuffer
290
+
291
+ if var prevBody = msg. prevBody {
292
+ prevBody. writeImmutableBuffer ( body)
293
+ completeBody = prevBody
294
+ } else {
295
+ completeBody = body
296
+ }
297
+
274
298
switch msg. frame {
275
299
case let . getOk( getOk) :
276
300
channel. fulfilNextPendingRequest ( with: . channel( . message( . get( . init(
@@ -280,7 +304,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
280
304
deliveryTag: getOk. deliveryTag,
281
305
properties: properties,
282
306
redelivered: getOk. redelivered,
283
- body: body
307
+ body: completeBody
284
308
) ,
285
309
messageCount: getOk. messageCount
286
310
) ) ) ) )
@@ -292,7 +316,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
292
316
deliveryTag: deliver. deliveryTag,
293
317
properties: properties,
294
318
redelivered: deliver. redelivered,
295
- body: body
319
+ body: completeBody
296
320
) ,
297
321
for: deliver. consumerTag
298
322
)
@@ -303,7 +327,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
303
327
exchange: `return`. exchange,
304
328
routingKey: `return`. routingKey,
305
329
properties: properties,
306
- body: body
330
+ body: completeBody
307
331
) )
308
332
default :
309
333
// TODO: take down channel
0 commit comments