diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index 038b58cff64f7..2edc53339b81b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -20,7 +20,9 @@ import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.SerializedListAccumulator; +import org.apache.flink.api.common.accumulators.SimpleAccumulator; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; @@ -158,6 +160,97 @@ public void close() { } } + public static class ChecksumHashCode implements SimpleAccumulator { + + private static final long serialVersionUID = 1L; + + private long count; + private long checksum; + + public ChecksumHashCode() {} + + public ChecksumHashCode(long count, long checksum) { + this.count = count; + this.checksum = checksum; + } + + public long getCount() { + return count; + } + + public long getChecksum() { + return checksum; + } + + @Override + public void add(ChecksumHashCode value) { + this.count += value.count; + this.checksum += value.checksum; + } + + @Override + public ChecksumHashCode getLocalValue() { + return this; + } + + @Override + public void resetLocal() { + this.count = 0; + this.checksum = 0; + } + + @Override + public void merge(Accumulator other) { + this.add(other.getLocalValue()); + } + + @Override + public ChecksumHashCode clone() { + return new ChecksumHashCode(count, checksum); + } + + @Override + public String toString() { + return "ChecksumHashCode " + this.checksum + ", count " + this.count; + } + } + + @SkipCodeAnalysis + public static class ChecksumHashCodeHelper extends RichOutputFormat { + + private static final long serialVersionUID = 1L; + + private final String id; + private long counter; + private long checksum; + + public ChecksumHashCodeHelper(String id) { + this.id = id; + this.counter = 0L; + this.checksum = 0L; + } + + @Override + public void configure(Configuration parameters) {} + + @Override + public void open(int taskNumber, int numTasks) {} + + @Override + public void writeRecord(T record) throws IOException { + counter++; + // convert 32-bit integer to non-negative long + checksum += record.hashCode() & 0xffffffffL; + } + + @Override + public void close() throws IOException { + ChecksumHashCode update = new ChecksumHashCode(counter, checksum); + getRuntimeContext().addAccumulator(id, update); + } + } + + // -------------------------------------------------------------------------------------------- /** diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java index 9a1e95253c9cc..01d801e6f0d2f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.utils; import com.google.common.collect.Lists; +import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.api.java.DataSet; @@ -30,6 +31,7 @@ import org.apache.flink.api.java.operators.MapPartitionOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.AbstractID; import org.apache.flink.util.Collector; import java.util.Collections; @@ -246,6 +248,25 @@ public static DataSet sampleWithSize( return new GroupReduceOperator<>(mapPartitionOperator, input.getType(), sampleInCoordinator, callLocation); } + // -------------------------------------------------------------------------------------------- + // Checksum + // -------------------------------------------------------------------------------------------- + + /** + * Convenience method to get the count (number of elements) of a DataSet + * as well as the checksum (sum over element hashes). + * + * @return A ChecksumHashCode that represents the count and checksum of elements in the data set. + */ + public static Utils.ChecksumHashCode checksumHashCode(DataSet input) throws Exception { + final String id = new AbstractID().toString(); + + input.output(new Utils.ChecksumHashCodeHelper(id)).name("ChecksumHashCode"); + + JobExecutionResult res = input.getExecutionEnvironment().execute(); + return res. getAccumulatorResult(id); + } + // ************************************************************************* // UTIL METHODS diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala new file mode 100644 index 0000000000000..954e6c3901d1a --- /dev/null +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.Utils.ChecksumHashCode +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.utils._ + +import scala.reflect.ClassTag + +package object utils { + /** + * This class provides utility methods for computing checksums over a Graph. + * + * @param self Graph + */ + implicit class GraphUtils[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: + TypeInformation : ClassTag](val self: Graph[K, VV, EV]) { + + /** + * Computes the ChecksumHashCode over the Graph. + * + * @return the ChecksumHashCode over the vertices and edges. + */ + @throws(classOf[Exception]) + def checksumHashCode(): ChecksumHashCode = { + val checksum: ChecksumHashCode = self.getVertices.checksumHashCode() + checksum.add(self.getEdges checksumHashCode()) + checksum + } + } + +} diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala new file mode 100644 index 0000000000000..c6d3c58e5863c --- /dev/null +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.scala.test.util + +import org.apache.flink.api.scala._ +import org.apache.flink.graph.scala._ +import org.apache.flink.graph.scala.utils._ +import org.apache.flink.graph.scala.test.TestGraphUtils +import org.apache.flink.test.util.MultipleProgramsTestBase +import org.junit.Assert.assertEquals +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +@RunWith(classOf[Parameterized]) +class GraphUtilsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends +MultipleProgramsTestBase(mode) { + + @Test + @throws(classOf[Exception]) + def testChecksumHashCodeVerticesAndEdges() { + val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment + val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils + .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) + val checksum = graph.checksumHashCode() + assertEquals(checksum.getCount, 12L) + assertEquals(checksum.getChecksum, 19665L) + } + +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java new file mode 100644 index 0000000000000..e7d3a16951bf4 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils; + +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.graph.Graph; + +public class GraphUtils { + + /** + * Computes the checksum over the Graph + * + * @return the checksum over the vertices and edges. + */ + public static Utils.ChecksumHashCode checksumHashCode(Graph graph) throws Exception { + ChecksumHashCode checksum = DataSetUtils.checksumHashCode(graph.getVertices()); + checksum.add(DataSetUtils.checksumHashCode(graph.getEdges())); + return checksum; + } +} diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java new file mode 100644 index 0000000000000..51602bc41a95b --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.graph.utils.GraphUtils; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class GraphUtilsITCase extends MultipleProgramsTestBase { + + public GraphUtilsITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testChecksumHashCodeVerticesAndEdges() throws Exception { + /* + * Test checksum hashcode + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet( + TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), + env); + + ChecksumHashCode checksum = GraphUtils.checksumHashCode(graph); + + assertEquals(checksum.getCount(), 12L); + assertEquals(checksum.getChecksum(), 19665L); + } + +} diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala index 82d43948b1434..d11cf8c525f90 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala @@ -20,7 +20,9 @@ package org.apache.flink.api.scala import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.Utils +import org.apache.flink.api.java.Utils.ChecksumHashCode import org.apache.flink.api.java.utils.{DataSetUtils => jutils} +import org.apache.flink.util.AbstractID import _root_.scala.language.implicitConversions import _root_.scala.reflect.ClassTag @@ -103,6 +105,25 @@ package object utils { : DataSet[T] = { wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSamples, seed)) } + + // -------------------------------------------------------------------------------------------- + // Checksum + // -------------------------------------------------------------------------------------------- + + /** + * Convenience method to get the count (number of elements) of a DataSet + * as well as the checksum (sum over element hashes). + * + * @return A ChecksumHashCode with the count and checksum of elements in the data set. + * + * @see [[org.apache.flink.api.java.Utils.ChecksumHashCodeHelper]] + */ + def checksumHashCode(): ChecksumHashCode = { + val id = new AbstractID().toString + self.javaSet.output(new Utils.ChecksumHashCodeHelper[T](id)) + val res = self.javaSet.getExecutionEnvironment.execute() + res.getAccumulatorResult[ChecksumHashCode](id) + } } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java index 478354a1be6fa..4ccc6e24ba006 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java @@ -23,8 +23,10 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -81,4 +83,15 @@ public Long map(Tuple2 value) throws Exception { Assert.assertEquals(expectedSize, result.size()); } + + @Test + public void testIntegerDataSetChecksumHashCode() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet ds = CollectionDataSets.getIntegerDataSet(env); + + Utils.ChecksumHashCode checksum = DataSetUtils.checksumHashCode(ds); + Assert.assertEquals(checksum.getCount(), 15); + Assert.assertEquals(checksum.getChecksum(), 55); + } } diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala index 7fff8ff606436..25ecc9c5bdc00 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.util +import org.apache.flink.api.java.Utils.ChecksumHashCode import org.apache.flink.api.scala._ import org.apache.flink.api.scala.utils._ import org.apache.flink.test.util.MultipleProgramsTestBase @@ -61,4 +62,15 @@ class DataSetUtilsITCase ( Assert.assertEquals(expectedSize, result.size) } + + @Test + def testIntegerDataSetChecksumHashCode(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + + val ds = CollectionDataSets.getIntDataSet(env) + + val checksum: ChecksumHashCode = ds.checksumHashCode() + Assert.assertEquals(checksum.getCount, 15) + Assert.assertEquals(checksum.getChecksum, 55) + } }