Skip to content

Commit

Permalink
[FLINK-1417] Automatically register types with Kryo
Browse files Browse the repository at this point in the history
  • Loading branch information
rmetzger committed Feb 18, 2015
1 parent 5015ab4 commit 354efec
Show file tree
Hide file tree
Showing 40 changed files with 881 additions and 158 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ tmp
_site
docs/api
build-target
flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import com.esotericsoftware.kryo.Serializer;

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.ArrayList;
import java.util.List;

/**
* A configuration config for configuring behavior of the system, such as whether to use
Expand All @@ -44,12 +42,17 @@ public class ExecutionConfig implements Serializable {

private boolean objectReuse = false;

private boolean disableAutoTypeRegistration = false;


// Serializers and types registered with Kryo and the PojoSerializer
private final Map<Class<?>, Serializer<?>> registeredKryoSerializers = new HashMap<Class<?>, Serializer<?>>();
private final Map<Class<?>, Class<? extends Serializer<?>>> registeredKryoSerializersClasses = new HashMap<Class<?>, Class<? extends Serializer<?>>>();
private final Set<Class<?>> registeredKryoTypes = new HashSet<Class<?>>();
private final Set<Class<?>> registeredPojoTypes = new HashSet<Class<?>>();
// we store them in lists to ensure they are registered in order in all kryo instances.
private final List<Entry<Class<?>, Serializer<?>>> registeredTypesWithKryoSerializers = new ArrayList<Entry<Class<?>, Serializer<?>>>();
private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses = new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
private final List<Entry<Class<?>, Serializer<?>>> defaultKryoSerializers = new ArrayList<Entry<Class<?>, Serializer<?>>>();
private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses = new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
private final List<Class<?>> registeredKryoTypes = new ArrayList<Class<?>>();
private final List<Class<?>> registeredPojoTypes = new ArrayList<Class<?>>();

/**
* Enables the ClosureCleaner. This analyzes user code functions and sets fields to null
Expand Down Expand Up @@ -191,17 +194,18 @@ public boolean isObjectReuseEnabled() {
// Registry for types and serializers
// --------------------------------------------------------------------------------------------



/**
* Registers the given Serializer as a default serializer for the given type at the
* {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}.
* Adds a new Kryo default serializer to the Runtime.
*
* Note that the serializer instance must be serializable (as defined by java.io.Serializable),
* because it may be distributed to the worker nodes by java serialization.
*
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
public void registerKryoSerializer(Class<?> type, Serializer<?> serializer) {
public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) {
if (type == null || serializer == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
Expand All @@ -210,23 +214,66 @@ public void registerKryoSerializer(Class<?> type, Serializer<?> serializer) {
+ "as defined by java.io.Serializable. For stateless serializers, you can use the "
+ "'registerSerializer(Class, Class)' method to register the serializer via its class.");
}

registeredKryoSerializers.put(type, serializer);
Entry<Class<?>, Serializer<?>> e = new Entry<Class<?>, Serializer<?>>(type, serializer);
if(!defaultKryoSerializers.contains(e)) {
defaultKryoSerializers.add(e);
}
}

/**
* Registers the given Serializer via its class as a serializer for the given type at the
* {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}.
* Adds a new Kryo default serializer to the Runtime.
*
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
public void registerKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
if (type == null || serializerClass == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
Entry<Class<?>, Class<? extends Serializer<?>>> e = new Entry<Class<?>, Class<? extends Serializer<?>>>(type, serializerClass);
if(!defaultKryoSerializerClasses.contains(e)) {
defaultKryoSerializerClasses.add(e);
}
}

/**
* Registers the given type with a Kryo Serializer.
*
* Note that the serializer instance must be serializable (as defined by java.io.Serializable),
* because it may be distributed to the worker nodes by java serialization.
*
* @param type The class of the types serialized with the given serializer.
* @param serializer The serializer to use.
*/
public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) {
if (type == null || serializer == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
if (!(serializer instanceof java.io.Serializable)) {
throw new IllegalArgumentException("The serializer instance must be serializable, (for distributing it in the cluster), "
+ "as defined by java.io.Serializable. For stateless serializers, you can use the "
+ "'registerSerializer(Class, Class)' method to register the serializer via its class.");
}
Entry<Class<?>, Serializer<?>> e = new Entry<Class<?>, Serializer<?>>(type, serializer);
if(!registeredTypesWithKryoSerializers.contains(e)) {
registeredTypesWithKryoSerializers.add(e);
}
}

registeredKryoSerializersClasses.put(type, serializerClass);
/**
* Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer
*
* @param type The class of the types serialized with the given serializer.
* @param serializerClass The class of the serializer to use.
*/
public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
if (type == null || serializerClass == null) {
throw new NullPointerException("Cannot register null class or serializer.");
}
Entry<Class<?>, Class<? extends Serializer<?>>> e = new Entry<Class<?>, Class<? extends Serializer<?>>>(type, serializerClass);
if(!registeredTypesWithKryoSerializerClasses.contains(e)) {
registeredTypesWithKryoSerializerClasses.add(e);
}
}

