[SPARK-12692][BUILD][STREAMING] Scala style: Fix the style violation (Space before "," or ":")

Fix the style violation (space before , and :).
This PR is a follow-up to apache#10643.

Author: Kousuke Saruta <[email protected]>

Closes apache#10685 from sarutak/SPARK-12692-followup-streaming.
sarutak authored and rxin committed Jan 12, 2016
1 parent aaa2c3b commit 39ae04e
Showing 30 changed files with 108 additions and 96 deletions.
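
The rule being enforced is simple: no whitespace immediately before ":" or ",". The sketch below is illustrative only (hypothetical class names, not part of the diff); the first form fails the style check, the second passes.

// Fails the style check: space before ":" and before ",".
class PageViewBefore(val url : String, val status : Int) {
  def describe(sep : String) : String = url + sep + status.toString
}

// Passes the style check: no space before ":" or ",".
class PageViewAfter(val url: String, val status: Int) {
  def describe(sep: String): String = url + sep + status.toString
}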
@@ -23,15 +23,15 @@ import java.net.ServerSocket
import java.util.Random

/** Represents a page view on a website with associated dimension data. */
-class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int)
+class PageView(val url: String, val status: Int, val zipCode: Int, val userID: Int)
extends Serializable {
-override def toString() : String = {
+override def toString(): String = {
"%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
}
}

object PageView extends Serializable {
-def fromString(in : String) : PageView = {
+def fromString(in: String): PageView = {
val parts = in.split("\t")
new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
}
@@ -58,9 +58,9 @@ object PageViewGenerator {
404 -> .05)
val userZipCode = Map(94709 -> .5,
94117 -> .5)
-val userID = Map((1 to 100).map(_ -> .01) : _*)
+val userID = Map((1 to 100).map(_ -> .01): _*)

-def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
+def pickFromDistribution[T](inputMap: Map[T, Double]): T = {
val rand = new Random().nextDouble()
var total = 0.0
for ((item, prob) <- inputMap) {
@@ -72,15 +72,15 @@ object PageViewGenerator {
inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
}

-def getNextClickEvent() : String = {
+def getNextClickEvent(): String = {
val id = pickFromDistribution(userID)
val page = pickFromDistribution(pages)
val status = pickFromDistribution(httpStatus)
val zipCode = pickFromDistribution(userZipCode)
new PageView(page, status, zipCode, id).toString()
}

-def main(args : Array[String]) {
+def main(args: Array[String]) {
if (args.length != 2) {
System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
System.exit(1)
@@ -26,20 +26,20 @@ import org.slf4j.{Logger, LoggerFactory}
private[sink] trait Logging {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
-@transient private var log_ : Logger = null
+@transient private var _log: Logger = null

// Method to get or create the logger for this object
protected def log: Logger = {
-if (log_ == null) {
+if (_log == null) {
initializeIfNecessary()
var className = this.getClass.getName
// Ignore trailing $'s in the class names for Scala objects
if (className.endsWith("$")) {
className = className.substring(0, className.length - 1)
}
-log_ = LoggerFactory.getLogger(className)
+_log = LoggerFactory.getLogger(className)
}
-log_
+_log
}

// Log methods that take only a String
@@ -41,12 +41,12 @@ import org.apache.spark.util.Utils

private[streaming]
class FlumeInputDStream[T: ClassTag](
-ssc_ : StreamingContext,
+_ssc: StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
-) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {

override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel, enableDecompression)
@@ -60,7 +60,7 @@ class FlumeInputDStream[T: ClassTag](
* which are not serializable.
*/
class SparkFlumeEvent() extends Externalizable {
-var event : AvroFlumeEvent = new AvroFlumeEvent()
+var event: AvroFlumeEvent = new AvroFlumeEvent()

/* De-serialize from bytes. */
def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -75,12 +75,12 @@ class SparkFlumeEvent() extends Externalizable {
val keyLength = in.readInt()
val keyBuff = new Array[Byte](keyLength)
in.readFully(keyBuff)
-val key : String = Utils.deserialize(keyBuff)
+val key: String = Utils.deserialize(keyBuff)

val valLength = in.readInt()
val valBuff = new Array[Byte](valLength)
in.readFully(valBuff)
-val value : String = Utils.deserialize(valBuff)
+val value: String = Utils.deserialize(valBuff)

headers.put(key, value)
}
@@ -109,7 +109,7 @@ class SparkFlumeEvent() extends Externalizable {
}

private[streaming] object SparkFlumeEvent {
-def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
+def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
val event = new SparkFlumeEvent
event.event = in
event
@@ -118,13 +118,13 @@ private[streaming] object SparkFlumeEvent {

/** A simple server that implements Flume's Avro protocol. */
private[streaming]
-class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
-override def append(event : AvroFlumeEvent) : Status = {
+class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
+override def append(event: AvroFlumeEvent): Status = {
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}

-override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
+override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}
@@ -58,11 +58,11 @@ class DirectKafkaInputDStream[
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
-ssc_ : StreamingContext,
+_ssc: StreamingContext,
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
-) extends InputDStream[R](ssc_) with Logging {
+) extends InputDStream[R](_ssc) with Logging {
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)

@@ -48,12 +48,12 @@ class KafkaInputDStream[
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
-ssc_ : StreamingContext,
+_ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
useReliableReceiver: Boolean,
storageLevel: StorageLevel
-) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
+) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {

def getReceiver(): Receiver[(K, V)] = {
if (!useReliableReceiver) {
@@ -50,7 +50,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
private var ssc: StreamingContext = _
private var tempDirectory: File = null

-override def beforeAll() : Unit = {
+override def beforeAll(): Unit = {
kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()

@@ -38,11 +38,11 @@ import org.apache.spark.streaming.receiver.Receiver

private[streaming]
class MQTTInputDStream(
-ssc_ : StreamingContext,
+_ssc: StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
-) extends ReceiverInputDStream[String](ssc_) {
+) extends ReceiverInputDStream[String](_ssc) {

private[streaming] override def name: String = s"MQTT stream [$id]"

@@ -39,11 +39,11 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class TwitterInputDStream(
-ssc_ : StreamingContext,
+_ssc: StreamingContext,
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
-) extends ReceiverInputDStream[Status](ssc_) {
+) extends ReceiverInputDStream[Status](_ssc) {

private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())
12 changes: 12 additions & 0 deletions project/MimaExcludes.scala
@@ -135,6 +135,18 @@ object MimaExcludes {
) ++ Seq(
// SPARK-12510 Refactor ActorReceiver to support Java
ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver")
) ++ Seq(
// SPARK-12692 Scala style: Fix the style violation (Space before "," or ":")
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log_"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log__="),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler.org$apache$spark$streaming$flume$sink$Logging$$log_"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler.org$apache$spark$streaming$flume$sink$Logging$$log__="),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$log__="),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$log_"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log_="),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
)
case v if v.startsWith("1.6") =>
Seq(
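
For context, the MiMa exclusions above are needed because a private var declared inside a Scala trait compiles to public, name-mangled accessor methods on the classes that mix it in, so renaming log_ to _log changes those generated names (the org$apache$spark$streaming$flume$sink$Logging$$... entries listed above). A minimal sketch of the pattern, assuming slf4j on the classpath; illustrative only, not the commit's code:

trait Logging {
  // Renaming this field renames the generated accessors
  // (e.g. ...$$Logging$$_log and ...$$Logging$$_log_=), which MiMa flags
  // as MissingMethodProblem unless excluded as above.
  @transient private var _log: org.slf4j.Logger = null

  protected def log: org.slf4j.Logger = {
    if (_log == null) {
      // Drop the trailing "$" that Scala appends to object class names.
      _log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))
    }
    _log
  }
}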
@@ -183,7 +183,7 @@ class CheckpointWriter(
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
private var stopped = false
-private var fs_ : FileSystem = _
+private var _fs: FileSystem = _

@volatile private var latestCheckpointTime: Time = null

@@ -298,12 +298,12 @@ class CheckpointWriter(
}

private def fs = synchronized {
-if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
-fs_
+if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
+_fs
}

private def reset() = synchronized {
-fs_ = null
+_fs = null
}
}

@@ -370,8 +370,8 @@ object CheckpointReader extends Logging {
}

private[streaming]
-class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
-extends ObjectInputStream(inputStream_) {
+class ObjectInputStreamWithLoader(_inputStream: InputStream, loader: ClassLoader)
+extends ObjectInputStream(_inputStream) {

override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
@@ -58,9 +58,9 @@ import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookMan
* of the context by `stop()` or by an exception.
*/
class StreamingContext private[streaming] (
-sc_ : SparkContext,
-cp_ : Checkpoint,
-batchDur_ : Duration
+_sc: SparkContext,
+_cp: Checkpoint,
+_batchDur: Duration
) extends Logging {

/**
@@ -126,18 +126,18 @@ class StreamingContext private[streaming] (
}


-if (sc_ == null && cp_ == null) {
+if (_sc == null && _cp == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
}

-private[streaming] val isCheckpointPresent = (cp_ != null)
+private[streaming] val isCheckpointPresent = (_cp != null)

private[streaming] val sc: SparkContext = {
-if (sc_ != null) {
-sc_
+if (_sc != null) {
+_sc
} else if (isCheckpointPresent) {
-SparkContext.getOrCreate(cp_.createSparkConf())
+SparkContext.getOrCreate(_cp.createSparkConf())
} else {
throw new SparkException("Cannot create StreamingContext without a SparkContext")
}
@@ -154,13 +154,13 @@

private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
-cp_.graph.setContext(this)
-cp_.graph.restoreCheckpointData()
-cp_.graph
+_cp.graph.setContext(this)
+_cp.graph.restoreCheckpointData()
+_cp.graph
} else {
-require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
+require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
-newGraph.setBatchDuration(batchDur_)
+newGraph.setBatchDuration(_batchDur)
newGraph
}
}
@@ -169,15 +169,15 @@

private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
-sc.setCheckpointDir(cp_.checkpointDir)
-cp_.checkpointDir
+sc.setCheckpointDir(_cp.checkpointDir)
+_cp.checkpointDir
} else {
null
}
}

private[streaming] val checkpointDuration: Duration = {
-if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
+if (isCheckpointPresent) _cp.checkpointDuration else graph.batchDuration
}

private[streaming] val scheduler = new JobScheduler(this)
@@ -246,7 +246,7 @@
}

private[streaming] def initialCheckpoint: Checkpoint = {
-if (isCheckpointPresent) cp_ else null
+if (isCheckpointPresent) _cp else null
}

private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()
@@ -460,7 +460,7 @@
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
-val conf = sc_.hadoopConfiguration
+val conf = _sc.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
@@ -101,7 +101,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in
* the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
-def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[jl.Long] = {
+def countByWindow(windowDuration: Duration, slideDuration: Duration): JavaDStream[jl.Long] = {
dstream.countByWindow(windowDuration, slideDuration)
}

@@ -25,8 +25,8 @@ import org.apache.spark.streaming.{StreamingContext, Time}
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
*/
-class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
-extends InputDStream[T](ssc_) {
+class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
+extends InputDStream[T](_ssc) {

require(rdd != null,
"parameter rdd null is illegal, which will lead to NPE in the following transformation")
@@ -39,7 +39,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
// in that batch's checkpoint data
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]

-@transient private var fileSystem : FileSystem = null
+@transient private var fileSystem: FileSystem = null
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]

/**
(Remaining changed files not shown.)