Skip to content

Commit

Permalink
separate the retrieval of incremental and full refresh streams in the…
Browse files Browse the repository at this point in the history
… jdbc abstract source (airbytehq#2582)

* separate full refresh and incremental iterator creation

* fix underspecified test

* fmt

* fmt

* respond to nit and fix test
  • Loading branch information
jrhizor authored Mar 24, 2021
1 parent b03b5b1 commit 0530f85
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.time.Instant;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -67,6 +68,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
Expand Down Expand Up @@ -163,24 +165,13 @@ public AutoCloseableIterator<AirbyteMessage> read(JsonNode config, ConfiguredAir
.stream()
.collect(Collectors.toMap(t -> String.format("%s.%s", t.getSchemaName(), t.getName()), Function.identity()));

final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();

for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final String streamName = airbyteStream.getStream().getName();
if (!tableNameToTable.containsKey(streamName)) {
LOGGER.info("Skipping stream {} because it is not in the source", streamName);
continue;
}

final TableInfoInternal table = tableNameToTable.get(streamName);
final AutoCloseableIterator<AirbyteMessage> tableReadIterator = createReadIterator(
database,
airbyteStream,
table,
stateManager,
emittedAt);
iteratorList.add(tableReadIterator);
}
final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators =
getFullRefreshIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = Stream.of(incrementalIterators, fullRefreshIterators)
.flatMap(Collection::stream)
.collect(Collectors.toList());

return AutoCloseableIterators.appendOnClose(AutoCloseableIterators.concatWithEagerClose(iteratorList), () -> {
LOGGER.info("Closing database connection pool.");
Expand All @@ -189,6 +180,64 @@ public AutoCloseableIterator<AirbyteMessage> read(JsonNode config, ConfiguredAir
});
}

public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(JdbcDatabase database,
ConfiguredAirbyteCatalog catalog,
Map<String, TableInfoInternal> tableNameToTable,
JdbcStateManager stateManager,
Instant emittedAt) {
return getSelectedIterators(
database,
catalog,
tableNameToTable,
stateManager,
emittedAt,
configuredStream -> configuredStream.getSyncMode().equals(SyncMode.INCREMENTAL));
}

public List<AutoCloseableIterator<AirbyteMessage>> getFullRefreshIterators(JdbcDatabase database,
ConfiguredAirbyteCatalog catalog,
Map<String, TableInfoInternal> tableNameToTable,
JdbcStateManager stateManager,
Instant emittedAt) {
return getSelectedIterators(
database,
catalog,
tableNameToTable,
stateManager,
emittedAt,
configuredStream -> configuredStream.getSyncMode().equals(SyncMode.FULL_REFRESH));
}

private List<AutoCloseableIterator<AirbyteMessage>> getSelectedIterators(JdbcDatabase database,
ConfiguredAirbyteCatalog catalog,
Map<String, TableInfoInternal> tableNameToTable,
JdbcStateManager stateManager,
Instant emittedAt,
Predicate<ConfiguredAirbyteStream> selector) {
final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();

for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
if (selector.test(airbyteStream)) {
final String streamName = airbyteStream.getStream().getName();
if (!tableNameToTable.containsKey(streamName)) {
LOGGER.info("Skipping stream {} because it is not in the source", streamName);
continue;
}

final TableInfoInternal table = tableNameToTable.get(streamName);
final AutoCloseableIterator<AirbyteMessage> tableReadIterator = createReadIterator(
database,
airbyteStream,
table,
stateManager,
emittedAt);
iteratorList.add(tableReadIterator);
}
}

return iteratorList;
}

private AutoCloseableIterator<AirbyteMessage> createReadIterator(JdbcDatabase database,
ConfiguredAirbyteStream airbyteStream,
TableInfoInternal table,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String strea
}

public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String streamName, List<Field> fields) {
return new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(streamName).withJsonSchema(fieldsToJsonSchema(fields)));
return new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(streamName).withJsonSchema(fieldsToJsonSchema(fields)))
.withSyncMode(SyncMode.FULL_REFRESH);
}

public static ConfiguredAirbyteStream createIncrementalConfiguredAirbyteStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.process.IntegrationLauncher;
Expand All @@ -73,6 +74,7 @@ class DefaultAirbyteSourceTest {
private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog()
.withStreams(Collections.singletonList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.FULL_REFRESH)
.withStream(new AirbyteStream()
.withName("hudi:latest")
.withJsonSchema(CatalogHelpers.fieldsToJsonSchema(new Field(FIELD_NAME, Field.JsonSchemaPrimitive.STRING))))));
Expand Down

0 comments on commit 0530f85

Please sign in to comment.