Skip to content

Commit

Permalink
Merge pull request apache#1417 from metamx/rtr-fixes
Browse files Browse the repository at this point in the history
Fix behaviour of middle managers around ZK disconnects, fixes apache#709
  • Loading branch information
xvrl committed Jun 10, 2015
2 parents 9cf8662 + e9afec4 commit 6763e37
Show file tree
Hide file tree
Showing 12 changed files with 369 additions and 59 deletions.
1 change: 1 addition & 0 deletions docs/content/configuration/indexing-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ The following configs only apply if the overlord is running in remote mode:
|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |"0"|
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the overlord should expect middle managers to compress Znodes.|true|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from Zookeeper.|PT15M|

There are additional configs for autoscaling (if it is enabled):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -95,7 +97,8 @@
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* For example, {@link io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler} can take care of these duties.
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker.
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
* worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
* <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
Expand Down Expand Up @@ -135,14 +138,19 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer

private volatile boolean started = false;

private final ScheduledExecutorService cleanupExec;

private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();

public RemoteTaskRunner(
ObjectMapper jsonMapper,
RemoteTaskRunnerConfig config,
IndexerZkConfig indexerZkConfig,
CuratorFramework cf,
PathChildrenCacheFactory pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec
)
{
this.jsonMapper = jsonMapper;
Expand All @@ -153,6 +161,7 @@ public RemoteTaskRunner(
this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath());
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = cleanupExec;
}

@LifecycleStart
Expand Down Expand Up @@ -239,6 +248,18 @@ public void onFailure(Throwable throwable)
waitingForMonitor.wait();
}
}
// Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
for (String worker : cf.getChildren().forPath(indexerZkConfig.getStatusPath())) {
if (!zkWorkers.containsKey(worker)
&& cf.checkExists().forPath(JOINER.join(indexerZkConfig.getAnnouncementsPath(), worker)) == null) {
scheduleTasksCleanupForWorker(
worker,
cf.getChildren()
.forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker))
);
}
}

started = true;
}
catch (Exception e) {
Expand Down Expand Up @@ -675,6 +696,12 @@ private ListenableFuture<ZkWorker> addWorker(final Worker worker)
log.info("Worker[%s] reportin' for duty!", worker.getHost());

try {
ScheduledFuture previousCleanup = removedWorkerCleanups.remove(worker.getHost());
if (previousCleanup != null) {
log.info("Cancelling Worker[%s] scheduled task cleanup", worker.getHost());
previousCleanup.cancel(false);
}

final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final SettableFuture<ZkWorker> retVal = SettableFuture.create();
Expand Down Expand Up @@ -819,22 +846,7 @@ private void removeWorker(final Worker worker)
final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {

List<String> tasksToFail = getAssignedTasks(worker);
for (String assignedTask : tasksToFail) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost(), assignedTask);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}

log.info("Failing task[%s]", assignedTask);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
} else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
scheduleTasksCleanupForWorker(worker.getHost(), getAssignedTasks(worker));
}
catch (Exception e) {
throw Throwables.propagate(e);
Expand All @@ -852,6 +864,53 @@ private void removeWorker(final Worker worker)
lazyWorkers.remove(worker.getHost());
}

/**
 * Schedules a delayed cleanup that fails the given tasks of a worker. The cleanup runs on {@code cleanupExec}
 * after {@code config.getTaskCleanupTimeout()} and is tracked in {@code removedWorkerCleanups} under the worker
 * name so that it can be cancelled before it fires.
 * <p/>
 * When the cleanup runs, for every task it deletes the task and status znodes of that worker (if they exist),
 * removes the task from {@code runningTasks} and marks it as failed. The tracking entry for the worker is removed
 * once the cleanup finishes, whether or not it succeeded.
 * <p/>
 * At most one cleanup is kept pending per worker: a cleanup that is still registered for the same worker is
 * cancelled before the new one is scheduled. Otherwise the older cleanup would keep running untracked, and its
 * {@code finally} block would drop the tracking entry of the newer one, making the newer one impossible to cancel.
 *
 * @param worker      name of the worker whose tasks should be cleaned up
 * @param tasksToFail ids of the tasks to fail when the cleanup fires
 */
