Skip to content

Commit

Permalink
Fix Router race condition and use default broker service name for inv…
Browse files Browse the repository at this point in the history
…alid priority (apache#5050)

* use default brokerServiceName when priority is not valid

* use AtomicInteger for NodesHolder.roundRobinIndex

* revert inspectionProfiles change

* adjust TieredBrokerHostSelectorTest

* combine if statements and ensure index does not become negative

* set next index with mod if overflows

* fix codestyle

* use nextIndex

* extract the while loop to a method
  • Loading branch information
jisookim0513 authored and himanshug committed Nov 9, 2017
1 parent a8dc056 commit 1bf253f
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.Query;
import io.druid.query.QueryContexts;

Expand All @@ -48,19 +47,9 @@ public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Quer
{
final int priority = QueryContexts.getPriority(query);

if (priority < minPriority) {
if (priority < minPriority || priority > maxPriority) {
return Optional.of(
Iterables.getLast(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
)
);
} else if (priority >= maxPriority) {
return Optional.of(
Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
)
tierConfig.getDefaultBrokerServiceName()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
*/
Expand Down Expand Up @@ -277,7 +278,7 @@ public List<Server> apply(NodesHolder input)

private static class NodesHolder
{
private int roundRobinIndex = 0;
private AtomicInteger roundRobinIndex = new AtomicInteger(-1);

private Map<String, Server> nodesMap = new HashMap<>();
private ImmutableList<Server> nodes = ImmutableList.of();
Expand Down Expand Up @@ -312,11 +313,21 @@ Server pick()
return null;
}

if (roundRobinIndex >= currNodes.size()) {
roundRobinIndex %= currNodes.size();
}
return currNodes.get(getIndex(currNodes));
}

return currNodes.get(roundRobinIndex++);
int getIndex(ImmutableList<Server> currNodes)
{
while (true) {
int index = roundRobinIndex.get();
int nextIndex = index + 1;
if (nextIndex >= currNodes.size()) {
nextIndex = 0;
}
if (roundRobinIndex.compareAndSet(index, nextIndex)) {
return nextIndex;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public void testPrioritySelect() throws Exception
.build()
).lhs;

Assert.assertEquals("coldBroker", brokerName);
Assert.assertEquals("hotBroker", brokerName);
}

@Test
Expand Down

0 comments on commit 1bf253f

Please sign in to comment.