Skip to content

Commit

Permalink
Add EqualDistributionWithAffinityWorkerSelectStrategy which balance w… (
Browse files Browse the repository at this point in the history
apache#3998)

* Add EqualDistributionWithAffinityWorkerSelectStrategy which balance work load within affinity workers.

* add docs to equalDistributionWithAffinity
  • Loading branch information
JackyWoo authored and gianm committed Mar 26, 2017
1 parent 90f9932 commit a0f2cf0
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 11 deletions.
14 changes: 13 additions & 1 deletion docs/content/configuration/indexing-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ Issuing a GET request at the same URL will return the current worker config spec

|Property|Description|Default|
|--------|-----------|-------|
|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution` and `javascript`.|fillCapacity|
|`selectStrategy`|How to assign tasks to middle managers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution`, `equalDistributionWithAffinity` and `javascript`.|fillCapacity|
|`autoScaler`|Only used if autoscaling is enabled. See below.|null|

To view the audit history of worker config issue a GET request to the URL -
Expand Down Expand Up @@ -233,6 +233,18 @@ The workers with the least amount of tasks is assigned the task.
|--------|-----------|-------|
|`type`|`equalDistribution`.|required; must be `equalDistribution`|

##### Equal Distribution With Affinity

An affinity config can be provided.

|Property|Description|Default|
|--------|-----------|-------|
|`type`|`equalDistributionWithAffinity`.|required; must be `equalDistributionWithAffinity`|
|`affinity`|Exactly same with `fillCapacityWithAffinity` 's affinity.|{}|

Tasks will try to be assigned to preferred workers. Equal Distribution strategy is used if no preference for a datasource specified.


##### Javascript

Allows defining arbitrary logic for selecting workers to run task using a JavaScript function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@

/**
*/
public class FillCapacityWithAffinityConfig
public class AffinityConfig
{
// key:Datasource, value:[nodeHostNames]
private Map<String, List<String>> affinity = Maps.newHashMap();

@JsonCreator
public FillCapacityWithAffinityConfig(
public AffinityConfig(
@JsonProperty("affinity") Map<String, List<String>> affinity
)
{
Expand All @@ -57,7 +57,7 @@ public boolean equals(Object o)
return false;
}

FillCapacityWithAffinityConfig that = (FillCapacityWithAffinityConfig) o;
AffinityConfig that = (AffinityConfig) o;

if (affinity != null
? !Maps.difference(affinity, that.affinity).entriesDiffering().isEmpty()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.overlord.setup;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.WorkerTaskRunnerConfig;

import java.util.List;
import java.util.Set;

/**
*/
public class EqualDistributionWithAffinityWorkerSelectStrategy extends EqualDistributionWorkerSelectStrategy
{
private final AffinityConfig affinityConfig;
private final Set<String> affinityWorkerHosts = Sets.newHashSet();

@JsonCreator
public EqualDistributionWithAffinityWorkerSelectStrategy(
@JsonProperty("affinityConfig") AffinityConfig affinityConfig
)
{
this.affinityConfig = affinityConfig;
for (List<String> affinityWorkers : affinityConfig.getAffinity().values()) {
for (String affinityWorker : affinityWorkers) {
this.affinityWorkerHosts.add(affinityWorker);
}
}
}

@JsonProperty
public AffinityConfig getAffinityConfig()
{
return affinityConfig;
}

@Override
public Optional<ImmutableWorkerInfo> findWorkerForTask(
final WorkerTaskRunnerConfig config,
final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
final Task task
)
{
// don't run other datasources on affinity workers; we only want our configured datasources to run on them
ImmutableMap.Builder<String, ImmutableWorkerInfo> builder = new ImmutableMap.Builder<>();
for (String workerHost : zkWorkers.keySet()) {
if (!affinityWorkerHosts.contains(workerHost)) {
builder.put(workerHost, zkWorkers.get(workerHost));
}
}
ImmutableMap<String, ImmutableWorkerInfo> eligibleWorkers = builder.build();

List<String> workerHosts = affinityConfig.getAffinity().get(task.getDataSource());
if (workerHosts == null) {
return super.findWorkerForTask(config, eligibleWorkers, task);
}

ImmutableMap.Builder<String, ImmutableWorkerInfo> affinityBuilder = new ImmutableMap.Builder<>();
for (String workerHost : workerHosts) {
ImmutableWorkerInfo zkWorker = zkWorkers.get(workerHost);
if (zkWorker != null) {
affinityBuilder.put(workerHost, zkWorker);
}
}
ImmutableMap<String, ImmutableWorkerInfo> affinityWorkers = affinityBuilder.build();

if (!affinityWorkers.isEmpty()) {
Optional<ImmutableWorkerInfo> retVal = super.findWorkerForTask(config, affinityWorkers, task);
if (retVal.isPresent()) {
return retVal;
}
}

return super.findWorkerForTask(config, eligibleWorkers, task);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

EqualDistributionWithAffinityWorkerSelectStrategy that = (EqualDistributionWithAffinityWorkerSelectStrategy) o;

if (affinityConfig != null ? !affinityConfig.equals(that.affinityConfig) : that.affinityConfig != null) {
return false;
}

return true;
}

@Override
public int hashCode()
{
return affinityConfig != null ? affinityConfig.hashCode() : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
*/
public class FillCapacityWithAffinityWorkerSelectStrategy extends FillCapacityWorkerSelectStrategy
{
private final FillCapacityWithAffinityConfig affinityConfig;
private final AffinityConfig affinityConfig;
private final Set<String> affinityWorkerHosts = Sets.newHashSet();

@JsonCreator
public FillCapacityWithAffinityWorkerSelectStrategy(
@JsonProperty("affinityConfig") FillCapacityWithAffinityConfig affinityConfig
@JsonProperty("affinityConfig") AffinityConfig affinityConfig
)
{
this.affinityConfig = affinityConfig;
Expand All @@ -52,7 +52,7 @@ public FillCapacityWithAffinityWorkerSelectStrategy(
}

@JsonProperty
public FillCapacityWithAffinityConfig getAffinityConfig()
public AffinityConfig getAffinityConfig()
{
return affinityConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
@JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "equalDistributionWithAffinity", value = EqualDistributionWithAffinityWorkerSelectStrategy.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class)
})
public interface WorkerSelectStrategy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.overlord.setup;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.overlord.ImmutableWorkerInfo;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;

public class EqualDistributionWithAffinityWorkerSelectStrategyTest
{
@Test
public void testFindWorkerForTask() throws Exception
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost1", "localhost2", "localhost3")))
);

Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"localhost0",
new ImmutableWorkerInfo(
new Worker("localhost0", "localhost0", 2, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost1",
new ImmutableWorkerInfo(
new Worker("localhost1", "localhost1", 2, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost2",
new ImmutableWorkerInfo(
new Worker("localhost2", "localhost2", 2, "v1"), 1,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost3",
new ImmutableWorkerInfo(
new Worker("localhost3", "localhost3", 2, "v1"), 1,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("localhost1", worker.getWorker().getHost());
}

@Test
public void testFindWorkerForTaskWithNulls() throws Exception
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);

Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"lhost",
new ImmutableWorkerInfo(
new Worker("lhost", "lhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
),
"localhost",
new ImmutableWorkerInfo(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
);
ImmutableWorkerInfo worker = optional.get();
Assert.assertEquals("lhost", worker.getWorker().getHost());
}

@Test
public void testIsolation() throws Exception
{
EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWithAffinityWorkerSelectStrategy(
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);

Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
"localhost",
new ImmutableWorkerInfo(
new Worker("localhost", "localhost", 1, "v1"), 0,
Sets.<String>newHashSet(),
Sets.<String>newHashSet(),
DateTime.now()
)
),
new NoopTask(null, 1, 0, null, null, null)
);
Assert.assertFalse(optional.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
public void testFindWorkerForTask() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);

Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
Expand Down Expand Up @@ -76,7 +76,7 @@ public String getDataSource()
public void testFindWorkerForTaskWithNulls() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);

Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
Expand Down Expand Up @@ -107,7 +107,7 @@ public void testFindWorkerForTaskWithNulls() throws Exception
public void testIsolation() throws Exception
{
FillCapacityWorkerSelectStrategy strategy = new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
new AffinityConfig(ImmutableMap.of("foo", Arrays.asList("localhost")))
);

Optional<ImmutableWorkerInfo> optional = strategy.findWorkerForTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void testSerde() throws Exception
{
WorkerBehaviorConfig config = new WorkerBehaviorConfig(
new FillCapacityWithAffinityWorkerSelectStrategy(
new FillCapacityWithAffinityConfig(
new AffinityConfig(
ImmutableMap.of("foo", Arrays.asList("localhost"))
)
),
Expand Down

0 comments on commit a0f2cf0

Please sign in to comment.