Skip to content

Commit

Permalink
fix build due to file renaming (airbytehq#1423)
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-tricot authored Dec 22, 2020
1 parent 1214e2b commit 54ec74f
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.integrations.base.StandardSQLNaming;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
Expand Down Expand Up @@ -65,10 +64,10 @@ public class LocalJsonDestination implements Destination {

static final String DESTINATION_PATH_FIELD = "destination_path";

private final SQLNamingResolvable namingResolver;
private final StandardNameTransformer namingResolver;

public LocalJsonDestination() {
namingResolver = new StandardSQLNaming();
namingResolver = new StandardNameTransformer();
}

@Override
Expand All @@ -88,7 +87,7 @@ public AirbyteConnectionStatus check(JsonNode config) {
}

@Override
public SQLNamingResolvable getNamingResolver() {
public StandardNameTransformer getNamingTransformer() {
return namingResolver;
}

Expand All @@ -107,8 +106,8 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
final Map<String, WriteConfig> writeConfigs = new HashMap<>();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getStream().getName();
final Path finalPath = destinationDir.resolve(getNamingResolver().getRawTableName(streamName) + ".jsonl");
final Path tmpPath = destinationDir.resolve(getNamingResolver().getTmpTableName(streamName) + ".jsonl");
final Path finalPath = destinationDir.resolve(getNamingTransformer().getRawTableName(streamName) + ".jsonl");
final Path tmpPath = destinationDir.resolve(getNamingTransformer().getTmpTableName(streamName) + ".jsonl");

final boolean isIncremental = stream.getSyncMode() == SyncMode.INCREMENTAL;
if (isIncremental && finalPath.toFile().exists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.StandardSQLNaming;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.TestDestination;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -72,8 +72,9 @@ public void testCheckConnectionInvalidCredentials() {}
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName) throws Exception {
final List<Path> allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());
final Optional<Path> streamOutput =
allOutputs.stream().filter(path -> path.getFileName().toString().contains(new StandardSQLNaming().getRawTableName(streamName))).findFirst();
final Optional<Path> streamOutput = allOutputs.stream()
.filter(path -> path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName)))
.findFirst();

assertTrue(streamOutput.isPresent(), "could not find output file for stream: " + streamName);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
# Standard Source Test Suite

Test methods start with `test`. Other methods are internal helpers in the java class implementing the test suite.
### testGetSpec

## testGetSpec

Verify that a spec operation issued to the connector returns a valid spec.

### testCheckConnection
## testCheckConnection

Verify that a check operation issued to the connector with the input config file returns a success response.

### testDiscover
## testDiscover

Verifies when a discover operation is run on the connector using the given config file, a valid catalog is output by the connector.

### testFullRefreshRead
## testFullRefreshRead

Configuring all streams in the input catalog to full refresh mode, verifies that a read operation produces some RECORD messages.

### testIdenticalFullRefreshes
## testIdenticalFullRefreshes

Configuring all streams in the input catalog to full refresh mode, performs two read operations on all streams which support full refresh syncs. It then verifies that the RECORD messages output from both were identical.

### testIncrementalSyncWithState
This test verifies that all streams in the input catalog which support incremental sync can do so correctly. It does this by running two read operations on the connector's Docker image: the first takes the configured catalog and config provided to this test as input. It then verifies that the sync produced a non-zero number of RECORD and STATE messages. The second read takes the same catalog and config used in the first test, plus the last STATE message output by the first read operation as the input state file. It verifies that no records are produced (since we read all records in the first sync). This test is performed only for streams which support incremental. Streams which do not support incremental sync are ignored. If no streams in the input catalog support incremental sync, this test is skipped.
## testIncrementalSyncWithState

This test verifies that all streams in the input catalog which support incremental sync can do so correctly. It does this by running two read operations on the connector's Docker image: the first takes the configured catalog and config provided to this test as input. It then verifies that the sync produced a non-zero number of RECORD and STATE messages. The second read takes the same catalog and config used in the first test, plus the last STATE message output by the first read operation as the input state file. It verifies that no records are produced \(since we read all records in the first sync\). This test is performed only for streams which support incremental. Streams which do not support incremental sync are ignored. If no streams in the input catalog support incremental sync, this test is skipped.

## testEmptyStateIncrementalIdenticalToFullRefresh

### testEmptyStateIncrementalIdenticalToFullRefresh
If the source does not support incremental sync, this test is skipped. Otherwise, this test runs two syncs: one where all streams provided in the input catalog sync in full refresh mode, and another where all the streams which in the input catalog which support incremental, sync in incremental mode (streams which don't support incremental sync in full refresh mode). Then, the test asserts that the two syncs produced the same RECORD messages. Any other type of message is disregarded.
If the source does not support incremental sync, this test is skipped. Otherwise, this test runs two syncs: one where all streams provided in the input catalog sync in full refresh mode, and another where all the streams which in the input catalog which support incremental, sync in incremental mode \(streams which don't support incremental sync in full refresh mode\). Then, the test asserts that the two syncs produced the same RECORD messages. Any other type of message is disregarded.

0 comments on commit 54ec74f

Please sign in to comment.