Skip to content

Commit

Permalink
druid.coordinator.asOverlord.enabled flag at coordinator to make it a…
Browse files Browse the repository at this point in the history
…n overlord too (apache#3711)
  • Loading branch information
himanshug authored and fjy committed Feb 13, 2017
1 parent 9ab9fec commit 8cf7ad1
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 26 deletions.
2 changes: 2 additions & 0 deletions docs/content/configuration/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ The coordinator node uses several of the global configs in [Configuration](../co
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`|
|`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon , which manages the load and drop of segments.|PT0.050S (50 ms)|
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this coordinator node should act like an overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone overlord nodes. If set to true, then be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord nodes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|

### Metadata Retrieval

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
Expand Down Expand Up @@ -73,18 +74,23 @@ public TaskMaster(
final TaskLockbox taskLockbox,
final TaskStorage taskStorage,
final TaskActionClientFactory taskActionClientFactory,
@Self final DruidNode node,
@Self final DruidNode selfNode,
final IndexerZkConfig zkPaths,
final TaskRunnerFactory runnerFactory,
final CuratorFramework curator,
final ServiceAnnouncer serviceAnnouncer,
final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig,
final ServiceEmitter emitter,
final SupervisorManager supervisorManager,
final OverlordHelperManager overlordHelperManager
)
{
this.supervisorManager = supervisorManager;
this.taskActionClientFactory = taskActionClientFactory;

final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode :
selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService());

this.leaderSelector = new LeaderSelector(
curator,
zkPaths.getLeaderLatchPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public OverlordRedirectInfo(TaskMaster taskMaster)
}

@Override
public boolean doLocal()
public boolean doLocal(String requestURI)
{
return taskMaster.isLeading();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testDoLocal()
{
EasyMock.expect(taskMaster.isLeading()).andReturn(true).anyTimes();
EasyMock.replay(taskMaster);
Assert.assertTrue(redirectInfo.doLocal());
Assert.assertTrue(redirectInfo.doLocal(null));
EasyMock.verify(taskMaster);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
Expand Down Expand Up @@ -183,6 +184,7 @@ public void announce(DruidNode node)
announcementLatch.countDown();
}
},
new CoordinatorOverlordServiceConfig(null, null),
serviceEmitter,
supervisorManager,
EasyMock.createNiceMock(OverlordHelperManager.class)
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/io/druid/server/DruidNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ public int getPort()
return port;
}

public DruidNode withService(String service)
{
return new DruidNode(service, host, port);
}

/**
* Returns host and port together as something that can be used as part of a URI.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

/**
*/
public class CoordinatorOverlordServiceConfig
{
@JsonProperty
private final boolean enabled;

@JsonProperty
private final String overlordService;

public CoordinatorOverlordServiceConfig(
@JsonProperty("enabled") Boolean enabled,
@JsonProperty("overlordService") String overlordService
)
{
this.enabled = enabled == null ? false : enabled.booleanValue();
this.overlordService = overlordService;

Preconditions.checkArgument((this.enabled && this.overlordService != null) || !this.enabled,
"coordinator is enabled to be overlord but overlordService is not specified");
}

public boolean isEnabled()
{
return enabled;
}

public String getOverlordService()
{
return overlordService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public CoordinatorRedirectInfo(DruidCoordinator coordinator) {
}

@Override
public boolean doLocal()
public boolean doLocal(String requestURI)
{
return coordinator.isLeader();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
throw new ServletException("non-HTTP request or response");
}

if (redirectInfo.doLocal()) {
if (redirectInfo.doLocal(request.getRequestURI())) {
chain.doFilter(request, response);
} else {
URL url = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
public interface RedirectInfo
{
public boolean doLocal();
public boolean doLocal(String requestURI);

public URL getRedirectURL(String queryString, String requestURI);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testDoLocal()
{
EasyMock.expect(druidCoordinator.isLeader()).andReturn(true).anyTimes();
EasyMock.replay(druidCoordinator);
Assert.assertTrue(coordinatorRedirectInfo.doLocal());
Assert.assertTrue(coordinatorRedirectInfo.doLocal(null));
EasyMock.verify(druidCoordinator);
}

Expand Down
30 changes: 26 additions & 4 deletions services/src/main/java/io/druid/cli/CliCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
Expand Down Expand Up @@ -79,6 +78,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
Expand All @@ -94,6 +94,7 @@ public class CliCoordinator extends ServerRunnable
private static final Logger log = new Logger(CliCoordinator.class);

private Properties properties;
private boolean beOverlord;

public CliCoordinator()
{
Expand All @@ -104,12 +105,19 @@ public CliCoordinator()
public void configure(Properties properties)
{
this.properties = properties;
beOverlord = isOverlord(properties);

if (beOverlord) {
log.info("Coordinator is configured to act as Overlord as well.");
}
}

@Override
protected List<? extends Module> getModules()
{
return ImmutableList.<Module>of(
List<Module> modules = new ArrayList<>();

modules.add(
new Module()
{
@Override
Expand All @@ -131,7 +139,11 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class);

binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);
if (beOverlord) {
binder.bind(RedirectInfo.class).to(CoordinatorOverlordRedirectInfo.class).in(LazySingleton.class);
} else {
binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);
}

binder.bind(MetadataSegmentManager.class)
.toProvider(MetadataSegmentManagerProvider.class)
Expand Down Expand Up @@ -192,7 +204,6 @@ public void configure(Binder binder)
Predicates.equalTo("true"),
DruidCoordinatorSegmentKiller.class
);

}

@Provides
Expand All @@ -211,5 +222,16 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster(
}
}
);

if (beOverlord) {
modules.addAll(new CliOverlord().getModules(false));
}

return modules;
}

public static boolean isOverlord(Properties properties)
{
return Boolean.valueOf(properties.getProperty("druid.coordinator.asOverlord.enabled")).booleanValue();
}
}
31 changes: 22 additions & 9 deletions services/src/main/java/io/druid/cli/CliOverlord.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.audit.AuditManagerProvider;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import io.druid.server.initialization.jetty.JettyServerInitUtils;
Expand Down Expand Up @@ -113,18 +114,26 @@ public CliOverlord()

@Override
protected List<? extends Module> getModules()
{
return getModules(true);
}

protected List<? extends Module> getModules(final boolean standalone)
{
return ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bindConstant()
.annotatedWith(Names.named("serviceName"))
.to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME);
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090);

if (standalone) {
binder.bindConstant()
.annotatedWith(Names.named("serviceName"))
.to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME);
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090);
}

