Skip to content

Commit

Permalink
[FLINK-11485] [core] Refactor multiple nested serializer compatibilit…
Browse files Browse the repository at this point in the history
…y resolution logic to CompositeTypeSerializerUtil

This commit refactors the logic of resolving overall compatibility
results across multiple nested serializers of a composite serializer out
of the CompositeTypeSerializerSnapshot class.

This allows us to reuse this functionality when implementing the
compatibility check for the PojoSerializerSnapshot
  • Loading branch information
tzulitai committed Feb 22, 2019
1 parent cf7b86d commit d5d45c8
Show file tree
Hide file tree
Showing 4 changed files with 597 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

import static org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -304,57 +304,15 @@ private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityRe
TypeSerializer<?>[] newNestedSerializers,
TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {

Preconditions.checkArgument(newNestedSerializers.length == nestedSerializerSnapshots.length,
"Different number of new serializers and existing serializer snapshots.");

TypeSerializer<?>[] reconfiguredNestedSerializers = new TypeSerializer[newNestedSerializers.length];

// check nested serializers for compatibility
boolean nestedSerializerRequiresMigration = false;
boolean hasReconfiguredNestedSerializers = false;
for (int i = 0; i < nestedSerializerSnapshots.length; i++) {
TypeSerializerSchemaCompatibility<?> compatibility =
resolveCompatibility(newNestedSerializers[i], nestedSerializerSnapshots[i]);

// if any one of the new nested serializers is incompatible, we can just short circuit the result
if (compatibility.isIncompatible()) {
return TypeSerializerSchemaCompatibility.incompatible();
}

if (compatibility.isCompatibleAfterMigration()) {
nestedSerializerRequiresMigration = true;
} else if (compatibility.isCompatibleWithReconfiguredSerializer()) {
hasReconfiguredNestedSerializers = true;
reconfiguredNestedSerializers[i] = compatibility.getReconfiguredSerializer();
} else if (compatibility.isCompatibleAsIs()) {
reconfiguredNestedSerializers[i] = newNestedSerializers[i];
} else {
throw new IllegalStateException("Undefined compatibility type.");
}
}

if (nestedSerializerRequiresMigration) {
return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
}
IntermediateCompatibilityResult<T> intermediateResult =
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(newNestedSerializers, nestedSerializerSnapshots);

if (hasReconfiguredNestedSerializers) {
if (intermediateResult.isCompatibleWithReconfiguredSerializer()) {
@SuppressWarnings("unchecked")
TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(reconfiguredNestedSerializers);
TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(intermediateResult.getNestedSerializers());
return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredCompositeSerializer);
}

// ends up here if everything is compatible as is
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}

