Skip to content

Commit

Permalink
HttpRemoteTaskRunner: Fix markLazyWorkers for maxLazyWorkers == 0. (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
gianm authored Jul 6, 2023
1 parent d02bb8b commit 037f09b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1397,11 +1397,11 @@ private void taskComplete(
}

@Override
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers)
{
// skip the lock and bail early if we should not mark any workers lazy (e.g. number
// of current workers is at or below the minNumWorkers of autoscaler config)
if (maxWorkers < 1) {
if (maxLazyWorkers < 1) {
return Collections.emptyList();
}
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
Expand All @@ -1412,7 +1412,7 @@ public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyW
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && isLazyWorker.apply(zkWorker.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", zkWorker.getWorker().getHost());
lazyWorkers.put(worker.getKey(), zkWorker);
if (lazyWorkers.size() == maxWorkers) {
if (lazyWorkers.size() == maxLazyWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ enum ActionType

/**
* Check which workers can be marked as lazy
*
* @param isLazyWorker predicate that checks if a worker is lazy
* @param maxLazyWorkers maximum number of lazy workers to return
*/
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers);
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers);

WorkerTaskRunnerConfig getConfig();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -922,16 +923,22 @@ public Collection<Worker> getLazyWorkers()
}

@Override
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxWorkers)
public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> isLazyWorker, int maxLazyWorkers)
{
// skip the lock and bail early if we should not mark any workers lazy (e.g. number
// of current workers is at or below the minNumWorkers of autoscaler config)
if (maxLazyWorkers < 1) {
return Collections.emptyList();
}

synchronized (statusLock) {
for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) {
final WorkerHolder workerHolder = worker.getValue();
try {
if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost());
lazyWorkers.put(worker.getKey(), workerHolder);
if (lazyWorkers.size() == maxWorkers) {
if (lazyWorkers.size() == maxLazyWorkers) {
// only mark excess workers as lazy and allow their cleanup
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,17 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo
Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());

Assert.assertEquals(
Collections.emptyList(),
taskRunner.markWorkersLazy(Predicates.alwaysTrue(), 0)
);

Assert.assertEquals(
"host3:8080",
Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), 1))
.getHost()
);

Assert.assertEquals(
"host3:8080",
Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), Integer.MAX_VALUE))
Expand Down

0 comments on commit 037f09b

Please sign in to comment.