From 376d7c069de3b87b21db262cca4b72ddc49e3b66 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 20 Jan 2022 13:32:08 -0500 Subject: [PATCH] Close provisioner during HttpRemoteTaskRunner LifecycleStop (#12176) Fixed an issue where the provisioningService, which can be used to spawn resources as needed, was left running on a non-leader coordinator/overlord after it was removed from leadership. Provisioning should only be done by the leader. To fix the issue, a call to close the provisioningService was added to the stop() method of the HttpRemoteTaskRunner class. The provisioningService was already properly closed on other TaskRunner types. --- .../overlord/hrtr/HttpRemoteTaskRunner.java | 3 + .../hrtr/HttpRemoteTaskRunnerTest.java | 143 ++++++++++++++++++ 2 files changed, 146 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 45af2157282d..6acaa207221c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1342,6 +1342,9 @@ public void stop() log.info("Stopping..."); + if (provisioningService != null) { + provisioningService.close(); + } pendingTasksExec.shutdownNow(); workersSyncExec.shutdownNow(); cleanupExec.shutdown(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 5a676113eda4..db52d3a6ffdc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -42,6 +42,8 @@ import
org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy; +import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService; +import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.worker.TaskAnnouncement; @@ -78,6 +80,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.easymock.EasyMock.isA; + /** * */ @@ -182,6 +186,145 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", Wo Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size()); } + /* + Simulates startup of Overlord. Overlord is then stopped and is expected to close down certain things. 
+ */ + @Test(timeout = 60_000L) + public void testFreshStartAndStop() + { + TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); + ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + ProvisioningService provisioningService = EasyMock.createNiceMock(ProvisioningService.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunner.class))) + .andReturn(provisioningService); + provisioningService.close(); + EasyMock.expectLastCall(); + EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService); + + HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( + TestHelper.makeJsonMapper(), + new HttpRemoteTaskRunnerConfig() + { + @Override + public int getPendingTasksRunnerNumThreads() + { + return 3; + } + }, + EasyMock.createNiceMock(HttpClient.class), + DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), + provisioningStrategy, + druidNodeDiscoveryProvider, + EasyMock.createNiceMock(TaskStorage.class), + EasyMock.createNiceMock(CuratorFramework.class), + new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null) + ) + { + @Override + protected WorkerHolder createWorkerHolder( + ObjectMapper smileMapper, + HttpClient httpClient, + HttpRemoteTaskRunnerConfig config, + ScheduledExecutorService workersSyncExec, + WorkerHolder.Listener listener, + Worker worker, + List knownAnnouncements + ) + { + return HttpRemoteTaskRunnerTest.createWorkerHolder( + smileMapper, + httpClient, + config, + workersSyncExec, + listener, + worker, + ImmutableList.of(), + ImmutableList.of(), + ImmutableMap.of(), + new AtomicInteger(), + ImmutableSet.of() + ); + } + }; + + 
taskRunner.start(); + taskRunner.stop(); + EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService); + } + + /* + Simulates startup of Overlord with no provisioner. Overlord is then stopped and is expected to close down certain + things. + */ + @Test(timeout = 60_000L) + public void testFreshStartAndStopNoProvisioner() + { + TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); + ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunner.class))) + .andReturn(null); + EasyMock.expectLastCall(); + EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy); + + HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( + TestHelper.makeJsonMapper(), + new HttpRemoteTaskRunnerConfig() + { + @Override + public int getPendingTasksRunnerNumThreads() + { + return 3; + } + }, + EasyMock.createNiceMock(HttpClient.class), + DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), + provisioningStrategy, + druidNodeDiscoveryProvider, + EasyMock.createNiceMock(TaskStorage.class), + EasyMock.createNiceMock(CuratorFramework.class), + new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null) + ) + { + @Override + protected WorkerHolder createWorkerHolder( + ObjectMapper smileMapper, + HttpClient httpClient, + HttpRemoteTaskRunnerConfig config, + ScheduledExecutorService workersSyncExec, + WorkerHolder.Listener listener, + Worker worker, + List knownAnnouncements + ) + { + return HttpRemoteTaskRunnerTest.createWorkerHolder( + smileMapper, + httpClient, + config, + workersSyncExec, + listener, + worker, + ImmutableList.of(), +
ImmutableList.of(), + ImmutableMap.of(), + new AtomicInteger(), + ImmutableSet.of() + ); + } + }; + + taskRunner.start(); + taskRunner.stop(); + EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy); + } + /* Simulates one task not getting acknowledged to be running after assigning it to a worker. But, other tasks are successfully assigned to other worker and get completed.