diff --git a/AtomicIntegerDemo.java b/AtomicIntegerDemo.java index 4e36f92..13ad440 100644 --- a/AtomicIntegerDemo.java +++ b/AtomicIntegerDemo.java @@ -1,5 +1,3 @@ -package main.concurrency; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -23,7 +21,7 @@ public void run() { } } - public static void main(String args[]) throws InterruptedException { + public static void main(String[] args) throws InterruptedException { List list = new ArrayList<>(); for (int i = 0; i < 2; i++) { diff --git a/BlockingQueueDemo.java b/BlockingQueueDemo.java index d996dca..88947f0 100644 --- a/BlockingQueueDemo.java +++ b/BlockingQueueDemo.java @@ -1,13 +1,10 @@ -package main.concurrency; - import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * Simple sample of Producer/Consumer using ArrayBlockingQueue (ThreadSafe) - * - One Producer adds items in a pipeline in a frequency of 100ms + * - One Producer adds items in a pipeline in a frequency of 200ms * - The pipeline has a limit of 5 items - * - When the pipeline is full, the Producer will wait for 500ms * - Two consumers take items from the pipeline and consumes it in 500ms */ public class BlockingQueueDemo { @@ -16,10 +13,10 @@ public class BlockingQueueDemo { private static final int CONSUMERS = 2; private static final int CAPACITY = 5; - static class Producer extends Thread { - BlockingQueue pipeline; + private static class Producer extends Thread { + private final BlockingQueue pipeline; - Producer(BlockingQueue pipeline) { + Producer(BlockingQueue pipeline) { this. pipeline = pipeline; } @@ -27,17 +24,12 @@ public void run() { int nItems = 1; while (nItems <= 20) { try { - if (pipeline.remainingCapacity() > 0) { - String item = "item" + nItems; - pipeline.add(item); - String capacity = String.format(" [%d/%d]", pipeline.size(), CAPACITY); - System.out.println("Producer is adding " + item + capacity ); - nItems++; - Thread.sleep(100); - } else { - System.out.println("Producer queue is full"); - Thread.sleep(500); - } + String item = "item" + nItems; + pipeline.offer(item); + String capacity = String.format(" [%d/%d]", pipeline.size(), CAPACITY); + System.out.println("Producer is adding " + item + capacity ); + nItems++; + Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } @@ -59,11 +51,11 @@ public void run() { } } - static class Consumer extends Thread { - BlockingQueue pipeline; - String name; + private static class Consumer extends Thread { + private final BlockingQueue pipeline; + private final String name; - Consumer(String name, BlockingQueue pipeline) { + Consumer(String name, BlockingQueue pipeline) { this.name = name; this. pipeline = pipeline; } @@ -71,7 +63,7 @@ static class Consumer extends Thread { public void run() { while (true) { try { - String item = (String)pipeline.take(); + String item = pipeline.take(); if (item.equals(STOP)) { break; } @@ -85,7 +77,7 @@ public void run() { } public static void main(String[] args) throws InterruptedException { - BlockingQueue pipeline = new ArrayBlockingQueue(CAPACITY); + BlockingQueue pipeline = new ArrayBlockingQueue<>(CAPACITY); new Producer(pipeline).start(); Thread.sleep(100); for (int i = 0; i < CONSUMERS; i++) { diff --git a/ConditionVariableDemo.java b/ConditionVariableDemo.java index 82b7ebc..cee4f14 100644 --- a/ConditionVariableDemo.java +++ b/ConditionVariableDemo.java @@ -1,5 +1,3 @@ -package main.concurrency; - import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -8,41 +6,40 @@ * Alternate 3 threads while consuming of a share resource using Condition variable */ -class Consumer extends Thread { - +public class ConditionVariableDemo { private static int items = 10; - private static Lock lock = new ReentrantLock(); - private static Condition alternate = lock.newCondition(); + private static final Lock lock = new ReentrantLock(); + private static final Condition alternate = lock.newCondition(); - private int id; + static class Consumer extends Thread { + private final int id; - Consumer(int id) { - this.id = id; - } + Consumer(int id) { + this.id = id; + } - public void run() { - while (items > 0) { - lock.lock(); - try { - while (items % 3 != id && items > 0) { - alternate.await(); - } - if (items > 0) { - System.out.println("Consumer " + id + " took the item " + items); - items--; - alternate.signalAll(); + public void run() { + while (items > 0) { + lock.lock(); + try { + while (items % 3 != id && items > 0) { + alternate.await(); + } + if (items > 0) { + System.out.println("Consumer " + id + " took the item " + items); + items--; + alternate.signalAll(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + lock.unlock(); } - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - lock.unlock(); } } } -} -public class ConditionVariableDemo { - public static void main(String args[]) { + public static void main(String[] args) { for (int i = 0; i <= 2; i++) { new Consumer(i).start(); } diff --git a/CountDownLatchDemo.java b/CountDownLatchDemo.java index 6d24605..b2b4369 100644 --- a/CountDownLatchDemo.java +++ b/CountDownLatchDemo.java @@ -1,5 +1,3 @@ -package main.concurrency; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.BrokenBarrierException; @@ -13,15 +11,13 @@ * [1] Releases when a count value reaches zero */ public class CountDownLatchDemo { + public static int MAX_NUM_THREADS = 10; + public static int sum = 0; + private static final Lock lock = new ReentrantLock(); + private static final CountDownLatch barrier = new CountDownLatch(MAX_NUM_THREADS/2); private static class Counter extends Thread { - - public static int MAX_NUM_THREADS = 10; - public static int sum = 0; - private static Lock lock = new ReentrantLock(); - private static CountDownLatch barrier = new CountDownLatch(MAX_NUM_THREADS/2); - - private String name; + private final String name; Counter(String name) { this.name = name; @@ -52,9 +48,9 @@ public void run() { } } - public static void main(String args[]) throws InterruptedException { + public static void main(String[] args) throws InterruptedException { List list = new ArrayList<>(); - for (int i = 0; i < Counter.MAX_NUM_THREADS/2 ; i++) { + for (int i = 0; i < MAX_NUM_THREADS / 2 ; i++) { list.add(new Counter("Adder-" + i)); list.add(new Counter("Multiplier-" + i)); } @@ -64,6 +60,6 @@ public static void main(String args[]) throws InterruptedException { for (Counter c : list) { c.join(); } - System.out.println("Final value and is " + Counter.sum); + System.out.println("Final value and is " + sum); } } diff --git a/CountDownLatchDemo2.java b/CountDownLatchDemo2.java index 7c007cf..eb2cf82 100644 --- a/CountDownLatchDemo2.java +++ b/CountDownLatchDemo2.java @@ -18,14 +18,32 @@ */ public class CountDownLatchDemo2 { private static final int NUM_TASKS = 10; - private static AtomicInteger counter = new AtomicInteger(0); + private static final AtomicInteger counter = new AtomicInteger(0); - public static void main(String args[]) { + private static class MyTask implements Callable { + private final CountDownLatch latch; + + MyTask(CountDownLatch latch) { + this.latch = latch; + } + + public String call() throws Exception { + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + counter.getAndIncrement(); + } finally { + latch.countDown(); + } + return String.format("%s | counter %d", Instant.now(), counter.get()); + } + } + + public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(NUM_TASKS); List tasks = new ArrayList<>(); for (int i = 0; i < NUM_TASKS; i++) { - tasks.add(new MyTask(latch)); + tasks.add(new CountDownLatchDemo2.MyTask(latch)); } ExecutorService pool = Executors.newFixedThreadPool(NUM_TASKS); @@ -46,22 +64,4 @@ public static void main(String args[]) { pool.shutdown(); } } - - private static class MyTask implements Callable { - private CountDownLatch latch; - - MyTask(CountDownLatch latch) { - this.latch = latch; - } - - public String call() throws Exception { - try { - Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); - counter.getAndIncrement(); - } finally { - latch.countDown(); - } - return String.format("%s | counter %d", Instant.now(), counter.get()); - } - } } diff --git a/CyclicBarrierDemo.java b/CyclicBarrierDemo.java index e1ba80b..04a621e 100644 --- a/CyclicBarrierDemo.java +++ b/CyclicBarrierDemo.java @@ -1,5 +1,3 @@ -package main.concurrency; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.BrokenBarrierException; @@ -12,15 +10,13 @@ * [1] Releases when a number of threads are waiting */ public class CyclicBarrierDemo { + public static int MAX_NUM_THREADS = 10; + public static int sum = 0; + private static final Lock lock = new ReentrantLock(); + private static final CyclicBarrier barrier = new CyclicBarrier(MAX_NUM_THREADS); private static class Counter extends Thread { - - public static int MAX_NUM_THREADS = 10; - public static int sum = 0; - private static Lock lock = new ReentrantLock(); - private static CyclicBarrier barrier = new CyclicBarrier(MAX_NUM_THREADS); - - private String name; + private final String name; Counter(String name) { this.name = name; @@ -53,9 +49,9 @@ public void run() { } } - public static void main(String args[]) throws InterruptedException { + public static void main(String[] args) throws InterruptedException { List list = new ArrayList<>(); - for (int i = 0; i < Counter.MAX_NUM_THREADS/2 ; i++) { + for (int i = 0; i < MAX_NUM_THREADS / 2 ; i++) { list.add(new Counter("Adder-" + i)); list.add(new Counter("Multiplier-" + i)); } @@ -65,6 +61,6 @@ public static void main(String args[]) throws InterruptedException { for (Counter c : list) { c.join(); } - System.out.println("Final value and is " + Counter.sum); + System.out.println("Final value and is " + sum); } } diff --git a/ForkJoinRecursiveActionDemo.java b/ForkJoinRecursiveActionDemo.java index 59e08f2..cefda8a 100644 --- a/ForkJoinRecursiveActionDemo.java +++ b/ForkJoinRecursiveActionDemo.java @@ -1,5 +1,3 @@ -package main.concurrency; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; @@ -13,11 +11,11 @@ */ public class ForkJoinRecursiveActionDemo { - static List list = new ArrayList<>(); - - static class RecursiveReplace extends RecursiveAction { + private static final List list = new ArrayList<>(); - private int start, end; + private static class RecursiveReplace extends RecursiveAction { + private final int start; + private final int end; private RecursiveReplace() { this.start = 0; diff --git a/ForkJoinRecursiveTaskDemo.java b/ForkJoinRecursiveTaskDemo.java index 636f0c3..e6b9106 100644 --- a/ForkJoinRecursiveTaskDemo.java +++ b/ForkJoinRecursiveTaskDemo.java @@ -1,5 +1,3 @@ -package main.concurrency; - import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; @@ -10,9 +8,9 @@ */ public class ForkJoinRecursiveTaskDemo { - static class RecursiveSum extends RecursiveTask { - - long start, end; + private static class RecursiveSum extends RecursiveTask { + private final long start; + private final long end; RecursiveSum(long start, long end) { this.start = start; diff --git a/README.md b/README.md index c4baa68..3cb85ab 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,11 @@ -# java_concurrency_algorithms -Java algorithms examples using concurrency +# Java Concurrent Animated - Visualizing the Java Concurrent API +## A Swing Application Demonstrating Core Java Concurrency Concepts + +Have you ever found yourself pondering how multiple threads work in Java and how to effectively utilize concepts like "notify", "notifyAll", and "wait"? Moreover, why is the usage of the "synchronized" keyword crucial in concurrent programming? Wouldn't it be amazing to witness this in action? + +Introducing Java Concurrent Animated, a Swing application crafted by the Java Champion **[Victor Grazi](https://blogs.oracle.com/java/post/victor-grazi-java-champion)**. This program serves as an illustrative showcase of core Java Concurrency concepts, including but not limited to AtomicInteger, CountDownLatch, Semaphore, ReentrantLock, ReadWriteLock, and more. + +## ⭐ [Medium Full Article Link](https://medium.com/javarevisited/java-concurrent-animated-visualizing-the-java-concurrent-api-e54d0708fd7a) ### 1. AtomicIntegerDemo Description: @@ -124,9 +130,8 @@ Description: ``` /** * Simple sample of Producer/Consumer using ArrayBlockingQueue (ThreadSafe) - * - One Producer adds items in a pipeline in a frequency of 100ms + * - One Producer adds items in a pipeline in a frequency of 200ms * - The pipeline has a limit of 5 items - * - When the pipeline is full, the Producer will wait for 500ms * - Two consumers take items from the pipeline and consumes it in 500ms */ ``` @@ -135,49 +140,45 @@ Description: Sample output: ```console Producer is adding item1 [1/5] -Consumer0 took item1 -Consumer1 took item2 +Consumer1 took item1 +Consumer0 took item2 Producer is adding item2 [1/5] Producer is adding item3 [1/5] -Producer is adding item4 [2/5] -Producer is adding item5 [3/5] -Producer is adding item6 [4/5] -Consumer0 took item3 -Consumer1 took item4 -Producer is adding item7 [3/5] -Producer is adding item8 [4/5] -Producer is adding item9 [5/5] -Producer queue is full -Consumer0 took item5 -Consumer1 took item6 -Producer is adding item10 [4/5] -Producer is adding item11 [5/5] -Consumer0 took item7 -Consumer1 took item8 -Producer is adding item12 [4/5] -Producer is adding item13 [5/5] -Producer queue is full -Consumer0 took item9 -Consumer1 took item10 -Producer is adding item14 [4/5] -Producer is adding item15 [5/5] -Producer queue is full -Consumer0 took item11 -Consumer1 took item12 +Consumer1 took item3 +Producer is adding item4 [1/5] +Consumer0 took item4 +Producer is adding item5 [1/5] +Producer is adding item6 [2/5] +Consumer1 took item5 +Consumer0 took item6 +Producer is adding item7 [1/5] +Producer is adding item8 [2/5] +Consumer1 took item7 +Producer is adding item9 [2/5] +Consumer0 took item8 +Producer is adding item10 [2/5] +Producer is adding item11 [3/5] +Consumer1 took item9 +Consumer0 took item10 +Producer is adding item12 [2/5] +Producer is adding item13 [3/5] +Consumer1 took item11 +Producer is adding item14 [3/5] +Consumer0 took item12 +Producer is adding item15 [3/5] Producer is adding item16 [4/5] -Consumer0 took item13 -Consumer1 took item14 +Consumer1 took item13 +Consumer0 took item14 Producer is adding item17 [3/5] Producer is adding item18 [4/5] -Producer is adding item19 [5/5] -Producer queue is full -Consumer0 took item15 -Consumer1 took item16 +Consumer1 took item15 +Producer is adding item19 [4/5] +Consumer0 took item16 Producer is adding item20 [4/5] -Consumer0 took item17 -Consumer1 took item18 -Consumer0 took item19 -Consumer1 took item20 +Consumer1 took item17 +Consumer0 took item18 +Consumer1 took item19 +Consumer0 took item20 ``` ### 7. ForkJoinRecursiveTaskDemo Description: @@ -262,14 +263,59 @@ Description: Sample output: ```console -2020-06-20T08:23:52.783Z | counter 4 -2020-06-20T08:23:52.146Z | counter 1 -2020-06-20T08:23:54.452Z | counter 6 -2020-06-20T08:23:55.507Z | counter 9 -2020-06-20T08:23:55.870Z | counter 10 -2020-06-20T08:23:55.317Z | counter 8 -2020-06-20T08:23:54.762Z | counter 7 -2020-06-20T08:23:53.057Z | counter 5 -2020-06-20T08:23:52.558Z | counter 2 -2020-06-20T08:23:52.759Z | counter 3 +2023-07-23T16:29:25.091Z | counter 1 +2023-07-23T16:29:26.428Z | counter 6 +2023-07-23T16:29:25.192Z | counter 2 +2023-07-23T16:29:28.022Z | counter 10 +2023-07-23T16:29:27.134Z | counter 8 +2023-07-23T16:29:25.205Z | counter 3 +2023-07-23T16:29:26.668Z | counter 7 +2023-07-23T16:29:25.511Z | counter 5 +2023-07-23T16:29:25.390Z | counter 4 +2023-07-23T16:29:27.178Z | counter 9 +``` +### 11. ThreadLocalDemo +Description: +``` +/** + * In this example, 3 threads will print the 3 consecutive exponential values of 1, 2 and 3. + * All of them will be using the SharedUtil class for it. + * P.S. SharedUtil uses Java ThreadLocal class which enables us to create variables that can only be read and written by the same thread. + */ +``` +[Code link](https://github.com/wagnerjfr/java_concurrency_algorithms/blob/master/ThreadLocalDemo.java) + +Sample output: +```console +Thread exp2: 2 +Thread exp3: 3 +Thread exp1: 1 +Thread exp1: 1 +Thread exp1: 1 +Thread exp2: 4 +Thread exp3: 9 +Thread exp2: 8 +Thread exp3: 27 +``` +### 12. SynchronizedDemo +Description: +``` +/** + * In this example, we have two threads, threadA and threadB, that share a common object lock. threadA enters a synchronized + * block, does some work, then calls lock.wait() to wait for threadB to notify it. threadB, in its synchronized block, does some work, + * and then calls lock.notify() to notify threadA to resume. + * When you run this code, you'll observe that threadA will wait until threadB calls lock.notify(), demonstrating how synchronization + * using synchronized, wait, and notify ensures proper coordination between the two threads. + */ +``` +[Code link](https://github.com/wagnerjfr/java_concurrency_algorithms/blob/master/SynchronizedDemo.java) + +Sample output: +```console +Thread A is doing some work. +Thread A is waiting for Thread B to notify. +Thread B is doing some work. +Thread B is notifying Thread A to resume. +Thread A is resuming its work. +Both threads have completed. ``` diff --git a/ReentrantReadWriteLockDemo.java b/ReentrantReadWriteLockDemo.java index d1b5a05..9948d1a 100644 --- a/ReentrantReadWriteLockDemo.java +++ b/ReentrantReadWriteLockDemo.java @@ -1,5 +1,3 @@ -package main.concurrency; - import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -63,7 +61,7 @@ void add(int i) { private static class Worker implements Runnable { private static final int MAX_OP = 5; - private String name; + private final String name; private boolean stop = false; private int count = 1; diff --git a/SemaphoreDemo.java b/SemaphoreDemo.java index 25ca637..61f1d13 100644 --- a/SemaphoreDemo.java +++ b/SemaphoreDemo.java @@ -1,5 +1,3 @@ -package main.concurrency; - import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -8,11 +6,16 @@ * [1] Can be used by multiple threads at the same time and includes a counter to track availability */ public class SemaphoreDemo { + private static final Semaphore charger = new Semaphore(5); - static class ElectricalVehicle extends Thread { + public static void main(String[] args) { + for (int i = 0; i < 10; i++) { + new ElectricalVehicle("EV" + i).start(); + } + } - static Semaphore charger = new Semaphore(5); - String name; + private static class ElectricalVehicle extends Thread { + private final String name; ElectricalVehicle(String name) { this.name = name; @@ -31,10 +34,4 @@ public void run() { } } } - - public static void main(String[] args) { - for (int i = 0; i < 10; i++) { - new ElectricalVehicle("EV" + i).start(); - } - } } diff --git a/SynchronizedDemo.java b/SynchronizedDemo.java new file mode 100644 index 0000000..7726ed0 --- /dev/null +++ b/SynchronizedDemo.java @@ -0,0 +1,55 @@ +/** + * In this example, we have two threads, threadA and threadB, that share a common object lock. threadA enters a synchronized + * block, does some work, then calls lock.wait() to wait for threadB to notify it. threadB, in its synchronized block, does some work, + * and then calls lock.notify() to notify threadA to resume. + * When you run this code, you'll observe that threadA will wait until threadB calls lock.notify(), demonstrating how synchronization + * using synchronized, wait, and notify ensures proper coordination between the two threads. + */ +public class SynchronizedDemo { + public static void main(String[] args) { + Object lock = new Object(); + + // Thread A + Thread threadA = new Thread(() -> { + synchronized (lock) { + try { + System.out.println("Thread A is doing some work."); + Thread.sleep(2000); + System.out.println("Thread A is waiting for Thread B to notify."); + lock.wait(); + System.out.println("Thread A is resuming its work."); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + + // Thread B + Thread threadB = new Thread(() -> { + synchronized (lock) { + try { + System.out.println("Thread B is doing some work."); + Thread.sleep(3000); + System.out.println("Thread B is notifying Thread A to resume."); + lock.notify(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + + // Start both threads + threadA.start(); + threadB.start(); + + // Wait for both threads to finish + try { + threadA.join(); + threadB.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("Both threads have completed."); + } +} diff --git a/ThreadLocalDemo.java b/ThreadLocalDemo.java new file mode 100644 index 0000000..257ce5c --- /dev/null +++ b/ThreadLocalDemo.java @@ -0,0 +1,63 @@ +import java.util.concurrent.ThreadLocalRandom; + +/** + * In this example, 3 threads will print the 3 consecutive exponential values of 1, 2 and 3. + * All of them will be using the SharedUtil class for it. + * P.S. SharedUtil uses Java ThreadLocal class which enables us to create variables that can only be read and written by the same thread. + */ +public class ThreadLocalDemo { + + public static void main(String[] args) { + new ThreadLocalDemo().execute(); + } + + private void execute() { + for (int i = 1; i <= 3; i++) { + new Thread(new Task(i), "Thread exp" + i).start(); + } + } + + private static class Task implements Runnable { + private final int num; + + private Task(int num) { + this.num = num; + } + + @Override + public void run() { + try { + SharedUtil.setCounter(num); + + for (int i = 0; i < 3; i++) { + SharedUtil.calculateAndPrint(); + Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500)); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + SharedUtil.remove(); + } + } + } + + private static class SharedUtil { + private static final ThreadLocal threadLocalCounter = ThreadLocal.withInitial(() -> 0); + private static final ThreadLocal threadLocalAccumulator = ThreadLocal.withInitial(() -> 0); + + static void setCounter(int number) { + threadLocalCounter.set(number); + threadLocalAccumulator.set(number); + } + + static void calculateAndPrint() { + System.out.println(Thread.currentThread().getName() + ": " + threadLocalAccumulator.get()); + threadLocalAccumulator.set(threadLocalAccumulator.get() * threadLocalCounter.get()); + } + + static void remove() { + threadLocalAccumulator.remove(); + threadLocalCounter.remove(); + } + } +} diff --git a/javaConcurrentAnimated.jar b/javaConcurrentAnimated.jar new file mode 100644 index 0000000..f1767a7 Binary files /dev/null and b/javaConcurrentAnimated.jar differ