Skip to content

Commit

Permalink
Fixed equal distribution strategy when exist disable middleManager wi…
Browse files Browse the repository at this point in the history
…th same currCapacityUsed.
  • Loading branch information
Andrés Gomez committed Feb 17, 2016
1 parent 64409cd commit 07d714b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,17 @@ public int compare(
ImmutableZkWorker zkWorker, ImmutableZkWorker zkWorker2
)
{
return -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
int retVal = -Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
// the version sorting is needed because if the workers have the same currCapacityUsed only one of them is
// returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't
// run new tasks, so in this case the workers are sorted using version to ensure that if exists enable
// workers the comparator return one of them.

if(retVal == 0) {
retVal = zkWorker2.getWorker().getVersion().compareTo(zkWorker.getWorker().getVersion());
}

return retVal;
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ public int compare(
)
{
int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
// the version sorting is needed because if the workers have the same currCapacityUsed only one of them is
// returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't
// run new tasks, so in this case the workers are sorted using version to ensure that if exists enable
// workers the comparator return one of them.

if (retVal == 0) {
retVal = zkWorker.getWorker().getHost().compareTo(zkWorker2.getWorker().getHost());
retVal = zkWorker.getWorker().getVersion().compareTo(zkWorker2.getWorker().getVersion());
}

return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,70 @@ public String getDataSource()
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}

@Test
public void testOneDisableWorkerDifferentUsedCapacity() throws Exception
{
String DISABLED_VERSION = "";
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();

Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 2,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1"), 5,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}

@Test
public void testOneDisableWorkerSameUsedCapacity() throws Exception
{
String DISABLED_VERSION = "";
final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy();

Optional<ImmutableZkWorker> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableZkWorker(
new Worker("disableHost", "disableHost", 10, DISABLED_VERSION), 5,
Sets.<String>newHashSet()
),
"localhost",
new ImmutableZkWorker(
new Worker("enableHost", "enableHost", 10, "v1"), 5,
Sets.<String>newHashSet()
)
),
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableZkWorker worker = optional.get();
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}
}

0 comments on commit 07d714b

Please sign in to comment.