private void scheduleTasksCleanupForWorker(final String worker, final List<String> tasksToFail)
{
  // Replace, rather than stack on top of, any cleanup that is still pending for this worker.
  final ScheduledFuture previousCleanup = removedWorkerCleanups.remove(worker);
  if (previousCleanup != null) {
    log.info("Cancelling Worker[%s] scheduled task cleanup", worker);
    previousCleanup.cancel(false);
  }

  removedWorkerCleanups.put(
      worker,
      cleanupExec.schedule(
          new Runnable()
          {
            @Override
            public void run()
            {
              log.info("Running scheduled cleanup for Worker[%s]", worker);
              try {
                for (String assignedTask : tasksToFail) {
                  String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
                  String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
                  try {
                    if (cf.checkExists().forPath(taskPath) != null) {
                      cf.delete().guaranteed().forPath(taskPath);
                    }

                    if (cf.checkExists().forPath(statusPath) != null) {
                      cf.delete().guaranteed().forPath(statusPath);
                    }
                  }
                  catch (Exception e) {
                    throw Throwables.propagate(e);
                  }

                  log.info("Failing task[%s]", assignedTask);
                  RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
                  if (taskRunnerWorkItem != null) {
                    taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
                  } else {
                    log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
                  }
                }
              }
              finally {
                // The cleanup has run (or died trying); there is nothing left to cancel for this worker.
                removedWorkerCleanups.remove(worker);
              }
            }
          },
          config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
          TimeUnit.MILLISECONDS
      )
  );
}

private void taskComplete(
RemoteTaskRunnerWorkItem taskRunnerWorkItem,
ZkWorker zkWorker,
Expand Down Expand Up @@ -881,7 +940,7 @@ private void taskComplete(
taskRunnerWorkItem.setResult(taskStatus);
}

public List<ZkWorker> markWokersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers)
public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers)
{
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
Expand Down Expand Up @@ -929,8 +988,14 @@ private List<String> getAssignedTasks(Worker worker) throws Exception
return assignedTasks;
}

// Used for tests
/**
 * Returns an immutable copy of the workers currently held in {@code lazyWorkers}.
 *
 * @return copy of the lazy workers, taken at the time of the call
 */
public List<ZkWorker> getLazyWorkers()
{
  return ImmutableList.copyOf(this.lazyWorkers.values());
}

/**
 * Exposes the map of pending cleanups, keyed by worker name. The returned map is the live internal map,
 * not a copy.
 *
 * @return the map tracking scheduled worker cleanups
 */
