Skip to content

Commit

Permalink
Show candidate hosts for the given query (apache#2282)
Browse files Browse the repository at this point in the history
* Show candidate hosts for the given query

* Added test cases & minor changes to address comments

* Changed path-param to query-pram for intervals/numCandidates
  • Loading branch information
navis authored and b-slim committed Sep 22, 2016
1 parent 67920c1 commit 49c0fe0
Show file tree
Hide file tree
Showing 12 changed files with 552 additions and 52 deletions.
10 changes: 10 additions & 0 deletions docs/content/design/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,17 @@ Returns the dimensions of the datasource.

Returns the metrics of the datasource.

* `/druid/v2/datasources/{dataSourceName}/candidates?intervals={comma-separated-intervals-in-ISO8601-format}&numCandidates={numCandidates}`

Returns segment information lists including server locations for the given datasource and intervals. If "numCandidates" is not specified, it will return all servers for each interval.

* `/druid/broker/v1/loadstatus`

Returns a flag indicating if the broker knows about all segments in Zookeeper. This can be used to know when a broker node is ready to be queried after a restart.


### POST

* `/druid/v2/candidates/`

Returns segment information lists including server locations for the given query.
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,9 @@ private static String getHostFromThrowable(Throwable e)
return null;
}
}

public static QueryInterruptedException wrapIfNeeded(Throwable e)
{
return e instanceof QueryInterruptedException ? (QueryInterruptedException) e : new QueryInterruptedException(e);
}
}
78 changes: 78 additions & 0 deletions server/src/main/java/io/druid/client/ServerViewUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.client;

import com.google.common.collect.Lists;
import io.druid.client.selector.ServerSelector;
import io.druid.query.DataSource;
import io.druid.query.LocatedSegmentDescriptor;
import io.druid.query.SegmentDescriptor;
import io.druid.query.TableDataSource;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

import java.util.Collections;
import java.util.List;

/**
*/
public class ServerViewUtil
{
public static List<LocatedSegmentDescriptor> getTargetLocations(
TimelineServerView serverView,
String datasource,
List<Interval> intervals,
int numCandidates
)
{
return getTargetLocations(serverView, new TableDataSource(datasource), intervals, numCandidates);
}

public static List<LocatedSegmentDescriptor> getTargetLocations(
TimelineServerView serverView,
DataSource datasource,
List<Interval> intervals,
int numCandidates
)
{
TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(datasource);
if (timeline == null) {
return Collections.emptyList();
}
List<LocatedSegmentDescriptor> located = Lists.newArrayList();
for (Interval interval : intervals) {
for (TimelineObjectHolder<String, ServerSelector> holder : timeline.lookup(interval)) {
for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
ServerSelector selector = chunk.getObject();
final SegmentDescriptor descriptor = new SegmentDescriptor(
holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
);
long size = selector.getSegment().getSize();
List<DruidServerMetadata> candidates = selector.getCandidates(numCandidates);
located.add(new LocatedSegmentDescriptor(descriptor, size, candidates));
}
}
}
return located;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,13 @@ public DirectDruidClient getClient()
{
return client;
}

@Override
public String toString()
{
return "QueryableDruidServer{" +
"server=" + server +
", client=" + client +
'}';
}
}
54 changes: 44 additions & 10 deletions server/src/main/java/io/druid/client/selector/ServerSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@

package io.druid.client.selector;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.emitter.EmittingLogger;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -78,20 +83,49 @@ public boolean isEmpty()
}
}

public QueryableDruidServer pick()
{
public List<DruidServerMetadata> getCandidates(final int numCandidates) {
List<DruidServerMetadata> result = Lists.newArrayList();
synchronized (this) {
final TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = new TreeMap<>(strategy.getComparator());
for (QueryableDruidServer server : servers) {
Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
if (theServers == null) {
theServers = Sets.newHashSet();
prioritizedServers.put(server.getServer().getPriority(), theServers);
final DataSegment target = segment.get();
for (Map.Entry<Integer, Set<QueryableDruidServer>> entry : toPrioritizedServers().entrySet()) {
Set<QueryableDruidServer> servers = entry.getValue();
TreeMap<Integer, Set<QueryableDruidServer>> tieredMap = Maps.newTreeMap();
while (!servers.isEmpty()) {
tieredMap.put(entry.getKey(), servers); // strategy.pick() removes entry
QueryableDruidServer server = strategy.pick(tieredMap, target);
if (server == null) {
// regard this as any server in tieredMap is not appropriate
break;
}
result.add(server.getServer().getMetadata());
if (numCandidates > 0 && result.size() >= numCandidates) {
return result;
}
servers.remove(server);
}
theServers.add(server);
}
}
return result;
}

public QueryableDruidServer pick()
{
synchronized (this) {
return strategy.pick(toPrioritizedServers(), segment.get());
}
}

return strategy.pick(prioritizedServers, segment.get());
private TreeMap<Integer, Set<QueryableDruidServer>> toPrioritizedServers()
{
final TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = new TreeMap<>(strategy.getComparator());
for (QueryableDruidServer server : servers) {
Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
if (theServers == null) {
theServers = Sets.newHashSet();
prioritizedServers.put(server.getServer().getPriority(), theServers);
}
theServers.add(server);
}
return prioritizedServers;
}
}
134 changes: 134 additions & 0 deletions server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.server.coordination.DruidServerMetadata;
import org.joda.time.Interval;

import java.util.List;
import java.util.Objects;

/**
* public, evolving
* <p/>
* extended version of SegmentDescriptor, which is internal class, with location and size information attached
*/
public class LocatedSegmentDescriptor
{
private final Interval interval;
private final String version;
private final int partitionNumber;
private final long size;
private final List<DruidServerMetadata> locations;

@JsonCreator
public LocatedSegmentDescriptor(
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
@JsonProperty("partitionNumber") int partitionNumber,
@JsonProperty("size") long size,
@JsonProperty("locations") List<DruidServerMetadata> locations
)
{
this.interval = interval;
this.version = version;
this.partitionNumber = partitionNumber;
this.size = size;
this.locations = locations == null ? ImmutableList.<DruidServerMetadata>of() : locations;
}

public LocatedSegmentDescriptor(SegmentDescriptor descriptor, long size, List<DruidServerMetadata> candidates)
{
this(descriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber(), size, candidates);
}

@JsonProperty("interval")
public Interval getInterval()
{
return interval;
}

@JsonProperty("version")
public String getVersion()
{
return version;
}

@JsonProperty("partitionNumber")
public int getPartitionNumber()
{
return partitionNumber;
}

@JsonProperty("size")
public long getSize()
{
return size;
}

@JsonProperty("locations")
public List<DruidServerMetadata> getLocations()
{
return locations;
}

@Override
public boolean equals(Object o)
{
if (!(o instanceof LocatedSegmentDescriptor)) {
return false;
}

LocatedSegmentDescriptor other = (LocatedSegmentDescriptor) o;

if (partitionNumber != other.partitionNumber) {
return false;
}
if (!Objects.equals(interval, other.interval)) {
return false;
}
if (!Objects.equals(version, other.version)) {
return false;
}

return true;
}

@Override
public int hashCode()
{
return Objects.hash(partitionNumber, interval, version);
}

@Override
public String toString()
{
return "LocatedSegmentDescriptor{" +
"interval=" + interval +
", version='" + version + '\'' +
", partitionNumber=" + partitionNumber +
", size=" + size +
", locations=" + locations +
'}';
}
}
Loading

0 comments on commit 49c0fe0

Please sign in to comment.