Skip to content

Commit

Permalink
Server selector improvement (apache#4315)
Browse files Browse the repository at this point in the history
* Do not re-create prioritized servers on each call in server selector and extend TierSelectorStrategy interface with a method to pick multiple elements at once

* Fix compilation
  • Loading branch information
dgolitsyn authored and leventov committed May 26, 2017
1 parent 1eaa788 commit 515fabc
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

package io.druid.client.selector;

import io.druid.java.util.common.ISE;
import com.google.common.collect.Iterables;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;

import java.util.Map;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

/**
*/
Expand All @@ -39,23 +40,27 @@ public AbstractTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrateg

@Override
public QueryableDruidServer pick(
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment
)
{
final Map.Entry<Integer, Set<QueryableDruidServer>> priorityServers = prioritizedServers.pollFirstEntry();

if (priorityServers == null) {
return null;
}
return Iterables.getOnlyElement(pick(prioritizedServers, segment, 1), null);
}

final Set<QueryableDruidServer> servers = priorityServers.getValue();
switch (servers.size()) {
case 0:
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
case 1:
return priorityServers.getValue().iterator().next();
default:
return serverSelectorStrategy.pick(servers, segment);
@Override
public List<QueryableDruidServer> pick(
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick
)
{
List<QueryableDruidServer> result = new ArrayList<>(numServersToPick);
for (Set<QueryableDruidServer> priorityServers : prioritizedServers.values()) {
result.addAll(serverSelectorStrategy.pick(priorityServers, segment, numServersToPick - result.size()));
if (result.size() == numServersToPick) {
break;
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

package io.druid.client.selector;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import io.druid.timeline.DataSegment;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
Expand All @@ -42,4 +45,15 @@ public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment
{
return Collections.min(servers, comparator);
}

@Override
public List<QueryableDruidServer> pick(
Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick
)
{
if (servers.size() <= numServersToPick) {
return ImmutableList.copyOf(servers);
}
return Ordering.from(comparator).leastOf(servers, numServersToPick);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,32 @@

package io.druid.client.selector;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import io.druid.timeline.DataSegment;

import java.util.Random;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{
private static final Random random = new Random();

@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
{
return Iterators.get(servers.iterator(), random.nextInt(servers.size()));
return Iterators.get(servers.iterator(), ThreadLocalRandom.current().nextInt(servers.size()));
}

@Override
public List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick)
{
if (servers.size() <= numServersToPick) {
return ImmutableList.copyOf(servers);
}
List<QueryableDruidServer> list = Lists.newArrayList(servers);
Collections.shuffle(list, ThreadLocalRandom.current());
return ImmutableList.copyOf(list.subList(0, numServersToPick));
}
}
79 changes: 33 additions & 46 deletions server/src/main/java/io/druid/client/selector/ServerSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,23 @@

package io.druid.client.selector;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.emitter.EmittingLogger;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
*/
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
{

private static final EmittingLogger log = new EmittingLogger(ServerSelector.class);

private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> servers;

private final TierSelectorStrategy strategy;

Expand All @@ -50,8 +46,9 @@ public ServerSelector(
TierSelectorStrategy strategy
)
{
this.segment = new AtomicReference<DataSegment>(segment);
this.segment = new AtomicReference<>(segment);
this.strategy = strategy;
this.servers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
}

public DataSegment getSegment()
Expand All @@ -65,14 +62,25 @@ public void addServerAndUpdateSegment(
{
synchronized (this) {
this.segment.set(segment);
servers.add(server);
int priority = server.getServer().getPriority();
Set<QueryableDruidServer> priorityServers = servers.computeIfAbsent(priority, p -> new HashSet<>());
priorityServers.add(server);
}
}

public boolean removeServer(QueryableDruidServer server)
{
synchronized (this) {
return servers.remove(server);
int priority = server.getServer().getPriority();
Set<QueryableDruidServer> priorityServers = servers.get(priority);
if (priorityServers == null) {
return false;
}
boolean result = priorityServers.remove(server);
if (priorityServers.isEmpty()) {
servers.remove(priority);
}
return result;
}
}

Expand All @@ -84,49 +92,28 @@ public boolean isEmpty()
}

public List<DruidServerMetadata> getCandidates(final int numCandidates) {
List<DruidServerMetadata> result = Lists.newArrayList();
synchronized (this) {
final DataSegment target = segment.get();
for (Map.Entry<Integer, Set<QueryableDruidServer>> entry : toPrioritizedServers().entrySet()) {
Set<QueryableDruidServer> servers = entry.getValue();
TreeMap<Integer, Set<QueryableDruidServer>> tieredMap = Maps.newTreeMap();
while (!servers.isEmpty()) {
tieredMap.put(entry.getKey(), servers); // strategy.pick() removes entry
QueryableDruidServer server = strategy.pick(tieredMap, target);
if (server == null) {
// regard this as any server in tieredMap is not appropriate
break;
}
result.add(server.getServer().getMetadata());
if (numCandidates > 0 && result.size() >= numCandidates) {
return result;
}
servers.remove(server);
}
if (numCandidates > 0) {
return strategy.pick(servers, segment.get(), numCandidates)
.stream()
.map(server -> server.getServer().getMetadata())
.collect(Collectors.toList());
} else {
// return all servers as candidates
return servers.values()
.stream()
.flatMap(Collection::stream)
.map(server -> server.getServer().getMetadata())
.collect(Collectors.toList());
}
}
return result;
}

@Override
public QueryableDruidServer pick()
{
synchronized (this) {
return strategy.pick(toPrioritizedServers(), segment.get());
}
}

private TreeMap<Integer, Set<QueryableDruidServer>> toPrioritizedServers()
{
final TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = new TreeMap<>(strategy.getComparator());
for (QueryableDruidServer server : servers) {
Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
if (theServers == null) {
theServers = Sets.newHashSet();
prioritizedServers.put(server.getServer().getPriority(), theServers);
}
theServers.add(server);
return strategy.pick(servers, segment.get());
}
return prioritizedServers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.timeline.DataSegment;

import java.util.List;
import java.util.Set;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
Expand All @@ -32,5 +33,7 @@
})
public interface ServerSelectorStrategy
{
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment);
QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment);

List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;

import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

/**
*/
Expand All @@ -37,7 +38,13 @@
})
public interface TierSelectorStrategy
{
public Comparator<Integer> getComparator();
Comparator<Integer> getComparator();

public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
QueryableDruidServer pick(Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);

List<QueryableDruidServer> pick(
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick
);
}
Loading

0 comments on commit 515fabc

Please sign in to comment.