Skip to content

Commit

Permalink
[FLINK-30614][serializer] Remove old method of resolving schema compa…
Browse files Browse the repository at this point in the history
…tibility.
  • Loading branch information
yinhan.yh authored and Zakelly committed Sep 27, 2024
1 parent be4549d commit 6a76eee
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,9 @@ There are no ways to specify the compatibility with the old serializer in the ne
not supported in some scenarios.

So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been marked as deprecated
and will be removed in the future. it is highly recommended to migrate from the old one to
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this are as follows:
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` is now removed and needs to be replaced with
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`.
To make this transition, follow these steps:

1. Implement the `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` whose logic
should be same as the original `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,9 +460,9 @@ There are no ways to specify the compatibility with the old serializer in the ne
not supported in some scenarios.

So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been marked as deprecated
and will be removed in the future. it is highly recommended to migrate from the old one to
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this are as follows:
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` is now removed and needs to be replaced with
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`.
To make this transition, follow these steps:

1. Implement the `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` whose logic
should be same as the original `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,51 +112,6 @@ void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLo
*/
TypeSerializer<T> restoreSerializer();

/**
* Checks a new serializer's compatibility to read data written by the prior serializer.
*
* <p>When a checkpoint/savepoint is restored, this method checks whether the serialization
* format of the data in the checkpoint/savepoint is compatible for the format of the serializer
* used by the program that restores the checkpoint/savepoint. The outcome can be that the
* serialization format is compatible, that the program's serializer needs to reconfigure itself
* (meaning to incorporate some information from the TypeSerializerSnapshot to be compatible),
* that the format is outright incompatible, or that a migration needed. In the latter case, the
* TypeSerializerSnapshot produces a serializer to deserialize the data, and the restoring
* program's serializer re-serializes the data, thus converting the format during the restore
* operation.
*
* @deprecated This method has been replaced by {@link TypeSerializerSnapshot
* #resolveSchemaCompatibility(TypeSerializerSnapshot)} and will be removed in the future
* release. It's strongly recommended to migrate from old method to the new one, see the doc
* section "Migrating from deprecated
* `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`" for
* more details.
* @param newSerializer the new serializer to check.
* @return the serializer compatibility result.
*/
@Deprecated
default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer) {
// A temporal check to ensure that at least one method is implemented to avoid infinite loop
TypeSerializerSnapshot<T> newSerializerSnapshot = newSerializer.snapshotConfiguration();
try {
Class<?> subClass =
newSerializerSnapshot
.getClass()
.getMethod("resolveSchemaCompatibility", TypeSerializerSnapshot.class)
.getDeclaringClass();
if (subClass == TypeSerializerSnapshot.class) {
throw new UnsupportedOperationException(
"Must implement at least one method about 'resolveSchemaCompatibility', "
+ "Recommend strongly to implement TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot), see FLIP-263 for more details");
}
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
// Call the new method to resolve the schema compatibility which must be implemented
return newSerializerSnapshot.resolveSchemaCompatibility(this);
}

/**
* Checks current serializer's compatibility to read data written by the prior serializer.
*
Expand All @@ -176,29 +131,8 @@ default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
* @param oldSerializerSnapshot the old serializer snapshot to check.
* @return the serializer compatibility result.
*/
default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializerSnapshot<T> oldSerializerSnapshot) {
// A temporal check to ensure that at least one method is implemented to avoid infinite
// loop,
// which will be removed after removing the deprecated method
// TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer).
try {
Class<?> subClass =
oldSerializerSnapshot
.getClass()
.getMethod("resolveSchemaCompatibility", TypeSerializer.class)
.getDeclaringClass();
if (subClass == TypeSerializerSnapshot.class) {
throw new UnsupportedOperationException(
"Must implement at least one method about 'resolveSchemaCompatibility', "
+ "Recommend strongly to implement TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot), see FLIP-263 for more details");
}
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
// Call the old method to resolve the schema compatibility which must be implemented
return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());
}
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializerSnapshot<T> oldSerializerSnapshot);