ConcurrentMap<String, ScheduledFuture> getRemovedWorkerCleanups()
{
  return this.removedWorkerCleanups;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
Expand All @@ -28,6 +29,8 @@
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;

import java.util.concurrent.ScheduledExecutorService;

/**
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory
Expand All @@ -38,6 +41,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScheduledExecutorService cleanupExec;

@Inject
public RemoteTaskRunnerFactory(
Expand All @@ -46,7 +50,8 @@ public RemoteTaskRunnerFactory(
final IndexerZkConfig zkPaths,
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef
final Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorFactory factory
)
{
this.curator = curator;
Expand All @@ -55,6 +60,7 @@ public RemoteTaskRunnerFactory(
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = factory.create(1,"RemoteTaskRunner-Scheduled-Cleanup--%d");
}

@Override
Expand All @@ -70,7 +76,8 @@ public TaskRunner build()
.withCompressed(true)
.build(),
httpClient,
workerConfigRef
workerConfigRef,
cleanupExec
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public String apply(ZkWorker input)
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
final List<String> laziestWorkerIps =
Lists.transform(
runner.markWokersLazy(isLazyWorker, excessWorkers),
runner.markWorkersLazy(isLazyWorker, excessWorkers),
new Function<ZkWorker, String>()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public class RemoteTaskRunnerConfig
@NotNull
private Period taskAssignmentTimeout = new Period("PT5M");

@JsonProperty
@NotNull
private Period taskCleanupTimeout = new Period("PT15M");

@JsonProperty
private String minWorkerVersion = "0";

Expand All @@ -43,6 +47,11 @@ public Period getTaskAssignmentTimeout()
return taskAssignmentTimeout;
}

/**
 * Returns the configured {@code taskCleanupTimeout}.
 *
 * @return the task cleanup timeout period
 */
@JsonProperty
public Period getTaskCleanupTimeout()
{
  return this.taskCleanupTimeout;
}

public String getMinWorkerVersion()
{
return minWorkerVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ public class TaskAnnouncement

public static TaskAnnouncement create(Task task, TaskStatus status)
{
Preconditions.checkArgument(status.getId().equals(task.getId()), "task id == status id");
return new TaskAnnouncement(null, null, status, task.getTaskResource());
return create(task.getId(), task.getTaskResource(), status);
}

/**
 * Builds an announcement carrying the given status and resource, after verifying that the status belongs
 * to the given task id. The first two constructor arguments are passed as {@code null}.
 *
 * @param taskId   id of the task being announced
 * @param resource resource of the task being announced
 * @param status   status to announce; its id must equal {@code taskId}
 *
 * @return the new announcement
 *
 * @throws IllegalArgumentException if the status id differs from {@code taskId}
 */
public static TaskAnnouncement create(String taskId, TaskResource resource, TaskStatus status)
{
  final boolean idsMatch = status.getId().equals(taskId);
  Preconditions.checkArgument(idsMatch, "task id == status id");
  return new TaskAnnouncement(null, null, status, resource);
}

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package io.druid.indexing.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.ISE;
Expand All @@ -34,7 +36,9 @@
import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;

/**
* The CuratorCoordinator provides methods to use Curator. Persistent ZK paths are created on {@link #start()}.
Expand Down Expand Up @@ -190,7 +194,7 @@ public void unannounceTask(String taskId)
}
}

public void announceTastAnnouncement(TaskAnnouncement announcement)
public void announceTaskAnnouncement(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
Expand All @@ -206,7 +210,7 @@ public void announceTastAnnouncement(TaskAnnouncement announcement)
}

curatorFramework.create()
.withMode(CreateMode.EPHEMERAL)
.withMode(CreateMode.PERSISTENT)
.forPath(
getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
);
Expand All @@ -226,7 +230,7 @@ public void updateAnnouncement(TaskAnnouncement announcement)

try {
if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) {
announceTastAnnouncement(announcement);
announceTaskAnnouncement(announcement);
return;
}
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
Expand All @@ -247,6 +251,30 @@ public void updateAnnouncement(TaskAnnouncement announcement)
}
}

/**
 * Reads every task announcement stored under this worker's status path.
 * <p/>
 * The list is built eagerly: each child znode is fetched and deserialized exactly once, inside this method.
 * A lazily transformed view would instead repeat the ZooKeeper read and JSON parse on every traversal of the
 * returned list, and would raise any resulting failure at the caller's iteration site rather than here.
 *
 * @return the announcements found under the worker's status path, in the order the children were listed
 *
 * @throws RuntimeException (via {@link Throwables#propagate}) if listing the children, reading a znode or
 *                          deserializing its content fails
 */
public List<TaskAnnouncement> getAnnouncements()
{
  try {
    final List<TaskAnnouncement> announcements = Lists.newArrayList();
    for (String taskId : curatorFramework.getChildren().forPath(getStatusPathForWorker())) {
      announcements.add(
          jsonMapper.readValue(
              curatorFramework.getData().forPath(getStatusPathForId(taskId)),
              TaskAnnouncement.class
          )
      );
    }
    return announcements;
  }
  catch (Exception e) {
    throw Throwables.propagate(e);
  }
}

public void updateWorkerAnnouncement(Worker newWorker) throws Exception
{
synchronized (lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ public WorkerTaskMonitor(
public void start()
{
try {
// cleanup any old running task announcements which are invalid after restart
for (TaskAnnouncement announcement : workerCuratorCoordinator.getAnnouncements()){
if(announcement.getTaskStatus().isRunnable()) {
workerCuratorCoordinator.updateAnnouncement(
TaskAnnouncement.create(
announcement.getTaskId(),
announcement.getTaskResource(),
TaskStatus.failure(announcement.getTaskId())
)
);
}
}

pathChildrenCache.getListenable().addListener(
new PathChildrenCacheListener()
{
Expand Down Expand Up @@ -122,7 +135,7 @@ public void run()
TaskStatus taskStatus;
try {
workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceTastAnnouncement(
workerCuratorCoordinator.announceTaskAnnouncement(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId())
Expand Down
Loading

0 comments on commit 6763e37

Please sign in to comment.