Skip to content

Commit

Permalink
[FLINK-2716] [gelly] [apis] New checksum method on DataSet and Graph
Browse files Browse the repository at this point in the history
This closes apache#1462
  • Loading branch information
greghogan authored and StephanEwen committed Jan 15, 2016
1 parent 98fad04 commit a8be52a
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 0 deletions.
93 changes: 93 additions & 0 deletions flink-java/src/main/java/org/apache/flink/api/java/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.commons.lang3.StringUtils;

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
Expand Down Expand Up @@ -158,6 +160,97 @@ public void close() {
}
}

/**
 * Accumulator that carries two running totals: a count of elements and a
 * sum of checksum values. Adding or merging another instance adds both of
 * its totals to this one.
 */
public static class ChecksumHashCode implements SimpleAccumulator<ChecksumHashCode> {

    private static final long serialVersionUID = 1L;

    /** Running element count. */
    private long count;

    /** Running checksum total. */
    private long checksum;

    /** Creates an accumulator whose count and checksum are both zero. */
    public ChecksumHashCode() {}

    /**
     * Creates an accumulator pre-loaded with the given totals.
     *
     * @param count    initial element count
     * @param checksum initial checksum total
     */
    public ChecksumHashCode(long count, long checksum) {
        this.count = count;
        this.checksum = checksum;
    }

    /** @return the accumulated element count */
    public long getCount() {
        return count;
    }

    /** @return the accumulated checksum total */
    public long getChecksum() {
        return checksum;
    }

    /**
     * Adds the count and checksum of {@code value} to this accumulator.
     *
     * @param value the totals to fold into this instance
     */
    @Override
    public void add(ChecksumHashCode value) {
        count = count + value.count;
        checksum = checksum + value.checksum;
    }

    /** @return this instance itself; no copy is made */
    @Override
    public ChecksumHashCode getLocalValue() {
        return this;
    }

    /** Sets both the count and the checksum back to zero. */
    @Override
    public void resetLocal() {
        count = 0L;
        checksum = 0L;
    }

    /**
     * Folds the local value of another accumulator into this one.
     *
     * @param other the accumulator whose local value is added
     */
    @Override
    public void merge(Accumulator<ChecksumHashCode, ChecksumHashCode> other) {
        final ChecksumHashCode partial = other.getLocalValue();
        this.add(partial);
    }

    /** @return a new, independent instance carrying the same count and checksum */
    @Override
    public ChecksumHashCode clone() {
        final long countSnapshot = count;
        final long checksumSnapshot = checksum;
        return new ChecksumHashCode(countSnapshot, checksumSnapshot);
    }

    @Override
    public String toString() {
        return new StringBuilder("ChecksumHashCode ")
            .append(checksum)
            .append(", count ")
            .append(count)
            .toString();
    }
}

/**
 * Output format that does not write anywhere: it counts the records it is
 * handed and sums their hash codes, then registers both totals as a
 * {@link ChecksumHashCode} accumulator under the configured id when closed.
 *
 * @param <T> type of the records consumed
 */
@SkipCodeAnalysis
public static class ChecksumHashCodeHelper<T> extends RichOutputFormat<T> {

    private static final long serialVersionUID = 1L;

    /** Name under which the accumulator is registered on close. */
    private final String id;

    /** Number of records seen so far. */
    private long counter = 0L;

    /** Sum of the records' hash codes, each taken as a non-negative value. */
    private long checksum = 0L;

    /**
     * @param id name under which the resulting accumulator is registered
     */
    public ChecksumHashCodeHelper(String id) {
        this.id = id;
    }

    /** No configuration is needed. */
    @Override
    public void configure(Configuration parameters) {}

    /** Nothing to open. */
    @Override
    public void open(int taskNumber, int numTasks) {}

    /**
     * Counts the record and adds its hash code to the checksum.
     *
     * @param record the record to account for
     */
    @Override
    public void writeRecord(T record) throws IOException {
        counter += 1;
        // mask to the low 32 bits so the int hash becomes a non-negative long
        final long unsignedHash = record.hashCode() & 0xffffffffL;
        checksum += unsignedHash;
    }

    /**
     * Publishes the collected count and checksum as an accumulator
     * registered with the runtime context under {@code id}.
     */
    @Override
    public void close() throws IOException {
        getRuntimeContext().addAccumulator(id, new ChecksumHashCode(counter, checksum));
    }
}


// --------------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.java.utils;

import com.google.common.collect.Lists;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.DataSet;
Expand All @@ -30,6 +31,7 @@
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;

import java.util.Collections;
Expand Down Expand Up @@ -246,6 +248,25 @@ public static <T> DataSet<T> sampleWithSize(
return new GroupReduceOperator<>(mapPartitionOperator, input.getType(), sampleInCoordinator, callLocation);
}

// --------------------------------------------------------------------------------------------
// Checksum
// --------------------------------------------------------------------------------------------

/**
 * Convenience method that returns both the number of elements in a DataSet
 * and a checksum built by summing the hash codes of those elements.
 *
 * <p>The method attaches an output named "ChecksumHashCode" to the data set
 * and then executes the program of the data set's execution environment;
 * the result is read from an accumulator registered under a freshly
 * generated id.
 *
 * @param input the data set to summarize
 * @return A ChecksumHashCode that represents the count and checksum of elements in the data set.
 * @throws Exception if executing the program fails
 */
