Skip to content

Commit

Permalink
Use ImmutableWorkerInfo instead of ZKWorker
Browse files Browse the repository at this point in the history
review comments

add test for equals and hashcode
  • Loading branch information
nishantmonu51 committed Mar 14, 2016
1 parent d51a0a0 commit 9cceff2
Show file tree
Hide file tree
Showing 23 changed files with 593 additions and 284 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.overlord;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;

import java.util.Collection;
import java.util.Set;

/**
* A snapshot of a Worker and its current state i.e tasks assigned to that worker.
*/
public class ImmutableWorkerInfo
{
private final Worker worker;
private final int currCapacityUsed;
private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime;

@JsonCreator
public ImmutableWorkerInfo(
@JsonProperty("worker") Worker worker,
@JsonProperty("currCapacityUsed") int currCapacityUsed,
@JsonProperty("availabilityGroups") Set<String> availabilityGroups,
@JsonProperty("runningTasks") Collection<String> runningTasks,
@JsonProperty("lastCompletedTaskTime") DateTime lastCompletedTaskTime
)
{
this.worker = worker;
this.currCapacityUsed = currCapacityUsed;
this.availabilityGroups = ImmutableSet.copyOf(availabilityGroups);
this.runningTasks = ImmutableSet.copyOf(runningTasks);
this.lastCompletedTaskTime = lastCompletedTaskTime;
}

@JsonProperty("worker")
public Worker getWorker()
{
return worker;
}

@JsonProperty("currCapacityUsed")
public int getCurrCapacityUsed()
{
return currCapacityUsed;
}

@JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups()
{
return availabilityGroups;
}

@JsonProperty("runningTasks")
public Set<String> getRunningTasks()
{
return runningTasks;
}

@JsonProperty("lastCompletedTaskTime")
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime;
}

public boolean isValidVersion(String minVersion)
{
return worker.getVersion().compareTo(minVersion) >= 0;
}

public boolean canRunTask(Task task)
{
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

ImmutableWorkerInfo that = (ImmutableWorkerInfo) o;

if (currCapacityUsed != that.currCapacityUsed) {
return false;
}
if (!worker.equals(that.worker)) {
return false;
}
if (!availabilityGroups.equals(that.availabilityGroups)) {
return false;
}
if (!runningTasks.equals(that.runningTasks)) {
return false;
}
return lastCompletedTaskTime.equals(that.lastCompletedTaskTime);

}

@Override
public int hashCode()
{
int result = worker.hashCode();
result = 31 * result + currCapacityUsed;
result = 31 * result + availabilityGroups.hashCode();
result = 31 * result + runningTasks.hashCode();
result = 31 * result + lastCompletedTaskTime.hashCode();
return result;
}

@Override
public String toString()
{
return "ImmutableWorkerInfo{" +
"worker=" + worker +
", currCapacityUsed=" + currCapacityUsed +
", availabilityGroups=" + availabilityGroups +
", runningTasks=" + runningTasks +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@
* creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running.
* Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup.
* The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK.
* <p/>
* <p>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* <p/>
* <p>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
* worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
* <p/>
* <p>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
Expand Down Expand Up @@ -365,14 +365,9 @@ public void registerListener(TaskRunnerListener listener, Executor executor)
}

@Override
public Collection<Worker> getWorkers()
public Collection<ImmutableWorkerInfo> getWorkers()
{
return ImmutableList.copyOf(getWorkerFromZK(zkWorkers.values()));
}

public Collection<ZkWorker> getZkWorkers()
{
return ImmutableList.copyOf(zkWorkers.values());
return getImmutableWorkerFromZK(zkWorkers.values());
}

@Override
Expand Down Expand Up @@ -672,7 +667,7 @@ private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem ta

