Skip to content

Commit

Permalink
[FLINK-19958] Add IOException to all I/O related Sink API signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
guoweiM authored and aljoscha committed Nov 5, 2020
1 parent 513c806 commit 79b1632
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.flink.annotation.Experimental;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -35,7 +36,7 @@ public interface Committer<CommT> extends AutoCloseable {
* Commit the given list of {@link CommT}.
* @param committables A list of information needed to commit data staged by the sink.
* @return A list of {@link CommT} needed to re-commit, which is needed in case we implement a "commit-with-retry" pattern.
* @throws Exception if the commit operation fail and do not want to retry any more.
* @throws IOException if the commit operation fail and do not want to retry any more.
*/
List<CommT> commit(List<CommT> committables) throws Exception;
List<CommT> commit(List<CommT> committables) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.flink.annotation.Experimental;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -39,27 +40,38 @@ public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {
* Find out which global committables need to be retried when recovering from the failure.
* @param globalCommittables A list of {@link GlobalCommT} for which we want to verify
* which ones were successfully committed and which ones did not.
*
* @return A list of {@link GlobalCommT} that should be committed again.
*
* @throws IOException if fail to filter the recovered committables.
*/
List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables);
List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables) throws IOException;

/**
* Compute an aggregated committable from a list of committables.
* @param committables A list of {@link CommT} to be combined into a {@link GlobalCommT}.
*
* @return an aggregated committable
*
* @throws IOException if fail to combine the given committables.
*/
GlobalCommT combine(List<CommT> committables);
GlobalCommT combine(List<CommT> committables) throws IOException;

/**
* Commit the given list of {@link GlobalCommT}.
*
* @param globalCommittables a list of {@link GlobalCommT}.
*
* @return A list of {@link GlobalCommT} needed to re-commit, which is needed in case we implement a "commit-with-retry" pattern.
* @throws Exception if the commit operation fail and do not want to retry any more.
*
* @throws IOException if the commit operation fail and do not want to retry any more.
*/
List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws Exception;
List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException;

/**
* Signals that there is no committable any more.
*
* @throws IOException if fail to handle this notification.
*/
void endOfInput();
void endOfInput() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
Expand All @@ -46,21 +47,35 @@ public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializ

/**
* Create a {@link SinkWriter}.
*
* @param context the runtime context.
* @param states the writer's state.
*
* @return A sink writer.
*
* @throws IOException if fail to create a writer.
*/
SinkWriter<InputT, CommT, WriterStateT> createWriter(InitContext context, List<WriterStateT> states);
SinkWriter<InputT, CommT, WriterStateT> createWriter(
InitContext context,
List<WriterStateT> states) throws IOException;

/**
* Creates a {@link Committer}.
*
* @return A committer.
*
* @throws IOException if fail to create a committer.
*/
Optional<Committer<CommT>> createCommitter();
Optional<Committer<CommT>> createCommitter() throws IOException;

/**
* Creates a {@link GlobalCommitter}.
*
* @return A global committer.
*
* @throws IOException if fail to create a global committer.
*/
Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();
Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException;

/**
* Returns the serializer of the committable type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.flink.annotation.Experimental;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -37,10 +38,13 @@ public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

/**
* Add an element to the writer.
*
* @param element The input record
* @param context The additional information about the input record
*
* @throws IOException if fail to add an element.
*/
void write(InputT element, Context context);
void write(InputT element, Context context) throws IOException;

/**
* Prepare for a commit.
Expand All @@ -49,13 +53,17 @@ public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {
*
* @param flush Whether flushing the un-staged data or not
* @return The data is ready to commit.
*
* @throws IOException if fail to prepare for a commit.
*/
List<CommT> prepareCommit(boolean flush);
List<CommT> prepareCommit(boolean flush) throws IOException;

/**
* @return The writer's state.
*
* @throws IOException if fail to snapshot writer's state.
*/
List<WriterStateT> snapshotState();
List<WriterStateT> snapshotState() throws IOException;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -76,7 +77,7 @@ abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends Abstrac
*
* @param committables A list of committables
*/
abstract void recoveredCommittables(List<CommT> committables);
abstract void recoveredCommittables(List<CommT> committables) throws IOException;

/**
* Prepares a commit.
Expand All @@ -85,7 +86,7 @@ abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends Abstrac
*
* @return A list of committables that could be committed in the following checkpoint complete.
*/
abstract List<CommT> prepareCommit(List<InputT> input);
abstract List<CommT> prepareCommit(List<InputT> input) throws IOException;

