From 601ea5eed7667ff39df6ce1c4060d276aeb3c930 Mon Sep 17 00:00:00 2001 From: Subodh Kant Chaturvedi Date: Fri, 7 May 2021 11:44:58 +0530 Subject: [PATCH] destination: add implementation for mysql as destination (#3242) * destination: add implementation for mysql as destination * Fix formatting errors. * address review comments + fix flaky test * fix formatting * address Davin's review comments * add missing todo * enable namespace test + only provide test user the minimum permissions required Co-authored-by: Davin Chia --- .../main/java/io/airbyte/db/Databases.java | 22 +++ .../java/io/airbyte/db/jdbc/JdbcDatabase.java | 11 ++ .../jdbc/AbstractJdbcDestination.java | 12 ++ .../jdbc/DefaultSqlOperations.java | 18 +- .../jdbc/JdbcBufferedConsumerFactory.java | 42 +++-- .../destination/jdbc/SqlOperations.java | 9 +- .../destination/jdbc/copy/CopyConsumer.java | 6 +- .../destination-mysql/.dockerignore | 3 + .../connectors/destination-mysql/Dockerfile | 12 ++ .../connectors/destination-mysql/build.gradle | 26 +++ .../destination/mysql/MySQLDestination.java | 115 ++++++++++++ .../mysql/MySQLNameTransformer.java | 36 ++++ .../destination/mysql/MySQLSqlOperations.java | 171 ++++++++++++++++++ .../src/main/resources/spec.json | 49 +++++ .../mysql/MySQLIntegrationTest.java | 164 +++++++++++++++++ .../protocol/models/CatalogHelpers.java | 8 - 16 files changed, 676 insertions(+), 28 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-mysql/.dockerignore create mode 100644 airbyte-integrations/connectors/destination-mysql/Dockerfile create mode 100644 airbyte-integrations/connectors/destination-mysql/build.gradle create mode 100644 airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java create mode 100644 airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java create mode 100644 airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java create mode 100644 airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json create mode 100644 airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLIntegrationTest.java diff --git a/airbyte-db/src/main/java/io/airbyte/db/Databases.java b/airbyte-db/src/main/java/io/airbyte/db/Databases.java index b80de96d65fc..c0318d7d2b7a 100644 --- a/airbyte-db/src/main/java/io/airbyte/db/Databases.java +++ b/airbyte-db/src/main/java/io/airbyte/db/Databases.java @@ -28,6 +28,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; import io.airbyte.db.jdbc.StreamingJdbcDatabase; +import java.util.Optional; import org.apache.commons.dbcp2.BasicDataSource; import org.jooq.SQLDialect; @@ -60,6 +61,17 @@ public static JdbcDatabase createJdbcDatabase(final String username, return new DefaultJdbcDatabase(connectionPool); } + public static JdbcDatabase createJdbcDatabase(final String username, + final String password, + final String jdbcConnectionString, + final String driverClassName, + final String connectionProperties) { + final BasicDataSource connectionPool = + createBasicDataSource(username, password, jdbcConnectionString, driverClassName, Optional.of(connectionProperties)); + + return new DefaultJdbcDatabase(connectionPool); + } + public static JdbcDatabase createStreamingJdbcDatabase(final String username, final String password, final String jdbcConnectionString, @@ -75,11 +87,21 @@ private static BasicDataSource createBasicDataSource(final String username, final String password, final String jdbcConnectionString, final String driverClassName) { + return createBasicDataSource(username, password, jdbcConnectionString, driverClassName, + Optional.empty()); + } + + private static BasicDataSource createBasicDataSource(final String username, + final String password, + final String jdbcConnectionString, + final String driverClassName, + final Optional connectionProperties) { final BasicDataSource connectionPool = new BasicDataSource(); connectionPool.setDriverClassName(driverClassName); connectionPool.setUsername(username); connectionPool.setPassword(password); connectionPool.setUrl(jdbcConnectionString); + connectionProperties.ifPresent(connectionPool::setConnectionProperties); return connectionPool; } diff --git a/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java b/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java index 999295d44ec3..3f40bd384c23 100644 --- a/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java +++ b/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java @@ -50,6 +50,17 @@ default void execute(String sql) throws SQLException { execute(connection -> connection.createStatement().execute(sql)); } + default void executeWithinTransaction(List queries) throws SQLException { + execute(connection -> { + connection.setAutoCommit(false); + for (String s : queries) { + connection.createStatement().execute(s); + } + connection.commit(); + connection.setAutoCommit(true); + }); + } + /** * Use a connection to create a {@link ResultSet} and map it into a list. The entire * {@link ResultSet} will be buffered in memory before the list is returned. The caller does not diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java index 959bdbffc803..e8b752d24e0e 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -47,6 +47,18 @@ public abstract class AbstractJdbcDestination extends BaseConnector implements D private final NamingConventionTransformer namingResolver; private final SqlOperations sqlOperations; + protected String getDriverClass() { + return driverClass; + } + + protected NamingConventionTransformer getNamingResolver() { + return namingResolver; + } + + protected SqlOperations getSqlOperations() { + return sqlOperations; + } + public AbstractJdbcDestination(final String driverClass, final NamingConventionTransformer namingResolver, final SqlOperations sqlOperations) { diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/DefaultSqlOperations.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/DefaultSqlOperations.java index 2b43d1b91c57..af3921caf09d 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/DefaultSqlOperations.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/DefaultSqlOperations.java @@ -107,7 +107,7 @@ public void insertRecords(JdbcDatabase database, List reco }); } - private void writeBatchToFile(File tmpFile, List records) throws Exception { + protected void writeBatchToFile(File tmpFile, List records) throws Exception { PrintWriter writer = null; try { writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8); @@ -124,7 +124,6 @@ private void writeBatchToFile(File tmpFile, List records) writer.close(); } } - } @Override @@ -138,8 +137,14 @@ public String copyTableQuery(String schemaName, String srcTableName, String dstT } @Override - public void executeTransaction(JdbcDatabase database, String queries) throws Exception { - database.execute("BEGIN;\n" + queries + "COMMIT;"); + public void executeTransaction(JdbcDatabase database, List queries) throws Exception { + final StringBuilder appendedQueries = new StringBuilder(); + appendedQueries.append("BEGIN;\n"); + for (String query : queries) { + appendedQueries.append(query); + } + appendedQueries.append("COMMIT;"); + database.execute(appendedQueries.toString()); } @Override @@ -151,6 +156,11 @@ private String dropTableIfExistsQuery(String schemaName, String tableName) { return String.format("DROP TABLE IF EXISTS %s.%s;\n", schemaName, tableName); } + @Override + public boolean isSchemaRequired() { + return true; + } + @Override public boolean isValidData(String data) { return true; diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java index 84b30f6645ab..0002f9f0f1c2 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcBufferedConsumerFactory.java @@ -41,6 +41,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -66,7 +67,7 @@ public static AirbyteMessageConsumer create(JdbcDatabase database, NamingConventionTransformer namingResolver, JsonNode config, ConfiguredAirbyteCatalog catalog) { - final List writeConfigs = createWriteConfigs(namingResolver, config, catalog); + final List writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired()); return new BufferedStreamConsumer( onStartFunction(database, sqlOperations, writeConfigs), @@ -77,25 +78,42 @@ public static AirbyteMessageConsumer create(JdbcDatabase database, sqlOperations::isValidData); } - private static List createWriteConfigs(NamingConventionTransformer namingResolver, JsonNode config, ConfiguredAirbyteCatalog catalog) { - Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema."); + private static List createWriteConfigs(NamingConventionTransformer namingResolver, + JsonNode config, + ConfiguredAirbyteCatalog catalog, + boolean schemaRequired) { + if (schemaRequired) { + Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema."); + } final Instant now = Instant.now(); - return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now)).collect(Collectors.toList()); + return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now, schemaRequired)).collect(Collectors.toList()); } - private static Function toWriteConfig(NamingConventionTransformer namingResolver, + private static Function toWriteConfig( + NamingConventionTransformer namingResolver, JsonNode config, - Instant now) { + Instant now, + boolean schemaRequired) { return stream -> { Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode"); final AirbyteStream abStream = stream.getStream(); - final String defaultSchemaName = namingResolver.getIdentifier(config.get("schema").asText()); + final String defaultSchemaName = schemaRequired ? namingResolver.getIdentifier(config.get("schema").asText()) + : namingResolver.getIdentifier(config.get("database").asText()); final String outputSchema = getOutputSchema(abStream, defaultSchemaName); final String streamName = abStream.getName(); final String tableName = Names.concatQuotedNames("_airbyte_raw_", namingResolver.getIdentifier(streamName)); - final String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName); + String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName); + + // TODO (#2948): Refactor into StandardNameTransformed , this is for MySQL destination, the table + // names can't have more than 64 characters. + if (tmpTableName.length() > 64) { + String prefix = tmpTableName.substring(0, 31); // 31 + String suffix = tmpTableName.substring(32, 63); // 31 + tmpTableName = prefix + "__" + suffix; + } + final DestinationSyncMode syncMode = stream.getDestinationSyncMode(); return new WriteConfig(streamName, abStream.getNamespace(), outputSchema, tmpTableName, tableName, syncMode); @@ -155,7 +173,7 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati return (hasFailed) -> { // copy data if (!hasFailed) { - final StringBuilder queries = new StringBuilder(); + List queryList = new ArrayList<>(); LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size()); for (WriteConfig writeConfig : writeConfigs) { final String schemaName = writeConfig.getOutputSchemaName(); @@ -166,16 +184,16 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati sqlOperations.createTableIfNotExists(database, schemaName, dstTableName); switch (writeConfig.getSyncMode()) { - case OVERWRITE -> queries.append(sqlOperations.truncateTableQuery(schemaName, dstTableName)); + case OVERWRITE -> queryList.add(sqlOperations.truncateTableQuery(schemaName, dstTableName)); case APPEND -> {} case APPEND_DEDUP -> {} default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode()); } - queries.append(sqlOperations.copyTableQuery(schemaName, srcTableName, dstTableName)); + queryList.add(sqlOperations.copyTableQuery(schemaName, srcTableName, dstTableName)); } LOGGER.info("Executing finalization of tables."); - sqlOperations.executeTransaction(database, queries.toString()); + sqlOperations.executeTransaction(database, queryList); LOGGER.info("Finalizing tables in destination completed."); } // clean up diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperations.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperations.java index cc801acc0c4e..8c4bb18e2bc4 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperations.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperations.java @@ -103,11 +103,18 @@ public interface SqlOperations { * @param queries queries to execute * @throws Exception exception */ - void executeTransaction(JdbcDatabase database, String queries) throws Exception; + void executeTransaction(JdbcDatabase database, List queries) throws Exception; /** * Check if the data record is valid and ok to be written to destination */ boolean isValidData(final String data); + /** + * Denotes whether the destination has the concept of schema or not + * + * @return true if the destination supports schema (ex: Postgres), false if it doesn't(MySQL) + */ + boolean isSchemaRequired(); + } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumer.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumer.java index 0ea0227497f5..91c1f3cad22e 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumer.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumer.java @@ -138,7 +138,7 @@ public void close(boolean hasFailed) throws Exception { public void closeAsOneTransaction(List streamCopiers, boolean hasFailed, JdbcDatabase db) throws Exception { Exception firstException = null; try { - StringBuilder mergeCopiersToFinalTableQuery = new StringBuilder(); + List queries = new ArrayList<>(); for (var copier : streamCopiers) { try { copier.closeStagingUploader(hasFailed); @@ -149,7 +149,7 @@ public void closeAsOneTransaction(List streamCopiers, boolean hasF copier.copyStagingFileToTemporaryTable(); var destTableName = copier.createDestinationTable(); var mergeQuery = copier.generateMergeStatement(destTableName); - mergeCopiersToFinalTableQuery.append(mergeQuery); + queries.add(mergeQuery); } } catch (Exception e) { final String message = String.format("Failed to finalize copy to temp table due to: %s", e); @@ -161,7 +161,7 @@ public void closeAsOneTransaction(List streamCopiers, boolean hasF } } if (!hasFailed) { - sqlOperations.executeTransaction(db, mergeCopiersToFinalTableQuery.toString()); + sqlOperations.executeTransaction(db, queries); } } finally { for (var copier : streamCopiers) { diff --git a/airbyte-integrations/connectors/destination-mysql/.dockerignore b/airbyte-integrations/connectors/destination-mysql/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/destination-mysql/Dockerfile b/airbyte-integrations/connectors/destination-mysql/Dockerfile new file mode 100644 index 000000000000..d9f5b4aa9eb2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/Dockerfile @@ -0,0 +1,12 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte + +ENV APPLICATION destination-mysql + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/destination-mysql diff --git a/airbyte-integrations/connectors/destination-mysql/build.gradle b/airbyte-integrations/connectors/destination-mysql/build.gradle new file mode 100644 index 000000000000..6d45510deab9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/build.gradle @@ -0,0 +1,26 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.destination.mysql.MySQLDestination' +} + +dependencies { + implementation project(':airbyte-db') + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:connectors:destination-jdbc') + + implementation 'mysql:mysql-connector-java:8.0.22' + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-mysql') + integrationTestJavaImplementation "org.testcontainers:mysql:1.15.1" + + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs) +} + diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java new file mode 100644 index 000000000000..3f57431f23e1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -0,0 +1,115 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.mysql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Databases; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MySQLDestination extends AbstractJdbcDestination implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDestination.class); + + public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + + @Override + public AirbyteConnectionStatus check(JsonNode config) { + try (final JdbcDatabase database = getDatabase(config)) { + MySQLSqlOperations mySQLSqlOperations = (MySQLSqlOperations) getSqlOperations(); + + String outputSchema = getNamingResolver().getIdentifier(config.get("database").asText()); + attemptSQLCreateAndDropTableOperations(outputSchema, database, getNamingResolver(), + mySQLSqlOperations); + + mySQLSqlOperations.verifyLocalFileEnabled(database); + + VersionCompatibility compatibility = mySQLSqlOperations.isCompatibleVersion(database); + if (!compatibility.isCompatible()) { + throw new RuntimeException(String + .format("Your MySQL version %s is not compatible with Airbyte", + compatibility.getVersion())); + } + + return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } catch (Exception e) { + LOGGER.error("Exception while checking connection: ", e); + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage("Could not connect with provided configuration. \n" + e.getMessage()); + } + } + + public MySQLDestination() { + super(DRIVER_CLASS, new MySQLNameTransformer(), new MySQLSqlOperations()); + } + + @Override + protected JdbcDatabase getDatabase(JsonNode config) { + final JsonNode jdbcConfig = toJdbcConfig(config); + + return Databases.createJdbcDatabase( + jdbcConfig.get("username").asText(), + jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null, + jdbcConfig.get("jdbc_url").asText(), + getDriverClass(), + "allowLoadLocalInfile=true"); + } + + @Override + public JsonNode toJdbcConfig(JsonNode config) { + final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s", + config.get("host").asText(), + config.get("port").asText(), + config.get("database").asText())); + + ImmutableMap.Builder configBuilder = ImmutableMap.builder() + .put("username", config.get("username").asText()) + .put("jdbc_url", jdbcUrl.toString()); + + if (config.has("password")) { + configBuilder.put("password", config.get("password").asText()); + } + + return Jsons.jsonNode(configBuilder.build()); + } + + public static void main(String[] args) throws Exception { + final Destination destination = new MySQLDestination(); + LOGGER.info("starting destination: {}", MySQLDestination.class); + new IntegrationRunner(destination).run(args); + LOGGER.info("completed destination: {}", MySQLDestination.class); + } + +} diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java new file mode 100644 index 000000000000..717828590450 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java @@ -0,0 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.mysql; + +import io.airbyte.integrations.destination.ExtendedNameTransformer; + +public class MySQLNameTransformer extends ExtendedNameTransformer { + + @Override + protected String applyDefaultCase(String input) { + return input.toLowerCase(); + } + +} diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java new file mode 100644 index 000000000000..680f3b7c81b5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java @@ -0,0 +1,171 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.mysql; + +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.jdbc.DefaultSqlOperations; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.stream.Collectors; + +public class MySQLSqlOperations extends DefaultSqlOperations { + + private boolean isLocalFileEnabled = false; + + @Override + public void executeTransaction(JdbcDatabase database, List queries) throws Exception { + database.executeWithinTransaction(queries); + } + + @Override + public void insertRecords(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName) + throws SQLException { + if (records.isEmpty()) { + return; + } + + verifyLocalFileEnabled(database); + try { + File tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile(); + + loadDataIntoTable(database, records, schemaName, tmpTableName, tmpFile); + + Files.delete(tmpFile.toPath()); + } catch (IOException e) { + throw new SQLException(e); + } + } + + private void loadDataIntoTable(JdbcDatabase database, + List records, + String schemaName, + String tmpTableName, + File tmpFile) + throws SQLException { + database.execute(connection -> { + try { + writeBatchToFile(tmpFile, records); + + String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'"; + + String query = String.format( + "LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\\r\\n'", + absoluteFile, schemaName, tmpTableName); + + try (Statement stmt = connection.createStatement()) { + stmt.execute(query); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + void verifyLocalFileEnabled(JdbcDatabase database) throws SQLException { + boolean localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database); + if (!localFileEnabled) { + tryEnableLocalFile(database); + } + isLocalFileEnabled = true; + } + + private void tryEnableLocalFile(JdbcDatabase database) throws SQLException { + database.execute(connection -> { + try (Statement statement = connection.createStatement()) { + statement.execute("set global local_infile=true"); + } catch (Exception e) { + throw new RuntimeException( + "The DB user provided to airbyte was unable to switch on the local_infile attribute on the MySQL server. As an admin user, you will need to run \"SET GLOBAL local_infile = true\" before syncing data with Airbyte.", + e); + } + }); + } + + private double getVersion(JdbcDatabase database) throws SQLException { + List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), + resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); + return Double.parseDouble(value.get(0).substring(0, 3)); + } + + VersionCompatibility isCompatibleVersion(JdbcDatabase database) throws SQLException { + double version = getVersion(database); + return new VersionCompatibility(version, version >= 5.7); + } + + @Override + public boolean isSchemaRequired() { + return false; + } + + private boolean checkIfLocalFileIsEnabled(JdbcDatabase database) throws SQLException { + List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), + resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); + + return value.get(0).equalsIgnoreCase("on"); + } + + @Override + public String createTableQuery(String schemaName, String tableName) { + // MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column, + // 256 is enough + return String.format( + "CREATE TABLE IF NOT EXISTS %s.%s ( \n" + + "%s VARCHAR(256) PRIMARY KEY,\n" + + "%s JSON,\n" + + "%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6)\n" + + ");\n", + schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + } + + public static class VersionCompatibility { + + private final double version; + private final boolean isCompatible; + + public VersionCompatibility(double version, boolean isCompatible) { + this.version = version; + this.isCompatible = isCompatible; + } + + public double getVersion() { + return version; + } + + public boolean isCompatible() { + return isCompatible; + } + + } + +} diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json new file mode 100644 index 000000000000..37aca0a581c4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json @@ -0,0 +1,49 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql", + "supportsIncremental": true, + "supported_destination_sync_modes": ["overwrite", "append"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "MySQL Destination Spec", + "type": "object", + "required": ["host", "port", "username", "database"], + "additionalProperties": false, + "properties": { + "host": { + "title": "Host", + "description": "Hostname of the database.", + "type": "string", + "order": 0 + }, + "port": { + "title": "Port", + "description": "Port of the database.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 3306, + "examples": ["3306"], + "order": 1 + }, + "database": { + "title": "DB Name", + "description": "Name of the database.", + "type": "string", + "order": 2 + }, + "username": { + "title": "User", + "description": "Username to use to access the database.", + "type": "string", + "order": 3 + }, + "password": { + "title": "Password", + "description": "Password associated with the username.", + "type": "string", + "airbyte_secret": true, + "order": 4 + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLIntegrationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLIntegrationTest.java new file mode 100644 index 000000000000..08278cffc80b --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLIntegrationTest.java @@ -0,0 +1,164 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.mysql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Databases; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.integrations.standardtest.destination.TestDestination; +import java.sql.SQLException; +import java.util.List; +import java.util.stream.Collectors; +import org.jooq.JSONFormat; +import org.jooq.JSONFormat.RecordFormat; +import org.jooq.SQLDialect; +import org.testcontainers.containers.MySQLContainer; + +public class MySQLIntegrationTest extends TestDestination { + + private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); + + private MySQLContainer db; + private ExtendedNameTransformer namingResolver = new MySQLNameTransformer(); + + @Override + protected String getImageName() { + return "airbyte/destination-mysql:dev"; + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected JsonNode getConfig() { + return Jsons.jsonNode(ImmutableMap.builder() + .put("host", db.getHost()) + .put("username", db.getUsername()) + .put("password", db.getPassword()) + .put("database", db.getDatabaseName()) + .put("port", db.getFirstMappedPort()) + .build()); + } + + @Override + protected JsonNode getFailCheckConfig() { + return Jsons.jsonNode(ImmutableMap.builder() + .put("host", db.getHost()) + .put("username", db.getUsername()) + .put("password", "wrong password") + .put("database", db.getDatabaseName()) + .put("port", db.getFirstMappedPort()) + .build()); + } + + @Override + protected String getDefaultSchema(JsonNode config) { + if (config.get("database") == null) { + return null; + } + return config.get("database").asText(); + } + + @Override + protected List retrieveRecords(TestDestinationEnv testEnv, + String streamName, + String namespace) + throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + .stream() + .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) + .collect(Collectors.toList()); + } + + private List retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException { + return Databases.createDatabase( + db.getUsername(), + db.getPassword(), + String.format("jdbc:mysql://%s:%s/%s", + db.getHost(), + db.getFirstMappedPort(), + db.getDatabaseName()), + "com.mysql.cj.jdbc.Driver", + SQLDialect.MYSQL).query( + ctx -> ctx + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, + JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .stream() + .map(r -> r.formatJSON(JSON_FORMAT)) + .map(Jsons::deserialize) + .collect(Collectors.toList())); + } + + @Override + protected void setup(TestDestinationEnv testEnv) { + db = new MySQLContainer<>("mysql:8.0"); + db.start(); + setLocalInFileToTrue(); + revokeAllPermissions(); + grantCorrectPermissions(); + } + + private void setLocalInFileToTrue() { + executeQuery("set global local_infile=true"); + } + + private void revokeAllPermissions() { + executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + db.getUsername() + "@'%';"); + } + + private void grantCorrectPermissions() { + executeQuery("GRANT CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';"); + } + + private void executeQuery(String query) { + try { + Databases.createDatabase( + "root", + "test", + String.format("jdbc:mysql://%s:%s/%s", + db.getHost(), + db.getFirstMappedPort(), + db.getDatabaseName()), + "com.mysql.cj.jdbc.Driver", + SQLDialect.MYSQL).query( + ctx -> ctx + .execute(query)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + db.stop(); + db.close(); + } + +} diff --git a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index 91705b98dda6..01f701822f3c 100644 --- a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -157,12 +157,4 @@ protected static Set getAllFieldNames(JsonNode node) { return allFieldNames; } - public static Set getStreamNames(ConfiguredAirbyteCatalog catalog) { - return catalog.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toSet()); - } - }