Skip to content

Commit

Permalink
Add namespace extraction thread config (apache#4833)
Browse files Browse the repository at this point in the history
  • Loading branch information
leventov authored and drcrallen committed Sep 25, 2017
1 parent 07446ef commit b56a907
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ setting namespaces (broker, peon, historical)
|Property|Description|Default|
|--------|-----------|-------|
|`druid.lookup.namespace.cache.type`|Specifies the type of caching to be used by the namespaces. May be one of [`offHeap`, `onHeap`]. `offHeap` uses a temporary file for off-heap storage of the namespace (memory mapped files). `onHeap` stores all cache on the heap in standard java map types.|`onHeap`|
|`druid.lookup.namespace.numExtractionThreads`|The number of threads in the thread pool dedicated for lookup extraction and updates. This number may need to be scaled up, if you have a lot of lookups and they take long time to extract, to avoid timeouts.|2|

The cache is populated in different ways depending on the settings below. In general, most namespaces employ
a `pollPeriod` at the end of which time they poll the remote resource of interest for updates.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.lookup.namespace;

import com.fasterxml.jackson.annotation.JsonProperty;

public class NamespaceExtractionConfig
{
/**
* The default value of two is chosen because the overhead of having an extra idle thread of the minimum priority is
* very low, but having more than one thread may save when one namespace extraction is stuck or taking too long time,
* so all the others won't queue up and timeout.
*/
@JsonProperty
private int numExtractionThreads = 2;

public int getNumExtractionThreads()
{
return numExtractionThreads;
}

public void setNumExtractionThreads(int numExtractionThreads)
{
this.numExtractionThreads = numExtractionThreads;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.initialization.DruidModule;
Expand Down Expand Up @@ -77,6 +78,8 @@ public static MapBinder<Class<? extends ExtractionNamespace>, CacheGenerator<?>>
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.lookup.namespace", NamespaceExtractionConfig.class);

PolyBind
.createChoiceWithDefault(binder, TYPE_PREFIX, Key.get(NamespaceExtractionCacheManager.class), "onHeap")
.in(LazySingleton.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.druid.java.util.common.concurrent.ExecutorServices;
import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.lookup.namespace.NamespaceExtractionConfig;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -45,10 +46,14 @@ public abstract class NamespaceExtractionCacheManager

private final ScheduledThreadPoolExecutor scheduledExecutorService;

public NamespaceExtractionCacheManager(final Lifecycle lifecycle, final ServiceEmitter serviceEmitter)
public NamespaceExtractionCacheManager(
final Lifecycle lifecycle,
final ServiceEmitter serviceEmitter,
final NamespaceExtractionConfig config
)
{
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
1,
config.getNumExtractionThreads(),
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("NamespaceExtractionCacheManager-%d")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.lookup.namespace.NamespaceExtractionConfig;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.HTreeMap;
Expand Down Expand Up @@ -132,9 +133,13 @@ private MapDbCacheDisposerAndCleaner(MapDbCacheDisposer cacheDisposer, Cleaner c
private AtomicInteger cacheCount = new AtomicInteger(0);

@Inject
public OffHeapNamespaceExtractionCacheManager(Lifecycle lifecycle, ServiceEmitter serviceEmitter)
public OffHeapNamespaceExtractionCacheManager(
Lifecycle lifecycle,
ServiceEmitter serviceEmitter,
NamespaceExtractionConfig config
)
{
super(lifecycle, serviceEmitter);
super(lifecycle, serviceEmitter, config);
try {
tmpFile = File.createTempFile("druidMapDB", getClass().getCanonicalName());
log.info("Using file [%s] for mapDB off heap namespace cache", tmpFile.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.lookup.namespace.NamespaceExtractionConfig;

import java.lang.ref.WeakReference;
import java.util.Collections;
Expand Down Expand Up @@ -55,9 +56,13 @@ public class OnHeapNamespaceExtractionCacheManager extends NamespaceExtractionCa
);

@Inject
public OnHeapNamespaceExtractionCacheManager(Lifecycle lifecycle, ServiceEmitter serviceEmitter)
public OnHeapNamespaceExtractionCacheManager(
Lifecycle lifecycle,
ServiceEmitter serviceEmitter,
NamespaceExtractionConfig config
)
{
super(lifecycle, serviceEmitter);
super(lifecycle, serviceEmitter, config);
}

private void expungeCollectedCaches()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ JdbcExtractionNamespace.class, new JdbcCacheGenerator()
scheduler = new CacheScheduler(
noopServiceEmitter,
factoryMap,
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter)
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter, new NamespaceExtractionConfig())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void setup() throws Exception
scheduler = new CacheScheduler(
noopServiceEmitter,
Collections.<Class<? extends ExtractionNamespace>, CacheGenerator<?>>emptyMap(),
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter)
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter, new NamespaceExtractionConfig())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,23 @@ public void close() throws IOException
@Override
public NamespaceExtractionCacheManager apply(Lifecycle lifecycle)
{
return new OnHeapNamespaceExtractionCacheManager(lifecycle, new NoopServiceEmitter());
return new OnHeapNamespaceExtractionCacheManager(
lifecycle,
new NoopServiceEmitter(),
new NamespaceExtractionConfig()
);
}
},
new Function<Lifecycle, NamespaceExtractionCacheManager>()
{
@Override
public NamespaceExtractionCacheManager apply(Lifecycle lifecycle)
{
return new OffHeapNamespaceExtractionCacheManager(lifecycle, new NoopServiceEmitter());
return new OffHeapNamespaceExtractionCacheManager(
lifecycle,
new NoopServiceEmitter(),
new NamespaceExtractionConfig()
);
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.druid.query.lookup.namespace.ExtractionNamespace;
import io.druid.query.lookup.namespace.UriExtractionNamespace;
import io.druid.query.lookup.namespace.UriExtractionNamespaceTest;
import io.druid.server.lookup.namespace.NamespaceExtractionConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Period;
import org.junit.After;
Expand Down Expand Up @@ -76,7 +77,11 @@ public class CacheSchedulerTest
@Override
public NamespaceExtractionCacheManager apply(@Nullable Lifecycle lifecycle)
{
return new OnHeapNamespaceExtractionCacheManager(lifecycle, new NoopServiceEmitter());
return new OnHeapNamespaceExtractionCacheManager(
lifecycle,
new NoopServiceEmitter(),
new NamespaceExtractionConfig()
);
}
};
public static final Function<Lifecycle, NamespaceExtractionCacheManager> CREATE_OFF_HEAP_CACHE_MANAGER =
Expand All @@ -86,7 +91,11 @@ public NamespaceExtractionCacheManager apply(@Nullable Lifecycle lifecycle)
@Override
public NamespaceExtractionCacheManager apply(@Nullable Lifecycle lifecycle)
{
return new OffHeapNamespaceExtractionCacheManager(lifecycle, new NoopServiceEmitter());
return new OffHeapNamespaceExtractionCacheManager(
lifecycle,
new NoopServiceEmitter(),
new NamespaceExtractionConfig()
);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.druid.query.lookup.namespace.ExtractionNamespace;
import io.druid.query.lookup.namespace.JdbcExtractionNamespace;
import io.druid.server.lookup.namespace.JdbcCacheGenerator;
import io.druid.server.lookup.namespace.NamespaceExtractionConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Period;
import org.junit.After;
Expand Down Expand Up @@ -224,7 +225,7 @@ public CacheScheduler.VersionedCache generateCache(
}
}
),
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter)
new OnHeapNamespaceExtractionCacheManager(lifecycle, noopServiceEmitter, new NamespaceExtractionConfig())
);
try {
lifecycle.start();
Expand Down

0 comments on commit b56a907

Please sign in to comment.