Remove throttling on drop segments (apache#3736)
* Remove throttling on drop

* Throttle loadqueuepeon segment change requests to ZK

* Make initial delay configurable, add docs, shutdown gracefully

* Make loadqueuepeon repeat delay configurable
niketh authored and fjy committed Jan 20, 2017
1 parent bb7c496 commit 2b8d3c1
Showing 14 changed files with 141 additions and 157 deletions.
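The core of the change shows up in the LoadQueuePeon diff below: loadSegment() and dropSegment() now only enqueue change requests, and a scheduled task drains the queue at a fixed cadence controlled by the new druid.coordinator.loadqueuepeon.repeatDelay setting (50 ms by default), so writes to ZooKeeper are throttled by the timer rather than by the old replicant-termination throttler. As a rough illustration of that pattern only (this is not Druid code), here is a minimal, self-contained Java sketch; the class and method names are invented for the example.

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: requests are queued by callers and drained on a timer,
// so the timer period bounds how fast change requests can be issued.
public class ThrottledQueueSketch
{
  private final ConcurrentLinkedQueue<String> changeRequests = new ConcurrentLinkedQueue<>();
  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  private final long repeatDelayMillis;

  public ThrottledQueueSketch(long repeatDelayMillis)
  {
    this.repeatDelayMillis = repeatDelayMillis;
  }

  // Callers only enqueue; nothing is processed inline anymore.
  public void submit(String request)
  {
    changeRequests.offer(request);
  }

  // One request is handled per tick, so the delay acts as the throttle.
  public void start()
  {
    exec.scheduleWithFixedDelay(
        () -> {
          final String next = changeRequests.poll();
          if (next != null) {
            System.out.println("processing " + next);
          }
        },
        repeatDelayMillis,
        repeatDelayMillis,
        TimeUnit.MILLISECONDS
    );
  }

  public void stop()
  {
    exec.shutdownNow();
  }

  public static void main(String[] args) throws InterruptedException
  {
    final ThrottledQueueSketch peon = new ThrottledQueueSketch(50); // 50 ms, matching the documented default
    peon.submit("load segment-1");
    peon.submit("drop segment-2");
    peon.start();
    Thread.sleep(200);
    peon.stop();
  }
}

In the actual diff, the same role is played by the new start() method further down, which drives processSegmentChangeRequest() through Druid's ScheduledExecutors helper and reads the delay from DruidCoordinatorConfig.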
1 change: 1 addition & 0 deletions docs/content/configuration/coordinator.md
@@ -33,6 +33,7 @@ The coordinator node uses several of the global configs in [Configuration](../co
|`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)|
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`|
|`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon, which manages the load and drop of segments.|PT0.050S (50 ms)|

### Metadata Retrieval

@@ -776,6 +776,7 @@ public ImmutableDruidServer apply(DruidServer input)
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName());
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath);
loadQueuePeon.start();
log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath);

loadManagementPeons.put(server.getName(), loadQueuePeon);
@@ -80,4 +80,10 @@ public String getConsoleStatic()
{
return null;
}

@Config("druid.coordinator.loadqueuepeon.repeatDelay")
public Duration getLoadQueuePeonRepeatDelay()
{
return Duration.millis(50);
}
}
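The getter above is bound to the druid.coordinator.loadqueuepeon.repeatDelay property documented in coordinator.md, with a 50 ms default. Presumably it can be overridden like any other coordinator property, for example by adding a line such as druid.coordinator.loadqueuepeon.repeatDelay=PT0.100S (a hypothetical value) to the coordinator's runtime.properties.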
206 changes: 105 additions & 101 deletions server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java
@@ -25,6 +25,7 @@
import com.metamx.emitter.EmittingLogger;

import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestLoad;
@@ -41,6 +42,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -154,7 +156,6 @@ public void loadSegment(
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
queuedSize.addAndGet(segment.getSize());
segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Arrays.asList(callback)));
doNext();
}

public void dropSegment(
@@ -184,115 +185,98 @@ dropSegment(

log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Arrays.asList(callback)));
doNext();
}

private void processSegmentChangeRequest() {
  if (currentlyProcessing == null) {
    if (!segmentsToDrop.isEmpty()) {
      currentlyProcessing = segmentsToDrop.firstEntry().getValue();
      log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
    } else if (!segmentsToLoad.isEmpty()) {
      currentlyProcessing = segmentsToLoad.firstEntry().getValue();
      log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
    } else {
      return;
    }

    try {
      if (currentlyProcessing == null) {
        if (!stopped) {
          log.makeAlert("Crazy race condition! server[%s]", basePath)
             .emit();
        }
        actionCompleted();
        return;
      }

      log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier());
      final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier());
      final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
      curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);

      processingExecutor.schedule(
          new Runnable()
          {
            @Override
            public void run()
            {
              try {
                if (curator.checkExists().forPath(path) != null) {
                  failAssign(new ISE("%s was never removed! Failing this operation!", path));
                }
              }
              catch (Exception e) {
                failAssign(e);
              }
            }
          },
          config.getLoadTimeoutDelay().getMillis(),
          TimeUnit.MILLISECONDS
      );

      final Stat stat = curator.checkExists().usingWatcher(
          new CuratorWatcher()
          {
            @Override
            public void process(WatchedEvent watchedEvent) throws Exception
            {
              switch (watchedEvent.getType()) {
                case NodeDeleted:
                  entryRemoved(watchedEvent.getPath());
              }
            }
          }
      ).forPath(path);

      if (stat == null) {
        final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());

        // Create a node and then delete it to remove the registered watcher. This is a work-around for
        // a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event
        // that happens for that node. If no events happen, the watcher stays registered foreverz.
        // Couple that with the fact that you cannot set a watcher when you create a node, but what we
        // want is to create a node and then watch for it to get deleted. The solution is that you *can*
        // set a watcher when you check to see if it exists so, we first create the node and then set a
        // watcher on its existence. However, if it already does not exist by the time the existence check
        // returns, then the watcher that was set will never fire (nobody will ever create the node
        // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause
        // that watcher to fire and delete it right away.
        //
        // We do not create the existence watcher first, because then it will fire when we create the
        // node and we'll have the same race when trying to refresh that watcher.
        curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);

        entryRemoved(path);
      }
    } catch (Exception e) {
      failAssign(e);
    }
  } else {
    log.info(
        "Server[%s] skipping doNext() because something is currently loading[%s].",
        basePath,
        currentlyProcessing.getSegmentIdentifier()
    );
  }
}
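The long comment in the method above describes a standard Curator/ZooKeeper idiom: create the request node, register a deletion watcher through an existence check, and if the node is already gone, create a throwaway node so the orphaned watcher still fires. Purely as an illustration of that idiom (it is not part of this diff), here is a standalone sketch; the connect string, path, and payload are made up.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;

public class DeletionWatcherSketch
{
  public static void main(String[] args) throws Exception
  {
    final CuratorFramework curator =
        CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    curator.start();

    final String path = "/sketch/segment-change-request";

    // 1. Create the request node first (watchers cannot be attached at create time).
    curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, new byte[0]);

    // 2. Attach a deletion watcher via an existence check.
    final Stat stat = curator.checkExists().usingWatcher(
        new CuratorWatcher()
        {
          @Override
          public void process(WatchedEvent event) throws Exception
          {
            if (event.getType() == EventType.NodeDeleted) {
              System.out.println("request handled, node removed: " + event.getPath());
            }
          }
        }
    ).forPath(path);

    // 3. If the node was already deleted before the check, the watcher would otherwise
    //    never fire; creating another node at the path makes it fire once so it can be discarded.
    if (stat == null) {
      curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, new byte[0]);
    }

    // Give any watcher a moment to fire, then clean up.
    Thread.sleep(1000);
    curator.close();
  }
}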

@@ -326,6 +310,29 @@ public void run()
}
}

public void start()
{
ScheduledExecutors.scheduleAtFixedRate(
processingExecutor,
config.getLoadQueuePeonRepeatDelay(),
config.getLoadQueuePeonRepeatDelay(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
processSegmentChangeRequest();

if (stopped) {
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
}
);
}

public void stop()
{
synchronized (lock) {
@@ -371,8 +378,6 @@ private void entryRemoved(String path)
actionCompleted();
log.info("Server[%s] done processing [%s]", basePath, path);
}

doNext();
}

private void failAssign(Exception e)
@@ -382,7 +387,6 @@ private void failAssign(Exception e)
failedAssignCount.getAndIncrement();
// Act like it was completed so that the coordinator gives it to someone else
actionCompleted();
doNext();
}
}

@@ -28,16 +28,14 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* The ReplicationThrottler is used to throttle the number of replicants that are created and destroyed.
* The ReplicationThrottler is used to throttle the number of replicants that are created.
*/
public class ReplicationThrottler
{
private static final EmittingLogger log = new EmittingLogger(ReplicationThrottler.class);

private final Map<String, Boolean> replicatingLookup = Maps.newHashMap();
private final Map<String, Boolean> terminatingLookup = Maps.newHashMap();
private final ReplicatorSegmentHolder currentlyReplicating = new ReplicatorSegmentHolder();
private final ReplicatorSegmentHolder currentlyTerminating = new ReplicatorSegmentHolder();

private volatile int maxReplicants;
private volatile int maxLifetime;
@@ -58,11 +56,6 @@ public void updateReplicationState(String tier)
update(tier, currentlyReplicating, replicatingLookup, "create");
}

public void updateTerminationState(String tier)
{
update(tier, currentlyTerminating, terminatingLookup, "terminate");
}

private void update(String tier, ReplicatorSegmentHolder holder, Map<String, Boolean> lookup, String type)
{
int size = holder.getNumProcessing(tier);
@@ -95,11 +88,6 @@ public boolean canCreateReplicant(String tier)
return replicatingLookup.get(tier) && !currentlyReplicating.isAtMaxReplicants(tier);
}

public boolean canDestroyReplicant(String tier)
{
return terminatingLookup.get(tier) && !currentlyTerminating.isAtMaxReplicants(tier);
}

public void registerReplicantCreation(String tier, String segmentId, String serverId)
{
currentlyReplicating.addSegment(tier, segmentId, serverId);
@@ -110,16 +98,6 @@ public void unregisterReplicantCreation(String tier, String segmentId, String serverId)
currentlyReplicating.removeSegment(tier, segmentId, serverId);
}

public void registerReplicantTermination(String tier, String segmentId, String serverId)
{
currentlyTerminating.addSegment(tier, segmentId, serverId);
}

public void unregisterReplicantTermination(String tier, String segmentId, String serverId)
{
currentlyTerminating.removeSegment(tier, segmentId, serverId);
}

private class ReplicatorSegmentHolder
{
private final Map<String, ConcurrentHashMap<String, String>> currentlyProcessingSegments = Maps.newHashMap();
@@ -118,7 +118,6 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)

for (String tier : cluster.getTierNames()) {
replicatorThrottler.updateReplicationState(tier);
replicatorThrottler.updateTerminationState(tier);
}

DruidCoordinatorRuntimeParams paramsWithReplicationManager = params.buildFromExistingWithoutAvailableSegments()
@@ -203,33 +203,9 @@ private CoordinatorStats drop(
}

if (holder.isServingSegment(segment)) {
if (expectedNumReplicantsForTier > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(tier)) {
serverQueue.add(holder);
break;
}

replicationManager.registerReplicantTermination(
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
}

holder.getPeon().dropSegment(
segment,
new LoadPeonCallback()
{
@Override
public void execute()
{
replicationManager.unregisterReplicantTermination(
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
}
}
null
);
--loadedNumReplicantsForTier;
stats.addToTieredStat(droppedCount, tier, 1);
Expand Down