Skip to content

Commit

Permalink
Merge pull request apache#2249 from metamx/workerExpanded
Browse files Browse the repository at this point in the history
Use Worker instead of ZkWorker whenever possible
  • Loading branch information
nishantmonu51 committed Feb 24, 2016
2 parents 6c9e1a2 + ac13a59 commit fb7eae3
Show file tree
Hide file tree
Showing 22 changed files with 715 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,12 @@ public Optional<ScalingStats> getScalingStats()
return Optional.absent();
}

/**
* Intentionally a no-op: this runner has no state that needs to be set up when it is started.
*/
@Override
public void start()
{
// No state setup required
}

@Override
public Optional<ByteSource> streamTaskLog(final String taskid, final long offset)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
Expand All @@ -42,7 +44,6 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.RE;
import com.metamx.common.lifecycle.LifecycleStart;
Expand All @@ -53,7 +54,6 @@
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorUtils;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus;
Expand Down Expand Up @@ -113,7 +113,7 @@
* <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{
private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8);
Expand Down Expand Up @@ -153,7 +153,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final ListeningScheduledExecutorService cleanupExec;

private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ResourceManagementStrategy<RemoteTaskRunner> resourceManagement;
private final ResourceManagementStrategy<WorkerTaskRunner> resourceManagement;

public RemoteTaskRunner(
ObjectMapper jsonMapper,
Expand All @@ -164,7 +164,7 @@ public RemoteTaskRunner(
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec,
ResourceManagementStrategy<RemoteTaskRunner> resourceManagement
ResourceManagementStrategy<WorkerTaskRunner> resourceManagement
)
{
this.jsonMapper = jsonMapper;
Expand All @@ -180,6 +180,7 @@ public RemoteTaskRunner(
this.resourceManagement = resourceManagement;
}

@Override
@LifecycleStart
public void start()
{
Expand Down Expand Up @@ -298,6 +299,7 @@ public void onFailure(Throwable throwable)
}
}

@Override
@LifecycleStop
public void stop()
{
Expand Down Expand Up @@ -325,7 +327,13 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
return ImmutableList.of();
}

public Collection<ZkWorker> getWorkers()
/**
 * Returns an immutable copy of the {@link Worker}s behind every entry currently held in
 * {@code zkWorkers}.
 */
@Override
public Collection<Worker> getWorkers()
{
  final Collection<ZkWorker> currentZkWorkers = zkWorkers.values();
  return ImmutableList.copyOf(getWorkerFromZK(currentZkWorkers));
}

/**
 * Returns an immutable copy of the {@link ZkWorker}s currently held in {@code zkWorkers}.
 */
public Collection<ZkWorker> getZkWorkers()
{
  final Collection<ZkWorker> snapshot = ImmutableList.copyOf(zkWorkers.values());
  return snapshot;
}
Expand Down Expand Up @@ -1018,7 +1026,8 @@ private void taskComplete(
taskRunnerWorkItem.setResult(taskStatus);
}

public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers)
@Override
public Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers)
{
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
Expand All @@ -1027,7 +1036,7 @@ public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxW
String worker = iterator.next();
ZkWorker zkWorker = zkWorkers.get(worker);
try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker)) {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.getWorker())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker, zkWorker);
if (lazyWorkers.size() == maxWorkers) {
Expand All @@ -1040,13 +1049,13 @@ public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxW
throw Throwables.propagate(e);
}
}
return ImmutableList.copyOf(lazyWorkers.values());
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
}
}

private List<String> getAssignedTasks(Worker worker) throws Exception
protected List<String> getAssignedTasks(Worker worker) throws Exception
{
List<String> assignedTasks = Lists.newArrayList(
final List<String> assignedTasks = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost()))
);

Expand All @@ -1066,9 +1075,25 @@ private List<String> getAssignedTasks(Worker worker) throws Exception
return assignedTasks;
}

public List<ZkWorker> getLazyWorkers()
@Override
public Collection<Worker> getLazyWorkers()
{
return ImmutableList.copyOf(lazyWorkers.values());
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
}

