Skip to content

Commit

Permalink
[hotfix] Make IterableUtils.flatMap return Iterable rather than Iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Thesharing authored and zhuzhurk committed Mar 26, 2021
1 parent 94ce6f9 commit 75f6cc9
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 31 deletions.
47 changes: 24 additions & 23 deletions flink-core/src/main/java/org/apache/flink/util/IterableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,33 +60,34 @@ public static <E> Stream<E> toStream(Iterable<E> iterable) {
* Iterable}
*/
@Internal
public static <K, V, G extends Iterable<K>> Iterator<V> flatMap(
public static <K, V, G extends Iterable<K>> Iterable<V> flatMap(
Iterable<G> itemGroups, Function<K, V> mapper) {
return new Iterator<V>() {
private final Iterator<G> groupIterator = itemGroups.iterator();
private Iterator<K> itemIterator;
return () ->
new Iterator<V>() {
private final Iterator<G> groupIterator = itemGroups.iterator();
private Iterator<K> itemIterator;

@Override
public boolean hasNext() {
while (itemIterator == null || !itemIterator.hasNext()) {
if (!groupIterator.hasNext()) {
return false;
} else {
itemIterator = groupIterator.next().iterator();
@Override
public boolean hasNext() {
while (itemIterator == null || !itemIterator.hasNext()) {
if (!groupIterator.hasNext()) {
return false;
} else {
itemIterator = groupIterator.next().iterator();
}
}
return true;
}
}
return true;
}

@Override
public V next() {
if (hasNext()) {
return mapper.apply(itemIterator.next());
} else {
throw new NoSuchElementException();
}
}
};
@Override
public V next() {
if (hasNext()) {
return mapper.apply(itemIterator.next());
} else {
throw new NoSuchElementException();
}
}
};
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.util.IterableUtils;

import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.flink.util.IterableUtils.flatMap;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** Default implementation of {@link SchedulingExecutionVertex}. */
Expand Down Expand Up @@ -80,7 +80,7 @@ public ExecutionState getState() {

@Override
public Iterable<DefaultResultPartition> getConsumedResults() {
return () -> flatMap(consumedPartitionGroups, resultPartitionRetriever);
return IterableUtils.flatMap(consumedPartitionGroups, resultPartitionRetriever);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.util.IterableUtils;

import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.flink.util.IterableUtils.flatMap;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** Default implementation of {@link SchedulingResultPartition}. */
Expand Down Expand Up @@ -108,7 +108,7 @@ public DefaultExecutionVertex getProducer() {

@Override
public Iterable<DefaultExecutionVertex> getConsumers() {
return () -> flatMap(consumerVertexGroups, executionVertexRetriever);
return IterableUtils.flatMap(consumerVertexGroups, executionVertexRetriever);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.IterableUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.util.IterableUtils.flatMap;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** A simple scheduling execution vertex for testing purposes. */
Expand Down Expand Up @@ -76,7 +76,7 @@ public void setState(ExecutionState state) {

@Override
public Iterable<TestingSchedulingResultPartition> getConsumedResults() {
return () -> flatMap(consumedPartitionGroups, resultPartitionsById::get);
return IterableUtils.flatMap(consumedPartitionGroups, resultPartitionsById::get);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.util.IterableUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.util.IterableUtils.flatMap;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** A simple implementation of {@link SchedulingResultPartition} for testing. */
Expand Down Expand Up @@ -84,7 +84,7 @@ public TestingSchedulingExecutionVertex getProducer() {

@Override
public Iterable<TestingSchedulingExecutionVertex> getConsumers() {
return () -> flatMap(consumerVertexGroups, executionVerticesById::get);
return IterableUtils.flatMap(consumerVertexGroups, executionVerticesById::get);
}

@Override
Expand Down

0 comments on commit 75f6cc9

Please sign in to comment.