Skip to content

Commit

Permalink
MINOR: Fix metric collection NPE during shutdown
Browse files Browse the repository at this point in the history
Collecting socket server metrics during shutdown may throw NullPointerException

Author: Xavier Léauté <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes apache#2221 from xvrl/fix-metrics-npe-on-shutdown
  • Loading branch information
xvrl authored and ijuma committed Dec 8, 2016
1 parent 1949a76 commit 006630f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ public Map<MetricName, KafkaMetric> metrics() {
return this.metrics;
}

public KafkaMetric metric(MetricName metricName) {
return this.metrics.get(metricName);
}

/**
* This iterates over every Sensor and triggers a removeSensor if it has expired
* Package private for testing
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time

newGauge("NetworkProcessorAvgIdlePercent",
new Gauge[Double] {
def value = allMetricNames.map( metricName =>
metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
def value = allMetricNames.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(_.value)
}.sum / totalProcessorThreads
}
)

Expand Down Expand Up @@ -389,7 +390,7 @@ private[kafka] class Processor(val id: Int,
newGauge("IdlePercent",
new Gauge[Double] {
def value = {
metrics.metrics().get(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags)).value()
Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value)
}
},
metricTags.asScala
Expand Down
34 changes: 25 additions & 9 deletions core/src/test/scala/unit/kafka/network/SocketServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,29 @@

package kafka.network

import java.net._
import javax.net.ssl._
import java.io._
import java.util.HashMap
import java.util.Random
import java.net._
import java.nio.ByteBuffer
import java.util.{HashMap, Random}
import javax.net.ssl._

import com.yammer.metrics.core.Gauge
import com.yammer.metrics.{Metrics => YammerMetrics}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.NetworkSend
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.record.MemoryRecords
import org.junit.Assert._
import org.junit._
import org.scalatest.junit.JUnitSuite

import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.mutable.ArrayBuffer

class SocketServerTest extends JUnitSuite {
Expand Down Expand Up @@ -395,4 +397,18 @@ class SocketServerTest extends JUnitSuite {

}

@Test
def testMetricCollectionAfterShutdown(): Unit = {
server.shutdown()

val sum = YammerMetrics
.defaultRegistry
.allMetrics.asScala
.filterKeys(k => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent"))
.collect { case (_, metric: Gauge[_]) => metric.value.asInstanceOf[Double] }
.sum

assertEquals(0, sum, 0)
}

}

0 comments on commit 006630f

Please sign in to comment.