Commit

removed config
ec2-user committed Apr 17, 2014
1 parent e269c24 commit 3d6ba56
Showing 12 changed files with 620 additions and 9 deletions.
133 changes: 133 additions & 0 deletions conf/metrics.properties
@@ -0,0 +1,133 @@
# syntax: [instance].sink|source.[name].[options]=[value]

# This file configures Spark's internal metrics system. The metrics system is
# divided into instances which correspond to internal components.
# Each instance can be configured to report its metrics to one or more sinks.
# Accepted values for [instance] are "master", "worker", "executor", "driver",
# and "applications". A wild card "*" can be used as an instance name, in
# which case all instances will inherit the supplied property.
#
# Within an instance, a "source" specifies a particular set of grouped metrics.
# There are two kinds of sources:
# 1. Spark internal sources, like MasterSource, WorkerSource, etc., which will
# collect a Spark component's internal state. Each instance is paired with a
# Spark source that is added automatically.
# 2. Common sources, like JvmSource, which will collect low level state.
# These can be added through configuration options and are then loaded
# using reflection.
#
# A "sink" specifies where metrics are delivered to. Each instance can be
# assigned one or more sinks.
#
# The sink|source field specifies whether the property relates to a sink or
# source.
#
# The [name] field specifies the name of source or sink.
#
# The [options] field is the specific property of this source or sink. The
# source or sink is responsible for parsing this property.
#
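# For example, in the line
#   master.sink.console.period=10
# "master" is the instance, "sink" says the property configures a sink,
# "console" is the sink's name, and "period" is the option being set.
#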
# Notes:
# 1. To add a new sink, set the "class" option to a fully qualified class
# name (see examples below).
# 2. Some sinks involve a polling period. The minimum allowed polling period
# is 1 second.
# 3. Wild card properties can be overridden by more specific properties.
# For example, master.sink.console.period takes precedence over
# *.sink.console.period.
# 4. To customize the metrics system, add a metrics configuration such as
# "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" to the Java
# properties via -Dspark.metrics.conf=xxx. You can also put the file in
# ${SPARK_HOME}/conf and it will be loaded automatically.
# 5. MetricsServlet is added by default as a sink on the master, worker and
# client driver. Send an HTTP request to "/metrics/json" to get a snapshot of
# all registered metrics in JSON format. For the master, the requests
# "/metrics/master/json" and "/metrics/applications/json" can be sent
# separately to get metrics snapshots of the master and applications
# instances. MetricsServlet cannot be configured on its own.
#

## List of available sinks and their properties.

# org.apache.spark.metrics.sink.ConsoleSink
# Name: Default: Description:
# period 10 Poll period
# unit seconds Units of poll period

# org.apache.spark.metrics.sink.CSVSink
# Name: Default: Description:
# period 10 Poll period
# unit seconds Units of poll period
# directory /tmp Where to store CSV files

# org.apache.spark.metrics.sink.GangliaSink
# Name: Default: Description:
# host NONE Hostname or multicast group of Ganglia server
# port NONE Port of Ganglia server(s)
# period 10 Poll period
# unit seconds Units of poll period
# ttl 1 TTL of messages sent by Ganglia
# mode multicast Ganglia network mode ('unicast' or 'multicast')

# org.apache.spark.metrics.sink.JmxSink

# org.apache.spark.metrics.sink.MetricsServlet
# Name: Default: Description:
# path VARIES* Path prefix from the web server root
# sample false Whether to show entire set of samples for histograms ('false' or 'true')
#
# * Default path is /metrics/json for all instances except the master. The master has two paths:
# /metrics/applications/json # App information
# /metrics/master/json # Master information

# org.apache.spark.metrics.sink.GraphiteSink
# Name: Default: Description:
# host NONE Hostname of Graphite server
# port NONE Port of Graphite server
# period 10 Poll period
# unit seconds Units of poll period
# prefix EMPTY STRING Prefix to prepend to metric name

## Examples
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

# Enable ConsoleSink for all instances by class name
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink

# Polling period for ConsoleSink
#*.sink.console.period=10

#*.sink.console.unit=seconds

# Master instance overrides the wildcard polling period
#master.sink.console.period=15

#master.sink.console.unit=seconds
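
# As another example, metrics from every instance could be sent to a Graphite
# server by enabling the GraphiteSink listed above (the hostname and port
# below are placeholders):
#*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
#*.sink.graphite.host=graphite.example.com
#*.sink.graphite.port=2003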

# Enable CloudwatchSink for all instances
*.sink.Cloudwatch.class=org.apache.spark.metrics.sink.CloudwatchSink

# Enable CsvSink for all instances
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink

# Polling period for CsvSink
*.sink.csv.period=1

*.sink.csv.unit=minutes

# Output directory for CsvSink
*.sink.csv.directory=/tmp/

# Worker instance overrides the wildcard polling period
#worker.sink.csv.period=10

#worker.sink.csv.unit=minutes

# Enable the JVM source for the master, worker, driver and executor instances
#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource

#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource

#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource

executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

Binary file added core/lib/metrics-cloudwatch-3.0.1-1.jar
@@ -123,6 +123,7 @@ private[spark] class Executor(

// Start worker thread pool
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
// JVM memory MX bean; ExecutorSource reads heap usage from it for its gauges
def memInfo = ManagementFactory.getMemoryMXBean()

// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
@@ -37,7 +37,9 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)

val metricRegistry = new MetricRegistry()
// TODO: It would be nice to pass the application name here
val sourceName = "executor.%s".format(executorId)
//val sourceName = "executor.%s".format(executorId)
// Changed the sourceName so that metrics from all executors are aggregated together for CloudWatch
val sourceName = "executors"

// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
@@ -59,7 +61,19 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})

metricRegistry.register(MetricRegistry.name("mem", "heapusedratio"), new Gauge[Double] {
override def getValue: Double = executor.memInfo.getHeapMemoryUsage().getUsed().toDouble/executor.memInfo.getHeapMemoryUsage().getMax().toDouble
})

// metricRegistry.register(MetricRegistry.name("mem", "heapused"), new Gauge[Long] {
// override def getValue: Long = executor.memInfo.getUsed()
//})

// metricRegistry.register(MetricRegistry.name("mem", "heapmax"), new Gauge[Long] {
// override def getValue: Long = executor.memInfo.getMax()
//})

// Gauge for file system stats of this executor
for (scheme <- Array("hdfs", "file")) {
registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
@@ -0,0 +1,54 @@
package org.apache.spark.metrics.sink


import java.util.Properties
import com.codahale.metrics.MetricRegistry
import com.plausiblelabs.metrics.reporting.CloudWatchReporter
import com.amazonaws.auth.BasicAWSCredentials
import java.util.concurrent.TimeUnit.SECONDS
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.AmazonClientException
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider
import org.apache.spark.Logging
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient
import org.apache.spark.SecurityManager


private[spark] class CloudwatchSink(val property: Properties, val registry: MetricRegistry, securityMgr: SecurityManager) extends Sink with Logging {

println("started")
val reporter = new CloudWatchReporter.Builder(registry,"spark",new AmazonCloudWatchClient(getCredentials())).withEC2InstanceIdDimension().build()
val reporteras = new CloudWatchReporter.Builder(registry,"spark",new AmazonCloudWatchClient(getCredentials())).withInstanceIdDimension("spark-cluster").build()

override def start() {
reporter.start(60, SECONDS)
reporteras.start(60, SECONDS)
}

override def stop() {
reporter.stop()
reporteras.stop()
}

private def getCredentials(): AWSCredentialsProvider = {
try {
val credentialsProvider = new InstanceProfileCredentialsProvider()
// Verify we can fetch credentials from the provider
credentialsProvider.getCredentials()
logInfo("Obtained credentials from the IMDS.")
credentialsProvider
} catch {
case e: AmazonClientException =>
val credentialsProvider = new ClasspathPropertiesFileCredentialsProvider()
// Verify we can fetch credentials from the provider
credentialsProvider.getCredentials()
logInfo("Obtained credentials from the properties file.")
credentialsProvider
}
}


}
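
The reporting period above is hardcoded to 60 seconds. Below is a minimal sketch, not part of this commit, of how the period could instead be read from the sink's Properties in the same way the other sinks described in conf/metrics.properties are configured; the "period" and "unit" keys and the 60-second default are assumptions.

import java.util.Properties
import java.util.concurrent.TimeUnit

// Resolve the polling period and unit from the sink's properties, falling back
// to 60 seconds when the (assumed) "period"/"unit" keys are absent.
def pollingSettings(property: Properties): (Int, TimeUnit) = {
  val period = Option(property.getProperty("period")).map(_.toInt).getOrElse(60)
  val unit = Option(property.getProperty("unit"))
    .map(u => TimeUnit.valueOf(u.toUpperCase))
    .getOrElse(TimeUnit.SECONDS)
  (period, unit)
}

// start() could then pass these values to reporter.start and reporteras.start
// instead of the hardcoded reporter.start(60, SECONDS) calls above.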
@@ -0,0 +1,35 @@
package org.apache.spark.streaming.examples

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.StreamingContext._


object KinesisWordCount {

def main(args: Array[String]): Unit = {


if (args.length < 4) {
System.err.println("Usage: KinesisWordCount <master> <streamname> <accessKey> <accessSecretKey>")
System.exit(1)
}

val master = args(0)
val kinesisStream = args(1)
val accessKey = args(2)
val accessSecretKey = args(3)

val ssc = new StreamingContext(master, "KinesisWordCount", Seconds(2),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

val lines = KinesisUtils.createStream(ssc, accessKey, accessSecretKey, kinesisStream)

val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

}
}
73 changes: 73 additions & 0 deletions external/AmazonKinesis/pom.xml
@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-amazonkinesis</artifactId>
<packaging>jar</packaging>
<name>Spark Project External Amazon Kinesis</name>
<url>http://spark.incubator.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
