Skip to content

Commit

Permalink
rudimentary filtered aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
xvrl committed Oct 21, 2014
1 parent ee392b6 commit 71831e4
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.HistogramAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
Expand Down Expand Up @@ -68,7 +69,8 @@ public AggregatorsModule()
@JsonSubTypes.Type(name = "javascript", value = JavaScriptAggregatorFactory.class),
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class),
@JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class)
@JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class),
@JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class)
})
public static interface AggregatorFactoryMixin
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.query.aggregation;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import io.druid.segment.DimensionSelector;

import javax.annotation.Nullable;

public class FilteredAggregator implements Aggregator
{
private final String name;
private final DimensionSelector dimSelector;
private final Aggregator delegate;
private final Predicate<String> predicate;

public FilteredAggregator(String name, DimensionSelector dimSelector, Predicate<String> predicate, Aggregator delegate)
{
this.name = name;
this.dimSelector = dimSelector;
this.delegate = delegate;
this.predicate = predicate;
}

@Override
public void aggregate()
{
if (
Iterables.any(
dimSelector.getRow(), new Predicate<Integer>()
{
@Override
public boolean apply(@Nullable Integer input)
{
return predicate.apply(dimSelector.lookupName(input));
}
}
)
) {
delegate.aggregate();
}
}

@Override
public void reset()
{
delegate.reset();
}

@Override
public Object get()
{
return delegate.get();
}

@Override
public float getFloat()
{
return delegate.getFloat();
}

@Override
public String getName()
{
return name;
}

@Override
public void close()
{
delegate.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.query.aggregation;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;

import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;

public class FilteredAggregatorFactory implements AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x9;

private final String name;
private final AggregatorFactory delegate;
private final DimFilter filter;

public FilteredAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("aggregator") AggregatorFactory delegate,
@JsonProperty("filter") DimFilter filter
)
{
Preconditions.checkNotNull(delegate);
Preconditions.checkNotNull(filter);
Preconditions.checkArgument(filter instanceof SelectorDimFilter, "Filtered Aggregator only supports ");

this.name = name;
this.delegate = delegate;
this.filter = filter;
}

@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final Aggregator aggregator = delegate.factorize(metricFactory);
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(((SelectorDimFilter)filter).getDimension());
Predicate<String> predicate = Predicates.equalTo(((SelectorDimFilter)filter).getValue());
return new FilteredAggregator(name, dimSelector, predicate, aggregator);
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final BufferAggregator aggregator = delegate.factorizeBuffered(metricFactory);
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(((SelectorDimFilter)filter).getDimension());
Predicate<String> predicate = Predicates.equalTo(((SelectorDimFilter)filter).getValue());
return new FilteredBufferAggregator(dimSelector, predicate, aggregator);
}

@Override
public Comparator getComparator()
{
return delegate.getComparator();
}

@Override
public Object combine(Object lhs, Object rhs)
{
return delegate.combine(lhs, rhs);
}

@Override
public AggregatorFactory getCombiningFactory()
{
return delegate.getCombiningFactory();
}

@Override
public Object deserialize(Object object)
{
return delegate.deserialize(object);
}

@Override
public Object finalizeComputation(Object object)
{
return delegate.finalizeComputation(object);
}

@JsonProperty
@Override
public String getName()
{
return name;
}

@Override
public List<String> requiredFields()
{
return delegate.requiredFields();
}

@Override
public byte[] getCacheKey()
{
byte[] filterCacheKey = filter.getCacheKey();
byte[] aggregatorCacheKey = delegate.getCacheKey();
return ByteBuffer.allocate(1 + filterCacheKey.length + aggregatorCacheKey.length)
.put(CACHE_TYPE_ID)
.put(filterCacheKey)
.put(aggregatorCacheKey)
.array();
}

@Override
public String getTypeName()
{
return delegate.getTypeName();
}

@Override
public int getMaxIntermediateSize()
{
return delegate.getMaxIntermediateSize();
}

@Override
public Object getAggregatorStartValue()
{
return delegate.getAggregatorStartValue();
}

@JsonProperty
public AggregatorFactory getAggregator()
{
return delegate;
}

@JsonProperty
public DimFilter getFilter()
{
return filter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.query.aggregation;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import io.druid.segment.DimensionSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

public class FilteredBufferAggregator implements BufferAggregator
{
private final DimensionSelector dimSelector;
private final Predicate<String> predicate;
private final BufferAggregator delegate;

public FilteredBufferAggregator(DimensionSelector dimSelector, Predicate<String> predicate, BufferAggregator delegate)
{
this.dimSelector = dimSelector;
this.predicate = predicate;
this.delegate = delegate;
}

@Override
public void init(ByteBuffer buf, int position)
{
delegate.init(buf, position);
}

@Override
public void aggregate(ByteBuffer buf, int position)
{
if (
Iterables.any(
dimSelector.getRow(), new Predicate<Integer>()
{
@Override
public boolean apply(@Nullable Integer input)
{
return predicate.apply(dimSelector.lookupName(input));
}
}
)
) {
delegate.aggregate(buf, position);
}
}

@Override
public Object get(ByteBuffer buf, int position)
{
return delegate.get(buf, position);
}

@Override
public float getFloat(ByteBuffer buf, int position)
{
return delegate.getFloat(buf, position);
}

@Override
public void close()
{
delegate.close();
}
}

0 comments on commit 71831e4

Please sign in to comment.