Skip to content

Commit

Permalink
Add memory_logs_enabled param (ray-project#284)
Browse files Browse the repository at this point in the history
* Add memory_logs_enabled param

* Add higher-order conditional logging function

* Create new DeltaCATLoggerAdapter class

---------

Co-authored-by: Kevin Yan <[email protected]>
  • Loading branch information
yankevn and Kevin Yan authored Apr 2, 2024
1 parent 4db4716 commit a305f86
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 8 deletions.
10 changes: 10 additions & 0 deletions deltacat/compute/compactor/model/compact_partition_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def of(params: Optional[Dict]) -> CompactPartitionParams:
result.drop_duplicates = params.get("drop_duplicates", DROP_DUPLICATES)
result.ray_custom_resources = params.get("ray_custom_resources")

result.memory_logs_enabled = params.get("memory_logs_enabled", False)

result.metrics_config = params.get("metrics_config")

if not importlib.util.find_spec("memray"):
Expand Down Expand Up @@ -355,6 +357,14 @@ def sort_keys(self) -> Optional[List[SortKey]]:
def sort_keys(self, keys: List[SortKey]) -> None:
self["sort_keys"] = keys

@property
def memory_logs_enabled(self) -> bool:
return self.get("memory_logs_enabled")

@memory_logs_enabled.setter
def memory_logs_enabled(self, value: bool) -> None:
self["memory_logs_enabled"] = value

@property
def metrics_config(self) -> Optional[MetricsConfig]:
return self.get("metrics_config")
Expand Down
4 changes: 4 additions & 0 deletions deltacat/compute/compactor_v2/compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ def _execute_compaction(
average_record_size_bytes=params.average_record_size_bytes,
primary_keys=params.primary_keys,
ray_custom_resources=params.ray_custom_resources,
memory_logs_enabled=params.memory_logs_enabled,
)

total_input_records_count = np.int64(0)
Expand Down Expand Up @@ -296,6 +297,7 @@ def hash_bucket_input_provider(index, item):
object_store=params.object_store,
deltacat_storage=params.deltacat_storage,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
memory_logs_enabled=params.memory_logs_enabled,
)
}

Expand Down Expand Up @@ -388,6 +390,7 @@ def hash_bucket_input_provider(index, item):
deltacat_storage=params.deltacat_storage,
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
ray_custom_resources=params.ray_custom_resources,
memory_logs_enabled=params.memory_logs_enabled,
)

def merge_input_provider(index, item):
Expand Down Expand Up @@ -417,6 +420,7 @@ def merge_input_provider(index, item):
deltacat_storage_kwargs=params.deltacat_storage_kwargs,
delete_strategy=delete_strategy,
delete_file_envelopes=delete_file_envelopes,
memory_logs_enabled=params.memory_logs_enabled,
)
}

Expand Down
6 changes: 6 additions & 0 deletions deltacat/compute/compactor_v2/model/hash_bucket_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def of(
object_store: Optional[IObjectStore] = None,
deltacat_storage=unimplemented_deltacat_storage,
deltacat_storage_kwargs: Optional[Dict[str, Any]] = None,
memory_logs_enabled: Optional[bool] = None,
) -> HashBucketInput:

