Skip to content

Commit

Permalink
fix(core): don't immediately restart pipelines if another is running
Browse files Browse the repository at this point in the history
If a pipeline has `limitConcurrent = true` and someone tries to restart a failed pipeline while another is already running, that restart will be queued just like any other executions.
  • Loading branch information
robfletcher committed May 4, 2018
1 parent 74a8854 commit 4bea4e7
Show file tree
Hide file tree
Showing 12 changed files with 398 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.queueing

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.netflix.spinnaker.q.Message
import org.springframework.stereotype.Component
import redis.clients.jedis.Jedis
import redis.clients.util.Pool

@Component
class RedisPipelineQueue(
private val pool: Pool<Jedis>,
private val mapper: ObjectMapper
) : PipelineQueue {
override fun enqueue(pipelineConfigId: String, message: Message) {
pool.resource.use { redis ->
redis.lpush(listName(pipelineConfigId), mapper.writeValueAsString(message))
}
}

override fun popOldest(pipelineConfigId: String): Message? =
pool.resource.use { redis ->
redis
.rpop(listName(pipelineConfigId))
?.let { mapper.readValue(it) }
}

override fun popNewest(pipelineConfigId: String): Message? =
pool.resource.use { redis ->
redis
.lpop(listName(pipelineConfigId))
?.let { mapper.readValue(it) }
}

override fun purge(pipelineConfigId: String, callback: (Message) -> Unit) {
pool.resource.use { redis ->
while (redis.llen(listName(pipelineConfigId)) > 0L) {
popOldest(pipelineConfigId)?.let(callback)
}
}
}

override fun depth(pipelineConfigId: String): Int =
pool.resource.use { redis ->
redis.llen(listName(pipelineConfigId)).toInt()
}

private fun listName(pipelineConfigId: String) =
"orca.pipeline.queue.$pipelineConfigId"
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package com.netflix.spinnaker.orca.q.redis

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.config.RedisOrcaQueueConfiguration
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientSelector
import com.netflix.spinnaker.orca.config.JedisConfiguration
import com.netflix.spinnaker.orca.config.RedisConfiguration
import com.netflix.spinnaker.orca.q.QueueIntegrationTest
import com.netflix.spinnaker.orca.q.TestConfig
import com.netflix.spinnaker.orca.queueing.RedisPipelineQueue
import com.netflix.spinnaker.orca.test.redis.EmbeddedRedisConfiguration
import org.junit.runner.RunWith
import org.springframework.boot.test.context.SpringBootTest
Expand All @@ -40,6 +41,10 @@ class RedisTestConfig {
@Bean
fun redisClientSelector(redisClientDelegates: List<RedisClientDelegate>) =
RedisClientSelector(redisClientDelegates)

@Bean
fun pipelineQueue(jedisPool: Pool<Jedis>, mapper: ObjectMapper) =
RedisPipelineQueue(jedisPool, mapper)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.queueing

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spinnaker.kork.jedis.EmbeddedRedis
import com.netflix.spinnaker.orca.fixture.pipeline
import com.netflix.spinnaker.orca.fixture.stage
import com.netflix.spinnaker.orca.q.RestartStage
import com.netflix.spinnaker.orca.q.StartExecution
import com.netflix.spinnaker.orca.queueing.RedisPipelineQueue
import com.netflix.spinnaker.q.Message
import com.nhaarman.mockito_kotlin.*
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.*
import java.util.*

internal object RedisPipelineQueueTest : Spek({

lateinit var redis: EmbeddedRedis
val mapper = ObjectMapper().apply {
registerModule(KotlinModule())
registerSubtypes(StartExecution::class.java, RestartStage::class.java)
}
lateinit var subject: RedisPipelineQueue

beforeGroup {
redis = EmbeddedRedis.embed()
subject = RedisPipelineQueue(redis.pool, mapper)
}

afterGroup {
redis.destroy()
}

fun flushAll() {
redis.pool.resource.use { it.flushAll() }
}

val id = UUID.randomUUID().toString()
val pipeline = pipeline {
pipelineConfigId = id
stage {
refId = "1"
}
stage {
refId = "2"
requisiteStageRefIds = setOf("1")
}
}
val startMessage = StartExecution(pipeline)
val restartMessage = RestartStage(pipeline.stageByRef("2"), "[email protected]")

sequenceOf<Message>(startMessage, restartMessage).forEach { message ->
describe("enqueueing a ${message.javaClass.simpleName} message") {
given("the queue is empty") {
beforeGroup {
assertThat(subject.depth(id)).isZero()
}

on("enqueueing the message") {
subject.enqueue(id, message)

it("makes the depth 1") {
assertThat(subject.depth(id)).isOne()
}
}

afterGroup(::flushAll)
}
}
}

describe("popping a message") {
given("the queue is empty") {
beforeGroup {
assertThat(subject.depth(id)).isZero()
}

on("popping a message") {
val popped = subject.popOldest(id)

it("returns null") {
assertThat(popped).isNull()
}
}
}

given("a message was enqueued") {
beforeGroup {
subject.enqueue(id, startMessage)
}

on("popping a message") {
val popped = subject.popOldest(id)

it("returns the message") {
assertThat(popped).isEqualTo(startMessage)
}

it("removes the message from the queue") {
assertThat(subject.depth(id)).isZero()
}
}

afterGroup(::flushAll)
}

given("multiple messages were enqueued") {
beforeEachTest {
subject.enqueue(id, startMessage)
subject.enqueue(id, restartMessage)
}

on("popping the oldest message") {
val popped = subject.popOldest(id)

it("returns the oldest message") {
assertThat(popped).isEqualTo(startMessage)
}

it("removes the message from the queue") {
assertThat(subject.depth(id)).isOne()
}
}

on("popping the newest message") {
val popped = subject.popNewest(id)

it("returns the newest message") {
assertThat(popped).isEqualTo(restartMessage)
}

it("removes the message from the queue") {
assertThat(subject.depth(id)).isOne()
}
}

afterEachTest(::flushAll)
}
}

describe("purging the queue") {
val callback = mock<(Message) -> Unit>()

given("there are some messages on the queue") {
beforeGroup {
subject.enqueue(id, startMessage)
subject.enqueue(id, restartMessage)
}

on("purging the queue") {
subject.purge(id, callback)

it("makes the queue empty") {
assertThat(subject.depth(id)).isZero()
}

it("invokes the callback passing each message") {
verify(callback).invoke(startMessage)
verify(callback).invoke(restartMessage)
}
}

afterGroup(::flushAll)
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.exceptions.ExceptionHandler
import com.netflix.spinnaker.orca.ext.parent
import com.netflix.spinnaker.orca.pipeline.model.Execution
Expand Down Expand Up @@ -86,4 +87,18 @@ internal interface OrcaMessageHandler<M : Message> : MessageHandler<M> {
}
}
}

fun Execution.shouldQueue(): Boolean {
if (!isLimitConcurrent) {
return false
}
return pipelineConfigId?.let { configId ->
val criteria = ExecutionRepository.ExecutionCriteria().setLimit(1).setStatuses(ExecutionStatus.RUNNING)
!repository
.retrievePipelinesForPipelineConfigId(configId, criteria)
.isEmpty
.toBlocking()
.first()
} == true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.RestartStage
import com.netflix.spinnaker.orca.q.StartStage
import com.netflix.spinnaker.orca.queueing.PipelineQueue
import com.netflix.spinnaker.q.Queue
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.time.Clock

Expand All @@ -32,21 +35,32 @@ class RestartStageHandler(
override val queue: Queue,
override val repository: ExecutionRepository,
override val stageDefinitionBuilderFactory: StageDefinitionBuilderFactory,
private val pipelineQueue: PipelineQueue,
private val clock: Clock
) : OrcaMessageHandler<RestartStage>, StageBuilderAware {

override val messageType = RestartStage::class.java

private val log: Logger get() = LoggerFactory.getLogger(javaClass)

override fun handle(message: RestartStage) {
message.withStage { stage ->
// If RestartStage is requested for a synthetic stage, operate on its parent
val topStage = stage.topLevelStage
val startMessage = StartStage(message.executionType, message.executionId, message.application, topStage.id)
if (topStage.status.isComplete) {
topStage.addRestartDetails(message.user)
topStage.reset()
repository.updateStatus(topStage.execution.type, topStage.execution.id, RUNNING)
queue.push(StartStage(startMessage))
if (stage.execution.shouldQueue()) {
// this pipeline is already running and has limitConcurrent = true
stage.execution.pipelineConfigId?.let {
log.info("Queueing restart of {} {} {}", stage.execution.application, stage.execution.name, stage.execution.id)
pipelineQueue.enqueue(it, message)
}
} else {
// If RestartStage is requested for a synthetic stage, operate on its parent
val topStage = stage.topLevelStage
val startMessage = StartStage(message.executionType, message.executionId, message.application, topStage.id)
if (topStage.status.isComplete) {
topStage.addRestartDetails(message.user)
topStage.reset()
repository.updateStatus(topStage.execution.type, topStage.execution.id, RUNNING)
queue.push(StartStage(startMessage))
}
}
}
}
Expand Down
Loading

0 comments on commit 4bea4e7

Please sign in to comment.