// ------------------------------------------------------------------------
// read / write utilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
Expand Down Expand Up @@ -105,6 +106,14 @@ public GenericArraySerializer<C> restoreSerializer() {
componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
}

@Override
public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(
TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
throw new UnsupportedOperationException(
"Unexpected call to GenericArraySerializerConfigSnapshot#resolveSchemaCompatibility."
+ " GenericArraySerializerSnapshot should be used instead.");
}

@Nullable
public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
Expand Down Expand Up @@ -106,4 +107,12 @@ public EitherSerializer<L, R> restoreSerializer() {
public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots();
}

@Override
public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility(
TypeSerializerSnapshot<Either<L, R>> oldSerializerSnapshot) {
throw new UnsupportedOperationException(
"Unexpected call to EitherSerializerSnapshot#resolveSchemaCompatibility."
+ " JavaEitherSerializerSnapshot should be used instead.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,86 +24,10 @@
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link TypeSerializerSnapshot} */
class TypeSerializerSnapshotTest {

@Test
void testIllegalSchemaCompatibility() {
TypeSerializerSnapshot<Integer> illegalSnapshot =
new NotCompletedTypeSerializerSnapshot() {};

// Should throw UnsupportedOperationException if both two methods are not implemented
assertThatThrownBy(
() ->
illegalSnapshot.resolveSchemaCompatibility(
new NotCompletedTypeSerializer()))
.isInstanceOf(UnsupportedOperationException.class);
assertThatThrownBy(
() ->
illegalSnapshot.resolveSchemaCompatibility(
new NotCompletedTypeSerializer().snapshotConfiguration()))
.isInstanceOf(UnsupportedOperationException.class);
}

@Test
void testNewSchemaCompatibility() {
TypeSerializerSnapshot<Integer> legalSnapshot =
new NotCompletedTypeSerializerSnapshot() {
@Override
public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility(
TypeSerializerSnapshot<Integer> oldSerializerSnapshot) {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
};

// The result of resolving schema compatibility should always be determined by legalSnapshot
assertThat(
new NotCompletedTypeSerializerSnapshot()
.resolveSchemaCompatibility(
new NotCompletedTypeSerializer() {
@Override
public TypeSerializerSnapshot<Integer>
snapshotConfiguration() {
return legalSnapshot;
}
})
.isCompatibleAsIs())
.isTrue();
assertThat(
legalSnapshot
.resolveSchemaCompatibility(
new NotCompletedTypeSerializerSnapshot() {})
.isCompatibleAsIs())
.isTrue();
}

@Test
void testOldSchemaCompatibility() {
TypeSerializerSnapshot<Integer> legalSnapshot =
new NotCompletedTypeSerializerSnapshot() {

@Override
public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility(
TypeSerializer<Integer> newSerializer) {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
};

// The result of resolving schema compatibility should always be determined by legalSnapshot
assertThat(
legalSnapshot
.resolveSchemaCompatibility(new NotCompletedTypeSerializer())
.isCompatibleAsIs())
.isTrue();
assertThat(
new NotCompletedTypeSerializerSnapshot()
.resolveSchemaCompatibility(legalSnapshot)
.isCompatibleAsIs())
.isTrue();
}

@Test
void testNestedSchemaCompatibility() {
TypeSerializerSnapshot<Integer> innerSnapshot =
Expand All @@ -119,9 +43,8 @@ public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility(
new NotCompletedTypeSerializerSnapshot() {
@Override
public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility(
TypeSerializer<Integer> newSerializer) {
return innerSnapshot.resolveSchemaCompatibility(
innerSnapshot.restoreSerializer());
TypeSerializerSnapshot<Integer> newSerializer) {
return innerSnapshot.resolveSchemaCompatibility(innerSnapshot);
}
};

Expand Down Expand Up @@ -232,5 +155,11 @@ public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
}
};
}

@Override
public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility(
TypeSerializerSnapshot<Integer> oldSerializerSnapshot) {
return TypeSerializerSchemaCompatibility.incompatible();
}
}
}

0 comments on commit 6a76eee

Please sign in to comment.