Skip to content

Commit

Permalink
Updates for 141206 SDK release
Browse files: browse the repository at this point in the history
  • Loading branch information
Josh Wills committed Dec 9, 2014
1 parent 695104c commit fdad2a5
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 126 deletions.
37 changes: 10 additions & 27 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,16 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<name>Cloud Dataflow Examples</name>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>examples</artifactId>
<version>1</version>
<name>Dataflow on Spark</name>
<groupId>com.cloudera.dataflow.spark</groupId>
<artifactId>dataflow-spark</artifactId>
<version>0.1.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<repositories>
<repository>
<id>dataflow-snapshot</id>
<url>file://${project.basedir}/jars</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
</repositories>

<build>
<plugins>
<plugin>
Expand Down Expand Up @@ -89,11 +75,15 @@
<artifactId>spark-core_2.10</artifactId>
<version>1.1.0</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>dataflow-sdk</artifactId>
<version>0.0.1</version>
<version>1.0.141206</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
Expand Down Expand Up @@ -131,13 +121,6 @@
<version>2.4.2</version>
</dependency>

<dependency>
<!-- Command line parsing library. -->
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.hamcrest</groupId>
Expand Down
41 changes: 28 additions & 13 deletions src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@
*/
package com.cloudera.dataflow.spark;

import com.google.cloud.dataflow.sdk.runners.PipelineOptions;
import com.google.cloud.dataflow.sdk.streaming.KeyedState;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.joda.time.Instant;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -50,12 +52,12 @@ public DoFnFunction(
/**
 * Runs the wrapped {@code fn} over every element supplied by {@code iter} and returns
 * the outputs collected by the processing context.
 *
 * @param iter source of input elements; each one is assigned to {@code ctxt.element}
 *     before {@code fn.processElement} is invoked
 * @return {@code ctxt.outputs} as it stands after {@code fn} has finished
 * @throws Exception anything thrown while running {@code fn}; nothing is caught here
 */
@Override
public Iterable<O> call(Iterator<I> iter) throws Exception {
ProcCtxt<I, O> ctxt = new ProcCtxt(fn);
// NOTE(review): this diff view lists the old startBatch/finishBatch calls next to their
// startBundle/finishBundle replacements; presumably only the *Bundle calls remain in
// the committed file -- confirm against the repository.
fn.startBatch(ctxt);
fn.startBundle(ctxt);
while (iter.hasNext()) {
// Expose the current element to fn through the context before processing it.
ctxt.element = iter.next();
fn.processElement(ctxt);
}
fn.finishBatch(ctxt);
fn.finishBundle(ctxt);
return ctxt.outputs;
}

Expand All @@ -73,14 +75,19 @@ public PipelineOptions getPipelineOptions() {
return runtimeContext.getPipelineOptions();
}

/**
 * Looks up the side input registered under the internal tag of {@code view}.
 *
 * @param view the view whose value is requested
 * @return the value held by the matching {@code sideInputs} entry, cast to {@code T}
 */
@Override
public <T> T sideInput(PCollectionView<T, ?> view) {
  Object value = sideInputs.get(view.getTagInternal()).getValue();
  return (T) value;
}

/**
 * Appends {@code o} to the {@code outputs} collection of this context.
 * The method is declared {@code synchronized}.
 *
 * @param o the element to record
 */
@Override
public synchronized void output(O o) {
outputs.add(o);
}

/**
 * Accepts a side output and discards it: the body is empty, so {@code t} is never stored.
 *
 * @param tupleTag the tag the value was emitted under (unused)
 * @param t the emitted value (unused)
 */
@Override
public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
// A no-op if we don't know about it ahead of time
// A no-op in this context; maybe add some logging
}

@Override
Expand All @@ -97,20 +104,28 @@ public <AI, AO> Aggregator<AI> createAggregator(
return runtimeContext.createAggregator(named, sfunc);
}

/**
 * Returns the side-input value registered under {@code tag}.
 *
 * @param tag key into {@code sideInputs}
 * @return the helper's value, or {@code null} when no entry exists for {@code tag}
 */
