From d35d7b07b82418fa8a19e03be6e2072f825f6a70 Mon Sep 17 00:00:00 2001 From: oneshcheret <33333155+sashaNeshcheret@users.noreply.github.com> Date: Sat, 7 May 2022 11:22:22 +0300 Subject: [PATCH] Mssql destination: enable DAT tests, use nvarchar and datetime2 by default (#12305) * Mssql destination: enable DAT tests for mssql destination, use nvarchar and datetime2 by default * Mssql destination: update array handling in test * Mssql destination: update array and JSON handling in test * Mssql destination: remove unused method * bugfix bigquery tests, dataset_location added * basic-normalization.md updated Signed-off-by: Sergey Chvalyuk * Mssql destination: change parent class for mssql test Co-authored-by: Sergey Chvalyuk --- .../bases/base-normalization/Dockerfile | 2 +- .../macros/cross_db_utils/datatypes.sql | 6 +-- .../integration_tests/dbt_integration_test.py | 1 + .../JdbcDestinationAcceptanceTest.java | 2 +- .../mssql/MSSQLDestinationAcceptanceTest.java | 47 ++++++++++-------- .../mssql/MSSQLTestDataComparator.java | 48 +++++++++++++++++++ .../NormalizationRunnerFactory.java | 2 +- .../basic-normalization.md | 3 +- 8 files changed, 84 insertions(+), 27 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLTestDataComparator.java diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index 16a96353ef47..6d3ba1ddb6b0 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -28,5 +28,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.77 +LABEL io.airbyte.version=0.1.78 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/datatypes.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/datatypes.sql index 07600de91fff..18e2d61ecb8c 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/datatypes.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/datatypes.sql @@ -33,7 +33,7 @@ {%- endmacro -%} {%- macro sqlserver__type_json() -%} - VARCHAR(max) + NVARCHAR(max) {%- endmacro -%} {% macro clickhouse__type_json() %} @@ -52,7 +52,7 @@ {%- endmacro -%} {% macro sqlserver__type_string() %} - VARCHAR(max) + NVARCHAR(max) {%- endmacro -%} {%- macro clickhouse__type_string() -%} @@ -154,7 +154,7 @@ {%- macro sqlserver__type_timestamp_with_timezone() -%} {#-- in TSQL timestamp is really datetime or datetime2 --#} {#-- https://docs.microsoft.com/en-us/sql/t-sql/functions/date-and-time-data-types-and-functions-transact-sql?view=sql-server-ver15#DateandTimeDataTypes --#} - datetime + datetime2 {%- endmacro -%} {% macro clickhouse__type_timestamp_with_timezone() %} diff --git a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py index f7a75de2042e..b3c1b4b543f8 100644 --- a/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py +++ b/airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py @@ -326,6 +326,7 @@ def generate_profile_yaml_file( "credentials_json": json.dumps(credentials), "dataset_id": self.target_schema, "project_id": credentials["project_id"], + "dataset_location": "US", } elif destination_type.value == DestinationType.MYSQL.value: profiles_config["database"] = self.target_schema diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/JdbcDestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/JdbcDestinationAcceptanceTest.java index 8c7140aca435..21dde6f904c3 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/JdbcDestinationAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/JdbcDestinationAcceptanceTest.java @@ -22,7 +22,7 @@ protected JsonNode getJsonFromRecord(Record record) { var value = record.get(field); switch (field.getDataType().getTypeName()) { - case "varchar", "jsonb", "other": + case "varchar", "nvarchar", "jsonb", "other": var stringValue = (value != null ? value.toString() : null); if (stringValue != null && (stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\[.*\\]$") || stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\{.*\\}$"))) { diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java index b2da0d07a551..d0a2dc25eb7b 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java @@ -5,28 +5,29 @@ package io.airbyte.integrations.destination.mssql; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.db.Databases; -import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.ExtendedNameTransformer; -import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.sql.SQLException; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.testcontainers.containers.MSSQLServerContainer; -public class MSSQLDestinationAcceptanceTest extends DestinationAcceptanceTest { +public class MSSQLDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest { private static MSSQLServerContainer db; private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer(); + private final ObjectMapper mapper = new ObjectMapper(); private JsonNode configWithoutDbName; private JsonNode config; @@ -80,7 +81,7 @@ protected List retrieveRecords(final TestDestinationEnv env, throws Exception { return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() - .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) + .map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA)) .collect(Collectors.toList()); } @@ -96,19 +97,6 @@ protected List retrieveNormalizedRecords(final TestDestinationEnv env, return retrieveRecordsFromTable(tableName, namespace); } - @Override - protected List resolveIdentifier(final String identifier) { - final List result = new ArrayList<>(); - final String resolved = namingResolver.getIdentifier(identifier); - result.add(identifier); - result.add(resolved); - if (!resolved.startsWith("\"")) { - result.add(resolved.toLowerCase()); - result.add(resolved.toUpperCase()); - } - return result; - } - private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { return Databases.createSqlServerDatabase(db.getUsername(), db.getPassword(), db.getJdbcUrl()).query( @@ -117,8 +105,7 @@ private List retrieveRecordsFromTable(final String tableName, final St return ctx .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) .stream() - .map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat())) - .map(Jsons::deserialize) + .map(this::getJsonFromRecord) .collect(Collectors.toList()); }); } @@ -167,6 +154,26 @@ protected void setup(final TestDestinationEnv testEnv) throws SQLException { @Override protected void tearDown(final TestDestinationEnv testEnv) {} + @Override + protected TestDataComparator getTestDataComparator() { + return new MSSQLTestDataComparator(); + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + return true; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + @AfterAll static void cleanUp() { db.stop(); diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLTestDataComparator.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLTestDataComparator.java new file mode 100644 index 000000000000..fa98a860f767 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLTestDataComparator.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql; + +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; + +public class MSSQLTestDataComparator extends AdvancedTestDataComparator { + + public static final String ACTUAL_MSSQL_AIRBYTE_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.S"; + private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer(); + + @Override + protected boolean compareDateTimeValues(String airbyteMessageValue, String destinationValue) { + if (!isDateTimeValue(destinationValue)) { + destinationValue = LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(ACTUAL_MSSQL_AIRBYTE_DATETIME_FORMAT)).toString(); + } + return super.compareDateTimeValues(airbyteMessageValue, destinationValue); + } + + @Override + protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) { + LocalDateTime parsedDateTime = LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(ACTUAL_MSSQL_AIRBYTE_DATETIME_FORMAT)); + return ZonedDateTime.of(parsedDateTime, ZoneOffset.UTC); + } + + @Override + protected List resolveIdentifier(final String identifier) { + final List result = new ArrayList<>(); + final String resolved = namingResolver.getIdentifier(identifier); + result.add(identifier); + result.add(resolved); + if (!resolved.startsWith("\"")) { + result.add(resolved.toLowerCase()); + result.add(resolved.toUpperCase()); + } + return result; + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java index 0c1944eb0a79..a5d582776173 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java @@ -14,7 +14,7 @@ public class NormalizationRunnerFactory { public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization"; - public static final String NORMALIZATION_VERSION = "0.1.77"; + public static final String NORMALIZATION_VERSION = "0.1.78"; static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder() diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md index aea2607de536..1b1d4a9200d4 100644 --- a/docs/understanding-airbyte/basic-normalization.md +++ b/docs/understanding-airbyte/basic-normalization.md @@ -352,7 +352,8 @@ Therefore, in order to "upgrade" to the desired normalization version, you need | Airbyte Version | Normalization Version | Date | Pull Request | Subject | |:----------------| :--- | :--- | :--- | :--- | -| | 0.1.77 | 2022-04-19 | [\#9610](https://github.com/airbytehq/airbyte/pull/9610) | Add support redshift SUPER type | +| | 0.1.78 | 2022-05-06 | [\#12305](https://github.com/airbytehq/airbyte/pull/12305) | Mssql: use NVARCHAR and datetime2 by default | +| 0.36.2-alpha | 0.1.77 | 2022-04-19 | [\#12064](https://github.com/airbytehq/airbyte/pull/12064) | Add support redshift SUPER type | | 0.35.65-alpha | 0.1.75 | 2022-04-09 | [\#11511](https://github.com/airbytehq/airbyte/pull/11511) | Move DBT modules from `/tmp/dbt_modules` to `/dbt` | | 0.35.61-alpha | 0.1.74 | 2022-03-24 | [\#10905](https://github.com/airbytehq/airbyte/pull/10905) | Update clickhouse dbt version | | 0.35.60-alpha | 0.1.73 | 2022-03-25 | [\#11267](https://github.com/airbytehq/airbyte/pull/11267) | Set `--event-buffer-size` to reduce memory usage |