public static <T> Utils.ChecksumHashCode checksumHashCode(DataSet<T> input) throws Exception {
    // a fresh id keeps this accumulator separate from any other in the same job
    final String accumulatorName = new AbstractID().toString();
    final Utils.ChecksumHashCodeHelper<T> checksumSink =
        new Utils.ChecksumHashCodeHelper<T>(accumulatorName);

    input.output(checksumSink).name("ChecksumHashCode");

    final JobExecutionResult result = input.getExecutionEnvironment().execute();
    return result.<Utils.ChecksumHashCode> getAccumulatorResult(accumulatorName);
}


// *************************************************************************
// UTIL METHODS
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.scala

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.Utils.ChecksumHashCode
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

import scala.reflect.ClassTag

package object utils {

  /**
    * Implicit extension that adds checksum utilities to a Graph.
    *
    * @param self the Graph being extended
    */
  implicit class GraphUtils[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
  TypeInformation : ClassTag](val self: Graph[K, VV, EV]) {

    /**
      * Computes the ChecksumHashCode over the Graph by computing the checksum of
      * the vertex data set, then the checksum of the edge data set, and adding
      * the second into the first.
      *
      * @return the combined ChecksumHashCode over the vertices and edges.
      */
    @throws(classOf[Exception])
    def checksumHashCode(): ChecksumHashCode = {
      val total: ChecksumHashCode = self.getVertices.checksumHashCode()
      val edgePart: ChecksumHashCode = self.getEdges.checksumHashCode()
      total.add(edgePart)
      total
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.scala.test.util

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.utils._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized

/**
  * Integration test for the Graph checksum utility of the Scala Gelly API.
  */
@RunWith(classOf[Parameterized])
class GraphUtilsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {

  /**
    * Builds the Long/Long/Long test graph and checks the combined count and
    * checksum over its vertices and edges.
    */
  @Test
  @throws(classOf[Exception])
  def testChecksumHashCodeVerticesAndEdges(): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
    val checksum = graph.checksumHashCode()
    // assertEquals takes (expected, actual); keeping that order makes a
    // failure message report the right value as "expected"
    assertEquals(12L, checksum.getCount)
    assertEquals(19665L, checksum.getChecksum)
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.utils;

import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Graph;

/**
 * Utility methods that operate on a whole {@link Graph}.
 */
public class GraphUtils {

    /**
     * Computes the checksum over the Graph by computing the checksum of the
     * vertex data set, then the checksum of the edge data set, and adding the
     * second into the first.
     *
     * @param graph the graph to summarize; any key, vertex-value and edge-value types are accepted
     * @return the checksum over the vertices and edges.
     * @throws Exception if computing either checksum fails
     */
    public static Utils.ChecksumHashCode checksumHashCode(Graph<?, ?, ?> graph) throws Exception {
        // wildcard-typed instead of raw: keeps generic type checking on the
        // calls below while still accepting every Graph a caller can pass
        ChecksumHashCode checksum = DataSetUtils.checksumHashCode(graph.getVertices());
        checksum.add(DataSetUtils.checksumHashCode(graph.getEdges()));
        return checksum;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.test.util;

import static org.junit.Assert.assertEquals;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration test for {@link GraphUtils}.
 */
@RunWith(Parameterized.class)
public class GraphUtilsITCase extends MultipleProgramsTestBase {

    public GraphUtilsITCase(TestExecutionMode mode) {
        super(mode);
    }

    /**
     * Builds the Long/Long/Long test graph and checks the combined count and
     * checksum over its vertices and edges.
     */
    @Test
    public void testChecksumHashCodeVerticesAndEdges() throws Exception {
        /*
         * Test checksum hashcode
         */
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Graph<Long, Long, Long> graph = Graph.fromDataSet(
            TestGraphUtils.getLongLongVertexData(env),
            TestGraphUtils.getLongLongEdgeData(env),
            env);

        ChecksumHashCode checksum = GraphUtils.checksumHashCode(graph);

        // assertEquals takes (expected, actual); keeping that order makes a
        // failure message report the right value as "expected"
        assertEquals(12L, checksum.getCount());
        assertEquals(19665L, checksum.getChecksum());
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package org.apache.flink.api.scala

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.Utils
import org.apache.flink.api.java.Utils.ChecksumHashCode
import org.apache.flink.api.java.utils.{DataSetUtils => jutils}
import org.apache.flink.util.AbstractID

import _root_.scala.language.implicitConversions
import _root_.scala.reflect.ClassTag
Expand Down Expand Up @@ -103,6 +105,25 @@ package object utils {
: DataSet[T] = {
wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSamples, seed))
}

// --------------------------------------------------------------------------------------------
// Checksum
// --------------------------------------------------------------------------------------------

/**
  * Convenience method to get the count (number of elements) of a DataSet
  * as well as the checksum (sum over element hashes).
  *
  * Delegates to the Java `DataSetUtils.checksumHashCode` on the wrapped
  * Java data set, so both APIs attach the same "ChecksumHashCode" output
  * and share a single implementation.
  *
  * @return A ChecksumHashCode with the count and checksum of elements in the data set.
  *
  * @see [[org.apache.flink.api.java.Utils.ChecksumHashCodeHelper]]
  */
def checksumHashCode(): ChecksumHashCode = {
  jutils.checksumHashCode(self.javaSet)
}
}

}
Loading

0 comments on commit a8be52a

Please sign in to comment.