ZkWorker assignedWorker = null;
try {
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
final Optional<ImmutableWorkerInfo> immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(
Maps.transformEntries(
Expand All @@ -687,10 +682,10 @@ public boolean apply(Map.Entry<String, ZkWorker> input)
}
}
),
new Maps.EntryTransformer<String, ZkWorker, ImmutableZkWorker>()
new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
{
@Override
public ImmutableZkWorker transformEntry(
public ImmutableWorkerInfo transformEntry(
String key, ZkWorker value
)
{
Expand All @@ -712,7 +707,8 @@ public ImmutableZkWorker transformEntry(

log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return false;
} finally {
}
finally {
if (assignedWorker != null) {
workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
// note that this is essential as a task might not get a worker because a worker was assigned another task.
Expand Down Expand Up @@ -1092,7 +1088,7 @@ private void taskComplete(
}

@Override
public Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers)
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
{
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
Expand All @@ -1101,7 +1097,7 @@ public Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int ma
String worker = iterator.next();
ZkWorker zkWorker = zkWorkers.get(worker);
try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.getWorker())) {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker, zkWorker);
if (lazyWorkers.size() == maxWorkers) {
Expand Down Expand Up @@ -1146,6 +1142,23 @@ public Collection<Worker> getLazyWorkers()
return ImmutableList.copyOf(getWorkerFromZK(lazyWorkers.values()));
}

private static ImmutableList<ImmutableWorkerInfo> getImmutableWorkerFromZK(Collection<ZkWorker> workers)
{
return ImmutableList.copyOf(
Collections2.transform(
workers,
new Function<ZkWorker, ImmutableWorkerInfo>()
{
@Override
public ImmutableWorkerInfo apply(ZkWorker input)
{
return input.toImmutable();
}
}
)
);
}

public static Collection<Worker> getWorkerFromZK(Collection<ZkWorker> workers)
{
return Collections2.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface WorkerTaskRunner extends TaskRunner
* List of known workers who can accept tasks
* @return A list of workers who can accept tasks for running
*/
Collection<Worker> getWorkers();
Collection<ImmutableWorkerInfo> getWorkers();

/**
* Return a list of workers who can be reaped by autoscaling
Expand All @@ -43,5 +43,5 @@ public interface WorkerTaskRunner extends TaskRunner
* @param maxWorkers
* @return
*/
Collection<Worker> markWorkersLazy(Predicate<Worker> isLazyWorker, int maxWorkers);
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ZkWorker implements Closeable
private final Function<ChildData, TaskAnnouncement> cacheConverter;

private AtomicReference<Worker> worker;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());

public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{
Expand Down Expand Up @@ -128,7 +129,7 @@ public Set<String> getAvailabilityGroups()
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return worker.get().getLastCompletedTaskTime();
return lastCompletedTaskTime.get();
}

public boolean isRunningTask(String taskId)
Expand All @@ -138,7 +139,7 @@ public boolean isRunningTask(String taskId)

public boolean isValidVersion(String minVersion)
{
return worker.get().isValidVersion(minVersion);
return worker.get().getVersion().compareTo(minVersion) >= 0;
}

public void setWorker(Worker newWorker)
Expand All @@ -152,12 +153,13 @@ public void setWorker(Worker newWorker)

public void setLastCompletedTaskTime(DateTime completedTaskTime)
{
worker.get().setLastCompletedTaskTime(completedTaskTime);
lastCompletedTaskTime.set(completedTaskTime);
}

public ImmutableZkWorker toImmutable()
public ImmutableWorkerInfo toImmutable()
{
return new ImmutableZkWorker(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups());

return new ImmutableWorkerInfo(worker.get(), getCurrCapacityUsed(), getAvailabilityGroups(), getRunningTaskIds(), lastCompletedTaskTime.get());
}

@Override
Expand All @@ -171,6 +173,7 @@ public String toString()
{
return "ZkWorker{" +
"worker=" + worker +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}';
}
}
Loading

0 comments on commit 9cceff2

Please sign in to comment.