Skip to content

Commit b263932

Browse files
committed Jan 27, 2015
[streaming] Refactor iterative datastream for clear self-contained functionality
1 parent 517289d commit b263932

File tree

3 files changed

+36
-35
lines changed

3 files changed

+36
-35
lines changed
 

‎flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java

+5-17
Original file line numberDiff line numberDiff line change
@@ -212,20 +212,23 @@ public void addIterationHead(String vertexName, String iterationHead, Integer it
212212
* Max waiting time for next record
213213
*/
214214
public void addIterationTail(String vertexName, String iterationTail, Integer iterationID,
215-
int parallelism, long waitTime) {
215+
long waitTime) {
216216

217217
if (bufferTimeouts.get(iterationTail) == 0) {
218218
throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
219219
}
220220

221-
addVertex(vertexName, StreamIterationTail.class, null, null, parallelism);
221+
addVertex(vertexName, StreamIterationTail.class, null, null, getParallelism(iterationTail));
222222

223223
iterationIds.put(vertexName, iterationID);
224224
iterationIDtoTailName.put(iterationID, vertexName);
225225

226226
setSerializersFrom(iterationTail, vertexName);
227227
iterationTimeouts.put(iterationIDtoTailName.get(iterationID), waitTime);
228228

229+
setParallelism(iterationIDtoHeadName.get(iterationID), getParallelism(iterationTail));
230+
setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeouts.get(iterationTail));
231+
229232
if (LOG.isDebugEnabled()) {
230233
LOG.debug("ITERATION SINK: {}", vertexName);
231234
}
@@ -364,21 +367,6 @@ public void addOperatorState(String veretxName, String stateName, OperatorState<
364367
operatorStates.put(veretxName, states);
365368
}
366369

367-
/**
368-
* Sets the parallelism and buffer timeout of the iteration head of the given
369-
* iteration id to match those of the given iteration tail.
370-
*
371-
* @param iterationID
372-
* ID of the iteration
373-
* @param iterationTail
374-
* ID of the iteration tail
375-
*/
376-
public void setIterationSourceSettings(String iterationID, String iterationTail) {
377-
setParallelism(iterationIDtoHeadName.get(iterationID),
378-
operatorParallelisms.get(iterationTail));
379-
setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeouts.get(iterationTail));
380-
}
381-
382370
/**
383371
* Sets a user defined {@link OutputSelector} for the given operator. Used
384372
* for directed emits.

‎flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java

-15
Original file line numberDiff line numberDiff line change
@@ -1095,16 +1095,6 @@ private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
10951095
return returnStream;
10961096
}
10971097

1098-
protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitTime) {
1099-
1100-
DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null,
1101-
null, true);
1102-
1103-
streamGraph.addIterationHead(returnStream.getId(), this.getId(), iterationID,
1104-
degreeOfParallelism, waitTime);
1105-
1106-
return this.copy();
1107-
}
11081098

11091099
/**
11101100
* Method for passing user defined invokables along with the type
@@ -1132,11 +1122,6 @@ protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitT
11321122

11331123
connectGraph(inputStream, returnStream.getId(), 0);
11341124

1135-
if (inputStream instanceof IterativeDataStream) {
1136-
IterativeDataStream<OUT> iterativeStream = (IterativeDataStream<OUT>) inputStream;
1137-
returnStream.addIterationSource(iterativeStream.iterationID, iterativeStream.waitTime);
1138-
}
1139-
11401125
return returnStream;
11411126
}
11421127

‎flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java

+31-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.flink.streaming.api.datastream;
1919

20+
import org.apache.flink.api.common.typeinfo.TypeInformation;
21+
import org.apache.flink.streaming.api.invokable.StreamInvokable;
22+
2023
/**
2124
* The iterative data stream represents the start of an iteration in a
2225
* {@link DataStream}.
@@ -61,17 +64,42 @@ protected IterativeDataStream(DataStream<IN> dataStream, Integer iterationID, lo
6164
*
6265
*/
6366
public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
64-
DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "iterationSink", null,
67+
DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "Iteration Sink", null,
6568
null);
6669

70+
// We add an iteration sink to the tail which will send tuples to the
71+
// iteration head
6772
streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
68-
iterationTail.getParallelism(), waitTime);
73+
waitTime);
6974

70-
streamGraph.setIterationSourceSettings(iterationID.toString(), iterationTail.getId());
7175
connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
7276
return iterationTail;
7377
}
7478

79+
@Override
80+
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
81+
TypeInformation<R> outTypeInfo, StreamInvokable<IN, R> invokable) {
82+
83+
// We call the superclass transform method
84+
SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,
85+
invokable);
86+
87+
// Then we add a source that will take care of receiving feedback tuples
88+
// from the tail
89+
addIterationSource(returnStream);
90+
91+
return returnStream;
92+
}
93+
94+
private <X> void addIterationSource(DataStream<X> dataStream) {
95+
96+
DataStream<X> iterationSource = new DataStreamSource<X>(environment, "Iteration Source",
97+
null, null, true);
98+
99+
streamGraph.addIterationHead(iterationSource.getId(), dataStream.getId(), iterationID,
100+
dataStream.getParallelism(), waitTime);
101+
}
102+
75103
@Override
76104
public IterativeDataStream<IN> copy() {
77105
return new IterativeDataStream<IN>(this, iterationID, waitTime);

0 commit comments

Comments
 (0)
Please sign in to comment.