Skip to content

Commit

Permalink
Explicitely ignore cached catalog when the refresh source schema butt…
Browse files Browse the repository at this point in the history
…on is pressed (airbytehq#11342)

* Disable catalog cache when `getWithRefreshedCatalog` is used

* Add a test catalog generation function with some variability

the generateBasicApiCatalog function generate different objects with the
same attributes which can lead to misleading test results.

* Update tests

* Format
  • Loading branch information
malikdiarra authored Mar 23, 2022
1 parent d6ec448 commit 98ee6f5
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
final ConnectionRead connection = connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId());

if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) {
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody().sourceId(connection.getSourceId());
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq =
new SourceDiscoverSchemaRequestBody().sourceId(connection.getSourceId()).disableCache(true);
final SourceDiscoverSchemaRead discoverSchema = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq);

final AirbyteCatalog original = connection.getSyncCatalog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -195,10 +196,11 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE
.memoryRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryRequest())
.memoryLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryLimit()));

final AirbyteCatalog modifiedCatalog = ConnectionHelpers.generateBasicApiCatalog();
final AirbyteCatalog modifiedCatalog = ConnectionHelpers.generateMultipleStreamsApiCatalog(2);

final SourceDiscoverSchemaRequestBody sourceDiscoverSchema = new SourceDiscoverSchemaRequestBody();
sourceDiscoverSchema.setSourceId(connectionRead.getSourceId());
sourceDiscoverSchema.setDisableCache(true);
when(schedulerHandler.discoverSchemaForSourceFromSourceId(sourceDiscoverSchema)).thenReturn(
new SourceDiscoverSchemaRead()
.jobInfo(mock(SynchronousJobRead.class))
Expand Down Expand Up @@ -303,21 +305,37 @@ public void testWebBackendGetConnection() throws ConfigNotFoundException, IOExce
assertEquals(expected, WebBackendConnectionRead);
}

@Test
public void testWebBackendGetConnectionWithDiscovery() throws ConfigNotFoundException, IOException, JsonValidationException {
public WebBackendConnectionRead testWebBackendGetConnection(final boolean withCatalogRefresh)
throws JsonValidationException, ConfigNotFoundException, IOException {
final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody();
connectionIdRequestBody.setConnectionId(connectionRead.getConnectionId());

final WebBackendConnectionRequestBody webBackendConnectionIdRequestBody = new WebBackendConnectionRequestBody();
webBackendConnectionIdRequestBody.setConnectionId(connectionRead.getConnectionId());
webBackendConnectionIdRequestBody.setWithRefreshedCatalog(true);
if (withCatalogRefresh) {
webBackendConnectionIdRequestBody.setWithRefreshedCatalog(true);
}

when(connectionsHandler.getConnection(connectionRead.getConnectionId())).thenReturn(connectionRead);
when(operationsHandler.listOperationsForConnection(connectionIdRequestBody)).thenReturn(operationReadList);

final WebBackendConnectionRead WebBackendConnectionRead = wbHandler.webBackendGetConnection(webBackendConnectionIdRequestBody);
return wbHandler.webBackendGetConnection(webBackendConnectionIdRequestBody);

assertEquals(expectedWithNewSchema, WebBackendConnectionRead);
}

@Test
public void testWebBackendGetConnectionWithDiscovery() throws ConfigNotFoundException, IOException, JsonValidationException {
final WebBackendConnectionRead result = testWebBackendGetConnection(true);
verify(schedulerHandler).discoverSchemaForSourceFromSourceId(any());
assertEquals(expectedWithNewSchema, result);
}

@Test
public void testWebBackendGetConnectionNoRefreshCatalog()
throws JsonValidationException, ConfigNotFoundException, IOException {
final WebBackendConnectionRead result = testWebBackendGetConnection(false);
verify(schedulerHandler, never()).discoverSchemaForSourceFromSourceId(any());
assertEquals(expected, result);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -199,6 +200,16 @@ public static AirbyteCatalog generateBasicApiCatalog() {
.config(generateBasicApiStreamConfig())));
}

public static AirbyteCatalog generateMultipleStreamsApiCatalog(final int streamsCount) {
final List<AirbyteStreamAndConfiguration> streamAndConfigurations = new ArrayList<>();
for (int i = 0; i < streamsCount; i++) {
streamAndConfigurations.add(new AirbyteStreamAndConfiguration()
.stream(generateBasicApiStream())
.config(generateBasicApiStreamConfig()));
}
return new AirbyteCatalog().streams(streamAndConfigurations);
}

private static AirbyteStreamConfiguration generateBasicApiStreamConfig() {
return new AirbyteStreamConfiguration()
.syncMode(SyncMode.INCREMENTAL)
Expand Down

0 comments on commit 98ee6f5

Please sign in to comment.