From 5236155c1f7bb3afd57b569b1b3249311a37507f Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 08:13:20 +0900 Subject: [PATCH 01/21] add data for tumbling window --- .../data_classes/kinesis_stream_event.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 6b189f937fd..dccb1931dce 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -100,6 +100,18 @@ def kinesis(self) -> KinesisStreamRecordPayload: return KinesisStreamRecordPayload(self["kinesis"]) +class KinesisStreamWindow(DictWrapper): + @property + def start(self) -> str: + """The time window started""" + return self["start"] + + @property + def end(self) -> str: + """The time window will end""" + return self["end"] + + class KinesisStreamEvent(DictWrapper): """Kinesis stream event @@ -113,6 +125,30 @@ def records(self) -> Iterator[KinesisStreamRecord]: for record in self["Records"]: yield KinesisStreamRecord(record) + @property + def window(self) -> KinesisStreamWindow: + return KinesisStreamWindow(self["window"]) + + @property + def state(self) -> dict: + return self["state"] + + @property + def shard_id(self) -> str: + return self["shardId"] + + @property + def event_source_arn(self) -> str: + return self["eventSourceARN"] + + @property + def is_final_invoke_for_window(self) -> bool: + return self["isFinalInvokeForWindow"] + + @property + def is_window_terminated_early(self) -> bool: + return self["isWindowTerminatedEarly"] + def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> list[CloudWatchLogsDecodedData]: return [CloudWatchLogsDecodedData(record.kinesis.data_zlib_compressed_as_json()) for record in event.records] From 8b79b7bb9150041fd0b5ec35e144b5779b5eb412 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 21:08:41 +0900 Subject: [PATCH 02/21] add unit test --- .../utilities/data_classes/kinesis_stream_event.py | 1 + tests/events/kinesisStreamEvent.json | 14 +++++++++++++- .../test_kinesis_stream_event.py | 7 +++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index dccb1931dce..d8fcf7ff649 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -118,6 +118,7 @@ class KinesisStreamEvent(DictWrapper): Documentation: -------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html + - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-windows.html """ @property diff --git a/tests/events/kinesisStreamEvent.json b/tests/events/kinesisStreamEvent.json index ef8e2096388..cf3a3415ef0 100644 --- a/tests/events/kinesisStreamEvent.json +++ b/tests/events/kinesisStreamEvent.json @@ -32,5 +32,17 @@ "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } - ] + ], + "window": { + "start": "2020-12-09T07:04:00Z", + "end": "2020-12-09T07:06:00Z" + }, + "state": { + "1": 282, + "2": 715 + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false } diff --git a/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py b/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py index 5410ed81974..7d72442b0a2 100644 --- a/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py +++ b/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py @@ -41,6 +41,13 @@ def test_kinesis_stream_event(): assert kinesis.data_as_bytes() == b"Hello, this is a test." assert kinesis.data_as_text() == "Hello, this is a test." + assert parsed_event.window.raw_event == raw_event["window"] + assert parsed_event.state == raw_event["state"] + assert parsed_event.shard_id == raw_event["shardId"] + assert parsed_event.event_source_arn == raw_event["eventSourceARN"] + assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"] + assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"] + def test_kinesis_stream_event_json_data(): json_value = {"test": "value"} From 81e8dfb10a28b903c614c0a456311bd2dc7267d5 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:01:06 +0900 Subject: [PATCH 03/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index d8fcf7ff649..d23f13714f1 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -135,7 +135,7 @@ def state(self) -> dict: return self["state"] @property - def shard_id(self) -> str: + def shard_id(self) -> str | None: return self["shardId"] @property From 39c1fda085ecc133df06dd1f8b8e1679e6d9ffd4 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:01:21 +0900 Subject: [PATCH 04/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index d23f13714f1..1b5ffafbb06 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -136,7 +136,7 @@ def state(self) -> dict: @property def shard_id(self) -> str | None: - return self["shardId"] + return self.get("shardId") @property def event_source_arn(self) -> str: From 32fc93be6a623c321396ce49a2135817ef0db711 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:01:31 +0900 Subject: [PATCH 05/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 1b5ffafbb06..9731a54ec6c 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -139,7 +139,7 @@ def shard_id(self) -> str | None: return self.get("shardId") @property - def event_source_arn(self) -> str: + def event_source_arn(self) -> str | None: return self["eventSourceARN"] @property From a7ea3e3ff39d87df53eb482dc5fa2d97ca718a39 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:01:40 +0900 Subject: [PATCH 06/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 9731a54ec6c..7effeb0b87d 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -140,7 +140,7 @@ def shard_id(self) -> str | None: @property def event_source_arn(self) -> str | None: - return self["eventSourceARN"] + return self.get("eventSourceARN") @property def is_final_invoke_for_window(self) -> bool: From bff39f0f47a616c932eedfb01f2b5eea9f3e607b Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:01:47 +0900 Subject: [PATCH 07/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 7effeb0b87d..34ec8bd28ae 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -143,7 +143,7 @@ def event_source_arn(self) -> str | None: return self.get("eventSourceARN") @property - def is_final_invoke_for_window(self) -> bool: + def is_final_invoke_for_window(self) -> bool | None: return self["isFinalInvokeForWindow"] @property From 523435cb3dfbdfc805ef6bbd694e72f11602c37f Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:02:01 +0900 Subject: [PATCH 08/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 34ec8bd28ae..69111964bca 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -131,7 +131,7 @@ def window(self) -> KinesisStreamWindow: return KinesisStreamWindow(self["window"]) @property - def state(self) -> dict: + def state(self) -> dict[str, Any]: return self["state"] @property From 95809ff2aa7c1cdd45fa4371537d17d37333226b Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:02:11 +0900 Subject: [PATCH 09/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 69111964bca..621ca412f7d 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -127,7 +127,7 @@ def records(self) -> Iterator[KinesisStreamRecord]: yield KinesisStreamRecord(record) @property - def window(self) -> KinesisStreamWindow: + def window(self) -> KinesisStreamWindow | None: return KinesisStreamWindow(self["window"]) @property From af4c5e9fc268dc1af9d72eaa0df2fd7837ad8019 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:02:32 +0900 Subject: [PATCH 10/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 621ca412f7d..a8a192f676d 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -144,7 +144,7 @@ def event_source_arn(self) -> str | None: @property def is_final_invoke_for_window(self) -> bool | None: - return self["isFinalInvokeForWindow"] + return self.get("isFinalInvokeForWindow") @property def is_window_terminated_early(self) -> bool: From 066cb2f708fcb7010358032aa84eab1484ef2e8c Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:02:44 +0900 Subject: [PATCH 11/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index a8a192f676d..210ecdcefac 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -147,7 +147,7 @@ def is_final_invoke_for_window(self) -> bool | None: return self.get("isFinalInvokeForWindow") @property - def is_window_terminated_early(self) -> bool: + def is_window_terminated_early(self) -> bool | None: return self["isWindowTerminatedEarly"] From ac03550e5e3b5b41043e6acaf93b2fbc12fc137b Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:02:57 +0900 Subject: [PATCH 12/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 210ecdcefac..030fd30caf0 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -148,7 +148,7 @@ def is_final_invoke_for_window(self) -> bool | None: @property def is_window_terminated_early(self) -> bool | None: - return self["isWindowTerminatedEarly"] + return self.get("isWindowTerminatedEarly") def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> list[CloudWatchLogsDecodedData]: From 44589bbb9b02940392101b090fe1349fada9b731 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:03:13 +0900 Subject: [PATCH 13/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 030fd30caf0..36ee007d700 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -132,7 +132,7 @@ def window(self) -> KinesisStreamWindow | None: @property def state(self) -> dict[str, Any]: - return self["state"] + return self.get("state") or {} @property def shard_id(self) -> str | None: From eb4bef7a313da9bd97d4d156ae820ebf1f46d490 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:03:26 +0900 Subject: [PATCH 14/21] Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/kinesis_stream_event.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index 36ee007d700..ed3952a4398 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -128,7 +128,11 @@ def records(self) -> Iterator[KinesisStreamRecord]: @property def window(self) -> KinesisStreamWindow | None: - return KinesisStreamWindow(self["window"]) + window = self.get("window") + if window: + return KinesisStreamWindow(window) + + return window @property def state(self) -> dict[str, Any]: From 3f07f432302f78f583875472a68693f4509d5002 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:17:44 +0900 Subject: [PATCH 15/21] fix window function --- .../utilities/data_classes/kinesis_stream_event.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py index ed3952a4398..64fd26f3a30 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py @@ -3,7 +3,7 @@ import base64 import json import zlib -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import ( CloudWatchLogsDecodedData, @@ -128,10 +128,9 @@ def records(self) -> Iterator[KinesisStreamRecord]: @property def window(self) -> KinesisStreamWindow | None: - window = self.get("window") - if window: - return KinesisStreamWindow(window) - + window = self.get("window") + if window: + return KinesisStreamWindow(window) return window @property From a0adaeed470a9552959b56ad4629d555d8cef382 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:18:07 +0900 Subject: [PATCH 16/21] add test test_kinesis_stream_with_tumbling_window_event --- .../kinesisStreamTumblingWindowEvent.json | 33 ++++++++++++++++ .../test_kinesis_stream_event.py | 38 +++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 tests/events/kinesisStreamTumblingWindowEvent.json diff --git a/tests/events/kinesisStreamTumblingWindowEvent.json b/tests/events/kinesisStreamTumblingWindowEvent.json new file mode 100644 index 00000000000..0209525835c --- /dev/null +++ b/tests/events/kinesisStreamTumblingWindowEvent.json @@ -0,0 +1,33 @@ + +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", + "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "approximateArrivalTimestamp": 1607497475.000 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", + "awsRegion": "us-east-1", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" + } + ], + "window": { + "start": "2020-12-09T07:04:00Z", + "end": "2020-12-09T07:06:00Z" + }, + "state": { + "1": 282, + "2": 715 + }, + "shardId": "shardId-000000000006", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} diff --git a/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py b/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py index 7d72442b0a2..e72766747f9 100644 --- a/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py +++ b/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py @@ -63,3 +63,41 @@ def test_kinesis_stream_event_cloudwatch_logs_data_extraction(): individual_logs = [extract_cloudwatch_logs_from_record(record) for record in event.records] assert len(extracted_logs) == len(individual_logs) + + +def test_kinesis_stream_with_tumbling_window_event(): + raw_event = load_event("kinesisStreamTumblingWindowEvent.json") + parsed_event = KinesisStreamEvent(raw_event) + + records = list(parsed_event.records) + assert len(records) == 1 + record = records[0] + + record_raw = raw_event["Records"][0] + + assert record.aws_region == record_raw["awsRegion"] + assert record.event_id == record_raw["eventID"] + assert record.event_name == record_raw["eventName"] + assert record.event_source == record_raw["eventSource"] + assert record.event_source_arn == record_raw["eventSourceARN"] + assert record.event_version == record_raw["eventVersion"] + assert record.invoke_identity_arn == record_raw["invokeIdentityArn"] + + kinesis = record.kinesis + kinesis_raw = raw_event["Records"][0]["kinesis"] + + assert kinesis.approximate_arrival_timestamp == kinesis_raw["approximateArrivalTimestamp"] + assert kinesis.data == kinesis_raw["data"] + assert kinesis.kinesis_schema_version == kinesis_raw["kinesisSchemaVersion"] + assert kinesis.partition_key == kinesis_raw["partitionKey"] + assert kinesis.sequence_number == kinesis_raw["sequenceNumber"] + + assert kinesis.data_as_bytes() == b"Hello, this is a test." + assert kinesis.data_as_text() == "Hello, this is a test." + + assert parsed_event.window.raw_event == raw_event["window"] + assert parsed_event.state == raw_event["state"] + assert parsed_event.shard_id == raw_event["shardId"] + assert parsed_event.event_source_arn == raw_event["eventSourceARN"] + assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"] + assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"] From b018614ae2b88e558c01dba795d6e36eba39591d Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 22:41:51 +0900 Subject: [PATCH 17/21] add properties for tumbling window to dynamodb stream --- .../data_classes/dynamo_db_stream_event.py | 40 +++++++ .../dynamoStreamTumblingWindowEvent.json | 101 ++++++++++++++++++ .../test_dynamo_db_stream_event.py | 34 ++++++ 3 files changed, 175 insertions(+) create mode 100644 tests/events/dynamoStreamTumblingWindowEvent.json diff --git a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py index 8da2c983f88..33a45a72dee 100644 --- a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py @@ -140,12 +140,25 @@ def user_identity(self) -> dict: return self.get("userIdentity") or {} +class KinesisStreamWindow(DictWrapper): + @property + def start(self) -> str: + """The time window started""" + return self["start"] + + @property + def end(self) -> str: + """The time window will end""" + return self["end"] + + class DynamoDBStreamEvent(DictWrapper): """Dynamo DB Stream Event Documentation: ------------- - https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html + - https://docs.aws.amazon.com/lambda/latest/dg/services-ddb-windows.html Example ------- @@ -167,3 +180,30 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext): def records(self) -> Iterator[DynamoDBRecord]: for record in self["Records"]: yield DynamoDBRecord(record) + + @property + def window(self) -> KinesisStreamWindow | None: + window = self.get("window") + if window: + return KinesisStreamWindow(window) + return window + + @property + def state(self) -> dict[str, Any]: + return self.get("state") or {} + + @property + def shard_id(self) -> str | None: + return self.get("shardId") + + @property + def event_source_arn(self) -> str | None: + return self.get("eventSourceARN") + + @property + def is_final_invoke_for_window(self) -> bool | None: + return self.get("isFinalInvokeForWindow") + + @property + def is_window_terminated_early(self) -> bool | None: + return self.get("isWindowTerminatedEarly") diff --git a/tests/events/dynamoStreamTumblingWindowEvent.json b/tests/events/dynamoStreamTumblingWindowEvent.json new file mode 100644 index 00000000000..035d08978e9 --- /dev/null +++ b/tests/events/dynamoStreamTumblingWindowEvent.json @@ -0,0 +1,101 @@ +{ + "Records": [ + { + "eventID": "1", + "eventName": "INSERT", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "111", + "SizeBytes": 26, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "2", + "eventName": "MODIFY", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "222", + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + }, + { + "eventID": "3", + "eventName": "REMOVE", + "eventVersion": "1.0", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "OldImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "333", + "SizeBytes": 38, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "eventSourceARN": "stream-ARN" + } + ], + "window": { + "start": "2020-07-30T17:00:00Z", + "end": "2020-07-30T17:05:00Z" + }, + "state": { + "1": "state1" + }, + "shardId": "shard123456789", + "eventSourceARN": "stream-ARN", + "isFinalInvokeForWindow": false, + "isWindowTerminatedEarly": false +} diff --git a/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py b/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py index 8c6b62867ae..050560d4b93 100644 --- a/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py +++ b/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py @@ -46,6 +46,40 @@ def test_dynamodb_stream_trigger_event(): assert dynamodb.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES +def test_dynamodb_stream_trigger_with_tumbling_window_event(): + raw_event = load_event("dynamoStreamTumblingWindowEvent.json") + parsed_event = DynamoDBStreamEvent(raw_event) + + records = list(parsed_event.records) + + record = records[0] + record_raw = raw_event["Records"][0] + assert record.aws_region == record_raw["awsRegion"] + assert record.event_id == record_raw["eventID"] + assert record.event_name is DynamoDBRecordEventName.INSERT + assert record.event_source == record_raw["eventSource"] + assert record.event_source_arn == record_raw["eventSourceARN"] + assert record.event_version == record_raw["eventVersion"] + assert record.user_identity == {} + dynamodb = record.dynamodb + assert dynamodb is not None + keys = dynamodb.keys + assert keys is not None + assert keys["Id"] == DECIMAL_CONTEXT.create_decimal(101) + assert dynamodb.new_image.get("Message") == record_raw["dynamodb"]["NewImage"]["Message"]["S"] + assert dynamodb.old_image == {} + assert dynamodb.sequence_number == record_raw["dynamodb"]["SequenceNumber"] + assert dynamodb.size_bytes == record_raw["dynamodb"]["SizeBytes"] + assert dynamodb.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES + + assert parsed_event.window.raw_event == raw_event["window"] + assert parsed_event.state == raw_event["state"] + assert parsed_event.shard_id == raw_event["shardId"] + assert parsed_event.event_source_arn == raw_event["eventSourceARN"] + assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"] + assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"] + + def test_dynamodb_stream_record_deserialization_large_int(): data = { "Keys": {"key1": {"attr1": "value1"}}, From 41adc0bf63312ffaee426cf0ebbd1ea4ecc32fa7 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 23:03:10 +0900 Subject: [PATCH 18/21] Update aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/dynamo_db_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py index 33a45a72dee..0ada725a083 100644 --- a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py @@ -140,7 +140,7 @@ def user_identity(self) -> dict: return self.get("userIdentity") or {} -class KinesisStreamWindow(DictWrapper): +class DynamoDBStreamWindow(DictWrapper): @property def start(self) -> str: """The time window started""" From 58ab8f10fb6f58136996b83dc463ab4659ba41a0 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 23:03:19 +0900 Subject: [PATCH 19/21] Update aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/dynamo_db_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py index 0ada725a083..f509cc3ac9d 100644 --- a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py @@ -182,7 +182,7 @@ def records(self) -> Iterator[DynamoDBRecord]: yield DynamoDBRecord(record) @property - def window(self) -> KinesisStreamWindow | None: + def window(self) -> DynamoDBStreamWindow | None: window = self.get("window") if window: return KinesisStreamWindow(window) From d092a9ed3f4acdb3f7365a37529789245618c994 Mon Sep 17 00:00:00 2001 From: kiitosu Date: Tue, 13 May 2025 23:03:30 +0900 Subject: [PATCH 20/21] Update aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py Co-authored-by: Leandro Damascena Signed-off-by: kiitosu --- .../utilities/data_classes/dynamo_db_stream_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py index f509cc3ac9d..d8efbccbb61 100644 --- a/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py +++ b/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py @@ -185,7 +185,7 @@ def records(self) -> Iterator[DynamoDBRecord]: def window(self) -> DynamoDBStreamWindow | None: window = self.get("window") if window: - return KinesisStreamWindow(window) + return DynamoDBStreamWindow(window) return window @property From fb77d5c272789e6cf200206cd8adaee6521dd394 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Wed, 14 May 2025 14:28:43 +0100 Subject: [PATCH 21/21] Converage --- .../required_dependencies/test_dynamo_db_stream_event.py | 2 ++ .../required_dependencies/test_kinesis_stream_event.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py b/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py index 050560d4b93..02fdab9582e 100644 --- a/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py +++ b/tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py @@ -73,6 +73,8 @@ def test_dynamodb_stream_trigger_with_tumbling_window_event(): assert dynamodb.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES assert parsed_event.window.raw_event == raw_event["window"] + assert parsed_event.window.start == raw_event["window"]["start"] + assert parsed_event.window.end == raw_event["window"]["end"] assert parsed_event.state == raw_event["state"] assert parsed_event.shard_id == raw_event["shardId"] assert parsed_event.event_source_arn == raw_event["eventSourceARN"] diff --git a/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py b/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py index e72766747f9..2eab5fe90fe 100644 --- a/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py +++ b/tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py @@ -96,6 +96,8 @@ def test_kinesis_stream_with_tumbling_window_event(): assert kinesis.data_as_text() == "Hello, this is a test." assert parsed_event.window.raw_event == raw_event["window"] + assert parsed_event.window.start == raw_event["window"]["start"] + assert parsed_event.window.end == raw_event["window"]["end"] assert parsed_event.state == raw_event["state"] assert parsed_event.shard_id == raw_event["shardId"] assert parsed_event.event_source_arn == raw_event["eventSourceARN"]