Skip to content

Commit

Permalink
add log msg when event recvr firehose buffer is full (apache#3209)
Browse files Browse the repository at this point in the history
  • Loading branch information
himanshug authored and pjain1 committed Jul 1, 2016
1 parent 8eeae2e commit e1313e4
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -147,6 +146,7 @@ public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiv
private volatile InputRow nextRow = null;
private volatile boolean closed = false;
private final AtomicLong bytesReceived = new AtomicLong(0);
private final AtomicLong lastBufferAddFailMsgTime = new AtomicLong(0);

public EventReceiverFirehose(MapInputRowParser parser)
{
Expand Down Expand Up @@ -298,6 +298,13 @@ public void addRows(Iterable<InputRow> rows) throws InterruptedException
boolean added = false;
while (!closed && !added) {
added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
if (!added) {
long currTime = System.currentTimeMillis();
long lastTime = lastBufferAddFailMsgTime.get();
if (currTime - lastTime > 10000 && lastBufferAddFailMsgTime.compareAndSet(lastTime, currTime)) {
log.warn("Failed to add event to buffer with current size [%s] . Retrying...", buffer.size());
}
}
}

if (!added) {
Expand Down

0 comments on commit e1313e4

Please sign in to comment.