Skip to content

Commit

Permalink
feat(webhooks): add support for cancellation to webhooks (spinnaker#3069
Browse files Browse the repository at this point in the history
)

* feat(webhooks): add support for cancellation to webhooks

for webhooks that have `waitForCompletion=true` (i.e. monitored webhook) it is
desireable to have an indication of cancellation (either user cancels execution
or the execution is terminated due to some failure)
  • Loading branch information
marchello2000 authored Jul 30, 2019
1 parent 20d5aa3 commit ac511e2
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,19 @@ import org.springframework.stereotype.Component
@Component
class PreconfiguredWebhookStage extends WebhookStage {

@Autowired
private WebhookService webhookService
boolean fiatEnabled
FiatService fiatService

@Value('${services.fiat.enabled:false}')
boolean fiatEnabled;
@Autowired
PreconfiguredWebhookStage(
WebhookService webhookService,
@Value('${services.fiat.enabled:false}') boolean fiatEnabled,
FiatService fiatService) {
super(webhookService)

@Autowired(required = false)
FiatService fiatService
this.fiatEnabled = fiatEnabled
this.fiatService = fiatService
}

def fields = PreconfiguredWebhook.declaredFields.findAll {
!it.synthetic && !['props', 'enabled', 'label', 'description', 'type', 'parameters'].contains(it.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,36 @@

package com.netflix.spinnaker.orca.webhook.pipeline

import com.fasterxml.jackson.annotation.JsonFormat
import com.netflix.spinnaker.orca.CancellableStage
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder
import com.netflix.spinnaker.orca.pipeline.TaskNode
import com.netflix.spinnaker.orca.pipeline.graph.StageGraphBuilder
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask
import com.netflix.spinnaker.orca.webhook.config.WebhookProperties
import com.netflix.spinnaker.orca.webhook.service.WebhookService
import com.netflix.spinnaker.orca.webhook.tasks.CreateWebhookTask
import com.netflix.spinnaker.orca.webhook.tasks.MonitorWebhookTask
import com.netflix.spinnaker.orca.pipeline.tasks.artifacts.BindProducedArtifactsTask
import groovy.util.logging.Slf4j
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpMethod
import org.springframework.stereotype.Component

import javax.annotation.Nonnull

@Slf4j
@Component
class WebhookStage implements StageDefinitionBuilder, CancellableStage {
class WebhookStage implements StageDefinitionBuilder {

@Autowired
WebhookService webhookService

@Autowired
WebhookStage(WebhookService webhookService) {
this.webhookService = webhookService
}

@Override
void taskGraph(Stage stage, TaskNode.Builder builder) {
Expand All @@ -51,13 +67,57 @@ class WebhookStage implements StageDefinitionBuilder, CancellableStage {
}

@Override
CancellableStage.Result cancel(Stage stage) {
log.info("Cancelling stage (stageId: ${stage.id}, executionId: ${stage.execution.id}, context: ${stage.context as Map})")
return new CancellableStage.Result(stage, [:])
void onFailureStages(@Nonnull Stage stage, @Nonnull StageGraphBuilder graph) {
new MonitorWebhookTask(webhookService).onCancel(stage)
}

static class StageData {
boolean waitForCompletion
int waitBeforeMonitor
// Inputs for webhook
public String url
public Object payload
public Object customHeaders
public List<Integer> failFastStatusCodes
public Boolean waitForCompletion
public WebhookProperties.StatusUrlResolution statusUrlResolution
public String statusUrlJsonPath

@JsonFormat(with = [JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES])
public HttpMethod method = HttpMethod.POST

// Inputs for monitor
public String statusEndpoint
public String statusJsonPath
public String progressJsonPath
public String cancelEndpoint
@JsonFormat(with = [JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES])
public HttpMethod cancelMethod = HttpMethod.POST
public Object cancelPayload
public String successStatuses
public String canceledStatuses
public String terminalStatuses
public List<Integer> retryStatusCodes

public int waitBeforeMonitor

// Outputs
WebhookResponseStageData webhook
}

static class WebhookResponseStageData {
String statusEndpoint
Integer statusCodeValue
String statusCode
Map body
WebhookMonitorResponseStageData monitor
String error
}

static class WebhookMonitorResponseStageData {
Integer statusCodeValue
String statusCode
Map body
String error
String progressMessage
Number percentComplete
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.netflix.spinnaker.orca.RetryableTask
import com.netflix.spinnaker.orca.TaskResult
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.webhook.config.WebhookProperties
import com.netflix.spinnaker.orca.webhook.pipeline.WebhookStage
import com.netflix.spinnaker.orca.webhook.service.WebhookService
import groovy.util.logging.Slf4j
import org.apache.http.HttpHeaders
Expand All @@ -49,11 +50,7 @@ class CreateWebhookTask implements RetryableTask {
@Override
TaskResult execute(Stage stage) {
Map<String, ?> outputs = [webhook: [:]]
// TODO: The below parameter is deprecated and should be removed after some time
Map<String, ?> outputsDeprecated = [deprecationWarning: "All webhook information will be moved beneath the key 'webhook', " +
"and the keys 'statusCode', 'buildInfo', 'statusEndpoint' and 'error' will be removed. Please migrate today."]

StageData stageData = stage.mapTo(StageData)
WebhookStage.StageData stageData = stage.mapTo(WebhookStage.StageData)

def response
try {
Expand Down Expand Up @@ -118,10 +115,8 @@ class CreateWebhookTask implements RetryableTask {
def statusCode = response.statusCode

outputs.webhook << [statusCode: statusCode, statusCodeValue: statusCode.value()]
outputsDeprecated << [statusCode: statusCode]
if (response.body) {
outputs.webhook << [body: response.body]
outputsDeprecated << [buildInfo: response.body]
}

if (statusCode.is2xxSuccessful() || statusCode.is3xxRedirection()) {
Expand Down Expand Up @@ -152,34 +147,21 @@ class CreateWebhookTask implements RetryableTask {
}
stage.context.statusEndpoint = statusUrl
outputs.webhook << [statusEndpoint: statusUrl]
return TaskResult.builder(ExecutionStatus.SUCCEEDED).context(outputsDeprecated + outputs).build()
return TaskResult.builder(ExecutionStatus.SUCCEEDED).context(outputs).build()
}
if (stage.context.containsKey("expectedArtifacts") && !((List) stage.context.get("expectedArtifacts")).isEmpty()) {
try {
def artifacts = JsonPath.parse(response.body).read("artifacts")
outputs << [artifacts: artifacts]
} catch (Exception e) {
outputs.webhook << [error: "Expected artifacts in webhook response none were found"]
outputs.webhook << [error: "Expected artifacts in webhook response couldn't be parsed " + e.toString()]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
}
}
return TaskResult.builder(ExecutionStatus.SUCCEEDED).context(outputsDeprecated + outputs).build()
return TaskResult.builder(ExecutionStatus.SUCCEEDED).context(outputs).build()
} else {
outputs.webhook << [error: "The webhook request failed"]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputsDeprecated + outputs).build()
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
}
}

private static class StageData {
public String url
public Object payload
public Object customHeaders
public List<Integer> failFastStatusCodes
public Boolean waitForCompletion
public WebhookProperties.StatusUrlResolution statusUrlResolution
public String statusUrlJsonPath

@JsonFormat(with = [JsonFormat.Feature.ACCEPT_CASE_INSENSITIVE_PROPERTIES])
public HttpMethod method = HttpMethod.POST
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.OverridableTimeoutRetryableTask
import com.netflix.spinnaker.orca.TaskResult
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.webhook.pipeline.WebhookStage
import com.netflix.spinnaker.orca.webhook.service.WebhookService
import groovy.util.logging.Slf4j
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import org.springframework.web.client.HttpStatusCodeException

import javax.annotation.Nonnull
import java.time.Duration
import java.util.concurrent.TimeUnit

Expand All @@ -40,6 +42,7 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {
long backoffPeriod = TimeUnit.SECONDS.toMillis(1)
long timeout = TimeUnit.HOURS.toMillis(1)
private static final String JSON_PATH_NOT_FOUND_ERR_FMT = "Unable to parse %s: JSON property '%s' not found in response body"
WebhookService webhookService

@Override
long getDynamicBackoffPeriod(Stage stage, Duration taskDuration) {
Expand All @@ -52,17 +55,24 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {
}

@Autowired
WebhookService webhookService
MonitorWebhookTask(WebhookService webhookService) {
this.webhookService = webhookService
}

@Override
TaskResult execute(Stage stage) {
StageData stageData = stage.mapTo(StageData)
WebhookStage.StageData stageData = stage.mapTo(WebhookStage.StageData)

if (Strings.isNullOrEmpty(stageData.statusEndpoint) || Strings.isNullOrEmpty(stageData.statusJsonPath)) {
throw new IllegalStateException(
"Missing required parameter(s): statusEndpoint = ${stageData.statusEndpoint}, statusJsonPath = ${stageData.statusJsonPath}")
}

// Preserve the responses we got from createWebhookTask, but reset the monitor subkey as we will overwrite it new data
def originalResponse = stage.context.getOrDefault("webhook", [:])
originalResponse["monitor"] = [:]
originalResponse = [webhook: originalResponse]

def response
try {
response = webhookService.getStatus(stageData.statusEndpoint, stageData.customHeaders)
Expand All @@ -81,7 +91,7 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {

String errorMessage = "an exception occurred in webhook monitor to ${stageData.statusEndpoint}: ${e}"
log.error(errorMessage, e)
Map<String, ?> outputs = [webhook: [monitor: [:]]]
Map<String, ?> outputs = originalResponse
outputs.webhook.monitor << [error: errorMessage]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
} catch (HttpStatusCodeException e) {
Expand All @@ -93,30 +103,25 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {
((stageData.retryStatusCodes != null) && (stageData.retryStatusCodes.contains(statusValue)))

if (shouldRetry) {
log.warn("Failed to get webhook status from ${stageData.statusEndpoint} with statusCode=${statusCode.value()}, will retry", e)
log.warn("Failed to get webhook status from ${stageData.statusEndpoint} with statusCode=${statusValue}, will retry", e)
return TaskResult.ofStatus(ExecutionStatus.RUNNING)
}

String errorMessage = "an exception occurred in webhook monitor to ${stageData.statusEndpoint}: ${e}"
log.error(errorMessage, e)
Map<String, ?> outputs = [webhook: [monitor: [:]]]
Map<String, ?> outputs = originalResponse
outputs.webhook.monitor << [error: errorMessage]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
}

def result
def responsePayload = [
webhook: [
monitor: [
def responsePayload = originalResponse
responsePayload.webhook.monitor = [
body: response.body,
statusCode: response.statusCode,
statusCodeValue: response.statusCode.value()
]
],
buildInfo: response.body, // TODO: deprecated
deprecationWarning: "All webhook information will be moved beneath the key 'webhook', " +
"and the keys 'statusCode', 'buildInfo', 'statusEndpoint' and 'error' will be removed. Please migrate today."
]

try {
result = JsonPath.read(response.body, stageData.statusJsonPath)
} catch (PathNotFoundException e) {
Expand All @@ -141,7 +146,6 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {
return TaskResult.builder(ExecutionStatus.TERMINAL).context(responsePayload).build()
}
if (progress) {
responsePayload << [progressMessage: progress] // TODO: deprecated
responsePayload.webhook.monitor << [progressMessage: progress]
}
}
Expand All @@ -150,14 +154,39 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {

if (result instanceof Number) {
def status = result == 100 ? ExecutionStatus.SUCCEEDED : ExecutionStatus.RUNNING
responsePayload << [percentComplete: result] // TODO: deprecated
responsePayload.webhook.monitor << [percentComplete: result]
return TaskResult.builder(status).context(responsePayload).build()
} else if (statusMap.containsKey(result.toString().toUpperCase())) {
return TaskResult.builder(statusMap[result.toString().toUpperCase()]).context(responsePayload).build()
}

return TaskResult.builder(ExecutionStatus.RUNNING).context(response ? responsePayload : [:]).build()
stage.context
return TaskResult.builder(ExecutionStatus.RUNNING).context(response ? responsePayload : originalResponse).build()
}

@Override void onCancel(@Nonnull Stage stage) {
WebhookStage.StageData stageData = stage.mapTo(WebhookStage.StageData)

// Only do cancellation if we made the initial webhook request and the user specified a cancellation endpoint
if (Strings.isNullOrEmpty(stageData.webhook.statusCode) || Strings.isNullOrEmpty(stageData.cancelEndpoint)) {
return
}

try {
log.info("Sending best effort webhook cancellation to ${stageData.cancelEndpoint}")
def response = webhookService.exchange(stageData.cancelMethod, stageData.cancelEndpoint, stageData.cancelPayload, stageData.customHeaders)
log.debug(
"Received status code {} from cancel endpoint {} in execution {} in stage {}",
response.statusCode,
stageData.cancelEndpoint,
stage.execution.id,
stage.id
)
} catch (HttpStatusCodeException e) {
log.warn("Failed to cancel webhook ${stageData.cancelEndpoint} with statusCode=${e.getStatusCode().value()}", e)
} catch (Exception e) {
log.warn("Failed to cancel webhook ${stageData.cancelEndpoint}", e)
}
}

private static Map<String, ExecutionStatus> createStatusMap(String successStatuses, String canceledStatuses, String terminalStatuses) {
Expand All @@ -175,15 +204,4 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {
private static Map<String, ExecutionStatus> mapStatuses(String statuses, ExecutionStatus status) {
statuses.split(",").collectEntries { [(it.trim().toUpperCase()): status] }
}

private static class StageData {
public String statusEndpoint
public String statusJsonPath
public String progressJsonPath
public String successStatuses
public String canceledStatuses
public String terminalStatuses
public Object customHeaders
public List<Integer> retryStatusCodes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class PreconfiguredWebhookStageSpec extends Specification {
def builder = new TaskNode.Builder()

@Subject
preconfiguredWebhookStage = new PreconfiguredWebhookStage(webhookService: webhookService)
preconfiguredWebhookStage = new PreconfiguredWebhookStage(webhookService, false, null)

def "Context should be taken from PreconfiguredWebhookProperties"() {
given:
Expand Down
Loading

0 comments on commit ac511e2

Please sign in to comment.