
Add ability to filter segments for specific dataSources on broker without creating tiers (apache#2848)

* Add back FilteredServerView removed in a32906c to reduce memory usage using watched tiers.

* Add functionality to specify "druid.broker.segment.watchedDataSources"
nishantmonu51 authored and xvrl committed Apr 19, 2016
1 parent 08c784f commit dbf63f7
Showing 20 changed files with 609 additions and 54 deletions.
3 changes: 2 additions & 1 deletion docs/content/configuration/broker.md
@@ -100,5 +100,6 @@ See [cache configuration](caching.html) for how to configure cache settings.

|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
-|`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all. This can be used to partition your dataSources in specific historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none|
+|`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none|
+|`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none|
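
For example, a broker meant to answer queries for only a subset of dataSources could set the new property in its runtime.properties. A minimal sketch, assuming the usual JSON-array syntax for Druid's list-valued properties; the dataSource names are hypothetical:

```properties
# Hypothetical broker runtime.properties entry: watch (and therefore
# answer queries for) only these dataSources.
druid.broker.segment.watchedDataSources=["sales","clicks"]
```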

TestServerView.java
@@ -22,6 +22,8 @@
import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import com.metamx.common.Pair;
+import io.druid.client.DruidServer;
+import io.druid.client.FilteredServerInventoryView;
import io.druid.client.ServerView;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
@@ -30,18 +32,34 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

-public class TestServerView implements ServerView.SegmentCallback
+public class TestServerView implements FilteredServerInventoryView, ServerView.SegmentCallback
{
-  final ConcurrentMap<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> callbacks = Maps.newConcurrentMap();
+  final ConcurrentMap<ServerView.SegmentCallback, Pair<Predicate<Pair<DruidServerMetadata, DataSegment>>, Executor>> callbacks = Maps.newConcurrentMap();

+  @Override
+  public void registerSegmentCallback(
+      final Executor exec,
+      final ServerView.SegmentCallback callback,
+      final Predicate<Pair<DruidServerMetadata, DataSegment>> filter
+  )
+  {
+    callbacks.put(callback, Pair.of(filter, exec));
+  }
+
+  @Override
+  public void registerServerCallback(Executor exec, ServerView.ServerCallback callback)
+  {
+    // No-op
+  }

@Override
public ServerView.CallbackAction segmentAdded(
final DruidServerMetadata server,
final DataSegment segment
)
{
-    for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> entry : callbacks.entrySet()) {
-      if (entry.getValue().lhs.apply(segment)) {
+    for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<Pair<DruidServerMetadata, DataSegment>>, Executor>> entry : callbacks.entrySet()) {
+      if (entry.getValue().lhs.apply(Pair.of(server, segment))) {
entry.getValue().rhs.execute(
new Runnable()
{
@@ -64,8 +82,8 @@ public ServerView.CallbackAction segmentRemoved(
final DataSegment segment
)
{
-    for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> entry : callbacks.entrySet()) {
-      if (entry.getValue().lhs.apply(segment)) {
+    for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<Pair<DruidServerMetadata, DataSegment>>, Executor>> entry : callbacks.entrySet()) {
+      if (entry.getValue().lhs.apply(Pair.of(server, segment))) {
entry.getValue().rhs.execute(
new Runnable()
{
@@ -85,7 +103,7 @@ public void run()
@Override
public ServerView.CallbackAction segmentViewInitialized()
{
-    for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<DataSegment>, Executor>> entry : callbacks.entrySet()) {
+    for (final Map.Entry<ServerView.SegmentCallback, Pair<Predicate<Pair<DruidServerMetadata, DataSegment>>, Executor>> entry : callbacks.entrySet()) {
entry.getValue().rhs.execute(
new Runnable()
{
@@ -100,4 +118,16 @@ public void run()

return ServerView.CallbackAction.CONTINUE;
}

+  @Override
+  public DruidServer getInventoryValue(String string)
+  {
+    return null;
+  }
+
+  @Override
+  public Iterable<DruidServer> getInventory()
+  {
+    return null;
+  }
}
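
The test double mirrors the production change: segment filters now receive the announcing server's metadata alongside the segment, so a filter can select on tier or dataSource. A minimal sketch of such a predicate, using the Guava and Druid types imported above (the dataSource name "sales" is hypothetical):

```java
// Accept only segments belonging to the "sales" dataSource,
// regardless of which server announced them.
Predicate<Pair<DruidServerMetadata, DataSegment>> salesOnly =
    new Predicate<Pair<DruidServerMetadata, DataSegment>>()
    {
      @Override
      public boolean apply(Pair<DruidServerMetadata, DataSegment> input)
      {
        return "sales".equals(input.rhs.getDataSource());
      }
    };
```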
87 changes: 81 additions & 6 deletions server/src/main/java/io/druid/client/BatchServerInventoryView.java
@@ -21,33 +21,46 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
+import com.metamx.common.Pair;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.ManageLifecycle;
+import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;

import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;

/**
*/
@ManageLifecycle
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
+    implements FilteredServerInventoryView
{
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);

final private ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
+  final private ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new MapMaker()
+      .makeMap();
+  final private Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;

@Inject
public BatchServerInventoryView(
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
-      final ObjectMapper jsonMapper
+      final ObjectMapper jsonMapper,
+      final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter
)
{
super(
@@ -60,6 +73,8 @@ public BatchServerInventoryView(
{
}
);

+    this.defaultFilter = Preconditions.checkNotNull(defaultFilter);
}

@Override
@@ -69,30 +84,70 @@ protected DruidServer addInnerInventory(
final Set<DataSegment> inventory
)
{
-    zNodes.put(inventoryKey, inventory);
-    for (DataSegment segment : inventory) {
+    Set<DataSegment> filteredInventory = filterInventory(container, inventory);
+    zNodes.put(inventoryKey, filteredInventory);
+    for (DataSegment segment : filteredInventory) {
addSingleInventory(container, segment);
}
return container;
}

+  private Set<DataSegment> filterInventory(final DruidServer container, Set<DataSegment> inventory)
+  {
+    Predicate<Pair<DruidServerMetadata, DataSegment>> predicate = Predicates.or(
+        defaultFilter,
+        Predicates.or(segmentPredicates.values())
+    );
+
+    // make a copy of the set and not just a filtered view, in order to not keep all the segment data in memory
+    Set<DataSegment> filteredInventory = Sets.newHashSet(Iterables.transform(
+        Iterables.filter(
+            Iterables.transform(
+                inventory,
+                new Function<DataSegment, Pair<DruidServerMetadata, DataSegment>>()
+                {
+                  @Override
+                  public Pair<DruidServerMetadata, DataSegment> apply(DataSegment input)
+                  {
+                    return Pair.of(container.getMetadata(), input);
+                  }
+                }
+            ),
+            predicate
+        ),
+        new Function<Pair<DruidServerMetadata, DataSegment>, DataSegment>()
+        {
+          @Override
+          public DataSegment apply(
+              Pair<DruidServerMetadata, DataSegment> input
+          )
+          {
+            return input.rhs;
+          }
+        }
+    ));
+    return filteredInventory;
+  }

@Override
protected DruidServer updateInnerInventory(
DruidServer container, String inventoryKey, Set<DataSegment> inventory
)
{
+    Set<DataSegment> filteredInventory = filterInventory(container, inventory);

Set<DataSegment> existing = zNodes.get(inventoryKey);
if (existing == null) {
throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
}

-    for (DataSegment segment : Sets.difference(inventory, existing)) {
+    for (DataSegment segment : Sets.difference(filteredInventory, existing)) {
addSingleInventory(container, segment);
}
-    for (DataSegment segment : Sets.difference(existing, inventory)) {
+    for (DataSegment segment : Sets.difference(existing, filteredInventory)) {
removeSingleInventory(container, segment.getIdentifier());
}
-    zNodes.put(inventoryKey, inventory);
+    zNodes.put(inventoryKey, filteredInventory);

return container;
}
@@ -113,4 +168,24 @@ protected DruidServer removeInnerInventory(final DruidServer container, String i
}
return container;
}

+  public void registerSegmentCallback(
+      final Executor exec,
+      final SegmentCallback callback,
+      final Predicate<Pair<DruidServerMetadata, DataSegment>> filter
+  )
+  {
+    SegmentCallback filteringCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter);
+    segmentPredicates.put(filteringCallback, filter);
+    registerSegmentCallback(
+        exec,
+        filteringCallback
+    );
+  }
+
+  @Override
+  protected void segmentCallbackRemoved(SegmentCallback callback)
+  {
+    segmentPredicates.remove(callback);
+  }
}
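
The transform/filter/transform chain in `filterInventory` pairs each announced segment with its server's metadata, keeps the pairs that match either the injected `defaultFilter` or any per-callback predicate, and unwraps the survivors into a fresh set; copying (rather than returning a filtered view) lets the unwatched segments be garbage-collected. A plain-loop sketch of the same logic, for illustration only:

```java
// Sketch: filterInventory's Guava pipeline rewritten as an explicit loop.
Predicate<Pair<DruidServerMetadata, DataSegment>> predicate =
    Predicates.or(defaultFilter, Predicates.or(segmentPredicates.values()));
Set<DataSegment> filtered = Sets.newHashSet();
for (DataSegment segment : inventory) {
  if (predicate.apply(Pair.of(container.getMetadata(), segment))) {
    filtered.add(segment);
  }
}
```

Note also that `registerSegmentCallback` wraps each caller-supplied callback in a `FilteringSegmentCallback` and records its filter in `segmentPredicates`, so segments matching any registered callback's filter survive `filterInventory` even when the default filter rejects them.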
BatchServerInventoryViewProvider.java
@@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
+import com.metamx.common.Pair;
+import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
@@ -47,6 +49,11 @@ public class BatchServerInventoryViewProvider implements ServerInventoryViewProv
@Override
public BatchServerInventoryView get()
{
-    return new BatchServerInventoryView(zkPaths, curator, jsonMapper);
+    return new BatchServerInventoryView(
+        zkPaths,
+        curator,
+        jsonMapper,
+        Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue()
+    );
}
}
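
The stock provider passes `Predicates.alwaysTrue()`, so a plain server inventory view still watches every segment; presumably only the broker's own wiring substitutes a restrictive default filter built from the watcher config below.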
BrokerSegmentWatcherConfig.java
@@ -30,8 +30,16 @@ public class BrokerSegmentWatcherConfig
@JsonProperty
private Set<String> watchedTiers = null;

+  @JsonProperty
+  private Set<String> watchedDataSources = null;

public Set<String> getWatchedTiers()
{
return watchedTiers;
}

+  public Set<String> getWatchedDataSources()
+  {
+    return watchedDataSources;
+  }
}
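
Both whitelists default to null, which the broker documentation above treats as "watch everything". A sketch of how a default filter might be derived from this config; this is an assumption for illustration, since the actual broker wiring lives in one of the changed files not shown in this excerpt:

```java
// Hypothetical helper: turn BrokerSegmentWatcherConfig into a default filter.
// A null whitelist is treated as "no restriction".
public static Predicate<Pair<DruidServerMetadata, DataSegment>> toDefaultFilter(
    final BrokerSegmentWatcherConfig config
)
{
  return new Predicate<Pair<DruidServerMetadata, DataSegment>>()
  {
    @Override
    public boolean apply(Pair<DruidServerMetadata, DataSegment> input)
    {
      if (config.getWatchedTiers() != null
          && !config.getWatchedTiers().contains(input.lhs.getTier())) {
        return false;
      }
      if (config.getWatchedDataSources() != null
          && !config.getWatchedDataSources().contains(input.rhs.getDataSource())) {
        return false;
      }
      return true;
    }
  };
}
```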
