Scoobi - Bringing the productivity of Scala to Hadoop

Hadoop MapReduce is awesome, but it seems a little bit crazy when you have to write pages of boilerplate just to count words. Wouldn't it be nicer if you could simply write what you want to do:

  val lines = fromTextFile("hdfs://in/...")

  val counts = lines.flatMap(_.split(" "))
                    .map(word => (word, 1))
                    .groupByKey
                    .combine(_+_)

  persist(toTextFile(counts, "hdfs://out/..."))

This is what Scoobi is all about. Scoobi is a Scala library that focuses on making you more productive at building Hadoop applications. It stands on the functional programming shoulders of Scala and allows you to just write what you want rather than how to do it.

Scoobi is a library that leverages the Scala programming language to provide a programmer friendly abstraction around Hadoop's MapReduce to facilitate rapid development of analytics and machine-learning algorithms.

Quick start

To start using Scoobi you will obviously need Scala and Hadoop. In addition, the Scoobi library and Scoobi applications use sbt for dependency management and building (also check out the prerequisites below).

To build Scoobi:

  $ cd scoobi
  $ sbt publish-local

Then build and package one of the examples:

  $ cd examples/wordCount
  $ sbt package-hadoop

Finally, run on Hadoop:

  $ hadoop jar ./target/Scoobi_Word_Count-hadoop.jar <input> <output>

Overview

Scoobi is centered around the idea of a distributed collection, which is implemented by the DList (distributed list) class. In a lot of ways, DList objects are similar to normal Scala List objects: they are parameterized by a type and they provide methods that can be used to produce new DList objects, often parameterized by higher-order functions. For example:

  // Converting a List[Int] to a List[String] keeping only evens
  val stringList = intList filter { _ % 2 == 0 } map { _.toString }

  // Converting a DList[Int] to a DList[String] keeping only evens
  val stringDList = intDList filter { _ % 2 == 0 } map { _.toString }

However, unlike a Scala List object, the contents of DList objects are not stored on the JVM heap but in HDFS. Furthermore, calling DList methods will not immediately result in data being generated in HDFS. This is because, behind the scenes, Scoobi implements a staging compiler. The purpose of DList methods is to construct a graph of data transformations. The act of persisting a DList then triggers the compilation of that graph into one or more MapReduce jobs and their execution.
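
For example, the following sketch (with hypothetical paths) only builds the transformation graph; no data is read or written until persist is called:

  // Nothing is read from or written to HDFS yet: each call just adds a node to the graph.
  val lines:  DList[String] = fromTextFile("hdfs://in/...")
  val lower:  DList[String] = lines map { _.toLowerCase }
  val tokens: DList[String] = lower flatMap { _.split(" ") }

  // Only now is the graph compiled into MapReduce job(s) and executed.
  persist(toTextFile(tokens, "hdfs://out/..."))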

So, DList objects essentially provide two abstractions:

  1. The contents of a DList object abstract the storage of data in files on HDFS;
  2. Calling methods on DList objects to transform and manipulate them abstracts the mapper, combiner, reducer and sort-and-shuffle phases of MapReduce.

So, what are some of the advantages of using Scoobi?

  • The collections abstraction implemented by DList is a familiar one: The methods of the DList class have been designed to be the same as, or similar to, those implemented in the standard Scala collections. There aren't as many methods, but if you grok the semantics of Scala collections, you shouldn't have too much trouble getting up to speed with Scoobi;
  • The DList class is strongly typed: Like the Scala collections, the DList interface is strongly typed so that more errors are caught at compile time. This is a major improvement over standard Hadoop MapReduce, where type-based run-time errors often occur (see the sketch after this list);
  • The DList class can easily be parameterised on rich data types: Unlike Hadoop MapReduce, which requires that you implement a myriad of classes conforming to the Writable interface, Scoobi allows DList objects to be parameterised by normal Scala types. This includes the primitive types (e.g. Int, String, Double), tuple types (with arbitrary nesting, e.g. (String, (Int, Char), Double)) as well as case classes. This is all implemented without sacrificing performance in serialisation and deserialisation;
  • Scoobi applications are optimised across library boundaries: Over time it makes sense to partition Scoobi code into separate logical entities - into separate classes and libraries. The advantage of Scoobi is that its staging compiler works across library boundaries. Therefore you'll get the same Hadoop performance as if everything were in one file, but with the productivity gains of modular software;
  • It's Scala: Of course, with Scala you don't lose access to those precious Java libraries, but you also get functional programming and concise syntax, which makes writing Hadoop applications with Scoobi very productive ... and fun!
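
As a small illustration of the compile-time checking, consider the following sketch (the values are ours, not from the Scoobi examples; lines is assumed to be a DList[String]):

  val words:  DList[String]        = lines flatMap { _.split(" ") }
  val counts: DList[(String, Int)] = words map { w => (w, 1) }

  counts.groupByKey                 // fine: counts is a key-value DList
  // words.groupByKey               // would not compile: String is not a (key, value) pair

With the raw MapReduce API, the equivalent mistakes typically only show up as run-time failures on the cluster.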

Word count decomposed

Let's take a step-by-step look at the simple word count example from above. The complete application for word count looks like this:

  import com.nicta.scoobi.Scoobi._
  import com.nicta.scoobi.DList._
  import com.nicta.scoobi.io.text.TextInput._
  import com.nicta.scoobi.io.text.TextOutput._

  object WordCount {
    def main(allArgs: Array[String]) = withHadoopArgs(allArgs) { args =>

      val lines: DList[String] = fromTextFile(args(0))

      val counts: DList[(String, Int)] = lines.flatMap(_.split(" "))
                                              .map(word => (word, 1))
                                              .groupByKey
                                              .combine(_+_)

      persist(toTextFile(counts, args(1)))
    }
  }

Our word count example is implemented by the object WordCount. First, there are a few imports to specify for bringing in DList and text I/O. The guts of the implementation are within the enclosing braces of the withHadoopArgs control structure. The purpose of withHadoopArgs is to parse the generic Hadoop command line options before passing the remaining arguments to the guts of the implementation. This implementation uses the remaining two arguments as the input (args(0)) and output (args(1)) directories.

Within the implementation guts, the first task is to construct a DList representing the data located at the input directory. In this situation, because the input data are simple text files, we can use the fromTextFile method, which takes our input directory as an argument and returns a DList[String] object. Here our DList object is a distributed collection where each element is a line from the input data, and it is assigned to lines.

The second task is to compute a DList of word counts given all the lines of text from our input data. This is implemented in four steps (the intermediate types are sketched in the code after this list):

  1. A flatMap is performed on lines. Like List's flatMap, a parameterising function is supplied which will take as its input a given line (a String) and will return 0 or more Strings as its result. In this case, that function is the method split, which will split the input string (a line) into a collection of words based on the occurrence of whitespace. The result of the flatMap is then another DList[String] representing a distributed collection of words.

  2. A map is performed on the distributed collection of words. Like List's map, a parameterising function is supplied which takes as its input a given word (a String) and will return another value. In this case, the supplied function takes the input word and returns a pair: the word and the value 1. The resulting object is a new distributed collection of type DList[(String, Int)].

  3. A groupByKey is performed on the (String, Int) distributed collection. groupByKey has no direct counterpart in List (although there is a groupBy defined on DLists). groupByKey must be called on a key-value DList object, otherwise the program will not type check. The effect of groupByKey is to collect all distributed collection values with the same key. In this case the DList object is of type (String, Int), so a new DList object will be returned of type (String, Iterable[Int]). That is, the counts for the same words will be grouped together.

  4. To get the total count for each word, a combine is performed. combine also has no counterpart in List, but its semantics are to take a DList[(K, Iterable[V])] and return a DList[(K, V)] by reducing all the values. It is parameterised by a function of type (V, V) => V that must be associative. In our case we are simply performing addition to sum all the counts.
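
Putting the types of each step side by side (a sketch; the value names are ours):

  val lines:   DList[String]                  = fromTextFile(args(0))
  val words:   DList[String]                  = lines flatMap { _.split(" ") }   // step 1
  val pairs:   DList[(String, Int)]           = words map { word => (word, 1) }  // step 2
  val grouped: DList[(String, Iterable[Int])] = pairs.groupByKey                 // step 3
  val counts:  DList[(String, Int)]           = grouped.combine(_+_)             // step 4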

The final task is to take the counts object, which represents counts for each word, and persist it. In this case we will simply persist it as a text file, whose path is specified by the second command line argument, using toTextFile. Note that toTextFile is used within persist. Although not demonstrated in this example, persist takes a variable number of arguments, each of which specifies which DList is being persisted and how.

Until persist is called, our application will only be running on the local client. The act of calling persist, along with the DList(s) to be persisted, triggers Scoobi's staging compiler to take the sequence of DList transformations and turn them into one or more Hadoop MapReduce jobs. In this example Scoobi will generate a single MapReduce job to be executed:

  • The functionality associated with the flatMap and map will become part of a mapper task;
  • The transformation associated with groupByKey will occur as a consequence of the sort-and-shuffle phase;
  • The functionality of the combine will become part of both a combiner and reducer task.

The word count example is one of a number of examples included with Scoobi. The top-level directory examples contains a number of self-contained, tutorial-like examples, as well as a guide to building and deploying them. This is an additional starting point for learning and using Scoobi.

Loading and persisting data

DList objects are merely nodes in a graph describing a series of data computations we want to perform. However, at some point we need to specify what the inputs and outputs to that computation are. We have already seen this in the previous example with fromTextFile(...) and persist(toTextFile(...)). The former is an example of loading data and the latter is an example of persisting data.

Loading

Most of the time when we create DList objects, it is the result of calling a method on another DList object (e.g. map). Loading, on the other hand, is the only way to create a DList object that is not based on any others. It is the means by which we associate a DList object with some data files on HDFS. Scoobi provides functions to create DList objects associated with text files on HDFS, which are implemented in the object com.nicta.scoobi.io.text.TextInput.

The simplest, which we have seen already, is fromTextFile. It takes a path (globs are supported) to text files on HDFS (or whichever file system Hadoop has been configured for) and returns a DList[String] object, where each element of the distributed collection refers to one of the lines of text from the files.
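
For example, a glob can load several files into a single distributed collection (the path below is hypothetical):

  // every line of every matching file becomes one element of the resulting DList
  val logs: DList[String] = fromTextFile("hdfs://path/to/logs/2011-*.txt")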

Often we are interested in loading delimited text files, for example, comma separated value (CSV) files. In this case, we can use fromTextFile followed by a map to pull out fields of interest:

  // load CSV with schema "id,first-name,second-name,age"
  val lines: DList[String] = fromTextFile("hdfs://path/to/CSV/files/*")

  // pull out id and second-name
  val names: DList[(Int, String)] = lines map { line =>
    val fields = line.split(",")
    (fields(0).toInt, fields(2))
  }

This works fine, but because it's such a common task, TextInput also provides the function extractFromDelimitedTextFile specifically for these types of field extractions:

  // load CSV and pull out id and second-name
  val names: DList[(Int, String)] = extractFromDelimitedTextFile(",", "hdfs://path/to/CSV/files/*") {
    case id :: firstName :: secondName :: age :: _ => (id.toInt, secondName)
  }

When using extractFromDelimitedTextFile, the first argument specifies the delimiter and the second is the path. There is also a second parameter list, which is used to specify what to do with the fields once they are separated out. This is specified by supplying a partial function that takes a list of separated String fields as its input and returns a value whose type sets the type of the resulting DList - i.e. a PartialFunction[List[String], A] will create a DList[A] (where A is (Int, String) above). In this example, we use Scala's pattern matching to pull out the four fields and return the first and third.
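
To make the types concrete, the extraction block above can be given a name as a partial function (a sketch; the value name is ours):

  // The result type of the partial function, (Int, String), fixes the element type of the resulting DList.
  val extractIdAndName: PartialFunction[List[String], (Int, String)] = {
    case id :: firstName :: secondName :: age :: _ => (id.toInt, secondName)
  }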

One of the advantages of this approach is that we have at our disposal all of Scala's pattern matching features, and because we are providing a partial function, any fields that don't match against the supplied pattern will not be present in the returned DList. This allows us to implement simple filtering inline with the extraction:

  // load CSV and pull out id and second-name if first-name is "Harry"
  val names: DList[(Int, String)] = extractFromDelimitedTextFile(",", "hdfs://path/to/CSV/files/*") {
    case id :: "Harry" :: secondName :: age :: _ => (id.toInt, secondName)
  }

We can of course supply multiple patterns:

  // load CSV and pull out id and second-name if first-name is "Harry" or "Lucy"
  val names: DList[(Int, String)] = extractFromDelimitedTextFile(",", "hdfs://path/to/CSV/files/*") {
    case id :: "Harry" :: secondName :: age :: _ => (id.toInt, secondName)
    case id :: "Lucy"  :: secondName :: age :: _ => (id.toInt, secondName)
  }

And, a more interesting example is when the value of one field influences the semantics of another. For example:

  val thisYear: Int = ...

  // load CSV with schema "event,year,year-designation" and pull out the event and how many years ago it occurred
  val yearsAgo: DList[(String, Int)] = extractFromDelimitedTextFile(",", "hdfs://path/to/CSV/files/*") {
    case event :: year :: "BC" :: _ => (event, thisYear + year.toInt)
    case event :: year :: "AD" :: _ => (event, thisYear - year.toInt)
  }

These are nice features; however, one problem with these examples is the conversion of String fields into Int values. If a field is not supplied (e.g. an empty string) or the files are simply erroneous, a run-time exception will occur when toInt is called. This exception will be caught by Hadoop and will likely cause the MapReduce job to fail. As a solution to this problem, TextInput provides Scala extractors for Ints, Longs and Doubles. Using the Int extractor we can re-write one of the above examples:

  // load CSV and pull out id and second-name
  val names: DList[(Int, String)] = extractFromDelimitedTextFile(",", "hdfs://path/to/CSV/files/*") {
    case Int(id) :: firstName :: secondName :: Int(age) :: _ => (id, secondName)
  }

Here, the pattern will only match if both the id and age fields can be successfully converted from a String to an Int. If not, the pattern will not match and that line will not be extracted into the resulting DList.
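
For intuition, an extractor of this kind is just an object with an unapply method returning an Option. The sketch below is illustrative only (it is named AsInt to avoid confusion with Scoobi's own Int extractor) and is not Scoobi's actual implementation:

  // matches only those strings that parse as an Int
  object AsInt {
    def unapply(s: String): Option[Int] =
      try { Some(s.toInt) } catch { case _: NumberFormatException => None }
  }

  val n = "42" match {
    case AsInt(i) => i     // matches; i == 42
    case _        => -1    // non-numeric strings fall through to the next case
  }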

Persisting

Persisting is the mechanism Scoobi uses for specifying that the result of executing the computational graph associated with a DList object is to be associated with a particular data file on HDFS. There are two parts to persisting:

  1. Calling persist, which bundles all DList objects being persisted;
  2. Specifying how each DList object is to be persisted.

Scoobi currently provides only one mechanism for specifying how a DList is to be persisted: toTextFile, which is implemented in the object com.nicta.scoobi.io.text.TextOutput. As we have seen previously, toTextFile takes two arguments: the DList object being persisted and the directory path to write the resulting data:

  val rankings: DList[(String, Int)] = ...

  persist(toTextFile(rankings, "hdfs://path/to/output"))

persist can of course bundle together more than one DList. For example:

  val rankings: DList[(String, Int)] = ...
  val rankingsReverse: DList[(Int, String)] = rankings map { case (a, b) => (b, a) }
  val rankingsExample: DList[(Int, String)] = rankingsReverse.groupByKey map { case (ranking, items) => (ranking, items.head) }

  persist(toTextFile(rankings,        "hdfs://path/to/output"),
          toTextFile(rankingsReverse, "hdfs://path/to/output-reverse"),
          toTextFile(rankingsExample, "hdfs://path/to/output-example"))

As mentioned previously, persist is the trigger for executing the computational graph associated with its DList objects. By bundling DList objects together, persist is able to determine computations that are shared by those outputs and ensure that they are only performed once.
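
As a sketch of this sharing (the values and paths below are ours, assuming the same imports as the word count example and an existing lines: DList[String]):

  // `words` feeds both outputs; bundling them in one persist means the flatMap is evaluated only once.
  val words: DList[String] = lines flatMap { _.split(" ") }

  val counts:  DList[(String, Int)] = words.map(w => (w, 1)).groupByKey.combine(_+_)
  val lengths: DList[(String, Int)] = words map { w => (w, w.length) }

  persist(toTextFile(counts,  "hdfs://path/to/counts"),
          toTextFile(lengths, "hdfs://path/to/lengths"))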

Data types

We've seen in many of the examples that it's possible for DList objects to be parameterised by normal Scala primitive (value) types. Not surprisingly, Scoobi supports DList objects that are parameterised by any of the Scala primitive types:

  val x: DList[Byte] = ...
  val x: DList[Char] = ...
  val x: DList[Int] = ...
  val x: DList[Long] = ...
  val x: DList[Double] = ...

And as we've also seen, although not a primitive, Scoobi supports DLists of Strings:

  val x: DList[String] = ...

Some of the examples also use DList objects that are parameterised by a pair (Scala's Tuple2 type). In fact, Scoobi supports DList objects that are parameterised by Scala tuples up to arity 8 and, in addition, supports arbitrary nesting:

  val x: DList[(Int, String)] = ...
  val x: DList[(String, Long)] = ...
  val x: DList[(Int, String, Long)] = ...
  val x: DList[(Int, (String, String), Int, (Long, Long, Long))] = ...
  val x: DList[(Int, (String, (Long, Long)), Char)] = ...

Finally, Scoobi also supports DList objects that are parameterised by the Scala Option and Either types, which can also be combined with any of the tuple and primitive types:

  val x: DList[Option[Int]] = ...
  val x: DList[Option[String]] = ...
  val x: DList[Option[(Long, String)]] = ...

  val x: DList[Either[Int, String]] = ...
  val x: DList[Either[String, (Long, Long)]] = ...
  val x: DList[Either[Long, Either[String, Int]]] = ...
  val x: DList[Either[Int, Option[Long]]] = ...

Notice that in all these cases, the DList object is parameterised by a standard Scala type and not some wrapper type. This is really convenient. It means, for example, that a higher-order function like map can directly call any of the methods associated with those types. In contrast, programming MapReduce jobs directly using Hadoop's API requires that all types implement the Writable interface, resulting in the use of wrapper types such as IntWritable rather than just Int. Of course, the reason for this is that Writable specifies methods for serialisation and deserialisation of data within the Hadoop framework. However, given that DList objects eventually result in code that is executed by the Hadoop framework, how are serialisation and deserialisation specified?
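
A small sketch of what this buys you in practice (the values are ours):

  // The elements are plain Strings, so their ordinary methods are available directly
  // inside map -- no unwrapping of a Writable container is needed.
  val words: DList[String] = ...
  val normalised: DList[(String, Int)] = words map { w => (w.toLowerCase, w.length) }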

Scoobi requires that the type parameterising a DList object has an implementation of the WireFormat type class (expressed as a Scala context bound). Thus, the DList class is actually specified as:

  class DList[A : WireFormat] { ... }

If the compiler cannot find a WireFormat implementation for the type parameterising a specific DList object, that code will not compile. Implementations of WireFormat specify serialisation and deserialisation in their toWire and fromWire methods, which end up finding their way into Writable's write and readFields methods.

To make life easy, the WireFormat object includes WireFormat implementations for the types listed above (that is why they work out of the box). However, the real advantage of using type classes is that they allow you to extend the set of types that can be used with DList objects, and that set can include types that already exist, maybe even in some other compilation unit. So long as a type has a WireFormat implementation, it can parameterise a DList. This is extremely useful because, whilst you can represent a lot with nested tuples, much can be gained in terms of type safety, readability and maintenance by using custom types. For example, say we were building an application to analyse stock ticker data. In that situation it would be nice to work with DList[Tick] objects. We can do that if we write a WireFormat implementation for Tick:

  case class Tick(val date: Int, val symbol: String, val price: Double)

  implicit def TickFmt = new WireFormat[Tick] {
    def toWire(tick: Tick, out: DataOutput) = {
      out.writeInt(tick.date)
      out.writeUTF(tick.symbol)
      out.writeDouble(tick.price)
    }
    def fromWire(in: DataInput): Tick = {
      val date = in.readInt
      val symbol = in.readUTF
      val price = in.readDouble
      Tick(date, symbol, price)
    }
    def show(tick: Tick): String = tick.toString
  }

  val ticks: DList[Tick] = ...  /* OK */

Then we can actually make use of the Tick type:

  /* Function to compute the high and low for a stock on a given day */
  def hilo(ts: Iterable[Tick]): (Double, Double) = {
    val start = ts.head.price
    ts.tail.foldLeft((start, start)) { case ((high, low), tick) => (math.max(high, tick.price), math.min(low, tick.price)) }
  }

  /* Group tick data by date and symbol */
  val ticks: DList[Tick] = ...
  val ticksGrouped = ticks.groupBy(t => (t.symbol, t.date))

  /* Compute highs and lows for each stock for each day */
  val highLow = ticksGrouped map { case ((symbol, date), ticks) => (symbol, date, hilo(ticks)) }

Notice that by using the custom type Tick it's obvious which fields we are using. If instead the type of ticks were DList[(Int, String, Double)], the code would be far less readable, and maintenance would be more difficult if, for example, we added new fields to Tick or modified the order of existing fields.
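
For contrast, the same grouping with a bare tuple type relies on positional accessors (a sketch with hypothetical values):

  val rawTicks: DList[(Int, String, Double)] = ...
  // _1 is the date and _2 the symbol -- but nothing in the types says so
  val grouped = rawTicks.groupBy(t => (t._2, t._1))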

Being able to have DList objects of custom types is a huge productivity boost; however, there is still the boilerplate, mechanical work associated with the WireFormat implementation. To overcome this, the WireFormat object also provides a utility function called mkWireFormat that automatically constructs a WireFormat for case classes:

  case class Tick(val date: Int, val symbol: String, val price: Double)
  implicit val tickFmt = mkWireFormat(Tick)(Tick.unapply _)

  val ticks: DList[Tick] = ...  /* Still OK */

mkWireFormat takes as arguments the case class's automatically generated apply and unapply methods. The only requirement on case classes when using mkWireFormat is that all their fields have WireFormat implementations. If not, your DList objects won't type check. The upside is that all of the types above that do have WireFormat implementations can be fields in a case class when used in conjunction with mkWireFormat:

  case class Tick(val date: Int, val symbol: String, val price: Double, val highLow: (Double, Double))
  implicit val tickFmt = mkWireFormat(Tick)(Tick.unapply _)

  val ticks: DList[Tick] = ...  /* Amazingly, still OK */

Of course, this also extends to other case classes, as long as they have WireFormat implementations. Thus, it's possible to have nested case classes that parameterise DList objects:

  case class PriceAttr(val price: Double, val highLow: (Double, Double))
  implicit val priceAttrFmt = mkWireFormat(PriceAttr)(PriceAttr.unapply _)

  case class Tick(val date: Int, val symbol: String, val attr: PriceAttr)
  implicit val tickFmt = mkWireFormat(Tick)(Tick.unapply _)

  val ticks: DList[Tick] = ...  /* That's right, amazingly, still OK */

In summary, the way data types work in Scoobi is definitely one of its killer features, basically because they don't get in the way!

Creating a Scoobi project with sbt

Scoobi projects are generally developed with sbt, and to simplify the task of building and packaging a project for running on Hadoop, it's really handy to use the sbt plugin sbt-scoobi. Here are a few steps for creating a new project:

Create a new Scoobi application and add some code:

    $ mkdir my-app
    $ cd my-app
    $ mkdir -p src/main/scala
    $ vi src/main/scala/MyApp.scala

To use the sbt-scoobi plugin we need to include a project/plugins.sbt file with the following contents:

    resolvers += "Scoobi deploy" at "http://nicta.github.com/sbt-scoobi/repository/"

    addSbtPlugin("com.nicta" %% "sbt-scoobi" % "0.0.1")

And, we can add a pretty standard build.sbt that has a dependency on Scoobi:

    name := "MyApp"

    version := "0.1"

    scalaVersion := "2.9.1"

    libraryDependencies += "com.nicta" %% "scoobi" % "0.0.1" % "provided"

The "provided" qualifier is added to the Scoobi dependency to let sbt know that Scoobi is provided by the sbt-scoobi plugin when it packages everything into a jar. If you don't include it, nothing bad will happen, but the jar will contain some Scoobi dependencies that are not strictly required.

We can now use sbt to easily build and package our application into a self-contained executable jar to feed directly into Hadoop:

    $ sbt package-hadoop
    $ hadoop jar ./target/MyApp-app-hadoop-0.1.jar <args>

Note that there appears to be an OS X-specific issue with calling hadoop in this manner, which requires the jar to be added to HADOOP_CLASSPATH and hadoop to be given the correct object to run, e.g.:

    $ export HADOOP_CLASSPATH=$PWD/target/Scoobi_Word_Count-hadoop-0.1.jar
    $ hadoop WordCount inputFile/to/wordcount nonexistent/outputdir

Prerequisites

Scoobi has the following requirements:

  • Hadoop 0.20.2
  • Scala 2.9.1: Note that this is typically set in build.sbt
  • sbt 0.11.0: Note that when creating the sbt launcher script, it is worth increasing the max heap space (the number given to -Xmx) to prevent sbt from running out of memory when building.

Issues

Please use our GitHub issue tracker for any bugs, issues, problems or questions you might have. Please tag questions accordingly.

Contributions

Scoobi is released under the Apache license v2. We welcome contributions of bug fixes and/or new features via GitHub pull requests.
