Skip to content

Commit

Permalink
feat(queue): support for dynamic activation of the queue shovel (spin…
Browse files Browse the repository at this point in the history
…naker#3630)

* feat(queue): support for dynamic action of the queue shovel

This makes it possible to have a QueueShovel bean enabled by default, but not active.

Once the `queue.shovel.active` flag becomes true, it will start actively shoveling messages for some time then stop and check again.

* chore(queue-shovel): switch to a fixed rate direct execution

Instead of spawning threads with a fixed delay. Also:
- add a log message when the shovel becomes active
- derive the work duration from the execution rate (i.e. strive to work for a slightly shorter duration than the scheduled rate)
- make isActive() responsible for checking both the property and the activator status
- only check the activator in `isActive` instead of in every invocation of `migrateSome`

* Update orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueShovel.kt

Co-Authored-By: Mark Vulfson <[email protected]>

* fix(config): add missing SqlQueueShovelConfiguration

Co-authored-by: Mark Vulfson <[email protected]>
  • Loading branch information
dreynaud and marchello2000 authored May 4, 2020
1 parent b870571 commit a1f6f39
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public long getDynamicBackoffPeriod(StageExecution stage, Duration taskDuration)
return completion.toEpochMilli() - now.toEpochMilli();
}
}

return getBackoffPeriod();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.netflix.spinnaker.config

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.kork.jedis.JedisDriverProperties
import com.netflix.spinnaker.kork.jedis.JedisPoolFactory
import com.netflix.spinnaker.orca.q.QueueShovel
Expand Down Expand Up @@ -153,13 +154,15 @@ class RedisQueueShovelConfiguration {
queue: AbstractRedisQueue,
@Qualifier("previousQueue") previousQueueImpl: RedisQueue,
registry: Registry,
discoveryActivator: Activator
discoveryActivator: Activator,
dynamicConfigService: DynamicConfigService
) =
QueueShovel(
queue = queue,
previousQueue = previousQueueImpl,
registry = registry,
activator = discoveryActivator
activator = discoveryActivator,
config = dynamicConfigService
)

