Skip to content

Commit

Permalink
Merge pull request twitter#905 from twitter/jco/scalariform
Browse files Browse the repository at this point in the history
Enable scalariform
  • Loading branch information
johnynek committed Jun 20, 2014
2 parents d6e4bba + 995e737 commit 095c763
Show file tree
Hide file tree
Showing 115 changed files with 4,716 additions and 4,874 deletions.
13 changes: 12 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,24 @@ import sbtassembly.Plugin._
import AssemblyKeys._
import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
import com.typesafe.tools.mima.plugin.MimaKeys._
import scalariform.formatter.preferences._
import com.typesafe.sbt.SbtScalariform._

import scala.collection.JavaConverters._

object ScaldingBuild extends Build {
val printDependencyClasspath = taskKey[Unit]("Prints location of the dependencies")

val sharedSettings = Project.defaultSettings ++ assemblySettings ++ Seq(
val sharedSettings = Project.defaultSettings ++ assemblySettings ++ scalariformSettings ++ Seq(
organization := "com.twitter",

//TODO: Change to 2.10.* when Twitter moves to Scala 2.10 internally
scalaVersion := "2.9.3",

crossScalaVersions := Seq("2.9.3", "2.10.3"),

ScalariformKeys.preferences := formattingPreferences,

javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),

javacOptions in doc := Seq("-source", "1.6"),
Expand Down Expand Up @@ -147,6 +151,13 @@ object ScaldingBuild extends Build {
maple
)

// Scalariform formatting preferences for the whole build; wired into
// sharedSettings via `ScalariformKeys.preferences := formattingPreferences`.
// AlignParameters=false avoids large whitespace diffs when a parameter list
// changes; PreserveSpaceBeforeArguments keeps `foo { _ => ... }` readable.
lazy val formattingPreferences = {
import scalariform.formatter.preferences._
FormattingPreferences().
setPreference(AlignParameters, false).
setPreference(PreserveSpaceBeforeArguments, true)
}

/**
* This returns the youngest jar we released that is compatible with
* the current.
Expand Down
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")

addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.1")

addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
130 changes: 63 additions & 67 deletions scalding-args/src/main/scala/com/twitter/scalding/Args.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,148 +18,144 @@ package com.twitter.scalding
case class ArgsException(message: String) extends RuntimeException(message)

/**
* The args class does a simple command line parsing. The rules are:
* keys start with one or more "-". Each key has zero or more values
* following.
*/
* The args class does a simple command line parsing. The rules are:
* keys start with one or more "-". Each key has zero or more values
* following.
*/
object Args {
  /**
   * Split on whitespace and then parse.
   */
  def apply(argString: String): Args = Args(argString.split("\\s+"))

  /**
   * Parses keys as starting with one or more dashes, except single-dashed
   * numbers (e.g. "-1"), which are treated as values.
   * All following non-dashed args are the list of values for that key.
   * If the list starts with non-dashed args, these are associated with the
   * empty string: ""
   */
  def apply(args: Iterable[String]): Args = {
    // Count of leading '-' characters. NOTE(review): unused in this body —
    // kept from the original source; confirm before removing.
    def startingDashes(word: String) = word.takeWhile { _ == '-' }.length
    new Args(
      // Fold into a list of (arg -> List[values])
      args
        .filter { a => !a.matches("\\s*") } // drop blank/whitespace-only tokens
        .foldLeft(List("" -> List[String]())) { (acc, arg) =>
          val noDashes = arg.dropWhile { _ == '-' }
          // A token with no dashes, or a dashed token that parses as a number
          // (e.g. "-1"), is a value for the current key; otherwise it starts
          // a new key.
          if (arg == noDashes || isNumber(arg))
            (acc.head._1 -> (arg :: acc.head._2)) :: acc.tail
          else
            (noDashes -> List()) :: acc
        }
        // Now reverse the values to keep the same order they appeared in
        .map { case (key, value) => key -> value.reverse }.toMap)
  }

  /**
   * True if the whole token parses as a Double, so dashed numbers like "-1"
   * are treated as values rather than keys.
   */
  def isNumber(arg: String): Boolean = {
    try {
      arg.toDouble
      true
    } catch {
      case e: NumberFormatException => false
    }
  }
}

class Args(val m : Map[String,List[String]]) extends java.io.Serializable {
class Args(val m: Map[String, List[String]]) extends java.io.Serializable {

//Replace or add a given key+args pair:
def +(keyvals : (String,Iterable[String])) : Args = new Args(m + (keyvals._1 -> keyvals._2.toList))
def +(keyvals: (String, Iterable[String])): Args = new Args(m + (keyvals._1 -> keyvals._2.toList))

/**
* Does this Args contain a given key?
*/
def boolean(key : String) : Boolean = m.contains(key)
* Does this Args contain a given key?
*/
def boolean(key: String): Boolean = m.contains(key)

/**
* Get the list of values associated with a given key.
* if the key is absent, return the empty list. NOTE: empty
* does not mean the key is absent, it could be a key without
* a value. Use boolean() to check existence.
*/
def list(key : String) : List[String] = m.get(key).getOrElse(List())
* Get the list of values associated with a given key.
* if the key is absent, return the empty list. NOTE: empty
* does not mean the key is absent, it could be a key without
* a value. Use boolean() to check existence.
*/
def list(key: String): List[String] = m.get(key).getOrElse(List())

/**
* This is a synonym for required
*/
def apply(key : String) : String = required(key)
* This is a synonym for required
*/
def apply(key: String): String = required(key)

/**
* Gets the list of positional arguments
*/
def positional : List[String] = list("")
def positional: List[String] = list("")

/**
* return required positional value.
*/
def required(position: Int) : String = positional match {
* return required positional value.
*/
def required(position: Int): String = positional match {
case l if l.size > position => l(position)
case _ => throw ArgsException("Please provide " + (position + 1) + " positional arguments")
}

/**
* This is a synonym for required
*/
def apply(position : Int) : String = required(position)
* This is a synonym for required
*/
def apply(position: Int): String = required(position)

override def equals(other : Any) : Boolean = {
if( other.isInstanceOf[Args] ) {
override def equals(other: Any): Boolean = {
if (other.isInstanceOf[Args]) {
other.asInstanceOf[Args].m.equals(m)
}
else {
} else {
false
}
}

/**
* Equivalent to .optional(key).getOrElse(default)
*/
def getOrElse(key : String, default : String) : String = optional(key).getOrElse(default)
* Equivalent to .optional(key).getOrElse(default)
*/
def getOrElse(key: String, default: String): String = optional(key).getOrElse(default)

/**
* return exactly one value for a given key.
* If there is more than one value, you get an exception
*/
def required(key : String) : String = list(key) match {
* return exactly one value for a given key.
* If there is more than one value, you get an exception
*/
def required(key: String): String = list(key) match {
case List() => throw ArgsException("Please provide a value for --" + key)
case List(a) => a
case _ => throw ArgsException("Please only provide a single value for --" + key)
}

def toList : List[String] = {
def toList: List[String] = {
m.foldLeft(List[String]()) { (args, kvlist) =>
val k = kvlist._1
val values = kvlist._2
if( k != "") {
if (k != "") {
//Make sure positional args are first
args ++ ((("--" + k) :: values))
}
else {
} else {
// These are positional args (no key), put them first:
values ++ args
}
}
}

/**
* Asserts whether all the args belong to the given set of accepted arguments.
* If an arg does not belong to the given set, you get an error.
*/
def restrictTo(acceptedArgs: Set[String]) : Unit = {
* Asserts whether all the args belong to the given set of accepted arguments.
* If an arg does not belong to the given set, you get an error.
*/
def restrictTo(acceptedArgs: Set[String]): Unit = {
val invalidArgs = m.keySet.filter(!_.startsWith("scalding.")) -- (acceptedArgs + "" + "tool.graph" + "hdfs" + "local")
if (!invalidArgs.isEmpty) throw ArgsException("Invalid args: " + invalidArgs.map("--" + _).mkString(", "))
}

// TODO: if there are spaces in the keys or values, this will not round-trip
override def toString : String = toList.mkString(" ")
override def toString: String = toList.mkString(" ")

/**
* If there is zero or one element, return it as an Option.
* If there is a list of more than one item, you get an error
*/
def optional(key : String) : Option[String] = list(key) match {
* If there is zero or one element, return it as an Option.
* If there is a list of more than one item, you get an error
*/
def optional(key: String): Option[String] = list(key) match {
case List() => None
case List(a) => Some(a)
case _ => throw ArgsException("Please provide at most one value for --" + key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.twitter.scalding.avro

import cascading.avro.AvroScheme
import cascading.avro.PackedAvroScheme
import cascading.avro.local.{AvroScheme => LAvroScheme, PackedAvroScheme => LPackedAvroScheme}
import cascading.avro.local.{ AvroScheme => LAvroScheme, PackedAvroScheme => LPackedAvroScheme }
import com.twitter.scalding._
import org.apache.avro.Schema
import cascading.scheme.Scheme
Expand All @@ -26,8 +26,7 @@ import java.io.OutputStream
import java.util.Properties
import cascading.tuple.Fields
import collection.JavaConverters._
import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}

import org.apache.hadoop.mapred.{ OutputCollector, RecordReader, JobConf }

trait UnpackedAvroFileScheme extends FileSource {
def schema: Option[Schema]
Expand Down Expand Up @@ -66,8 +65,7 @@ object UnpackedAvroSource {
new UnpackedAvroSource[T](Seq(path), schema)
}

case class UnpackedAvroSource[T](paths: Seq[String], schema: Option[Schema])
(implicit val conv: TupleConverter[T], tset: TupleSetter[T])
case class UnpackedAvroSource[T](paths: Seq[String], schema: Option[Schema])(implicit val conv: TupleConverter[T], tset: TupleSetter[T])

extends FixedPathSource(paths: _*)
with UnpackedAvroFileScheme with Mappable[T] with TypedSink[T] {
Expand All @@ -85,17 +83,13 @@ case class UnpackedAvroSource[T](paths: Seq[String], schema: Option[Schema])

override def setter[U <: T] = TupleSetter.asSubSetter[T, U](tset)


}


object PackedAvroSource {
def apply[T: AvroSchemaType : Manifest : TupleConverter](path: String)
= new PackedAvroSource[T](Seq(path))
def apply[T: AvroSchemaType: Manifest: TupleConverter](path: String) = new PackedAvroSource[T](Seq(path))
}

case class PackedAvroSource[T](paths: Seq[String])
(implicit val mf: Manifest[T], conv: TupleConverter[T], tset: TupleSetter[T], avroType: AvroSchemaType[T])
case class PackedAvroSource[T](paths: Seq[String])(implicit val mf: Manifest[T], conv: TupleConverter[T], tset: TupleSetter[T], avroType: AvroSchemaType[T])
extends FixedPathSource(paths: _*) with PackedAvroFileScheme[T] with Mappable[T] with TypedSink[T] {
override def converter[U >: T] = TupleConverter.asSuperConverter[T, U](conv)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import org.apache.avro.specific.SpecificRecord

import java.nio.ByteBuffer


trait AvroSchemaType[T] extends Serializable {
def schema: Schema
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,21 @@ import collection.JavaConverters._
import cascading.tuple.Fields

package object avro {
def writePackedAvro[T](pipe: TypedPipe[T], path: String)
(implicit mf: Manifest[T],
st: AvroSchemaType[T],
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
mode: Mode): Unit = {
def writePackedAvro[T](pipe: TypedPipe[T], path: String)(implicit mf: Manifest[T],
st: AvroSchemaType[T],
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
mode: Mode): Unit = {
val sink = PackedAvroSource[T](path)
pipe.write(sink)
}

def writeUnpackedAvro[T <: Product](pipe: TypedPipe[T], path: String, schema: Schema)
(implicit mf: Manifest[T],
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
mode: Mode): Unit = {
def writeUnpackedAvro[T <: Product](pipe: TypedPipe[T], path: String, schema: Schema)(implicit mf: Manifest[T],
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
mode: Mode): Unit = {
import Dsl._
val sink = UnpackedAvroSource[T](path, Some(schema))
val outFields = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import cascading.pipe.Pipe
import cascading.tuple.{ Fields, TupleEntry }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.slf4j.{Logger, LoggerFactory => LogManager}
import org.slf4j.{ Logger, LoggerFactory => LogManager }

/**
* Checkpoint provides a simple mechanism to read and write intermediate results
Expand Down Expand Up @@ -115,13 +115,13 @@ object Checkpoint {
}
}

// Wrapper for Checkpoint when using a TypedPipe
// Wrapper for Checkpoint when using a TypedPipe
def apply[A](checkpointName: String)(flow: => TypedPipe[A])(implicit args: Args, mode: Mode, flowDef: FlowDef,
conv: TupleConverter[A], setter: TupleSetter[A]): TypedPipe[A] = {
val rPipe = apply(checkpointName, Dsl.intFields(0 until conv.arity)) {
flow.toPipe(Dsl.intFields(0 until conv.arity))
}
TypedPipe.from[A](rPipe,Dsl.intFields(0 until conv.arity))
TypedPipe.from[A](rPipe, Dsl.intFields(0 until conv.arity))
}

// Helper class for looking up checkpoint arguments, either the base value from
Expand Down
Loading

0 comments on commit 095c763

Please sign in to comment.