JsonConfigProvider.bind(binder, "druid.coordinator.asOverlord", CoordinatorOverlordServiceConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);

Expand Down Expand Up @@ -160,14 +169,18 @@ public void configure(Binder binder)
.toProvider(AuditManagerProvider.class)
.in(ManageLifecycle.class);

binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class);
if (standalone) {
binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer());
}

binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer());
Jerseys.addResource(binder, OverlordResource.class);
Jerseys.addResource(binder, SupervisorResource.class);

LifecycleModule.register(binder, Server.class);
if (standalone) {
LifecycleModule.register(binder, Server.class);
}
}

private void configureTaskStorage(Binder binder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.resource.ResourceCollection;

import java.util.Properties;

/**
*/
class CoordinatorJettyServerInitializer implements JettyServerInitializer
{
private final DruidCoordinatorConfig config;
private final boolean beOverlord;

@Inject
CoordinatorJettyServerInitializer(DruidCoordinatorConfig config)
CoordinatorJettyServerInitializer(DruidCoordinatorConfig config, Properties properties)
{
this.config = config;
this.beOverlord = CliCoordinator.isOverlord(properties);
}

@Override
Expand All @@ -59,10 +63,19 @@ public void initialize(Server server, Injector injector)

root.addServlet(holderPwd, "/");
if(config.getConsoleStatic() == null) {
ResourceCollection staticResources = new ResourceCollection(
Resource.newClassPathResource("io/druid/console"),
Resource.newClassPathResource("static")
);
ResourceCollection staticResources;
if (beOverlord) {
staticResources = new ResourceCollection(
Resource.newClassPathResource("io/druid/console"),
Resource.newClassPathResource("static"),
Resource.newClassPathResource("indexer_static")
);
} else {
staticResources = new ResourceCollection(
Resource.newClassPathResource("io/druid/console"),
Resource.newClassPathResource("static")
);
}
root.setBaseResource(staticResources);
} else {
// used for console development
Expand All @@ -81,10 +94,15 @@ public void initialize(Server server, Injector injector)
// Can't use '/*' here because of Guice and Jetty static content conflicts
root.addFilter(GuiceFilter.class, "/info/*", null);
root.addFilter(GuiceFilter.class, "/druid/coordinator/*", null);
if (beOverlord) {
root.addFilter(GuiceFilter.class, "/druid/indexer/*", null);
}
// this will be removed in the next major release
root.addFilter(GuiceFilter.class, "/coordinator/*", null);

root.addServlet(new ServletHolder(injector.getInstance(OverlordProxyServlet.class)), "/druid/indexer/*");
if (!beOverlord) {
root.addServlet(new ServletHolder(injector.getInstance(OverlordProxyServlet.class)), "/druid/indexer/*");
}

HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{JettyServerInitUtils.getJettyRequestLogHandler(), root});
Expand Down
Loading

0 comments on commit 8cf7ad1

Please sign in to comment.