diff --git a/aws_lambda_powertools/utilities/data_masking/base.py b/aws_lambda_powertools/utilities/data_masking/base.py index 9b80e50bd58..00650789696 100644 --- a/aws_lambda_powertools/utilities/data_masking/base.py +++ b/aws_lambda_powertools/utilities/data_masking/base.py @@ -3,6 +3,7 @@ import functools import logging import warnings +from copy import deepcopy from typing import TYPE_CHECKING, Any, Callable, Mapping, Sequence, overload from jsonpath_ng.ext import parse @@ -94,8 +95,42 @@ def erase(self, data: tuple, fields: list[str]) -> tuple[str]: ... @overload def erase(self, data: dict, fields: list[str]) -> dict: ... - def erase(self, data: Sequence | Mapping, fields: list[str] | None = None) -> str | list[str] | tuple[str] | dict: - return self._apply_action(data=data, fields=fields, action=self.provider.erase) + @overload + def erase(self, data: dict[Any, Any], *, masking_rules: dict[str, object]) -> dict[Any, Any]: ... + + @overload + def erase( + self, + data: dict, + fields: list[str], + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + mask_format: str | None = None, + ) -> dict: ... + + def erase( + self, + data: Sequence | Mapping, + fields: list[str] | None = None, + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + mask_format: str | None = None, + masking_rules: dict | None = None, + ) -> str | list[str] | tuple[str] | dict: + if masking_rules: + return self._apply_masking_rules(data, masking_rules) + else: + return self._apply_action( + data=data, + fields=fields, + action=self.provider.erase, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + ) def _apply_action( self, @@ -103,6 +138,10 @@ def _apply_action( fields: list[str] | None, action: Callable, provider_options: dict | None = None, + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + mask_format: str | None = None, **encryption_context: str, ): """ @@ -136,11 +175,23 @@ def _apply_action( fields=fields, action=action, provider_options=provider_options, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, **encryption_context, ) else: logger.debug(f"Running action {action.__name__} with the entire data") - return action(data=data, provider_options=provider_options, **encryption_context) + return action( + data=data, + provider_options=provider_options, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + **encryption_context, + ) def _apply_action_to_fields( self, @@ -148,6 +199,10 @@ def _apply_action_to_fields( fields: list, action: Callable, provider_options: dict | None = None, + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + mask_format: str | None = None, **encryption_context: str, ) -> dict | str: """ @@ -194,6 +249,8 @@ def _apply_action_to_fields( new_dict = {'a': {'b': {'c': '*****'}}, 'x': {'y': '*****'}} ``` """ + if not fields: + raise ValueError("Fields parameter cannot be empty") data_parsed: dict = self._normalize_data_to_parse(fields, data) @@ -204,6 +261,10 @@ def _apply_action_to_fields( self._call_action, action=action, provider_options=provider_options, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, **encryption_context, # type: ignore[arg-type] ) @@ -225,12 +286,6 @@ def _apply_action_to_fields( # For in-place updates, json_parse accepts a callback function # that receives 3 args: field_value, fields, field_name # We create a partial callback to pre-populate known provider options (action, provider opts, enc ctx) - update_callback = functools.partial( - self._call_action, - action=action, - provider_options=provider_options, - **encryption_context, # type: ignore[arg-type] - ) json_parse.update( data_parsed, @@ -239,6 +294,55 @@ def _apply_action_to_fields( return data_parsed + def _apply_masking_rules(self, data: dict, masking_rules: dict) -> dict: + """ + Apply masking rules to data, supporting both simple field names and complex path expressions. + + Args: + data: The dictionary containing data to mask + masking_rules: Dictionary mapping field names or path expressions to masking rules + + Returns: + dict: The masked data dictionary + """ + result = deepcopy(data) + + for path, rule in masking_rules.items(): + try: + jsonpath_expr = parse(f"$.{path}") + matches = jsonpath_expr.find(result) + + if not matches: + warnings.warn(f"No matches found for path: {path}", stacklevel=2) + continue + + for match in matches: + try: + value = match.value + if value is not None: + masked_value = self.provider.erase(str(value), **rule) + match.full_path.update(result, masked_value) + + except Exception as e: + warnings.warn(f"Error masking value for path {path}: {str(e)}", stacklevel=2) + continue + + except Exception as e: + warnings.warn(f"Error processing path {path}: {str(e)}", stacklevel=2) + continue + + return result + + def _mask_nested_field(self, data: dict, field_path: str, mask_function): + keys = field_path.split(".") + current = data + for key in keys[:-1]: + current = current.get(key, {}) + if not isinstance(current, dict): + return + if keys[-1] in current: + current[keys[-1]] = mask_function(current[keys[-1]]) + @staticmethod def _call_action( field_value: Any, @@ -246,6 +350,10 @@ def _call_action( field_name: str, action: Callable, provider_options: dict[str, Any] | None = None, + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + mask_format: str | None = None, **encryption_context, ) -> None: """ @@ -263,7 +371,15 @@ def _call_action( Returns: - fields[field_name]: Returns the processed field value """ - fields[field_name] = action(field_value, provider_options=provider_options, **encryption_context) + fields[field_name] = action( + field_value, + provider_options=provider_options, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + **encryption_context, + ) return fields[field_name] def _normalize_data_to_parse(self, fields: list, data: str | dict) -> dict: diff --git a/aws_lambda_powertools/utilities/data_masking/provider/base.py b/aws_lambda_powertools/utilities/data_masking/provider/base.py index 28bc8384f8d..02e6406b862 100644 --- a/aws_lambda_powertools/utilities/data_masking/provider/base.py +++ b/aws_lambda_powertools/utilities/data_masking/provider/base.py @@ -2,10 +2,14 @@ import functools import json -from typing import Any, Callable, Iterable +import re +from typing import Any, Callable from aws_lambda_powertools.utilities.data_masking.constants import DATA_MASKING_STRING +PRESERVE_CHARS = set("-_. ") +_regex_cache = {} + class BaseProvider: """ @@ -63,19 +67,142 @@ def decrypt(self, data, provider_options: dict | None = None, **encryption_conte """ raise NotImplementedError("Subclasses must implement decrypt()") - def erase(self, data, **kwargs) -> Iterable[str]: - """ - This method irreversibly erases data. + def erase( + self, + data: Any, + dynamic_mask: bool | None = None, + custom_mask: str | None = None, + regex_pattern: str | None = None, + mask_format: str | None = None, + masking_rules: dict | None = None, + **kwargs, + ) -> Any: + + result = DATA_MASKING_STRING + + if not any([dynamic_mask, custom_mask, regex_pattern, mask_format, masking_rules]): + if isinstance(data, (str, int, float, dict, bytes)): + return DATA_MASKING_STRING + elif isinstance(data, (list, tuple, set)): + return type(data)([DATA_MASKING_STRING] * len(data)) + else: + return DATA_MASKING_STRING + + if isinstance(data, (str, int, float)): + result = self._mask_primitive(str(data), dynamic_mask, custom_mask, regex_pattern, mask_format, **kwargs) + elif isinstance(data, dict): + result = self._mask_dict( + data, + dynamic_mask, + custom_mask, + regex_pattern, + mask_format, + masking_rules, + **kwargs, + ) + elif isinstance(data, (list, tuple, set)): + result = self._mask_iterable( + data, + dynamic_mask, + custom_mask, + regex_pattern, + mask_format, + masking_rules, + **kwargs, + ) + + return result + + def _mask_primitive( + self, + data: str, + dynamic_mask: bool | None, + custom_mask: str | None, + regex_pattern: str | None, + mask_format: str | None, + **kwargs, + ) -> str: + if regex_pattern and mask_format: + return self._regex_mask(data, regex_pattern, mask_format) + elif custom_mask: + return self._pattern_mask(data, custom_mask) + elif dynamic_mask: + return self._custom_erase(data, **kwargs) + else: + return DATA_MASKING_STRING - If the data to be erased is of type `str`, `dict`, or `bytes`, - this method will return an erased string, i.e. "*****". + def _mask_dict( + self, + data: dict, + dynamic_mask: bool | None, + custom_mask: str | None, + regex_pattern: str | None, + mask_format: str | None, + masking_rules: dict | None, + **kwargs, + ) -> dict: + if masking_rules: + return self._apply_masking_rules(data, masking_rules) + else: + return { + k: self.erase( + v, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + masking_rules=masking_rules, + **kwargs, + ) + for k, v in data.items() + } + + def _mask_iterable( + self, + data: list | tuple | set, + dynamic_mask: bool | None, + custom_mask: str | None, + regex_pattern: str | None, + mask_format: str | None, + masking_rules: dict | None, + **kwargs, + ) -> list | tuple | set: + masked_data = [ + self.erase( + item, + dynamic_mask=dynamic_mask, + custom_mask=custom_mask, + regex_pattern=regex_pattern, + mask_format=mask_format, + masking_rules=masking_rules, + **kwargs, + ) + for item in data + ] + return type(data)(masked_data) + + def _apply_masking_rules(self, data: dict, masking_rules: dict) -> Any: + """Apply masking rules to dictionary data.""" + return { + key: self.erase(str(value), **masking_rules[key]) if key in masking_rules else str(value) + for key, value in data.items() + } - If the data to be erased is of an iterable type like `list`, `tuple`, - or `set`, this method will return a new object of the same type as the - input data but with each element replaced by the string "*****". - """ - if isinstance(data, (str, dict, bytes)): - return DATA_MASKING_STRING - elif isinstance(data, (list, tuple, set)): - return type(data)([DATA_MASKING_STRING] * len(data)) - return DATA_MASKING_STRING + def _pattern_mask(self, data: str, pattern: str) -> str: + """Apply pattern masking to string data.""" + return pattern[: len(data)] if len(pattern) >= len(data) else pattern + + def _regex_mask(self, data: str, regex_pattern: str, mask_format: str) -> str: + """Apply regex masking to string data.""" + try: + if regex_pattern not in _regex_cache: + _regex_cache[regex_pattern] = re.compile(regex_pattern) + return _regex_cache[regex_pattern].sub(mask_format, data) + except re.error: + return data + + def _custom_erase(self, data: str, **kwargs) -> str: + if not data: + return "" + + return "".join("*" if char not in PRESERVE_CHARS else char for char in data) diff --git a/docs/utilities/data_masking.md b/docs/utilities/data_masking.md index 162292e79a0..94e470aa965 100644 --- a/docs/utilities/data_masking.md +++ b/docs/utilities/data_masking.md @@ -43,7 +43,7 @@ stateDiagram-v2 ## Terminology -**Erasing** replaces sensitive information **irreversibly** with a non-sensitive placeholder _(`*****`)_. This operation replaces data in-memory, making it a one-way action. +**Erasing** replaces sensitive information **irreversibly** with a non-sensitive placeholder _(`*****`)_, or with a customized mask. This operation replaces data in-memory, making it a one-way action. **Encrypting** transforms plaintext into ciphertext using an encryption algorithm and a cryptographic key. It allows you to encrypt any sensitive data, so only allowed personnel to decrypt it. Learn more about encryption [here](https://aws.amazon.com/blogs/security/importance-of-encryption-and-how-aws-can-help/){target="_blank"}. @@ -117,6 +117,52 @@ Erasing will remove the original data and replace it with a `*****`. This means --8<-- "examples/data_masking/src/getting_started_erase_data_output.json" ``` +#### Custom masking + +The `erase` method also supports additional flags for more advanced and flexible masking: + +=== "dynamic_mask" + + (bool) Enables dynamic masking behavior when set to `True`, by maintaining the original length and structure of the text replacing with *. + + > Expression: `data_masker.erase(data, fields=["address.zip"], dynamic_mask=True)` + + > Field result: `'street': '*** **** **'` + +=== "custom_mask" + + (str) Specifies a simple pattern for masking data. This pattern is applied directly to the input string, replacing all the original characters. For example, with a `custom_mask` of "XX-XX" applied to "12345", the result would be "XX-XX". + + > Expression: `data_masker.erase(data, fields=["address.zip"], custom_mask="XX")` + + > Field result: `'zip': 'XX'` + +=== "regex_pattern & mask_format" + + (str) `regex_pattern` defines a regular expression pattern used to identify parts of the input string that should be masked. This allows for more complex and flexible masking rules. It's used in conjunction with `mask_format`. + `mask_format` specifies the format to use when replacing parts of the string matched by `regex_pattern`. It can include placeholders (like \1, \2) to refer to captured groups in the regex pattern, allowing some parts of the original string to be preserved. + + > Expression: `data_masker.erase(data, fields=["email"], regex_pattern=r"(.)(.*)(@.*)", mask_format=r"\1****\3")` + + > Field result: `'email': 'j****@example.com'` + +=== "masking_rules" + + (dict) Allows you to apply different masking rules (flags) for each data field. + ```python hl_lines="20" + --8<-- "examples/data_masking/src/custom_data_masking.py" + ``` +=== "Input example" + + ```json + --8<-- "examples/data_masking/src/payload_custom_masking.json" + ``` +=== "Masking rules output example" + + ```json hl_lines="4 5 10 21" + --8<-- "examples/data_masking/src/output_custom_masking.json" + ``` + ### Encrypting data ???+ note "About static typing and encryption" diff --git a/examples/data_masking/src/custom_data_masking.py b/examples/data_masking/src/custom_data_masking.py new file mode 100644 index 00000000000..7b96f6f379f --- /dev/null +++ b/examples/data_masking/src/custom_data_masking.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from aws_lambda_powertools.utilities.data_masking import DataMasking +from aws_lambda_powertools.utilities.typing import LambdaContext + +data_masker = DataMasking() + + +def lambda_handler(event: dict, context: LambdaContext) -> dict: + data: dict = event.get("body", {}) + + # Masking rules for each field + masking_rules = { + "email": {"regex_pattern": "(.)(.*)(@.*)", "mask_format": r"\1****\3"}, + "age": {"dynamic_mask": True}, + "address.zip": {"custom_mask": "xxx"}, + "$.other_address[?(@.postcode > 12000)]": {"custom_mask": "Masked"}, + } + + result = data_masker.erase(data, masking_rules=masking_rules) + + return result diff --git a/examples/data_masking/src/output_custom_masking.json b/examples/data_masking/src/output_custom_masking.json new file mode 100644 index 00000000000..0571da99808 --- /dev/null +++ b/examples/data_masking/src/output_custom_masking.json @@ -0,0 +1,29 @@ +{ + "id": 1, + "name": "John Doe", + "age": "**", + "email": "j****@example.com", + "address": { + "street": "123 Main St", + "city": "Anytown", + "state": "CA", + "zip": "xxx", + "postcode": 12345, + "product": { + "name": "Car" + } + }, + "other_address": [ + { + "postcode": 11345, + "street": "123 Any Drive" + }, + "Masked" + ], + "company_address": { + "street": "456 ACME Ave", + "city": "Anytown", + "state": "CA", + "zip": "12345" + } +} \ No newline at end of file diff --git a/examples/data_masking/src/payload_custom_masking.json b/examples/data_masking/src/payload_custom_masking.json new file mode 100644 index 00000000000..d50b715ffa4 --- /dev/null +++ b/examples/data_masking/src/payload_custom_masking.json @@ -0,0 +1,34 @@ +{ + "body": { + "id": 1, + "name": "Jane Doe", + "age": 30, + "email": "janedoe@example.com", + "address": { + "street": "123 Main St", + "city": "Anytown", + "state": "CA", + "zip": "12345", + "postcode": 12345, + "product": { + "name": "Car" + } + }, + "other_address": [ + { + "postcode": 11345, + "street": "123 Any Drive" + }, + { + "postcode": 67890, + "street": "100 Main Street," + } + ], + "company_address": { + "street": "456 ACME Ave", + "city": "Anytown", + "state": "CA", + "zip": "12345" + } + } +} \ No newline at end of file diff --git a/tests/unit/data_masking/_aws_encryption_sdk/test_unit_data_masking.py b/tests/unit/data_masking/_aws_encryption_sdk/test_unit_data_masking.py index 4fbbc188ceb..93588445034 100644 --- a/tests/unit/data_masking/_aws_encryption_sdk/test_unit_data_masking.py +++ b/tests/unit/data_masking/_aws_encryption_sdk/test_unit_data_masking.py @@ -25,6 +25,16 @@ def test_erase_int(data_masker): assert erased_string == DATA_MASKING_STRING +def test_erase_int_custom_mask(data_masker): + # GIVEN an int data type + + # WHEN erase is called with no fields argument + erased_string = data_masker.erase(42, custom_mask="XX") + + # THEN the result is the data masked + assert erased_string == "XX" + + def test_erase_float(data_masker): # GIVEN a float data type @@ -205,3 +215,74 @@ def test_parsing_nonexistent_fields_warning_on_missing_field(): # THEN the "erased" payload is the same of the original assert masked_json_string == data + + +def test_regex_mask(data_masker): + # GIVEN a str data type + data = "Hello! My name is John Doe" + + # WHEN erase is called with regex pattern and mask format + regex_pattern = r"\b[A-Z][a-z]+ [A-Z][a-z]+\b" + mask_format = "XXXX XXXX" + + result = data_masker.erase(data, regex_pattern=regex_pattern, mask_format=mask_format) + + # THEN the result is the regex part masked by the masked format + assert result == "Hello! My name is XXXX XXXX" + + +def test_erase_json_dict_with_fields_and_masks(data_masker): + # GIVEN the data type is a json representation of a dictionary + data = json.dumps( + { + "a": { + "1": {"None": "hello", "four": "world"}, + "b": {"3": {"4": "goodbye", "e": "world"}}, + }, + }, + ) + + # WHEN erase is called with a list of fields specified + masked_json_string = data_masker.erase(data, fields=["a.'1'.None", "a..'4'"], dynamic_mask=True) + + # THEN the result is only the specified fields are erased + assert masked_json_string == { + "a": { + "1": {"None": "*****", "four": "world"}, + "b": {"3": {"4": "*******", "e": "world"}}, + }, + } + + +def test_erase_json_dict_with_complex_masking_rules(data_masker): + # GIVEN the data type is a json representation of a dictionary with nested and filtered paths + data = json.dumps( + { + "email": "john.doe@example.com", + "age": 30, + "addres": [ + {"postcode": 13000, "street": "123 Main St", "details": {"name": "Home", "type": "Primary"}}, + {"postcode": 14000, "street": "456 Other Street", "details": {"name": "Office", "type": "Secondary"}}, + ], + }, + ) + + # WHEN erase is called with complex masking rules + masking_rules = { + "email": {"regex_pattern": "(.)(.*)(@.*)", "mask_format": r"\1****\3"}, + "age": {"dynamic_mask": True}, + "addres..name": {"custom_mask": "xxx"}, + "addres[?(@.postcode > 12000)]": {"dynamic_mask": True}, + } + + masked_json_string = data_masker.erase(data, masking_rules=masking_rules) + + # THEN the result should have all specified fields masked according to their rules + assert masked_json_string == { + "email": "j****@example.com", + "age": "*****", + "addres": [ + {"postcode": "*****", "street": "*** *** **", "details": {"name": "xxx", "type": "*******"}}, + {"postcode": "*****", "street": "*** ***** ******", "details": {"name": "xxx", "type": "********"}}, + ], + }