From 98ee6f576040453d8084d72675566fb283bd7ad0 Mon Sep 17 00:00:00 2001 From: Malik Diarra Date: Wed, 23 Mar 2022 13:03:54 -0700 Subject: [PATCH] Explicitely ignore cached catalog when the refresh source schema button is pressed (#11342) * Disable catalog cache when `getWithRefreshedCatalog` is used * Add a test catalog generation function with some variability the generateBasicApiCatalog function generate different objects with the same attributes which can lead to misleading test results. * Update tests * Format --- .../WebBackendConnectionsHandler.java | 3 +- .../WebBackendConnectionsHandlerTest.java | 30 +++++++++++++++---- .../server/helpers/ConnectionHelpers.java | 11 +++++++ 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 64f2c5c8b277..fcb52c9f46dc 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -182,7 +182,8 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti final ConnectionRead connection = connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId()); if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) { - final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody().sourceId(connection.getSourceId()); + final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = + new SourceDiscoverSchemaRequestBody().sourceId(connection.getSourceId()).disableCache(true); final SourceDiscoverSchemaRead discoverSchema = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq); final AirbyteCatalog original = connection.getSyncCatalog(); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 89ecc2771459..aed914ba80dd 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -8,6 +8,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -195,10 +196,11 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE .memoryRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryRequest()) .memoryLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getMemoryLimit())); - final AirbyteCatalog modifiedCatalog = ConnectionHelpers.generateBasicApiCatalog(); + final AirbyteCatalog modifiedCatalog = ConnectionHelpers.generateMultipleStreamsApiCatalog(2); final SourceDiscoverSchemaRequestBody sourceDiscoverSchema = new SourceDiscoverSchemaRequestBody(); sourceDiscoverSchema.setSourceId(connectionRead.getSourceId()); + sourceDiscoverSchema.setDisableCache(true); when(schedulerHandler.discoverSchemaForSourceFromSourceId(sourceDiscoverSchema)).thenReturn( new SourceDiscoverSchemaRead() .jobInfo(mock(SynchronousJobRead.class)) @@ -303,21 +305,37 @@ public void testWebBackendGetConnection() throws ConfigNotFoundException, IOExce assertEquals(expected, WebBackendConnectionRead); } - @Test - public void testWebBackendGetConnectionWithDiscovery() throws ConfigNotFoundException, IOException, JsonValidationException { + public WebBackendConnectionRead testWebBackendGetConnection(final boolean withCatalogRefresh) + throws JsonValidationException, ConfigNotFoundException, IOException { final ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody(); connectionIdRequestBody.setConnectionId(connectionRead.getConnectionId()); final WebBackendConnectionRequestBody webBackendConnectionIdRequestBody = new WebBackendConnectionRequestBody(); webBackendConnectionIdRequestBody.setConnectionId(connectionRead.getConnectionId()); - webBackendConnectionIdRequestBody.setWithRefreshedCatalog(true); + if (withCatalogRefresh) { + webBackendConnectionIdRequestBody.setWithRefreshedCatalog(true); + } when(connectionsHandler.getConnection(connectionRead.getConnectionId())).thenReturn(connectionRead); when(operationsHandler.listOperationsForConnection(connectionIdRequestBody)).thenReturn(operationReadList); - final WebBackendConnectionRead WebBackendConnectionRead = wbHandler.webBackendGetConnection(webBackendConnectionIdRequestBody); + return wbHandler.webBackendGetConnection(webBackendConnectionIdRequestBody); - assertEquals(expectedWithNewSchema, WebBackendConnectionRead); + } + + @Test + public void testWebBackendGetConnectionWithDiscovery() throws ConfigNotFoundException, IOException, JsonValidationException { + final WebBackendConnectionRead result = testWebBackendGetConnection(true); + verify(schedulerHandler).discoverSchemaForSourceFromSourceId(any()); + assertEquals(expectedWithNewSchema, result); + } + + @Test + public void testWebBackendGetConnectionNoRefreshCatalog() + throws JsonValidationException, ConfigNotFoundException, IOException { + final WebBackendConnectionRead result = testWebBackendGetConnection(false); + verify(schedulerHandler, never()).discoverSchemaForSourceFromSourceId(any()); + assertEquals(expected, result); } @Test diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index 1a6ad797d784..5a2ba7720de0 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -28,6 +28,7 @@ import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.server.handlers.helpers.CatalogConverter; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -199,6 +200,16 @@ public static AirbyteCatalog generateBasicApiCatalog() { .config(generateBasicApiStreamConfig()))); } + public static AirbyteCatalog generateMultipleStreamsApiCatalog(final int streamsCount) { + final List streamAndConfigurations = new ArrayList<>(); + for (int i = 0; i < streamsCount; i++) { + streamAndConfigurations.add(new AirbyteStreamAndConfiguration() + .stream(generateBasicApiStream()) + .config(generateBasicApiStreamConfig())); + } + return new AirbyteCatalog().streams(streamAndConfigurations); + } + private static AirbyteStreamConfiguration generateBasicApiStreamConfig() { return new AirbyteStreamConfiguration() .syncMode(SyncMode.INCREMENTAL)