Skip to content

Commit

Permalink
Merge branch 'grumpyjames-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Barker committed Apr 10, 2017
2 parents 2687e16 + a85ca47 commit 534f06c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
10 changes: 9 additions & 1 deletion src/main/java/com/lmax/disruptor/BatchEventProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public final class BatchEventProcessor<T>
private final EventHandler<? super T> eventHandler;
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final TimeoutHandler timeoutHandler;
private final BatchStartAware batchStartAware;

/**
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
Expand All @@ -60,7 +61,10 @@ public BatchEventProcessor(
((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
}

timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
batchStartAware =
(eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
timeoutHandler =
(eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}

@Override
Expand Down Expand Up @@ -122,6 +126,10 @@ public void run()
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}

while (nextSequence <= availableSequence)
{
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/lmax/disruptor/BatchStartAware.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.lmax.disruptor;

public interface BatchStartAware
{
void onBatchStart(final long batchSize);
}
50 changes: 50 additions & 0 deletions src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
import com.lmax.disruptor.support.StubEvent;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static com.lmax.disruptor.RingBuffer.createMultiProducer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public final class BatchEventProcessorTest
Expand Down Expand Up @@ -137,4 +141,50 @@ public void onEvent(StubEvent event, long sequence, boolean endOfBatch) throws E
}
}

@Test
public void reportAccurateBatchSizesAtBatchStartTime()
throws Exception
{
final List<Long> batchSizes = new ArrayList<Long>();
final CountDownLatch eventLatch = new CountDownLatch(6);

final class LoopbackEventHandler
implements EventHandler<StubEvent>, BatchStartAware
{

@Override
public void onBatchStart(long batchSize)
{
batchSizes.add(batchSize);
}

@Override
public void onEvent(StubEvent event, long sequence, boolean endOfBatch)
throws Exception
{
if (!endOfBatch)
{
ringBuffer.publish(ringBuffer.next());
}
eventLatch.countDown();
}
}

final BatchEventProcessor<StubEvent> batchEventProcessor =
new BatchEventProcessor<StubEvent>(
ringBuffer, sequenceBarrier, new LoopbackEventHandler());

ringBuffer.publish(ringBuffer.next());
ringBuffer.publish(ringBuffer.next());
ringBuffer.publish(ringBuffer.next());

Thread thread = new Thread(batchEventProcessor);
thread.start();
eventLatch.await();

batchEventProcessor.halt();
thread.join();

assertEquals(Arrays.asList(3L, 2L, 1L), batchSizes);
}
}

0 comments on commit 534f06c

Please sign in to comment.