Skip to content

Commit

Permalink
optionally enable coordinator auto kill tasks on all dataSources via …
Browse files Browse the repository at this point in the history
…dynamic config (apache#3250)
  • Loading branch information
himanshug authored and fjy committed Jul 18, 2016
1 parent 7995818 commit 3f82108
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 15 deletions.
3 changes: 2 additions & 1 deletion docs/content/configuration/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ The coordinator node uses several of the global configs in [Configuration](../co
|`druid.coordinator.merge.on`|Boolean flag for whether or not the coordinator should try and merge small segments into a more optimal segment size.|false|
|`druid.coordinator.conversion.on`|Boolean flag for converting old segment indexing versions to the latest segment indexing version.|false|
|`druid.coordinator.load.timeout`|The timeout duration for when the coordinator assigns a segment to a historical node.|PT15M|
|`druid.coordinator.kill.on`|Boolean flag for whether or not the coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all the whitelisted dataSources, coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all segments except for the last `durationToRetain` period. Whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.|false|
|`druid.coordinator.kill.on`|Boolean flag for whether or not the coordinator should submit kill tasks for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or, optionally, all dataSources), the coordinator will submit kill tasks periodically based on the `period` specified. These kill tasks will delete all segments except for the last `durationToRetain` period. The whitelist, or the flag to kill all dataSources, can be set via the dynamic configuration properties `killAllDataSources` and `killDataSourceWhitelist` described later.|false|
|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
|`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)|
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
Expand Down Expand Up @@ -88,6 +88,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10|
|`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false|
|`killDataSourceWhitelist`|List of dataSources for which kill tasks are sent if property `druid.coordinator.kill.on` is true.|none|
|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true, then `killDataSourceWhitelist` must either not be specified or must be an empty list.|false|

To view the audit history of coordinator dynamic config issue a GET request to the URL -

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.IAE;

import java.util.Collection;
import java.util.HashSet;
Expand All @@ -38,6 +39,7 @@ public class CoordinatorDynamicConfig
private final int replicationThrottleLimit;
private final int balancerComputeThreads;
private final boolean emitBalancingStats;
private final boolean killAllDataSources;
private final Set<String> killDataSourceWhitelist;

@JsonCreator
Expand All @@ -54,7 +56,8 @@ public CoordinatorDynamicConfig(
// Type is Object here so that we can support both string and list as
// coordinator console can not send array of strings in the update request.
// See https://github.com/druid-io/druid/issues/3055
@JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist
@JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist,
@JsonProperty("killAllDataSources") boolean killAllDataSources
)
{
this.maxSegmentsToMove = maxSegmentsToMove;
Expand All @@ -66,6 +69,8 @@ public CoordinatorDynamicConfig(
this.emitBalancingStats = emitBalancingStats;
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);

this.killAllDataSources = killAllDataSources;

if (killDataSourceWhitelist instanceof String) {
String[] tmp = ((String) killDataSourceWhitelist).split(",");
this.killDataSourceWhitelist = new HashSet<>();
Expand All @@ -80,6 +85,10 @@ public CoordinatorDynamicConfig(
} else {
this.killDataSourceWhitelist = ImmutableSet.of();
}

if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) {
throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
}
}

@JsonProperty
Expand Down Expand Up @@ -136,6 +145,12 @@ public Set<String> getKillDataSourceWhitelist()
return killDataSourceWhitelist;
}

/**
 * Whether the coordinator should submit kill tasks for ALL dataSources.
 * Mutually exclusive with a non-empty {@code killDataSourceWhitelist}: the
 * constructor throws IAE if both are set.
 */
@JsonProperty
public boolean isKillAllDataSources()
{
return killAllDataSources;
}

@Override
public String toString()
{
Expand All @@ -149,6 +164,7 @@ public String toString()
", balancerComputeThreads=" + balancerComputeThreads +
", emitBalancingStats=" + emitBalancingStats +
", killDataSourceWhitelist=" + killDataSourceWhitelist +
", killAllDataSources=" + killAllDataSources +
'}';
}

Expand Down Expand Up @@ -188,6 +204,9 @@ public boolean equals(Object o)
if (emitBalancingStats != that.emitBalancingStats) {
return false;
}
if (killAllDataSources != that.killAllDataSources) {
return false;
}
return !(killDataSourceWhitelist != null
? !killDataSourceWhitelist.equals(that.killDataSourceWhitelist)
: that.killDataSourceWhitelist != null);
Expand All @@ -205,6 +224,7 @@ public int hashCode()
result = 31 * result + replicationThrottleLimit;
result = 31 * result + balancerComputeThreads;
result = 31 * result + (emitBalancingStats ? 1 : 0);
result = 31 * result + (killAllDataSources ? 1 : 0);
result = 31 * result + (killDataSourceWhitelist != null ? killDataSourceWhitelist.hashCode() : 0);
return result;
}
Expand All @@ -220,10 +240,11 @@ public static class Builder
private boolean emitBalancingStats;
private int balancerComputeThreads;
private Set<String> killDataSourceWhitelist;
private boolean killAllDataSources;

/**
 * Creates a Builder pre-populated with the default dynamic-config values.
 * Argument order matches the private constructor below; the last two
 * arguments are killDataSourceWhitelist = null (normalized to an empty set)
 * and killAllDataSources = false. First argument is
 * millisToWaitBeforeDeleting = 15 minutes; 524288000L is the mergeBytesLimit
 * (500 MB) — remaining positional meanings per the private constructor.
 */
