Skip to content

Commit

Permalink
fix: improve UI reactions to changes in connection sync status (#14355)
Browse files Browse the repository at this point in the history
  • Loading branch information
chandlerprall committed Oct 21, 2024
1 parent 2c40e55 commit d64b416
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 184 deletions.
9 changes: 8 additions & 1 deletion airbyte-api/server-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2808,7 +2808,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/BooleanRead"
$ref: "#/components/schemas/JobReadResponse"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
Expand Down Expand Up @@ -10172,6 +10172,8 @@ components:
lastSuccessfulSync:
type: integer
format: int64
activeJob:
$ref: "#/components/schemas/JobRead"
lastSyncJobId:
$ref: "#/components/schemas/JobId"
lastSyncAttemptNumber:
Expand Down Expand Up @@ -11850,6 +11852,11 @@ components:
$ref: "#/components/schemas/JobAggregatedStats"
streamAggregatedStats:
$ref: "#/components/schemas/StreamAggregatedStats"
JobReadResponse:
type: object
properties:
job:
$ref: "#/components/schemas/JobRead"
JobAggregatedStats:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,8 @@ public List<ConnectionStatusRead> getConnectionStatuses(
final List<Job> jobs = jobPersistence.listJobsLight(REPLICATION_TYPES,
connectionId.toString(),
maxJobLookback);
final boolean isRunning = jobs.stream().anyMatch(job -> JobStatus.NON_TERMINAL_STATUSES.contains(job.getStatus()));
final Optional<Job> activeJob = jobs.stream().findFirst().filter(job -> JobStatus.NON_TERMINAL_STATUSES.contains(job.getStatus()));
final boolean isRunning = activeJob.isPresent();

final Optional<Job> lastSucceededOrFailedJob =
jobs.stream().filter(job -> JobStatus.TERMINAL_STATUSES.contains(job.getStatus()) && job.getStatus() != JobStatus.CANCELLED).findFirst();
Expand All @@ -1127,6 +1128,7 @@ public List<ConnectionStatusRead> getConnectionStatuses(

final ConnectionStatusRead connectionStatus = new ConnectionStatusRead()
.connectionId(connectionId)
.activeJob(activeJob.map(JobConverter::getJobRead).orElse(null))
.lastSuccessfulSync(lastSuccessTimestamp.orElse(null))
.scheduleData(connectionRead.getScheduleData());
if (lastSucceededOrFailedJob.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.airbyte.commons.server.handlers
import io.airbyte.api.model.generated.ConnectionStream
import io.airbyte.api.model.generated.DestinationIdRequestBody
import io.airbyte.api.model.generated.RefreshMode
import io.airbyte.commons.server.converters.JobConverter
import io.airbyte.commons.server.handlers.helpers.ConnectionTimelineEventHelper
import io.airbyte.commons.server.scheduler.EventRunner
import io.airbyte.config.JobConfig.ConfigType
Expand Down Expand Up @@ -36,7 +37,7 @@ class StreamRefreshesHandler(
connectionId: UUID,
refreshMode: RefreshMode,
streams: List<ConnectionStream>,
): Boolean {
): io.airbyte.api.model.generated.JobRead? {
val destinationId = connectionService.getStandardSync(connectionId).destinationId
val destinationDefinitionVersion =
actorDefinitionVersionHandler.getActorDefinitionVersionForDestinationId(
Expand All @@ -45,7 +46,7 @@ class StreamRefreshesHandler(
val shouldRunRefresh = destinationDefinitionVersion.supportsRefreshes

if (!shouldRunRefresh) {
return false
return null
}

val streamDescriptors: List<StreamDescriptor> =
Expand Down Expand Up @@ -76,7 +77,7 @@ class StreamRefreshesHandler(
connectionTimelineEventService.writeEvent(connectionId, refreshStartedEvent, userId)
}

return true
return if (job == null) null else JobConverter.getJobRead(job)
}

fun getRefreshesForConnection(connectionId: UUID): List<StreamRefresh> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2769,6 +2769,7 @@ void testConnectionStatus() throws IOException, JsonValidationException, ConfigN
final ConnectionStatusRead connectionStatus = status.get(0);
assertEquals(connectionId, connectionStatus.getConnectionId());
assertEquals(802L, connectionStatus.getLastSuccessfulSync());
assertEquals(0L, connectionStatus.getActiveJob().getId());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,30 @@ import io.airbyte.api.model.generated.RefreshMode
import io.airbyte.commons.server.handlers.StreamRefreshesHandler.Companion.connectionStreamsToStreamDescriptors
import io.airbyte.commons.server.handlers.helpers.ConnectionTimelineEventHelper
import io.airbyte.commons.server.scheduler.EventRunner
import io.airbyte.commons.temporal.TemporalClient.ManualOperationResult
import io.airbyte.config.Job
import io.airbyte.config.JobConfig
import io.airbyte.config.RefreshConfig
import io.airbyte.config.StandardSync
import io.airbyte.config.StreamDescriptor
import io.airbyte.config.persistence.StreamRefreshesRepository
import io.airbyte.config.persistence.domain.StreamRefresh
import io.airbyte.data.repositories.entities.ConnectionTimelineEvent
import io.airbyte.data.services.ConnectionService
import io.airbyte.data.services.ConnectionTimelineEventService
import io.airbyte.data.services.shared.ConnectionEvent
import io.airbyte.persistence.job.JobPersistence
import io.mockk.clearAllMocks
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import io.mockk.verifyOrder
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.util.Optional
import java.util.UUID

internal class StreamRefreshesHandlerTest {
Expand Down Expand Up @@ -72,7 +77,7 @@ internal class StreamRefreshesHandlerTest {

val result = streamRefreshesHandler.createRefreshesForConnection(connectionId, RefreshMode.TRUNCATE, listOf())

assertFalse(result)
assertNull(result)

verify(exactly = 0) { streamRefreshesRepository.saveAll(any<List<StreamRefresh>>()) }
verify(exactly = 0) { eventRunner.startNewManualSync(connectionId) }
Expand All @@ -83,15 +88,23 @@ internal class StreamRefreshesHandlerTest {
mockSupportRefresh(true)

every { streamRefreshesRepository.saveAll(any<List<StreamRefresh>>()) } returns listOf()
every { eventRunner.startNewManualSync(connectionId) } returns null
every { eventRunner.startNewManualSync(connectionId) } returns
ManualOperationResult.builder().jobId(Optional.of(0L)).build()
every { connectionTimelineEventHelper.currentUserIdIfExist } returns UUID.randomUUID()
every { connectionTimelineEventService.writeEvent(any(), any(), any()) } returns
ConnectionTimelineEvent(
connectionId = UUID.randomUUID(),
eventType = ConnectionEvent.Type.REFRESH_STARTED.toString(),
)
every { jobPersistence.getJob(any()) } returns
Job(
0L, JobConfig.ConfigType.REFRESH, "scope_id",
null, listOf(), io.airbyte.config.JobStatus.SUCCEEDED, 0L, 0L, 0L,
JobConfig().withRefresh(RefreshConfig()),
listOf(), io.airbyte.config.JobStatus.SUCCEEDED, 0L, 0L, 0L,
)
val result = streamRefreshesHandler.createRefreshesForConnection(connectionId, RefreshMode.TRUNCATE, connectionStream)

assertTrue(result)
assertNotNull(result)

verifyOrder {
streamRefreshesRepository.saveAll(any<List<StreamRefresh>>())
Expand All @@ -104,16 +117,24 @@ internal class StreamRefreshesHandlerTest {
mockSupportRefresh(true)

every { streamRefreshesRepository.saveAll(any<List<StreamRefresh>>()) } returns listOf()
every { eventRunner.startNewManualSync(connectionId) } returns null
every { eventRunner.startNewManualSync(connectionId) } returns
ManualOperationResult.builder().jobId(Optional.of(0L)).build()
every { connectionTimelineEventHelper.currentUserIdIfExist } returns UUID.randomUUID()
every { connectionTimelineEventService.writeEvent(any(), any(), any()) } returns
ConnectionTimelineEvent(
connectionId = UUID.randomUUID(),
eventType = ConnectionEvent.Type.REFRESH_STARTED.toString(),
)
every { connectionService.getAllStreamsForConnection(connectionId) } returns streamDescriptors
every { jobPersistence.getJob(any()) } returns
Job(
0L, JobConfig.ConfigType.REFRESH, "scope_id",
null, listOf(), io.airbyte.config.JobStatus.SUCCEEDED, 0L, 0L, 0L,
JobConfig().withRefresh(RefreshConfig()),
listOf(), io.airbyte.config.JobStatus.SUCCEEDED, 0L, 0L, 0L,
)
val result = streamRefreshesHandler.createRefreshesForConnection(connectionId, RefreshMode.TRUNCATE, listOf())

assertTrue(result)
assertNotNull(result)

verifyOrder {
streamRefreshesRepository.saveAll(any<List<StreamRefresh>>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import io.airbyte.api.generated.ConnectionApi;
import io.airbyte.api.model.generated.ActorDefinitionRequestBody;
import io.airbyte.api.model.generated.BooleanRead;
import io.airbyte.api.model.generated.ConnectionAndJobIdRequestBody;
import io.airbyte.api.model.generated.ConnectionAutoPropagateResult;
import io.airbyte.api.model.generated.ConnectionAutoPropagateSchemaChange;
Expand Down Expand Up @@ -42,6 +41,8 @@
import io.airbyte.api.model.generated.GetTaskQueueNameRequest;
import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.JobReadResponse;
import io.airbyte.api.model.generated.JobSyncResultRead;
import io.airbyte.api.model.generated.ListConnectionsForWorkspacesRequestBody;
import io.airbyte.api.model.generated.PostprocessDiscoveredCatalogRequestBody;
Expand Down Expand Up @@ -121,7 +122,7 @@ public InternalOperationResult autoDisableConnection(@Body final ConnectionIdReq
@Post(uri = "/backfill_events")
@Secured({ADMIN})
@ExecuteOn(AirbyteTaskExecutors.IO)
public void backfillConnectionEvents(ConnectionEventsBackfillRequestBody connectionEventsBackfillRequestBody) {
public void backfillConnectionEvents(final ConnectionEventsBackfillRequestBody connectionEventsBackfillRequestBody) {
ApiHelper.execute(() -> {
connectionsHandler.backfillConnectionEvents(connectionEventsBackfillRequestBody);
return null;
Expand Down Expand Up @@ -175,11 +176,12 @@ public ConnectionReadList listConnectionsForWorkspacesPaginated(
@ExecuteOn(AirbyteTaskExecutors.SCHEDULER)
@RequiresIntent(Intent.RunAndCancelConnectionSyncAndRefresh)
@Override
public BooleanRead refreshConnectionStream(@Body final ConnectionStreamRefreshRequestBody connectionStreamRefreshRequestBody) {
return ApiHelper.execute(() -> new BooleanRead().value(streamRefreshesHandler.createRefreshesForConnection(
public JobReadResponse refreshConnectionStream(@Body final ConnectionStreamRefreshRequestBody connectionStreamRefreshRequestBody) {
final JobRead job = ApiHelper.execute(() -> streamRefreshesHandler.createRefreshesForConnection(
connectionStreamRefreshRequestBody.getConnectionId(),
connectionStreamRefreshRequestBody.getRefreshMode(),
connectionStreamRefreshRequestBody.getStreams() != null ? connectionStreamRefreshRequestBody.getStreams() : new ArrayList<>())));
connectionStreamRefreshRequestBody.getStreams() != null ? connectionStreamRefreshRequestBody.getStreams() : new ArrayList<>()));
return new JobReadResponse().job(job);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ export const ConnectionActionsBlock: React.FC = () => {
const onDelete = () => deleteConnection(connection);

const onReset = useCallback(async () => {
// empty streams array will clear _all_ streams
await clearStreams([]);
await clearStreams();
registerNotification({
id: "clearData.successfulStart",
text: formatMessage({
Expand Down
Loading

0 comments on commit d64b416

Please sign in to comment.