Skip to content

Commit

Permalink
feat(core): Support returning TaskResult from Task.onCancel
Browse files Browse the repository at this point in the history
(cherry picked from commit 236729f6f34c8b263fc2f22558d578f90b17cc9c)
  • Loading branch information
robzienert authored and ajordens committed May 14, 2021
1 parent fa27889 commit bb7f97b
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.netflix.spinnaker.kork.annotations.Beta;
import com.netflix.spinnaker.kork.plugins.api.internal.SpinnakerExtensionPoint;
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus;
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
Expand Down Expand Up @@ -58,10 +59,29 @@ public interface Task extends SpinnakerExtensionPoint {
* <p>This method should be used if you need to perform cleanup in response to the task being
* cancelled before it was able to complete.
*
* @deprecated Use onCancelWithResult instead
* @param stage The running state execution state
*/
@Deprecated
default void onCancel(@Nonnull StageExecution stage) {}

/**
* Behavior to be called on Task cancellation.
*
* <p>This method should be used if you need to perform cleanup in response to the task being
* cancelled before it was able to complete.
*
* <p>When returning a {@link TaskResult}, the {@link ExecutionStatus} will be ignored, as the
* resulting status will always be {@link ExecutionStatus#CANCELED}.
*
* @param stage The running state execution state
*/
@Nullable
default TaskResult onCancelWithResult(@Nonnull StageExecution stage) {
onCancel(stage);
return null;
}

/** A collection of known aliases. */
default Collection<String> aliases() {
if (getClass().isAnnotationPresent(Aliases.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ class RunTaskHandler(
taskExecutionInterceptors.forEach { t -> stage = t.beforeTaskExecution(task, stage) }

if (execution.isCanceled) {
task.onCancel(stage)
task.onCancelWithResult(stage)?.run {
stage.processTaskOutput(this)
}
queue.push(CompleteTask(message, CANCELED))
} else if (execution.status.isComplete) {
queue.push(CompleteTask(message, CANCELED))
Expand Down Expand Up @@ -150,30 +152,33 @@ class RunTaskHandler(
}

taskResult!!.let { result: TaskResult ->
// TODO: rather send this data with CompleteTask message
stage.processTaskOutput(result)
when (result.status) {
RUNNING -> {
stage.processTaskOutput(result)
queue.push(message, task.backoffPeriod(taskModel, stage))
trackResult(stage, thisInvocationStartTimeMs, taskModel, result.status)
}
SUCCEEDED, REDIRECT, SKIPPED, FAILED_CONTINUE, STOPPED -> {
stage.processTaskOutput(result)
queue.push(CompleteTask(message, result.status))
trackResult(stage, thisInvocationStartTimeMs, taskModel, result.status)
}
CANCELED -> {
task.onCancel(stage)
stage.processTaskOutput(result.mergeOutputs(task.onCancelWithResult(stage)))
val status = stage.failureStatus(default = result.status)
queue.push(CompleteTask(message, status, result.status))
trackResult(stage, thisInvocationStartTimeMs, taskModel, status)
}
TERMINAL -> {
stage.processTaskOutput(result)
val status = stage.failureStatus(default = result.status)
queue.push(CompleteTask(message, status, result.status))
trackResult(stage, thisInvocationStartTimeMs, taskModel, status)
}
else ->
else -> {
stage.processTaskOutput(result)
TODO("Unhandled task status ${result.status}")
}
}
}
}
Expand Down Expand Up @@ -427,4 +432,23 @@ class RunTaskHandler(
MDC.remove("taskStartTime")
}
}

private fun TaskResult.mergeOutputs(taskResult: TaskResult?): TaskResult {
if (taskResult == null) {
return this
}

return TaskResult.builder(this.status)
.outputs(
this.outputs.toMutableMap().also {
it.putAll(taskResult.outputs)
}
)
.context(
this.context.toMutableMap().also {
it.putAll(taskResult.context)
}
)
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ object RunTaskHandlerTest : SubjectSpek<RunTaskHandler>({
}

it("it tries to cancel the task") {
verify(task).onCancel(any())
verify(task).onCancelWithResult(any())
}
}

Expand Down

0 comments on commit bb7f97b

Please sign in to comment.