Skip to content

Commit

Permalink
fix emit wait time (apache#2869)
Browse files Browse the repository at this point in the history
  • Loading branch information
b-slim authored and fjy committed Apr 27, 2016
1 parent 5658bd9 commit 58510d8
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 38 deletions.
4 changes: 3 additions & 1 deletion docs/content/development/extensions-contrib/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ All the configuration parameters for graphite emitter are under `druid.emitter.g
|`druid.emitter.graphite.flushPeriod` | Queue flushing period in milliseconds. |no|1 minute|
|`druid.emitter.graphite.maxQueueSize`| Maximum size of the queue used to buffer events. |no|`MAX_INT`|
|`druid.emitter.graphite.alertEmitters`| List of emitters where alerts will be forwarded to. |no| empty list (no forwarding)|

|`druid.emitter.graphite.emitWaitTime` | wait time in milliseconds to try to send the event otherwise emitter will throwing event. |no|0|
|`druid.emitter.graphite.waitForEventTime` | waiting time in milliseconds if necessary for an event to become available. |no|1000 (1 sec)|

### Druid to Graphite Event Converter

Graphite Event Converter defines a mapping between druid metrics name plus dimensions to a Graphite metric path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package io.druid.emitter.graphite;

import com.codahale.metrics.graphite.PickledGraphite;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
Expand All @@ -30,7 +28,6 @@
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceMetricEvent;

import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand All @@ -41,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;


Expand All @@ -52,22 +50,20 @@ public class GraphiteEmitter implements Emitter
private final GraphiteEmitterConfig graphiteEmitterConfig;
private final List<Emitter> emitterList;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ObjectMapper mapper;
private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
private final static long DEFAULT_PUT_GET_TIMEOUT = 1000; // default wait for put/get operations on the queue 1 sec
private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("GraphiteEmitter-%s")
.build()); // Thread pool of two in order to schedule flush runnable
private AtomicLong countLostEvents = new AtomicLong(0);

public GraphiteEmitter(
@NotNull GraphiteEmitterConfig graphiteEmitterConfig,
List<Emitter> emitterList, @NotNull ObjectMapper mapper
GraphiteEmitterConfig graphiteEmitterConfig,
List<Emitter> emitterList
)
{
this.emitterList = emitterList;
this.mapper = mapper;
this.graphiteEmitterConfig = graphiteEmitterConfig;
this.graphiteEventConverter = graphiteEmitterConfig.getDruidToGraphiteEventConverter();
this.eventsQueue = new LinkedBlockingQueue(graphiteEmitterConfig.getMaxQueueSize());
Expand Down Expand Up @@ -103,22 +99,25 @@ public void emit(Event event)
return;
}
try {
final boolean isSuccessful = eventsQueue.offer(graphiteEvent, DEFAULT_PUT_GET_TIMEOUT, TimeUnit.MILLISECONDS);
final boolean isSuccessful = eventsQueue.offer(
graphiteEvent,
graphiteEmitterConfig.getEmitWaitTime(),
TimeUnit.MILLISECONDS
);
if (!isSuccessful) {
log.error(
"Throwing event [%s] on the floor Graphite queue is full please increase the capacity or/and the consumer frequency",
mapper.writeValueAsString(event)
);
if (countLostEvents.getAndIncrement() % 1000 == 0) {
log.error(
"Lost total of [%s] events because of emitter queue is full. Please increase the capacity or/and the consumer frequency",
countLostEvents.get()
);
}
}
}
catch (InterruptedException e) {
log.error(e, "got interrupted with message [%s]", e.getMessage());
Thread.currentThread().interrupt();

}
catch (JsonProcessingException e) {
log.error(e, e.getMessage());
}
} else if (!emitterList.isEmpty() && event instanceof AlertEvent) {
for (Emitter emitter : emitterList) {
emitter.emit(event);
Expand Down Expand Up @@ -147,7 +146,10 @@ public void run()
}
while (eventsQueue.size() > 0 && !exec.isShutdown()) {
try {
final GraphiteEvent graphiteEvent = eventsQueue.poll(DEFAULT_PUT_GET_TIMEOUT, TimeUnit.MILLISECONDS);
final GraphiteEvent graphiteEvent = eventsQueue.poll(
graphiteEmitterConfig.getWaitForEventTime(),
TimeUnit.MILLISECONDS
);
if (graphiteEvent != null) {
log.debug(
"sent [%s] with value [%s] and time [%s]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class GraphiteEmitterConfig
{
private final static int DEFAULT_BATCH_SIZE = 100;
private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute
private final static long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec

@JsonProperty
final private String hostname;
Expand All @@ -48,6 +49,13 @@ public class GraphiteEmitterConfig
@JsonProperty
final private List<String> alertEmitters;

@JsonProperty
final private Long emitWaitTime;

//waiting up to the specified wait time if necessary for an event to become available.
@JsonProperty
final private Long waitForEventTime;

@Override
public boolean equals(Object o)
{
Expand All @@ -66,40 +74,42 @@ public boolean equals(Object o)
if (getBatchSize() != that.getBatchSize()) {
return false;
}
if (getHostname() != null ? !getHostname().equals(that.getHostname()) : that.getHostname() != null) {
if (!getHostname().equals(that.getHostname())) {
return false;
}
if (!getFlushPeriod().equals(that.getFlushPeriod())) {
return false;
}
if (!getMaxQueueSize().equals(that.getMaxQueueSize())) {
return false;
}
if (getFlushPeriod() != null ? !getFlushPeriod().equals(that.getFlushPeriod()) : that.getFlushPeriod() != null) {
if (!getDruidToGraphiteEventConverter().equals(that.getDruidToGraphiteEventConverter())) {
return false;
}
if (getMaxQueueSize() != null
? !getMaxQueueSize().equals(that.getMaxQueueSize())
: that.getMaxQueueSize() != null) {
if (getAlertEmitters() != null
? !getAlertEmitters().equals(that.getAlertEmitters())
: that.getAlertEmitters() != null) {
return false;
}
if (getDruidToGraphiteEventConverter() != null
? !getDruidToGraphiteEventConverter().equals(that.getDruidToGraphiteEventConverter())
: that.getDruidToGraphiteEventConverter() != null) {
if (!getEmitWaitTime().equals(that.getEmitWaitTime())) {
return false;
}
return !(getAlertEmitters() != null
? !getAlertEmitters().equals(that.getAlertEmitters())
: that.getAlertEmitters() != null);
return getWaitForEventTime().equals(that.getWaitForEventTime());

}

@Override
public int hashCode()
{
int result = getHostname() != null ? getHostname().hashCode() : 0;
int result = getHostname().hashCode();
result = 31 * result + getPort();
result = 31 * result + getBatchSize();
result = 31 * result + (getFlushPeriod() != null ? getFlushPeriod().hashCode() : 0);
result = 31 * result + (getMaxQueueSize() != null ? getMaxQueueSize().hashCode() : 0);
result = 31 * result + (getDruidToGraphiteEventConverter() != null
? getDruidToGraphiteEventConverter().hashCode()
: 0);
result = 31 * result + getFlushPeriod().hashCode();
result = 31 * result + getMaxQueueSize().hashCode();
result = 31 * result + getDruidToGraphiteEventConverter().hashCode();
result = 31 * result + (getAlertEmitters() != null ? getAlertEmitters().hashCode() : 0);
result = 31 * result + getEmitWaitTime().hashCode();
result = 31 * result + getWaitForEventTime().hashCode();
return result;
}

Expand All @@ -111,9 +121,13 @@ public GraphiteEmitterConfig(
@JsonProperty("flushPeriod") Long flushPeriod,
@JsonProperty("maxQueueSize") Integer maxQueueSize,
@JsonProperty("eventConverter") DruidToGraphiteEventConverter druidToGraphiteEventConverter,
@JsonProperty("alertEmitters") List<String> alertEmitters
@JsonProperty("alertEmitters") List<String> alertEmitters,
@JsonProperty("emitWaitTime") Long emitWaitTime,
@JsonProperty("waitForEventTime") Long waitForEventTime
)
{
this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
this.alertEmitters = alertEmitters == null ? Collections.<String>emptyList() : alertEmitters;
this.druidToGraphiteEventConverter = Preconditions.checkNotNull(
druidToGraphiteEventConverter,
Expand Down Expand Up @@ -167,4 +181,16 @@ public List<String> getAlertEmitters()
{
return alertEmitters;
}

@JsonProperty
public Long getEmitWaitTime()
{
return emitWaitTime;
}

@JsonProperty
public Long getWaitForEventTime()
{
return waitForEventTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ public Emitter apply(String s)
}
}
);
return new GraphiteEmitter(graphiteEmitterConfig, emitters, mapper);
return new GraphiteEmitter(graphiteEmitterConfig, emitters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public void testSerDeserGraphiteEmitterConfig() throws IOException
1000L,
100,
new SendAllGraphiteEventConverter("prefix", true, true),
Collections.EMPTY_LIST
Collections.EMPTY_LIST,
null,
null
);
String graphiteEmitterConfigString = mapper.writeValueAsString(graphiteEmitterConfig);
GraphiteEmitterConfig graphiteEmitterConfigExpected = mapper.reader(GraphiteEmitterConfig.class).readValue(
Expand Down

0 comments on commit 58510d8

Please sign in to comment.