Skip to content

Commit

Permalink
[FLINK-21206] Write savepoints in unified format from HeapStateBackend
Browse files Browse the repository at this point in the history
This closes apache#14809
  • Loading branch information
dawidwys committed Feb 9, 2021
1 parent d67ace2 commit fc995d3
Show file tree
Hide file tree
Showing 7 changed files with 338 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* The snapshot strategy for this backend. This determines, e.g., if snapshots are synchronous
* or asynchronous.
*/
private final SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner;
private final SnapshotStrategyRunner<KeyedStateHandle, ?> checkpointStrategyRunner;

private final SnapshotStrategyRunner<KeyedStateHandle, ?> savepointStrategyRunner;

private final StateTableFactory<K> stateTableFactory;

Expand All @@ -125,7 +127,8 @@ public HeapKeyedStateBackend(
Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
LocalRecoveryConfig localRecoveryConfig,
HeapPriorityQueueSetFactory priorityQueueSetFactory,
SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner,
SnapshotStrategyRunner<KeyedStateHandle, ?> checkpointStrategyRunner,
SnapshotStrategyRunner<KeyedStateHandle, ?> savepointStrategyRunner,
StateTableFactory<K> stateTableFactory,
InternalKeyContext<K> keyContext) {
super(
Expand All @@ -141,7 +144,8 @@ public HeapKeyedStateBackend(
this.registeredPQStates = registeredPQStates;
this.localRecoveryConfig = localRecoveryConfig;
this.priorityQueueSetFactory = priorityQueueSetFactory;
this.snapshotStrategyRunner = snapshotStrategyRunner;
this.checkpointStrategyRunner = checkpointStrategyRunner;
this.savepointStrategyRunner = savepointStrategyRunner;
this.stateTableFactory = stateTableFactory;
LOG.info("Initializing heap keyed state backend with stream factory.");
}
Expand Down Expand Up @@ -356,8 +360,13 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
@Nonnull CheckpointOptions checkpointOptions)
throws Exception {

return snapshotStrategyRunner.snapshot(
checkpointId, timestamp, streamFactory, checkpointOptions);
if (checkpointOptions.getCheckpointType().isSavepoint()) {
return savepointStrategyRunner.snapshot(
checkpointId, timestamp, streamFactory, checkpointOptions);
} else {
return checkpointStrategyRunner.snapshot(
checkpointId, timestamp, streamFactory, checkpointOptions);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
HeapSnapshotStrategy<K> snapshotStrategy =
initSnapshotStrategy(registeredKVStates, registeredPQStates);
HeapSavepointStrategy<K> savepointStrategy =
new HeapSavepointStrategy<>(
registeredKVStates,
registeredPQStates,
keyGroupCompressionDecorator,
keyGroupRange,
keySerializerProvider,
numberOfKeyGroups);
InternalKeyContext<K> keyContext =
new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);

Expand Down Expand Up @@ -124,6 +132,11 @@ public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
snapshotStrategy,
cancelStreamRegistryForBackend,
asynchronousSnapshots ? ASYNCHRONOUS : SYNCHRONOUS),
new SnapshotStrategyRunner<>(
"Heap backend savepoint",
savepointStrategy,
cancelStreamRegistryForBackend,
asynchronousSnapshots ? ASYNCHRONOUS : SYNCHRONOUS),
stateTableFactory,
keyContext);
}
Expand Down Expand Up @@ -187,6 +200,7 @@ private HeapSnapshotStrategy<K> initSnapshotStrategy(
keyGroupCompressionDecorator,
localRecoveryConfig,
keyGroupRange,
keySerializerProvider);
keySerializerProvider,
numberOfKeyGroups);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.heap;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
import org.apache.flink.runtime.state.FullSnapshotResources;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StreamCompressionDecorator;

import javax.annotation.Nonnull;

import java.util.Map;

/** A strategy how to perform a snapshot of a {@link HeapKeyedStateBackend}. */
class HeapSavepointStrategy<K>
implements SnapshotStrategy<KeyedStateHandle, FullSnapshotResources<K>> {

private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
private final StreamCompressionDecorator keyGroupCompressionDecorator;
private final KeyGroupRange keyGroupRange;
private final StateSerializerProvider<K> keySerializerProvider;
private final int totalKeyGroups;

HeapSavepointStrategy(
Map<String, StateTable<K, ?, ?>> registeredKVStates,
Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
StreamCompressionDecorator keyGroupCompressionDecorator,
KeyGroupRange keyGroupRange,
StateSerializerProvider<K> keySerializerProvider,
int totalKeyGroups) {
this.registeredKVStates = registeredKVStates;
this.registeredPQStates = registeredPQStates;
this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
this.keyGroupRange = keyGroupRange;
this.keySerializerProvider = keySerializerProvider;
this.totalKeyGroups = totalKeyGroups;
}

@Override
public FullSnapshotResources<K> syncPrepareResources(long checkpointId) {
return HeapSnapshotResources.create(
registeredKVStates,
registeredPQStates,
keyGroupCompressionDecorator,
keyGroupRange,
getKeySerializer(),
totalKeyGroups);
}

@Override
public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
FullSnapshotResources<K> syncPartResource,
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions) {

assert checkpointOptions.getCheckpointType().isSavepoint();
return new FullSnapshotAsyncWriter<>(
checkpointOptions.getCheckpointType(),
() ->
CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE, streamFactory),
syncPartResource);
}

