Skip to content

Commit

Permalink
feat(metrics): graph max lag
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed Mar 1, 2018
1 parent 41f8000 commit 22e1fa8
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 34 deletions.
1 change: 1 addition & 0 deletions gradle/spek.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
testCompile "org.jetbrains.spek:spek-api:$spekVersion"
testCompile "org.jetbrains.spek:spek-subject-extension:$spekVersion"
testCompile "com.nhaarman:mockito-kotlin:1.5.0"
testCompile "org.assertj:assertj-core:3.9.0"
testCompile "com.natpryce:hamkrest:1.4.2.2"
testCompile "org.junit.jupiter:junit-jupiter-api:$jupiterVersion"

Expand Down
1 change: 0 additions & 1 deletion orca-dry-run/orca-dry-run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ repositories {
dependencies {
compile project(":orca-core")

testCompile "org.assertj:assertj-core:3.9.0"
testCompile "org.springframework.boot:spring-boot-test:${spinnaker.version('springBoot')}"
testCompile project(":orca-test")
testCompile project(":orca-queue-tck")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
import javax.annotation.PostConstruct
import kotlin.math.max

/**
* Monitors a queue and generates Atlas metrics.
Expand All @@ -49,8 +50,11 @@ class AtlasQueueMonitor
@EventListener
fun onQueueEvent(event: QueueEvent) {
when (event) {
is QueuePolled -> _lastQueuePoll.set(clock.instant())
is MessageProcessing -> _messageLags.add(event.lag)
QueuePolled -> _lastQueuePoll.set(clock.instant())
is MessageProcessing -> {
_messageLags.updateAndGet { it + event.lag.toMillis() }
_maxMessageLag.updateAndGet { max(it, event.lag.toMillis()) }
}
is RetryPolled -> _lastRetryPoll.set(clock.instant())
is MessagePushed -> event.counter.increment()
is MessageAcknowledged -> event.counter.increment()
Expand Down Expand Up @@ -96,8 +100,13 @@ class AtlasQueueMonitor
.toMillis()
.toDouble()
})
registry.gauge("queue.message.lag", this, {
it.averageMessageLag
registry.gauge("queue.mean.lag", this, {
it.meanMessageLag
.toMillis()
.toDouble()
})
registry.gauge("queue.max.lag", this, {
it.maxMessageLag
.toMillis()
.toDouble()
})
Expand All @@ -121,15 +130,18 @@ class AtlasQueueMonitor
get() = _lastState.get()
private val _lastState = AtomicReference<QueueState>(QueueState(0, 0, 0))

val averageMessageLag: Duration
get() = _messageLags.run {
val avg = map { it.toMillis() }
.average()
.let { Duration.ofMillis(it.toLong()) }
clear()
return avg
}
private val _messageLags = mutableListOf<Duration>()
val meanMessageLag: Duration
get() = _messageLags
.getAndSet(emptyList())
.average()
.let { Duration.ofMillis(it.toLong()) }
private val _messageLags = AtomicReference<List<Long>>(emptyList())

val maxMessageLag: Duration
get() = _maxMessageLag
.getAndSet(0)
.let { Duration.ofMillis(it) }
private val _maxMessageLag = AtomicReference<Long>(0)

/**
* Count of messages pushed to the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELIN
import com.netflix.spinnaker.orca.q.StartExecution
import com.netflix.spinnaker.orca.time.fixedClock
import com.netflix.spinnaker.q.metrics.*
import com.netflix.spinnaker.spek.shouldEqual
import com.nhaarman.mockito_kotlin.*
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.jetbrains.spek.api.lifecycle.CachingMode.GROUP
import org.jetbrains.spek.subject.SubjectSpek
import java.time.Duration
import java.time.Duration.ZERO
import java.time.Instant.now

object AtlasQueueMonitorTest : SubjectSpek<AtlasQueueMonitor>({
Expand Down Expand Up @@ -62,8 +64,8 @@ object AtlasQueueMonitorTest : SubjectSpek<AtlasQueueMonitor>({

describe("default values") {
it("reports system uptime if the queue has never been polled") {
subject.lastQueuePoll shouldEqual clock.instant()
subject.lastRetryPoll shouldEqual clock.instant()
assertThat(subject.lastQueuePoll).isEqualTo(clock.instant())
assertThat(subject.lastRetryPoll).isEqualTo(clock.instant())
}
}

Expand All @@ -78,7 +80,7 @@ object AtlasQueueMonitorTest : SubjectSpek<AtlasQueueMonitor>({
}

it("updates the last poll time") {
subject.lastQueuePoll shouldEqual clock.instant()
assertThat(subject.lastQueuePoll).isEqualTo(clock.instant())
}
}

Expand All @@ -92,28 +94,47 @@ object AtlasQueueMonitorTest : SubjectSpek<AtlasQueueMonitor>({
}

it("updates the last poll time") {
subject.lastRetryPoll shouldEqual clock.instant()
assertThat(subject.lastRetryPoll).isEqualTo(clock.instant())
}
}

describe("when a message is being processed") {
afterGroup(::resetMocks)
describe("message lag metrics") {
given("no messages have been processed") {
afterGroup(::resetMocks)

val lag = sequenceOf(
Duration.ofSeconds(5),
Duration.ofSeconds(13),
Duration.ofSeconds(7)
)
val events = lag.mapIndexed { i, lag ->
MessageProcessing(StartExecution(PIPELINE, "$i", "covfefe"), lag)
it("reports zero lag time") {
assertThat(subject.meanMessageLag).isEqualTo(ZERO)
assertThat(subject.maxMessageLag).isEqualTo(ZERO)
}
}

on("receiving a ${events.first().javaClass.simpleName} event") {
events.forEach(subject::onQueueEvent)
}
given("some messages have been processed") {
afterGroup(::resetMocks)

val lag = sequenceOf(
Duration.ofSeconds(5),
Duration.ofSeconds(13),
Duration.ofSeconds(7)
)
val events = lag.mapIndexed { i, lag ->
MessageProcessing(StartExecution(PIPELINE, "$i", "covfefe"), lag)
}

it("averages the lag time") {
subject.averageMessageLag shouldEqual lag.map { it.toMillis() }.average().let { Duration.ofMillis(it.toLong()) }
on("receiving a ${events.first().javaClass.simpleName} event") {
events.forEach(subject::onQueueEvent)
}

it("averages the lag time") {
assertThat(subject.meanMessageLag).isEqualTo(lag.average())
// after reading the mean should reset
assertThat(subject.meanMessageLag).isEqualTo(ZERO)
}

it("records the max lag time") {
assertThat(subject.maxMessageLag).isEqualTo(lag.max())
// after reading the max should reset
assertThat(subject.maxMessageLag).isEqualTo(ZERO)
}
}
}

Expand Down Expand Up @@ -216,7 +237,12 @@ object AtlasQueueMonitorTest : SubjectSpek<AtlasQueueMonitor>({
}

it("updates the queue state") {
subject.lastState shouldEqual queueState
assertThat(subject.lastState).isEqualTo(queueState)
}
}
})

private fun Sequence<Duration>.average() =
map { it.toMillis() }
.average()
.let { Duration.ofMillis(it.toLong()) }

0 comments on commit 22e1fa8

Please sign in to comment.