Skip to content

Commit

Permalink
🐛Source-mssql: added support for missed data types (airbytehq#9094)
Browse files Browse the repository at this point in the history
* [7728] Source-mssql: added support for missed data types
  • Loading branch information
etsybaev authored Jan 7, 2022
1 parent 04a113e commit 1877e2a
Show file tree
Hide file tree
Showing 13 changed files with 365 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.3.12",
"dockerImageTag": "0.3.13",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mssql",
"icon": "mssql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.3.12
dockerImageTag: 0.3.13
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3846,7 +3846,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.3.12"
- dockerImage: "airbyte/source-mssql:0.3.13"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql"
connectionSpecification:
Expand Down
4 changes: 4 additions & 0 deletions airbyte-db/lib/src/main/java/io/airbyte/db/DataTypeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public static String toISO8601String(final java.util.Date date) {
return DATE_FORMAT.format(date);
}

public static String toISOTimeString(final LocalDateTime dateTime) {
return DateTimeFormatter.ISO_TIME.format(dateTime.toLocalTime());
}

public static String toISO8601String(final LocalDate date) {
return toISO8601String(date.atStartOfDay());
}
Expand Down
16 changes: 16 additions & 0 deletions airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
import java.util.Spliterator;
Expand Down Expand Up @@ -166,6 +167,21 @@ public Stream<JsonNode> query(final String sql, final String... params) throws S
}, sourceOperations::rowToJson);
}

public ResultSetMetaData queryMetadata(final String sql, final String... params) throws SQLException {
try (final Stream<ResultSetMetaData> q = query(c -> {
PreparedStatement statement = c.prepareStatement(sql);
int i = 1;
for (String param : params) {
statement.setString(i, param);
++i;
}
return statement;
},
ResultSet::getMetaData)) {
return q.findFirst().orElse(null);
}
}

public abstract DatabaseMetaData getMetaData() throws SQLException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@

package io.airbyte.integrations.debezium.internals;

import io.airbyte.db.DataTypeUtils;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MSSQLConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

private final Logger LOGGER = LoggerFactory.getLogger(MSSQLConverter.class);;
private final Logger LOGGER = LoggerFactory.getLogger(MSSQLConverter.class);

private final String SMALLDATETIME_TYPE = "SMALLDATETIME";
private final Set<String> DATE_TYPES = Set.of("DATE", "DATETIME", "DATETIME2", "DATETIMEOFFSET", "SMALLDATETIME");
private final String TIME_TYPE = "TIME";
private final String SMALLMONEY_TYPE = "SMALLMONEY";

@Override
Expand All @@ -26,12 +30,13 @@ public void configure(Properties props) {}
@Override
public void converterFor(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
if (SMALLDATETIME_TYPE.equalsIgnoreCase(field.typeName())) {
if (DATE_TYPES.contains(field.typeName().toUpperCase())) {
registerDate(field, registration);
} else if (SMALLMONEY_TYPE.equalsIgnoreCase(field.typeName())) {
registerMoney(field, registration);
} else if (TIME_TYPE.equalsIgnoreCase(field.typeName())) {
registerTime(field, registration);
}

}

private void registerDate(final RelationalColumn field,
Expand All @@ -45,6 +50,23 @@ private void registerDate(final RelationalColumn field,
});
}

private void registerTime(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}

if (input instanceof Timestamp) {
return DataTypeUtils.toISOTimeString(((Timestamp) input).toLocalDateTime());
}

LOGGER.warn("Uncovered time class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}

private void registerMoney(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.float64(), input -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.12
LABEL io.airbyte.version=0.3.13
LABEL io.airbyte.name=airbyte/source-mssql
Loading

0 comments on commit 1877e2a

Please sign in to comment.