Skip to content

Commit

Permalink
[FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new seria…
Browse files Browse the repository at this point in the history
…lization compatibility APIs for key / namespace serializer checks

This commit lets the InternalTimerServiceImpl properly use
TypeSerializerSchemaCompatibility /
TypeSerializerSnapshot#resolveSchemaCompatibility when attempting to
check the compatibility of new key and namespace serializers.

This also fixes the fact that this check was previously broken, in that
the key / namespace serializer was not reassigned to be reconfigured
ones.
  • Loading branch information
tzulitai committed Feb 28, 2019
1 parent 3f9d9cf commit 14aae59
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
Expand Down Expand Up @@ -142,26 +141,31 @@ public void startTimerService(

// the following is the case where we restore
if (restoredTimersSnapshot != null) {
CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
this.keyDeserializer,
null,
restoredTimersSnapshot.getKeySerializerSnapshot(),
keySerializer);

CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
this.namespaceDeserializer,
null,
restoredTimersSnapshot.getNamespaceSerializerSnapshot(),
namespaceSerializer);

if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) {
throw new IllegalStateException("Tried to initialize restored TimerService " +
"with incompatible serializers than those used to snapshot its state.");
TypeSerializerSchemaCompatibility<K> keySerializerCompatibility =
restoredTimersSnapshot.getKeySerializerSnapshot().resolveSchemaCompatibility(keySerializer);

if (keySerializerCompatibility.isIncompatible() || keySerializerCompatibility.isCompatibleAfterMigration()) {
throw new IllegalStateException(
"Tried to initialize restored TimerService with new key serializer that requires migration or is incompatible.");
}

TypeSerializerSchemaCompatibility<N> namespaceSerializerCompatibility =
restoredTimersSnapshot.getNamespaceSerializerSnapshot().resolveSchemaCompatibility(namespaceSerializer);

if (namespaceSerializerCompatibility.isIncompatible() || namespaceSerializerCompatibility.isCompatibleAfterMigration()) {
throw new IllegalStateException(
"Tried to initialize restored TimerService with new namespace serializer that requires migration or is incompatible.");
}

this.keySerializer = keySerializerCompatibility.isCompatibleAsIs()
? keySerializer : keySerializerCompatibility.getReconfiguredSerializer();
this.namespaceSerializer = namespaceSerializerCompatibility.isCompatibleAsIs()
? namespaceSerializer : namespaceSerializerCompatibility.getReconfiguredSerializer();
} else {
this.keySerializer = keySerializer;
this.namespaceSerializer = namespaceSerializer;
}

this.keySerializer = keySerializer;
this.namespaceSerializer = namespaceSerializer;
this.keyDeserializer = null;
this.namespaceDeserializer = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.BackwardsCompatibleSerializerSnapshot;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
Expand Down Expand Up @@ -265,8 +266,13 @@ protected void restoreKeyAndNamespaceSerializers(

DataInputViewStream dis = new DataInputViewStream(in);
try {
restoredTimersSnapshot.setKeySerializer(InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true));
restoredTimersSnapshot.setNamespaceSerializer(InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true));
final TypeSerializer<K> keySerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true);
final TypeSerializer<N> namespaceSerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true);

restoredTimersSnapshot.setKeySerializer(keySerializer);
restoredTimersSnapshot.setKeySerializerSnapshot(new BackwardsCompatibleSerializerSnapshot<>(keySerializer));
restoredTimersSnapshot.setNamespaceSerializer(namespaceSerializer);
restoredTimersSnapshot.setNamespaceSerializerSnapshot(new BackwardsCompatibleSerializerSnapshot<>(namespaceSerializer));
} catch (ClassNotFoundException exception) {
throw new IOException(exception);
}
Expand Down

0 comments on commit 14aae59

Please sign in to comment.