From a85ca47f77e7a84079d8d883c7ea21131adefa0e Mon Sep 17 00:00:00 2001 From: James Byatt Date: Fri, 7 Apr 2017 17:28:46 +0100 Subject: [PATCH] Notify BatchStartAware event handlers of batch size before processing a batch --- .../lmax/disruptor/BatchEventProcessor.java | 10 +++- .../com/lmax/disruptor/BatchStartAware.java | 6 +++ .../disruptor/BatchEventProcessorTest.java | 50 +++++++++++++++++++ 3 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/lmax/disruptor/BatchStartAware.java diff --git a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java index 6b1711f73..53ae98572 100644 --- a/src/main/java/com/lmax/disruptor/BatchEventProcessor.java +++ b/src/main/java/com/lmax/disruptor/BatchEventProcessor.java @@ -37,6 +37,7 @@ public final class BatchEventProcessor private final EventHandler eventHandler; private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private final TimeoutHandler timeoutHandler; + private final BatchStartAware batchStartAware; /** * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when @@ -60,7 +61,10 @@ public BatchEventProcessor( ((SequenceReportingEventHandler) eventHandler).setSequenceCallback(sequence); } - timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null; + batchStartAware = + (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null; + timeoutHandler = + (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null; } @Override @@ -122,6 +126,10 @@ public void run() try { final long availableSequence = sequenceBarrier.waitFor(nextSequence); + if (batchStartAware != null) + { + batchStartAware.onBatchStart(availableSequence - nextSequence + 1); + } while (nextSequence <= availableSequence) { diff --git a/src/main/java/com/lmax/disruptor/BatchStartAware.java b/src/main/java/com/lmax/disruptor/BatchStartAware.java new file mode 100644 index 000000000..5c447e8bc --- /dev/null +++ b/src/main/java/com/lmax/disruptor/BatchStartAware.java @@ -0,0 +1,6 @@ +package com.lmax.disruptor; + +public interface BatchStartAware +{ + void onBatchStart(final long batchSize); +} diff --git a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java index a8d21d694..ee2bee021 100644 --- a/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java +++ b/src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java @@ -18,10 +18,14 @@ import com.lmax.disruptor.support.StubEvent; import org.junit.Test; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static com.lmax.disruptor.RingBuffer.createMultiProducer; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public final class BatchEventProcessorTest @@ -137,4 +141,50 @@ public void onEvent(StubEvent event, long sequence, boolean endOfBatch) throws E } } + @Test + public void reportAccurateBatchSizesAtBatchStartTime() + throws Exception + { + final List batchSizes = new ArrayList(); + final CountDownLatch eventLatch = new CountDownLatch(6); + + final class LoopbackEventHandler + implements EventHandler, BatchStartAware + { + + @Override + public void onBatchStart(long batchSize) + { + batchSizes.add(batchSize); + } + + @Override + public void onEvent(StubEvent event, long sequence, boolean endOfBatch) + throws Exception + { + if (!endOfBatch) + { + ringBuffer.publish(ringBuffer.next()); + } + eventLatch.countDown(); + } + } + + final BatchEventProcessor batchEventProcessor = + new BatchEventProcessor( + ringBuffer, sequenceBarrier, new LoopbackEventHandler()); + + ringBuffer.publish(ringBuffer.next()); + ringBuffer.publish(ringBuffer.next()); + ringBuffer.publish(ringBuffer.next()); + + Thread thread = new Thread(batchEventProcessor); + thread.start(); + eventLatch.await(); + + batchEventProcessor.halt(); + thread.join(); + + assertEquals(Arrays.asList(3L, 2L, 1L), batchSizes); + } }