Skip to content

Commit

Permalink
feat(cli): make deprecations, renames easier to notice (datahub-proje…
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Feb 17, 2023
1 parent aa388f0 commit 5690560
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
source:
type: "file"
config:
filename: "./examples/demo_data/demo_data.json"
type: "demo-data"
config: {}

sink:
type: "datahub-rest"
Expand Down
2 changes: 0 additions & 2 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ async def run_func_check_upgrade(pipeline: Pipeline) -> None:
f"timed out with {e} waiting for version stats to be computed... skipping ahead."
)

sys.exit(ret)

# main function begins
logger.info("DataHub CLI version: %s", datahub_package.nice_version_name())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@
import pydantic

from datahub.configuration.common import ConfigurationWarning
from datahub.utilities.global_warning_util import add_global_warning


def pydantic_field_deprecated(field: str, message: Optional[str] = None) -> classmethod:
def pydantic_field_deprecated(
field: str, new_field: Optional[str] = None, message: Optional[str] = None
) -> classmethod:
if message:
output = message
else:
output = f"{field} is deprecated and will be removed in a future release. Please remove it from your config."

def _validate_deprecated(cls: Type, values: dict) -> dict:
if field in values:
if field in values and new_field not in values:
add_global_warning(output)
warnings.warn(output, ConfigurationWarning, stacklevel=2)
return values

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class EnvBasedSourceConfigBase(ConfigModel):

_env_deprecation = pydantic_field_deprecated(
"env",
"env is deprecated and will be removed in a future release. Please use platform_instance instead.",
new_field="platform_instance",
message="env is deprecated and will be removed in a future release. Please use platform_instance instead.",
)

@validator("env")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pydantic

from datahub.configuration.common import ConfigurationWarning
from datahub.utilities.global_warning_util import add_global_warning

_T = TypeVar("_T")

Expand All @@ -26,8 +27,10 @@ def _validate_field_rename(cls: Type, values: dict) -> dict:
)
else:
if print_warning:
msg = f"{old_name} is deprecated, please use {new_name} instead."
add_global_warning(msg)
warnings.warn(
f"{old_name} is deprecated, please use {new_name} instead.",
msg,
ConfigurationWarning,
stacklevel=2,
)
Expand Down
35 changes: 26 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from datahub.ingestion.transformer.transform_registry import transform_registry
from datahub.metadata.schema_classes import MetadataChangeProposalClass
from datahub.telemetry import stats, telemetry
from datahub.utilities.global_warning_util import get_global_warnings
from datahub.utilities.lossy_collections import LossyDict, LossyList

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -450,18 +451,22 @@ def raise_from_status(self, raise_warnings: bool = False) -> None:
)
if self.sink.get_report().failures:
raise PipelineExecutionError("Sink reported errors", self.sink.get_report())
if raise_warnings and (
self.source.get_report().warnings or self.sink.get_report().warnings
):
raise PipelineExecutionError(
"Source reported warnings", self.source.get_report()
)
if raise_warnings:
if self.source.get_report().warnings:
raise PipelineExecutionError(
"Source reported warnings", self.source.get_report()
)
if self.sink.get_report().warnings:
raise PipelineExecutionError(
"Sink reported warnings", self.sink.get_report()
)

def log_ingestion_stats(self) -> None:
source_failures = self._approx_all_vals(self.source.get_report().failures)
source_warnings = self._approx_all_vals(self.source.get_report().warnings)
sink_failures = len(self.sink.get_report().failures)
sink_warnings = len(self.sink.get_report().warnings)
global_warnings = len(get_global_warnings())

telemetry.telemetry_instance.ping(
"ingest_stats",
Expand All @@ -475,8 +480,11 @@ def log_ingestion_stats(self) -> None:
"source_warnings": stats.discretize(source_warnings),
"sink_failures": stats.discretize(sink_failures),
"sink_warnings": stats.discretize(sink_warnings),
"global_warnings": global_warnings,
"failures": stats.discretize(source_failures + sink_failures),
"warnings": stats.discretize(source_warnings + sink_warnings),
"warnings": stats.discretize(
source_warnings + sink_warnings + global_warnings
),
},
self.ctx.graph,
)
Expand Down Expand Up @@ -508,6 +516,10 @@ def pretty_print_summary(
click.echo(self.source.get_report().as_string())
click.secho(f"Sink ({self.config.sink.type}) report:", bold=True)
click.echo(self.sink.get_report().as_string())
global_warnings = get_global_warnings()
if len(global_warnings) > 0:
click.secho("Global Warnings:", bold=True)
click.echo(global_warnings)
click.echo()
workunits_produced = self.source.get_report().events_produced
duration_message = f"in {humanfriendly.format_timespan(self.source.get_report().running_time)}."
Expand All @@ -527,11 +539,16 @@ def pretty_print_summary(
bold=True,
)
return 1
elif self.source.get_report().warnings or self.sink.get_report().warnings:
elif (
self.source.get_report().warnings
or self.sink.get_report().warnings
or len(global_warnings) > 0
):
num_warn_source = self._approx_all_vals(self.source.get_report().warnings)
num_warn_sink = len(self.sink.get_report().warnings)
num_warn_global = len(global_warnings)
click.secho(
f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with at least {num_warn_source+num_warn_sink} warnings{' so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with at least {num_warn_source+num_warn_sink+num_warn_global} warnings{' so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
fg=self._get_text_color(
running=currently_running, failures=False, warnings=True
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class BasicSQLAlchemyConfig(SQLAlchemyConfig):

_database_alias_deprecation = pydantic_field_deprecated(
"database_alias",
new_field="platform_instance",
message="database_alias is deprecated. Use platform_instance instead.",
)

Expand Down
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/utilities/global_warning_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import List

# Process-wide accumulator for user-facing warnings (e.g. config deprecations)
# so they can be surfaced together in the ingestion summary.
global_warnings: List[str] = []


def add_global_warning(warn: str) -> None:
    """Record a warning message in the process-wide warning list."""
    global_warnings.append(warn)


def get_global_warnings() -> List[str]:
    """Return the accumulated warning messages.

    Returns the live module-level list (not a copy), so callers see
    warnings added after this call as well.
    """
    return global_warnings
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def _ensure_ingestion_source_present(

if num_execs is not None:
ingestion_source = res_data["data"]["ingestionSource"]
assert ingestion_source["executions"]["total"] == num_execs
assert ingestion_source["executions"]["total"] >= num_execs

return res_data

Expand Down

0 comments on commit 5690560

Please sign in to comment.