Skip to content

Commit

Permalink
Merge pull request apache#2261 from metamx/improve-segment-ordering
Browse files Browse the repository at this point in the history
Prioritize loading of segments based on segment interval
  • Loading branch information
xvrl committed Jan 27, 2016
2 parents 2e50040 + fd6bf3f commit e3d1e07
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
</scm>

<properties>
<metamx.java-util.version>0.27.6</metamx.java-util.version>
<metamx.java-util.version>0.27.7</metamx.java-util.version>
<apache.curator.version>2.9.1</apache.curator.version>
<jetty.version>9.2.5.v20141112</jetty.version>
<jersey.version>1.19</jersey.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.IAE;
Expand Down Expand Up @@ -78,8 +79,10 @@
import org.joda.time.Duration;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -94,6 +97,20 @@
public class DruidCoordinator
{
public static final String COORDINATOR_OWNER_NODE = "_COORDINATOR";

public static Comparator<DataSegment> SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart())
.onResultOf(
new Function<DataSegment, Interval>()
{
@Override
public Interval apply(DataSegment segment)
{
return segment.getInterval();
}
})
.compound(Ordering.<DataSegment>natural())
.reverse();

private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
private final Object lock = new Object();
private final DruidCoordinatorConfig config;
Expand Down Expand Up @@ -249,7 +266,8 @@ public CountingMap<String> getSegmentAvailability()
return retVal;
}

CountingMap<String> getLoadPendingDatasources() {
CountingMap<String> getLoadPendingDatasources()
{
final CountingMap<String> retVal = new CountingMap<>();
for (LoadQueuePeon peon : loadManagementPeons.values()) {
for (DataSegment segment : peon.getSegmentsToLoad()) {
Expand Down Expand Up @@ -386,7 +404,7 @@ public void moveSegment(
public void execute()
{
try {
if (curator.checkExists().forPath(toServedSegPath) != null &&
if (curator.checkExists().forPath(toServedSegPath) != null &&
curator.checkExists().forPath(toLoadQueueSegPath) == null &&
!dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback);
Expand All @@ -411,7 +429,7 @@ public void execute()

public Set<DataSegment> getOrderedAvailableDataSegments()
{
Set<DataSegment> availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
Set<DataSegment> availableSegments = Sets.newTreeSet(SEGMENT_COMPARATOR);

Iterable<DataSegment> dataSegments = getAvailableDataSegments();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public static class Builder
this.databaseRuleManager = null;
this.segmentReplicantLookup = null;
this.dataSources = Sets.newHashSet();
this.availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
this.availableSegments = Sets.newTreeSet(DruidCoordinator.SEGMENT_COMPARATOR);
this.loadManagementPeons = Maps.newHashMap();
this.replicationManager = null;
this.emitter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public class LoadQueuePeon
private static final int DROP = 0;
private static final int LOAD = 1;

private static Comparator<DataSegment> segmentComparator = Comparators.inverse(DataSegment.bucketMonthComparator());

private static void executeCallbacks(List<LoadPeonCallback> callbacks)
{
for (LoadPeonCallback callback : callbacks) {
Expand All @@ -79,10 +77,10 @@ private static void executeCallbacks(List<LoadPeonCallback> callbacks)
private final AtomicInteger failedAssignCount = new AtomicInteger(0);

private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
segmentComparator
DruidCoordinator.SEGMENT_COMPARATOR
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
segmentComparator
DruidCoordinator.SEGMENT_COMPARATOR
);

private final Object lock = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
Expand Down Expand Up @@ -61,6 +63,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -375,4 +378,45 @@ public void childEvent(
EasyMock.verify(serverInventoryView);
EasyMock.verify(metadataRuleManager);
}

@Test
public void testOrderedAvailableDataSegments()
{
DruidDataSource dataSource = new DruidDataSource("test", new HashMap());
DataSegment[] segments = new DataSegment[]{
getSegment("test", new Interval("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
getSegment("test", new Interval("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z")),
getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z"))
};
for (DataSegment segment : segments) {
dataSource.addSegment(segment.getIdentifier(), segment);
}

EasyMock.expect(databaseSegmentManager.getInventory()).andReturn(
ImmutableList.of(dataSource)
).atLeastOnce();
EasyMock.replay(databaseSegmentManager);
Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
DataSegment[] expected = new DataSegment[]{
getSegment("test", new Interval("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
getSegment("test", new Interval("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z")),
getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z"))
};
Assert.assertEquals(expected.length, availableSegments.size());
Assert.assertEquals(expected, availableSegments.toArray());
EasyMock.verify(databaseSegmentManager);
}


private DataSegment getSegment(String dataSource, Interval interval)
{
// Not using EasyMock as it hampers the performance of multithreads.
DataSegment segment = new DataSegment(
dataSource, interval, "dummy_version", Maps.<String, Object>newConcurrentMap(),
Lists.<String>newArrayList(), Lists.<String>newArrayList(), null, 0, 0L
);
return segment;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,27 @@ public DataSegment apply(String intervalStr)

final List<DataSegment> segmentToLoad = Lists.transform(
ImmutableList.<String>of(
"2014-10-27T00:00:00Z/P1D",
"2014-10-29T00:00:00Z/P1M",
"2014-10-31T00:00:00Z/P1D",
"2014-10-30T00:00:00Z/P1D",
"2014-10-28T00:00:00Z/P1D"
), new Function<String, DataSegment>()
{
@Override
public DataSegment apply(String intervalStr)
{
return dataSegmentWithInterval(intervalStr);
}
}
);

// segment with latest interval should be loaded first
final List<DataSegment> expectedLoadOrder = Lists.transform(
ImmutableList.<String>of(
"2014-10-29T00:00:00Z/P1M",
"2014-10-31T00:00:00Z/P1D",
"2014-10-30T00:00:00Z/P1D",
"2014-10-29T00:00:00Z/P1D",
"2014-10-28T00:00:00Z/P1D",
"2014-10-27T00:00:00Z/P1D"
), new Function<String, DataSegment>()
Expand Down Expand Up @@ -235,7 +253,7 @@ public void execute()
}
}

for (DataSegment segment : segmentToLoad) {
for (DataSegment segment : expectedLoadOrder) {
String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getIdentifier());
Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal[requestSignalIdx.get()]));
Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
Expand Down

0 comments on commit e3d1e07

Please sign in to comment.