Skip to content

Commit 5665afd

Browse files
author
Krzysztof Majk
committed
add first implemention of large payload support without tests
1 parent bb64025 commit 5665afd

File tree

4 files changed

+60
-12
lines changed

4 files changed

+60
-12
lines changed

Sources/AMQPClient/AMQPChannel.swift

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@ public final class AMQPChannel: Sendable {
3232
private let isConfirmMode = ManagedAtomic(false)
3333
private let isTxMode = ManagedAtomic(false)
3434
private let deliveryTag = ManagedAtomic(UInt64(1))
35+
private let frameMax: UInt32
3536

36-
init(channelID: Frame.ChannelID, eventLoop: EventLoop, channel: AMQPChannelHandler) {
37+
init(channelID: Frame.ChannelID, eventLoop: EventLoop, channel: AMQPChannelHandler, frameMax: UInt32) {
3738
ID = channelID
3839
self.eventLoop = eventLoop
3940
self.channel = channel
41+
self.frameMax = frameMax
4042
}
4143

4244
/// Close the channel
@@ -88,7 +90,25 @@ public final class AMQPChannel: Sendable {
8890

8991
let header = Frame.Payload.header(.init(classID: classID, weight: 0, bodySize: UInt64(body.readableBytes), properties: properties))
9092

91-
let result: EventLoopFuture<Void> = channel.send(payloads: [publish, header, .body(body)])
93+
let payloads: [Frame.Payload]
94+
95+
if body.readableBytes <= frameMax {
96+
payloads = [publish, header, .body(body)]
97+
} else {
98+
var parts = [publish, header]
99+
var buffer = body
100+
101+
while(buffer.readableBytes > 0) {
102+
guard let bytes = buffer.readBytes(length: frameMax < buffer.readableBytes ? Int(frameMax) : buffer.readableBytes) else {
103+
preconditionFailure("invalid bytes read")
104+
}
105+
parts.append(.body(.init(bytes: bytes)))
106+
}
107+
108+
payloads = parts
109+
}
110+
111+
let result: EventLoopFuture<Void> = channel.send(payloads: payloads)
92112
return result.map {
93113
if self.isConfirmMode.load(ordering: .relaxed) {
94114
let count = self.deliveryTag.loadThenWrappingIncrement(ordering: .sequentiallyConsistent)

Sources/AMQPClient/AMQPConnection.swift

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,17 @@ public final class AMQPConnection: Sendable {
3131

3232
public var closeFuture: NIOCore.EventLoopFuture<Void> { connectionHandler.channel.closeFuture }
3333
public var eventLoop: EventLoop { return connectionHandler.channel.eventLoop }
34+
private let frameMax: UInt32
3435

3536
private let connectionHandler: AMQPConnectionHandler
3637
private var channel: NIOCore.Channel { connectionHandler.channel }
3738

3839
private let state = NIOLockedValueBox(ConnectionState.open)
3940
private let channels: NIOLockedValueBox<AMQPChannels>
4041

41-
init(connectionHandler: AMQPConnectionHandler, channelMax: UInt16) {
42+
init(connectionHandler: AMQPConnectionHandler, channelMax: UInt16, frameMax: UInt32) {
4243
self.connectionHandler = connectionHandler
44+
self.frameMax = frameMax
4345
channels = .init(AMQPChannels(channelMax: channelMax))
4446
}
4547

@@ -54,7 +56,8 @@ public final class AMQPConnection: Sendable {
5456
connectionHandler.startConnection().map {
5557
AMQPConnection(
5658
connectionHandler: connectionHandler,
57-
channelMax: $0.channelMax
59+
channelMax: $0.channelMax,
60+
frameMax: $0.frameMax
5861
)
5962
}
6063
}
@@ -79,7 +82,7 @@ public final class AMQPConnection: Sendable {
7982
future.whenFailure { _ in self.channels.withLockedValue { $0.remove(id: channelID) } }
8083

8184
return future.map { channel in
82-
let amqpChannel = AMQPChannel(channelID: channelID, eventLoop: self.eventLoop, channel: channel)
85+
let amqpChannel = AMQPChannel(channelID: channelID, eventLoop: self.eventLoop, channel: channel, frameMax: self.frameMax)
8386
self.channels.withLockedValue { $0.add(channel: amqpChannel) }
8487
return amqpChannel
8588
}

Sources/AMQPClient/AMQPResponse.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public enum AMQPResponse: Sendable {
131131

132132
public struct Connected: Sendable {
133133
public let channelMax: UInt16
134+
public let frameMax: UInt32
134135
}
135136
}
136137
}

Sources/AMQPClient/ChannelHandlers/AMQPConnectionMultiplexHandler.swift

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
4242
// NOTE: this can be extended to keep some state of the open request so a response can be verified against its request
4343
var pendingRequests: Deque<EventLoopPromise<AMQPResponse>>
4444
weak var eventHandler: AMQPChannelHandler?
45-
var nextMessage: (frame: Frame.Method.Basic, properties: Properties?)?
45+
var nextMessage: (frame: Frame.Method.Basic, properties: Properties?, bodySize: UInt64?, prevBody: ByteBuffer?)?
4646

4747
init(initialResponsePromise: EventLoopPromise<AMQPResponse>) {
4848
pendingRequests = .init([initialResponsePromise])
@@ -53,6 +53,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
5353
private var context: ChannelHandlerContext!
5454
private var channels: [Frame.ChannelID: ChannelState] = [:]
5555
private var channelMax: UInt16 = 0
56+
private var frameMax: UInt32 = 0
5657
private var state: State = .unblocked
5758

5859
private let config: AMQPConnectionConfiguration.Server
@@ -131,14 +132,15 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
131132
context.writeAndFlush(wrapOutboundOut(.frame(startOk)), promise: nil)
132133
case let .tune(channelMax, frameMax, heartbeat):
133134
self.channelMax = channelMax
135+
self.frameMax = frameMax
134136

135137
let tuneOk = Frame(channelID: frame.channelID,
136138
payload: .method(.connection(.tuneOk(channelMax: channelMax, frameMax: frameMax, heartbeat: heartbeat))))
137139
let open = Frame(channelID: frame.channelID, payload: .method(.connection(.open(.init(vhost: config.vhost)))))
138140

139141
context.writeAndFlush(wrapOutboundOut(.bulk([tuneOk, open])), promise: nil)
140142
case .openOk:
141-
channel.fulfilNextPendingRequest(with: .connection(.connected(.init(channelMax: channelMax))))
143+
channel.fulfilNextPendingRequest(with: .connection(.connected(.init(channelMax: channelMax, frameMax: frameMax))))
142144
case let .close(close):
143145
let closeOk = Frame(channelID: frame.channelID, payload: .method(.connection(.closeOk)))
144146
context.writeAndFlush(wrapOutboundOut(.frame(closeOk)), promise: nil)
@@ -205,7 +207,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
205207
channel.fulfilNextPendingRequest(with: .channel(.message(.get())))
206208
case .deliver, .getOk, .return:
207209
// TODO: wrap this away more nicely, assert message must be nil
208-
channel.nextMessage = (frame: basic, nil)
210+
channel.nextMessage = (frame: basic, nil, nil, nil)
209211
case .recoverOk:
210212
channel.fulfilNextPendingRequest(with: .channel(.basic(.recovered)))
211213
case let .consumeOk(consumerTag):
@@ -265,12 +267,34 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
265267
}
266268
case let .header(header):
267269
channel.nextMessage?.properties = header.properties
270+
channel.nextMessage?.bodySize = header.bodySize
268271
case let .body(body):
269-
guard let msg = channel.nextMessage, let properties = msg.properties else {
272+
guard let msg = channel.nextMessage, let properties = msg.properties, let bodySize = msg.bodySize else {
270273
// TODO: take down channel
271274
return
272275
}
273276

277+
let prevSize = msg.prevBody?.readableBytes ?? 0
278+
if (prevSize + body.readableBytes < bodySize) {
279+
if var prevBody = msg.prevBody {
280+
prevBody.writeImmutableBuffer(body)
281+
channel.nextMessage?.prevBody = prevBody
282+
} else {
283+
channel.nextMessage?.prevBody = body
284+
}
285+
286+
return
287+
}
288+
289+
let completeBody: ByteBuffer
290+
291+
if var prevBody = msg.prevBody {
292+
prevBody.writeImmutableBuffer(body)
293+
completeBody = prevBody
294+
} else {
295+
completeBody = body
296+
}
297+
274298
switch msg.frame {
275299
case let .getOk(getOk):
276300
channel.fulfilNextPendingRequest(with: .channel(.message(.get(.init(
@@ -280,7 +304,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
280304
deliveryTag: getOk.deliveryTag,
281305
properties: properties,
282306
redelivered: getOk.redelivered,
283-
body: body
307+
body: completeBody
284308
),
285309
messageCount: getOk.messageCount
286310
)))))
@@ -292,7 +316,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
292316
deliveryTag: deliver.deliveryTag,
293317
properties: properties,
294318
redelivered: deliver.redelivered,
295-
body: body
319+
body: completeBody
296320
),
297321
for: deliver.consumerTag
298322
)
@@ -303,7 +327,7 @@ internal final class AMQPConnectionMultiplexHandler: ChannelDuplexHandler {
303327
exchange: `return`.exchange,
304328
routingKey: `return`.routingKey,
305329
properties: properties,
306-
body: body
330+
body: completeBody
307331
))
308332
default:
309333
// TODO: take down channel

0 commit comments

Comments
 (0)