@SuppressWarnings("unchecked")
private static <E> TypeSerializerSchemaCompatibility<E> resolveCompatibility(
TypeSerializer<?> serializer,
TypeSerializerSnapshot<?> snapshot) {

TypeSerializer<E> typedSerializer = (TypeSerializer<E>) serializer;
TypeSerializerSnapshot<E> typedSnapshot = (TypeSerializerSnapshot<E>) snapshot;

return typedSnapshot.resolveSchemaCompatibility(typedSerializer);
return intermediateResult.getFinalResult();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Utilities for the {@link CompositeTypeSerializerSnapshot}.
Expand Down Expand Up @@ -62,4 +65,147 @@ public static void setNestedSerializersSnapshots(
NestedSerializersSnapshotDelegate delegate = new NestedSerializersSnapshotDelegate(nestedSnapshots);
compositeSnapshot.setNestedSerializersSnapshotDelegate(delegate);
}

/**
* Constructs an {@link IntermediateCompatibilityResult} with the given array of nested serializers and their
* corresponding serializer snapshots.
*
* <p>This result is considered "intermediate", because the actual final result is not yet built if it isn't
* defined. This is the case if the final result is supposed to be
* {@link TypeSerializerSchemaCompatibility#compatibleWithReconfiguredSerializer(TypeSerializer)}, where
* construction of the reconfigured serializer instance should be done by the caller.
*
* <p>For other cases, i.e. {@link TypeSerializerSchemaCompatibility#compatibleAsIs()},
* {@link TypeSerializerSchemaCompatibility#compatibleAfterMigration()}, and
* {@link TypeSerializerSchemaCompatibility#incompatible()}, these results are considered final.
*
* @param newNestedSerializers the new nested serializers to check for compatibility.
* @param nestedSerializerSnapshots the associated nested serializers' snapshots.
*
* @return the intermediate compatibility result of the new nested serializers.
*/
public static <T> IntermediateCompatibilityResult<T> constructIntermediateCompatibilityResult(
TypeSerializer<?>[] newNestedSerializers,
TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {

Preconditions.checkArgument(newNestedSerializers.length == nestedSerializerSnapshots.length,
"Different number of new serializers and existing serializer snapshots.");

TypeSerializer<?>[] nestedSerializers = new TypeSerializer[newNestedSerializers.length];

// check nested serializers for compatibility
boolean nestedSerializerRequiresMigration = false;
boolean hasReconfiguredNestedSerializers = false;
for (int i = 0; i < nestedSerializerSnapshots.length; i++) {
TypeSerializerSchemaCompatibility<?> compatibility =
resolveCompatibility(newNestedSerializers[i], nestedSerializerSnapshots[i]);

// if any one of the new nested serializers is incompatible, we can just short circuit the result
if (compatibility.isIncompatible()) {
return IntermediateCompatibilityResult.definedIncompatibleResult();
}

if (compatibility.isCompatibleAfterMigration()) {
nestedSerializerRequiresMigration = true;
} else if (compatibility.isCompatibleWithReconfiguredSerializer()) {
hasReconfiguredNestedSerializers = true;
nestedSerializers[i] = compatibility.getReconfiguredSerializer();
} else if (compatibility.isCompatibleAsIs()) {
nestedSerializers[i] = newNestedSerializers[i];
} else {
throw new IllegalStateException("Undefined compatibility type.");
}
}

if (nestedSerializerRequiresMigration) {
return IntermediateCompatibilityResult.definedCompatibleAfterMigrationResult();
}

if (hasReconfiguredNestedSerializers) {
return IntermediateCompatibilityResult.undefinedReconfigureResult(nestedSerializers);
}

// ends up here if everything is compatible as is
return IntermediateCompatibilityResult.definedCompatibleAsIsResult(nestedSerializers);
}

public static class IntermediateCompatibilityResult<T> {

private final TypeSerializerSchemaCompatibility.Type compatibilityType;
private final TypeSerializer<?>[] nestedSerializers;

static <T> IntermediateCompatibilityResult<T> definedCompatibleAsIsResult(TypeSerializer<?>[] originalSerializers) {
return new IntermediateCompatibilityResult<>(TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AS_IS, originalSerializers);
}

static <T> IntermediateCompatibilityResult<T> definedIncompatibleResult() {
return new IntermediateCompatibilityResult<>(TypeSerializerSchemaCompatibility.Type.INCOMPATIBLE, null);
}

static <T> IntermediateCompatibilityResult<T> definedCompatibleAfterMigrationResult() {
return new IntermediateCompatibilityResult<>(TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AFTER_MIGRATION, null);
}

static <T> IntermediateCompatibilityResult<T> undefinedReconfigureResult(TypeSerializer<?>[] reconfiguredNestedSerializers) {
return new IntermediateCompatibilityResult<>(TypeSerializerSchemaCompatibility.Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, reconfiguredNestedSerializers);
}

private IntermediateCompatibilityResult(
TypeSerializerSchemaCompatibility.Type compatibilityType,
TypeSerializer<?>[] nestedSerializers) {
this.compatibilityType = checkNotNull(compatibilityType);
this.nestedSerializers = nestedSerializers;
}

public boolean isCompatibleWithReconfiguredSerializer() {
return compatibilityType == TypeSerializerSchemaCompatibility.Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER;
}

public boolean isCompatibleAsIs() {
return compatibilityType == TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AS_IS;
}

public boolean isCompatibleAfterMigration() {
return compatibilityType == TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AFTER_MIGRATION;
}

public boolean isIncompatible() {
return compatibilityType == TypeSerializerSchemaCompatibility.Type.INCOMPATIBLE;
}

public TypeSerializerSchemaCompatibility<T> getFinalResult() {
checkState(
compatibilityType != TypeSerializerSchemaCompatibility.Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
"unable to build final result if intermediate compatibility type is COMPATIBLE_WITH_RECONFIGURED_SERIALIZER.");
switch (compatibilityType) {
case COMPATIBLE_AS_IS:
return TypeSerializerSchemaCompatibility.compatibleAsIs();
case COMPATIBLE_AFTER_MIGRATION:
return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
case INCOMPATIBLE:
return TypeSerializerSchemaCompatibility.incompatible();
default:
throw new IllegalStateException("unrecognized compatibility type.");
}
}

public TypeSerializer<?>[] getNestedSerializers() {
checkState(
compatibilityType == TypeSerializerSchemaCompatibility.Type.COMPATIBLE_AS_IS
|| compatibilityType == TypeSerializerSchemaCompatibility.Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
"only intermediate compatibility types COMPATIBLE_AS_IS and COMPATIBLE_WITH_RECONFIGURED_SERIALIZER have nested serializers.");
return nestedSerializers;
}
}

@SuppressWarnings("unchecked")
private static <E> TypeSerializerSchemaCompatibility<E> resolveCompatibility(
TypeSerializer<?> serializer,
TypeSerializerSnapshot<?> snapshot) {

TypeSerializer<E> typedSerializer = (TypeSerializer<E>) serializer;
TypeSerializerSnapshot<E> typedSnapshot = (TypeSerializerSnapshot<E>) snapshot;

return typedSnapshot.resolveSchemaCompatibility(typedSerializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils;

import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.testutils.migration.SchemaCompatibilityTestingSerializer;
import org.junit.Test;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;

/**
* Tests for the {@link CompositeTypeSerializerUtil}.
*/
public class CompositeTypeSerializerUtilTest {

// ------------------------------------------------------------------------------------------------
// Tests for CompositeTypeSerializerUtil#constructIntermediateCompatibilityResult
// ------------------------------------------------------------------------------------------------

@Test
public void testCompatibleAsIsIntermediateCompatibilityResult() {
final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new TypeSerializerSnapshot<?>[] {
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
};

final TypeSerializer<?>[] testNewSerializers = new TypeSerializer<?>[] {
new SchemaCompatibilityTestingSerializer.InitialSerializer(),
new SchemaCompatibilityTestingSerializer.InitialSerializer(),
};

IntermediateCompatibilityResult<?> intermediateCompatibilityResult =
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(testNewSerializers, testSerializerSnapshots);

assertTrue(intermediateCompatibilityResult.isCompatibleAsIs());
assertTrue(intermediateCompatibilityResult.getFinalResult().isCompatibleAsIs());
assertArrayEquals(testNewSerializers, intermediateCompatibilityResult.getNestedSerializers());
}

@Test
public void testCompatibleWithReconfiguredSerializerIntermediateCompatibilityResult() {
final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new TypeSerializerSnapshot<?>[] {
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
};

final TypeSerializer<?>[] testNewSerializers = new TypeSerializer<?>[] {
new SchemaCompatibilityTestingSerializer.InitialSerializer(),
new SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>(),
};

IntermediateCompatibilityResult<?> intermediateCompatibilityResult =
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(testNewSerializers, testSerializerSnapshots);

assertTrue(intermediateCompatibilityResult.isCompatibleWithReconfiguredSerializer());

final TypeSerializer<?>[] expectedReconfiguredNestedSerializers = new TypeSerializer<?>[] {
new SchemaCompatibilityTestingSerializer.InitialSerializer(),
new SchemaCompatibilityTestingSerializer.InitialSerializer(),
};
assertArrayEquals(expectedReconfiguredNestedSerializers, intermediateCompatibilityResult.getNestedSerializers());
}

@Test
public void testCompatibleAfterMigrationIntermediateCompatibilityResult() {
final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new TypeSerializerSnapshot<?>[] {
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
};

final TypeSerializer<?>[] testNewSerializers = new TypeSerializer<?>[] {
new SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>(),
new SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer<>(),
new SchemaCompatibilityTestingSerializer.InitialSerializer()
};

IntermediateCompatibilityResult<?> intermediateCompatibilityResult =
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(testNewSerializers, testSerializerSnapshots);

assertTrue(intermediateCompatibilityResult.isCompatibleAfterMigration());
assertTrue(intermediateCompatibilityResult.getFinalResult().isCompatibleAfterMigration());
}

@Test
public void testIncompatibleIntermediateCompatibilityResult() {
final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new TypeSerializerSnapshot<?>[] {
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
};

final TypeSerializer<?>[] testNewSerializers = new TypeSerializer<?>[] {
new SchemaCompatibilityTestingSerializer.InitialSerializer(),
new SchemaCompatibilityTestingSerializer.IncompatibleSerializer<>(),
new SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>(),
new SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer()
};

IntermediateCompatibilityResult<?> intermediateCompatibilityResult =
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(testNewSerializers, testSerializerSnapshots);

assertTrue(intermediateCompatibilityResult.isIncompatible());
assertTrue(intermediateCompatibilityResult.getFinalResult().isIncompatible());
}

@Test(expected = IllegalStateException.class)
public void testGetFinalResultOnUndefinedReconfigureIntermediateCompatibilityResultFails() {
IntermediateCompatibilityResult<Integer> intermediateCompatibilityResult =
IntermediateCompatibilityResult.undefinedReconfigureResult(new TypeSerializer[]{ IntSerializer.INSTANCE });

intermediateCompatibilityResult.getFinalResult();
}

@Test(expected = IllegalStateException.class)
public void testGetNestedSerializersOnCompatibleAfterMigrationIntermediateCompatibilityResultFails() {
IntermediateCompatibilityResult<Integer> intermediateCompatibilityResult =
IntermediateCompatibilityResult.definedCompatibleAfterMigrationResult();

intermediateCompatibilityResult.getNestedSerializers();
}

@Test(expected = IllegalStateException.class)
public void testGetNestedSerializersOnIncompatibleIntermediateCompatibilityResultFails() {
IntermediateCompatibilityResult<Integer> intermediateCompatibilityResult =
IntermediateCompatibilityResult.definedIncompatibleResult();

intermediateCompatibilityResult.getNestedSerializers();
}
}
Loading

0 comments on commit d5d45c8

Please sign in to comment.