public TypeSerializer<K> getKeySerializer() {
return keySerializerProvider.currentSchemaSerializer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.heap;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.FullSnapshotResources;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyValueStateIterator;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A set of resources required to take a checkpoint or savepoint from a {@link
* HeapKeyedStateBackend}.
*/
@Internal
final class HeapSnapshotResources<K> implements FullSnapshotResources<K> {
private final List<StateMetaInfoSnapshot> metaInfoSnapshots;
private final Map<StateUID, StateSnapshot> cowStateStableSnapshots;
private final StreamCompressionDecorator streamCompressionDecorator;
private final Map<StateUID, Integer> stateNamesToId;
private final KeyGroupRange keyGroupRange;
private final TypeSerializer<K> keySerializer;
private final int totalKeyGroups;

private HeapSnapshotResources(
List<StateMetaInfoSnapshot> metaInfoSnapshots,
Map<StateUID, StateSnapshot> cowStateStableSnapshots,
StreamCompressionDecorator streamCompressionDecorator,
Map<StateUID, Integer> stateNamesToId,
KeyGroupRange keyGroupRange,
TypeSerializer<K> keySerializer,
int totalKeyGroups) {
this.metaInfoSnapshots = metaInfoSnapshots;
this.cowStateStableSnapshots = cowStateStableSnapshots;
this.streamCompressionDecorator = streamCompressionDecorator;
this.stateNamesToId = stateNamesToId;
this.keyGroupRange = keyGroupRange;
this.keySerializer = keySerializer;
this.totalKeyGroups = totalKeyGroups;
}

public static <K> HeapSnapshotResources<K> create(
Map<String, StateTable<K, ?, ?>> registeredKVStates,
Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
StreamCompressionDecorator streamCompressionDecorator,
KeyGroupRange keyGroupRange,
TypeSerializer<K> keySerializer,
int totalKeyGroups) {

if (registeredKVStates.isEmpty() && registeredPQStates.isEmpty()) {
return new HeapSnapshotResources<>(
Collections.emptyList(),
Collections.emptyMap(),
streamCompressionDecorator,
Collections.emptyMap(),
keyGroupRange,
keySerializer,
totalKeyGroups);
}

int numStates = registeredKVStates.size() + registeredPQStates.size();

Preconditions.checkState(
numStates <= Short.MAX_VALUE,
"Too many states: "
+ numStates
+ ". Currently at most "
+ Short.MAX_VALUE
+ " states are supported");

final List<StateMetaInfoSnapshot> metaInfoSnapshots = new ArrayList<>(numStates);
final Map<StateUID, Integer> stateNamesToId = new HashMap<>(numStates);
final Map<StateUID, StateSnapshot> cowStateStableSnapshots = new HashMap<>(numStates);

processSnapshotMetaInfoForAllStates(
metaInfoSnapshots,
cowStateStableSnapshots,
stateNamesToId,
registeredKVStates,
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);

processSnapshotMetaInfoForAllStates(
metaInfoSnapshots,
cowStateStableSnapshots,
stateNamesToId,
registeredPQStates,
StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);

return new HeapSnapshotResources<>(
metaInfoSnapshots,
cowStateStableSnapshots,
streamCompressionDecorator,
stateNamesToId,
keyGroupRange,
keySerializer,
totalKeyGroups);
}

private static void processSnapshotMetaInfoForAllStates(
List<StateMetaInfoSnapshot> metaInfoSnapshots,
Map<StateUID, StateSnapshot> cowStateStableSnapshots,
Map<StateUID, Integer> stateNamesToId,
Map<String, ? extends StateSnapshotRestore> registeredStates,
StateMetaInfoSnapshot.BackendStateType stateType) {

for (Map.Entry<String, ? extends StateSnapshotRestore> kvState :
registeredStates.entrySet()) {
final StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
stateNamesToId.put(stateUid, stateNamesToId.size());
StateSnapshotRestore state = kvState.getValue();
if (null != state) {
final StateSnapshot stateSnapshot = state.stateSnapshot();
metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
cowStateStableSnapshots.put(stateUid, stateSnapshot);
}
}
}

@Override
public void release() {
for (StateSnapshot stateSnapshot : cowStateStableSnapshots.values()) {
stateSnapshot.release();
}
}

public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
return metaInfoSnapshots;
}

@Override
public KeyValueStateIterator createKVStateIterator() throws IOException {
return new HeapKeyValueStateIterator(
keyGroupRange,
keySerializer,
totalKeyGroups,
stateNamesToId,
cowStateStableSnapshots);
}

@Override
public KeyGroupRange getKeyGroupRange() {
return keyGroupRange;
}

@Override
public TypeSerializer<K> getKeySerializer() {
return keySerializer;
}

@Override
public StreamCompressionDecorator getStreamCompressionDecorator() {
return streamCompressionDecorator;
}

public Map<StateUID, StateSnapshot> getCowStateStableSnapshots() {
return cowStateStableSnapshots;
}

public Map<StateUID, Integer> getStateNamesToId() {
return stateNamesToId;
}
}
Loading

0 comments on commit fc995d3

Please sign in to comment.