Skip to content

Commit

Permalink
fix(ingest/bigquery): Lineage edges use datetime with timezone; correctly parse last_altered (datahub-project#7762)
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Apr 6, 2023
1 parent 7ec1daa commit 29d2492
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re
import traceback
from collections import defaultdict
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, List, Optional, Set, Tuple, Type, Union, cast

from google.cloud import bigquery
Expand Down Expand Up @@ -684,7 +684,7 @@ def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
lineage[view] = {
LineageEdge(
table=table,
auditStamp=datetime.now(),
auditStamp=datetime.now(timezone.utc),
type=DatasetLineageTypeClass.VIEW,
)
for table in upstream_tables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,11 @@ def get_views_for_dataset(
BigqueryView(
name=table.table_name,
created=table.created,
last_altered=table.get("last_altered", table.created),
last_altered=datetime.fromtimestamp(
table.get("last_altered") / 1000, tz=timezone.utc
)
if table.get("last_altered") is not None
else table.created,
comment=table.comment,
view_definition=table.view_definition,
materialized=table.table_type == BigqueryTableType.MATERIALIZED_VIEW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import textwrap
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union

import humanfriendly
Expand Down Expand Up @@ -330,7 +330,6 @@ def _get_bigquery_log_entries(
def _get_exported_bigquery_audit_metadata(
self, bigquery_client: BigQueryClient, limit: Optional[int] = None
) -> Iterable[BigQueryAuditMetadata]:

if self.config.bigquery_audit_metadata_datasets is None:
self.error(
logger, "audit-metadata", "bigquery_audit_metadata_datasets not set"
Expand Down Expand Up @@ -458,7 +457,9 @@ def _create_lineage_map(
lineage_map[destination_table_str].add(
LineageEdge(
table=str(ref_table),
auditStamp=e.end_time if e.end_time else datetime.now(),
auditStamp=e.end_time
if e.end_time
else datetime.now(tz=timezone.utc),
)
)
has_table = True
Expand All @@ -468,7 +469,9 @@ def _create_lineage_map(
lineage_map[destination_table_str].add(
LineageEdge(
table=str(ref_view),
auditStamp=e.end_time if e.end_time else datetime.now(),
auditStamp=e.end_time
if e.end_time
else datetime.now(tz=timezone.utc),
)
)
has_view = True
Expand Down
98 changes: 91 additions & 7 deletions metadata-ingestion/tests/unit/test_bigquery_lineage.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import datetime
from typing import Dict, List, Set

from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
BigQueryTableRef,
QueryEvent,
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryView
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.lineage import (
BigqueryLineageExtractor,
LineageEdge,
)


def test_parse_view_lineage():
Expand All @@ -21,8 +29,8 @@ def test_parse_view_lineage():
"""
view = BigqueryView(
name="test",
created=datetime.datetime.now(),
last_altered=datetime.datetime.now(),
created=datetime.datetime.now(tz=datetime.timezone.utc),
last_altered=datetime.datetime.now(tz=datetime.timezone.utc),
comment="",
view_definition=ddl,
)
Expand All @@ -40,8 +48,8 @@ def test_parse_view_lineage_with_two_part_table_name():
ddl = "CREATE VIEW my_view as select * from some_dataset.sometable as a"
view = BigqueryView(
name="test",
created=datetime.datetime.now(),
last_altered=datetime.datetime.now(),
created=datetime.datetime.now(tz=datetime.timezone.utc),
last_altered=datetime.datetime.now(tz=datetime.timezone.utc),
comment="",
view_definition=ddl,
)
Expand All @@ -59,8 +67,8 @@ def test_one_part_table():
ddl = "CREATE VIEW my_view as select * from sometable as a"
view = BigqueryView(
name="test",
created=datetime.datetime.now(),
last_altered=datetime.datetime.now(),
created=datetime.datetime.now(tz=datetime.timezone.utc),
last_altered=datetime.datetime.now(tz=datetime.timezone.utc),
comment="",
view_definition=ddl,
)
Expand Down Expand Up @@ -90,3 +98,79 @@ def test_create_statement_with_multiple_table():
assert 2 == len(tables)
assert "my_project_2.my_dataset_2.sometable" == tables[0].get_table_name()
assert "my_project_2.my_dataset_2.sometable2" == tables[1].get_table_name()


def test_lineage_with_timestamps():
    """Check that a lineage map built from mixed query events yields four upstreams.

    Three ``QueryEvent`` objects write to the same destination table
    (``my_table``):

    * one with ``end_time=None`` that references two source tables,
    * one with an explicit timezone-aware ``end_time`` that references one
      source table,
    * one that does not pass ``end_time`` at all and references one view.

    The events are fed through ``_create_lineage_map`` and the result is
    resolved with ``get_lineage_for_table``; the first returned item is
    expected to carry four upstream entries.
    """
    config = BigQueryV2Config()
    report = BigQueryV2Report()
    extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report)

    utc = datetime.timezone.utc
    to_ref = BigQueryTableRef.from_string_name

    # NOTE(review): the actor_email literal below looks like an
    # email-obfuscation placeholder rather than a real address — confirm the
    # intended value against the repository.
    event_with_null_end_time = QueryEvent(
        timestamp=datetime.datetime.now(tz=utc),
        actor_email="[email protected]",
        query="testQuery",
        statementType="SELECT",
        project_id="proj_12344",
        end_time=None,
        referencedTables=[
            to_ref("projects/my_project/datasets/my_dataset/tables/my_source_table1"),
            to_ref("projects/my_project/datasets/my_dataset/tables/my_source_table2"),
        ],
        destinationTable=to_ref(
            "projects/my_project/datasets/my_dataset/tables/my_table"
        ),
    )

    event_with_end_time = QueryEvent(
        timestamp=datetime.datetime.now(tz=utc),
        actor_email="[email protected]",
        query="testQuery",
        statementType="SELECT",
        project_id="proj_12344",
        end_time=datetime.datetime.fromtimestamp(1617295943.17321, tz=utc),
        referencedTables=[
            to_ref("projects/my_project/datasets/my_dataset/tables/my_source_table3"),
        ],
        destinationTable=to_ref(
            "projects/my_project/datasets/my_dataset/tables/my_table"
        ),
    )

    # This event deliberately leaves end_time unspecified and uses a view
    # reference instead of a table reference.
    event_with_view = QueryEvent(
        timestamp=datetime.datetime.now(tz=utc),
        actor_email="[email protected]",
        query="testQuery",
        statementType="SELECT",
        project_id="proj_12344",
        referencedViews=[
            to_ref("projects/my_project/datasets/my_dataset/tables/my_source_view1"),
        ],
        destinationTable=to_ref(
            "projects/my_project/datasets/my_dataset/tables/my_table"
        ),
    )

    lineage_entries: List[QueryEvent] = [
        event_with_null_end_time,
        event_with_end_time,
        event_with_view,
    ]

    bq_table = to_ref("projects/my_project/datasets/my_dataset/tables/my_table")

    lineage_map: Dict[str, Set[LineageEdge]] = extractor._create_lineage_map(
        iter(lineage_entries)
    )

    upstream_lineage = extractor.get_lineage_for_table(
        bq_table=bq_table,
        lineage_metadata=lineage_map,
        platform="bigquery",
    )

    assert upstream_lineage
    first_result = upstream_lineage[0]
    assert len(first_result.upstreams) == 4
13 changes: 8 additions & 5 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Any, Dict, Optional, cast
from unittest.mock import patch
from unittest.mock import Mock, patch

import pytest
from google.cloud.bigquery.table import Row, TableListItem
Expand Down Expand Up @@ -424,7 +424,7 @@ def create_row(d: Dict[str, Any]) -> Row:


@pytest.fixture
def bigquery_view_1():
def bigquery_view_1() -> BigqueryView:
now = datetime.now(tz=timezone.utc)
return BigqueryView(
name="table1",
Expand All @@ -437,7 +437,7 @@ def bigquery_view_1():


@pytest.fixture
def bigquery_view_2():
def bigquery_view_2() -> BigqueryView:
now = datetime.now(tz=timezone.utc)
return BigqueryView(
name="table2",
Expand All @@ -454,8 +454,11 @@ def bigquery_view_2():
)
@patch("google.cloud.bigquery.client.Client")
def test_get_views_for_dataset(
client_mock, query_mock, bigquery_view_1, bigquery_view_2
):
client_mock: Mock,
query_mock: Mock,
bigquery_view_1: BigqueryView,
bigquery_view_2: BigqueryView,
) -> None:
row1 = create_row(
dict(
table_name=bigquery_view_1.name,
Expand Down

0 comments on commit 29d2492

Please sign in to comment.