Skip to content

Commit

Permalink
Fixed the Atlan integration glue db-schema switch (#2152)
Browse files Browse the repository at this point in the history
Fixed the Atlan integration glue db-schema switch
  • Loading branch information
tombaeyens authored Aug 21, 2024
1 parent 73f18da commit 88234fa
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 1 deletion.
11 changes: 10 additions & 1 deletion soda/atlan/soda/atlan/atlan_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ def __init__(self, logs: Logs, plugin_name: str, plugin_yaml_files: list[YamlFil

def process_contract_results(self, contract_result: ContractResult) -> None:
error_messages: list[str] = []

atlan_is_glue: bool = contract_result.data_source_yaml_dict.get("atlan_is_glue")
is_glue: bool = isinstance(atlan_is_glue, bool) and atlan_is_glue

atlan_qualified_name: str = contract_result.data_source_yaml_dict.get("atlan_qualified_name")
if not isinstance(atlan_qualified_name, str):
error_messages.append("atlan_qualified_name is required in a data source configuration yaml")
Expand All @@ -31,7 +35,12 @@ def process_contract_results(self, contract_result: ContractResult) -> None:
error_messages.append("schema is required in the contract yaml")

dataset_name: str = contract_result.contract.dataset_name
dataset_atlan_qualified_name: str = f"{atlan_qualified_name}/{database_name}/{schema_name}/{dataset_name}"

dataset_atlan_qualified_name: str = (
f"{atlan_qualified_name}/{database_name}/{schema_name}/{dataset_name}"
if not is_glue
else f"{atlan_qualified_name}/AwsDataCatalog/{database_name}/{dataset_name}"
)

if error_messages:
error_messages_text = ", ".join(error_messages)
Expand Down
114 changes: 114 additions & 0 deletions soda/atlan/tests/atlan/test_spark_df_atlan_contract_push_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from __future__ import annotations

from textwrap import dedent

import pytest
from dotenv import load_dotenv
from helpers.fixtures import project_root_dir
from helpers.test_table import TestTable
from pyspark.sql import SparkSession
from soda.execution.data_type import DataType
from spark_df_contract_data_source_test_helper import (
SparkDfContractDataSourceTestHelper,
)

from soda.contracts.contract_verification import ContractVerificationResult

contracts_spark_df_atlan_contract_test_table = TestTable(
name="contracts_spark_df_atlan_contract",
# fmt: off
columns=[
("id", DataType.TEXT),
("country", DataType.TEXT)
],
values=[
('1', 'US'),
('2', 'US'),
('3', 'BE'),
]
# fmt: on
)


class AtlanSparkDfContractDataSourceTestHelper(SparkDfContractDataSourceTestHelper):

def _create_contract_data_source_yaml_dict(self, database_name: str | None, schema_name: str | None) -> dict:
return {"atlan_qualified_name": "default/postgres/1718112025", "atlan_is_glue": True}


@pytest.mark.skip(
"Takes too long to be part of the local development test suite & depends on Atlan & Soda Cloud services"
)
def test_spark_df_atlan_contract_push_plugin():
load_dotenv(f"{project_root_dir}/.env", override=True)

contract_data_source_test_helper: AtlanSparkDfContractDataSourceTestHelper = (
AtlanSparkDfContractDataSourceTestHelper()
)
contract_data_source_test_helper.start_test_session()
exception: Exception | None = None
try:
unique_table_name: str = contract_data_source_test_helper.ensure_test_table(
contracts_spark_df_atlan_contract_test_table
)

spark_session: SparkSession = contract_data_source_test_helper.contract_data_source.spark_session
df = spark_session.sql(f"SELECT * FROM {unique_table_name}")
df.createOrReplaceTempView("students")

contract_yaml_str: str = dedent(
f"""
data_source: spark_ds
database: gluedb
dataset: students
columns:
- name: id
data_type: varchar
- name: country
data_type: varchar
"""
)

soda_cloud_yaml_str: str = dedent(
"""
api_key_id: ${DEV_SODADATA_IO_API_KEY_ID}
api_key_secret: ${DEV_SODADATA_IO_API_KEY_SECRET}
"""
)

atlan_yaml_str: str = dedent(
"""
plugin: atlan
atlan_api_key: ${ATLAN_API_KEY}
atlan_base_url: https://soda-partner.atlan.com
"""
)

contract_verification_result: ContractVerificationResult = (
contract_data_source_test_helper.create_test_verification_builder()
.with_contract_yaml_str(contract_yaml_str)
.with_soda_cloud_yaml_str(soda_cloud_yaml_str)
.with_plugin_yaml_str(atlan_yaml_str)
.execute()
)

contract_verification_result.assert_ok()

except Exception as e:
exception = e
finally:
contract_data_source_test_helper.end_test_session(exception=exception)

# data_source_yaml_str: str = dedent(
# """
# name: spark_ds
# type: spark_df
# atlan_qualified_name: default/postgres/1718112025
# connection:
# host: ${CONTRACTS_POSTGRES_HOST}
# database: ${CONTRACTS_POSTGRES_DATABASE}
# user: ${CONTRACTS_POSTGRES_USERNAME}
# password: ${CONTRACTS_POSTGRES_PASSWORD}
# schema: contracts
# """
# )

0 comments on commit 88234fa

Please sign in to comment.