Skip to content

Commit

Permalink
fix(webhook): Move all the webhook response processing out of task ex… (
Browse files Browse the repository at this point in the history
spinnaker#3901)

* fix(webhook): Move all the webhook response processing out of task execute method

* Update orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/tasks/WebhookResponseProcessor.java

Co-authored-by: Rob Zienert <[email protected]>

* Update orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/tasks/WebhookResponseProcessor.java

Co-authored-by: Rob Zienert <[email protected]>

* Update orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/tasks/WebhookResponseProcessor.java

Co-authored-by: Rob Zienert <[email protected]>

* Update orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/tasks/WebhookResponseProcessor.java

Co-authored-by: Rob Zienert <[email protected]>

* Update orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/tasks/WebhookResponseProcessor.java

Co-authored-by: Rob Zienert <[email protected]>

* Update orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/tasks/WebhookResponseProcessor.java

Co-authored-by: Rob Zienert <[email protected]>

* fix(misc): fix tests asserting error messages

Co-authored-by: Rob Zienert <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 11, 2020
1 parent d1c1d0d commit 206da4d
Show file tree
Hide file tree
Showing 4 changed files with 334 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,198 +17,48 @@

package com.netflix.spinnaker.orca.webhook.tasks

import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.jayway.jsonpath.JsonPath
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus
import com.netflix.spinnaker.orca.api.pipeline.RetryableTask
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import com.netflix.spinnaker.orca.api.pipeline.TaskResult
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import com.netflix.spinnaker.orca.webhook.config.WebhookProperties
import com.netflix.spinnaker.orca.webhook.pipeline.WebhookStage
import com.netflix.spinnaker.orca.webhook.service.WebhookService
import groovy.util.logging.Slf4j
import org.apache.http.HttpHeaders
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.http.HttpMethod
import org.springframework.http.ResponseEntity
import org.springframework.stereotype.Component
import org.springframework.web.client.HttpStatusCodeException

import java.util.regex.Matcher
import java.util.regex.Pattern

@Slf4j
@Component
class CreateWebhookTask implements RetryableTask {

private static final Pattern URL_SCHEME = Pattern.compile("(.*)://(.*)")

long backoffPeriod = 10000
long timeout = 300000

WebhookService webhookService
WebhookProperties webhookProperties
ObjectMapper objectMapper

@Autowired
CreateWebhookTask(WebhookService webhookService, WebhookProperties webhookProperties) {
CreateWebhookTask(WebhookService webhookService, WebhookProperties webhookProperties, ObjectMapper objectMapper) {
this.webhookService = webhookService
this.webhookProperties = webhookProperties
this.objectMapper = objectMapper
}

@Override
TaskResult execute(StageExecution stage) {
Map<String, ?> outputs = [webhook: [:]]
WebhookStage.StageData stageData = stage.mapTo(WebhookStage.StageData)

def response
try {
response = webhookService.exchange(stageData.method, stageData.url, stageData.payload, stageData.customHeaders)
} catch (HttpStatusCodeException e) {
def statusCode = e.getStatusCode()

outputs.webhook << [statusCode: statusCode, statusCodeValue: statusCode.value()]
if (e.responseBodyAsString) {
// Best effort parse of body in case it's JSON
def body = e.responseBodyAsString
try {
ObjectMapper objectMapper = new ObjectMapper()

if (body.startsWith("{")) {
body = objectMapper.readValue(body, Map.class)
} else if (body.startsWith("[")) {
body = objectMapper.readValue(body, List.class)
}
} catch (JsonParseException | JsonMappingException ex) {
// Just leave body as string, probs not JSON
log.warn("Failed to parse webhook payload as JSON", ex)
}

outputs.webhook << [body: body]
}
WebhookResponseProcessor responseProcessor = new WebhookResponseProcessor(objectMapper, stage, webhookProperties)

if ((stageData.failFastStatusCodes != null) &&
(stageData.failFastStatusCodes.contains(statusCode.value()))) {
String webhookMessage = "Received a status code configured to fail fast, terminating stage."
outputs.webhook << [error: webhookMessage]

return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
}

if (statusCode.is5xxServerError() || statusCode.value() in webhookProperties.defaultRetryStatusCodes) {
String errorMessage = "error submitting webhook for pipeline ${stage.execution.id} to ${stageData.url}, will retry."
log.warn(errorMessage, e)

outputs.webhook << [error: errorMessage]

return TaskResult.builder(ExecutionStatus.RUNNING).context(outputs).build()
}

String errorMessage = "Error submitting webhook for pipeline ${stage.execution.id} to ${stageData.url} with status code ${statusCode.value()}."
outputs.webhook << [error: errorMessage]

return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
try {
def response = webhookService.exchange(stageData.method, stageData.url, stageData.payload, stageData.customHeaders)
return responseProcessor.process(response, null)
} catch (Exception e) {
if (e instanceof UnknownHostException || e.cause instanceof UnknownHostException) {
String errorMessage = "name resolution failure in webhook for pipeline ${stage.execution.id} to ${stageData.url}, will retry."
log.warn(errorMessage, e)
outputs.webhook << [error: errorMessage]
return TaskResult.builder(ExecutionStatus.RUNNING).context(outputs).build()
}
if (stageData.method == HttpMethod.GET && (e instanceof SocketTimeoutException || e.cause instanceof SocketTimeoutException)) {
String errorMessage = "Socket timeout in webhook on GET request to ${stageData.url}, will retry."
log.warn(errorMessage, e)
outputs.webhook << [error: errorMessage]
return TaskResult.builder(ExecutionStatus.RUNNING).context(outputs).build()
}

String errorMessage = "an exception occurred in webhook to ${stageData.url}: ${e}"
log.error(errorMessage, e)
outputs.webhook << [error: errorMessage]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
}

def statusCode = response.statusCode

outputs.webhook << [statusCode: statusCode, statusCodeValue: statusCode.value()]
if (response.body) {
outputs.webhook << [body: response.body]
}

if (statusCode.is2xxSuccessful() || statusCode.is3xxRedirection()) {
if (stageData.waitForCompletion) {
String statusUrl
try {
statusUrl = determineWebHookStatusCheckUrl(response, stageData)
} catch (Exception e) {
outputs.webhook << [error: 'Exception while resolving status check URL: ' + e.message]
log.error('Exception received while determining status check url', e)
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
}

if (!statusUrl) {
outputs.webhook << [
error : "The status URL couldn't be resolved, but 'Wait for completion' was checked",
statusEndpoint: statusUrl
]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
}
stage.context.statusEndpoint = statusUrl
outputs.webhook << [statusEndpoint: statusUrl]
return TaskResult.builder(ExecutionStatus.SUCCEEDED).context(outputs).build()
}
if (stage.context.containsKey("expectedArtifacts") && !((List) stage.context.get("expectedArtifacts")).isEmpty()) {
try {
def artifacts = JsonPath.parse(response.body).read("artifacts")
outputs << [artifacts: artifacts]
} catch (Exception e) {
outputs.webhook << [error: "Expected artifacts in webhook response couldn't be parsed " + e.toString()]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
}
}
return TaskResult.builder(ExecutionStatus.SUCCEEDED).context(outputs).build()
} else {
outputs.webhook << [error: "The webhook request failed"]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
}
}

private String determineWebHookStatusCheckUrl(ResponseEntity response, WebhookStage.StageData stageData) {

String statusCheckUrl

switch (stageData.statusUrlResolution) {
case WebhookProperties.StatusUrlResolution.getMethod:
statusCheckUrl = stageData.url
break
case WebhookProperties.StatusUrlResolution.locationHeader:
statusCheckUrl = response.headers.getFirst(HttpHeaders.LOCATION)
break
case WebhookProperties.StatusUrlResolution.webhookResponse:
statusCheckUrl = JsonPath.compile(stageData.statusUrlJsonPath as String).read(response.body)
break
}
log.info('Web hook status check url as resolved: {}', statusCheckUrl)

// Preserve the protocol scheme of original webhook that was called, when calling for status check of a webhook.
if (statusCheckUrl != stageData.url) {
Matcher statusUrlMatcher = URL_SCHEME.matcher(statusCheckUrl)
URI statusCheckUri = URI.create(statusCheckUrl).normalize()
String statusCheckHost = statusCheckUri.getHost()

URI webHookUri = URI.create(stageData.url).normalize()
String webHookHost = webHookUri.getHost()
if (webHookHost == statusCheckHost &&
webHookUri.getScheme() != statusCheckUri.getScheme() && statusUrlMatcher.find()) {
// Same hosts keep the original protocol scheme of the webhook that was originally set.
statusCheckUrl = webHookUri.getScheme() + '://' + statusUrlMatcher.group(2)
log.info('Adjusted Web hook status check url: {}', statusCheckUrl)
}
return responseProcessor.process(null, e)
}

return statusCheckUrl
}

}
Loading

0 comments on commit 206da4d

Please sign in to comment.