Skip to content

Commit

Permalink
move api requests into the test harness (#11287)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfsiega-airbyte committed Feb 16, 2024
1 parent e2f7ba9 commit d73d44d
Show file tree
Hide file tree
Showing 11 changed files with 418 additions and 432 deletions.

Large diffs are not rendered by default.

43 changes: 28 additions & 15 deletions airbyte-test-utils/src/main/java/io/airbyte/test/utils/Asserts.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionState;
import io.airbyte.api.client.model.generated.JobInfoRead;
import io.airbyte.api.client.model.generated.Pagination;
import io.airbyte.api.client.model.generated.StreamDescriptor;
import io.airbyte.api.client.model.generated.StreamState;
import io.airbyte.api.client.model.generated.StreamStatusJobType;
import io.airbyte.api.client.model.generated.StreamStatusListRequestBody;
import io.airbyte.api.client.model.generated.StreamStatusRead;
import io.airbyte.api.client.model.generated.StreamStatusReadList;
import io.airbyte.api.client.model.generated.StreamStatusRunState;
Expand All @@ -37,6 +37,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -200,7 +201,7 @@ private static void dropAirbyteSystemColumns(final List<JsonNode> destinationRec
* @param expectedJobType The expected type of the stream status.
*/
public static void assertStreamStatuses(
final AirbyteApiClient apiClient,
final AcceptanceTestHarness testHarness,
final UUID workspaceId,
final UUID connectionId,
final JobInfoRead jobInfoRead,
Expand All @@ -209,14 +210,32 @@ public static void assertStreamStatuses(
final var jobId = jobInfoRead.getJob().getId();
final var attemptNumber = jobInfoRead.getAttempts().size() - 1;

final List<StreamStatusRead> streamStatuses = fetchStreamStatus(apiClient, workspaceId, connectionId, jobId, attemptNumber);
final List<StreamStatusRead> streamStatuses = fetchStreamStatus(testHarness, workspaceId, connectionId, jobId, attemptNumber);
assertNotNull(streamStatuses);
final List<StreamStatusRead> filteredStreamStatuses = streamStatuses.stream().filter(s -> expectedJobType.equals(s.getJobType())).toList();
assertFalse(filteredStreamStatuses.isEmpty());
filteredStreamStatuses.forEach(status -> assertEquals(expectedRunState, status.getRunState()));

}

/**
* Assert that all the expected stream descriptors are contained in the state for the given
* connection.
*
* @param connectionId to check the state
* @param expectedStreamDescriptors the streams we expect to find
* @throws Exception if the API requests fail
*/
public static void assertStreamStateContainsStream(final AcceptanceTestHarness testHarness,
final UUID connectionId,
final List<StreamDescriptor> expectedStreamDescriptors)
throws Exception {
final ConnectionState state = testHarness.getConnectionState(connectionId);
final List<StreamDescriptor> streamDescriptors = state.getStreamState().stream().map(StreamState::getStreamDescriptor).toList();

Assertions.assertTrue(streamDescriptors.containsAll(expectedStreamDescriptors) && expectedStreamDescriptors.containsAll(streamDescriptors));
}

/**
* Fetches the stream status associated with the provided information, retrying for a set amount of
* attempts if the status is currently not available.
Expand All @@ -227,29 +246,23 @@ public static void assertStreamStatuses(
* @param attempt The attempt number associated with the sync execution.
*/
private static List<StreamStatusRead> fetchStreamStatus(
final AirbyteApiClient apiClient,
final AcceptanceTestHarness testHarness,
final UUID workspaceId,
final UUID connectionId,
final Long jobId,
final Integer attempt) {
List<StreamStatusRead> results = List.of();
final StreamStatusListRequestBody streamStatusListRequestBody = new StreamStatusListRequestBody()
.connectionId(connectionId)
.jobId(jobId)
.attemptNumber(attempt)
.workspaceId(workspaceId)
.pagination(new Pagination().pageSize(100).rowOffset(0));

int count = 0;
while (count < 60 && results.isEmpty()) {
LOGGER.debug("Fetching stream status for {}...", streamStatusListRequestBody);
LOGGER.debug("Fetching stream status for {} {} {} {}...", connectionId, jobId, attempt, workspaceId);
try {
final StreamStatusReadList result = apiClient.getStreamStatusesApi().getStreamStatuses(streamStatusListRequestBody);
final StreamStatusReadList result = testHarness.getStreamStatuses(connectionId, jobId, attempt, workspaceId);
if (result != null) {
LOGGER.debug("Stream status result for connection {}: {}", connectionId, result);
results = result.getStreamStatuses();
}
} catch (final ApiException e) {
} catch (final Exception e) {
LOGGER.info("Unable to call stream status API.", e);
}
count++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@

import static io.airbyte.test.utils.AcceptanceTestHarness.COLUMN_ID;
import static io.airbyte.test.utils.AcceptanceTestHarness.PUBLIC_SCHEMA_NAME;
import static io.airbyte.test.utils.AcceptanceTestHarness.waitForConnectionState;
import static io.airbyte.test.utils.AcceptanceTestHarness.waitForSuccessfulJob;
import static io.airbyte.test.utils.AcceptanceTestHarness.waitWhileJobHasStatus;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -22,13 +19,11 @@
import io.airbyte.api.client.model.generated.AirbyteCatalog;
import io.airbyte.api.client.model.generated.AirbyteStream;
import io.airbyte.api.client.model.generated.AttemptInfoRead;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.ConnectionState;
import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationDefinitionRead;
import io.airbyte.api.client.model.generated.DestinationRead;
import io.airbyte.api.client.model.generated.DestinationSyncMode;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.api.client.model.generated.JobInfoRead;
import io.airbyte.api.client.model.generated.JobRead;
import io.airbyte.api.client.model.generated.JobStatus;
Expand Down Expand Up @@ -89,14 +84,13 @@ class AdvancedAcceptanceTests {
private static final String COLUMN1 = "column1";

private static AcceptanceTestHarness testHarness;
private static AirbyteApiClient apiClient;
private static UUID workspaceId;
private static final String AIRBYTE_SERVER_HOST = Optional.ofNullable(System.getenv("AIRBYTE_SERVER_HOST")).orElse("http://localhost:8001");

@BeforeAll
static void init() throws URISyntaxException, IOException, InterruptedException, ApiException {
final URI url = new URI(AIRBYTE_SERVER_HOST);
apiClient = new AirbyteApiClient(
final var apiClient = new AirbyteApiClient(
new ApiClient().setScheme(url.getScheme())
.setHost(url.getHost())
.setPort(url.getPort())
Expand All @@ -115,7 +109,7 @@ static void init() throws URISyntaxException, IOException, InterruptedException,
LOGGER.info("pg source definition: {}", sourceDef.getDockerImageTag());
LOGGER.info("pg destination definition: {}", destinationDef.getDockerImageTag());

testHarness = new AcceptanceTestHarness(apiClient, workspaceId);
testHarness = new AcceptanceTestHarness(apiClient, null, workspaceId);
}

@AfterAll
Expand Down Expand Up @@ -146,14 +140,14 @@ void testManualSync() throws Exception {
discoverResult.getCatalogId())
.build());
final var connectionId = conn.getConnectionId();
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
final JobInfoRead connectionSyncRead = testHarness.syncConnection(connectionId);
testHarness.waitForSuccessfulJob(connectionSyncRead.getJob());
Asserts.assertSourceAndDestinationDbRawRecordsInSync(testHarness.getSourceDatabase(), testHarness.getDestinationDatabase(), PUBLIC_SCHEMA_NAME,
conn.getNamespaceFormat(), false, false);

LOGGER.info("===== before stream");
final var finalJob = apiClient.getJobsApi().getJobInfoWithoutLogs(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()));
Asserts.assertStreamStatuses(apiClient, workspaceId, connectionId, finalJob, StreamStatusRunState.COMPLETE, StreamStatusJobType.SYNC);
final var finalJob = testHarness.getJobInfoRead(connectionSyncRead.getJob().getId());
Asserts.assertStreamStatuses(testHarness, workspaceId, connectionId, finalJob, StreamStatusRunState.COMPLETE, StreamStatusJobType.SYNC);

testHarness.cleanup();
}
Expand Down Expand Up @@ -203,21 +197,20 @@ void testCheckpointing() throws Exception {
catalog,
discoverResult.getCatalogId()).build())
.getConnectionId();
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
final JobInfoRead connectionSyncRead1 = testHarness.syncConnection(connectionId);

// wait to get out of pending.
final JobRead runningJob = waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead1.getJob(), Sets.newHashSet(JobStatus.PENDING));
final JobRead runningJob = testHarness.waitWhileJobHasStatus(connectionSyncRead1.getJob(), Sets.newHashSet(JobStatus.PENDING));
// wait to get out of running.
waitWhileJobHasStatus(apiClient.getJobsApi(), runningJob, Sets.newHashSet(JobStatus.RUNNING));
testHarness.waitWhileJobHasStatus(runningJob, Sets.newHashSet(JobStatus.RUNNING));
// now cancel it so that we freeze state!
try {
apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead1.getJob().getId()));
testHarness.cancelSync(connectionSyncRead1.getJob().getId());
} catch (final Exception e) {
LOGGER.error("error:", e);
}

final ConnectionState connectionState = waitForConnectionState(apiClient, connectionId);
final ConnectionState connectionState = testHarness.waitForConnectionState(connectionId);

/*
* the source is set to emit a state message every 5th message. because of the multithreaded nature,
Expand Down Expand Up @@ -268,15 +261,14 @@ void testBackpressure() throws Exception {
catalog,
discoverResult.getCatalogId()).build())
.getConnectionId();
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
final JobInfoRead connectionSyncRead1 = testHarness.syncConnection(connectionId);

// wait to get out of pending.
final JobRead runningJob = waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead1.getJob(), Sets.newHashSet(JobStatus.PENDING));
final JobRead runningJob = testHarness.waitWhileJobHasStatus(connectionSyncRead1.getJob(), Sets.newHashSet(JobStatus.PENDING));
// wait to get out of running.
waitWhileJobHasStatus(apiClient.getJobsApi(), runningJob, Sets.newHashSet(JobStatus.RUNNING));
testHarness.waitWhileJobHasStatus(runningJob, Sets.newHashSet(JobStatus.RUNNING));

final JobInfoRead jobInfo = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(runningJob.getId()));
final JobInfoRead jobInfo = testHarness.getJobInfoRead(runningJob.getId());
final AttemptInfoRead attemptInfoRead = jobInfo.getAttempts().get(jobInfo.getAttempts().size() - 1);
assertNotNull(attemptInfoRead);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import static io.airbyte.test.acceptance.BasicAcceptanceTestsResources.MAX_TRIES;
import static io.airbyte.test.acceptance.BasicAcceptanceTestsResources.TRUE;
import static io.airbyte.test.utils.AcceptanceTestHarness.PUBLIC_SCHEMA_NAME;
import static io.airbyte.test.utils.AcceptanceTestHarness.waitForSuccessfulJob;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -171,7 +170,7 @@ static void init() throws Exception {
: UUID.fromString(System.getenv().get(AIRBYTE_ACCEPTANCE_TEST_WORKSPACE_ID));
LOGGER.info("workspaceId = " + workspaceId);

testHarness = new AcceptanceTestHarness(configApiClient, workspaceId);
testHarness = new AcceptanceTestHarness(configApiClient, null, workspaceId);

testHarness.ensureCleanSlate();
final URI publicApiUrl = new URI(AIRBYTE_PUBLIC_API_SERVER_HOST);
Expand Down Expand Up @@ -249,7 +248,7 @@ void testJobsEndpointThroughPublicAPI() throws Exception {
jobCreate.withConnectionId(connectionId.toString());
jobCreate.withJobType(expectedJobType);
airbyteApiClient.jobs.createJob(jobCreate);
waitForSuccessfulJob(configApiClient.getJobsApi(), testHarness.getMostRecentSyncForConnection(connectionId));
testHarness.waitForSuccessfulJob(testHarness.getMostRecentSyncForConnection(connectionId));

// Test regular old get jobs
final ListJobsRequest listJobsRequest = new ListJobsRequest()
Expand Down Expand Up @@ -341,7 +340,7 @@ void testJobsEndpointThroughPublicAPI() throws Exception {
// publicApiJobsClient.createJob(jobCreate);

final JobInfoRead jobInfoRead = configApiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(configApiJobsClient, jobInfoRead.getJob());
testHarness.waitForSuccessfulJob(jobInfoRead.getJob());

// null response type means we return both sync and reset
final ListJobsRequest noFilterRequest = new ListJobsRequest().withLimit(1000);
Expand Down
Loading

0 comments on commit d73d44d

Please sign in to comment.