From a305f866a02cf535e7f451775f0ebc222ac8244b Mon Sep 17 00:00:00 2001 From: Kevin Yan <43934572+yankevn@users.noreply.github.com> Date: Tue, 2 Apr 2024 12:03:40 -0700 Subject: [PATCH] Add memory_logs_enabled param (#284) * Add memory_logs_enabled param * Add higher-order conditional logging function * Create new DeltaCATLoggerAdapter class --------- Co-authored-by: Kevin Yan --- .../model/compact_partition_params.py | 10 ++++++ .../compactor_v2/compaction_session.py | 4 +++ .../compactor_v2/model/hash_bucket_input.py | 6 ++++ .../compute/compactor_v2/model/merge_input.py | 6 ++++ .../compute/compactor_v2/steps/hash_bucket.py | 3 +- deltacat/compute/compactor_v2/steps/merge.py | 3 +- .../compactor_v2/utils/task_options.py | 12 ++++--- deltacat/logs.py | 31 +++++++++++++++++-- .../compute/test_compact_partition_params.py | 5 +++ 9 files changed, 72 insertions(+), 8 deletions(-) diff --git a/deltacat/compute/compactor/model/compact_partition_params.py b/deltacat/compute/compactor/model/compact_partition_params.py index 50799e7e..9fd62a3f 100644 --- a/deltacat/compute/compactor/model/compact_partition_params.py +++ b/deltacat/compute/compactor/model/compact_partition_params.py @@ -91,6 +91,8 @@ def of(params: Optional[Dict]) -> CompactPartitionParams: result.drop_duplicates = params.get("drop_duplicates", DROP_DUPLICATES) result.ray_custom_resources = params.get("ray_custom_resources") + result.memory_logs_enabled = params.get("memory_logs_enabled", False) + result.metrics_config = params.get("metrics_config") if not importlib.util.find_spec("memray"): @@ -355,6 +357,14 @@ def sort_keys(self) -> Optional[List[SortKey]]: def sort_keys(self, keys: List[SortKey]) -> None: self["sort_keys"] = keys + @property + def memory_logs_enabled(self) -> bool: + return self.get("memory_logs_enabled") + + @memory_logs_enabled.setter + def memory_logs_enabled(self, value: bool) -> None: + self["memory_logs_enabled"] = value + @property def metrics_config(self) -> Optional[MetricsConfig]: return self.get("metrics_config") diff --git a/deltacat/compute/compactor_v2/compaction_session.py b/deltacat/compute/compactor_v2/compaction_session.py index 930476cd..e7a36fcf 100644 --- a/deltacat/compute/compactor_v2/compaction_session.py +++ b/deltacat/compute/compactor_v2/compaction_session.py @@ -260,6 +260,7 @@ def _execute_compaction( average_record_size_bytes=params.average_record_size_bytes, primary_keys=params.primary_keys, ray_custom_resources=params.ray_custom_resources, + memory_logs_enabled=params.memory_logs_enabled, ) total_input_records_count = np.int64(0) @@ -296,6 +297,7 @@ def hash_bucket_input_provider(index, item): object_store=params.object_store, deltacat_storage=params.deltacat_storage, deltacat_storage_kwargs=params.deltacat_storage_kwargs, + memory_logs_enabled=params.memory_logs_enabled, ) } @@ -388,6 +390,7 @@ def hash_bucket_input_provider(index, item): deltacat_storage=params.deltacat_storage, deltacat_storage_kwargs=params.deltacat_storage_kwargs, ray_custom_resources=params.ray_custom_resources, + memory_logs_enabled=params.memory_logs_enabled, ) def merge_input_provider(index, item): @@ -417,6 +420,7 @@ def merge_input_provider(index, item): deltacat_storage_kwargs=params.deltacat_storage_kwargs, delete_strategy=delete_strategy, delete_file_envelopes=delete_file_envelopes, + memory_logs_enabled=params.memory_logs_enabled, ) } diff --git a/deltacat/compute/compactor_v2/model/hash_bucket_input.py b/deltacat/compute/compactor_v2/model/hash_bucket_input.py index ff2e6f5b..c25b065d 100644 --- a/deltacat/compute/compactor_v2/model/hash_bucket_input.py +++ b/deltacat/compute/compactor_v2/model/hash_bucket_input.py @@ -22,6 +22,7 @@ def of( object_store: Optional[IObjectStore] = None, deltacat_storage=unimplemented_deltacat_storage, deltacat_storage_kwargs: Optional[Dict[str, Any]] = None, + memory_logs_enabled: Optional[bool] = None, ) -> HashBucketInput: result = HashBucketInput() @@ -36,6 +37,7 @@ def of( result["object_store"] = object_store result["deltacat_storage"] = deltacat_storage result["deltacat_storage_kwargs"] = deltacat_storage_kwargs or {} + result["memory_logs_enabled"] = memory_logs_enabled return result @@ -82,3 +84,7 @@ def deltacat_storage(self) -> unimplemented_deltacat_storage: @property def deltacat_storage_kwargs(self) -> Optional[Dict[str, Any]]: return self.get("deltacat_storage_kwargs") + + @property + def memory_logs_enabled(self) -> Optional[bool]: + return self.get("memory_logs_enabled") diff --git a/deltacat/compute/compactor_v2/model/merge_input.py b/deltacat/compute/compactor_v2/model/merge_input.py index 3c896df1..27d9d389 100644 --- a/deltacat/compute/compactor_v2/model/merge_input.py +++ b/deltacat/compute/compactor_v2/model/merge_input.py @@ -46,6 +46,7 @@ def of( delete_file_envelopes: Optional[List] = None, deltacat_storage=unimplemented_deltacat_storage, deltacat_storage_kwargs: Optional[Dict[str, Any]] = None, + memory_logs_enabled: Optional[bool] = None, ) -> MergeInput: result = MergeInput() @@ -67,6 +68,7 @@ def of( result["delete_strategy"] = delete_strategy result["deltacat_storage"] = deltacat_storage result["deltacat_storage_kwargs"] = deltacat_storage_kwargs or {} + result["memory_logs_enabled"] = memory_logs_enabled return result @property @@ -133,6 +135,10 @@ def deltacat_storage(self) -> unimplemented_deltacat_storage: def deltacat_storage_kwargs(self) -> Optional[Dict[str, Any]]: return self.get("deltacat_storage_kwargs") + @property + def memory_logs_enabled(self) -> Optional[bool]: + return self.get("memory_logs_enabled") + @property def delete_file_envelopes( self, diff --git a/deltacat/compute/compactor_v2/steps/hash_bucket.py b/deltacat/compute/compactor_v2/steps/hash_bucket.py index 2ac0f3a3..4a787ef8 100644 --- a/deltacat/compute/compactor_v2/steps/hash_bucket.py +++ b/deltacat/compute/compactor_v2/steps/hash_bucket.py @@ -142,7 +142,8 @@ def log_peak_memory(): f"({process_util.max_memory/BYTES_PER_GIBIBYTE} GB)" ) - process_util.schedule_callback(log_peak_memory, 10) + if input.memory_logs_enabled: + process_util.schedule_callback(log_peak_memory, 10) hash_bucket_result, duration = timed_invocation( func=_timed_hash_bucket, input=input diff --git a/deltacat/compute/compactor_v2/steps/merge.py b/deltacat/compute/compactor_v2/steps/merge.py index dd63b0f5..2f2b18ca 100644 --- a/deltacat/compute/compactor_v2/steps/merge.py +++ b/deltacat/compute/compactor_v2/steps/merge.py @@ -548,7 +548,8 @@ def log_peak_memory(): f"({process_util.max_memory/BYTES_PER_GIBIBYTE} GB)" ) - process_util.schedule_callback(log_peak_memory, 10) + if input.memory_logs_enabled: + process_util.schedule_callback(log_peak_memory, 10) merge_result, duration = timed_invocation(func=_timed_merge, input=input) diff --git a/deltacat/compute/compactor_v2/utils/task_options.py b/deltacat/compute/compactor_v2/utils/task_options.py index fa26ba71..32a80e92 100644 --- a/deltacat/compute/compactor_v2/utils/task_options.py +++ b/deltacat/compute/compactor_v2/utils/task_options.py @@ -135,6 +135,7 @@ def hash_bucket_resource_options_provider( average_record_size_bytes: float, primary_keys: List[str] = None, ray_custom_resources: Optional[Dict] = None, + memory_logs_enabled: Optional[bool] = None, **kwargs, ) -> Dict: debug_memory_params = {"hash_bucket_task_index": index} @@ -191,8 +192,9 @@ def hash_bucket_resource_options_provider( # Consider buffer total_memory = total_memory * (1 + TOTAL_MEMORY_BUFFER_PERCENTAGE / 100.0) debug_memory_params["total_memory_with_buffer"] = total_memory - logger.debug( - f"[Hash bucket task {index}]: Params used for calculating hash bucketing memory: {debug_memory_params}" + logger.debug_conditional( + f"[Hash bucket task {index}]: Params used for calculating hash bucketing memory: {debug_memory_params}", + memory_logs_enabled, ) return get_task_options(0.01, total_memory, ray_custom_resources) @@ -210,6 +212,7 @@ def merge_resource_options_provider( primary_keys: Optional[List[str]] = None, deltacat_storage=unimplemented_deltacat_storage, deltacat_storage_kwargs: Optional[Dict] = {}, + memory_logs_enabled: Optional[bool] = None, **kwargs, ) -> Dict: debug_memory_params = {"merge_task_index": index} @@ -298,8 +301,9 @@ def merge_resource_options_provider( total_memory = total_memory * (1 + TOTAL_MEMORY_BUFFER_PERCENTAGE / 100.0) debug_memory_params["total_memory_with_buffer"] = total_memory - logger.debug( - f"[Merge task {index}]: Params used for calculating merge memory: {debug_memory_params}" + logger.debug_conditional( + f"[Merge task {index}]: Params used for calculating merge memory: {debug_memory_params}", + memory_logs_enabled, ) return get_task_options(0.01, total_memory, ray_custom_resources) diff --git a/deltacat/logs.py b/deltacat/logs.py index 4d1e2691..61ef877a 100644 --- a/deltacat/logs.py +++ b/deltacat/logs.py @@ -2,7 +2,7 @@ import os import pathlib from logging import FileHandler, Handler, Logger, LoggerAdapter, handlers -from typing import Union +from typing import Any, Dict, Optional, Union import ray from ray.runtime_context import RuntimeContext @@ -26,7 +26,32 @@ DEFAULT_BACKUP_COUNT = 0 -class RayRuntimeContextLoggerAdapter(logging.LoggerAdapter): +class DeltaCATLoggerAdapter(logging.LoggerAdapter): + """ + Logger Adapter class with additional functionality + """ + + def __init__(self, logger: Logger, extra: Optional[Dict[str, Any]] = {}): + super().__init__(logger, extra) + + def debug_conditional(self, msg, do_print: bool, *args, **kwargs): + if do_print: + self.debug(msg, *args, **kwargs) + + def info_conditional(self, msg, do_print: bool, *args, **kwargs): + if do_print: + self.info(msg, *args, **kwargs) + + def warning_conditional(self, msg, do_print: bool, *args, **kwargs): + if do_print: + self.warning(msg, *args, **kwargs) + + def error_conditional(self, msg, do_print: bool, *args, **kwargs): + if do_print: + self.error(msg, *args, **kwargs) + + +class RayRuntimeContextLoggerAdapter(DeltaCATLoggerAdapter): """ Logger Adapter for injecting Ray Runtime Context into logging messages. """ @@ -147,6 +172,8 @@ def _configure_logger( ray_runtime_ctx = ray.get_runtime_context() if ray_runtime_ctx.worker.connected: logger = RayRuntimeContextLoggerAdapter(logger, ray_runtime_ctx) + else: + logger = DeltaCATLoggerAdapter(logger) return logger diff --git a/deltacat/tests/compute/test_compact_partition_params.py b/deltacat/tests/compute/test_compact_partition_params.py index 1c330555..265b2786 100644 --- a/deltacat/tests/compute/test_compact_partition_params.py +++ b/deltacat/tests/compute/test_compact_partition_params.py @@ -72,6 +72,7 @@ def setUpClass(cls): "partitionValues": [], "partitionId": "79612ea39ac5493eae925abe60767d42", }, + "memory_logs_enabled": True, "metrics_config": MetricsConfig("us-east-1", MetricsTarget.CLOUDWATCH_EMF), } @@ -135,6 +136,10 @@ def test_serialize_returns_json_string_with_all_fields(self): json.loads(serialized_params)["destination_partition_locator"] == params.destination_partition_locator ) + assert ( + json.loads(serialized_params)["memory_logs_enabled"] + == params.memory_logs_enabled + ) assert ( json.loads(serialized_params)["metrics_config"]["metrics_target"] == params.metrics_config.metrics_target