
Code refactor according to comments
jerryshao committed Jul 24, 2013
1 parent 8d1ef7f commit 31ec72b
Showing 16 changed files with 250 additions and 126 deletions.
73 changes: 72 additions & 1 deletion conf/metrics.properties.template
@@ -1,11 +1,82 @@
# syntax: [instance].[sink|source].[name].[options]

# "instance" specify "who" (the role) use metrics system. In spark there are
# several roles like master, worker, executor, driver, these roles will
# create metrics system for monitoring. So instance represents these roles.
# Currently in Spark, several instances have already implemented: master,
# worker, executor, driver.
#
# [instance] field can be "master", "worker", "executor", "driver", which means
# only the specified instance has this property.
# a wild card "*" can be used to represent instance name, which means all the
# instances will have this property.
#
# "source" specify "where" (source) to collect metrics data. In metrics system,
# there exists two kinds of source:
# 1. Spark internal source, like MasterSource, WorkerSource, etc, which will
# collect Spark component's internal state, these sources are related to
# instance and will be added after specific metrics system is created.
# 2. Common source, like JvmSource, which will collect low level state, is
# configured by configuration and loaded through reflection.
#
# "sink" specify "where" (destination) to output metrics data to. Several sinks
# can be coexisted and flush metrics to all these sinks.
#
# [sink|source] field specify this property is source related or sink, this
# field can only be source or sink.
#
# [name] field specify the name of source or sink, this is custom defined.
#
# [options] field is the specific property of this source or sink, this source
# or sink is responsible for parsing this property.
#
# Notes:
# 1. Sinks should be added through configuration, like console sink, class
# full name should be specified by class property.
# 2. Some sinks can specify polling period, like console sink, which is 10 seconds,
# it should be attention minimal polling period is 1 seconds, any period
# below than 1s is illegal.
# 3. Wild card property can be overlapped by specific instance property, for
# example, *.sink.console.period can be overlapped by master.sink.console.period.
# 4. A metrics specific configuration
# "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
# added to Java property using -Dspark.metrics.conf=xxx if you want to
# customize metrics system, or you can put it in ${SPARK_HOME}/conf,
# metrics system will search and load it automatically.

# Enable JmxSink for all instances by class name
#*.sink.jmx.class=spark.metrics.sink.JmxSink

# Enable ConsoleSink for all instances by class name
#*.sink.console.class=spark.metrics.sink.ConsoleSink

# Polling period for ConsoleSink
#*.sink.console.period=10

#*.sink.console.unit=second
#*.sink.console.unit=seconds

# Master instance overrides the polling period
#master.sink.console.period=15

#master.sink.console.unit=seconds

# Enable CsvSink for all instances
#*.sink.csv.class=spark.metrics.sink.CsvSink

# Polling period for CsvSink
#*.sink.csv.period=1

#*.sink.csv.unit=minutes

# Polling directory for CsvSink
#*.sink.csv.directory=/tmp/

# Worker instance overrides the polling period
#worker.sink.csv.period=10

#worker.sink.csv.unit=minutes

# Enable the JVM source for the master, worker, driver and executor instances
#master.source.jvm.class=spark.metrics.source.JvmSource

#worker.source.jvm.class=spark.metrics.source.JvmSource
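To make the [instance].[sink|source].[name].[options] syntax above concrete, here is a small Scala sketch (not part of this commit) showing how a key such as master.sink.console.period splits into its four fields. It reuses the same style of regex that MetricsConfig introduces below; FieldRegex and the exact parsing done inside the metrics system are assumptions for illustration only.

import scala.util.matching.Regex

object MetricsKeySketch {
  // "<instance>.<rest>", where instance is "*" or a role name (same shape as
  // the INSTANCE_REGEX added to MetricsConfig in this commit).
  val InstanceRegex: Regex = "^(\\*|[a-zA-Z]+)\\.(.+)".r
  // "<sink|source>.<name>.<options>" -- hypothetical, for illustration only.
  val FieldRegex: Regex = "^(sink|source)\\.([^.]+)\\.(.+)".r

  def main(args: Array[String]): Unit = {
    "master.sink.console.period" match {
      case InstanceRegex(instance, rest) => rest match {
        case FieldRegex(kind, name, options) =>
          // Prints: instance=master kind=sink name=console options=period
          println(s"instance=$instance kind=$kind name=$name options=$options")
        case other => println(s"unrecognized field part: $other")
      }
      case other => println(s"unrecognized key: $other")
    }
  }
}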
11 changes: 4 additions & 7 deletions core/src/main/scala/spark/deploy/master/Master.scala
@@ -58,6 +58,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act

Utils.checkHost(host, "Expected hostname")

val metricsSystem = MetricsSystem.createMetricsSystem("master")
val masterSource = new MasterSource(this)

val masterPublicAddress = {
@@ -77,12 +78,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())

Master.metricsSystem.registerSource(masterSource)
Master.metricsSystem.start()
metricsSystem.registerSource(masterSource)
metricsSystem.start()
}

override def postStop() {
webUi.stop()
metricsSystem.stop()
}

override def receive = {
@@ -322,17 +324,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
}
}

override def postStop() {
Master.metricsSystem.stop()
}
}

private[spark] object Master {
private val systemName = "sparkMaster"
private val actorName = "Master"
private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
private val metricsSystem = MetricsSystem.createMetricsSystem("master")

def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
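The Master change above (and the matching Worker change below) moves the metrics system from the companion object into the actor instance, so each instance owns and stops its own MetricsSystem. Below is a condensed Scala sketch of that lifecycle using only the calls visible in this diff; the Source import path is inferred from the JvmSource entries in the template, and MetricsSystem internals are not shown here.

import spark.metrics.MetricsSystem
import spark.metrics.source.Source

// Hypothetical helper distilling the per-instance pattern: create the metrics
// system inside the component, register its source on start, stop it on stop.
private[spark] class MetricsLifecycleSketch(instanceName: String, source: Source) {
  val metricsSystem = MetricsSystem.createMetricsSystem(instanceName)

  def onStart() {   // corresponds to the end of preStart() in Master/Worker
    metricsSystem.registerSource(source)
    metricsSystem.start()
  }

  def onStop() {    // corresponds to postStop() in Master/Worker
    metricsSystem.stop()
  }
}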
2 changes: 1 addition & 1 deletion core/src/main/scala/spark/deploy/master/MasterSource.scala
@@ -19,7 +19,7 @@ private[spark] class MasterSource(val master: Master) extends Source {
})

// Gauge for waiting application numbers in cluster
metricRegistry.register(MetricRegistry.name("waiting_apps", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
override def getValue: Int = master.waitingApps.size
})
}
11 changes: 4 additions & 7 deletions core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -68,6 +68,7 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0

val metricsSystem = MetricsSystem.createMetricsSystem("worker")
val workerSource = new WorkerSource(this)

def coresFree: Int = cores - coresUsed
@@ -100,10 +101,9 @@ private[spark] class Worker(
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
connectToMaster()
startWebUi()

Worker.metricsSystem.registerSource(workerSource)
Worker.metricsSystem.start()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
}

def connectToMaster() {
@@ -185,14 +185,11 @@ private[spark] class Worker(
override def postStop() {
executors.values.foreach(_.kill())
webUi.stop()

Worker.metricsSystem.stop()
metricsSystem.stop()
}
}

private[spark] object Worker {
private val metricsSystem = MetricsSystem.createMetricsSystem("worker")

def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
12 changes: 6 additions & 6 deletions core/src/main/scala/spark/deploy/worker/WorkerSource.scala
@@ -8,27 +8,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()

metricRegistry.register(MetricRegistry.name("executor", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
})

// Gauge for cores used of this worker
metricRegistry.register(MetricRegistry.name("core_used", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})

// Gauge for memory used of this worker
metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
})

// Gauge for cores free of this worker
metricRegistry.register(MetricRegistry.name("core_free", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
override def getValue: Int = worker.coresFree
})

// Gauge for memory used of this worker
metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"), new Gauge[Int] {
// Gauge for memory free of this worker
metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
}
8 changes: 4 additions & 4 deletions core/src/main/scala/spark/executor/ExecutorSource.scala
@@ -9,22 +9,22 @@ class ExecutorSource(val executor: Executor) extends Source {
val sourceName = "executor"

// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getActiveCount()
})

// Gauge for executor thread pool's approximate total number of tasks that have been completed
metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"), new Gauge[Long] {
metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
override def getValue: Long = executor.threadPool.getCompletedTaskCount()
})

// Gauge for executor thread pool's current number of threads
metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getPoolSize()
})

// Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})
}
50 changes: 32 additions & 18 deletions core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -1,44 +1,58 @@
package spark.metrics

import java.util.Properties
import java.io.{File, FileInputStream}
import java.io.{File, FileInputStream, InputStream, IOException}

import scala.collection.mutable
import scala.util.matching.Regex

private[spark] class MetricsConfig(val configFile: String) {
val properties = new Properties()
import spark.Logging

private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
initLogging()

val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
val METRICS_CONF = "metrics.properties"

val properties = new Properties()
var propertyCategories: mutable.HashMap[String, Properties] = null

private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.jmx.enabled", "default")
prop.setProperty("*.source.jvm.class", "spark.metrics.source.JvmSource")
// Intentionally empty; any default property can be set here
}

def initilize() {
def initialize() {
// Add default properties in case there's no properties file
setDefaultProperties(properties)

val confFile = new File(configFile)
if (confFile.exists()) {
var fis: FileInputStream = null
try {
fis = new FileInputStream(configFile)
properties.load(fis)
} finally {
fis.close()
// If spark.metrics.conf is not set, try to find the file on the classpath
var is: InputStream = null
try {
is = configFile match {
case Some(f) => new FileInputStream(f)
case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF)
}

if (is != null) {
properties.load(is)
}
} catch {
case e: Exception => logError("Error loading configuration file", e)
} finally {
if (is != null) is.close()
}

propertyCategories = subProperties(properties, INSTANCE_REGEX)
if (propertyCategories.contains(DEFAULT_PREFIX)) {
import scala.collection.JavaConversions._

val defaultProperty = propertyCategories(DEFAULT_PREFIX)
for ((inst, prop) <- propertyCategories; p <- defaultProperty
if inst != DEFAULT_PREFIX; if prop.getProperty(p._1) == null) {
prop.setProperty(p._1, p._2)
for { (inst, prop) <- propertyCategories
if (inst != DEFAULT_PREFIX)
(k, v) <- defaultProperty
if (prop.getProperty(k) == null) } {
prop.setProperty(k, v)
}
}
}
@@ -58,7 +72,7 @@ private[spark] class MetricsConfig(val configFile: String) {
def getInstance(inst: String): Properties = {
propertyCategories.get(inst) match {
case Some(s) => s
case None => propertyCategories(DEFAULT_PREFIX)
case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
}
}
}
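With this refactor, MetricsConfig takes an Option[String], falls back to a metrics.properties found on the classpath when no file is configured, logs instead of throwing on load errors, and merges wildcard ("*") entries into each instance section as defaults. Below is a hedged usage sketch based only on the members visible in this hunk; the file path and the calling code are hypothetical.

import spark.metrics.MetricsConfig

object MetricsConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new MetricsConfig(Some("/path/to/metrics.properties"))  // hypothetical path
    conf.initialize()

    // Instance-specific properties, keyed by the part after "master.".
    // Wildcard "*" entries are merged in unless the instance overrides them,
    // and an unknown instance now falls back to the wildcard (or empty) properties.
    val masterProps = conf.getInstance("master")
    println(masterProps.getProperty("sink.console.period"))
  }
}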
