diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java index fd1769ee8f5b..1db1fc2aaef8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategy.java @@ -47,7 +47,17 @@ public int compare( ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2 ) { - return -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); + int retVal = -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); + // the version sorting is needed because if the workers have the same currCapacityUsed only one of them is + // returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't + // run new tasks, so in this case the workers are sorted using version to ensure that if exists enable + // workers the comparator return one of them. + + if(retVal == 0) { + retVal = zkWorker2.getWorker().getVersion().compareTo(zkWorker.getWorker().getVersion()); + } + + return retVal; } } ); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java index a0e8e2700490..447fd9857395 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/FillCapacityWorkerSelectStrategy.java @@ -50,8 +50,13 @@ public int compare( ) { int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed()); + // the version sorting is needed because if the workers have the same currCapacityUsed only one of them is + // returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't + // run new tasks, so in this case the workers are sorted using version to ensure that if exists enable + // workers the comparator return one of them. + if (retVal == 0) { - retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost()); + retVal = zkWorker.getWorker().getVersion().compareTo(zkWorker2.getWorker().getVersion()); } return retVal; diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java index 506aceceddc2..b43b5c6c4829 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java @@ -63,4 +63,70 @@ public String getDataSource() ImmutableZkWorker worker = optional.get(); Assert.assertEquals("lhost", worker.getWorker().getHost()); } + + @Test + public void testOneDisableWorkerDifferentUsedCapacity() throws Exception + { + String DISABLED_VERSION = ""; + final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(); + + Optional optional = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + ImmutableMap.of( + "lhost", + new ImmutableZkWorker( + new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2, + Sets.newHashSet() + ), + "localhost", + new ImmutableZkWorker( + new Worker("enableHost", "enableHost", 10, "v1"), 5, + Sets.newHashSet() + ) + ), + new NoopTask(null, 1, 0, null, null, null) + { + @Override + public String getDataSource() + { + return "foo"; + } + } + ); + ImmutableZkWorker worker = optional.get(); + Assert.assertEquals("enableHost", worker.getWorker().getHost()); + } + + @Test + public void testOneDisableWorkerSameUsedCapacity() throws Exception + { + String DISABLED_VERSION = ""; + final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(); + + Optional optional = strategy.findWorkerForTask( + new RemoteTaskRunnerConfig(), + ImmutableMap.of( + "lhost", + new ImmutableZkWorker( + new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5, + Sets.newHashSet() + ), + "localhost", + new ImmutableZkWorker( + new Worker("enableHost", "enableHost", 10, "v1"), 5, + Sets.newHashSet() + ) + ), + new NoopTask(null, 1, 0, null, null, null) + { + @Override + public String getDataSource() + { + return "foo"; + } + } + ); + ImmutableZkWorker worker = optional.get(); + Assert.assertEquals("enableHost", worker.getWorker().getHost()); + } }