Skip to content

Commit

Permalink
Normalized Cost Balancer (apache#3632)
Browse files Browse the repository at this point in the history
* Normalized Cost Balancer

* Adding documentation and renaming to use diskNormalizedCostBalancer

* Remove balancer from the strings

* Update docs and include random cost balancer

* Fix checkstyle issues
  • Loading branch information
niketh authored and cheddar committed Dec 6, 2016
1 parent c74d267 commit d904c79
Show file tree
Hide file tree
Showing 17 changed files with 460 additions and 130 deletions.
1 change: 1 addition & 0 deletions docs/content/configuration/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ The coordinator node uses several of the global configs in [Configuration](../co
|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
|`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)|
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`|

### Metadata Retrieval

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@
*/
package io.druid.server.coordinator;

import org.joda.time.DateTime;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.io.Closeable;

public interface BalancerStrategyFactory extends Closeable
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = CostBalancerStrategyFactory.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "diskNormalized", value = DiskNormalizedCostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "cost", value = CostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "random", value = RandomBalancerStrategyFactory.class),
})
public interface BalancerStrategyFactory
{
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp);
public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,12 @@
package io.druid.server.coordinator;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.concurrent.Executors;

public class CostBalancerStrategyFactory implements BalancerStrategyFactory
{
private final ListeningExecutorService exec;

public CostBalancerStrategyFactory(int costBalancerStrategyThreadCount)
{
this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(costBalancerStrategyThreadCount));
}

@Override
public CostBalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
public CostBalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
{
return new CostBalancerStrategy(exec);
}

