Skip to content

Commit

Permalink
Return errors from task executors when processing actions (temporalio…
Browse files Browse the repository at this point in the history
  • Loading branch information
pdoerner authored Aug 7, 2023
1 parent df262d6 commit bd909c9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 12 deletions.
34 changes: 25 additions & 9 deletions service/history/transfer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,27 @@ func (t *transferQueueActiveTaskExecutor) processActivityTask(
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
return nil
if mutableState == nil {
release(nil) // release(nil) so that the mutable state is not unloaded from cache
return consts.ErrWorkflowExecutionNotFound
}

ai, ok := mutableState.GetActivityInfo(task.ScheduledEventID)
if !ok {
return nil
release(nil) // release(nil) so that the mutable state is not unloaded from cache
return consts.ErrActivityTaskNotFound
}

err = CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), ai.Version, task.Version, task)
if err != nil {
return err
}

if !mutableState.IsWorkflowExecutionRunning() {
release(nil) // release(nil) so that the mutable state is not unloaded from cache
return consts.ErrWorkflowCompleted
}

timeout := timestamp.DurationValue(ai.ScheduleToStartTimeout)
directive := worker_versioning.MakeDirectiveForActivityTask(mutableState.GetWorkerVersionStamp(), ai.UseCompatibleVersion)

Expand Down Expand Up @@ -567,8 +575,9 @@ func (t *transferQueueActiveTaskExecutor) processSignalExecution(
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
return nil
if mutableState == nil {
release(nil) // release(nil) so that the mutable state is not unloaded from cache
return consts.ErrWorkflowExecutionNotFound
}

signalInfo, ok := mutableState.GetSignalInfo(task.InitiatedEventID)
Expand All @@ -583,6 +592,11 @@ func (t *transferQueueActiveTaskExecutor) processSignalExecution(
return err
}

if !mutableState.IsWorkflowExecutionRunning() {
release(nil) // release(nil) so that the mutable state is not unloaded from cache
return consts.ErrWorkflowCompleted
}

initiatedEvent, err := mutableState.GetSignalExternalInitiatedEvent(ctx, task.InitiatedEventID)
if err != nil {
return err
Expand Down Expand Up @@ -715,12 +729,14 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
return err
}
if mutableState == nil {
return nil
release(nil) // release(nil) so that the mutable state is not unloaded from cache
return consts.ErrWorkflowExecutionNotFound
}

childInfo, ok := mutableState.GetChildExecutionInfo(task.InitiatedEventID)
if !ok {
return nil
release(nil) // release(nil) so that the mutable state is not unloaded from cache
return consts.ErrChildExecutionNotFound
}
err = CheckTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), childInfo.Version, task.Version, task)
if err != nil {
Expand All @@ -742,12 +758,12 @@ func (t *transferQueueActiveTaskExecutor) processStartChildExecution(
// 1. Once workflow is closed, we can't update mutable state or record child started event.
// If the RPC call for scheduling first workflow task times out but the call actually succeeds on child workflow.
// Then the child workflow can run, complete and another unrelated workflow can reuse this workflowID.
// Now when the start child task retries, we can't rely on requestID to dedup the start child call. (We can use runID instead of requestID to dedup)
// Now when the start child task retries, we can't rely on requestID to dedupe the start child call. (We can use runID instead of requestID to dedupe)
// 2. No update to mutable state and child started event means we are not able to replicate the information
// to the standby cluster, so standby start child logic won't be able to verify the child has started.
// To resolve the issue above, we need to
// 1. Start child workflow and schedule the first workflow task in one transaction. Use runID to perform deduplication
// 2. Standby start child logic need to verify if child worflow actually started instead of relying on the information
// 2. Standby start child logic need to verify if child workflow actually started instead of relying on the information
// in parent mutable state.
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/transfer_queue_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessActivityTask_Duplicati
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)

_, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
s.ErrorIs(err, consts.ErrActivityTaskNotFound)
}

func (s *transferQueueActiveTaskExecutorSuite) TestProcessWorkflowTask_FirstWorkflowTask() {
Expand Down Expand Up @@ -2358,7 +2358,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Du
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)

_, _, err = s.transferQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask))
s.Nil(err)
s.Error(err, consts.ErrChildExecutionNotFound)
}

func (s *transferQueueActiveTaskExecutorSuite) TestProcessorStartChildExecution_ChildStarted_ParentClosed() {
Expand Down
4 changes: 3 additions & 1 deletion service/history/transfer_queue_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ const (
taskHistoryOpTimeout = 20 * time.Second
)

var errUnknownTransferTask = serviceerror.NewInternal("Unknown transfer task")
var (
errUnknownTransferTask = serviceerror.NewInternal("Unknown transfer task")
)

type (
transferQueueTaskExecutorBase struct {
Expand Down

0 comments on commit bd909c9

Please sign in to comment.