Skip to content

Commit

Permalink
Close provisioner during HttpRemotetaskRunner LifecycleStop (apache#1…
Browse files Browse the repository at this point in the history
…2176)

Fixed an issue where the provisionerService which can be used to spawn resources as needed is left running on a non-leader coordinator/overlord, after it is removed from leadership. Provisioning should only be done by the leader. To fix the issue, a call to stop the provisionerService was added to the stop() method of HttpRemoteTaskRunner class. The provisionerService was properly closed on other TaskRunner types.
  • Loading branch information
zachjsh authored Jan 20, 2022
1 parent 6ce14e6 commit 376d7c0
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1342,6 +1342,9 @@ public void stop()

log.info("Stopping...");

if (provisioningService != null) {
provisioningService.close();
}
pendingTasksExec.shutdownNow();
workersSyncExec.shutdownNow();
cleanupExec.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
Expand Down Expand Up @@ -78,6 +80,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.easymock.EasyMock.isA;

/**
*
*/
Expand Down Expand Up @@ -182,6 +186,145 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", Wo
Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
}

/*
Simulates startup of Overlord. Overlord is then stopped and is expected to close down certain things.
*/
@Test(timeout = 60_000L)
public void testFreshStartAndStop()
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);

DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
ProvisioningService provisioningService = EasyMock.createNiceMock(ProvisioningService.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunner.class)))
.andReturn(provisioningService);
provisioningService.close();
EasyMock.expectLastCall();
EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService);

HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig()
{
@Override
public int getPendingTasksRunnerNumThreads()
{
return 3;
}
},
EasyMock.createNiceMock(HttpClient.class),
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
provisioningStrategy,
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
)
{
@Override
protected WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
HttpClient httpClient,
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
return HttpRemoteTaskRunnerTest.createWorkerHolder(
smileMapper,
httpClient,
config,
workersSyncExec,
listener,
worker,
ImmutableList.of(),
ImmutableList.of(),
ImmutableMap.of(),
new AtomicInteger(),
ImmutableSet.of()
);
}
};

taskRunner.start();
taskRunner.stop();
EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService);
}

/*
Simulates startup of Overlord with no provisoner. Overlord is then stopped and is expected to close down certain
things.
*/
@Test(timeout = 60_000L)
public void testFreshStartAndStopNoProvisioner()
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);

DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunner.class)))
.andReturn(null);
EasyMock.expectLastCall();
EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy);

HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig()
{
@Override
public int getPendingTasksRunnerNumThreads()
{
return 3;
}
},
EasyMock.createNiceMock(HttpClient.class),
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
provisioningStrategy,
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
)
{
@Override
protected WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
HttpClient httpClient,
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
return HttpRemoteTaskRunnerTest.createWorkerHolder(
smileMapper,
httpClient,
config,
workersSyncExec,
listener,
worker,
ImmutableList.of(),
ImmutableList.of(),
ImmutableMap.of(),
new AtomicInteger(),
ImmutableSet.of()
);
}
};

taskRunner.start();
taskRunner.stop();
EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy);
}

/*
Simulates one task not getting acknowledged to be running after assigning it to a worker. But, other tasks are
successfully assigned to other worker and get completed.
Expand Down

0 comments on commit 376d7c0

Please sign in to comment.