Skip to content

Commit

Permalink
Revert "fix(orca) stop just re-queueing maxed out CompleteExecution m…
Browse files Browse the repository at this point in the history
…essages"

This reverts commit 6790b8c.
  • Loading branch information
robfletcher committed Aug 14, 2017
1 parent dcca184 commit 1e77b85
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.netflix.spinnaker.config

import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.orca.config.RedisConfiguration
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.handler.DeadMessageHandler
import com.netflix.spinnaker.orca.q.redis.RedisDeadMessageHandler
import com.netflix.spinnaker.orca.q.redis.RedisQueue
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
Expand All @@ -45,14 +45,12 @@ open class RedisQueueConfiguration {
registry: Registry
) = RedisConfiguration.createPool(redisPoolConfig, connection, timeout, registry, "queueJedisPool")

@Bean open fun deadMessageHandler(
executionRepository: ExecutionRepository,
@Bean() open fun redisDeadMessageHandler(
@Qualifier("queueJedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
clock: Clock
) =
RedisDeadMessageHandler(
executionRepository,
deadLetterQueueName = redisQueueProperties.deadLetterQueueName,
pool = redisPool,
clock = clock
Expand All @@ -73,4 +71,5 @@ open class RedisQueueConfiguration {
publisher = publisher,
ackTimeout = Duration.ofSeconds(redisQueueProperties.ackTimeoutSeconds.toLong())
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package com.netflix.spinnaker.orca.q.redis

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.Message
import com.netflix.spinnaker.orca.q.Queue
import com.netflix.spinnaker.orca.q.handler.DeadMessageHandler
Expand All @@ -26,11 +25,10 @@ import redis.clients.util.Pool
import java.time.Clock

class RedisDeadMessageHandler(
executionRepository: ExecutionRepository,
val deadLetterQueueName: String,
private val pool: Pool<Jedis>,
private val clock: Clock
) : DeadMessageHandler(executionRepository) {
) : DeadMessageHandler() {

private val dlqKey = "$deadLetterQueueName.messages"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package com.netflix.spinnaker.orca.q.redis

import com.netflix.spinnaker.orca.pipeline.model.Pipeline
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.CompleteExecution
import com.netflix.spinnaker.orca.q.Queue
import com.netflix.spinnaker.orca.q.StartExecution
import com.netflix.spinnaker.spek.and
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.reset
import com.nhaarman.mockito_kotlin.verify
Expand All @@ -36,15 +36,14 @@ import java.time.ZoneId

object RedisDeadMessageHandlerTest : SubjectSpek<RedisDeadMessageHandler>({

val repository: ExecutionRepository = mock()
val queue: Queue = mock()
val redis: Jedis = mock()
val redisPool: Pool<Jedis> = mock()
val clock = Clock.fixed(Instant.EPOCH, ZoneId.systemDefault())

subject(CachingMode.GROUP) {
whenever(redisPool.resource).thenReturn(redis)
RedisDeadMessageHandler(repository, "dlq", redisPool, clock)
RedisDeadMessageHandler("dlq", redisPool, clock)
}

fun resetMocks() = reset(queue, redis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.orca.config.OrcaConfiguration.applyThreadPoolMetrics
import com.netflix.spinnaker.orca.log.BlackholeExecutionLogRepository
import com.netflix.spinnaker.orca.log.ExecutionLogRepository
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.Queue
import com.netflix.spinnaker.orca.q.handler.DeadMessageHandler
import com.netflix.spinnaker.orca.q.memory.InMemoryQueue
Expand All @@ -35,6 +34,7 @@ import org.springframework.context.event.SimpleApplicationEventMulticaster
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.time.Clock
import java.util.concurrent.ThreadPoolExecutor

@Configuration
@ComponentScan(basePackages = arrayOf("com.netflix.spinnaker.orca.q", "com.netflix.spinnaker.orca.log", "com.netflix.spinnaker.orca.q.trafficshaping"))
Expand All @@ -43,10 +43,6 @@ open class QueueConfiguration {
@Bean @ConditionalOnMissingBean(Clock::class)
open fun systemClock(): Clock = Clock.systemDefaultZone()

@Bean @ConditionalOnMissingBean(DeadMessageHandler::class)
open fun deadMessageHandler(executionRepository: ExecutionRepository) =
DeadMessageHandler(executionRepository)

@Bean(name = arrayOf("queueImpl")) @ConditionalOnMissingBean(Queue::class)
open fun inMemoryQueue(clock: Clock, deadMessageHandler: DeadMessageHandler, publisher: ApplicationEventPublisher) =
InMemoryQueue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@
package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.ExecutionStatus.TERMINAL
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.*
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

open class DeadMessageHandler(
private val executionRepository: ExecutionRepository
) {
@Component open class DeadMessageHandler {
private val log = LoggerFactory.getLogger(javaClass)

open fun handle(queue: Queue, message: Message) {
log.error("Dead message: $message")
when (message) {
is CompleteExecution -> executionRepository.updateStatus(message.executionId, TERMINAL)
is TaskLevel -> queue.push(CompleteTask(message, TERMINAL))
is StageLevel -> queue.push(CompleteStage(message, TERMINAL))
is ExecutionLevel -> queue.push(CompleteExecution(message))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,31 @@ package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.ExecutionStatus.TERMINAL
import com.netflix.spinnaker.orca.pipeline.model.Pipeline
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.*
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.reset
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.verifyZeroInteractions
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.jetbrains.spek.api.lifecycle.CachingMode.GROUP
import org.jetbrains.spek.subject.SubjectSpek

object DeadMessageHandlerTest : SubjectSpek<DeadMessageHandler>({

val repository: ExecutionRepository = mock()
val queue: Queue = mock()

subject(GROUP) {
DeadMessageHandler(repository)
DeadMessageHandler()
}

fun resetMocks() = reset(queue)

describe("handling a CompleteExecution message") {
val message = CompleteExecution(Pipeline::class.java, "1", "spinnaker")

afterGroup(::resetMocks)

on("receiving a message") {
subject.handle(queue, message)
}

it("immediately terminates the execution") {
verify(repository).updateStatus(message.executionId, TERMINAL)
}

it("does not queue any messages") {
verifyZeroInteractions(queue)
}
}

describe("handling an execution level message") {
val message = StartExecution(Pipeline::class.java, "1", "spinnaker")

afterGroup(::resetMocks)

on("receiving a message") {
action("the handler receives a message") {
subject.handle(queue, message)
}

Expand All @@ -78,7 +56,7 @@ object DeadMessageHandlerTest : SubjectSpek<DeadMessageHandler>({

afterGroup(::resetMocks)

on("receiving a message") {
action("the handler receives a message") {
subject.handle(queue, message)
}

Expand All @@ -92,7 +70,7 @@ object DeadMessageHandlerTest : SubjectSpek<DeadMessageHandler>({

afterGroup(::resetMocks)

on("receiving a message") {
action("the handler receives a message") {
subject.handle(queue, message)
}

Expand Down

0 comments on commit 1e77b85

Please sign in to comment.