/**
* Commits a list of committables.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -46,11 +49,15 @@ public BatchCommitterOperatorFactory(Sink<?, CommT, ?, ?> sink) {
@SuppressWarnings("unchecked")
public <T extends StreamOperator<CommT>> T createStreamOperator(
StreamOperatorParameters<CommT> parameters) {
final BatchCommitterOperator<CommT> committerOperator =
new BatchCommitterOperator<>(
sink.createCommitter().orElseThrow(
() -> new IllegalStateException(
"Could not create committer from the sink")));
final BatchCommitterOperator<CommT> committerOperator;
try {
committerOperator = new BatchCommitterOperator<>(
sink.createCommitter().orElseThrow(
() -> new IllegalStateException(
"Could not create committer from the sink")));
} catch (IOException e) {
throw new FlinkRuntimeException("Could not create the Committer.", e);
}
committerOperator.setup(
parameters.getContainingTask(),
parameters.getStreamConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -46,11 +49,15 @@ public BatchGlobalCommitterOperatorFactory(Sink<?, CommT, ?, GlobalCommT> sink)
@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<GlobalCommT>> T createStreamOperator(StreamOperatorParameters<GlobalCommT> parameters) {
final BatchGlobalCommitterOperator<CommT, GlobalCommT> batchGlobalCommitterOperator =
new BatchGlobalCommitterOperator<>(
sink.createGlobalCommitter().orElseThrow(
() -> new IllegalStateException(
"Could not create global committer from the sink")));
final BatchGlobalCommitterOperator<CommT, GlobalCommT> batchGlobalCommitterOperator;
try {
batchGlobalCommitterOperator = new BatchGlobalCommitterOperator<>(
sink.createGlobalCommitter().orElseThrow(
() -> new IllegalStateException(
"Could not create global committer from the sink")));
} catch (IOException e) {
throw new FlinkRuntimeException("Could not create the GlobalCommitter.", e);
}

batchGlobalCommitterOperator.setup(
parameters.getContainingTask(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;

import java.io.IOException;
import java.util.Collections;

/**
Expand All @@ -42,7 +43,7 @@ final class StatelessSinkWriterOperator<InputT, CommT> extends AbstractSinkWrite
}

@Override
SinkWriter<InputT, CommT, ?> createWriter() {
SinkWriter<InputT, CommT, ?> createWriter() throws IOException {
return sink.createWriter(createInitContext(), Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -41,13 +44,17 @@ public StreamingCommitterOperatorFactory(Sink<?, CommT, ?, ?> sink) {

@Override
AbstractStreamingCommitterOperator<CommT, CommT> createStreamingCommitterOperator() {
return new StreamingCommitterOperator<>(
sink.createCommitter()
.orElseThrow(() -> new IllegalStateException(
"Could not create committer from the sink")),
sink.getCommittableSerializer()
.orElseThrow(() -> new IllegalStateException(
"Could not get committable serializer from the sink")));
try {
return new StreamingCommitterOperator<>(
sink.createCommitter()
.orElseThrow(() -> new IllegalStateException(
"Could not create committer from the sink")),
sink.getCommittableSerializer()
.orElseThrow(() -> new IllegalStateException(
"Could not get committable serializer from the sink")));
} catch (IOException e) {
throw new FlinkRuntimeException("Could not create the Committer.", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.operators.BoundedOneInput;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -58,14 +59,14 @@ public final class StreamingGlobalCommitterOperator<CommT, GlobalCommT> extends
}

@Override
void recoveredCommittables(List<GlobalCommT> committables) {
void recoveredCommittables(List<GlobalCommT> committables) throws IOException {
final List<GlobalCommT> recovered = globalCommitter.
filterRecoveredCommittables(checkNotNull(committables));
recoveredGlobalCommittables.addAll(recovered);
}

@Override
List<GlobalCommT> prepareCommit(List<CommT> input) {
List<GlobalCommT> prepareCommit(List<CommT> input) throws IOException {
checkNotNull(input);
final List<GlobalCommT> result =
new ArrayList<>(recoveredGlobalCommittables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -40,13 +43,17 @@ public StreamingGlobalCommitterOperatorFactory(Sink<?, CommT, ?, GlobalCommT> si

@Override
AbstractStreamingCommitterOperator<CommT, GlobalCommT> createStreamingCommitterOperator() {
return new StreamingGlobalCommitterOperator<>(
sink.createGlobalCommitter()
.orElseThrow(() -> new IllegalStateException(
"Could not create global committer from the sink")),
sink.getGlobalCommittableSerializer()
.orElseThrow(() -> new IllegalStateException(
"Could not create global committable serializer from the sink")));
try {
return new StreamingGlobalCommitterOperator<>(
sink.createGlobalCommitter()
.orElseThrow(() -> new IllegalStateException(
"Could not create global committer from the sink")),
sink.getGlobalCommittableSerializer()
.orElseThrow(() -> new IllegalStateException(
"Could not create global committable serializer from the sink")));
} catch (IOException e) {
throw new FlinkRuntimeException("Could not create the GlobalCommitter.", e);
}
}

@Override
Expand Down
Loading

0 comments on commit 79b1632

Please sign in to comment.