Fix Normalization failing with "adapter" does not exist (airbytehq#2941)
* Fix normalization dedup on non-string primary key columns

* Bump version of normalization image

* Add test cases to standard test
ChristopheDuong authored Apr 19, 2021
1 parent 3e68438 commit 5859e0c
Showing 7 changed files with 31 additions and 20 deletions.
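
Context for the fix: when a primary key column was not a string (for example a float), normalization pasted the output of safe_cast_to_string directly into the generated SQL. That output is a jinja expression such as adapter.quote('float_id'), so the compiled model contained a literal call to adapter.quote(...) and the destination rejected it with an error along the lines of "adapter" does not exist. The sketch below is a minimal, hypothetical illustration of the before/after rendering, not the actual normalization code; jinja_call here only mimics the helper of the same name that appears in this diff.

```python
# Minimal sketch of the bug and the fix (hypothetical, not Airbyte code).
# jinja_call mirrors the helper used in this diff: it wraps an expression in
# dbt's jinja delimiters so that dbt evaluates it at compile time.
def jinja_call(command: str) -> str:
    return "{{ " + command + " }}"

# The column-name expression produced for a quoted column is itself jinja, not SQL.
quoted_column = "adapter.quote('float_id')"

# Before the fix: the jinja expression leaks into the SQL unrendered, so the
# warehouse sees adapter.quote('float_id') and fails because no schema or
# function named "adapter" exists.
before = f"cast({quoted_column} as {jinja_call('dbt_utils.type_string()')})"
print(before)  # cast(adapter.quote('float_id') as {{ dbt_utils.type_string() }})

# After the fix: the whole expression is wrapped, so dbt renders it to a
# properly quoted identifier before the SQL reaches the destination.
after = f"cast({jinja_call(quoted_column)} as {jinja_call('dbt_utils.type_string()')})"
print(after)  # cast({{ adapter.quote('float_id') }} as {{ dbt_utils.type_string() }})
```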
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -19,5 +19,5 @@ WORKDIR /airbyte
 
 ENTRYPOINT ["/airbyte/entrypoint.sh"]
 
-LABEL io.airbyte.version=0.1.20
+LABEL io.airbyte.version=0.1.21
 LABEL io.airbyte.name=airbyte/normalization
@@ -424,10 +424,13 @@ def generate_id_hashing_model(self, from_table: str, column_names: Dict[str, Tup
         return sql
 
     def safe_cast_to_strings(self, column_names: Dict[str, Tuple[str, str]]) -> List[str]:
-        return [StreamProcessor.safe_cast_to_string(field, self.properties[field], column_names[field][1]) for field in column_names]
+        return [StreamProcessor.safe_cast_to_string(self.properties[field], column_names[field][1]) for field in column_names]
 
     @staticmethod
-    def safe_cast_to_string(property_name: str, definition: Dict, column_name: str) -> str:
+    def safe_cast_to_string(definition: Dict, column_name: str) -> str:
+        """
+        Note that the result from this static method should always be used within a jinja context (for example, from jinja macro surrogate_key call)
+        """
         if "type" not in definition:
             return column_name
         elif is_boolean(definition["type"]):
@@ -523,9 +526,9 @@ def get_primary_key_from_path(self, column_names: Dict[str, Tuple[str, str]], pa
                     property_type = "object"
                 if is_number(property_type) or is_boolean(property_type) or is_array(property_type) or is_object(property_type):
                     # some destinations don't handle float columns (or other types) as primary keys, turn everything to string
-                    return f"cast({self.safe_cast_to_string(field, self.properties[field], column_names[field][1])} as {jinja_call('dbt_utils.type_string()')})"
+                    return f"cast({jinja_call(self.safe_cast_to_string(self.properties[field], column_names[field][1]))} as {jinja_call('dbt_utils.type_string()')})"
                 else:
-                    return field
+                    return column_names[field][0]
             else:
                 # using an airbyte generated column
                 return f"cast({field} as {jinja_call('dbt_utils.type_string()')})"
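
The new docstring states that the return value must stay inside a jinja context. A hedged sketch of what that contract looks like at a call site (the exact macro invocation emitted into the generated models may differ; the docstring itself points at the surrogate_key macro):

```python
# Hypothetical call-site sketch: safe_cast_to_string returns jinja fragments
# (e.g. "adapter.quote('id')"), and the caller wraps the whole expression once
# so dbt renders every fragment before the SQL runs.
def jinja_call(command: str) -> str:
    return "{{ " + command + " }}"

fields = ["adapter.quote('id')", "adapter.quote('currency')"]  # e.g. from safe_cast_to_strings
surrogate_key = jinja_call("dbt_utils.surrogate_key([" + ", ".join(fields) + "])")
print(surrogate_key)
# {{ dbt_utils.surrogate_key([adapter.quote('id'), adapter.quote('currency')]) }}
```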
@@ -260,9 +260,9 @@ def test_cursor_field(cursor_field: List[str], expecting_exception: bool, expect
 @pytest.mark.parametrize(
     "primary_key, column_type, expecting_exception, expected_primary_keys, expected_final_primary_key_string",
     [
-        ([["id"]], "string", False, ["id"], "id"),
+        ([["id"]], "string", False, ["id"], "{{ adapter.quote('id') }}"),
         ([["first_name"], ["last_name"]], "string", False, ["first_name", "last_name"], "first_name, last_name"),
-        ([["float_id"]], "number", False, ["float_id"], "cast(adapter.quote('float_id') as {{ dbt_utils.type_string() }})"),
+        ([["float_id"]], "number", False, ["float_id"], "cast({{ 'float_id' }} as {{ dbt_utils.type_string() }})"),
         ([["_airbyte_emitted_at"]], "string", False, [], "cast(_airbyte_emitted_at as {{ dbt_utils.type_string() }})"),
         (None, "string", True, [], ""),
         ([["parent", "nested_field"]], "string", True, [], ""),
@@ -290,10 +290,7 @@ def test_primary_key(
         from_table="",
     )
     try:
-        assert (
-            stream_processor.get_primary_key(column_names={key: (key, f"adapter.quote('{key}')") for key in expected_primary_keys})
-            == expected_final_primary_key_string
-        )
+        assert stream_processor.get_primary_key(column_names=stream_processor.extract_column_names()) == expected_final_primary_key_string
     except ValueError as e:
         if not expecting_exception:
             raise e
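
For context on the new expected strings in the parametrized test: they are jinja fragments that dbt renders before the SQL executes. A hedged illustration of what they compile to (the fake classes below only stand in for a real dbt adapter and dbt_utils, quoting identifiers the way Postgres would):

```python
# Hedged illustration (not Airbyte code) of what the expected test strings
# render to after dbt's jinja pass. Requires jinja2.
from jinja2 import Template

class FakeAdapter:
    @staticmethod
    def quote(name: str) -> str:
        return f'"{name}"'  # double-quoted identifier, Postgres-style

class FakeDbtUtils:
    @staticmethod
    def type_string() -> str:
        return "varchar"

context = {"adapter": FakeAdapter(), "dbt_utils": FakeDbtUtils()}

print(Template("{{ adapter.quote('id') }}").render(**context))
# -> "id"
print(Template("cast({{ 'float_id' }} as {{ dbt_utils.type_string() }})").render(**context))
# -> cast(float_id as varchar)
```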
@@ -325,6 +325,8 @@ public void testSecondSync() throws Exception {
         .withRecord(new AirbyteRecordMessage()
             .withStream(catalog.getStreams().get(0).getName())
             .withData(Jsons.jsonNode(ImmutableMap.builder()
+                .put("id", 1)
+                .put("currency", "USD")
                 .put("date", "2020-03-31T00:00:00Z")
                 .put("HKD", 10)
                 .put("NZD", 700)
@@ -363,6 +365,8 @@ public void testIncrementalSync() throws Exception {
         .withRecord(new AirbyteRecordMessage()
             .withStream(catalog.getStreams().get(0).getName())
             .withData(Jsons.jsonNode(ImmutableMap.builder()
+                .put("id", 1)
+                .put("currency", "USD")
                 .put("date", "2020-03-31T00:00:00Z")
                 .put("HKD", 10)
                 .put("NZD", 700)
@@ -423,7 +427,7 @@ public void testIncrementalDedupeSync() throws Exception {
       s.withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP);
       s.withCursorField(Collections.emptyList());
       // use composite primary key of various types (string, float)
-      s.withPrimaryKey(List.of(List.of("currency"), List.of("date"), List.of("NZD")));
+      s.withPrimaryKey(List.of(List.of("id"), List.of("currency"), List.of("date"), List.of("NZD")));
     });
 
     final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
@@ -438,6 +442,7 @@ public void testIncrementalDedupeSync() throws Exception {
             .withStream(catalog.getStreams().get(0).getName())
             .withEmittedAt(Instant.now().toEpochMilli())
             .withData(Jsons.jsonNode(ImmutableMap.builder()
+                .put("id", 2)
                 .put("currency", "EUR")
                 .put("date", "2020-09-01T00:00:00Z")
                 .put("HKD", 10.5)
@@ -449,6 +454,7 @@ public void testIncrementalDedupeSync() throws Exception {
             .withStream(catalog.getStreams().get(0).getName())
             .withEmittedAt(Instant.now().toEpochMilli() + 100L)
             .withData(Jsons.jsonNode(ImmutableMap.builder()
+                .put("id", 1)
                 .put("currency", "USD")
                 .put("date", "2020-09-01T00:00:00Z")
                 .put("HKD", 5.4)
@@ -464,7 +470,8 @@ public void testIncrementalDedupeSync() throws Exception {
         .stream()
         .filter(message -> message.getType() == Type.RECORD && message.getRecord() != null)
         .collect(Collectors.toMap(
-            message -> message.getRecord().getData().get("currency").asText() +
+            message -> message.getRecord().getData().get("id").asText() +
+                message.getRecord().getData().get("currency").asText() +
                 message.getRecord().getData().get("date").asText() +
                 message.getRecord().getData().get("NZD").asText(),
             message -> message,
@@ -475,7 +482,8 @@ public void testIncrementalDedupeSync() throws Exception {
         .stream()
         .filter(message -> message.getType() == Type.RECORD && message.getRecord() != null)
         .filter(message -> {
-          final String key = message.getRecord().getData().get("currency").asText() +
+          final String key = message.getRecord().getData().get("id").asText() +
+              message.getRecord().getData().get("currency").asText() +
              message.getRecord().getData().get("date").asText() +
              message.getRecord().getData().get("NZD").asText();
          return message.getRecord().getEmittedAt().equals(latestMessagesOnly.get(key).getRecord().getEmittedAt());
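
The test changes above extend the dedup expectation to a composite primary key that now includes the integer id column. A small Python sketch of the same expectation, assumed to mirror the Java test's logic: group records by (id, currency, date, NZD) and keep only the record with the latest emitted_at per key.

```python
# Hedged sketch of the dedup expectation encoded by the standard test
# (not part of the repository).
from typing import Dict, List

def latest_per_key(records: List[dict]) -> List[dict]:
    latest: Dict[tuple, dict] = {}
    for rec in records:
        data = rec["data"]
        key = (data["id"], data["currency"], data["date"], data["NZD"])
        if key not in latest or rec["emitted_at"] > latest[key]["emitted_at"]:
            latest[key] = rec
    return list(latest.values())

# Two messages from the exchange_rate fixture share the same composite key;
# after dedup only the later one should remain.
records = [
    {"emitted_at": 1602637789200, "data": {"id": 2, "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.15}},
    {"emitted_at": 1602637889300, "data": {"id": 2, "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.99}},
]
assert len(latest_per_key(records)) == 1
```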
@@ -4,6 +4,9 @@
       "name": "exchange_rate",
       "json_schema": {
         "properties": {
+          "id": {
+            "type": "integer"
+          },
           "currency": {
             "type": "string"
           },
@@ -1,8 +1,8 @@
-{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "currency": "USD", "date": "2020-08-29T00:00:00Z", "NZD": 0.12, "HKD": 2.13}}}
+{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": 1, "currency": "USD", "date": "2020-08-29T00:00:00Z", "NZD": 0.12, "HKD": 2.13}}}
 {"type": "STATE", "state": { "data": {"start_date": "2020-08-31"}}}
-{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "currency": "USD", "date": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD": 7.15}}}
+{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": 1, "currency": "USD", "date": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD": 7.15}}}
 {"type": "STATE", "state": { "data": {"start_date": "2020-09-01"}}}
-{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
-{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.99, "USD": 10.99}}}
-{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "currency": "EUR", "date": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
+{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
+{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": 2, "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.99, "USD": 10.99}}}
+{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": 2, "currency": "EUR", "date": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
 {"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}
@@ -43,7 +43,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);
 
-  public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.20";
+  public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.21";
 
   private final DestinationType destinationType;
   private final ProcessBuilderFactory pbf;
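
The image tag bumped here has to match the LABEL in the normalization Dockerfile shown earlier in this commit. A hypothetical sanity check, not part of the repository, that compares the two using the path and tag that appear in this diff:

```python
# Hypothetical helper: verify that the Dockerfile's io.airbyte.version label
# matches the NORMALIZATION_IMAGE_NAME constant bumped in this commit.
import re
from pathlib import Path

DOCKERFILE = Path("airbyte-integrations/bases/base-normalization/Dockerfile")
EXPECTED_IMAGE = "airbyte/normalization:0.1.21"

def dockerfile_image(text: str) -> str:
    match = re.search(r"io\.airbyte\.version=(\S+)", text)
    if not match:
        raise ValueError("no io.airbyte.version label found")
    return f"airbyte/normalization:{match.group(1)}"

assert dockerfile_image(DOCKERFILE.read_text()) == EXPECTED_IMAGE
```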
