Skip to content

Commit

Permalink
Add timeout to shutdown request to middle manager for indexing service
Browse files Browse the repository at this point in the history
  • Loading branch information
drcrallen committed Oct 27, 2015
1 parent 72c408c commit 44a2b20
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 8 deletions.
1 change: 1 addition & 0 deletions docs/content/configuration/indexing-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The following configs only apply if the overlord is running in remote mode:
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the overlord should expect middle managers to compress Znodes.|true|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from Zookeeper.|PT15M|
|`druid.indexer.runner.taskShutdownLinkTimeout`|How long to wait on a shutdown request to a middle manager before timing out|PT1M|

There are additional configs for autoscaling (if it is enabled):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.RE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
Expand Down Expand Up @@ -66,12 +67,12 @@
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -110,6 +111,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer

private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final Duration shutdownTimeout;
private final IndexerZkConfig indexerZkConfig;
private final CuratorFramework cf;
private final PathChildrenCacheFactory pathChildrenCacheFactory;
Expand Down Expand Up @@ -155,6 +157,7 @@ public RemoteTaskRunner(
{
this.jsonMapper = jsonMapper;
this.config = config;
this.shutdownTimeout = config.getTaskShutdownLinkTimeout().toStandardDuration(); // Fail fast
this.indexerZkConfig = indexerZkConfig;
this.cf = cf;
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
Expand Down Expand Up @@ -252,7 +255,8 @@ public void onFailure(Throwable throwable)
List<String> workers;
try {
workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath());
} catch (KeeperException.NoNodeException e) {
}
catch (KeeperException.NoNodeException e) {
// statusPath doesn't exist yet; can occur if no middleManagers have started.
workers = ImmutableList.of();
}
Expand Down Expand Up @@ -386,12 +390,13 @@ public void shutdown(final String taskId)
log.info("Can't shutdown! No worker running task %s", taskId);
return;
}

URL url = null;
try {
final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
final StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, url),
RESPONSE_HANDLER
RESPONSE_HANDLER,
shutdownTimeout
).get();

log.info(
Expand All @@ -405,8 +410,12 @@ public void shutdown(final String taskId)
log.error("Shutdown failed for %s! Are you sure the task was running?", taskId);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RE(e, "Interrupted posting shutdown to [%s] for task [%s]", url, taskId);
}
catch (Exception e) {
throw Throwables.propagate(e);
throw new RE(e, "Error in handling post to [%s] for task [%s]", zkWorker.getWorker().getHost(), taskId);
}
}
}
Expand Down Expand Up @@ -911,7 +920,8 @@ public void run()
catch (Exception e) {
log.makeAlert("Exception while cleaning up worker[%s]", worker).emit();
throw Throwables.propagate(e);
} finally {
}
finally {
removedWorkerCleanups.remove(worker);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class RemoteTaskRunnerConfig
@Min(10 * 1024)
private long maxZnodeBytes = 512 * 1024;

@JsonProperty
private Period taskShutdownLinkTimeout = new Period("PT1M");

public Period getTaskAssignmentTimeout()
{
return taskAssignmentTimeout;
Expand All @@ -61,4 +64,9 @@ public long getMaxZnodeBytes()
{
return maxZnodeBytes;
}

public Period getTaskShutdownLinkTimeout()
{
return taskShutdownLinkTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public long getMaxZnodeBytes()
return 10 * 1024;
}

@Override
public Period getTaskShutdownLinkTimeout()
{
return timeout;
}

@Override
public String getMinWorkerVersion()
{
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>1.0.2</version>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
Expand Down

0 comments on commit 44a2b20

Please sign in to comment.