Skip to content

Commit

Permalink
Update ExchangerPipeLineManualTest.java (eugenp#9326)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored May 20, 2020
1 parent d0d0fe7 commit 1bf622a
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ public void givenData_whenPassedThrough_thenCorrect() throws InterruptedExceptio

Runnable processor = () -> {
Queue<String> processorBuffer = new ConcurrentLinkedQueue<>();
Queue<String> writterBuffer = new ConcurrentLinkedQueue<>();
Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
try {
processorBuffer = readerExchanger.exchange(processorBuffer);
while (true) {
writterBuffer.add(processorBuffer.poll());
writerBuffer.add(processorBuffer.poll());
if (processorBuffer.isEmpty()) {
try {
processorBuffer = readerExchanger.exchange(processorBuffer);
writterBuffer = writerExchanger.exchange(writterBuffer);
writerBuffer = writerExchanger.exchange(writerBuffer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Expand All @@ -62,13 +62,13 @@ public void givenData_whenPassedThrough_thenCorrect() throws InterruptedExceptio
};

Runnable writer = () -> {
Queue<String> writterBuffer = new ConcurrentLinkedQueue<>();
Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
try {
writterBuffer = writerExchanger.exchange(writterBuffer);
writerBuffer = writerExchanger.exchange(writerBuffer);
while (true) {
System.out.println(writterBuffer.poll());
if (writterBuffer.isEmpty()) {
writterBuffer = writerExchanger.exchange(writterBuffer);
System.out.println(writerBuffer.poll());
if (writerBuffer.isEmpty()) {
writerBuffer = writerExchanger.exchange(writerBuffer);
}
}
} catch (InterruptedException e) {
Expand Down

0 comments on commit 1bf622a

Please sign in to comment.