Skip to content

Commit

Permalink
feat(queue): Active executions monitoring (spinnaker#1855)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert authored Dec 15, 2017
1 parent 033c7d4 commit dfff6b7
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 35 deletions.
31 changes: 0 additions & 31 deletions gradle/spek.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,3 @@ junitPlatform {
}
}
}

// JUnit 5 needs extra wiring to generate HTML reports: the XML results are
// aggregated and rendered with Ant's JUnit report task.
configurations {
  // Dedicated configuration so the Ant JUnit tooling is resolved separately and
  // only used to build the taskdef classpath below.
  junitXmlToHtml
}

dependencies {
  junitXmlToHtml "org.apache.ant:ant-junit:1.9.7"
}

// Aggregates the XML files under build/test-results/junit-platform and renders
// them as a "frames" HTML report into build/test-reports.
task generateHtmlTestReports {
  // `doLast` replaces the deprecated `task name << { ... }` (leftShift) form,
  // which newer Gradle versions no longer accept.
  doLast {
    def reportsDir = new File(buildDir, "test-reports")
    reportsDir.mkdirs()

    ant.taskdef(
      name: "junitReport",
      classname: "org.apache.tools.ant.taskdefs.optional.junit.XMLResultAggregator",
      classpath: configurations.junitXmlToHtml.asPath
    )

    ant.junitReport(todir: "$buildDir/test-results/junit-platform", tofile: "aggregated-test-results.xml") {
      fileset dir: "$buildDir/test-results/junit-platform"
      report format: "frames", todir: reportsDir
    }
  }
}