public Builder()
{
this(15 * 60 * 1000L, 524288000L, 100, 5, 15, 10, 1, false, null, false);
}

private Builder(
Expand All @@ -235,7 +256,8 @@ private Builder(
int replicationThrottleLimit,
int balancerComputeThreads,
boolean emitBalancingStats,
Set<String> killDataSourceWhitelist
Set<String> killDataSourceWhitelist,
boolean killAllDataSources
)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
Expand All @@ -247,6 +269,7 @@ private Builder(
this.emitBalancingStats = emitBalancingStats;
this.balancerComputeThreads = balancerComputeThreads;
this.killDataSourceWhitelist = killDataSourceWhitelist;
this.killAllDataSources = killAllDataSources;
}

public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
Expand Down Expand Up @@ -308,7 +331,8 @@ public CoordinatorDynamicConfig build()
replicationThrottleLimit,
balancerComputeThreads,
emitBalancingStats,
killDataSourceWhitelist
killDataSourceWhitelist,
killAllDataSources
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.joda.time.Interval;

import java.util.Collection;
import java.util.List;
import java.util.Set;

/**
*/
Expand Down Expand Up @@ -81,7 +81,17 @@ public DruidCoordinatorSegmentKiller(
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
Set<String> whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist();
boolean killAllDataSources = params.getCoordinatorDynamicConfig().isKillAllDataSources();
Collection<String> whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist();

if (killAllDataSources && whitelist != null && !whitelist.isEmpty()) {
log.error("killAllDataSources can't be true when killDataSourceWhitelist is non-empty, No kill tasks are scheduled.");
return params;
}

if (killAllDataSources) {
whitelist = segmentManager.getAllDatasourceNames();
}

if (whitelist != null && whitelist.size() > 0 && (lastKillTime + period) < System.currentTimeMillis()) {
lastKillTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public void testDropRemove() throws Exception

EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 24, 0, false, null
0, 0, 0, 0, 1, 24, 0, false, null, false
)
).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
Expand Down Expand Up @@ -1031,7 +1031,7 @@ public void testReplicantThrottleAcrossTiers() throws Exception
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 7, 0, false, null
0, 0, 0, 0, 1, 7, 0, false, null, false
)
).atLeastOnce();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
Expand Down Expand Up @@ -1212,7 +1212,7 @@ private void mockCoordinator()
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 24, 0, false, null
0, 0, 0, 0, 1, 24, 0, false, null, false
)
).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package io.druid.server.http;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.IAE;
import io.druid.segment.TestHelper;
import io.druid.server.coordinator.CoordinatorDynamicConfig;
import org.junit.Assert;
Expand Down Expand Up @@ -57,7 +59,7 @@ public void testSerde() throws Exception
);

Assert.assertEquals(
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2")),
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false),
actual
);
}
Expand Down Expand Up @@ -89,25 +91,75 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception
);

Assert.assertEquals(
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2")),
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false),
actual
);
}

/**
 * Serde round-trip for a config with {@code killAllDataSources} enabled, plus a
 * negative case: a non-empty {@code killDataSourceWhitelist} combined with
 * {@code killAllDataSources = true} must be rejected at deserialization time.
 */
@Test
public void testSerdeWithKillAllDataSources() throws Exception
{
// Config JSON with killAllDataSources = true and no killDataSourceWhitelist.
String jsonStr = "{\n"
+ " \"millisToWaitBeforeDeleting\": 1,\n"
+ " \"mergeBytesLimit\": 1,\n"
+ " \"mergeSegmentsLimit\" : 1,\n"
+ " \"maxSegmentsToMove\": 1,\n"
+ " \"replicantLifetime\": 1,\n"
+ " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n"
+ " \"emitBalancingStats\": true,\n"
+ " \"killAllDataSources\": true\n"
+ "}\n";

// Round-trip: deserialize -> serialize -> deserialize, exercising both
// directions of the serde in one check.
ObjectMapper mapper = TestHelper.getObjectMapper();
CoordinatorDynamicConfig actual = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
)
),
CoordinatorDynamicConfig.class
);

// killDataSourceWhitelist was omitted, so it should normalize to an empty set.
Assert.assertEquals(
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true),
actual
);



// Negative case: a non-empty whitelist together with killAllDataSources=true
// must fail — the constructor throws IAE, which Jackson surfaces as the
// cause of a JsonMappingException.
try {
jsonStr = "{\n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
+ " \"killAllDataSources\": true\n"
+ "}\n";
mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
);

Assert.fail("deserialization should fail.");
} catch (JsonMappingException e) {
Assert.assertTrue(e.getCause() instanceof IAE);
}
}

/**
 * A Builder with no overrides must produce a config equal to one constructed
 * directly with the documented default values (including the null whitelist,
 * which normalizes to an empty set, and killAllDataSources = false).
 */
@Test
public void testBuilderDefaults()
{
CoordinatorDynamicConfig expected =
new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false);
CoordinatorDynamicConfig actual = new CoordinatorDynamicConfig.Builder().build();
Assert.assertEquals(expected, actual);
}

@Test
public void testEqualsAndHashCodeSanity()
{
CoordinatorDynamicConfig config1 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null);
CoordinatorDynamicConfig config2 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null);
CoordinatorDynamicConfig config1 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false);
CoordinatorDynamicConfig config2 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false);

Assert.assertEquals(config1, config2);
Assert.assertEquals(config1.hashCode(), config2.hashCode());
Expand Down

0 comments on commit 3f82108

Please sign in to comment.