
Commit 04c3236
fix(kafka): handle tombstone events (#4991)
Parent: 8e1359e

3 files changed, 117 additions and 88 deletions
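In Kafka, a tombstone is a record published with a null value, conventionally used on compacted topics to signal that the key should be deleted. The diff below widens the record's value type to `string | null` and short-circuits deserialization when the value is missing. As a rough illustration only (the field names follow the usual aws:kafka Lambda event payload and are an assumption here, not taken from this commit), a tombstone record might arrive looking like this:

// Hypothetical MSK-style record carrying a tombstone; only `value: null` is
// the point. The surrounding field names are assumed from the standard
// aws:kafka event shape rather than copied from this diff.
const tombstoneRecord = {
  topic: 'mytopic',
  partition: 0,
  offset: 15,
  key: 'cHJvZHVjdC0xMjM=', // base64-encoded key of the deleted item
  value: null,             // tombstone: no payload for this key
  headers: [],
};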


packages/kafka/src/consumer.ts (1 addition, 0 deletions)

@@ -196,6 +196,7 @@ const deserializeRecord = async (
     },
     originalKey: key,
     get value() {
+      if (isNull(value)) return null;
       const deserializedValue = deserialize({
         value: value,
         deserializer: deserializerValue,
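The added guard sits inside the lazy `value` getter that `deserializeRecord` builds for each record, so a tombstone now resolves to `null` instead of being handed to a deserializer. Below is a minimal sketch of that getter pattern with a simplified stand-in for the real `deserialize` call; the base64/JSON handling is illustrative, not the library's implementation.

// Sketch of the lazy-getter pattern the fix touches: deserialization only
// happens when `.value` is read, and a null raw value short-circuits.
type RawRecord = { value: string | null };

const toDeserializedRecord = (raw: RawRecord) => ({
  originalValue: raw.value,
  get value(): unknown {
    if (raw.value === null) return null; // tombstone: nothing to deserialize
    // Stand-in for the deserialize({ value, deserializer, ... }) call in consumer.ts
    return JSON.parse(Buffer.from(raw.value, 'base64').toString('utf-8'));
  },
});

// A tombstone record yields null without ever invoking a deserializer.
console.log(toDeserializedRecord({ value: null }).value); // null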

packages/kafka/src/types/types.ts (3 additions, 3 deletions)

@@ -201,7 +201,7 @@ type Record = {
   /**
    * Base64-encoded value of the record
    */
-  value: string;
+  value: string | null;
   /**
    * Array of record headers
    */
@@ -246,13 +246,13 @@ type ProtobufMessage<T = unknown> = {
 };
 
 type Deserializer = (
-  input: string,
+  input: string | null,
   schema?: unknown,
   schemaMetadata?: SchemaMetadata
 ) => unknown;
 
 type DeserializeOptions = {
-  value: string;
+  value: string | null;
   deserializer: Deserializer;
   config?: SchemaConfigValue;
   schemaMetadata: SchemaMetadata;
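With `Deserializer` and `DeserializeOptions` widened to accept `string | null`, a custom deserializer has to tolerate a missing payload. A hedged sketch of a conforming function follows; the signature is reduced to the first parameter, and the base64/JSON handling is an assumption about a typical JSON deserializer, not the library's code.

// Illustrative deserializer compatible with the widened `input: string | null`.
const jsonDeserializer = (input: string | null): unknown => {
  if (input === null) return null; // tombstone records carry no value
  return JSON.parse(Buffer.from(input, 'base64').toString('utf-8'));
};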

packages/kafka/tests/unit/consumer.test.ts (113 additions, 85 deletions)

@@ -136,43 +136,44 @@ describe('Kafka consumer', () => {
       event: structuredClone(avroTestEvent),
       error: KafkaConsumerMissingSchemaError,
     },
-  ])(
-    'throws when schemaStr not passed for $type event',
-    async ({ type, error, event }) => {
-      // Prepare
-      const handler = kafkaConsumer(
-        async (event) => {
-          const results = [];
-          for (const record of event.records) {
-            try {
-              results.push(record.value);
-              await setTimeout(1); // simulate some processing time
-            } catch (error) {
-              return error;
-            }
+  ])('throws when schemaStr not passed for $type event', async ({
+    type,
+    error,
+    event,
+  }) => {
+    // Prepare
+    const handler = kafkaConsumer(
+      async (event) => {
+        const results = [];
+        for (const record of event.records) {
+          try {
+            results.push(record.value);
+            await setTimeout(1); // simulate some processing time
+          } catch (error) {
+            return error;
           }
-          return results;
-        },
-        {
-          // @ts-expect-error - testing missing schemaStr
-          value: { type },
        }
-      );
-
-      // Act
-      const result = await handler(event, context);
-
-      // Assess
-      expect(result).toEqual(
-        expect.objectContaining({
-          message: expect.stringContaining(
-            `Schema string is required for ${type} deserialization`
-          ),
-          name: error.name,
-        })
-      );
-    }
-  );
+        return results;
+      },
+      {
+        // @ts-expect-error - testing missing schemaStr
+        value: { type },
+      }
+    );
+
+    // Act
+    const result = await handler(event, context);
+
+    // Assess
+    expect(result).toEqual(
+      expect.objectContaining({
+        message: expect.stringContaining(
+          `Schema string is required for ${type} deserialization`
+        ),
+        name: error.name,
+      })
+    );
+  });
 
   it('throws if using an unsupported schema type', async () => {
     // Prepare
@@ -239,59 +240,58 @@ describe('Kafka consumer', () => {
         return event;
       })(),
     },
-  ])(
-    'throws when parser schema validation fails for $type',
-    async ({ event }) => {
-      // Prepare
-      const handler = kafkaConsumer(
-        async (event) => {
-          const results = [];
-          for (const record of event.records) {
-            try {
-              const { value, key } = record;
-              await setTimeout(1); // simulate some processing time
-              results.push([value, key]);
-            } catch (error) {
-              return error;
-            }
+  ])('throws when parser schema validation fails for $type', async ({
+    event,
+  }) => {
+    // Prepare
+    const handler = kafkaConsumer(
+      async (event) => {
+        const results = [];
+        for (const record of event.records) {
+          try {
+            const { value, key } = record;
+            await setTimeout(1); // simulate some processing time
+            results.push([value, key]);
+          } catch (error) {
+            return error;
          }
-          return results;
-        },
-        {
-          value: {
-            type: SchemaType.JSON,
-            parserSchema: z.object({
-              id: z.number(),
-              name: z.string(),
-              price: z.number().positive({
-                message: "Price can't be negative",
-              }),
-            }),
-          },
-          key: {
-            type: SchemaType.JSON,
-            parserSchema: z.string(),
-          },
        }
-      );
-
-      // Act & Assess
-      const result = await handler(event, context);
-
-      expect(result).toEqual(
-        expect.objectContaining({
-          message: expect.stringContaining('Schema validation failed'),
-          name: 'KafkaConsumerParserError',
-          cause: expect.arrayContaining([
-            expect.objectContaining({
-              code: expect.any(String),
-              message: expect.any(String),
+        return results;
+      },
+      {
+        value: {
+          type: SchemaType.JSON,
+          parserSchema: z.object({
+            id: z.number(),
+            name: z.string(),
+            price: z.number().positive({
+              message: "Price can't be negative",
            }),
-          ]),
-        })
-      );
-    }
-  );
+          }),
+        },
+        key: {
+          type: SchemaType.JSON,
+          parserSchema: z.string(),
+        },
+      }
+    );
+
+    // Act & Assess
+    const result = await handler(event, context);
+
+    expect(result).toEqual(
+      expect.objectContaining({
+        message: expect.stringContaining('Schema validation failed'),
+        name: 'KafkaConsumerParserError',
+        cause: expect.arrayContaining([
+          expect.objectContaining({
+            code: expect.any(String),
+            message: expect.any(String),
+          }),
+        ]),
+      })
+    );
+  });
 
   it('throws when non MSK event passed kafka consumer', async () => {
     // Prepare
@@ -581,4 +581,32 @@ describe('Kafka consumer', () => {
       })
     );
   });
+
+  it('handles tombstone events with null as message value', async () => {
+    // Prepare
+    const event = structuredClone(jsonTestEvent);
+    event.records['mytopic-0'][0].value = null;
+
+    const handler = kafkaConsumer<string, unknown>(
+      async (event) => {
+        await setTimeout(1); // simulate some processing time
+        const firstRecord = event.records[0];
+        if (firstRecord) {
+          return firstRecord.value;
+        }
+        return undefined;
+      },
+      {
+        value: {
+          type: SchemaType.JSON,
+        },
+      }
+    );
+
+    // Act
+    const result = await handler(event, context);
+
+    // Assess
+    expect(result).toBeNull();
+  });
 });
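Taken together, the tests show the user-facing effect: `record.value` can now be `null`, so handlers should branch on it. A hedged end-to-end sketch along the lines of the new test follows; the `@aws-lambda-powertools/kafka` import path and the `SchemaType` export are assumed from the package's public API and are not shown in this diff.

// Sketch only: import names and path assumed, not confirmed by this commit.
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';

export const handler = kafkaConsumer(
  async (event) => {
    for (const record of event.records) {
      if (record.value === null) {
        // Tombstone: the key was deleted upstream; propagate the deletion downstream.
        console.log('tombstone received for key', record.key);
        continue;
      }
      console.log('processing record', record.value);
    }
  },
  {
    value: { type: SchemaType.JSON },
  }
);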
