Skip to content

Commit

Permalink
javacript tiered broker selector strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
xvrl committed Oct 20, 2014
1 parent ca978d6 commit 38cb73e
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.server.router;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Optional;
import io.druid.query.Query;

import javax.script.Compilable;
import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

public class JavascriptTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
{
private final SelectorFunction function;

public JavascriptTieredBrokerSelectorStrategy(@JsonProperty("function") String function)
{
final ScriptEngine engine = new ScriptEngineManager().getEngineByName("javascript");
try {
((Compilable)engine).compile("var apply = " + function).eval();
} catch(ScriptException e) {
Throwables.propagate(e);
}
this.function = ((Invocable)engine).getInterface(SelectorFunction.class);
}

@Override
public Optional<String> getBrokerServiceName(
TieredBrokerConfig config, Query query
)
{
return Optional.fromNullable(function.apply(config, query));
}

private static interface SelectorFunction
{
public String apply(TieredBrokerConfig config, Query query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class),
@JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class)
@JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class),
@JsonSubTypes.Type(name = "javascript", value = JavascriptTieredBrokerSelectorStrategy.class)
})

public interface TieredBrokerSelectorStrategy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.server.router;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import io.druid.query.Druids;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import org.junit.Assert;
import org.junit.Test;

import java.util.LinkedHashMap;

public class JavascriptTieredBrokerSelectorStrategyTest
{

@Test
public void testGetBrokerServiceName() throws Exception
{
TieredBrokerSelectorStrategy jsStrategy = new JavascriptTieredBrokerSelectorStrategy(
"function (config, query) { if (config.getTierToBrokerMap().values().size() > 0 && query.getAggregatorSpecs && query.getAggregatorSpecs().size() <= 2) { return config.getTierToBrokerMap().values().toArray()[0] } else { return config.getDefaultBrokerServiceName() } }"
);

final LinkedHashMap<String, String> tierBrokerMap = new LinkedHashMap<>();
tierBrokerMap.put("fast", "druid/fastBroker");
tierBrokerMap.put("slow", "druid/broker");

final TieredBrokerConfig tieredBrokerConfig = new TieredBrokerConfig()
{
@Override
public String getDefaultBrokerServiceName()
{
return "druid/broker";
}

@Override
public LinkedHashMap<String, String> getTierToBrokerMap()
{
return tierBrokerMap;
}
};

final Druids.TimeseriesQueryBuilder queryBuilder = Druids.newTimeseriesQueryBuilder().dataSource("test")
.intervals("2014/2015")
.aggregators(
ImmutableList.<AggregatorFactory>of(
new CountAggregatorFactory("count")
)
);

Assert.assertEquals(
Optional.of("druid/fastBroker"),
jsStrategy.getBrokerServiceName(
tieredBrokerConfig,
queryBuilder.build()
)
);


Assert.assertEquals(
Optional.of("druid/broker"),
jsStrategy.getBrokerServiceName(
tieredBrokerConfig,
Druids.newTimeBoundaryQueryBuilder().dataSource("test").bound("maxTime").build()
)
);

Assert.assertEquals(
Optional.of("druid/broker"),
jsStrategy.getBrokerServiceName(
tieredBrokerConfig,
queryBuilder.aggregators(
ImmutableList.of(
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("longSum", "a"),
new DoubleSumAggregatorFactory("doubleSum", "b")
)
).build()
)
);

}
}

0 comments on commit 38cb73e

Please sign in to comment.