Skip to content

Commit

Permalink
feat(shovel): override partition of foreign executions (spinnaker#3667)
Browse files Browse the repository at this point in the history
* feat(shovel): override partition of foreign executions

This should allow the shovel to be used in failover scenarios, between orcas that have a different partition.
  • Loading branch information
dreynaud authored May 8, 2020
1 parent b25cbba commit 9c4b8f8
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ class RedisQueueShovelConfiguration {
previousQueue = previousQueueImpl,
registry = registry,
activator = discoveryActivator,
config = dynamicConfigService
config = dynamicConfigService,
executionRepository = null
)

@Bean
Expand All @@ -179,6 +180,7 @@ class RedisQueueShovelConfiguration {
previousQueue = previousQueueImpl,
registry = registry,
activator = discoveryActivator,
config = dynamicConfigService
config = dynamicConfigService,
executionRepository = null
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.netflix.spinnaker.config
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.QueueShovel
import com.netflix.spinnaker.q.Activator
import com.netflix.spinnaker.q.metrics.EventPublisher
Expand Down Expand Up @@ -76,13 +77,15 @@ class SqlQueueShovelConfiguration {
@Qualifier("previousSqlQueue") previousQueue: SqlQueue,
registry: Registry,
@Qualifier("discoveryActivator") activator: Activator,
config: DynamicConfigService
config: DynamicConfigService,
executionRepository: ExecutionRepository
): QueueShovel {
return QueueShovel(
queue = queue,
previousQueue = previousQueue,
registry = registry,
activator = activator,
config = config)
config = config,
executionRepository = executionRepository)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class SqlRedisQueueShovelConfiguration {
previousQueue = previousQueue,
registry = registry,
activator = discoveryActivator,
config = dynamicConfigService)
config = dynamicConfigService,
executionRepository = null)
}

@Bean
Expand Down Expand Up @@ -119,7 +120,8 @@ class SqlRedisQueueShovelConfiguration {
previousQueue = previousQueue,
registry = registry,
activator = discoveryActivator,
config = dynamicConfigService)
config = dynamicConfigService,
executionRepository = null)
}

