Skip to content

Commit

Permalink
Refactor implementation of data I/O
Browse files Browse the repository at this point in the history
Refactor how I/O is implemented to make it simpler to support different
I/O types. For example, without this refactoring, it is difficult to
implement "native" support for Sequence files.

The primary refactoring is for MapReduce jobs generated by Scoobi to
use InputFormats and OutputFormats specific to the type of I/O required.
This is in contrast to the current implementation where I/O was
hard-coded to text files with BridgeStores being a special case.
Using InputFormats and OutputFormats in their raw form allows the many
existing classes to be leveraged. A consequence of this change is that
InputConverters and OutputConverters are implemented to convert between
the key-value types of InputFormats/OutputFormats and the types of
DLists.

As a demonstration of this refactoring, also implement support for
reading and writing Sequence Files.
  • Loading branch information
blever committed Feb 12, 2012
1 parent e1f568e commit 616096b
Show file tree
Hide file tree
Showing 37 changed files with 695 additions and 530 deletions.
1 change: 1 addition & 0 deletions src/main/java/com/nicta/scoobij/DGroupedTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.nicta.scoobij;


/**
* A DGroupedTable is a DTable<K, V> where V is actually a list of values. It is
* the return result of DList.groupByKey. A DGroupedTable has the additional
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/nicta/scoobij/DTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.nicta.scoobij;


/**
* DTable is a DList that contains a key-value pair
*/
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/nicta/scoobij/DoFn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.nicta.scoobij;


/**
* Used by DList.parallelDo to process data. Setup is called before any
* elements are processed. Then process is called for each element, and then cleanup is called
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/nicta/scoobij/OrderedWireFormatType.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.nicta.scoobij;


/**
* A WireFormatType that also contains ordering, this is mainly used for
* comparison and grouping (e.g. DList.groupByKey)
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/nicta/scoobij/WireFormatType.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
*/
package com.nicta.scoobij;

import scala.reflect.Manifest;

/**
* Gives runtime information based on the type. If you have a DList<Integer>,
* you'll often need a WireFormatType<Integer>. This is largely because of Java's
* type erasure, but it also contains a wireFormat, which says how to serialize
* and deserialize this type.
* */
public interface WireFormatType<T> {
public Class<T> typeInfo();
public Manifest<T> typeInfo();

public com.nicta.scoobi.WireFormat<T> wireFormat();
}
55 changes: 35 additions & 20 deletions src/main/java/com/nicta/scoobij/WireFormats.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import java.io.DataOutput;

import scala.Tuple2;
import scala.reflect.Manifest;

import com.nicta.scoobi.WireFormat;
import com.nicta.scoobij.impl.Conversions;

public class WireFormats {

Expand Down Expand Up @@ -46,33 +49,45 @@ public int compare(Integer a, Integer b) {
};
}