/**
Expand All @@ -241,7 +288,9 @@ public void registerPojoType(Class<?> type) {
if (type == null) {
throw new NullPointerException("Cannot register null type class.");
}
registeredPojoTypes.add(type);
if(!registeredPojoTypes.contains(type)) {
registeredPojoTypes.add(type);
}
}

/**
Expand All @@ -260,29 +309,48 @@ public void registerKryoType(Class<?> type) {
}

/**
* Returns the registered Kryo Serializers.
* Returns the registered types with Kryo Serializers.
*/
public Map<Class<?>, Serializer<?>> getRegisteredKryoSerializers() {
return registeredKryoSerializers;
public List<Entry<Class<?>, Serializer<?>>> getRegisteredTypesWithKryoSerializers() {
return registeredTypesWithKryoSerializers;
}

/**
* Returns the registered Kryo Serializer classes.
* Returns the registered types with their Kryo Serializer classes.
*/
public Map<Class<?>, Class<? extends Serializer<?>>> getRegisteredKryoSerializersClasses() {
return registeredKryoSerializersClasses;
public List<Entry<Class<?>, Class<? extends Serializer<?>>>> getRegisteredTypesWithKryoSerializerClasses() {
return registeredTypesWithKryoSerializerClasses;
}


/**
* Returns the registered default Kryo Serializers.
*/
public List<Entry<Class<?>, Serializer<?>>> getDefaultKryoSerializers() {
return defaultKryoSerializers;
}

/**
* Returns the registered default Kryo Serializer classes.
*/
public List<Entry<Class<?>, Class<? extends Serializer<?>>>> getDefaultKryoSerializerClasses() {
return defaultKryoSerializerClasses;
}

/**
* Returns the registered Kryo types.
*/
public Set<Class<?>> getRegisteredKryoTypes() {
public List<Class<?>> getRegisteredKryoTypes() {
if (isForceKryoEnabled()) {
// if we force kryo, we must also return all the types that
// were previously only registered as POJO
Set<Class<?>> result = new HashSet<Class<?>>();
List<Class<?>> result = new ArrayList<Class<?>>();
result.addAll(registeredKryoTypes);
result.addAll(registeredPojoTypes);
for(Class<?> t : registeredPojoTypes) {
if (!result.contains(t)) {
result.add(t);
}
}
return result;
} else {
return registeredKryoTypes;
Expand All @@ -292,7 +360,76 @@ public Set<Class<?>> getRegisteredKryoTypes() {
/**
* Returns the registered POJO types.
*/
public Set<Class<?>> getRegisteredPojoTypes() {
public List<Class<?>> getRegisteredPojoTypes() {
return registeredPojoTypes;
}


public boolean isDisableAutoTypeRegistration() {
return disableAutoTypeRegistration;
}

/**
* Control whether Flink is automatically registering all types in the user programs with
* Kryo.
*
* @param disableAutoTypeRegistration
*/
public void setDisableAutoTypeRegistration(boolean disableAutoTypeRegistration) {
this.disableAutoTypeRegistration = disableAutoTypeRegistration;
}


public static class Entry<K, V> implements Serializable {
private final K k;
private final V v;
public Entry(K k, V v) {
this.k = k;
this.v = v;
}

public K getKey() {
return k;
}

public V getValue() {
return v;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

Entry entry = (Entry) o;

if (k != null ? !k.equals(entry.k) : entry.k != null) {
return false;
}
if (v != null ? !v.equals(entry.v) : entry.v != null) {
return false;
}

return true;
}

@Override
public int hashCode() {
int result = k != null ? k.hashCode() : 0;
result = 31 * result + (v != null ? v.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "Entry{" +
"k=" + k +
", v=" + v +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,8 @@ public TypeComparator<T> createComparator(int[] logicalKeyFields, boolean[] orde
}
return getNewComparator(config);
}



public static class FlatFieldDescriptor {
private int keyPosition;
private TypeInformation<?> type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class TypeSerializer<T> implements Serializable {
// --------------------------------------------------------------------------------------------
// General information about the type and the serializer
// --------------------------------------------------------------------------------------------

/**
* Gets whether the type is an immutable type.
*
Expand Down
2 changes: 2 additions & 0 deletions flink-dist/src/main/flink-bin/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ under the Apache License (v 2.0):
- Apache Kafka (http://kafka.apache.org)
- Apache Flume (http://flume.apache.org)
- Apache Sling (http://sling.apache.org)
- Apache Thrift (http://thrift.apache.org)
- Google Guava (https://code.google.com/p/guava-libraries/)
- Google Protocol Buffers (https://github.com/google/protobuf/)
- Netty v4.0.21 (http://netty.io)
- Powermock (http://www.powermock.org)
- Javassist (http://www.javassist.org)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Loading

0 comments on commit 354efec

Please sign in to comment.