Skip to content

Commit

Permalink
Add logging to JdbcSource and JdbcDestination (airbytehq#1798)
Browse files Browse the repository at this point in the history
* gives better visibility into the progress of each phase of both the source and destination.

* 1 log per every 10K records in the source

* destination logs each step in destination including setting up tmp tables, creating final tables, cleaning up tmp tables
  • Loading branch information
cgardens authored Jan 23, 2021
1 parent d46948f commit 8184441
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,17 @@ protected void startTracked() throws Exception {
Preconditions.checkState(!hasStarted, "Consumer has already been started.");
hasStarted = true;

LOGGER.info("{} started.", BufferedStreamConsumer.class);

LOGGER.info("Buffer creation started for {} streams.", streamNames.size());
final Path queueRoot = Files.createTempDirectory("queues");
for (String streamName : streamNames) {
LOGGER.info("Buffer creation for stream {}.", streamName);
final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(streamName), streamName);
writeBuffers.put(streamName, writeBuffer);
}
LOGGER.info("Buffer creation completed.");

onStart.call();

writerPool.scheduleWithFixedDelay(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Strategy:
// 1. Create a temporary table for each stream
Expand All @@ -56,6 +58,8 @@
// the final table name.
public class JdbcBufferedConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBufferedConsumerFactory.class);

public static DestinationConsumer<AirbyteMessage> create(JdbcDatabase database,
SqlOperations sqlOperations,
NamingConventionTransformer namingResolver,
Expand Down Expand Up @@ -87,13 +91,17 @@ private static List<WriteConfig> createWriteConfigs(NamingConventionTransformer

/**
 * Builds the callback that runs once before any records are written. For each configured
 * stream it ensures the destination schema exists and creates the temporary staging table
 * that records are buffered into before being moved to the final table.
 *
 * @param database destination database to issue DDL against
 * @param sqlOperations destination-specific SQL implementations
 * @param writeConfigs one entry per stream, carrying schema and tmp-table names
 * @return callback invoked at the start of the sync
 */
private static OnStartFunction onStartFunction(JdbcDatabase database, SqlOperations sqlOperations, List<WriteConfig> writeConfigs) {
  return () -> {
    LOGGER.info("Preparing tmp tables in destination started for {} streams", writeConfigs.size());
    for (final WriteConfig writeConfig : writeConfigs) {
      final String schemaName = writeConfig.getOutputNamespaceName();
      final String tmpTableName = writeConfig.getTmpTableName();
      LOGGER.info("Preparing tmp table in destination started for stream {}. schema {}, tmp table name: {}", writeConfig.getStreamName(),
          schemaName, tmpTableName);

      // Schema must exist before the table DDL can succeed.
      sqlOperations.createSchemaIfNotExists(database, schemaName);
      sqlOperations.createTableIfNotExists(database, schemaName, tmpTableName);
    }
    // Fixed message: this phase prepares the tmp (staging) tables, matching the start-of-phase
    // log above; final tables are created later during finalization.
    LOGGER.info("Preparing tmp tables in destination completed.");
  };
}

Expand All @@ -120,10 +128,13 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati
// copy data
if (!hasFailed) {
final StringBuilder queries = new StringBuilder();
LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size());
for (WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputNamespaceName();
final String srcTableName = writeConfig.getTmpTableName();
final String dstTableName = writeConfig.getOutputTableName();
LOGGER.info("Finalizing stream {}. schema {}, tmp table {}, final table {}", writeConfig.getStreamName(), schemaName, srcTableName,
dstTableName);

sqlOperations.createTableIfNotExists(database, schemaName, dstTableName);
switch (writeConfig.getSyncMode()) {
Expand All @@ -133,14 +144,22 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati
}
queries.append(sqlOperations.copyTableQuery(schemaName, srcTableName, dstTableName));
}

LOGGER.info("Executing finalization of tables.");
sqlOperations.executeTransaction(database, queries.toString());
LOGGER.info("Finalizing tables in destination completed.");
}
// clean up
LOGGER.info("Cleaning tmp tables in destination started for {} streams", writeConfigs.size());
for (WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputNamespaceName();
final String tmpTableName = writeConfig.getTmpTableName();
LOGGER.info("Cleaning tmp table in destination started for stream {}. schema {}, tmp table name: {}", writeConfig.getStreamName(), schemaName,
tmpTableName);

sqlOperations.dropTableIfExists(database, schemaName, tmpTableName);
}
LOGGER.info("Cleaning tmp tables in destination completed.");
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -226,7 +227,13 @@ private Stream<AirbyteMessage> createReadStream(JdbcDatabase database,
throw new IllegalArgumentException(String.format("%s does not support sync mode: %s.", airbyteStream.getSyncMode(), AbstractJdbcSource.class));
}

return stream;
final AtomicLong recordCount = new AtomicLong();
return stream.peek(r -> {
final long count = recordCount.incrementAndGet();
if (count % 10000 == 0) {
LOGGER.info("Reading stream {}. Records read: {}", streamName, count);
}
});
}

private static Stream<AirbyteMessage> getIncrementalStream(JdbcDatabase database,
Expand Down

0 comments on commit 8184441

Please sign in to comment.