Skip to content

Commit

Permalink
[refactor] Make CheckpointStateOutputStream a top level class
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Jan 19, 2022
1 parent 064b8fd commit 72b0e9f
Show file tree
Hide file tree
Showing 50 changed files with 227 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

Expand All @@ -39,9 +39,8 @@
import static org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.write;

/**
* {@link ChannelStateWriter} implemented using {@link
* CheckpointStreamFactory.CheckpointStateOutputStream CheckpointStateOutputStreams}. Internally, it
* has by default
* {@link ChannelStateWriter} implemented using {@link CheckpointStateOutputStream
* CheckpointStateOutputStreams}. Internally, it has by default
*
* <ul>
* <li>one stream per checkpoint; having multiple streams would mean more files written and more
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
/**
* An output stream for checkpoint metadata.
*
* <p>This stream is similar to the {@link CheckpointStreamFactory.CheckpointStateOutputStream}, but
* for metadata files rather thancdata files.
* <p>This stream is similar to the {@link CheckpointStateOutputStream}, but for metadata files
* rather thancdata files.
*
* <p>This stream always creates a file, regardless of the amount of data written.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataOutputStream;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.OutputStream;

/**
* A dedicated output stream that produces a {@link StreamStateHandle} when closed.
*
* <p><b>Important:</b> When closing this stream after the successful case, you must call {@link
* #closeAndGetHandle()} - only that method will actually retain the resource written to. The method
* has the semantics of "close on success". The {@link #close()} method is supposed to remove the
* target resource if called before {@link #closeAndGetHandle()}, hence having the semantics of
* "close on failure". That way, simple try-with-resources statements automatically clean up
* unsuccessful partial state resources in case the writing does not complete.
*
* <p>Note: This is an abstract class and not an interface because {@link OutputStream} is an
* abstract class.
*/
@Internal
public abstract class CheckpointStateOutputStream extends FSDataOutputStream {
/**
* Closes the stream and gets a state handle that can create an input stream producing the data
* written to this stream.
*
* <p>This closing must be called (also when the caller is not interested in the handle) to
* successfully close the stream and retain the produced resource. In contrast, the {@link
* #close()} method removes the target resource when called.
*
* @return A state handle that can create an input stream producing the data written to this
* stream.
* @throws IOException Thrown, if the stream cannot be closed.
*/
@Nullable
public abstract StreamStateHandle closeAndGetHandle() throws IOException;

/**
* This method should close the stream, if has not been closed before. If this method actually
* closes the stream, it should delete/release the resource behind the stream, such as the file
* that the stream writes to.
*
* <p>The above implies that this method is intended to be the "unsuccessful close", such as
* when cancelling the stream writing, or when an exception occurs. Closing the stream for the
* successful case must go through {@link #closeAndGetHandle()}.
*
* @throws IOException Thrown, if the stream cannot be closed.
*/
@Override
public abstract void close() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,5 @@ CheckpointStreamFactory resolveCheckpointStorageLocation(
* @return A checkpoint state stream to the location for state owned by tasks.
* @throws IOException Thrown, if the stream cannot be opened.
*/
CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream()
throws IOException;
CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@

package org.apache.flink.runtime.state;

import org.apache.flink.core.fs.FSDataOutputStream;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.OutputStream;

/**
* A factory for checkpoint output streams, which are used to persist data for checkpoints.
Expand All @@ -44,49 +39,4 @@ public interface CheckpointStreamFactory {
*/
CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
throws IOException;

/**
* A dedicated output stream that produces a {@link StreamStateHandle} when closed.
*
* <p><b>Important:</b> When closing this stream after the successful case, you must call {@link
* #closeAndGetHandle()} - only that method will actually retain the resource written to. The
* method has the semantics of "close on success". The {@link #close()} method is supposed to
* remove the target resource if called before {@link #closeAndGetHandle()}, hence having the
* semantics of "close on failure". That way, simple try-with-resources statements automatically
* clean up unsuccessful partial state resources in case the writing does not complete.
*
* <p>Note: This is an abstract class and not an interface because {@link OutputStream} is an
* abstract class.
*/
abstract class CheckpointStateOutputStream extends FSDataOutputStream {

/**
* Closes the stream and gets a state handle that can create an input stream producing the
* data written to this stream.
*
* <p>This closing must be called (also when the caller is not interested in the handle) to
* successfully close the stream and retain the produced resource. In contrast, the {@link
* #close()} method removes the target resource when called.
*
* @return A state handle that can create an input stream producing the data written to this
* stream.
* @throws IOException Thrown, if the stream cannot be closed.
*/
@Nullable
public abstract StreamStateHandle closeAndGetHandle() throws IOException;

/**
* This method should close the stream, if has not been closed before. If this method
* actually closes the stream, it should delete/release the resource behind the stream, such
* as the file that the stream writes to.
*
* <p>The above implies that this method is intended to be the "unsuccessful close", such as
* when cancelling the stream writing, or when an exception occurs. Closing the stream for
* the successful case must go through {@link #closeAndGetHandle()}.
*
* @throws IOException Thrown, if the stream cannot be closed.
*/
@Override
public abstract void close() throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public interface CheckpointStreamWithResultProvider extends Closeable {

/** Returns the encapsulated output stream. */
@Nonnull
CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream();
CheckpointStateOutputStream getCheckpointOutputStream();

@Override
default void close() throws IOException {
Expand All @@ -61,10 +61,9 @@ default void close() throws IOException {
*/
class PrimaryStreamOnly implements CheckpointStreamWithResultProvider {

@Nonnull private final CheckpointStreamFactory.CheckpointStateOutputStream outputStream;
@Nonnull private final CheckpointStateOutputStream outputStream;

public PrimaryStreamOnly(
@Nonnull CheckpointStreamFactory.CheckpointStateOutputStream outputStream) {
public PrimaryStreamOnly(@Nonnull CheckpointStateOutputStream outputStream) {
this.outputStream = outputStream;
}

Expand All @@ -77,7 +76,7 @@ public SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult(

@Nonnull
@Override
public CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream() {
public CheckpointStateOutputStream getCheckpointOutputStream() {
return outputStream;
}
}
Expand All @@ -93,8 +92,8 @@ class PrimaryAndSecondaryStream implements CheckpointStreamWithResultProvider {
@Nonnull private final DuplicatingCheckpointOutputStream outputStream;

public PrimaryAndSecondaryStream(
@Nonnull CheckpointStreamFactory.CheckpointStateOutputStream primaryOut,
CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut)
@Nonnull CheckpointStateOutputStream primaryOut,
CheckpointStateOutputStream secondaryOut)
throws IOException {
this(new DuplicatingCheckpointOutputStream(primaryOut, secondaryOut));
}
Expand Down Expand Up @@ -154,7 +153,7 @@ static CheckpointStreamWithResultProvider createSimpleStream(
@Nonnull CheckpointStreamFactory primaryStreamFactory)
throws IOException {

CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);

return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
Expand All @@ -168,7 +167,7 @@ static CheckpointStreamWithResultProvider createDuplicatingStream(
@Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
throws IOException {

CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);

try {
Expand All @@ -179,7 +178,7 @@ static CheckpointStreamWithResultProvider createDuplicatingStream(
String.valueOf(UUID.randomUUID()));
Path outPath = new Path(outFile.toURI());

CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
CheckpointStateOutputStream secondaryOut =
new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);

return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
}

return (snapshotCloseableRegistry) -> {
CheckpointStreamFactory.CheckpointStateOutputStream localOut =
CheckpointStateOutputStream localOut =
streamFactory.createCheckpointStateOutputStream(
CheckpointedStateScope.EXCLUSIVE);
snapshotCloseableRegistry.registerCloseable(localOut);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
* user. This class is used to write state for local recovery as a local (secondary) copy of the
* (primary) state snapshot that is written to a (slower but highly-available) remote filesystem.
*/
public class DuplicatingCheckpointOutputStream
extends CheckpointStreamFactory.CheckpointStateOutputStream {
public class DuplicatingCheckpointOutputStream extends CheckpointStateOutputStream {

/** Default buffer size of 8KB. */
private static final int DEFAULT_BUFFER_SIZER = 8 * 1024;
Expand All @@ -48,13 +47,13 @@ public class DuplicatingCheckpointOutputStream
private int bufferIdx;

/** Primary stream for writing the checkpoint data. Failures from this stream are forwarded. */
private final CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream;
private final CheckpointStateOutputStream primaryOutputStream;

/**
* Primary stream for writing the checkpoint data. Failures from this stream are not forwarded
* until {@link #closeAndGetSecondaryHandle()}.
*/
private final CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream;
private final CheckpointStateOutputStream secondaryOutputStream;

/**
* Stores a potential exception that occurred while interacting with {@link
Expand All @@ -63,15 +62,15 @@ public class DuplicatingCheckpointOutputStream
private Exception secondaryStreamException;

public DuplicatingCheckpointOutputStream(
CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream,
CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream)
CheckpointStateOutputStream primaryOutputStream,
CheckpointStateOutputStream secondaryOutputStream)
throws IOException {
this(primaryOutputStream, secondaryOutputStream, DEFAULT_BUFFER_SIZER);
}

public DuplicatingCheckpointOutputStream(
CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream,
CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream,
CheckpointStateOutputStream primaryOutputStream,
CheckpointStateOutputStream secondaryOutputStream,
int bufferSize)
throws IOException {

Expand Down Expand Up @@ -280,12 +279,12 @@ public Exception getSecondaryStreamException() {
}

@VisibleForTesting
CheckpointStreamFactory.CheckpointStateOutputStream getPrimaryOutputStream() {
CheckpointStateOutputStream getPrimaryOutputStream() {
return primaryOutputStream;
}

@VisibleForTesting
CheckpointStreamFactory.CheckpointStateOutputStream getSecondaryOutputStream() {
CheckpointStateOutputStream getSecondaryOutputStream() {
return secondaryOutputStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private void writeKVStateData(
byte[] previousValue = null;
DataOutputView kgOutView = null;
OutputStream kgOutStream = null;
CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream =
CheckpointStateOutputStream checkpointOutputStream =
checkpointStreamWithResultProvider.getCheckpointOutputStream();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ public final class KeyedStateCheckpointOutputStream
private final KeyGroupRangeOffsets keyGroupRangeOffsets;

public KeyedStateCheckpointOutputStream(
CheckpointStreamFactory.CheckpointStateOutputStream delegate,
KeyGroupRange keyGroupRange) {
CheckpointStateOutputStream delegate, KeyGroupRange keyGroupRange) {

super(delegate);
Preconditions.checkNotNull(keyGroupRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@
public abstract class NonClosingCheckpointOutputStream<T extends StreamStateHandle>
extends OutputStream {

protected final CheckpointStreamFactory.CheckpointStateOutputStream delegate;
protected final CheckpointStateOutputStream delegate;
private final ResourceGuard resourceGuard = new ResourceGuard();

public NonClosingCheckpointOutputStream(
CheckpointStreamFactory.CheckpointStateOutputStream delegate) {
public NonClosingCheckpointOutputStream(CheckpointStateOutputStream delegate) {
this.delegate = Preconditions.checkNotNull(delegate);
}

Expand Down Expand Up @@ -79,7 +78,7 @@ public final ResourceGuard.Lease acquireLease() throws IOException {
}

/** This method should not be public so as to not expose internals to user code. */
CheckpointStreamFactory.CheckpointStateOutputStream getDelegate() {
CheckpointStateOutputStream getDelegate() {
return delegate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public final class OperatorStateCheckpointOutputStream
private LongArrayList partitionOffsets;
private final long initialPosition;

public OperatorStateCheckpointOutputStream(
CheckpointStreamFactory.CheckpointStateOutputStream delegate) throws IOException {
public OperatorStateCheckpointOutputStream(CheckpointStateOutputStream delegate)
throws IOException {

super(delegate);
this.partitionOffsets = new LongArrayList(16);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
static CheckpointStreamWithResultProvider createSimpleStream(
@Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {

CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(
CheckpointedStateScope.EXCLUSIVE);

Expand Down
Loading

0 comments on commit 72b0e9f

Please sign in to comment.