Skip to content

Commit

Permalink
[FLINK-11741] [runtime] Replace ArrayListSerializer's ensureCompatibi…
Browse files Browse the repository at this point in the history
…lity method with SelfResolvingTypeSerializer implementation
  • Loading branch information
tzulitai committed Feb 28, 2019
1 parent d954b25 commit 154bb6e
Showing 1 changed file with 23 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,21 @@
*/
package org.apache.flink.runtime.state;

import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.ArrayList;

@SuppressWarnings("ForLoopReplaceableByForEach")
final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>>
implements TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer<ArrayList<T>> {

private static final long serialVersionUID = 1119562170939152304L;

Expand Down Expand Up @@ -146,29 +144,28 @@ public TypeSerializerSnapshot<ArrayList<T>> snapshotConfiguration() {
}

/**
* NOTE: this method cannot be removed until {@link CollectionSerializerConfigSnapshot} is fully removed.
* We need to implement this method as a {@link TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer}
* because this serializer was previously returning a shared {@link CollectionSerializerConfigSnapshot}
* as its snapshot.
*
* <p>When the {@link CollectionSerializerConfigSnapshot} is restored, it is incapable of redirecting
* the compatibility check to {@link ArrayListSerializerSnapshot}, so we do it here.
*/
@Override
@SuppressWarnings("deprecation")
public CompatibilityResult<ArrayList<T>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> previousElemSerializerAndConfig =
((CollectionSerializerConfigSnapshot<?, ?>) configSnapshot).getSingleNestedSerializerAndConfig();

CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
previousElemSerializerAndConfig.f0,
UnloadableDummyTypeSerializer.class,
previousElemSerializerAndConfig.f1,
elementSerializer);

if (!compatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new ArrayListSerializer<>(new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
}
public TypeSerializerSchemaCompatibility<ArrayList<T>> resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(
TypeSerializerConfigSnapshot<ArrayList<T>> deprecatedConfigSnapshot) {

if (deprecatedConfigSnapshot instanceof CollectionSerializerConfigSnapshot) {
CollectionSerializerConfigSnapshot<ArrayList<T>, T> castedLegacySnapshot =
(CollectionSerializerConfigSnapshot<ArrayList<T>, T>) deprecatedConfigSnapshot;

ArrayListSerializerSnapshot<T> newSnapshot = new ArrayListSerializerSnapshot<>();
return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
this,
newSnapshot,
castedLegacySnapshot.getNestedSerializerSnapshots());
}

return CompatibilityResult.requiresMigration();
return TypeSerializerSchemaCompatibility.incompatible();
}
}

0 comments on commit 154bb6e

Please sign in to comment.