/**
Expand Down Expand Up @@ -158,6 +160,7 @@ class SqlRedisQueueShovelConfiguration {
previousQueue = previousQueue,
registry = registry,
activator = discoveryActivator,
config = dynamicConfigService)
config = dynamicConfigService,
executionRepository = null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ package com.netflix.spinnaker.orca.q

import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.q.Activator
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.Queue
import java.time.Duration
import java.time.Instant
Expand All @@ -41,7 +44,8 @@ class QueueShovel(
private val previousQueue: Queue,
private val registry: Registry,
private val activator: Activator,
private val config: DynamicConfigService
private val config: DynamicConfigService,
private val executionRepository: ExecutionRepository?
) {
private val log = LoggerFactory.getLogger(javaClass)

Expand Down Expand Up @@ -71,17 +75,54 @@ class QueueShovel(
registry.counter(pollOpsRateId).increment()
previousQueue.poll { message, ack ->
try {
log.debug("Shoveling message $message")

// transfer the ownership _before_ pushing the message on the queue
// we don't want a task handler running that message if the execution is not local
transferOwnership(message)

queue.push(message)
ack.invoke()
registry.counter(shoveledMessageId).increment()
} catch (e: ExecutionNotFoundException) {
// no need to log the stack trace on ExecutionNotFoundException, which can be somewhat expected
log.error("Failed shoveling message from previous queue to active (message: $message) " +
"because of exception $e")
registry.counter(shovelErrorId).increment()
} catch (e: Throwable) {
log.error("Failed shoveling message from previous queue to active (message: $message)", e)
registry.counter(shovelErrorId).increment()
}
}
}

private fun transferOwnership(message: Message) {
if (executionRepository == null) {
return
}

if (message !is ExecutionLevel) {
log.warn("Message $message does not implement ExecutionLevel, can not inspect partition")
return
}

// don't catch exceptions on retrieve/store (e.g. ExecutionNotFoundException), so that we can short-circuit shoveling
// of this message
val execution = executionRepository.retrieve(message.executionType, message.executionId)
val isForeign = !executionRepository.handlesPartition(execution.partition)
if (isForeign) {
log.info("Taking ownership of foreign execution ${execution.id} with partition '${execution.partition}'. " +
"Setting partition to '${executionRepository.partition}'")
execution.partition = executionRepository.partition
executionRepository.store(execution)
}
}

@PostConstruct
fun confirmShovelUsage() =
fun confirmShovelUsage() {
log.info("${javaClass.simpleName} migrator from $previousQueue to $queue is enabled")
if (executionRepository == null) {
log.warn("${javaClass.simpleName} configured without an ExecutionRepository, won't be able to transfer ownership")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,135 @@ package com.netflix.spinnaker.orca.q
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.q.Activator
import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.q.QueueCallback
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.doAnswer
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.never
import com.nhaarman.mockito_kotlin.reset
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.verifyZeroInteractions
import com.nhaarman.mockito_kotlin.whenever
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.jetbrains.spek.api.lifecycle.CachingMode
import org.jetbrains.spek.subject.SubjectSpek
import org.mockito.ArgumentMatchers.anyString

class QueueShovelTest : SubjectSpek<QueueShovel>({

val message = StartExecution(PIPELINE, "executionId", "app")
val queue: Queue = mock()
val previousQueue: Queue = mock()
val registry = NoopRegistry()
val ackCallback = mock<() -> Unit>()
val previousQueue: Queue = mock()
val executionRepository: ExecutionRepository = mock()
val execution: PipelineExecution = mock()

val activator = object : Activator {
override val enabled = true
}

subject(CachingMode.GROUP) {
QueueShovel(queue, previousQueue, registry, activator, DynamicConfigService.NOOP)
QueueShovel(queue, previousQueue, registry, activator, DynamicConfigService.NOOP, executionRepository)
}

beforeEachTest {
whenever(previousQueue.poll(any())).doAnswer {
it.getArgument<QueueCallback>(0)(message, ackCallback)
}

whenever(execution.partition).doReturn("some-partition")
}

afterEachTest {
reset(executionRepository, queue, ackCallback, execution)
}

describe("polling the previous queue") {
val message = StartExecution(PIPELINE, "1", "spinnaker")
beforeGroup {
whenever(executionRepository.retrieve(any(), anyString())).doReturn(execution)
whenever(executionRepository.handlesPartition(anyString())).doReturn(true)
}

on("the shovel poll method is invoked") {
subject.migrateOne()

it("pushes the message onto the current queue and acks it") {
verify(queue).push(message)
verify(ackCallback).invoke()
}
}
}

describe("dealing with a foreign execution") {
beforeGroup {
whenever(previousQueue.poll(any())) doAnswer {
it.getArgument<QueueCallback>(0)(message, {})
whenever(executionRepository.retrieve(any(), anyString())).doReturn(execution)
whenever(executionRepository.handlesPartition(anyString())).doReturn(false)
whenever(executionRepository.partition).thenReturn("local-partition")
}

on("a poll cycle where the message belongs to a foreign execution") {
subject.migrateOne()

it("overwrites the partition to be the local one") {
verify(execution).partition = "local-partition"
verify(executionRepository).store(execution)
verify(queue).push(message)
}
}
}

on("the shovel poll method is invoked") {
describe("dealing with execution repository read errors") {
beforeGroup {
whenever(executionRepository.retrieve(any(), anyString())).thenThrow(ExecutionNotFoundException("womp womp"))
}

on("a poll cycle") {
subject.migrateOne()

it("leaves the message on the old queue") {
// not pushed
verifyZeroInteractions(queue)

// not acked
verifyZeroInteractions(ackCallback)

// execution not updated
verify(executionRepository, never()).handlesPartition(anyString())
verify(executionRepository, never()).store(any())
}
}
}

describe("dealing with execution repository write errors") {
beforeGroup {
whenever(executionRepository.retrieve(any(), anyString())).doReturn(execution)
whenever(executionRepository.handlesPartition(anyString())).doReturn(false)
whenever(executionRepository.partition).thenReturn("local-partition")
whenever(executionRepository.store(execution)).thenThrow(RuntimeException("something unexpected"))
}

it("pushes the message onto the current queue") {
verify(queue).push(message)
on("a poll cycle") {
subject.migrateOne()

it("leaves the message on the old queue") {
// attempted to transfer ownership
verify(execution).partition = "local-partition"
verify(executionRepository).store(execution)

// not pushed
verifyZeroInteractions(queue)

// not acked
verifyZeroInteractions(ackCallback)
}
}
}
})
1 change: 0 additions & 1 deletion orca-web/orca-web.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ dependencies {
testImplementation("cglib:cglib-nodep")
testImplementation("org.objenesis:objenesis")
testImplementation("org.junit.jupiter:junit-jupiter-api")
testImplementation("org.hamcrest:hamcrest-core:1.3")
testImplementation("com.netflix.spinnaker.keiko:keiko-mem:$keikoVersion")
}

Expand Down

0 comments on commit 9c4b8f8

Please sign in to comment.