@Override
public void close() throws IOException
{
exec.shutdownNow();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator;

import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.timeline.DataSegment;

public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy
{
public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec)
{
super(exec);
}

/**
* Averages the cost obtained from CostBalancerStrategy. Also the costs are weighted according to their usage ratios.
* This ensures that all the hosts will have the same % disk utilization.
*/
@Override
protected double computeCost(
final DataSegment proposalSegment, final ServerHolder server, final boolean includeCurrentServer
)
{
double cost = super.computeCost(proposalSegment, server, includeCurrentServer);

if(cost == Double.POSITIVE_INFINITY){
return cost;
}

int nSegments = 1;
if(server.getServer().getSegments().size() > 0)
{
nSegments = server.getServer().getSegments().size();
}

double normalizedCost = cost/nSegments;
double usageRatio = (double)server.getServer().getCurrSize()/(double)server.getServer().getMaxSize();

return normalizedCost*usageRatio;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;

import com.google.common.util.concurrent.ListeningExecutorService;

public class DiskNormalizedCostBalancerStrategyFactory implements BalancerStrategyFactory
{
@Override
public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
{
return new DiskNormalizedCostBalancerStrategy(exec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
Expand Down Expand Up @@ -84,6 +86,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -129,7 +132,7 @@ public Interval apply(DataSegment segment)
private volatile int leaderCounter = 0;
private volatile boolean leader = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;

private final BalancerStrategyFactory factory;

@Inject
public DruidCoordinator(
Expand All @@ -146,7 +149,8 @@ public DruidCoordinator(
LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
@Self DruidNode self,
@CoordinatorIndexingServiceHelper Set<DruidCoordinatorHelper> indexingServiceHelpers
@CoordinatorIndexingServiceHelper Set<DruidCoordinatorHelper> indexingServiceHelpers,
BalancerStrategyFactory factory
)
{
this(
Expand All @@ -164,7 +168,8 @@ public DruidCoordinator(
serviceAnnouncer,
self,
Maps.<String, LoadQueuePeon>newConcurrentMap(),
indexingServiceHelpers
indexingServiceHelpers,
factory
);
}

Expand All @@ -183,7 +188,8 @@ public DruidCoordinator(
ServiceAnnouncer serviceAnnouncer,
DruidNode self,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap,
Set<DruidCoordinatorHelper> indexingServiceHelpers
Set<DruidCoordinatorHelper> indexingServiceHelpers,
BalancerStrategyFactory factory
)
{
this.config = config;
Expand All @@ -205,6 +211,7 @@ public DruidCoordinator(

this.leaderLatch = new AtomicReference<>(null);
this.loadManagementPeons = loadQueuePeonMap;
this.factory = factory;
}

public boolean isLeader()
Expand Down Expand Up @@ -664,6 +671,7 @@ protected CoordinatorRunnable(List<DruidCoordinatorHelper> helpers, final int st
@Override
public void run()
{
ListeningExecutorService balancerExec = null;
try {
synchronized (lock) {
final LeaderLatch latch = leaderLatch.get();
Expand All @@ -686,27 +694,32 @@ public void run()
}
}

try (BalancerStrategyFactory factory =
new CostBalancerStrategyFactory(getDynamicConfigs().getBalancerComputeThreads())) {
// Do coordinator stuff.
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(metadataSegmentManager.getInventory())
.withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.withBalancerStrategyFactory(factory)
.build();
for (DruidCoordinatorHelper helper : helpers) {
// Don't read state and run state in the same helper otherwise racy conditions may exist
if (leader && startingLeaderCounter == leaderCounter) {
params = helper.run(params);
}
balancerExec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(getDynamicConfigs().getBalancerComputeThreads()));
BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);

// Do coordinator stuff.
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(metadataSegmentManager.getInventory())
.withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.withBalancerStrategy(balancerStrategy)
.build();
for (DruidCoordinatorHelper helper : helpers) {
// Don't read state and run state in the same helper otherwise racy conditions may exist
if (leader && startingLeaderCounter == leaderCounter) {
params = helper.run(params);
}
}
}
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
} finally {
if(balancerExec != null){
balancerExec.shutdownNow();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class DruidCoordinatorRuntimeParams
private final CoordinatorDynamicConfig coordinatorDynamicConfig;
private final CoordinatorStats stats;
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategyFactory strategyFactory;
private final BalancerStrategy balancerStrategy;

public DruidCoordinatorRuntimeParams(
long startTime,
Expand All @@ -63,7 +63,7 @@ public DruidCoordinatorRuntimeParams(
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
BalancerStrategyFactory strategyFactory
BalancerStrategy balancerStrategy
)
{
this.startTime = startTime;
Expand All @@ -78,7 +78,7 @@ public DruidCoordinatorRuntimeParams(
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.strategyFactory = strategyFactory;
this.balancerStrategy = balancerStrategy;
}

public long getStartTime()
Expand Down Expand Up @@ -141,9 +141,9 @@ public DateTime getBalancerReferenceTimestamp()
return balancerReferenceTimestamp;
}

public BalancerStrategyFactory getBalancerStrategyFactory()
public BalancerStrategy getBalancerStrategy()
{
return strategyFactory;
return balancerStrategy;
}

public boolean hasDeletionWaitTimeElapsed()
Expand Down Expand Up @@ -171,7 +171,7 @@ public Builder buildFromExisting()
coordinatorDynamicConfig,
stats,
balancerReferenceTimestamp,
strategyFactory
balancerStrategy
);
}

Expand All @@ -190,7 +190,7 @@ public Builder buildFromExistingWithoutAvailableSegments()
coordinatorDynamicConfig,
stats,
balancerReferenceTimestamp,
strategyFactory
balancerStrategy
);
}

Expand All @@ -208,7 +208,7 @@ public static class Builder
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private CoordinatorStats stats;
private DateTime balancerReferenceTimestamp;
private BalancerStrategyFactory strategyFactory;
private BalancerStrategy balancerStrategy;

Builder()
{
Expand Down Expand Up @@ -239,7 +239,7 @@ public static class Builder
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
BalancerStrategyFactory strategyFactory
BalancerStrategy balancerStrategy
)
{
this.startTime = startTime;
Expand All @@ -254,7 +254,7 @@ public static class Builder
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.strategyFactory=strategyFactory;
this.balancerStrategy=balancerStrategy;
}

public DruidCoordinatorRuntimeParams build()
Expand All @@ -272,7 +272,7 @@ public DruidCoordinatorRuntimeParams build()
coordinatorDynamicConfig,
stats,
balancerReferenceTimestamp,
strategyFactory
balancerStrategy
);
}

Expand Down Expand Up @@ -348,9 +348,9 @@ public Builder withBalancerReferenceTimestamp(DateTime balancerReferenceTimestam
return this;
}

public Builder withBalancerStrategyFactory(BalancerStrategyFactory strategyFactory)
public Builder withBalancerStrategy(BalancerStrategy balancerStrategy)
{
this.strategyFactory=strategyFactory;
this.balancerStrategy=balancerStrategy;
return this;
}
}
Expand Down
Loading

0 comments on commit d904c79

Please sign in to comment.