// The junitPlatformTest task is looked up after evaluation, once the project
// has been fully configured.
afterEvaluate {
  def junitPlatformTestTask = tasks.getByName("junitPlatformTest")
  generateHtmlTestReports.dependsOn(junitPlatformTestTask)
  check.dependsOn(generateHtmlTestReports)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@ package com.netflix.spinnaker.orca.dryrun

import com.natpryce.hamkrest.greaterThan
import com.natpryce.hamkrest.should.shouldMatch
import com.netflix.spinnaker.orca.q.*
import com.netflix.spinnaker.orca.q.handler.plan
import com.netflix.spinnaker.orca.q.multiTaskStage
import com.netflix.spinnaker.orca.q.pipeline
import com.netflix.spinnaker.orca.q.singleTaskStage
import com.netflix.spinnaker.orca.q.stage
import com.netflix.spinnaker.orca.q.stageWithParallelBranches
import com.netflix.spinnaker.orca.q.stageWithSyntheticBefore
import com.netflix.spinnaker.orca.q.stageWithSyntheticBeforeAndNoTasks
import com.netflix.spinnaker.orca.q.zeroTaskStage
import com.netflix.spinnaker.spek.shouldEqual
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.orca.q.redis

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Id
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.orca.events.ExecutionComplete
import com.netflix.spinnaker.orca.events.ExecutionEvent
import com.netflix.spinnaker.orca.events.ExecutionStarted
import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.ApplicationListener
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import redis.clients.jedis.Jedis
import redis.clients.util.Pool
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

/**
 * A monitor that records active executions.
 *
 * - On every [ExecutionStarted] event a JSON-serialized [ActiveExecution] is written to the Redis hash
 *   `monitor.activeExecutions` under the execution ID; on [ExecutionComplete] that entry is deleted.
 * - [registerGauges] (every 60s unless configured otherwise) rebuilds an in-memory count of active executions
 *   per metric ID and makes sure a Spectator gauge exists for each of those IDs.
 * - [cleanup] (5 minutes after the previous run finished, unless configured otherwise) deletes hash entries
 *   whose execution can no longer be found or has already completed, so that executions orphaned by
 *   out-of-band failures do not stay counted.
 *
 * TODO rz - Add cloudProviders, accounts as tags
 */
@Component
class RedisActiveExecutionsMonitor(
  private val executionRepository: ExecutionRepository,
  @Qualifier("jedisPool") private val pool: Pool<Jedis>,
  private val objectMapper: ObjectMapper,
  private val registry: Registry
) : ApplicationListener<ExecutionEvent> {

  private val log = LoggerFactory.getLogger(javaClass)

  /**
   * Latest count of active executions per metric ID. Gauges read from this map, so it must stay the same
   * instance for the lifetime of the monitor (hence `val`) and tolerate concurrent reads while being refreshed.
   */
  private val snapshot: MutableMap<Id, AtomicLong> = ConcurrentHashMap()

  /**
   * Refreshes the snapshot and registers a gauge for every metric ID that has active executions and no gauge
   * with that ID in the registry yet.
   */
  @Scheduled(fixedRateString = "\${queue.monitor.activeExecutions.register.frequency.ms:60000}")
  fun registerGauges() {
    // Reuse the list the snapshot was built from instead of hitting Redis a second time; this also keeps the
    // registered gauges consistent with the snapshot contents.
    val activeExecutions = snapshotActivity()
    log.info("Registering new active execution gauges")

    activeExecutions
      .map { it.getMetricId() }
      // Many executions share one metric ID; look each ID up in the registry only once.
      .distinct()
      .forEach { metricId ->
        val alreadyRegistered = registry.gauges().anyMatch { it.id() == metricId }
        if (!alreadyRegistered) {
          // Bind the gauge to the long-lived snapshot map rather than to null: Spectator polls the function
          // against the monitored object, which it only references weakly.
          registry.gauge(metricId, snapshot, { counts -> counts[metricId]?.toDouble() ?: 0.0 })
        }
      }
  }

  /**
   * Reads the active executions from Redis and replaces the contents of [snapshot] with the number of
   * executions per metric ID.
   *
   * @return the active executions the snapshot was computed from
   */
  private fun snapshotActivity(): List<ActiveExecution> {
    log.info("Snapshotting active executions")

    val activeExecutions = getActiveExecutions()

    val working = mutableMapOf<Id, AtomicLong>()
    activeExecutions.forEach { execution ->
      working.getOrPut(execution.getMetricId()) { AtomicLong() }.incrementAndGet()
    }

    // Update snapshot from working copy
    snapshot.putAll(working)

    // Remove keys that are not in the working copy
    snapshot.keys.retainAll(working.keys)

    return activeExecutions
  }

  /**
   * Deletes tracked executions that no longer exist in the execution repository or that have a completed
   * status.
   */
  @Scheduled(fixedDelayString = "\${queue.monitor.activeExecutions.cleanup.frequency.ms:300000}")
  fun cleanup() {
    val orphans = getActiveExecutions()
      .filter { it.isOrphaned() }
      .map { it.id }
      .toTypedArray()

    log.info("Cleaning up ${orphans.size} orphaned active executions")
    // HDEL needs at least one field, so skip the round trip when there is nothing to delete.
    if (orphans.isNotEmpty()) {
      pool.resource.use { redis ->
        redis.hdel(REDIS_KEY, *orphans)
      }
    }
  }

  /**
   * Adds the execution to the hash on [ExecutionStarted] and removes it on [ExecutionComplete]; every other
   * event type is ignored.
   */
  override fun onApplicationEvent(event: ExecutionEvent) {
    when (event) {
      is ExecutionStarted -> startExecution(event.executionType, event.executionId)
      is ExecutionComplete -> completeExecution(event.executionId)
      else -> Unit
    }
  }

  /**
   * Loads the execution and stores its [ActiveExecution] record as JSON in the Redis hash. If the execution
   * cannot be found, the event is logged and dropped.
   */
  private fun startExecution(executionType: Execution.ExecutionType, executionId: String) {
    val execution = try {
      executionRepository.retrieve(executionType, executionId)
    } catch (e: ExecutionNotFoundException) {
      log.error(
        "Received start execution event, but was unable to read execution from the database " +
          "(type: $executionType, id: $executionId)"
      )
      return
    }

    val record = ActiveExecution(
      id = execution.id,
      type = execution.type,
      application = execution.application
    )
    pool.resource.use { redis ->
      redis.hset(REDIS_KEY, execution.id, objectMapper.writeValueAsString(record))
    }
  }

  /** Removes the execution's entry from the Redis hash. */
  private fun completeExecution(executionId: String) {
    pool.resource.use { redis ->
      redis.hdel(REDIS_KEY, executionId)
    }
  }

  /** Reads every entry of the Redis hash and deserializes its value into an [ActiveExecution]. */
  private fun getActiveExecutions(): List<ActiveExecution> =
    pool.resource.use { redis ->
      redis.hgetAll(REDIS_KEY).values.map { objectMapper.readValue(it, ActiveExecution::class.java) }
    }

  /**
   * Whether the tracked execution should be dropped: it is either missing from the repository or its status
   * is complete.
   */
  private fun ActiveExecution.isOrphaned(): Boolean =
    try {
      executionRepository.retrieve(type, id).status.isComplete
    } catch (e: ExecutionNotFoundException) {
      true
    }

  /** Metric ID `executions.active`, tagged with the execution type only. */
  private fun ActiveExecution.getMetricId() =
    registry.createId("executions.active")
      .withTag("executionType", type.toString())

  /** The record stored (as JSON) in Redis for each active execution. */
  data class ActiveExecution(
    val id: String,
    val type: Execution.ExecutionType,
    val application: String
  )

  private companion object {
    /** Redis hash holding one field per active execution, keyed by execution ID. */
    const val REDIS_KEY = "monitor.activeExecutions"
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@ package com.netflix.spinnaker.orca.q
import com.natpryce.hamkrest.allElements
import com.natpryce.hamkrest.equalTo
import com.natpryce.hamkrest.should.shouldMatch
import com.netflix.appinfo.InstanceInfo.InstanceStatus.*
import com.netflix.appinfo.InstanceInfo.InstanceStatus.OUT_OF_SERVICE
import com.netflix.appinfo.InstanceInfo.InstanceStatus.STARTING
import com.netflix.appinfo.InstanceInfo.InstanceStatus.UP
import com.netflix.discovery.StatusChangeEvent
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.config.QueueConfiguration
import com.netflix.spinnaker.kork.eureka.RemoteStatusChangedEvent
import com.netflix.spinnaker.orca.ExecutionStatus.*
import com.netflix.spinnaker.orca.ExecutionStatus.CANCELED
import com.netflix.spinnaker.orca.ExecutionStatus.FAILED_CONTINUE
import com.netflix.spinnaker.orca.ExecutionStatus.NOT_STARTED
import com.netflix.spinnaker.orca.ExecutionStatus.RUNNING
import com.netflix.spinnaker.orca.ExecutionStatus.SKIPPED
import com.netflix.spinnaker.orca.ExecutionStatus.STOPPED
import com.netflix.spinnaker.orca.ExecutionStatus.SUCCEEDED
import com.netflix.spinnaker.orca.ExecutionStatus.TERMINAL
import com.netflix.spinnaker.orca.TaskResult
import com.netflix.spinnaker.orca.config.OrcaConfiguration
import com.netflix.spinnaker.orca.exceptions.DefaultExceptionHandler
Expand All @@ -43,7 +52,16 @@ import com.netflix.spinnaker.orca.pipeline.util.StageNavigator
import com.netflix.spinnaker.orca.test.redis.EmbeddedRedisConfiguration
import com.netflix.spinnaker.spek.shouldAllEqual
import com.netflix.spinnaker.spek.shouldEqual
import com.nhaarman.mockito_kotlin.*
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.argThat
import com.nhaarman.mockito_kotlin.check
import com.nhaarman.mockito_kotlin.doAnswer
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.never
import com.nhaarman.mockito_kotlin.reset
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.whenever
import org.junit.After
import org.junit.Before
import org.junit.Test
Expand Down

0 comments on commit dfff6b7

Please sign in to comment.