result = HashBucketInput()
Expand All @@ -36,6 +37,7 @@ def of(
result["object_store"] = object_store
result["deltacat_storage"] = deltacat_storage
result["deltacat_storage_kwargs"] = deltacat_storage_kwargs or {}
result["memory_logs_enabled"] = memory_logs_enabled

return result

Expand Down Expand Up @@ -82,3 +84,7 @@ def deltacat_storage(self) -> unimplemented_deltacat_storage:
@property
def deltacat_storage_kwargs(self) -> Optional[Dict[str, Any]]:
return self.get("deltacat_storage_kwargs")

@property
def memory_logs_enabled(self) -> Optional[bool]:
return self.get("memory_logs_enabled")
6 changes: 6 additions & 0 deletions deltacat/compute/compactor_v2/model/merge_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def of(
delete_file_envelopes: Optional[List] = None,
deltacat_storage=unimplemented_deltacat_storage,
deltacat_storage_kwargs: Optional[Dict[str, Any]] = None,
memory_logs_enabled: Optional[bool] = None,
) -> MergeInput:

result = MergeInput()
Expand All @@ -67,6 +68,7 @@ def of(
result["delete_strategy"] = delete_strategy
result["deltacat_storage"] = deltacat_storage
result["deltacat_storage_kwargs"] = deltacat_storage_kwargs or {}
result["memory_logs_enabled"] = memory_logs_enabled
return result

@property
Expand Down Expand Up @@ -133,6 +135,10 @@ def deltacat_storage(self) -> unimplemented_deltacat_storage:
def deltacat_storage_kwargs(self) -> Optional[Dict[str, Any]]:
return self.get("deltacat_storage_kwargs")

@property
def memory_logs_enabled(self) -> Optional[bool]:
return self.get("memory_logs_enabled")

@property
def delete_file_envelopes(
self,
Expand Down
3 changes: 2 additions & 1 deletion deltacat/compute/compactor_v2/steps/hash_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ def log_peak_memory():
f"({process_util.max_memory/BYTES_PER_GIBIBYTE} GB)"
)

process_util.schedule_callback(log_peak_memory, 10)
if input.memory_logs_enabled:
process_util.schedule_callback(log_peak_memory, 10)

hash_bucket_result, duration = timed_invocation(
func=_timed_hash_bucket, input=input
Expand Down
3 changes: 2 additions & 1 deletion deltacat/compute/compactor_v2/steps/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ def log_peak_memory():
f"({process_util.max_memory/BYTES_PER_GIBIBYTE} GB)"
)

process_util.schedule_callback(log_peak_memory, 10)
if input.memory_logs_enabled:
process_util.schedule_callback(log_peak_memory, 10)

merge_result, duration = timed_invocation(func=_timed_merge, input=input)

Expand Down
12 changes: 8 additions & 4 deletions deltacat/compute/compactor_v2/utils/task_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def hash_bucket_resource_options_provider(
average_record_size_bytes: float,
primary_keys: List[str] = None,
ray_custom_resources: Optional[Dict] = None,
memory_logs_enabled: Optional[bool] = None,
**kwargs,
) -> Dict:
debug_memory_params = {"hash_bucket_task_index": index}
Expand Down Expand Up @@ -191,8 +192,9 @@ def hash_bucket_resource_options_provider(
# Consider buffer
total_memory = total_memory * (1 + TOTAL_MEMORY_BUFFER_PERCENTAGE / 100.0)
debug_memory_params["total_memory_with_buffer"] = total_memory
logger.debug(
f"[Hash bucket task {index}]: Params used for calculating hash bucketing memory: {debug_memory_params}"
logger.debug_conditional(
f"[Hash bucket task {index}]: Params used for calculating hash bucketing memory: {debug_memory_params}",
memory_logs_enabled,
)

return get_task_options(0.01, total_memory, ray_custom_resources)
Expand All @@ -210,6 +212,7 @@ def merge_resource_options_provider(
primary_keys: Optional[List[str]] = None,
deltacat_storage=unimplemented_deltacat_storage,
deltacat_storage_kwargs: Optional[Dict] = {},
memory_logs_enabled: Optional[bool] = None,
**kwargs,
) -> Dict:
debug_memory_params = {"merge_task_index": index}
Expand Down Expand Up @@ -298,8 +301,9 @@ def merge_resource_options_provider(

total_memory = total_memory * (1 + TOTAL_MEMORY_BUFFER_PERCENTAGE / 100.0)
debug_memory_params["total_memory_with_buffer"] = total_memory
logger.debug(
f"[Merge task {index}]: Params used for calculating merge memory: {debug_memory_params}"
logger.debug_conditional(
f"[Merge task {index}]: Params used for calculating merge memory: {debug_memory_params}",
memory_logs_enabled,
)

return get_task_options(0.01, total_memory, ray_custom_resources)
31 changes: 29 additions & 2 deletions deltacat/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import pathlib
from logging import FileHandler, Handler, Logger, LoggerAdapter, handlers
from typing import Union
from typing import Any, Dict, Optional, Union

import ray
from ray.runtime_context import RuntimeContext
Expand All @@ -26,7 +26,32 @@
DEFAULT_BACKUP_COUNT = 0


class RayRuntimeContextLoggerAdapter(logging.LoggerAdapter):
class DeltaCATLoggerAdapter(logging.LoggerAdapter):
"""
Logger Adapter class with additional functionality
"""

def __init__(self, logger: Logger, extra: Optional[Dict[str, Any]] = {}):
super().__init__(logger, extra)

def debug_conditional(self, msg, do_print: bool, *args, **kwargs):
if do_print:
self.debug(msg, *args, **kwargs)

def info_conditional(self, msg, do_print: bool, *args, **kwargs):
if do_print:
self.info(msg, *args, **kwargs)

def warning_conditional(self, msg, do_print: bool, *args, **kwargs):
if do_print:
self.warning(msg, *args, **kwargs)

def error_conditional(self, msg, do_print: bool, *args, **kwargs):
if do_print:
self.error(msg, *args, **kwargs)


class RayRuntimeContextLoggerAdapter(DeltaCATLoggerAdapter):
"""
Logger Adapter for injecting Ray Runtime Context into logging messages.
"""
Expand Down Expand Up @@ -147,6 +172,8 @@ def _configure_logger(
ray_runtime_ctx = ray.get_runtime_context()
if ray_runtime_ctx.worker.connected:
logger = RayRuntimeContextLoggerAdapter(logger, ray_runtime_ctx)
else:
logger = DeltaCATLoggerAdapter(logger)

return logger

Expand Down
5 changes: 5 additions & 0 deletions deltacat/tests/compute/test_compact_partition_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def setUpClass(cls):
"partitionValues": [],
"partitionId": "79612ea39ac5493eae925abe60767d42",
},
"memory_logs_enabled": True,
"metrics_config": MetricsConfig("us-east-1", MetricsTarget.CLOUDWATCH_EMF),
}

Expand Down Expand Up @@ -135,6 +136,10 @@ def test_serialize_returns_json_string_with_all_fields(self):
json.loads(serialized_params)["destination_partition_locator"]
== params.destination_partition_locator
)
assert (
json.loads(serialized_params)["memory_logs_enabled"]
== params.memory_logs_enabled
)
assert (
json.loads(serialized_params)["metrics_config"]["metrics_target"]
== params.metrics_config.metrics_target
Expand Down

0 comments on commit a305f86

Please sign in to comment.