public static <T, V> WireFormat<scala.Tuple2<T, V>> wireFormatPair(
final WireFormat<T> ord1, final WireFormat<V> ord2) {

return new WireFormat<scala.Tuple2<T, V>>() {
public static <T, V> WireFormatType<scala.Tuple2<T, V>> wireFormatPair(
final WireFormatType<T> ord1, final WireFormatType<V> ord2) {

private static final long serialVersionUID = 1L;
return new WireFormatType<scala.Tuple2<T, V>>() {

@Override
public Tuple2<T, V> fromWire(DataInput arg0) {

T t = ord1.fromWire(arg0);
V v = ord2.fromWire(arg0);

return new Tuple2<T, V>(t, v);
public com.nicta.scoobi.WireFormat<scala.Tuple2<T,V>> wireFormat() {
return new WireFormat<scala.Tuple2<T, V>>() {
private static final long serialVersionUID = 1L;

@Override
public Tuple2<T, V> fromWire(DataInput arg0) {

T t = ord1.wireFormat().fromWire(arg0);
V v = ord2.wireFormat().fromWire(arg0);

return new Tuple2<T, V>(t, v);
}

@Override
public String show(Tuple2<T, V> arg0) {
return "Tuple2(" + ord1.wireFormat().show(arg0._1) + ", "
+ ord2.wireFormat().show(arg0._2) + ")";
}

@Override
public void toWire(Tuple2<T, V> arg0, DataOutput arg1) {
ord1.wireFormat().toWire(arg0._1, arg1);
ord2.wireFormat().toWire(arg0._2, arg1);
}
};
}

@Override
public String show(Tuple2<T, V> arg0) {
return "Tuple2(" + ord1.show(arg0._1) + ", "
+ ord2.show(arg0._2) + ")";
}
public Manifest<scala.Tuple2<T, V>> typeInfo() {
return Conversions.toManifest(scala.Tuple2.class, ord1.typeInfo(), ord2.typeInfo());
};


@Override
public void toWire(Tuple2<T, V> arg0, DataOutput arg1) {
ord1.toWire(arg0._1, arg1);
ord2.toWire(arg0._2, arg1);
}

};
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/nicta/scoobij/impl/DGroupedTableImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ public DTable<K, V> combine(Combiner<V> combiner,

return new DTableImpl<K, V>(getImpl().combine(
Conversions.toScala(combiner), evidence,
Conversions.toManifest(keyFormat.typeInfo()),
keyFormat.typeInfo(),
keyFormat.wireFormat(),
Conversions.toScala(keyFormat.ordering()),
Conversions.toManifest(valueBundle.typeInfo()),
valueBundle.typeInfo(),
valueBundle.wireFormat()));
}
}
13 changes: 6 additions & 7 deletions src/main/java/com/nicta/scoobij/impl/DListImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ public DListImpl(com.nicta.scoobi.DList<A> dl) {
@Override
public <B> DList<B> parallelDo(DoFn<A, B> fun, WireFormatType<B> wf) {
return new DListImpl<B>(getImpl().parallelDo(Conversions.toScala(fun),
Conversions.toManifest(wf.typeInfo()), wf.wireFormat()));
wf.typeInfo(), wf.wireFormat()));
}

@Override
public <V> DListImpl<V> flatMap(FlatMapper<A, V> fm,
WireFormatType<V> bundle) {
return new DListImpl<V>(impl.flatMap(Conversions.toScala(fm),
Conversions.toManifest(bundle.typeInfo()), bundle.wireFormat()));
bundle.typeInfo(), bundle.wireFormat()));
}

@Override
Expand All @@ -55,14 +55,13 @@ public <B, C> DTable<B, C> tableFlatMap(TableFlatMapper<A, B, C> tm,

scala.reflect.Manifest<scala.Tuple2<B, C>> manifest = Conversions
.toManifest(scala.Tuple2.class,
Conversions.toManifest(bundleB.typeInfo()),
Conversions.toManifest(bundleC.typeInfo()));
bundleB.typeInfo(),
bundleC.typeInfo());

return new DTableImpl<B, C>(impl.flatMap(
Conversions.toScala(tm),
manifest,
WireFormats.wireFormatPair(bundleB.wireFormat(),
bundleC.wireFormat())));
WireFormats.wireFormatPair(bundleB, bundleC).wireFormat()));
}

// ++ from scoobi.DList
Expand Down Expand Up @@ -90,7 +89,7 @@ public <B> DList<B> map(Mapper<A, B> mapper, WireFormatType<B> wf) {

return new DListImpl<B>(getImpl().parallelDo(
Conversions.toScalaDoFn(mapper),
Conversions.toManifest(wf.typeInfo()), wf.wireFormat()));
wf.typeInfo(), wf.wireFormat()));
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/nicta/scoobij/impl/DTableImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public DGroupedTable<K, V> groupByKey(OrderedWireFormatType<K> keyBundle,
// .conforms();

return new DGroupedTableImpl<K, V>(getImpl().groupByKey(confirms,
Conversions.toManifest(keyBundle.typeInfo()),
keyBundle.typeInfo(),
keyBundle.wireFormat(),
Conversions.toScala(keyBundle.ordering()),
Conversions.toManifest(valueBundle.typeInfo()),
valueBundle.typeInfo(),
valueBundle.wireFormat()));
}
}
6 changes: 4 additions & 2 deletions src/main/java/com/nicta/scoobij/impl/WireFormatImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.nicta.scoobij.impl;

import scala.reflect.Manifest;

import com.nicta.scoobi.WireFormat;
import com.nicta.scoobij.OrderedWireFormatType;
import com.nicta.scoobij.Ordering;
Expand All @@ -28,8 +30,8 @@ public WireFormatImpl(Class<T> c, com.nicta.scoobi.WireFormat<T> wf,
}

