Skip to content

Commit

Permalink
fix the deadlock of TestFileBlockingQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
metacret committed Apr 7, 2015
1 parent 5b95e58 commit caecfd7
Showing 1 changed file with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,43 +138,51 @@ public void testMultithreaded() throws IOException, InterruptedException {
final FileBlockingQueue<Message> queue = new FileBlockingQueue<Message>(
tempDir.newFolder().getAbsolutePath(), "default", 3600, new MessageSerDe());

final int messagecount = 100;
final int threadCount = 10;
ExecutorService executors = Executors.newFixedThreadPool(threadCount);
final CountDownLatch latch = new CountDownLatch(threadCount + 1);
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch producerStartLatch = new CountDownLatch(1);
final CountDownLatch consumerStartLatch = new CountDownLatch(1);
for (int i = 0; i < threadCount; ++i) {
executors.execute(new Runnable() {

@Override
public void run() {
try {
startLatch.await();
producerStartLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 10000; ++j) {
for (int j = 0; j < messagecount; ++j) {
String str = generateRandomString();
Message msg = new Message("testRoutingKey", str.getBytes());
queue.offer(msg);
}

latch.countDown();
consumerStartLatch.countDown();
}
});
}
executors.execute(new Runnable() {

@Override
public void run() {
for (int i = 0; i < threadCount * 10000; ++i) {
try {
consumerStartLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < threadCount * messagecount; ++i) {
Message msg = queue.poll();
assertEquals(msg.getRoutingKey(), "testRoutingKey");
assertEquals(new String(msg.getPayload()).length(), 4096);
}
latch.countDown();
}
});
startLatch.countDown();
producerStartLatch.countDown();
latch.await();
}

Expand Down

0 comments on commit caecfd7

Please sign in to comment.