@Bean
Expand All @@ -168,12 +171,14 @@ class RedisQueueShovelConfiguration {
queue: AbstractRedisQueue,
@Qualifier("previousClusterQueue") previousQueueImpl: AbstractRedisQueue,
registry: Registry,
discoveryActivator: Activator
discoveryActivator: Activator,
dynamicConfigService: DynamicConfigService
) =
QueueShovel(
queue = queue,
previousQueue = previousQueueImpl,
registry = registry,
activator = discoveryActivator
activator = discoveryActivator,
config = dynamicConfigService
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.config

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.orca.q.QueueShovel
import com.netflix.spinnaker.q.Activator
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.migration.SerializationMigrator
import com.netflix.spinnaker.q.sql.SqlDeadMessageHandler
import com.netflix.spinnaker.q.sql.SqlQueue
import java.time.Clock
import java.util.Optional
import org.jooq.DSLContext
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

const val SOURCE_POOL_NAME_PROPERTY = "queue.shovel.source-pool-name"

@Configuration
@EnableConfigurationProperties(SqlQueueProperties::class)
@ConditionalOnProperty(value = ["queue.shovel.enabled"])
class SqlQueueShovelConfiguration {

@Bean(name = ["previousSqlQueue"])
@ConditionalOnProperty(value = ["queue.shovel.kind"], havingValue = "sql-to-sql")
fun previousSqlQueue(
jooq: DSLContext,
clock: Clock,
mapper: ObjectMapper,
deadMessageHandler: SqlDeadMessageHandler,
publisher: EventPublisher,
serializationMigrator: Optional<SerializationMigrator>,
properties: SqlQueueProperties,
@Value("\${$SOURCE_POOL_NAME_PROPERTY}") poolName: String
) =
SqlQueue(
queueName = properties.queueName,
schemaVersion = 1,
jooq = jooq,
clock = clock,
lockTtlSeconds = properties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
ackTimeout = properties.ackTimeout,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
sqlRetryProperties = properties.retries,
poolName = poolName
)

@Bean
@ConditionalOnProperty(value = ["queue.shovel.kind"], havingValue = "sql-to-sql")
fun sqlToSqlQueueShovel(
queue: SqlQueue,
@Qualifier("previousSqlQueue") previousQueue: SqlQueue,
registry: Registry,
@Qualifier("discoveryActivator") activator: Activator,
config: DynamicConfigService
): QueueShovel {
return QueueShovel(
queue = queue,
previousQueue = previousQueue,
registry = registry,
activator = activator,
config = config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.netflix.spinnaker.config

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.kork.jedis.JedisDriverProperties
import com.netflix.spinnaker.kork.jedis.JedisPoolFactory
import com.netflix.spinnaker.orca.q.QueueShovel
Expand Down Expand Up @@ -47,7 +48,7 @@ class SqlRedisQueueShovelConfiguration {

@Bean
@ConditionalOnBean(SqlQueue::class)
@ConditionalOnProperty(value = ["redis.cluster-enabled"], havingValue = "false", matchIfMissing = true)
@ConditionalOnProperty(value = ["queue.shovel.kind"], havingValue = "redis-to-sql")
fun redisToSqlQueueShovel(
@Value("\${redis.connection:redis://localhost:6379}") mainConnection: String,
@Value("\${redis.timeout:2000}") timeout: Int,
Expand All @@ -59,7 +60,8 @@ class SqlRedisQueueShovelConfiguration {
serializationMigrator: Optional<SerializationMigrator>,
redisQueueProperties: RedisQueueProperties,
registry: Registry,
discoveryActivator: Activator
discoveryActivator: Activator,
dynamicConfigService: DynamicConfigService
): QueueShovel {
val jedisPool = JedisPoolFactory(registry).build(
"previousQueue",
Expand All @@ -83,12 +85,13 @@ class SqlRedisQueueShovelConfiguration {
queue = queue,
previousQueue = previousQueue,
registry = registry,
activator = discoveryActivator)
activator = discoveryActivator,
config = dynamicConfigService)
}

@Bean
@ConditionalOnBean(SqlQueue::class)
@ConditionalOnProperty(value = ["redis.cluster-enabled"], havingValue = "true", matchIfMissing = false)
@ConditionalOnProperty(value = ["queue.shovel.kind"], havingValue = "redis-cluster-to-sql")
fun redisClusterToSqlQueueShovel(
queue: SqlQueue,
cluster: JedisCluster,
Expand All @@ -98,7 +101,8 @@ class SqlRedisQueueShovelConfiguration {
serializationMigrator: Optional<SerializationMigrator>,
redisQueueProperties: RedisQueueProperties,
registry: Registry,
discoveryActivator: Activator
discoveryActivator: Activator,
dynamicConfigService: DynamicConfigService
): QueueShovel {
val previousQueue = RedisClusterQueue(
queueName = redisQueueProperties.queueName,
Expand All @@ -114,7 +118,8 @@ class SqlRedisQueueShovelConfiguration {
queue = queue,
previousQueue = previousQueue,
registry = registry,
activator = discoveryActivator)
activator = discoveryActivator,
config = dynamicConfigService)
}

/**
Expand All @@ -123,6 +128,7 @@ class SqlRedisQueueShovelConfiguration {
*/
@Bean
@ConditionalOnBean(AbstractRedisQueue::class)
@ConditionalOnProperty(value = ["queue.shovel.kind"], havingValue = "sql-to-redis")
fun sqlToRedisQueueShovel(
queue: AbstractRedisQueue,
jooq: DSLContext,
Expand All @@ -132,7 +138,8 @@ class SqlRedisQueueShovelConfiguration {
serializationMigrator: Optional<SerializationMigrator>,
sqlQueueProperties: SqlQueueProperties,
registry: Registry,
discoveryActivator: Activator
discoveryActivator: Activator,
dynamicConfigService: DynamicConfigService
): QueueShovel {
val previousQueue = SqlQueue(
queueName = sqlQueueProperties.queueName,
Expand All @@ -150,6 +157,7 @@ class SqlRedisQueueShovelConfiguration {
queue = queue,
previousQueue = previousQueue,
registry = registry,
activator = discoveryActivator)
activator = discoveryActivator,
config = dynamicConfigService)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
package com.netflix.spinnaker.orca.q

import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.q.Activator
import com.netflix.spinnaker.q.Queue
import java.time.Duration
import java.time.Instant
import javax.annotation.PostConstruct
import org.slf4j.LoggerFactory
import org.springframework.scheduling.annotation.Scheduled

private const val rateMs: Long = 5_000
private const val workDurationMs = (0.95 * rateMs).toLong()

/**
* The QueueShovel can be used to migrate from one queue implementation to another without an
* operator needing to perform any substantial external work.
Expand All @@ -34,32 +40,48 @@ class QueueShovel(
private val queue: Queue,
private val previousQueue: Queue,
private val registry: Registry,
private val activator: Activator
private val activator: Activator,
private val config: DynamicConfigService
) {
private val log = LoggerFactory.getLogger(javaClass)

private val pollOpsRateId = registry.createId("orca.nu.shovel.pollOpsRate")
private val shoveledMessageId = registry.createId("orca.nu.shovel.pushedMessageRate")
private val shovelErrorId = registry.createId("orca.nu.shovel.pushedMessageErrorRate")

@Scheduled(fixedDelayString = "\${queue.shovel.poll-frequency.ms:50}")
@Scheduled(fixedRate = rateMs)
fun migrateIfActive() {
if (!isActive()) {
return
}

log.info("Actively shoveling from $previousQueue to $queue")
val workDuration = Duration.ofMillis(workDurationMs)
val start = Instant.now()

while (Duration.between(start, Instant.now()) < workDuration) {
migrateOne()
Thread.sleep(50)
}
}

private fun isActive() = config.getConfig(Boolean::class.java, "queue.shovel.active", false) && activator.enabled

fun migrateOne() {
activator.ifEnabled {
registry.counter(pollOpsRateId).increment()
previousQueue.poll { message, ack ->
try {
queue.push(message)
ack.invoke()
registry.counter(shoveledMessageId).increment()
} catch (e: Throwable) {
log.error("Failed shoveling message from previous queue to active (message: {})", message, e)
registry.counter(shovelErrorId).increment()
}
registry.counter(pollOpsRateId).increment()
previousQueue.poll { message, ack ->
try {
queue.push(message)
ack.invoke()
registry.counter(shoveledMessageId).increment()
} catch (e: Throwable) {
log.error("Failed shoveling message from previous queue to active (message: $message)", e)
registry.counter(shovelErrorId).increment()
}
}
}

@PostConstruct
fun confirmShovelUsage() =
log.info("Running ${javaClass.simpleName} migrator")
log.info("${javaClass.simpleName} migrator from $previousQueue to $queue is enabled")
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.netflix.spinnaker.orca.q

import com.netflix.spectator.api.NoopRegistry
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
import com.netflix.spinnaker.q.Activator
import com.netflix.spinnaker.q.Queue
Expand All @@ -41,7 +42,7 @@ class QueueShovelTest : SubjectSpek<QueueShovel>({
}

subject(CachingMode.GROUP) {
QueueShovel(queue, previousQueue, registry, activator)
QueueShovel(queue, previousQueue, registry, activator, DynamicConfigService.NOOP)
}

describe("polling the previous queue") {
Expand Down

0 comments on commit a1f6f39

Please sign in to comment.