Skip to content

Commit

Permalink
STORM-2258: Implementation of CoGroupByKey
Browse files Browse the repository at this point in the history
  • Loading branch information
kamleshbhatt authored and arunmahadevan committed Aug 23, 2017
1 parent 12e3f91 commit 58baf80
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 0 deletions.
15 changes: 15 additions & 0 deletions docs/Stream-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,21 @@ PairStream<String, Double> scores = ...
// list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20])
PairStream<String, Iterable<Integer>> userScores = scores.window(...).groupByKey();
```

### <a name="coGroupbykey"></a> coGroupByKey

`coGroupByKey` Groups the values of this stream with the values having the same key from the other stream.

```java
// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)
PairStream<String, String> stream1 = ...

// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3)
PairStream<String, String> stream2 = ...

// the co-grouped values per key in the last window, e.g. (k1, ([v1], [x1, x2]), (k2, ([v2, v3], [])), (k3, ([], [x3]))
PairStream<String, Iterable<String>> userScores = stream1.window(...).coGroupByKey(stream2);
```

### <a name="countbykey"></a> countByKey
`countByKey` counts the values for each key of this stream.
Expand Down
34 changes: 34 additions & 0 deletions storm-client/src/jvm/org/apache/storm/streams/PairStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.storm.streams.processors.MergeAggregateByKeyProcessor;
import org.apache.storm.streams.processors.ReduceByKeyProcessor;
import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
import org.apache.storm.streams.processors.CoGroupByKeyProcessor;
import org.apache.storm.streams.windowing.Window;
import org.apache.storm.tuple.Fields;

Expand All @@ -45,6 +46,7 @@
import java.util.Optional;
import java.util.Set;


/**
* Represents a stream of key-value pairs.
*
Expand Down Expand Up @@ -380,6 +382,27 @@ public <R> StreamState<K, R> updateStateByKey(StateUpdater<? super V, ? extends
return partitionBy(KEY).updateStateByKeyPartition(stateUpdater);
}

/**
* Groups the values of this stream with the values having the same key from
* the other stream.
* If stream1 has values - (k1, v1), (k2, v2), (k2, v3)
* and stream2 has values - (k1, x1), (k1, x2), (k3, x3)
* The the co-grouped stream would contain -
* (k1, ([v1], [x1, x2]), (k2, ([v2, v3], [])), (k3, ([], [x3]))
* <p>
* Note: The parallelism of this stream is carried forward to the co-grouped stream.
* </p>
*
* @param otherStream the other stream
* @param <V1> the type of the values in the other stream
* @return the new stream
*/
public <V1> PairStream<K, Pair<Iterable<V>, Iterable<V1>>> coGroupByKey(PairStream<K, V1> otherStream) {
return partitionByKey().coGroupByKeyPartition(otherStream);
}



private <R> StreamState<K, R> updateStateByKeyPartition(StateUpdater<? super V, ? extends R> stateUpdater) {
return new StreamState<>(
new PairStream<>(streamBuilder,
Expand All @@ -400,6 +423,17 @@ private <R, V1> PairStream<K, R> joinPartition(PairStream<K, V1> otherStream,
return new PairStream<>(streamBuilder, joinNode);
}

private <R, V1> PairStream<K,R> coGroupByKeyPartition(PairStream<K, V1> otherStream) {
String firstStream = stream;
String secondStream = otherStream.stream;
Node coGroupNode = addProcessorNode(
new CoGroupByKeyProcessor<>(firstStream, secondStream),
KEY_VALUE,
true);
addNode(otherStream.getNode(), coGroupNode, coGroupNode.getParallelism());
return new PairStream<>(streamBuilder, coGroupNode);
}

private PairStream<K, V> partitionByKey() {
return shouldPartitionByKey() ? partitionBy(KEY) : this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.streams.processors;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import org.apache.storm.streams.Pair;

import java.util.ArrayList;

/**
* co-group by key implementation
*/
public class CoGroupByKeyProcessor<K,V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
private final String firstStream;
private final String secondStream;
private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
private final Multimap<K, V2> secondMap = ArrayListMultimap.create();


public CoGroupByKeyProcessor(String firstStream, String secondStream) {
this.firstStream = firstStream;
this.secondStream = secondStream;
}

@Override
public void execute(Pair<K, ?> input, String sourceStream) {
K key = input.getFirst();
if (sourceStream.equals(firstStream)) {
V1 val = (V1) input.getSecond();
firstMap.put(key, val);
} else if (sourceStream.equals(secondStream)) {
V2 val = (V2) input.getSecond();
secondMap.put(key, val);
}
if (!context.isWindowed()) {
forwardValues();
}

}

@Override
public void finish() {
forwardValues();
firstMap.clear();
secondMap.clear();
}

private void forwardValues() {
firstMap.asMap().forEach((key, values) -> {
context.forward(Pair.of(key, Pair.of(new ArrayList<>(values), secondMap.removeAll(key))));
});

secondMap.asMap().forEach((key, values) -> {
context.forward(Pair.of(key, Pair.of(firstMap.removeAll(key), new ArrayList<>(values))));
});

}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.streams.processors;

import org.apache.curator.shaded.com.google.common.collect.ImmutableBiMap;
import org.apache.curator.shaded.com.google.common.collect.ImmutableMultimap;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.operations.PairValueJoiner;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Collection;
import java.util.Arrays;


import static org.junit.Assert.assertEquals;

public class CoGroupByKeyProcessorTest {
private CoGroupByKeyProcessor<Integer, Integer, Integer> coGroupByKeyProcessor;
private String firstStream = "first";
private String secondStream = "second";
private List<Pair<Integer, Pair<List<Integer>, List<Integer>>>> res = new ArrayList<>();

private ProcessorContext context = new ProcessorContext() {
@Override
public <T> void forward(T input) {
res.add((Pair<Integer, Pair<List<Integer>, List<Integer>>>)input);
}

@Override
public <T> void forward(T input, String stream) {
}

@Override
public boolean isWindowed() {
return true;
}

@Override
public Set<String> getWindowedParentStreams() {
return null;
}
};

private List<Pair<Integer, Integer>> firstKeyValues = Arrays.asList(
Pair.of(2, 4),
Pair.of(5, 25),
Pair.of(7, 49),
Pair.of(7, 87)
);

private List<Pair<Integer, Integer>> secondKeyValues = Arrays.asList(
Pair.of(1, 1),
Pair.of(2, 8),
Pair.of(5, 125),
Pair.of(5,50),
Pair.of(6, 216)

);

@Test
public void testCoGroupByKey() throws Exception {
coGroupByKeyProcessor = new CoGroupByKeyProcessor<>(firstStream, secondStream);
processValues();
List<Pair<Integer, Pair<Collection<Integer>, Collection<Integer>>>> expected = new ArrayList<>();
Collection<Integer> list1 = new ArrayList<>();
list1.add(25);
Collection<Integer> list2 = new ArrayList<>();
list2.add(125);
list2.add(50);
expected.add(Pair.of(5, Pair.of(list1, list2)));
assertEquals(expected.get(0), res.get(1));
list1.clear();
list2.clear();
list1.add(49);
list1.add(87);
expected.clear();
expected.add(Pair.of(7, Pair.of(list1, list2)));
assertEquals(expected.get(0), res.get(2));
}


private void processValues() {
res.clear();
coGroupByKeyProcessor.init(context);
for (Pair<Integer, Integer> kv : firstKeyValues) {
coGroupByKeyProcessor.execute(kv, firstStream);
}
for (Pair<Integer, Integer> kv : secondKeyValues) {
coGroupByKeyProcessor.execute(kv, secondStream);
}
coGroupByKeyProcessor.finish();
}

}

0 comments on commit 58baf80

Please sign in to comment.