Skip to content

Commit db9ec0c

Browse files
authored
feat(kafka): add logic to handle delimited protobufs (#4071)
1 parent aeb6592 commit db9ec0c

19 files changed

+1698
-673
lines changed

package-lock.json

Lines changed: 0 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/kafka/package.json

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@
5151
"@standard-schema/spec": "^1.0.0"
5252
},
5353
"peerDependencies": {
54-
"zod": ">=3.24.0",
54+
"arktype": ">=2.0.0",
5555
"valibot": ">=1.0.0",
56-
"arktype": ">=2.0.0"
56+
"zod": ">=3.24.0"
5757
},
5858
"peerDependenciesMeta": {
5959
"zod": {
@@ -93,12 +93,12 @@
9393
},
9494
"./types": {
9595
"require": {
96-
"types": "./lib/cjs/types/types.d.ts",
97-
"default": "./lib/cjs/types/types.js"
96+
"types": "./lib/cjs/types/index.d.ts",
97+
"default": "./lib/cjs/types/index.js"
9898
},
9999
"import": {
100-
"types": "./lib/esm/types/types.d.ts",
101-
"default": "./lib/esm/types/types.js"
100+
"types": "./lib/esm/types/index.d.ts",
101+
"default": "./lib/esm/types/index.js"
102102
}
103103
}
104104
},
@@ -109,8 +109,8 @@
109109
"lib/esm/errors.d.ts"
110110
],
111111
"types": [
112-
"lib/cjs/types/types.d.ts",
113-
"lib/esm/types/types.d.ts"
112+
"lib/cjs/types/index.d.ts",
113+
"lib/esm/types/index.d.ts"
114114
]
115115
}
116116
},