/**
 * Exposes a collection of {@link ZkWorker}s as the {@link Worker}s they wrap.
 *
 * The result comes straight from {@link Collections2#transform}, so it is a view over
 * {@code workers} rather than a copy; callers that need a stable snapshot should copy it.
 *
 * @param workers the ZkWorkers to unwrap
 * @return a collection yielding {@code getWorker()} of each element of {@code workers}
 */
public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
{
  final Function<ZkWorker, Worker> unwrapWorker = new Function<ZkWorker, Worker>()
  {
    @Override
    public Worker apply(ZkWorker zkWorker)
    {
      return zkWorker.getWorker();
    }
  };
  return Collections2.transform(workers, unwrapWorker);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunner>
{
public static final String TYPE_NAME = "remote";
private static final Logger LOG = new Logger(RemoteTaskRunnerFactory.class);
private final CuratorFramework curator;
private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
Expand Down Expand Up @@ -83,7 +84,7 @@ public RemoteTaskRunnerFactory(
@Override
public RemoteTaskRunner build()
{
final ResourceManagementStrategy<RemoteTaskRunner> resourceManagementStrategy;
final ResourceManagementStrategy<WorkerTaskRunner> resourceManagementStrategy;
if (resourceManagementSchedulerConfig.isDoAutoscale()) {
resourceManagementStrategy = new SimpleResourceManagementStrategy(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

/**
* Interface for handing off tasks. Managed by a {@link io.druid.indexing.overlord.TaskQueue}.
* Holds state
*/
public interface TaskRunner
{
Expand Down Expand Up @@ -75,4 +76,9 @@ public interface TaskRunner
* @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
*/
Optional<ScalingStats> getScalingStats();

/**
* Starts the runner, performing whatever state setup the implementation requires.
* Implementations that have no state to set up may implement this as a no-op.
*/
void start();
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ private static ListeningExecutorService buildExecutorService(int priority)
);
}

@Override
@LifecycleStop
public void stop()
{
Expand Down Expand Up @@ -259,6 +260,12 @@ public Optional<ScalingStats> getScalingStats()
return Optional.absent();
}

/**
* Intentionally a no-op: this runner has no state that needs to be started up.
*/
@Override
public void start()
{
// No state startup required
}

@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;

import com.google.common.base.Predicate;
import io.druid.indexing.worker.Worker;

import java.util.Collection;

/**
* A {@link TaskRunner} that additionally exposes the {@link Worker}s it knows about,
* including which of them are considered lazy (candidates to be reaped by autoscaling).
*/
public interface WorkerTaskRunner extends TaskRunner
{
/**
* List of known workers who can accept tasks
*
* @return A list of workers who can accept tasks for running
*/
Collection<Worker> getWorkers();

/**
* Return a list of workers who can be reaped by autoscaling
*
* @return Workers which can be reaped by autoscaling
*/
Collection<Worker> getLazyWorkers();

/**
* Mark workers as lazy, i.e. as candidates to be reaped by autoscaling.
*
* @param isLazyWorker predicate that returns true for a worker which should be considered lazy
* @param maxWorkers limit on the number of lazy workers. NOTE(review): RemoteTaskRunner appears to
*                   stop marking once its lazy set reaches this size; confirm this is the intended
*                   contract for every implementation
* @return the workers that are marked as lazy
*/
Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class ZkWorker implements Closeable
private final Function<ChildData, TaskAnnouncement> cacheConverter;

private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());

public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
Expand Down Expand Up @@ -129,7 +128,7 @@ public Set<String> getAvailabilityGroups()
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime.get();
return worker.get().getLastCompletedTaskTime();
}

public boolean isRunningTask(String taskId)
Expand All @@ -139,7 +138,7 @@ public boolean isRunningTask(String taskId)

public boolean isValidVersion(String minVersion)
{
return worker.get().getVersion().compareTo(minVersion) >= 0;
return worker.get().isValidVersion(minVersion);
}

public void setWorker(Worker newWorker)
Expand All @@ -153,7 +152,7 @@ public void setWorker(Worker newWorker)

public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
lastCompletedTaskTime.set(completedTaskTime);
worker.get().setLastCompletedTaskTime(completedTaskTime);
}

public ImmutableZkWorker toImmutable()
Expand All @@ -172,7 +171,6 @@ public String toString()
{
return "ZkWorker{" +
"worker=" + worker +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}';
}
}
Loading

0 comments on commit fb7eae3

Please sign in to comment.