@Override
public Class<T> typeInfo() {
return clazz;
public Manifest<T> typeInfo() {
return Conversions.toManifest(clazz);
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/nicta/scoobij/io/text/TextInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import com.nicta.scoobij.impl.DListImpl;

public class TextInput {
public static DList<String> fromTextFile(String path) {
public static DList<String> fromTextFile(String... path) {

return new DListImpl<String>(
com.nicta.scoobi.io.text.TextInput$.MODULE$.fromTextFile(path));
com.nicta.scoobi.io.text.TextInput$.MODULE$.fromTextFile(Conversions.toScalaSeq(path)));
}

public static <T> DList<T> extractFromDelimitedTextFile(String seperator,
Expand All @@ -35,7 +35,7 @@ public static <T> DList<T> extractFromDelimitedTextFile(String seperator,
com.nicta.scoobi.io.text.TextInput$.MODULE$
.extractFromDelimitedTextFile(seperator, path,
Conversions.toScala(extractor),
Conversions.toManifest(bundle.typeInfo()),
bundle.typeInfo(),
bundle.wireFormat()));
}
}
6 changes: 2 additions & 4 deletions src/main/java/com/nicta/scoobij/io/text/TextOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
import com.nicta.scoobij.DTable;
import com.nicta.scoobij.WireFormats;
import com.nicta.scoobij.WireFormatType;

public class TextOutput {

// The return type of this should be used with Scoobi.Persist
public static <T> com.nicta.scoobi.DListPersister<T> toTextFile(
DList<T> dl, String path, WireFormatType<T> bundle) {

return com.nicta.scoobi.io.text.TextOutput.toTextFile(dl.getImpl(),
path, bundle.wireFormat());
path, bundle.typeInfo());
}

public static <K, V> com.nicta.scoobi.DListPersister<scala.Tuple2<K, V>> toTextFile(
Expand All @@ -37,8 +36,7 @@ public static <K, V> com.nicta.scoobi.DListPersister<scala.Tuple2<K, V>> toTextF
return com.nicta.scoobi.io.text.TextOutput.toTextFile(
dt.getImpl(),
path,
WireFormats.wireFormatPair(bundleK.wireFormat(),
bundleV.wireFormat()));
WireFormats.wireFormatPair(bundleK, bundleV).typeInfo());
}

}
15 changes: 14 additions & 1 deletion src/main/scala/com/nicta/scoobi/WireFormat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,22 @@ object WireFormat {
}

/*
* Built-in Hadoop Writable types.
* Hadoop Writable types.
*/
implicit def WritableFmt[T <: Writable : Manifest] = new WireFormat[T] {

  // Serialisation is delegated to the Writable's own write method.
  def toWire(x: T, out: DataOutput): Unit = { x.write(out) }

  // A fresh T is created reflectively from the Manifest's erased class
  // (newInstance requires an accessible no-argument constructor) and is
  // then populated from the input through readFields.
  def fromWire(in: DataInput): T = {
    val instance = manifest[T].erasure.newInstance.asInstanceOf[T]
    instance.readFields(in)
    instance
  }

  // Textual form is whatever the Writable's toString produces.
  def show(x: T) = x.toString
}


