Skip to content

Commit

Permalink
Update emitter library and add support for ParametrizedUriEmitter (ap…
Browse files Browse the repository at this point in the history
…ache#4722)

* Move emitters from io.druid.server.initialization to the dedicated io.druid.server.emitter package; Update emitter library to 0.6.0; Add support for ParametrizedUriEmitter; Support hierarical properties in JsonConfigurator (was needed for ParametrizedUriEmitter)

* Log created RequestLoggers

* Fix forbidden API

* Test fix

* More Http and Parametrized Http Emitter docs

* Switch to debug level
  • Loading branch information
leventov authored Sep 13, 2017
1 parent 4f6eb47 commit 267f415
Show file tree
Hide file tree
Showing 24 changed files with 347 additions and 22 deletions.
40 changes: 39 additions & 1 deletion api/src/main/java/io/druid/guice/JsonConfigurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import javax.validation.Validator;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -93,7 +94,7 @@ public <T> T configurate(Properties props, String propertyPrefix, Class<T> clazz
value = propValue;
}

jsonMap.put(prop.substring(propertyBase.length()), value);
hieraricalPutValue(propertyPrefix, prop, prop.substring(propertyBase.length()), value, jsonMap);
}
}

Expand Down Expand Up @@ -165,6 +166,43 @@ public Message apply(String input)
return config;
}

private static void hieraricalPutValue(
String propertyPrefix,
String originalProperty,
String property,
Object value,
Map<String, Object> targetMap
)
{
int dotIndex = property.indexOf('.');
if (dotIndex < 0) {
targetMap.put(property, value);
return;
}
if (dotIndex == 0) {
throw new ProvisionException(StringUtils.format("Double dot in property: %s", originalProperty));
}
if (dotIndex == property.length() - 1) {
throw new ProvisionException(StringUtils.format("Dot at the end of property: %s", originalProperty));
}
String nestedKey = property.substring(0, dotIndex);
Object nested = targetMap.computeIfAbsent(nestedKey, k -> new HashMap<String, Object>());
if (!(nested instanceof Map)) {
// Clash is possible between properties, which are used to configure different objects: e. g.
// druid.emitter=parametrized is used to configure Emitter class, and druid.emitter.parametrized.xxx=yyy is used
// to configure ParametrizedUriEmitterConfig object. So skipping xxx=yyy key-value pair when configuring Emitter
// doesn't make any difference. That is why we just log this situation, instead of throwing an exception.
log.info(
"Skipping %s property: one of it's prefixes is also used as a property key. Prefix: %s",
originalProperty,
propertyPrefix
);
return;
}
Map<String, Object> nestedMap = (Map<String, Object>) nested;
hieraricalPutValue(propertyPrefix, originalProperty, property.substring(dotIndex + 1), value, nestedMap);
}

