Skip to content

Commit

Permalink
Add details to error logs. (#8697)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Oct 2, 2023
1 parent 047d30d commit e293681
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,13 @@ protected NamespaceInfo getNamespaceInfo(final WorkflowServiceStubs temporalServ
*/
public <T> T withBackgroundHeartbeat(final AtomicReference<Runnable> afterCancellationCallbackRef,
final Callable<T> callable,
final Supplier<ActivityExecutionContext> activityContext) {
final ActivityExecutionContext activityContext) {
final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

try {
// Schedule the cancellation handler.
scheduledExecutor.scheduleAtFixedRate(() -> {
final CancellationHandler cancellationHandler = new CancellationHandler.TemporalCancellationHandler(activityContext.get());
final CancellationHandler cancellationHandler = new CancellationHandler.TemporalCancellationHandler(activityContext);

cancellationHandler.checkAndHandleCancellation(() -> {
// After cancellation cleanup.
Expand Down Expand Up @@ -331,7 +331,9 @@ public <T> T withBackgroundHeartbeat(final AtomicReference<Runnable> afterCancel
} else {
// Heartbeat thread failed to stop, we may leak a thread if this happens.
// We should not fail the execution because of this.
log.info("Temporal heartbeating didn't stop within {} seconds, continuing the shutdown.", HEARTBEAT_SHUTDOWN_GRACE_PERIOD.toSeconds());
log.info("Temporal heartbeating didn't stop within {} seconds, continuing the shutdown. (WorkflowId={}, ActivityId={}, RunId={})",
HEARTBEAT_SHUTDOWN_GRACE_PERIOD.toSeconds(), activityContext.getInfo().getWorkflowId(),
activityContext.getInfo().getActivityId(), activityContext.getInfo().getRunId());
}
} catch (InterruptedException e) {
// We got interrupted while attempting to shutdown the executor. Not much more we can do.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,12 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.temporal.stubs.TestWorkflow.TestActivityImplTest;
import io.airbyte.commons.temporal.stubs.TestWorkflow.TestWorkflowImpl;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityCancellationType;
import io.temporal.activity.ActivityExecutionContext;
Expand Down Expand Up @@ -184,93 +179,6 @@ void testWorkerExceptionOnHeartbeatWrapper() {
assertEquals(0, timesReachedEnd.get());
}

@Test
void testHeartbeatWithContext() throws InterruptedException {
final TemporalUtils temporalUtils = new TemporalUtils(null, null, null, null, null, null, null);
final TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance();

final Worker worker = testEnv.newWorker(TASK_QUEUE);

worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class);
final WorkflowClient client = testEnv.getWorkflowClient();

final CountDownLatch latch = new CountDownLatch(2);
final Runnable cancellationCallback = mock(Runnable.class);

worker.registerActivitiesImplementations(new TestActivityImplTest(() -> {
final ActivityExecutionContext context = Activity.getExecutionContext();
temporalUtils.withBackgroundHeartbeat(
new AtomicReference<>(cancellationCallback),
// TODO (itaseski) figure out how to decrease heartbeat intervals using reflection
() -> {
latch.await();
return new Object();
},
() -> {
latch.countDown();
return context;
});
}));

testEnv.start();

final io.airbyte.commons.temporal.stubs.TestWorkflow testWorkflow = client.newWorkflowStub(
io.airbyte.commons.temporal.stubs.TestWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(TASK_QUEUE)
.build());

// use async execution to avoid blocking the test thread
WorkflowClient.start(testWorkflow::execute);

assertTrue(latch.await(25, TimeUnit.SECONDS));
// The activity is expected to succeed, we should never call the cancellation callback.
verify(cancellationCallback, never()).run();

}

@Test
void testHeartbeatWithContextAndCallbackRef() throws InterruptedException {
final TemporalUtils temporalUtils = new TemporalUtils(null, null, null, null, null, null, null);
final TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance();

final Worker worker = testEnv.newWorker(TASK_QUEUE);

worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class);
final WorkflowClient client = testEnv.getWorkflowClient();

final CountDownLatch latch = new CountDownLatch(2);

worker.registerActivitiesImplementations(new TestActivityImplTest(() -> {
final ActivityExecutionContext context = Activity.getExecutionContext();
temporalUtils.withBackgroundHeartbeat(
// TODO (itaseski) figure out how to decrease heartbeat intervals using reflection
new AtomicReference<>(() -> {}),
() -> {
latch.await();
return new Object();
},
() -> {
latch.countDown();
return context;
});
}));

testEnv.start();

final io.airbyte.commons.temporal.stubs.TestWorkflow testWorkflow = client.newWorkflowStub(
io.airbyte.commons.temporal.stubs.TestWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(TASK_QUEUE)
.build());

// use async execution to avoid blocking the test thread
WorkflowClient.start(testWorkflow::execute);

assertTrue(latch.await(25, TimeUnit.SECONDS));

}

@WorkflowInterface
public interface TestWorkflow {

Expand Down Expand Up @@ -411,7 +319,7 @@ public void activity(final String arg) {
return null;
}
},
() -> context);
context);
timesReachedEnd.incrementAndGet();
LOGGER.info(BEFORE, ACTIVITY1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public ConnectorJobOutput runWithJobOutput(final CheckConnectionInput args) {
() -> context);
return temporalAttemptExecution.get();
},
() -> context);
context);
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public ConnectorJobOutput run(final JobRunConfig jobRunConfig,
() -> context);
return temporalAttemptExecution.get();
},
() -> context);
context);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final Integration

return temporalAttemptExecution.get();
},
() -> context);
context);
}

private CheckedSupplier<Worker<JobGetSpecConfig, ConnectorJobOutput>, Exception> getWorkerFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public Void run(final JobRunConfig jobRunConfig,

return temporalAttemptExecution.get();
},
() -> context);
context);
}

private CheckedSupplier<Worker<OperatorDbtInput, Void>, Exception> getLegacyWorkerFactory(final IntegrationLauncherConfig destinationLauncherConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig,

return temporalAttemptExecution.get();
},
() -> context);
context);
}

private NormalizationInput hydrateNormalizationInput(NormalizationInput input) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public StandardSyncOutput replicateV2(final ReplicationActivityInput replication

return standardSyncOutput;
},
() -> context);
context);
}

// Marking task queue as nullable because we changed activity signature; thus runs started before
Expand Down Expand Up @@ -265,7 +265,7 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig,

return standardSyncOutput;
},
() -> context));
context));
}

private StandardSyncInput getHydratedSyncInput(final StandardSyncInput syncInput) {
Expand Down

0 comments on commit e293681

Please sign in to comment.