packages/kafka/src/consumer.ts

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
import type {
1515
ConsumerRecord,
1616
ConsumerRecords,
17+
DeserializeOptions,
1718
Deserializer,
1819
Record as KafkaRecord,
1920
MSKEvent,
@@ -74,11 +75,12 @@ const deserializeHeaders = (headers: Record<string, number[]>[] | null) => {
7475
* @param config - The schema configuration to use for deserialization. See {@link SchemaConfigValue | `SchemaConfigValue`}.
7576
* If not provided, the value is decoded as a UTF-8 string.
7677
*/
77-
const deserialize = (
78-
value: string,
79-
deserializer: Deserializer,
80-
config?: SchemaConfigValue
81-
) => {
78+
const deserialize = ({
79+
value,
80+
deserializer,
81+
config,
82+
schemaMetadata,
83+
}: DeserializeOptions) => {
8284
if (config === undefined) {
8385
return deserializer(value);
8486
}
@@ -100,7 +102,7 @@ const deserialize = (
100102
'Schema string is required for protobuf deserialization'
101103
);
102104
}
103-
return deserializer(value, config.schema);
105+
return deserializer(value, config.schema, schemaMetadata);
104106
}
105107
};
106108

@@ -162,7 +164,14 @@ const deserializeRecord = async (
162164
record: KafkaRecord,
163165
config?: SchemaConfig
164166
) => {
165-
const { key, value, headers, ...rest } = record;
167+
const {
168+
key,
169+
value,
170+
headers,
171+
valueSchemaMetadata,
172+
keySchemaMetadata,
173+
...rest
174+
} = record;
166175
const { key: keyConfig, value: valueConfig } = config || {};
167176

168177
const deserializerKey = await getDeserializer(keyConfig?.type);
@@ -175,19 +184,25 @@ const deserializeRecord = async (
175184
return undefined;
176185
}
177186
if (isNull(key)) return null;
178-
const deserializedKey = deserialize(key, deserializerKey, keyConfig);
187+
const deserializedKey = deserialize({
188+
value: key,
189+
deserializer: deserializerKey,
190+
config: keyConfig,
191+
schemaMetadata: keySchemaMetadata,
192+
});
179193

180194
return keyConfig?.parserSchema
181195
? parseSchema(deserializedKey, keyConfig.parserSchema)
182196
: deserializedKey;
183197
},
184198
originalKey: key,
185199
get value() {
186-
const deserializedValue = deserialize(
187-
value,
188-
deserializerValue,
189-
valueConfig
190-
);
200+
const deserializedValue = deserialize({
201+
value: value,
202+
deserializer: deserializerValue,
203+
config: valueConfig,
204+
schemaMetadata: valueSchemaMetadata,
205+
});
191206

192207
return valueConfig?.parserSchema
193208
? parseSchema(deserializedValue, valueConfig.parserSchema)
Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1-
import type { Message } from 'protobufjs';
1+
import { BufferReader, type Message } from 'protobufjs';
22
import { KafkaConsumerDeserializationError } from '../errors.js';
3-
import type { ProtobufMessage } from '../types/types.js';
3+
import type { ProtobufMessage, SchemaMetadata } from '../types/types.js';
4+
5+
/**
6+
* Default order of varint types used in Protobuf to attempt deserializing Confluent Schema Registry messages.
7+
*/
8+
const varintOrder: Array<'int32' | 'sint32'> = ['int32', 'sint32'];
49

510
/**
611
* Deserialize a Protobuf message from a base64-encoded string.
@@ -10,15 +15,80 @@ import type { ProtobufMessage } from '../types/types.js';
1015
* @param data - The base64-encoded string representing the Protobuf binary data.
1116
* @param messageType - The Protobuf message type definition - see {@link Message | `Message`} from {@link https://www.npmjs.com/package/protobufjs | `protobufjs`}.
1217
*/
13-
const deserialize = <T>(data: string, messageType: ProtobufMessage<T>): T => {
18+
const deserialize = <T>(
19+
data: string,
20+
messageType: ProtobufMessage<T>,
21+
schemaMetadata: SchemaMetadata
22+
): T => {
23+
const buffer = Buffer.from(data, 'base64');
1424
try {
15-
const buffer = Buffer.from(data, 'base64');
16-
return messageType.decode(buffer, buffer.length);
25+
if (schemaMetadata.schemaId === undefined) {
26+
return messageType.decode(buffer, buffer.length);
27+
}
28+
/**
29+
 * If `schemaId` is longer than 10 chars, it's a UUID; otherwise it's a numeric ID.
30+
*
31+
* When this is the case, we know the schema is coming from Glue Schema Registry,
32+
* and the first byte of the buffer is a magic byte that we need to remove before
33+
* decoding the message.
34+
*/
35+
if (schemaMetadata.schemaId.length > 10) {
36+
// remove the first byte from the buffer
37+
const reader = new BufferReader(buffer);
38+
reader.uint32();
39+
return messageType.decode(reader);
40+
}
1741
} catch (error) {
1842
throw new KafkaConsumerDeserializationError(
1943
`Failed to deserialize Protobuf message: ${error}, message: ${data}, messageType: ${messageType}`
2044
);
2145
}
46+
47+
/**
48+
 * If `schemaId` is numeric — inferred from its length — the message comes from Confluent Schema Registry,
49+
* so we need to remove the MessageIndex bytes.
50+
* We don't know the type of the index, so we try both `int32` and `sint32`. If both fail, we throw an error.
51+
*/
52+
try {
53+
const newBuffer = clipConfluentSchemaRegistryBuffer(buffer, varintOrder[0]);
54+
return messageType.decode(newBuffer);
55+
} catch (error) {
56+
try {
57+
const newBuffer = clipConfluentSchemaRegistryBuffer(
58+
buffer,
59+
varintOrder[1]
60+
);
61+
const decoded = messageType.decode(newBuffer);
62+
// swap varint order if the first attempt failed so we can use the correct one for subsequent messages
63+
varintOrder.reverse();
64+
return decoded;
65+
} catch {
66+
throw new KafkaConsumerDeserializationError(
67+
`Failed to deserialize Protobuf message: ${error}, message: ${data}, messageType: ${messageType}`
68+
);
69+
}
70+
}
71+
};
72+
73+
/**
74+
* Clip the Confluent Schema Registry buffer to remove the index bytes.
75+
*
76+
* @param buffer - The buffer to clip.
77+
* @param intType - The type of the integer to read from the buffer, either 'int32' or 'sint32'.
78+
*/
79+
const clipConfluentSchemaRegistryBuffer = (
80+
buffer: Buffer,
81+
intType: 'int32' | 'sint32'
82+
) => {
83+
const reader = new BufferReader(buffer);
84+
/**
85+
* Read the first varint byte to get the index count or 0.
86+
* Doing so, also advances the reader position to the next byte after the index count.
87+
*/
88+
const indexCount = intType === 'int32' ? reader.int32() : reader.sint32();
89+
// Skip the index bytes
90+
reader.skip(indexCount);
91+
return reader;
2292
};
2393

2494
export { deserialize };

packages/kafka/src/types/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
export type {
2+
ConsumerRecord,
3+
ConsumerRecords,
4+
MSKEvent,
5+
ProtobufMessage,
6+
Record,
7+
RecordHeader,
8+
SchemaType,
9+
SchemaConfig,
10+
SchemaConfigValue,
11+
SchemaMetadata,
12+
} from './types.js';

0 commit comments

Comments
 (0)