@VisibleForTesting
public static <T> void verifyClazzIsConfigurable(ObjectMapper mapper, Class<T> clazz)
{
Expand Down
22 changes: 19 additions & 3 deletions docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts

|Property|Description|Default|
|--------|-----------|-------|
|`druid.emitter`|Setting this value to "noop", "logging", or "http" will initialize one of the emitter modules. value "composing" can be used to initialize multiple emitter modules. |noop|
|`druid.emitter`|Setting this value to "noop", "logging", "http" or "parametrized" will initialize one of the emitter modules. value "composing" can be used to initialize multiple emitter modules. |noop|

#### Logging Emitter Module

Expand All @@ -216,10 +216,26 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts

|Property|Description|Default|
|--------|-----------|-------|
|`druid.emitter.http.timeOut`|The timeout for data reads.|PT5M|
|`druid.emitter.http.readTimeout`|The timeout for data reads.|PT5M|
|`druid.emitter.http.flushMillis`|How often the internal message buffer is flushed (data is sent).|60000|
|`druid.emitter.http.flushCount`|How many messages the internal message buffer can hold before flushing (sending).|500|
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none|
|`druid.emitter.http.basicAuthentication`|Login and password for authentification in "login:password" form, e. g. `druid.emitter.http.basicAuthentication=admin:adminpassword`|not specified = no authentification|
|`druid.emitter.http.flushTimeOut|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout|
|`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|5191680 (i. e. 5 MB)|
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none, required config|

#### Parametrized Http Emitter Module

`druid.emitter.parametrized.httpEmitting.*` configs correspond to the configs of Http Emitter Modules, see above.
Except `readTimeout` and `recipientBaseUrl`. E. g. `druid.emitter.parametrized.httpEmitting.flushMillis`,
`druid.emitter.parametrized.httpEmitting.flushCount`, etc.

The additional configs are:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.emitter.parametrized.readTimeout`|The timeout for data reads.|PT5M|
|`druid.emitter.parametrized.recipientBaseUrlPattern`|The URL pattern to send an event to, based on the event's feed. E. g. `http://foo.bar/{feed}`, that will send event to `http://foo.bar/metrics` if the event's feed is "metrics".|none, required config|

#### Composing Emitter Module

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.4.5</version>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.storage.derby.DerbyMetadataStorageDruidModule;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.emitter.EmitterModule;
import io.druid.server.initialization.jetty.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import org.apache.commons.io.FileUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.fasterxml.jackson.databind.Module;
import com.google.common.base.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.google.common.base.Strings;
import com.google.common.base.Supplier;
Expand Down Expand Up @@ -72,6 +72,7 @@ public void configure(Binder binder)
binder.install(new NoopEmitterModule());
binder.install(new LogEmitterModule());
binder.install(new HttpEmitterModule());
binder.install(new ParametrizedUriEmitterModule());
binder.install(new ComposingEmitterModule());

binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(LazySingleton.class);
Expand All @@ -87,6 +88,7 @@ public ServiceEmitter getServiceEmitter(@Self Supplier<DruidNode> configSupplier
"version",
Strings.nullToEmpty(version) // Version is null during `mvn test`.
);
log.info("Underlying emitter for ServiceEmitter: %s", emitter);
final ServiceEmitter retVal = new ServiceEmitter(
config.getServiceName(),
config.getHostAndPortToUse(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
Expand All @@ -27,10 +27,10 @@
public class HttpEmitterConfig extends com.metamx.emitter.core.HttpEmitterConfig
{
@JsonProperty
private Period timeOut = new Period("PT5M");
private Period readTimeout = new Period("PT5M");

public Period getReadTimeout()
{
return timeOut;
return readTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
Expand Down Expand Up @@ -49,6 +49,11 @@ public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.emitter.http", HttpEmitterConfig.class);

configureSsl(binder);
}

static void configureSsl(Binder binder)
{
final SSLContext context;
try {
context = SSLContext.getDefault();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.google.inject.Binder;
import com.google.inject.Module;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.emitter;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;

public class ParametrizedUriEmitterConfig extends com.metamx.emitter.core.ParametrizedUriEmitterConfig
{
@JsonProperty
private Period readTimeout = new Period("PT5M");

public Period getReadTimeout()
{
return readTimeout;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.emitter;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.ParametrizedUriEmitter;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.http.LifecycleUtils;
import io.druid.java.util.common.lifecycle.Lifecycle;

import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;

public class ParametrizedUriEmitterModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.emitter.parametrized", ParametrizedUriEmitterConfig.class);
HttpEmitterModule.configureSsl(binder);
}

@Provides
@ManageLifecycle
@Named("parametrized")
public Emitter getEmitter(
Supplier<ParametrizedUriEmitterConfig> config,
@Nullable SSLContext sslContext,
Lifecycle lifecycle,
ObjectMapper jsonMapper
)
{
final HttpClientConfig.Builder builder = HttpClientConfig
.builder()
.withNumConnections(1)
.withReadTimeout(config.get().getReadTimeout().toStandardDuration());
if (sslContext != null) {
builder.withSslContext(sslContext);
}
return new ParametrizedUriEmitter(
config.get(),
HttpClientInit.createClient(builder.build(), LifecycleUtils.asMmxLifecycle(lifecycle)),
jsonMapper
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.RequestLogLine;

import javax.validation.constraints.NotNull;
Expand All @@ -35,6 +36,8 @@
@JsonTypeName("composing")
public class ComposingRequestLoggerProvider implements RequestLoggerProvider
{
private static final Logger log = new Logger(ComposingRequestLoggerProvider.class);

@JsonProperty
@NotNull
private final List<RequestLoggerProvider> loggerProviders = Lists.newArrayList();
Expand All @@ -46,7 +49,9 @@ public RequestLogger get()
for (RequestLoggerProvider loggerProvider : loggerProviders) {
loggers.add(loggerProvider.get());
}
return new ComposingRequestLogger(loggers);
ComposingRequestLogger logger = new ComposingRequestLogger(loggers);
log.debug(new Exception("Stack trace"), "Creating %s at", logger);
return logger;
}

public static class ComposingRequestLogger implements RequestLogger
Expand Down Expand Up @@ -79,6 +84,14 @@ public void log(RequestLogLine requestLogLine) throws IOException
throw Throwables.propagate(exception);
}
}

@Override
public String toString()
{
return "ComposingRequestLogger{" +
"loggers=" + loggers +
'}';
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ public void log(final RequestLogLine requestLogLine) throws IOException
emitter.emit(new RequestLogEventBuilder(feed, requestLogLine));
}

@Override
public String toString()
{
return "EmittingRequestLogger{" +
"emitter=" + emitter +
", feed='" + feed + '\'' +
'}';
}

private static class RequestLogEvent implements Event
{
final ImmutableMap<String, String> serviceDimensions;
Expand Down
Loading

0 comments on commit 267f415

Please sign in to comment.