Skip to content

Commit

Permalink
Fix RemoteTaskRunner's auto-scaling (apache#3768)
Browse files Browse the repository at this point in the history
* Rename ResourceManagementStrategy to ProvisioningStrategy, similarly for related classes. Make ProvisioningService non-global, created per RemoteTaskRunner instead. Add OverlordBlinkLeadershipTest.

* Fix RemoteTaskRunnerFactoryTest.testExecNotSharedBetweenRunners()

* Small fix

* Make SimpleProvisioner and PendingProvisioner more similar in details

* Fix executor name

* Style fixes

* Use LifecycleLock in RemoteTaskRunner
  • Loading branch information
leventov authored and jihoonson committed Jul 14, 2017
1 parent c5c17bb commit b720351
Show file tree
Hide file tree
Showing 25 changed files with 778 additions and 586 deletions.
6 changes: 4 additions & 2 deletions common/src/main/java/io/druid/concurrent/LifecycleLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ public void exitStart()

/**
* Awaits until {@link #exitStart()} is called, if needed, and returns {@code true} if {@link #started()} was called
* before that.
* before that. Returns {@code false} if {@link #started()} is not called before {@link #exitStart()}, or if {@link
* #canStop()} is already called on this LifecycleLock.
*/
public boolean awaitStarted()
{
Expand All @@ -222,7 +223,8 @@ public boolean awaitStarted()
/**
* Awaits until {@link #exitStart()} is called for at most the specified timeout, and returns {@code true} if {@link
* #started()} was called before that. Returns {@code false} if {@code started()} wasn't called before {@code
* exitStart()}, or if {@code exitStart()} isn't called on this LifecycleLock until the specified timeout expires.
* exitStart()}, or if {@code exitStart()} isn't called on this LifecycleLock until the specified timeout expires, or
* if {@link #canStop()} is already called on this LifecycleLock.
*/
public boolean awaitStarted(long timeout, TimeUnit unit)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.concurrent.LifecycleLock;
import io.druid.curator.CuratorUtils;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ProvisioningService;
import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
Expand Down Expand Up @@ -172,12 +174,13 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer

private final Object statusLock = new Object();

private volatile boolean started = false;
private final LifecycleLock lifecycleLock = new LifecycleLock();

private final ListeningScheduledExecutorService cleanupExec;

private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ResourceManagementStrategy<WorkerTaskRunner> resourceManagement;
private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
private ProvisioningService provisioningService;

public RemoteTaskRunner(
ObjectMapper jsonMapper,
Expand All @@ -188,7 +191,7 @@ public RemoteTaskRunner(
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec,
ResourceManagementStrategy<WorkerTaskRunner> resourceManagement
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
)
{
this.jsonMapper = jsonMapper;
Expand All @@ -205,7 +208,7 @@ public RemoteTaskRunner(
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = MoreExecutors.listeningDecorator(cleanupExec);
this.resourceManagement = resourceManagement;
this.provisioningStrategy = provisioningStrategy;
this.runPendingTasksExec = Execs.multiThreaded(
config.getPendingTasksRunnerNumThreads(),
"rtr-pending-tasks-runner-%d"
Expand All @@ -216,11 +219,10 @@ public RemoteTaskRunner(
@LifecycleStart
public void start()
{
if (!lifecycleLock.canStart()) {
return;
}
try {
if (started) {
return;
}

final MutableInt waitingFor = new MutableInt(1);
final Object waitingForMonitor = new Object();

Expand Down Expand Up @@ -327,25 +329,26 @@ public void onFailure(Throwable throwable)
}
}
scheduleBlackListedNodesCleanUp();
resourceManagement.startManagement(this);
started = true;
provisioningService = provisioningStrategy.makeProvisioningService(this);
lifecycleLock.started();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
lifecycleLock.exitStart();
}
}

@Override
@LifecycleStop
public void stop()
{
if (!lifecycleLock.canStop()) {
return;
}
try {
if (!started) {
return;
}
started = false;

resourceManagement.stopManagement();
provisioningService.close();

Closer closer = Closer.create();
for (ZkWorker zkWorker : zkWorkers.values()) {
Expand Down Expand Up @@ -452,7 +455,7 @@ public Collection<RemoteTaskRunnerWorkItem> getKnownTasks()
@Override
public Optional<ScalingStats> getScalingStats()
{
return Optional.fromNullable(resourceManagement.getStats());
return Optional.fromNullable(provisioningService.getStats());
}

public ZkWorker findWorkerRunningTask(String taskId)
Expand Down Expand Up @@ -510,8 +513,8 @@ public ListenableFuture<TaskStatus> run(final Task task)
@Override
public void shutdown(final String taskId)
{
if (!started) {
log.info("This TaskRunner is stopped. Ignoring shutdown command for task: %s", taskId);
if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId);
} else if (pendingTasks.remove(taskId) != null) {
pendingTaskPayloads.remove(taskId);
log.info("Removed task from pending queue: %s", taskId);
Expand Down Expand Up @@ -693,7 +696,7 @@ public int compare(RemoteTaskRunnerWorkItem o1, RemoteTaskRunnerWorkItem o2)
*/
private void cleanup(final String taskId)
{
if (!started) {
if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
return;
}
final RemoteTaskRunnerWorkItem removed = completeTasks.remove(taskId);
Expand Down Expand Up @@ -1259,7 +1262,7 @@ public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyW
throw Throwables.propagate(e);
}
}
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
return getWorkerFromZK(lazyWorkers.values());
}
}

Expand Down Expand Up @@ -1288,7 +1291,7 @@ protected List<String> getAssignedTasks(Worker worker) throws Exception
@Override
public Collection<Worker> getLazyWorkers()
{
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
return getWorkerFromZK(lazyWorkers.values());
}

private static ImmutableList<ImmutableWorkerInfo> getImmutableWorkerFromZK(Collection<ZkWorker> workers)
Expand All @@ -1308,18 +1311,20 @@ public ImmutableWorkerInfo apply(ZkWorker input)
);
}

public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
private static ImmutableList<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
{
return Collections2.transform(
workers,
new Function<ZkWorker, Worker>()
{
@Override
public Worker apply(ZkWorker input)
return ImmutableList.copyOf(
Collections2.transform(
workers,
new Function<ZkWorker, Worker>()
{
return input.getWorker();
@Override
public Worker apply(ZkWorker input)
{
return input.getWorker();
}
}
}
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
import io.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import io.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import io.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
Expand All @@ -45,8 +45,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
private final ResourceManagementStrategy resourceManagementStrategy;
private final ProvisioningSchedulerConfig provisioningSchedulerConfig;
private final ProvisioningStrategy provisioningStrategy;
private final ScheduledExecutorFactory factory;

@Inject
Expand All @@ -58,8 +58,8 @@ public RemoteTaskRunnerFactory(
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef,
final ScheduledExecutorFactory factory,
final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
final ResourceManagementStrategy resourceManagementStrategy
final ProvisioningSchedulerConfig provisioningSchedulerConfig,
final ProvisioningStrategy provisioningStrategy
)
{
this.curator = curator;
Expand All @@ -68,8 +68,8 @@ public RemoteTaskRunnerFactory(
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig;
this.resourceManagementStrategy = resourceManagementStrategy;
this.provisioningSchedulerConfig = provisioningSchedulerConfig;
this.provisioningStrategy = provisioningStrategy;
this.factory = factory;
}

Expand All @@ -85,9 +85,9 @@ public RemoteTaskRunner build()
httpClient,
workerConfigRef,
factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d"),
resourceManagementSchedulerConfig.isDoAutoscale()
? resourceManagementStrategy
: new NoopResourceManagementStrategy<>()
provisioningSchedulerConfig.isDoAutoscale()
? provisioningStrategy
: new NoopProvisioningStrategy<>()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.overlord.autoscaling;

import com.google.common.base.Supplier;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.java.util.common.granularity.PeriodGranularity;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Base class for {@link ProvisioningStrategy} implementations that drive a {@link Provisioner} on a schedule.
 *
 * <p>{@link #makeProvisioningService} asks the subclass for a {@link Provisioner} bound to the given runner and
 * wraps it in a {@link ProvisioningService} that periodically calls {@link Provisioner#doProvision()} and
 * {@link Provisioner#doTerminate()} on an executor obtained from the supplied factory, until the service is closed.
 */
public abstract class AbstractWorkerProvisioningStrategy implements ProvisioningStrategy<WorkerTaskRunner>
{
  private static final EmittingLogger log = new EmittingLogger(AbstractWorkerProvisioningStrategy.class);

  private final ProvisioningSchedulerConfig provisioningSchedulerConfig;
  private final Supplier<ScheduledExecutorService> execFactory;

  /**
   * @param provisioningSchedulerConfig source of the provision period, terminate period and origin time
   * @param execFactory                 called once per created service to obtain the executor that runs its schedules
   */
  AbstractWorkerProvisioningStrategy(
      ProvisioningSchedulerConfig provisioningSchedulerConfig,
      Supplier<ScheduledExecutorService> execFactory
  )
  {
    this.provisioningSchedulerConfig = provisioningSchedulerConfig;
    this.execFactory = execFactory;
  }

  /**
   * Creates a service whose schedules are already registered by the time this method returns.
   *
   * @param runner the task runner handed to {@link #makeProvisioner(WorkerTaskRunner)}
   * @return a service that must be {@link ProvisioningService#close() closed} to stop the schedules
   */
  @Override
  public ProvisioningService makeProvisioningService(WorkerTaskRunner runner)
  {
    return new WorkerProvisioningService(makeProvisioner(runner));
  }

  /**
   * Runs the provision and terminate cycles of a single {@link Provisioner} on its own executor.
   */
  final class WorkerProvisioningService implements ProvisioningService
  {
    // Obtained before the constructor body runs, so every constructor exit path must account for it.
    private final ScheduledExecutorService exec = execFactory.get();
    private final Provisioner provisioner;

    /**
     * Registers both periodic schedules on {@link #exec}. If registration fails part-way, the executor is shut down
     * before the exception propagates, because the caller never receives an instance it could {@link #close()}.
     */
    WorkerProvisioningService(final Provisioner provisioner)
    {
      log.info("Started Resource Management Scheduler");
      this.provisioner = provisioner;

      boolean scheduled = false;
      try {
        // The provision cycle uses the same value for the initial delay and for the repeat period.
        final long provisionRate =
            provisioningSchedulerConfig.getProvisionPeriod().toStandardDuration().getMillis();
        exec.scheduleAtFixedRate(
            new Runnable()
            {
              @Override
              public void run()
              {
                try {
                  provisioner.doProvision();
                }
                catch (Exception e) {
                  // Logged and swallowed: an exception escaping run() would suppress all later executions.
                  log.error(e, "Uncaught exception.");
                }
              }
            },
            provisionRate,
            provisionRate,
            TimeUnit.MILLISECONDS
        );

        // Schedule termination of worker nodes periodically
        final Period terminatePeriod = provisioningSchedulerConfig.getTerminatePeriod();
        final PeriodGranularity granularity = new PeriodGranularity(
            terminatePeriod,
            provisioningSchedulerConfig.getOriginTime(),
            null
        );
        // First termination run is at the end of the current terminate-period bucket.
        final long startTime = granularity.bucketEnd(new DateTime()).getMillis();

        exec.scheduleAtFixedRate(
            new Runnable()
            {
              @Override
              public void run()
              {
                try {
                  provisioner.doTerminate();
                }
                catch (Exception e) {
                  // Logged and swallowed for the same reason as in the provision task.
                  log.error(e, "Uncaught exception.");
                }
              }
            },
            new Duration(System.currentTimeMillis(), startTime).getMillis(),
            // Same Period instance the granularity was built from, so bucket alignment and rate always agree.
            terminatePeriod.toStandardDuration().getMillis(),
            TimeUnit.MILLISECONDS
        );
        scheduled = true;
      }
      finally {
        if (!scheduled) {
          // Construction failed (e.g. a period that cannot be converted to a fixed duration, or a non-positive
          // rate). Without this, an already-registered provision task would keep running with no way to stop it.
          exec.shutdownNow();
        }
      }
    }

    /**
     * @return whatever the underlying provisioner reports from {@link Provisioner#getStats()}
     */
    @Override
    public ScalingStats getStats()
    {
      return provisioner.getStats();
    }

    /**
     * Shuts down the executor that runs the provision and terminate schedules.
     */
    @Override
    public void close()
    {
      log.info("Stopping Resource Management Scheduler");
      exec.shutdown();
    }
  }

  /**
   * Creates the provisioner that the periodic schedules will drive for the given runner.
   */
  protected abstract Provisioner makeProvisioner(WorkerTaskRunner runner);
}
Loading

0 comments on commit b720351

Please sign in to comment.