Skip to content

Commit

Permalink
[FLINK-18957] Use NoResourceAvailableException for bulk timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
azagrebin committed Sep 9, 2020
1 parent 564c2f8 commit 9c979f0
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor.DummyComponentMainThreadExecutor;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.util.clock.Clock;

Expand Down Expand Up @@ -62,23 +63,28 @@ public void start(final ComponentMainThreadExecutor mainThreadExecutor) {
}

@Override
public void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk, Time timeout) {
public void schedulePendingRequestBulkTimeoutCheck(final PhysicalSlotRequestBulk bulk, Time timeout) {
PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp = new PhysicalSlotRequestBulkWithTimestamp(bulk);
bulkWithTimestamp.markUnfulfillable(clock.relativeTimeMillis());
schedulePendingRequestBulkTimeoutCheck(bulkWithTimestamp, timeout);
schedulePendingRequestBulkWithTimestampCheck(bulkWithTimestamp, timeout);
}

private void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulkWithTimestamp bulk, Time timeout) {
private void schedulePendingRequestBulkWithTimestampCheck(
final PhysicalSlotRequestBulkWithTimestamp bulk,
final Time timeout) {
componentMainThreadExecutor.schedule(() -> {
TimeoutCheckResult result = checkPhysicalSlotRequestBulkTimeout(bulk, timeout);

switch (result) {
case PENDING:
//re-schedule the timeout check
schedulePendingRequestBulkTimeoutCheck(bulk, timeout);
schedulePendingRequestBulkWithTimestampCheck(bulk, timeout);
break;
case TIMEOUT:
bulk.cancel(new TimeoutException("Slot request bulk is not fulfillable!"));
Throwable cancellationCause = new NoResourceAvailableException(
"Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout",
new TimeoutException("Timeout has occurred: " + timeout));
bulk.cancel(cancellationCause);
break;
case FULFILLED:
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
Expand All @@ -44,7 +45,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -144,7 +144,7 @@ public void testBulkSlotAllocationTimeoutsIfUnfulfillable() {
exception,
"Slot request bulk is not fulfillable!");
assertThat(cause.isPresent(), is(true));
assertThat(cause.get(), instanceOf(TimeoutException.class));
assertThat(cause.get(), instanceOf(NoResourceAvailableException.class));
}

@Test
Expand Down

0 comments on commit 9c979f0

Please sign in to comment.