[scala] [streaming] Added implicit conversions from java to scala streams
gyfora committed Jan 3, 2015
1 parent fac7734 commit d4ec009
Showing 7 changed files with 88 additions and 53 deletions.
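The heart of the change: methods on the Scala wrappers no longer wrap every returned Java stream in `new DataStream[T](...)` by hand; each touched file instead imports `StreamingConversions._`, and an implicit conversion re-wraps the Java stream wherever a Scala return type is declared. The StreamingConversions object itself is among the changed files not shown below; the following is a minimal sketch of what such conversions plausibly look like, assuming the wrapper and Java-stream classes visible in this diff (the method names are hypothetical):

```scala
package org.apache.flink.api.scala.streaming

import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWindowedStream }

object StreamingConversions {

  // With this conversion in scope, a method declared to return DataStream[T]
  // can return the raw Java stream and the compiler inserts the wrapping.
  implicit def javaToScalaStream[T](javaStream: JavaStream[T]): DataStream[T] =
    new DataStream[T](javaStream)

  implicit def javaToScalaSplitStream[T](javaStream: SplitJavaStream[T]): SplitDataStream[T] =
    new SplitDataStream[T](javaStream)

  implicit def javaToScalaWindowedStream[T](javaStream: JavaWindowedStream[T]): WindowedDataStream[T] =
    new WindowedDataStream[T](javaStream)
}
```

This is why each `-`/`+` pair below is behavior-preserving: the declared Scala return type triggers the conversion at the method boundary.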
@@ -49,8 +49,8 @@ import org.apache.flink.streaming.api.function.aggregation.SumFunction
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
-import com.amazonaws.services.cloudfront_2012_03_15.model.InvalidArgumentException
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.scala.streaming.StreamingConversions._

class DataStream[T](javaStream: JavaStream[T]) {

@@ -62,7 +62,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
/**
* Sets the degree of parallelism of this operation. This must be greater than 1.
*/
-def setParallelism(dop: Int) = {
+def setParallelism(dop: Int): DataStream[T] = {
javaStream match {
case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
case _ =>
@@ -91,23 +91,22 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def merge(dataStreams: DataStream[T]*): DataStream[T] =
-new DataStream[T](javaStream.merge(dataStreams.map(_.getJavaStream): _*))
+javaStream.merge(dataStreams.map(_.getJavaStream): _*)

/**
* Groups the elements of a DataStream by the given key positions (for tuple/array types) to
* be used with grouped operators like grouped reduce or grouped aggregations
*
*/
-def groupBy(fields: Int*): DataStream[T] =
-new DataStream[T](javaStream.groupBy(fields: _*))
+def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)

/**
* Groups the elements of a DataStream by the given field expressions to
* be used with grouped operators like grouped reduce or grouped aggregations
*
*/
def groupBy(firstField: String, otherFields: String*): DataStream[T] =
-new DataStream[T](javaStream.groupBy(firstField +: otherFields.toArray: _*))
+javaStream.groupBy(firstField +: otherFields.toArray: _*)

/**
* Groups the elements of a DataStream by the given K key to
@@ -120,7 +119,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
val cleanFun = clean(fun)
def getKey(in: T) = cleanFun(in)
}
-new DataStream[T](javaStream.groupBy(keyExtractor))
+javaStream.groupBy(keyExtractor)
}
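A hypothetical usage sketch of the three groupBy variants above; the case class, the `env` value (a StreamExecutionEnvironment, see the last file in this commit), and the data are illustrative, not part of the commit:

```scala
case class Word(term: String, count: Int)

val words: DataStream[Word] = env.fromElements(Word("flink", 1), Word("scala", 1))

val byPosition = words.groupBy(0)       // by field position (tuple/case-class types)
val byName     = words.groupBy("term")  // by field expression
val byFunction = words.groupBy(_.term)  // by key-extractor function
```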

/**
@@ -130,7 +129,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def partitionBy(fields: Int*): DataStream[T] =
-new DataStream[T](javaStream.partitionBy(fields: _*));
+javaStream.partitionBy(fields: _*)

/**
* Sets the partitioning of the DataStream so that the output is
@@ -139,7 +138,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
-new DataStream[T](javaStream.partitionBy(firstField +: otherFields.toArray: _*))
+javaStream.partitionBy(firstField +: otherFields.toArray: _*)

/**
* Sets the partitioning of the DataStream so that the output is
@@ -153,7 +152,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
val cleanFun = clean(fun)
def getKey(in: T) = cleanFun(in)
}
-new DataStream[T](javaStream.partitionBy(keyExtractor))
+javaStream.partitionBy(keyExtractor)
}

/**
@@ -163,7 +162,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* parallel instances of the next processing operator.
*
*/
-def broadcast: DataStream[T] = new DataStream[T](javaStream.broadcast())
+def broadcast: DataStream[T] = javaStream.broadcast()

/**
* Sets the partitioning of the DataStream so that the output tuples
@@ -172,7 +171,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* processing operator.
*
*/
-def shuffle: DataStream[T] = new DataStream[T](javaStream.shuffle())
+def shuffle: DataStream[T] = javaStream.shuffle()

/**
* Sets the partitioning of the DataStream so that the output tuples
@@ -182,7 +181,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* instances of the next processing operator.
*
*/
-def forward: DataStream[T] = new DataStream[T](javaStream.forward())
+def forward: DataStream[T] = javaStream.forward()

/**
* Sets the partitioning of the DataStream so that the output tuples
@@ -191,7 +190,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* the next processing operator.
*
*/
-def distribute: DataStream[T] = new DataStream[T](javaStream.distribute())
+def distribute: DataStream[T] = javaStream.distribute()

/**
* Initiates an iterative part of the program that creates a loop by feeding
@@ -217,7 +216,7 @@ class DataStream[T](javaStream: JavaStream[T]) {

val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
iterativeStream.closeWith(feedback.getJavaStream)
-new DataStream[T](output.getJavaStream)
+output
}
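The contract of the step function is visible above: it receives the iteration head and returns a (feedback, output) pair, with the feedback stream closed back into the loop. A hypothetical sketch, assuming an `iterate` entry point of shape `DataStream[T] => (DataStream[T], DataStream[R])` and a `numbers: DataStream[Int]` input:

```scala
val result = numbers.iterate { head =>
  val incremented = head.map(_ + 1)
  val feedback = incremented.filter(_ < 100)  // fed back into the loop
  val output   = incremented.filter(_ >= 100) // leaves the iteration
  (feedback, output)
}
```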

/**
@@ -301,8 +300,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
def map(in: T): R = cleanFun(in)
}

new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
new MapInvokable[T, R](mapper)))
javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
}

/**
@@ -313,8 +311,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
throw new NullPointerException("Map function must not be null.")
}

new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
new MapInvokable[T, R](mapper)))
javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
}

/**
@@ -325,8 +322,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
if (flatMapper == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
new DataStream[R](javaStream.transform("flatMap", implicitly[TypeInformation[R]],
new FlatMapInvokable[T, R](flatMapper)))
javaStream.transform("flatMap", implicitly[TypeInformation[R]],
new FlatMapInvokable[T, R](flatMapper))
}
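A hedged sketch of calling the map/flatMap family above; it assumes `lines: DataStream[String]` (e.g. from a text source) and a `T => TraversableOnce[R]` flatMap overload alongside the Collector-based one shown here:

```scala
val tokens:  DataStream[String] = lines.flatMap(_.split(" ").toSeq)
val lengths: DataStream[Int]    = tokens.map(_.length)
```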

/**
@@ -368,10 +365,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
throw new NullPointerException("Reduce function must not be null.")
}
javaStream match {
-case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce",
-javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())))
-case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(),
-new StreamReduceInvokable[T](reducer)))
+case ds: GroupedDataStream[_] => javaStream.transform("reduce",
+javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))
+case _ => javaStream.transform("reduce", javaStream.getType(),
+new StreamReduceInvokable[T](reducer))
}
}
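The match above picks a grouped or a global reduce invokable depending on the underlying Java stream. A sketch of the difference in behavior, assuming a `(T, T) => T` reduce overload and the `words` stream from the earlier groupBy sketch:

```scala
// Per-key rolling aggregation: one evolving result per distinct term.
val perTerm = words.groupBy(_.term)
  .reduce((a, b) => Word(a.term, a.count + b.count))

// Global rolling reduce over the whole, ungrouped stream.
val total = words.reduce((a, b) => Word("total", a.count + b.count))
```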

@@ -397,7 +394,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
if (filter == null) {
throw new NullPointerException("Filter function must not be null.")
}
-new DataStream[T](javaStream.filter(filter))
+javaStream.filter(filter)
}

/**
@@ -426,7 +423,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* window(List(triggers), List(evicters))
*/
def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
-new WindowedDataStream[T](javaStream.window(windowingHelper: _*))
+javaStream.window(windowingHelper: _*)

/**
* Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
@@ -436,7 +433,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
-WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters))
+WindowedDataStream[T] = javaStream.window(triggers, evicters)
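A usage sketch of the two window signatures above. `Count.of` stands in for whichever WindowingHelper implementations this version ships, and the policy values are assumed to be defined elsewhere:

```scala
// Count-based window via a WindowingHelper (helper name assumed).
val counted: WindowedDataStream[Word] = words.window(Count.of(100))

// Explicit policy lists, matching the second overload.
val policed: WindowedDataStream[Word] =
  words.window(List(myTrigger), List(myEvicter)) // TriggerPolicy/EvictionPolicy values
```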

/**
*
@@ -445,7 +442,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* SplitDataStream.
*/
def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream match {
-case op: SingleOutputStreamOperator[_, _] => new SplitDataStream[T](op.split(selector))
+case op: SingleOutputStreamOperator[_, _] => op.split(selector)
case _ =>
throw new UnsupportedOperationException("Operator " + javaStream.toString + " can not be " +
"split.")
@@ -503,7 +500,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* written.
*
*/
-def print(): DataStream[T] = new DataStream[T](javaStream.print())
+def print(): DataStream[T] = javaStream.print()

/**
* Writes a DataStream to the file specified by path in text format. The
@@ -513,7 +510,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def writeAsText(path: String, millis: Long = 0): DataStream[T] =
-new DataStream[T](javaStream.writeAsText(path, millis))
+javaStream.writeAsText(path, millis)

/**
* Writes a DataStream to the file specified by path in text format. The
@@ -523,7 +520,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
-new DataStream[T](javaStream.writeAsCsv(path, millis))
+javaStream.writeAsCsv(path, millis)

/**
* Adds the given sink to this DataStream. Only streams with sinks added
@@ -532,7 +529,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] =
-new DataStream[T](javaStream.addSink(sinkFuntion))
+javaStream.addSink(sinkFuntion)
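A brief sketch of the sink methods above; the paths are placeholders and the lambda form of addSink is an assumed convenience overload (the SinkFunction variant is the one shown):

```scala
words.print()                             // prints to standard output
words.writeAsText("/tmp/words.txt")       // millis defaults to 0
words.writeAsCsv("/tmp/words.csv", 1000)  // flushed every 1000 ms
words.addSink(word => println(word.term)) // assumed T => Unit overload
```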

/**
* Adds the given sink to this DataStream. Only streams with sinks added
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala.streaming

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+import org.apache.flink.api.scala.streaming.StreamingConversions._

/**
* The SplitDataStream represents an operator that has been split using an
@@ -39,12 +40,11 @@ class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
/**
* Sets the output names for which the next operator will receive values.
*/
-def select(outputNames: String*): DataStream[T] =
-new DataStream[T](javaStream.select(outputNames: _*))
+def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)

/**
* Selects all output names from a split data stream.
*/
-def selectAll(): DataStream[T] = new DataStream[T](javaStream.selectAll())
+def selectAll(): DataStream[T] = javaStream.selectAll()

}
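Combined with DataStream.split above, a hypothetical round trip; the `parity` OutputSelector and the `numbers` stream are assumed to exist, and the selector's exact interface is version-specific:

```scala
val parts: SplitDataStream[Int] = numbers.split(parity) // parity: OutputSelector[Int]

val even: DataStream[Int] = parts.select("even")
val odd:  DataStream[Int] = parts.select("odd")
val all:  DataStream[Int] = parts.selectAll()
```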
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.functions.CrossFunction
import org.apache.flink.api.scala.typeutils.CaseClassSerializer
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
+import org.apache.flink.api.scala.streaming.StreamingConversions._

class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
@@ -88,7 +89,7 @@ object StreamCrossOperator {
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
invokable)

-new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
+javaStream.setType(implicitly[TypeInformation[R]])
}
}

@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.function.source.FromElementsFunction
import org.apache.flink.streaming.api.function.source.SourceFunction
import scala.collection.JavaConversions._
import org.apache.flink.util.Collector
+import org.apache.flink.api.scala.streaming.StreamingConversions._

class StreamExecutionEnvironment(javaEnv: JavaEnv) {

@@ -82,7 +83,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*
*/
def readTextFile(filePath: String): DataStream[String] =
-new DataStream[String](javaEnv.readTextFile(filePath))
+javaEnv.readTextFile(filePath)

/**
* Creates a DataStream that represents the Strings produced by reading the
@@ -91,8 +92,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* testing a topology.
*
*/
-def readTextStream(StreamPath: String): DataStream[String] =
-new DataStream[String](javaEnv.readTextStream(StreamPath))
+def readTextStream(StreamPath: String): DataStream[String] =
+javaEnv.readTextStream(StreamPath)

/**
* Creates a new DataStream that contains the strings received infinitely
@@ -101,7 +102,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*
*/
def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
-new DataStream[String](javaEnv.socketTextStream(hostname, port, delimiter))
+javaEnv.socketTextStream(hostname, port, delimiter)

/**
* Creates a new DataStream that contains the strings received infinitely
@@ -110,7 +111,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*
*/
def socketTextStream(hostname: String, port: Int): DataStream[String] =
-new DataStream[String](javaEnv.socketTextStream(hostname, port))
+javaEnv.socketTextStream(hostname, port)
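A short sketch of the source methods above, using the signatures exactly as they appear in this hunk; host, port, and path are placeholders:

```scala
val fileLines:   DataStream[String] = env.readTextFile("/tmp/input.txt")
val socketLines: DataStream[String] = env.socketTextStream("localhost", 9999)
val delimited:   DataStream[String] = env.socketTextStream("localhost", 9999, '\n')
```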

/**
* Creates a new DataStream that contains a sequence of numbers.
@@ -151,7 +152,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions
.asJavaCollection(data))), null, typeInfo,
"source", 1);
-new DataStream(returnStream)
+returnStream
}

/**
@@ -163,7 +164,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
Validate.notNull(function, "Function must not be null.")
val cleanFun = StreamExecutionEnvironment.clean(function)
val typeInfo = implicitly[TypeInformation[T]]
-new DataStream[T](javaEnv.addSource(cleanFun, typeInfo))
+javaEnv.addSource(cleanFun, typeInfo)
}

/**
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
import org.apache.flink.streaming.util.keys.KeySelectorUtil
import org.apache.flink.api.java.operators.Keys
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
+import org.apache.flink.api.scala.streaming.StreamingConversions._

class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
@@ -178,7 +179,7 @@ object StreamJoinOperator {
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
invokable)

-new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
+javaStream.setType(implicitly[TypeInformation[R]])
}
}
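Both temporal operators (cross and join) finish by re-typing the underlying Java stream, as in the hunk above. A speculative sketch of the join builder chain, based on the JoinWindow name in this file; every builder step here is an assumption about this version's API:

```scala
val joined = stream1.join(stream2) // stream1, stream2: DataStream[Word], assumed
  .onWindow(1000)                  // window length (assumed builder step)
  .where(_.term)                   // key of the first input (assumed)
  .equalTo(_.term)                 // key of the second input (assumed)
```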

(The remaining changed files are not shown.)