/*
* "Primitive" types.
*/
implicit def IntFmt = new WireFormat[Int] {
def toWire(x: Int, out: DataOutput) { out.writeInt(x) }
def fromWire(in: DataInput): Int = in.readInt()
Expand Down
74 changes: 54 additions & 20 deletions src/main/scala/com/nicta/scoobi/impl/exec/BridgeStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,82 @@
*/
package com.nicta.scoobi.impl.exec

import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import scala.collection.mutable.{Map => MMap}
import org.apache.hadoop.mapreduce.Job

import com.nicta.scoobi.Scoobi
import com.nicta.scoobi.io.DataStore
import com.nicta.scoobi.io.DataSource
import com.nicta.scoobi.io.DataSink
import com.nicta.scoobi.io.InputConverter
import com.nicta.scoobi.io.OutputConverter
import com.nicta.scoobi.io.Helper
import com.nicta.scoobi.impl.plan.AST
import com.nicta.scoobi.impl.util.UniqueInt
import com.nicta.scoobi.impl.rtt.ScoobiWritable
import com.nicta.scoobi.impl.rtt.RuntimeClass


/** A bridge store is any data that moves between MSCRs. It must first be computed, but
* may be removed once all successor MSCRs have consumed it. */
final case class BridgeStore(n: AST.Node[_], val path: Path) extends DataStore(n) with DataSource with DataSink {
final case class BridgeStore[A](n: AST.Node[A])
extends DataStore(n)
with DataSource[NullWritable, ScoobiWritable[A], A]
with DataSink[NullWritable, ScoobiWritable[A], A] {

def inputTypeName = typeName
val inputPath = new Path(path, "ch*")
val inputFormat = classOf[SequenceFileInputFormat[_,_]]
lazy val logger = LogFactory.getLog(this.getClass.getName)

def outputTypeName = typeName
val outputPath = path
val outputFormat = classOf[SequenceFileOutputFormat[_,_]]
private val id = BridgeId.get
private val path = new Path(Scoobi.getWorkingDirectory(Scoobi.conf), "bridges/" + id)
val typeName = "BS" + id

/** Free up the disk space being taken up by this intermediate data. */
def freePath: Unit = {
val fs = outputPath.getFileSystem(Scoobi.conf)
fs.delete(outputPath, true)
/* rtClass will be created at runtime as part of building the MapReduce job. */
var rtClass: Option[RuntimeClass] = None


/* Output (i.e. input to bridge) */
val outputFormat = classOf[SequenceFileOutputFormat[NullWritable, ScoobiWritable[A]]]
val outputKeyClass = classOf[NullWritable]
def outputValueClass = rtClass.orNull.clazz.asInstanceOf[Class[ScoobiWritable[A]]]
def outputCheck() = {}
def outputConfigure(job: Job) = FileOutputFormat.setOutputPath(job, path)
val outputConverter = new BridgeOutputConverter[A](typeName)


/* Input (i.e. output of bridge) */
val inputFormat = classOf[SequenceFileInputFormat[NullWritable, ScoobiWritable[A]]]
def inputCheck() = {}
def inputConfigure(job: Job) = FileInputFormat.addInputPath(job, new Path(path, "ch*"))
def inputSize(): Long = Helper.pathSize(new Path(path, "ch*"))
val inputConverter = new InputConverter[NullWritable, ScoobiWritable[A], A] {
def fromKeyValue(key: NullWritable, value: ScoobiWritable[A]): A = value.get
}
}

/** Companion object for automating the creation of random temporary paths as the
* location for bridge stores. */
object BridgeStore {

private object TmpId extends UniqueInt
/* Free up the disk space being taken up by this intermediate data. */
def freePath: Unit = {
val fs = path.getFileSystem(Scoobi.conf)
fs.delete(path, true)
}
}

def apply(node: AST.Node[_]): BridgeStore = {
val tmpPath = new Path(Scoobi.getWorkingDirectory(Scoobi.conf), "bridges/" + TmpId.get.toString)
BridgeStore(node, tmpPath)
/** OutputConverter for a bridge store. Elements of type A are wrapped in the ScoobiWritable
  * class named by `typeName` and paired with a NullWritable key.
  *
  * The expectation is that by the time toKeyValue is first called, the Class for 'value'
  * will exist and be known by the ClassLoader; until then nothing is looked up, because
  * 'value' is lazy. */
class BridgeOutputConverter[A](typeName: String) extends OutputConverter[NullWritable, ScoobiWritable[A], A] {

  /* Instantiated reflectively on first access, then reused for every subsequent element. */
  lazy val value: ScoobiWritable[A] =
    Class.forName(typeName).newInstance.asInstanceOf[ScoobiWritable[A]]

  /* Stores 'x' in the shared 'value' wrapper and returns it with the NullWritable key.
   * Every call returns the same wrapper object, overwritten with the latest element. */
  def toKeyValue(x: A): (NullWritable, ScoobiWritable[A]) = {
    value.set(x)
    NullWritable.get -> value
  }
}

/** Source of the ids from which a BridgeStore derives its working path ("bridges/" + id)
  * and its type name ("BS" + id), obtained through `BridgeId.get`.
  * NOTE(review): presumably UniqueInt hands out a distinct Int on every `get` -- confirm
  * against UniqueInt's definition. */
object BridgeId extends UniqueInt
Loading

0 comments on commit 616096b

Please sign in to comment.