Skip to content

Commit

Permalink
destination: add implementation for mysql as destination (airbytehq#3242
Browse files Browse the repository at this point in the history
)

* destination: add implementation for mysql as destination

* Fix formatting errors.

* address review comments + fix flaky test

* fix formatting

* address Davin's review comments

* add missing todo

* enable namespace test + only provide test user the minimum permissions required

Co-authored-by: Davin Chia <[email protected]>
  • Loading branch information
subodh1810 and davinchia authored May 7, 2021
1 parent 0c3cca2 commit 601ea5e
Show file tree
Hide file tree
Showing 16 changed files with 676 additions and 28 deletions.
22 changes: 22 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
import java.util.Optional;
import org.apache.commons.dbcp2.BasicDataSource;
import org.jooq.SQLDialect;

Expand Down Expand Up @@ -60,6 +61,17 @@ public static JdbcDatabase createJdbcDatabase(final String username,
return new DefaultJdbcDatabase(connectionPool);
}

public static JdbcDatabase createJdbcDatabase(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName,
final String connectionProperties) {
final BasicDataSource connectionPool =
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, Optional.of(connectionProperties));

return new DefaultJdbcDatabase(connectionPool);
}

public static JdbcDatabase createStreamingJdbcDatabase(final String username,
final String password,
final String jdbcConnectionString,
Expand All @@ -75,11 +87,21 @@ private static BasicDataSource createBasicDataSource(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName) {
return createBasicDataSource(username, password, jdbcConnectionString, driverClassName,
Optional.empty());
}

private static BasicDataSource createBasicDataSource(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName,
final Optional<String> connectionProperties) {
final BasicDataSource connectionPool = new BasicDataSource();
connectionPool.setDriverClassName(driverClassName);
connectionPool.setUsername(username);
connectionPool.setPassword(password);
connectionPool.setUrl(jdbcConnectionString);
connectionProperties.ifPresent(connectionPool::setConnectionProperties);
return connectionPool;
}

Expand Down
11 changes: 11 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ default void execute(String sql) throws SQLException {
execute(connection -> connection.createStatement().execute(sql));
}

default void executeWithinTransaction(List<String> queries) throws SQLException {
execute(connection -> {
connection.setAutoCommit(false);
for (String s : queries) {
connection.createStatement().execute(s);
}
connection.commit();
connection.setAutoCommit(true);
});
}

/**
* Use a connection to create a {@link ResultSet} and map it into a list. The entire
* {@link ResultSet} will be buffered in memory before the list is returned. The caller does not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ public abstract class AbstractJdbcDestination extends BaseConnector implements D
private final NamingConventionTransformer namingResolver;
private final SqlOperations sqlOperations;

protected String getDriverClass() {
return driverClass;
}

protected NamingConventionTransformer getNamingResolver() {
return namingResolver;
}

protected SqlOperations getSqlOperations() {
return sqlOperations;
}

public AbstractJdbcDestination(final String driverClass,
final NamingConventionTransformer namingResolver,
final SqlOperations sqlOperations) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void insertRecords(JdbcDatabase database, List<AirbyteRecordMessage> reco
});
}

private void writeBatchToFile(File tmpFile, List<AirbyteRecordMessage> records) throws Exception {
protected void writeBatchToFile(File tmpFile, List<AirbyteRecordMessage> records) throws Exception {
PrintWriter writer = null;
try {
writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8);
Expand All @@ -124,7 +124,6 @@ private void writeBatchToFile(File tmpFile, List<AirbyteRecordMessage> records)
writer.close();
}
}

}

@Override
Expand All @@ -138,8 +137,14 @@ public String copyTableQuery(String schemaName, String srcTableName, String dstT
}

@Override
public void executeTransaction(JdbcDatabase database, String queries) throws Exception {
database.execute("BEGIN;\n" + queries + "COMMIT;");
public void executeTransaction(JdbcDatabase database, List<String> queries) throws Exception {
final StringBuilder appendedQueries = new StringBuilder();
appendedQueries.append("BEGIN;\n");
for (String query : queries) {
appendedQueries.append(query);
}
appendedQueries.append("COMMIT;");
database.execute(appendedQueries.toString());
}

@Override
Expand All @@ -151,6 +156,11 @@ private String dropTableIfExistsQuery(String schemaName, String tableName) {
return String.format("DROP TABLE IF EXISTS %s.%s;\n", schemaName, tableName);
}

@Override
public boolean isSchemaRequired() {
return true;
}

@Override
public boolean isValidData(String data) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand All @@ -66,7 +67,7 @@ public static AirbyteMessageConsumer create(JdbcDatabase database,
NamingConventionTransformer namingResolver,
JsonNode config,
ConfiguredAirbyteCatalog catalog) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog);
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired());

return new BufferedStreamConsumer(
onStartFunction(database, sqlOperations, writeConfigs),
Expand All @@ -77,25 +78,42 @@ public static AirbyteMessageConsumer create(JdbcDatabase database,
sqlOperations::isValidData);
}

private static List<WriteConfig> createWriteConfigs(NamingConventionTransformer namingResolver, JsonNode config, ConfiguredAirbyteCatalog catalog) {
Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema.");
private static List<WriteConfig> createWriteConfigs(NamingConventionTransformer namingResolver,
JsonNode config,
ConfiguredAirbyteCatalog catalog,
boolean schemaRequired) {
if (schemaRequired) {
Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema.");
}
final Instant now = Instant.now();
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now)).collect(Collectors.toList());
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now, schemaRequired)).collect(Collectors.toList());
}

private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(NamingConventionTransformer namingResolver,
private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
NamingConventionTransformer namingResolver,
JsonNode config,
Instant now) {
Instant now,
boolean schemaRequired) {
return stream -> {
Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode");
final AirbyteStream abStream = stream.getStream();

final String defaultSchemaName = namingResolver.getIdentifier(config.get("schema").asText());
final String defaultSchemaName = schemaRequired ? namingResolver.getIdentifier(config.get("schema").asText())
: namingResolver.getIdentifier(config.get("database").asText());
final String outputSchema = getOutputSchema(abStream, defaultSchemaName);

final String streamName = abStream.getName();
final String tableName = Names.concatQuotedNames("_airbyte_raw_", namingResolver.getIdentifier(streamName));
final String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);
String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);

// TODO (#2948): Refactor into StandardNameTransformed , this is for MySQL destination, the table
// names can't have more than 64 characters.
if (tmpTableName.length() > 64) {
String prefix = tmpTableName.substring(0, 31); // 31
String suffix = tmpTableName.substring(32, 63); // 31
tmpTableName = prefix + "__" + suffix;
}

final DestinationSyncMode syncMode = stream.getDestinationSyncMode();

return new WriteConfig(streamName, abStream.getNamespace(), outputSchema, tmpTableName, tableName, syncMode);
Expand Down Expand Up @@ -155,7 +173,7 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati
return (hasFailed) -> {
// copy data
if (!hasFailed) {
final StringBuilder queries = new StringBuilder();
List<String> queryList = new ArrayList<>();
LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size());
for (WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
Expand All @@ -166,16 +184,16 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati

sqlOperations.createTableIfNotExists(database, schemaName, dstTableName);
switch (writeConfig.getSyncMode()) {
case OVERWRITE -> queries.append(sqlOperations.truncateTableQuery(schemaName, dstTableName));
case OVERWRITE -> queryList.add(sqlOperations.truncateTableQuery(schemaName, dstTableName));
case APPEND -> {}
case APPEND_DEDUP -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode());
}
queries.append(sqlOperations.copyTableQuery(schemaName, srcTableName, dstTableName));
queryList.add(sqlOperations.copyTableQuery(schemaName, srcTableName, dstTableName));
}

LOGGER.info("Executing finalization of tables.");
sqlOperations.executeTransaction(database, queries.toString());
sqlOperations.executeTransaction(database, queryList);
LOGGER.info("Finalizing tables in destination completed.");
}
// clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,18 @@ public interface SqlOperations {
* @param queries queries to execute
* @throws Exception exception
*/
void executeTransaction(JdbcDatabase database, String queries) throws Exception;
void executeTransaction(JdbcDatabase database, List<String> queries) throws Exception;

/**
* Check if the data record is valid and ok to be written to destination
*/
boolean isValidData(final String data);

/**
* Denotes whether the destination has the concept of schema or not
*
* @return true if the destination supports schema (ex: Postgres), false if it doesn't(MySQL)
*/
boolean isSchemaRequired();

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void close(boolean hasFailed) throws Exception {
public void closeAsOneTransaction(List<StreamCopier> streamCopiers, boolean hasFailed, JdbcDatabase db) throws Exception {
Exception firstException = null;
try {
StringBuilder mergeCopiersToFinalTableQuery = new StringBuilder();
List<String> queries = new ArrayList<>();
for (var copier : streamCopiers) {
try {
copier.closeStagingUploader(hasFailed);
Expand All @@ -149,7 +149,7 @@ public void closeAsOneTransaction(List<StreamCopier> streamCopiers, boolean hasF
copier.copyStagingFileToTemporaryTable();
var destTableName = copier.createDestinationTable();
var mergeQuery = copier.generateMergeStatement(destTableName);
mergeCopiersToFinalTableQuery.append(mergeQuery);
queries.add(mergeQuery);
}
} catch (Exception e) {
final String message = String.format("Failed to finalize copy to temp table due to: %s", e);
Expand All @@ -161,7 +161,7 @@ public void closeAsOneTransaction(List<StreamCopier> streamCopiers, boolean hasF
}
}
if (!hasFailed) {
sqlOperations.executeTransaction(db, mergeCopiersToFinalTableQuery.toString());
sqlOperations.executeTransaction(db, queries);
}
} finally {
for (var copier : streamCopiers) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
12 changes: 12 additions & 0 deletions airbyte-integrations/connectors/destination-mysql/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION destination-mysql

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-mysql
26 changes: 26 additions & 0 deletions airbyte-integrations/connectors/destination-mysql/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.mysql.MySQLDestination'
}

dependencies {
implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:destination-jdbc')

implementation 'mysql:mysql-connector-java:8.0.22'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-mysql')
integrationTestJavaImplementation "org.testcontainers:mysql:1.15.1"

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
}

Loading

0 comments on commit 601ea5e

Please sign in to comment.