From e46324b8e7a51070d68bedd87f43138e75ce0f2b Mon Sep 17 00:00:00 2001 From: kiitosu Date: Thu, 15 May 2025 17:51:00 +0900 Subject: [PATCH 1/4] update comment --- packages/parser/src/schemas/kinesis.ts | 40 ++++++++++++-------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts index 8e21724305..c6dffe3074 100644 --- a/packages/parser/src/schemas/kinesis.ts +++ b/packages/parser/src/schemas/kinesis.ts @@ -70,37 +70,33 @@ const KinesisDynamoDBStreamSchema = z.object({ * "partitionKey": "1", * "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", * "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", - * "approximateArrivalTimestamp": 1545084650.987 + * "approximateArrivalTimestamp": 1607497475.000 * }, * "eventSource": "aws:kinesis", * "eventVersion": "1.0", * "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", * "eventName": "aws:kinesis:record", - * "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", - * "awsRegion": "us-east-2", - * "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" - * }, - * { - * "kinesis": { - * "kinesisSchemaVersion": "1.0", - * "partitionKey": "1", - * "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", - * "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", - * "approximateArrivalTimestamp": 1545084711.166 - * }, - * "eventSource": "aws:kinesis", - * "eventVersion": "1.0", - * "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", - * "eventName": "aws:kinesis:record", - * "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", - * "awsRegion": "us-east-2", - * "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + * "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + * "awsRegion": "us-east-1", + * "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" * } - * ] + * ], + * "window": { + * "start": "2020-12-09T07:04:00Z", + * "end": "2020-12-09T07:06:00Z" + * }, + * "state": { + * "1": 282, + * "2": 715 + * }, + * "shardId": "shardId-000000000006", + * "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + * "isFinalInvokeForWindow": false, + * "isWindowTerminatedEarly": false * } *``` * @see {@link types.KinesisDataStreamEvent | KinesisDataStreamEvent} - * @see {@link https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-event-example} + * @see {@link https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-windows.html#streams-tumbling-processing} * */ const KinesisDataStreamSchema = z.object({ From 39093e2d847715da75dd97f0552206821a770877 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Thu, 15 May 2025 18:25:31 +0900 Subject: [PATCH 2/4] add parameters to kinesis stream --- packages/parser/src/schemas/kinesis.ts | 17 ++++++++++ .../kinesis/stream-tumbling-window.json | 32 +++++++++++++++++++ .../parser/tests/unit/schema/kinesis.test.ts | 31 ++++++++++++++++++ 3 files changed, 80 insertions(+) create mode 100644 packages/parser/tests/events/kinesis/stream-tumbling-window.json diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts index c6dffe3074..2e4e7a7c20 100644 --- a/packages/parser/src/schemas/kinesis.ts +++ b/packages/parser/src/schemas/kinesis.ts @@ -22,6 +22,23 @@ const KinesisDataStreamRecordPayload = z.object({ }), }); +interface KinesisStreamWindow { + start: string; + end: string; +} + +type KinesisStreamState = Record; + +interface KinesisStreamEvent { + Records: Array>; + window?: KinesisStreamWindow; + state?: KinesisStreamState; + shardId?: string; + eventSourceARN?: string; + isFinalInvokeForWindow?: boolean; + isWindowTerminatedEarly?: boolean; +} + const decompress = (data: string): string => { try { return JSON.parse(gunzipSync(fromBase64(data, 'base64')).toString('utf8')); diff --git a/packages/parser/tests/events/kinesis/stream-tumbling-window.json b/packages/parser/tests/events/kinesis/stream-tumbling-window.json new file mode 100644 index 0000000000..a1737b7a32 --- /dev/null +++ b/packages/parser/tests/events/kinesis/stream-tumbling-window.json @@ -0,0 +1,32 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "approximateArrivalTimestamp": 1607497475.0 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + "awsRegion": "us-east-1", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" + } + ], + "window": { + "start": "2020-12-09T07:04:00Z", + "end": "2020-12-09T07:06:00Z" + }, + "state": { + "1": 282, + "2": 715 + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} diff --git a/packages/parser/tests/unit/schema/kinesis.test.ts b/packages/parser/tests/unit/schema/kinesis.test.ts index 94f7bc944a..57711afb70 100644 --- a/packages/parser/tests/unit/schema/kinesis.test.ts +++ b/packages/parser/tests/unit/schema/kinesis.test.ts @@ -51,6 +51,13 @@ describe('Schema: Kinesis', () => { } ); + const kinesisStreamTumblingWindowEvent = getTestEvent( + { + eventsPath, + filename: 'stream-tumbling-window', + } + ); + it('parses kinesis event', () => { // Prepare const testEvent = structuredClone(kinesisStreamEvent); @@ -167,6 +174,30 @@ describe('Schema: Kinesis', () => { expect(parsed).toStrictEqual(transformedInput); }); + it('parses Kinesis event with tumbling window', () => { + // Prepare + const testEvent = structuredClone(kinesisStreamTumblingWindowEvent); + + + // Act + const parsed = KinesisDataStreamSchema.parse(testEvent); + + const transformedInput = { + Records: testEvent.Records.map((record, index) => { + return { + ...record, + kinesis: { + ...record.kinesis, + data: Buffer.from(record.kinesis.data, 'base64').toString(), + }, + }; + }), + }; + + // Assess + expect(parsed).toStrictEqual(transformedInput); + }); + it('throws if cannot parse SQS record of KinesisFirehoseSqsRecord', () => { // Prepare const testEvent = getTestEvent({ From 5e45e1cb9e4ab5465faf465a0be67c46ca03f2b9 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Thu, 15 May 2025 19:20:35 +0900 Subject: [PATCH 3/4] add parameters to dynamodb stream --- packages/parser/src/schemas/dynamodb.ts | 169 +++++++++++------- .../events/dynamodb/tumbling-window.json | 102 +++++++++++ .../parser/tests/unit/schema/dynamodb.test.ts | 94 ++++++++++ 3 files changed, 304 insertions(+), 61 deletions(-) create mode 100644 packages/parser/tests/events/dynamodb/tumbling-window.json diff --git a/packages/parser/src/schemas/dynamodb.ts b/packages/parser/src/schemas/dynamodb.ts index 00170ac672..bf9da60f82 100644 --- a/packages/parser/src/schemas/dynamodb.ts +++ b/packages/parser/src/schemas/dynamodb.ts @@ -151,69 +151,105 @@ const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({ * @example * ```json * { - * "Records": [ - * { - * "eventID": "1", - * "eventVersion": "1.0", - * "dynamodb": { - * "ApproximateCreationDateTime": 1693997155.0, - * "Keys": { - * "Id": { - * "N": "101" - * } - * }, - * "NewImage": { - * "Message": { - * "S": "New item!" - * }, - * "Id": { - * "N": "101" - * } - * }, - * "StreamViewType": "NEW_AND_OLD_IMAGES", - * "SequenceNumber": "111", - * "SizeBytes": 26 + * "Records":[ + * { + * "eventID":"1", + * "eventName":"INSERT", + * "eventVersion":"1.0", + * "eventSource":"aws:dynamodb", + * "awsRegion":"us-east-1", + * "dynamodb":{ + * "Keys":{ + * "Id":{ + * "N":"101" + * } + * }, + * "NewImage":{ + * "Message":{ + * "S":"New item!" + * }, + * "Id":{ + * "N":"101" + * } + * }, + * "SequenceNumber":"111", + * "SizeBytes":26, + * "StreamViewType":"NEW_AND_OLD_IMAGES" + * }, + * "eventSourceARN":"stream-ARN" * }, - * "awsRegion": "us-west-2", - * "eventName": "INSERT", - * "eventSourceARN": "eventsource_arn", - * "eventSource": "aws:dynamodb" - * }, - * { - * "eventID": "2", - * "eventVersion": "1.0", - * "dynamodb": { - * "OldImage": { - * "Message": { - * "S": "New item!" - * }, - * "Id": { - * "N": "101" - * } - * }, - * "SequenceNumber": "222", - * "Keys": { - * "Id": { - * "N": "101" - * } - * }, - * "SizeBytes": 59, - * "NewImage": { - * "Message": { - * "S": "This item has changed" - * }, - * "Id": { - * "N": "101" - * } - * }, - * "StreamViewType": "NEW_AND_OLD_IMAGES" + * { + * "eventID":"2", + * "eventName":"MODIFY", + * "eventVersion":"1.0", + * "eventSource":"aws:dynamodb", + * "awsRegion":"us-east-1", + * "dynamodb":{ + * "Keys":{ + * "Id":{ + * "N":"101" + * } + * }, + * "NewImage":{ + * "Message":{ + * "S":"This item has changed" + * }, + * "Id":{ + * "N":"101" + * } + * }, + * "OldImage":{ + * "Message":{ + * "S":"New item!" + * }, + * "Id":{ + * "N":"101" + * } + * }, + * "SequenceNumber":"222", + * "SizeBytes":59, + * "StreamViewType":"NEW_AND_OLD_IMAGES" + * }, + * "eventSourceARN":"stream-ARN" * }, - * "awsRegion": "us-west-2", - * "eventName": "MODIFY", - * "eventSourceARN": "source_arn", - * "eventSource": "aws:dynamodb" - * } - * ] + * { + * "eventID":"3", + * "eventName":"REMOVE", + * "eventVersion":"1.0", + * "eventSource":"aws:dynamodb", + * "awsRegion":"us-east-1", + * "dynamodb":{ + * "Keys":{ + * "Id":{ + * "N":"101" + * } + * }, + * "OldImage":{ + * "Message":{ + * "S":"This item has changed" + * }, + * "Id":{ + * "N":"101" + * } + * }, + * "SequenceNumber":"333", + * "SizeBytes":38, + * "StreamViewType":"NEW_AND_OLD_IMAGES" + * }, + * "eventSourceARN":"stream-ARN" + * } + * ], + * "window": { + * "start": "2020-07-30T17:00:00Z", + * "end": "2020-07-30T17:05:00Z" + * }, + * "state": { + * "1": "state1" + * }, + * "shardId": "shard123456789", + * "eventSourceARN": "stream-ARN", + * "isFinalInvokeForWindow": false, + * "isWindowTerminatedEarly": false * } * ``` * @@ -222,6 +258,17 @@ const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({ */ const DynamoDBStreamSchema = z.object({ Records: z.array(DynamoDBStreamRecord).min(1), + window: z + .object({ + start: z.string(), + end: z.string(), + }) + .optional(), + state: z.record(z.string(), z.string()).optional(), + shardId: z.string().optional(), + eventSourceARN: z.string().optional(), + isFinalInvokeForWindow: z.boolean().optional(), + isWindowTerminatedEarly: z.boolean().optional(), }); export { diff --git a/packages/parser/tests/events/dynamodb/tumbling-window.json b/packages/parser/tests/events/dynamodb/tumbling-window.json new file mode 100644 index 0000000000..b4d72b87b8 --- /dev/null +++ b/packages/parser/tests/events/dynamodb/tumbling-window.json @@ -0,0 +1,102 @@ + +{ + "Records":[ + { + "eventID":"1", + "eventName":"INSERT", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "awsRegion":"us-east-1", + "dynamodb":{ + "Keys":{ + "Id":{ + "N":"101" + } + }, + "NewImage":{ + "Message":{ + "S":"New item!" + }, + "Id":{ + "N":"101" + } + }, + "SequenceNumber":"111", + "SizeBytes":26, + "StreamViewType":"NEW_AND_OLD_IMAGES" + }, + "eventSourceARN":"stream-ARN" + }, + { + "eventID":"2", + "eventName":"MODIFY", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "awsRegion":"us-east-1", + "dynamodb":{ + "Keys":{ + "Id":{ + "N":"101" + } + }, + "NewImage":{ + "Message":{ + "S":"This item has changed" + }, + "Id":{ + "N":"101" + } + }, + "OldImage":{ + "Message":{ + "S":"New item!" + }, + "Id":{ + "N":"101" + } + }, + "SequenceNumber":"222", + "SizeBytes":59, + "StreamViewType":"NEW_AND_OLD_IMAGES" + }, + "eventSourceARN":"stream-ARN" + }, + { + "eventID":"3", + "eventName":"REMOVE", + "eventVersion":"1.0", + "eventSource":"aws:dynamodb", + "awsRegion":"us-east-1", + "dynamodb":{ + "Keys":{ + "Id":{ + "N":"101" + } + }, + "OldImage":{ + "Message":{ + "S":"This item has changed" + }, + "Id":{ + "N":"101" + } + }, + "SequenceNumber":"333", + "SizeBytes":38, + "StreamViewType":"NEW_AND_OLD_IMAGES" + }, + "eventSourceARN":"stream-ARN" + } + ], + "window": { + "start": "2020-07-30T17:00:00Z", + "end": "2020-07-30T17:05:00Z" + }, + "state": { + "1": "state1" + }, + "shardId": "shard123456789", + "eventSourceARN": "stream-ARN", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} diff --git a/packages/parser/tests/unit/schema/dynamodb.test.ts b/packages/parser/tests/unit/schema/dynamodb.test.ts index 62b4ab8d06..48b596aa11 100644 --- a/packages/parser/tests/unit/schema/dynamodb.test.ts +++ b/packages/parser/tests/unit/schema/dynamodb.test.ts @@ -9,6 +9,11 @@ describe('Schema: DynamoDB', () => { filename: 'base', }); + const tumblingWindowEvent = getTestEvent({ + eventsPath: 'dynamodb', + filename: 'tumbling-window', + }); + it('parses a DynamoDB Stream event', () => { // Prepare const event = structuredClone(baseEvent); @@ -112,4 +117,93 @@ describe('Schema: DynamoDB', () => { // Act & Assess expect(() => DynamoDBStreamSchema.parse(event)).toThrow(); }); + + it('parses a DynamoDB Stream with tumbling window event', () => { + // Prepare + const event = structuredClone(tumblingWindowEvent); + + // Act + const result = DynamoDBStreamSchema.parse(event); + + // Assess + expect(result).toStrictEqual({ + Records: [ + { + eventID: "1", + eventName: "INSERT", + eventVersion: "1.0", + eventSource: "aws:dynamodb", + awsRegion: "us-east-1", + dynamodb: { + Keys: { + Id: 101 + }, + NewImage: { + Message: "New item!", + Id: 101, + }, + SequenceNumber: "111", + SizeBytes: 26, + StreamViewType: "NEW_AND_OLD_IMAGES", + }, + eventSourceARN: "stream-ARN", + }, + { + eventID: "2", + eventName: "MODIFY", + eventVersion: "1.0", + eventSource: "aws:dynamodb", + awsRegion: "us-east-1", + dynamodb: { + Keys: { + Id: 101, + }, + NewImage: { + Message: "This item has changed", + Id: 101, + }, + OldImage: { + Message: "New item!", + Id: 101, + }, + SequenceNumber: "222", + SizeBytes: 59, + StreamViewType: "NEW_AND_OLD_IMAGES", + }, + eventSourceARN: "stream-ARN", + }, + { + eventID: "3", + eventName: "REMOVE", + eventVersion: "1.0", + eventSource: "aws:dynamodb", + awsRegion: "us-east-1", + dynamodb: { + Keys: { + Id: 101, + }, + OldImage: { + Message: "This item has changed", + Id: 101 + }, + SequenceNumber: "333", + SizeBytes: 38, + StreamViewType: "NEW_AND_OLD_IMAGES", + }, + eventSourceARN: "stream-ARN", + }, + ], + window: { + start: "2020-07-30T17:00:00Z", + end: "2020-07-30T17:05:00Z", + }, + state: { + "1": "state1", + }, + shardId: "shard123456789", + eventSourceARN: "stream-ARN", + isFinalInvokeForWindow: false, + isWindowTerminatedEarly: false, + }); + }); }); From 6039e6a476311cf55a53d304b13a6d9a48561afa Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Fri, 16 May 2025 16:49:17 +0200 Subject: [PATCH 4/4] chore: fix kinesis types + test + docstrings --- packages/parser/src/schemas/dynamodb.ts | 192 +++++++++--------- packages/parser/src/schemas/kinesis.ts | 28 +-- .../events/dynamodb/tumbling-window.json | 102 ---------- .../kinesis/stream-tumbling-window.json | 32 --- .../parser/tests/unit/schema/dynamodb.test.ts | 95 ++++----- .../parser/tests/unit/schema/kinesis.test.ts | 22 +- 6 files changed, 160 insertions(+), 311 deletions(-) delete mode 100644 packages/parser/tests/events/dynamodb/tumbling-window.json delete mode 100644 packages/parser/tests/events/kinesis/stream-tumbling-window.json diff --git a/packages/parser/src/schemas/dynamodb.ts b/packages/parser/src/schemas/dynamodb.ts index bf9da60f82..5d735ee5bf 100644 --- a/packages/parser/src/schemas/dynamodb.ts +++ b/packages/parser/src/schemas/dynamodb.ts @@ -151,105 +151,103 @@ const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({ * @example * ```json * { - * "Records":[ - * { - * "eventID":"1", - * "eventName":"INSERT", - * "eventVersion":"1.0", - * "eventSource":"aws:dynamodb", - * "awsRegion":"us-east-1", - * "dynamodb":{ - * "Keys":{ - * "Id":{ - * "N":"101" - * } - * }, - * "NewImage":{ - * "Message":{ - * "S":"New item!" - * }, - * "Id":{ - * "N":"101" - * } - * }, - * "SequenceNumber":"111", - * "SizeBytes":26, - * "StreamViewType":"NEW_AND_OLD_IMAGES" - * }, - * "eventSourceARN":"stream-ARN" + * "Records":[{ + * "eventID":"1", + * "eventName":"INSERT", + * "eventVersion":"1.0", + * "eventSource":"aws:dynamodb", + * "awsRegion":"us-east-1", + * "dynamodb":{ + * "Keys":{ + * "Id":{ + * "N":"101" + * } * }, - * { - * "eventID":"2", - * "eventName":"MODIFY", - * "eventVersion":"1.0", - * "eventSource":"aws:dynamodb", - * "awsRegion":"us-east-1", - * "dynamodb":{ - * "Keys":{ - * "Id":{ - * "N":"101" - * } - * }, - * "NewImage":{ - * "Message":{ - * "S":"This item has changed" - * }, - * "Id":{ - * "N":"101" - * } - * }, - * "OldImage":{ - * "Message":{ - * "S":"New item!" - * }, - * "Id":{ - * "N":"101" - * } - * }, - * "SequenceNumber":"222", - * "SizeBytes":59, - * "StreamViewType":"NEW_AND_OLD_IMAGES" - * }, - * "eventSourceARN":"stream-ARN" + * "NewImage":{ + * "Message":{ + * "S":"New item!" + * }, + * "Id":{ + * "N":"101" + * } * }, - * { - * "eventID":"3", - * "eventName":"REMOVE", - * "eventVersion":"1.0", - * "eventSource":"aws:dynamodb", - * "awsRegion":"us-east-1", - * "dynamodb":{ - * "Keys":{ - * "Id":{ - * "N":"101" - * } - * }, - * "OldImage":{ - * "Message":{ - * "S":"This item has changed" - * }, - * "Id":{ - * "N":"101" - * } - * }, - * "SequenceNumber":"333", - * "SizeBytes":38, - * "StreamViewType":"NEW_AND_OLD_IMAGES" - * }, - * "eventSourceARN":"stream-ARN" - * } - * ], - * "window": { - * "start": "2020-07-30T17:00:00Z", - * "end": "2020-07-30T17:05:00Z" + * "SequenceNumber":"111", + * "SizeBytes":26, + * "StreamViewType":"NEW_AND_OLD_IMAGES" * }, - * "state": { - * "1": "state1" + * "eventSourceARN":"stream-ARN" + * }, + * { + * "eventID":"2", + * "eventName":"MODIFY", + * "eventVersion":"1.0", + * "eventSource":"aws:dynamodb", + * "awsRegion":"us-east-1", + * "dynamodb":{ + * "Keys":{ + * "Id":{ + * "N":"101" + * } + * }, + * "NewImage":{ + * "Message":{ + * "S":"This item has changed" + * }, + * "Id":{ + * "N":"101" + * } + * }, + * "OldImage":{ + * "Message":{ + * "S":"New item!" + * }, + * "Id":{ + * "N":"101" + * } + * }, + * "SequenceNumber":"222", + * "SizeBytes":59, + * "StreamViewType":"NEW_AND_OLD_IMAGES" + * }, + * "eventSourceARN":"stream-ARN" + * }, + * { + * "eventID":"3", + * "eventName":"REMOVE", + * "eventVersion":"1.0", + * "eventSource":"aws:dynamodb", + * "awsRegion":"us-east-1", + * "dynamodb":{ + * "Keys":{ + * "Id":{ + * "N":"101" + * } + * }, + * "OldImage":{ + * "Message":{ + * "S":"This item has changed" + * }, + * "Id":{ + * "N":"101" + * } + * }, + * "SequenceNumber":"333", + * "SizeBytes":38, + * "StreamViewType":"NEW_AND_OLD_IMAGES" * }, - * "shardId": "shard123456789", - * "eventSourceARN": "stream-ARN", - * "isFinalInvokeForWindow": false, - * "isWindowTerminatedEarly": false + * "eventSourceARN":"stream-ARN" + * }], + * "window": { + * "start": "2020-07-30T17:00:00Z", + * "end": "2020-07-30T17:05:00Z" + * }, + * "state": { + * "1": "state1" + * }, + * "shardId": "shard123456789", + * "eventSourceARN": "stream-ARN", + * "isFinalInvokeForWindow": false, + * "isWindowTerminatedEarly": false * } * ``` * @@ -260,8 +258,8 @@ const DynamoDBStreamSchema = z.object({ Records: z.array(DynamoDBStreamRecord).min(1), window: z .object({ - start: z.string(), - end: z.string(), + start: z.string().datetime(), + end: z.string().datetime(), }) .optional(), state: z.record(z.string(), z.string()).optional(), diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts index 2e4e7a7c20..9e0b8ba8e2 100644 --- a/packages/parser/src/schemas/kinesis.ts +++ b/packages/parser/src/schemas/kinesis.ts @@ -22,23 +22,6 @@ const KinesisDataStreamRecordPayload = z.object({ }), }); -interface KinesisStreamWindow { - start: string; - end: string; -} - -type KinesisStreamState = Record; - -interface KinesisStreamEvent { - Records: Array>; - window?: KinesisStreamWindow; - state?: KinesisStreamState; - shardId?: string; - eventSourceARN?: string; - isFinalInvokeForWindow?: boolean; - isWindowTerminatedEarly?: boolean; -} - const decompress = (data: string): string => { try { return JSON.parse(gunzipSync(fromBase64(data, 'base64')).toString('utf8')); @@ -118,6 +101,17 @@ const KinesisDynamoDBStreamSchema = z.object({ */ const KinesisDataStreamSchema = z.object({ Records: z.array(KinesisDataStreamRecord).min(1), + window: z + .object({ + start: z.string().datetime(), + end: z.string().datetime(), + }) + .optional(), + state: z.record(z.string(), z.unknown()).optional(), + shardId: z.string().optional(), + eventSourceARN: z.string().optional(), + isFinalInvokeForWindow: z.boolean().optional(), + isWindowTerminatedEarly: z.boolean().optional(), }); export { diff --git a/packages/parser/tests/events/dynamodb/tumbling-window.json b/packages/parser/tests/events/dynamodb/tumbling-window.json deleted file mode 100644 index b4d72b87b8..0000000000 --- a/packages/parser/tests/events/dynamodb/tumbling-window.json +++ /dev/null @@ -1,102 +0,0 @@ - -{ - "Records":[ - { - "eventID":"1", - "eventName":"INSERT", - "eventVersion":"1.0", - "eventSource":"aws:dynamodb", - "awsRegion":"us-east-1", - "dynamodb":{ - "Keys":{ - "Id":{ - "N":"101" - } - }, - "NewImage":{ - "Message":{ - "S":"New item!" - }, - "Id":{ - "N":"101" - } - }, - "SequenceNumber":"111", - "SizeBytes":26, - "StreamViewType":"NEW_AND_OLD_IMAGES" - }, - "eventSourceARN":"stream-ARN" - }, - { - "eventID":"2", - "eventName":"MODIFY", - "eventVersion":"1.0", - "eventSource":"aws:dynamodb", - "awsRegion":"us-east-1", - "dynamodb":{ - "Keys":{ - "Id":{ - "N":"101" - } - }, - "NewImage":{ - "Message":{ - "S":"This item has changed" - }, - "Id":{ - "N":"101" - } - }, - "OldImage":{ - "Message":{ - "S":"New item!" - }, - "Id":{ - "N":"101" - } - }, - "SequenceNumber":"222", - "SizeBytes":59, - "StreamViewType":"NEW_AND_OLD_IMAGES" - }, - "eventSourceARN":"stream-ARN" - }, - { - "eventID":"3", - "eventName":"REMOVE", - "eventVersion":"1.0", - "eventSource":"aws:dynamodb", - "awsRegion":"us-east-1", - "dynamodb":{ - "Keys":{ - "Id":{ - "N":"101" - } - }, - "OldImage":{ - "Message":{ - "S":"This item has changed" - }, - "Id":{ - "N":"101" - } - }, - "SequenceNumber":"333", - "SizeBytes":38, - "StreamViewType":"NEW_AND_OLD_IMAGES" - }, - "eventSourceARN":"stream-ARN" - } - ], - "window": { - "start": "2020-07-30T17:00:00Z", - "end": "2020-07-30T17:05:00Z" - }, - "state": { - "1": "state1" - }, - "shardId": "shard123456789", - "eventSourceARN": "stream-ARN", - "isFinalInvokeForWindow": false, - "isWindowTerminatedEarly": false -} diff --git a/packages/parser/tests/events/kinesis/stream-tumbling-window.json b/packages/parser/tests/events/kinesis/stream-tumbling-window.json deleted file mode 100644 index a1737b7a32..0000000000 --- a/packages/parser/tests/events/kinesis/stream-tumbling-window.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "Records": [ - { - "kinesis": { - "kinesisSchemaVersion": "1.0", - "partitionKey": "1", - "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", - "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", - "approximateArrivalTimestamp": 1607497475.0 - }, - "eventSource": "aws:kinesis", - "eventVersion": "1.0", - "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", - "eventName": "aws:kinesis:record", - "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", - "awsRegion": "us-east-1", - "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" - } - ], - "window": { - "start": "2020-12-09T07:04:00Z", - "end": "2020-12-09T07:06:00Z" - }, - "state": { - "1": 282, - "2": 715 - }, - "shardId": "shardId-000000000006", - "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", - "isFinalInvokeForWindow": false, - "isWindowTerminatedEarly": false -} diff --git a/packages/parser/tests/unit/schema/dynamodb.test.ts b/packages/parser/tests/unit/schema/dynamodb.test.ts index 48b596aa11..d10940b289 100644 --- a/packages/parser/tests/unit/schema/dynamodb.test.ts +++ b/packages/parser/tests/unit/schema/dynamodb.test.ts @@ -9,11 +9,6 @@ describe('Schema: DynamoDB', () => { filename: 'base', }); - const tumblingWindowEvent = getTestEvent({ - eventsPath: 'dynamodb', - filename: 'tumbling-window', - }); - it('parses a DynamoDB Stream event', () => { // Prepare const event = structuredClone(baseEvent); @@ -120,7 +115,18 @@ describe('Schema: DynamoDB', () => { it('parses a DynamoDB Stream with tumbling window event', () => { // Prepare - const event = structuredClone(tumblingWindowEvent); + const event = structuredClone(baseEvent); + event.window = { + start: '2020-07-30T17:00:00Z', + end: '2020-07-30T17:05:00Z', + }; + event.state = { + '1': 'state1', + }; + event.shardId = 'shard123456789'; + event.eventSourceARN = 'stream-ARN'; + event.isFinalInvokeForWindow = false; + event.isWindowTerminatedEarly = false; // Act const result = DynamoDBStreamSchema.parse(event); @@ -129,79 +135,60 @@ describe('Schema: DynamoDB', () => { expect(result).toStrictEqual({ Records: [ { - eventID: "1", - eventName: "INSERT", - eventVersion: "1.0", - eventSource: "aws:dynamodb", - awsRegion: "us-east-1", + eventID: '1', + eventVersion: '1.0', dynamodb: { + ApproximateCreationDateTime: 1693997155.0, Keys: { - Id: 101 + Id: 101, }, NewImage: { - Message: "New item!", + Message: 'New item!', Id: 101, }, - SequenceNumber: "111", + StreamViewType: 'NEW_IMAGE', + SequenceNumber: '111', SizeBytes: 26, - StreamViewType: "NEW_AND_OLD_IMAGES", }, - eventSourceARN: "stream-ARN", + awsRegion: 'us-west-2', + eventName: 'INSERT', + eventSourceARN: 'eventsource_arn', + eventSource: 'aws:dynamodb', }, { - eventID: "2", - eventName: "MODIFY", - eventVersion: "1.0", - eventSource: "aws:dynamodb", - awsRegion: "us-east-1", + eventID: '2', + eventVersion: '1.0', dynamodb: { - Keys: { - Id: 101, - }, - NewImage: { - Message: "This item has changed", - Id: 101, - }, OldImage: { - Message: "New item!", + Message: 'New item!', Id: 101, }, - SequenceNumber: "222", - SizeBytes: 59, - StreamViewType: "NEW_AND_OLD_IMAGES", - }, - eventSourceARN: "stream-ARN", - }, - { - eventID: "3", - eventName: "REMOVE", - eventVersion: "1.0", - eventSource: "aws:dynamodb", - awsRegion: "us-east-1", - dynamodb: { + SequenceNumber: '222', Keys: { Id: 101, }, - OldImage: { - Message: "This item has changed", - Id: 101 + SizeBytes: 59, + NewImage: { + Message: 'This item has changed', + Id: 101, }, - SequenceNumber: "333", - SizeBytes: 38, - StreamViewType: "NEW_AND_OLD_IMAGES", + StreamViewType: 'NEW_AND_OLD_IMAGES', }, - eventSourceARN: "stream-ARN", + awsRegion: 'us-west-2', + eventName: 'MODIFY', + eventSourceARN: 'source_arn', + eventSource: 'aws:dynamodb', }, ], window: { - start: "2020-07-30T17:00:00Z", - end: "2020-07-30T17:05:00Z", + start: '2020-07-30T17:00:00Z', + end: '2020-07-30T17:05:00Z', }, state: { - "1": "state1", + '1': 'state1', }, - shardId: "shard123456789", - eventSourceARN: "stream-ARN", + shardId: 'shard123456789', + eventSourceARN: 'stream-ARN', isFinalInvokeForWindow: false, isWindowTerminatedEarly: false, }); diff --git a/packages/parser/tests/unit/schema/kinesis.test.ts b/packages/parser/tests/unit/schema/kinesis.test.ts index 57711afb70..0f8817abf3 100644 --- a/packages/parser/tests/unit/schema/kinesis.test.ts +++ b/packages/parser/tests/unit/schema/kinesis.test.ts @@ -51,13 +51,6 @@ describe('Schema: Kinesis', () => { } ); - const kinesisStreamTumblingWindowEvent = getTestEvent( - { - eventsPath, - filename: 'stream-tumbling-window', - } - ); - it('parses kinesis event', () => { // Prepare const testEvent = structuredClone(kinesisStreamEvent); @@ -176,13 +169,24 @@ describe('Schema: Kinesis', () => { it('parses Kinesis event with tumbling window', () => { // Prepare - const testEvent = structuredClone(kinesisStreamTumblingWindowEvent); - + const testEvent = structuredClone(kinesisStreamEvent); + testEvent.window = { + start: '2020-07-30T17:00:00Z', + end: '2020-07-30T17:05:00Z', + }; + testEvent.state = { + '1': 'state1', + }; + testEvent.shardId = 'shard123456789'; + testEvent.eventSourceARN = 'stream-ARN'; + testEvent.isFinalInvokeForWindow = false; + testEvent.isWindowTerminatedEarly = false; // Act const parsed = KinesisDataStreamSchema.parse(testEvent); const transformedInput = { + ...testEvent, Records: testEvent.Records.map((record, index) => { return { ...record,