@Override
public <T> T sideInput(TupleTag<T> tag) {
  BroadcastHelper<T> helper = (BroadcastHelper<T>) sideInputs.get(tag);
  if (helper == null) {
    return null;
  }
  return helper.getValue();
}

/**
 * Returns the {@code element} field, which {@code call} assigns from the input
 * iterator before each {@code processElement} invocation.
 */
@Override
public I element() {
return element;
}

/**
 * Keyed state is not available from this context.
 *
 * @throws UnsupportedOperationException always
 */
@Override
// NOTE(review): two signature lines follow because this diff view shows the old return
// type (KeyedState) next to its replacement (DoFn.KeyedState); presumably only the
// latter remains in the committed file.
public KeyedState keyedState() {
public DoFn.KeyedState keyedState() {
throw new UnsupportedOperationException();
}

/**
 * Emits {@code output} to the main output of this context.
 *
 * <p>{@link #timestamp()} in this context always returns {@code null}, i.e. no
 * per-element timestamp is kept, so {@code timestamp} is ignored and the element is
 * forwarded to {@code output(O)}.
 *
 * @param output the element to emit
 * @param timestamp the requested element timestamp (ignored)
 */
@Override
public void outputWithTimestamp(O output, Instant timestamp) {
  // The body used to be empty, which silently dropped every element emitted through
  // this method; forward to the regular output path instead.
  output(output);
}

/**
 * Always returns {@code null}.
 *
 * <p>NOTE(review): any caller that dereferences the result will hit a
 * {@code NullPointerException} -- confirm whether a placeholder instant is wanted.
 */
@Override
public Instant timestamp() {
return null;
}

/**
 * Returns the windows associated with the current element.
 *
 * <p>This context holds no window information, so the result is always an empty,
 * immutable collection. An empty collection is returned instead of {@code null} so
 * that callers can iterate over the result without a null check.
 *
 * @return an empty collection, never {@code null}
 */
@Override
public Collection<? extends BoundedWindow> windows() {
  // Fully qualified so that no new import is required by this method.
  return java.util.Collections.<BoundedWindow>emptyList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ void setOutputRDD(PTransform transform, JavaRDDLike rdd) {
rdds.put((PValue) getOutput(transform), rdd);
}

/**
 * Records {@code value} in the {@code pobjects} map under the key {@code pobj}.
 */
void setPObjectValue(PObject pobj, Object value) {
pobjects.put(pobj, value);
}
/**
 * Records {@code value} in the {@code pobjects} map under the key {@code pobject}.
 */
void setPObjectValue(PObject pobject, Object value) {
pobjects.put(pobject, value);
}

JavaRDDLike getRDD(PValue pvalue) {
JavaRDDLike rdd = rdds.get(pvalue);
Expand Down Expand Up @@ -135,7 +135,7 @@ PObjectValueTuple getPObjectTuple(PTransform transform) {
/**
 * Stores a value for every PObject in the output tuple of {@code transform}.
 *
 * <p>The output of {@code transform} is fetched from {@code pipeline} and cast to a
 * {@code PObjectTuple}; for each (tag, PObject) entry, the value that
 * {@code outputValues} holds for the tag is recorded against the PObject.
 *
 * @param transform the transform whose output tuple is being populated
 * @param outputValues values to store, looked up by tuple tag
 */
void setPObjectTuple(PTransform transform, PObjectValueTuple outputValues) {
PObjectTuple pot = (PObjectTuple) pipeline.getOutput(transform);
for (Map.Entry<TupleTag<?>, PObject<?>> e : pot.getAll().entrySet()) {
// NOTE(review): this diff view lists the old call (via setPObjectValue) next to its
// replacement (direct pobjects.put); presumably only one remains in the committed file.
setPObjectValue(e.getValue(), outputValues.get(e.getKey()));
pobjects.put(e.getValue(), outputValues.get(e.getKey()));
}
}
}
38 changes: 27 additions & 11 deletions src/main/java/com/cloudera/dataflow/spark/MultiDoFnFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@
*/
package com.cloudera.dataflow.spark;

import com.google.cloud.dataflow.sdk.runners.PipelineOptions;
import com.google.cloud.dataflow.sdk.streaming.KeyedState;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.joda.time.Instant;
import scala.Tuple2;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

Expand All @@ -55,12 +58,12 @@ public MultiDoFnFunction(
@Override
public Iterable<Tuple2<TupleTag<?>, Object>> call(Iterator<I> iter) throws Exception {
ProcCtxt<I, O> ctxt = new ProcCtxt(fn);
fn.startBatch(ctxt);
fn.startBundle(ctxt);
while (iter.hasNext()) {
ctxt.element = iter.next();
fn.processElement(ctxt);
}
fn.finishBatch(ctxt);
fn.finishBundle(ctxt);
return Iterables.transform(ctxt.outputs.entries(),
new Function<Map.Entry<TupleTag<?>, Object>, Tuple2<TupleTag<?>, Object>>() {
public Tuple2<TupleTag<?>, Object> apply(Map.Entry<TupleTag<?>, Object> input) {
Expand All @@ -83,6 +86,11 @@ public PipelineOptions getPipelineOptions() {
return runtimeContext.getPipelineOptions();
}

/**
 * Looks up the side input registered under the internal tag of {@code view}.
 *
 * @param view the view whose value is requested
 * @return the value held by the matching {@code sideInputs} entry, cast to {@code T}
 */
@Override
public <T> T sideInput(PCollectionView<T, ?> view) {
  Object value = sideInputs.get(view.getTagInternal()).getValue();
  return (T) value;
}

@Override
public synchronized void output(O o) {
outputs.put(mainOutputTag, o);
Expand All @@ -107,20 +115,28 @@ public <AI, AO> Aggregator<AI> createAggregator(
return runtimeContext.createAggregator(named, sfunc);
}

/**
 * Returns the side-input value registered under {@code tag}.
 *
 * @param tag key into {@code sideInputs}
 * @return the helper's value, or {@code null} when no entry exists for {@code tag}
 */
@Override
public <T> T sideInput(TupleTag<T> tag) {
  BroadcastHelper<T> helper = (BroadcastHelper<T>) sideInputs.get(tag);
  if (helper == null) {
    return null;
  }
  return helper.getValue();
}

/**
 * Returns the {@code element} field, which {@code call} assigns from the input
 * iterator before each {@code processElement} invocation.
 */
@Override
public I element() {
return element;
}

/**
 * Keyed state is not available from this context.
 *
 * @throws UnsupportedOperationException always
 */
@Override
// NOTE(review): two signature lines follow because this diff view shows the old return
// type (KeyedState) next to its replacement (DoFn.KeyedState); presumably only the
// latter remains in the committed file.
public KeyedState keyedState() {
public DoFn.KeyedState keyedState() {
throw new UnsupportedOperationException();
}

/**
 * Emits {@code output} to the main output of this context.
 *
 * <p>{@link #timestamp()} in this context always returns {@code null}, i.e. no
 * per-element timestamp is kept, so {@code timestamp} is ignored and the element is
 * forwarded to {@code output(O)}.
 *
 * @param output the element to emit
 * @param timestamp the requested element timestamp (ignored)
 */
@Override
public void outputWithTimestamp(O output, Instant timestamp) {
  // The body used to be empty, which silently dropped every element emitted through
  // this method; forward to the regular output path instead.
  output(output);
}

/**
 * Always returns {@code null}.
 *
 * <p>NOTE(review): any caller that dereferences the result will hit a
 * {@code NullPointerException} -- confirm whether a placeholder instant is wanted.
 */
@Override
public Instant timestamp() {
return null;
}

/**
 * Returns the windows associated with the current element.
 *
 * <p>This context holds no window information, so the result is always an empty,
 * immutable collection. An empty collection is returned instead of {@code null} so
 * that callers can iterate over the result without a null check.
 *
 * @return an empty collection, never {@code null}
 */
@Override
public Collection<? extends BoundedWindow> windows() {
  // Fully qualified so that no new import is required by this method.
  return java.util.Collections.<BoundedWindow>emptyList();
}
}
}
Loading

0 comments on commit fdad2a5

Please sign in to comment.