diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index 20cc7ee16241c..d3fad83c13f9d 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -113,15 +113,14 @@ public ForStKeyedStateBackend build() throws BackendBuildingException { int keyGroupPrefixBytes = CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix( numberOfKeyGroups); - // it is important that we only create the key builder after the restore, and not - // before; - // restore operations may reconfigure the key serializer, so accessing the key - // serializer + // it is important that we only create the key builder after the restore, and not before; + // restore operations may reconfigure the key serializer, so accessing the key serializer // only now we can be certain that the key serializer used in the builder is final. Supplier> serializedKeyBuilder = () -> new SerializedCompositeKeyBuilder<>( - keySerializerProvider.currentSchemaSerializer(), + // must create new copy for each SerializedCompositeKeyBuilder + keySerializerProvider.currentSchemaSerializer().duplicate(), keyGroupPrefixBytes, KEY_SERIALIZER_BUFFER_START_SIZE); Supplier valueSerializerView =