Skip to content

Commit

Permalink
[FLINK-18355][tests] Remove SchedulerImpl in SlotPoolInteractionsTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Thesharing authored and zhuzhurk committed Aug 13, 2020
1 parent 52b8496 commit 3636695
Showing 1 changed file with 9 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
Expand All @@ -50,7 +47,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecution;
import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -87,13 +83,10 @@ public void testSlotAllocationNoResourceManager() throws Exception {
)) {

pool.start(JobMasterId.generate(), "foobar", testMainThreadExecutor.getMainThreadExecutor());
Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
scheduler.start(testMainThreadExecutor.getMainThreadExecutor());

CompletableFuture<LogicalSlot> future = testMainThreadExecutor.execute(() -> scheduler.allocateSlot(
final CompletableFuture<PhysicalSlot> future = testMainThreadExecutor.execute(() -> pool.requestNewAllocatedSlot(
new SlotRequestId(),
new ScheduledUnit(getExecution()),
SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
ResourceProfile.UNKNOWN,
fastTimeout));

try {
Expand All @@ -114,14 +107,11 @@ public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
final CompletableFuture<SlotRequestId> timeoutFuture = new CompletableFuture<>();
pool.setTimeoutPendingSlotRequestConsumer(timeoutFuture::complete);
pool.start(JobMasterId.generate(), "foobar", testMainThreadExecutor.getMainThreadExecutor());
Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
scheduler.start(testMainThreadExecutor.getMainThreadExecutor());

SlotRequestId requestId = new SlotRequestId();
CompletableFuture<LogicalSlot> future = testMainThreadExecutor.execute(() -> scheduler.allocateSlot(
final CompletableFuture<PhysicalSlot> future = testMainThreadExecutor.execute(() -> pool.requestNewAllocatedSlot(
requestId,
new ScheduledUnit(getExecution()),
SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
ResourceProfile.UNKNOWN,
fastTimeout));

try {
Expand Down Expand Up @@ -165,14 +155,10 @@ public void testSlotAllocationTimeout() throws Exception {
ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
pool.connectToResourceManager(resourceManagerGateway);

Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
scheduler.start(testMainThreadExecutor.getMainThreadExecutor());

SlotRequestId requestId = new SlotRequestId();
CompletableFuture<LogicalSlot> future = testMainThreadExecutor.execute(() -> scheduler.allocateSlot(
final CompletableFuture<PhysicalSlot> future = testMainThreadExecutor.execute(() -> pool.requestNewAllocatedSlot(
requestId,
new DummyScheduledUnit(),
SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
ResourceProfile.UNKNOWN,
fastTimeout));

try {
Expand Down Expand Up @@ -200,9 +186,6 @@ public void testExtraSlotsAreKept() throws Exception {

pool.start(JobMasterId.generate(), "foobar", testMainThreadExecutor.getMainThreadExecutor());

Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
scheduler.start(testMainThreadExecutor.getMainThreadExecutor());

final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();

TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
Expand All @@ -215,10 +198,9 @@ public void testExtraSlotsAreKept() throws Exception {
pool.connectToResourceManager(resourceManagerGateway);

SlotRequestId requestId = new SlotRequestId();
CompletableFuture<LogicalSlot> future = testMainThreadExecutor.execute(() -> scheduler.allocateSlot(
final CompletableFuture<PhysicalSlot> future = testMainThreadExecutor.execute(() -> pool.requestNewAllocatedSlot(
requestId,
new ScheduledUnit(getExecution()),
SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
ResourceProfile.UNKNOWN,
fastTimeout));

try {
Expand Down

0 comments on commit 3636695

Please sign in to comment.