diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J1_SimpleType.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J1_SimpleType.java new file mode 100644 index 0000000..63f953b --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J1_SimpleType.java @@ -0,0 +1,54 @@ +package com.heibaiying.atomic; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * 原子对象 + */ +public class J1_SimpleType { + + private static int i = 0; + private static AtomicInteger j = new AtomicInteger(0); + /*使用AtomicReference对普通对象进行封装*/ + private static AtomicReference k = new AtomicReference<>(0); + + static class Task implements Runnable { + + private CountDownLatch latch; + + Task(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void run() { + i++; + j.incrementAndGet(); + k.getAndUpdate(x -> x + 1); + latch.countDown(); + } + } + + public static void main(String[] args) throws InterruptedException { + int number = 10000; + CountDownLatch latch = new CountDownLatch(number); + Semaphore semaphore = new Semaphore(10); + ExecutorService executorService = Executors.newFixedThreadPool(10); + Task task = new Task(latch); + for (int i = 0; i < number; i++) { + semaphore.acquire(); + executorService.execute(task); + semaphore.release(); + } + latch.await(); + System.out.println("输出i的值" + i); + System.out.println("输出j的值" + j.get()); + System.out.println("输出K的值" + k.get()); + executorService.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J2_ArrayThreadSafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J2_ArrayThreadSafe.java new file mode 100644 index 0000000..0141a1b --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J2_ArrayThreadSafe.java @@ -0,0 +1,52 @@ +package com.heibaiying.atomic; + +import java.util.ArrayList; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +public class J2_ArrayThreadSafe { + + // 对集合本生的操作线程安全 + private static LinkedBlockingQueue LinkedBlockingQueue = new LinkedBlockingQueue<>(); + //对集合本生的操作线程安全 + private static Vector vector = new Vector<>(); + //普通集合 + private static ArrayList arrayList = new ArrayList<>(); + + static class Task implements Runnable { + + private CountDownLatch latch; + + Task(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void run() { + for (int i = 0; i < 1000; i++) { + LinkedBlockingQueue.add(i); + vector.add(i); + arrayList.add(i); + } + latch.countDown(); + } + } + + public static void main(String[] args) throws InterruptedException { + int number = 1000; + CountDownLatch latch = new CountDownLatch(number); + ExecutorService executorService = Executors.newFixedThreadPool(10); + for (int i = 0; i < number; i++) { + executorService.execute(new Task(latch)); + } + latch.await(); + System.out.println("集合本生的线程安全:"); + System.out.println("LinkedBlockingQueue size : " + LinkedBlockingQueue.size()); + System.out.println("vector size : " + vector.size()); + System.out.println("arrayList size : " + arrayList.size()); + executorService.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J3_ArrayElementThreadUnsafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J3_ArrayElementThreadUnsafe.java new file mode 100644 index 0000000..72cb5a7 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J3_ArrayElementThreadUnsafe.java @@ -0,0 +1,61 @@ +package com.heibaiying.atomic; + +import java.util.ArrayList; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicIntegerArray; + +public class J3_ArrayElementThreadUnsafe { + + private static int capacity = 10; + // 保证对集合内元素的操作具有原子性 + private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(capacity); + // 对集合内元素的操作线程不安全 + private static Vector vector = new Vector<>(capacity); + // 对集合内元素的操作线程不安全 + private static ArrayList arrayList = new ArrayList<>(capacity); + + static { + for (int i = 0; i < capacity; i++) { + arrayList.add(0); + vector.add(0); + } + } + + static class Task implements Runnable { + + private CountDownLatch latch; + + Task(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void run() { + for (int i = 0; i < 1000; i++) { + int num = i % capacity; + atomicIntegerArray.getAndIncrement(num); + vector.set(num, vector.get(num) + 1); + arrayList.set(num, arrayList.get(num) + 1); + } + latch.countDown(); + } + } + + public static void main(String[] args) throws InterruptedException { + int number = 1000; + CountDownLatch latch = new CountDownLatch(number); + ExecutorService executorService = Executors.newFixedThreadPool(10); + for (int i = 0; i < number; i++) { + executorService.execute(new Task(latch)); + } + latch.await(); + System.out.println("集合内元素的线程安全:"); + System.out.println("atomicIntegerArray size : " + atomicIntegerArray); + System.out.println("vector size : " + vector); + System.out.println("arrayList size : " + arrayList); + executorService.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J4_ThreadUnsafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J4_ThreadUnsafe.java new file mode 100644 index 0000000..ec5334b --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J4_ThreadUnsafe.java @@ -0,0 +1,66 @@ +package com.heibaiying.atomic; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class J4_ThreadUnsafe { + + static class Task implements Runnable { + + private Candidate candidate; + private CountDownLatch latch; + + Task(CountDownLatch latch, Candidate candidate) { + this.candidate = candidate; + this.latch = latch; + } + + @Override + public void run() { + candidate.setScore(candidate.getScore() + 1); + latch.countDown(); + } + } + + public static void main(String[] args) throws InterruptedException { + int number = 100000; + CountDownLatch latch = new CountDownLatch(number); + ExecutorService executorService = Executors.newFixedThreadPool(10); + Candidate candidate = new Candidate("候选人", 0); + for (int i = 0; i < number; i++) { + executorService.execute(new Task(latch, candidate)); + } + latch.await(); + System.out.println(candidate.getName() + "获得票数:" + candidate.getScore()); + executorService.shutdown(); + } + + + private static class Candidate { + + private String name; + private volatile int score; + + Candidate(String name, int score) { + this.name = name; + this.score = score; + } + + public int getScore() { + return score; + } + + public void setScore(int score) { + this.score = score; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J5_AtomicIntegerFieldUpdater.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J5_AtomicIntegerFieldUpdater.java new file mode 100644 index 0000000..37da751 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J5_AtomicIntegerFieldUpdater.java @@ -0,0 +1,72 @@ +package com.heibaiying.atomic; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +public class J5_AtomicIntegerFieldUpdater { + + static class Task implements Runnable { + + private Candidate candidate; + private CountDownLatch latch; + private AtomicIntegerFieldUpdater fieldUpdater; + + Task(CountDownLatch latch, Candidate candidate, AtomicIntegerFieldUpdater fieldUpdater) { + this.candidate = candidate; + this.latch = latch; + this.fieldUpdater = fieldUpdater; + } + + @Override + public void run() { + fieldUpdater.incrementAndGet(candidate); + latch.countDown(); + } + } + + public static void main(String[] args) throws InterruptedException { + int number = 100000; + CountDownLatch latch = new CountDownLatch(number); + ExecutorService executorService = Executors.newFixedThreadPool(10); + Candidate candidate = new Candidate("候选人", 0); + AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score"); + for (int i = 0; i < number; i++) { + executorService.execute(new Task(latch, candidate, fieldUpdater)); + } + latch.await(); + System.out.println(candidate.getName() + "获得票数:" + candidate.getScore()); + executorService.shutdown(); + } + + + private static class Candidate { + + private String name; + + // 1. 不能声明为private 2. 必须用volatile关键字修饰 + public volatile int score; + + Candidate(String name, int score) { + this.name = name; + this.score = score; + } + + public int getScore() { + return score; + } + + public void setScore(int score) { + this.score = score; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J6_SynchronousQueue.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J6_SynchronousQueue.java new file mode 100644 index 0000000..9d71a2f --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/atomic/J6_SynchronousQueue.java @@ -0,0 +1,47 @@ +package com.heibaiying.atomic; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + +public class J6_SynchronousQueue { + + private static SynchronousQueue queue = new SynchronousQueue<>(); + + static class ReadThread implements Runnable { + @Override + public void run() { + System.out.println("读线程启动"); + while (true) { + try { + Double peek = queue.take(); + System.out.println("读线程获取值:" + peek); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + } + + static class WriteThread implements Runnable { + @Override + public void run() { + System.out.println("写线程写入值"); + try { + queue.put(Math.random()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + + public static void main(String[] args) throws InterruptedException { + new Thread(new ReadThread()).start(); + Thread.sleep(3000); + ScheduledExecutorService pool = Executors.newScheduledThreadPool(3); + pool.scheduleAtFixedRate(new WriteThread(), 0, 2, TimeUnit.SECONDS); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/condition/AwaitAndSignal.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/condition/AwaitAndSignal.java new file mode 100644 index 0000000..693b68c --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/condition/AwaitAndSignal.java @@ -0,0 +1,39 @@ +package com.heibaiying.condition; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * 等待与唤醒 + */ +public class AwaitAndSignal { + + private static ReentrantLock lock = new ReentrantLock(); + private static Condition condition = lock.newCondition(); + + static class IncreaseTask implements Runnable { + @Override + public void run() { + try { + lock.lock(); + String threadName = Thread.currentThread().getName(); + System.out.println(threadName + "等待通知..."); + condition.await(); + System.out.println(threadName + "获得锁"); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + lock.unlock(); + } + } + } + + public static void main(String[] args) throws InterruptedException { + Thread thread1 = new Thread(new IncreaseTask()); + thread1.start(); + Thread.sleep(2000); + lock.lock(); + condition.signal(); + lock.unlock(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/countDown/J2_CountDown.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/countDown/J2_CountDown.java new file mode 100644 index 0000000..9ba25bc --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/countDown/J2_CountDown.java @@ -0,0 +1,41 @@ +package com.heibaiying.countDown; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +public class J2_CountDown { + + private static int number = 100; + private static CountDownLatch latch = new CountDownLatch(number); + private static AtomicInteger integer = new AtomicInteger(0); + + + static class IncreaseTask implements Runnable { + @Override + public void run() { + try { + // 假设这是一个耗时的任务 + Thread.sleep(3000); + integer.incrementAndGet(); + // 计数减一 + latch.countDown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static void main(String[] args) throws InterruptedException { + IncreaseTask task = new IncreaseTask(); + ExecutorService executorService = Executors.newFixedThreadPool(100); + for (int i = 0; i < number; i++) { + executorService.submit(task); + } + latch.await(); + // 会等待所有任务执行完成再输出 + System.out.println("integer:" + integer); + executorService.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/countDown/j1_Normal.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/countDown/j1_Normal.java new file mode 100644 index 0000000..c32e8e5 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/countDown/j1_Normal.java @@ -0,0 +1,34 @@ +package com.heibaiying.countDown; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +public class j1_Normal { + + private static AtomicInteger integer = new AtomicInteger(0); + + static class IncreaseTask implements Runnable { + @Override + public void run() { + try { + // 假设这是一个耗时的任务 + Thread.sleep(3000); + integer.incrementAndGet(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static void main(String[] args) { + IncreaseTask task = new IncreaseTask(); + ExecutorService executorService = Executors.newFixedThreadPool(100); + for (int i = 0; i < 100; i++) { + executorService.submit(task); + } + // 不会等待所有任务完成就输出,此时通常为0 + System.out.println("integer:" + integer); + executorService.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/cyclicBarrier/J1_CyclicBarrier.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/cyclicBarrier/J1_CyclicBarrier.java new file mode 100644 index 0000000..edda69a --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/cyclicBarrier/J1_CyclicBarrier.java @@ -0,0 +1,36 @@ +package com.heibaiying.cyclicBarrier; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 每五个人完成任务后,则算一个小组已完成 + */ +public class J1_CyclicBarrier { + + private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("五人小组任务执行完成")); + + static class Task implements Runnable { + @Override + public void run() { + try { + long l = new Double(Math.random() * 5000).longValue(); + Thread.sleep(l); + System.out.println("任务" + Thread.currentThread().getId() + "执行完成"); + cyclicBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } + } + } + + public static void main(String[] args) { + ExecutorService executorService = Executors.newFixedThreadPool(20); + for (int j = 0; j < 10; j++) { + executorService.submit(new Task()); + } + executorService.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/forkAndJoin/CountTask.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/forkAndJoin/CountTask.java new file mode 100644 index 0000000..02f19e0 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/forkAndJoin/CountTask.java @@ -0,0 +1,62 @@ +package com.heibaiying.forkAndJoin; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.RecursiveTask; + +public class CountTask extends RecursiveTask { + + private Long start; + private Long end; + private static long hold = 50L; + + CountTask(Long start, Long end) { + this.start = start; + this.end = end; + } + + @Override + protected Long compute() { + long sum = 0L; + // 一定要保证能够进进入if中的终止条件,如果无限制的拆分,可能会导致栈溢出 + if (end - start <= hold) { + // 假设一个最小的计算都是耗时的 + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + for (long i = start; i < end; i++) { + sum += i; + } + } else { + List countTasks = new ArrayList<>(); + long l = (end - start) / hold; + for (int i = 0; i < l; i++) { + CountTask task = new CountTask(start + i * hold, start + (i + 1) * hold); + countTasks.add(task); + task.fork(); + if (i == l - 1) { + CountTask countTask = new CountTask(start + (i + 1) * hold, end); + countTasks.add(countTask); + countTask.fork(); + } + } + for (CountTask countTask : countTasks) { + sum += countTask.join(); + } + } + return sum; + } + + public static void main(String[] args) throws ExecutionException, InterruptedException { + ForkJoinPool forkJoinPool = new ForkJoinPool(100); + CountTask task = new CountTask(0L, 10000L); + ForkJoinTask result = forkJoinPool.submit(task); + Long aLong = result.get(); + System.out.println("结果为" + aLong); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J1_Future.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J1_Future.java new file mode 100644 index 0000000..88e31f7 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J1_Future.java @@ -0,0 +1,43 @@ +package com.heibaiying.future; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +public class J1_Future { + + static class Task implements Callable { + + private int operator; + + Task(Integer operator) { + this.operator = operator; + } + + @Override + public Integer call() throws Exception { + Thread.sleep(500); + return operator * 10; + } + } + + public static void main(String[] args) { + ExecutorService executors = Executors.newFixedThreadPool(20); + List> futureList = new ArrayList<>(); + for (int i = 0; i <= 100; i++) { + Future submit = executors.submit(new Task(i)); + futureList.add(submit); + } + // 获取所有线程的返回值并计算 + Integer reduce = futureList.stream().map(x -> { + try { + return x.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + return 0; + }).reduce(0, Integer::sum); + System.out.println("计算结果为:" + reduce); + executors.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J2_CompletableFuture.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J2_CompletableFuture.java new file mode 100644 index 0000000..e09b5ec --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J2_CompletableFuture.java @@ -0,0 +1,40 @@ +package com.heibaiying.future; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class J2_CompletableFuture { + + static class Compute implements Runnable { + + private CompletableFuture future; + + Compute(CompletableFuture future) { + this.future = future; + } + + @Override + public void run() { + try { + System.out.println("子线程等待主线程运算完成····"); + Integer integer = future.get(); + System.out.println("子线程完成后续运算:" + integer * integer); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + } + + public static void main(String[] args) throws InterruptedException { + CompletableFuture future = new CompletableFuture<>(); + System.out.println("主线程开始计算"); + new Thread(new Compute(future)).start(); + int i = 0; + for (int j = 0; j < 100; j++) { + i = i + j; + } + Thread.sleep(2000); + System.out.println("主线程计算完成"); + future.complete(i); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J3_SupplyAsync.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J3_SupplyAsync.java new file mode 100644 index 0000000..3d78026 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J3_SupplyAsync.java @@ -0,0 +1,28 @@ +package com.heibaiying.future; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class J3_SupplyAsync { + + private static Integer compute() { + int i = 0; + for (int j = 0; j < 100; j++) { + i = i + j; + } + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("子线程计算完成"); + return i; + } + + public static void main(String[] args) throws ExecutionException, InterruptedException { + CompletableFuture supplyAsync = CompletableFuture.supplyAsync(J3_SupplyAsync::compute); + System.out.println("主线程等待子线程计算完成"); + Integer integer = supplyAsync.get(); + System.out.println("主线程计算完成:" + integer * integer); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J4_StreamingCall.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J4_StreamingCall.java new file mode 100644 index 0000000..80899a6 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J4_StreamingCall.java @@ -0,0 +1,43 @@ +package com.heibaiying.future; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * 流式调用 + */ +public class J4_StreamingCall { + + private static Integer compute() { + System.out.println("compute所在线程:" + Thread.currentThread().getId()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return 100; + } + + private static Integer multi(Integer integer) { + try { + System.out.println("multi所在线程:" + Thread.currentThread().getId()); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return integer * integer; + } + + private static void accept(Integer integer) { + System.out.println("accept所在线程:" + Thread.currentThread().getId()); + System.out.println("accept方法消费掉计算结果:" + integer); + } + + public static void main(String[] args) throws ExecutionException, InterruptedException { + CompletableFuture future = CompletableFuture.supplyAsync(J4_StreamingCall::compute) + .thenApply(J4_StreamingCall::multi) + .thenAccept(J4_StreamingCall::accept) //值在这一步被消费掉了 + .thenAccept(x -> System.out.println("运算结果:" + x)); + future.get(); //惰性求值,如果缺少这一步,不会有任何输出 + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J5_AbnormalCapture.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J5_AbnormalCapture.java new file mode 100644 index 0000000..b2fd58e --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J5_AbnormalCapture.java @@ -0,0 +1,32 @@ +package com.heibaiying.future; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * 异常捕获 + */ +public class J5_AbnormalCapture { + + private static Integer compute() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + int i = 100 / 0; + return 100; + } + + private static Integer dealException(Throwable e) { + e.printStackTrace(); + return 0; + } + + public static void main(String[] args) throws ExecutionException, InterruptedException { + CompletableFuture future = CompletableFuture.supplyAsync(J5_AbnormalCapture::compute) + .exceptionally(J5_AbnormalCapture::dealException) + .thenAccept(System.out::println); + future.get(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J6_Combination.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J6_Combination.java new file mode 100644 index 0000000..b15afbf --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/future/J6_Combination.java @@ -0,0 +1,38 @@ +package com.heibaiying.future; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * 利用 thenCompose 和 thenCombineAsync 组合多个 CompletableFuture + */ +public class J6_Combination { + + private static Integer compute() { + System.out.println("compute 所在线程:" + Thread.currentThread().getId()); + return 100; + } + + private static Integer multi(Integer integer) { + System.out.println("epr 所在线程:" + Thread.currentThread().getId()); + return integer * integer; + } + + + public static void main(String[] args) throws ExecutionException, InterruptedException { + // 组合实现方式1 thenCompose 一个计算的输入依赖另外一个计算的结果 + CompletableFuture future01 = CompletableFuture.supplyAsync(J6_Combination::compute) + .thenCompose(x -> CompletableFuture.supplyAsync(() -> multi(x))) + .thenAccept(x -> System.out.println("运算结果:" + x)); + future01.get(); + + System.out.println(); + + // 组合实现方式2 thenCombineAsync 两个计算之间不依赖 + CompletableFuture future02 = CompletableFuture.supplyAsync(J6_Combination::compute); + CompletableFuture future03 = CompletableFuture.supplyAsync(() -> J6_Combination.multi(100)); + CompletableFuture futureAll = future02.thenCombineAsync(future03, (x, y) -> x + y); + System.out.println("运算结果:" + futureAll.get()); + + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/lockSupport/J1_LockSupport.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/lockSupport/J1_LockSupport.java new file mode 100644 index 0000000..adaac80 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/lockSupport/J1_LockSupport.java @@ -0,0 +1,27 @@ +package com.heibaiying.lockSupport; + +import java.util.concurrent.locks.LockSupport; + +public class J1_LockSupport { + + static class Task implements Runnable { + @Override + public void run() { + long id = Thread.currentThread().getId(); + System.out.println("线程" + id + "开始阻塞"); + LockSupport.park(); + System.out.println("线程" + id + "解除阻塞"); + } + } + + public static void main(String[] args) throws InterruptedException { + Thread thread01 = new Thread(new Task()); + Thread thread02 = new Thread(new Task()); + thread01.start(); + thread02.start(); + Thread.sleep(3000); + System.out.println("主线程干预"); + LockSupport.unpark(thread01); + LockSupport.unpark(thread02); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/readWriteLock/ReadWriteLock.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/readWriteLock/ReadWriteLock.java new file mode 100644 index 0000000..7ced738 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/readWriteLock/ReadWriteLock.java @@ -0,0 +1,94 @@ +package com.heibaiying.readWriteLock; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class ReadWriteLock { + + // 可重入锁 + private static ReentrantLock reentrantLock = new ReentrantLock(); + // 读写锁 + private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + // 读锁 + private static ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); + // 写锁 + private static ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); + + // 待赋值的变量 + private static String i = ""; + + //写方法 + static class Write implements Runnable { + + private Lock lock; + private String value; + + Write(Lock lock, String value) { + this.lock = lock; + this.value = value; + } + + @Override + public void run() { + try { + lock.lock(); + Thread.sleep(1000); + i = value; + System.out.println(Thread.currentThread().getName() + "写入值" + i); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + lock.unlock(); + } + } + } + + //读方法 + static class Read implements Runnable { + + private Lock lock; + + Read(Lock lock) { + this.lock = lock; + } + + @Override + public void run() { + try { + lock.lock(); + Thread.sleep(1000); + System.out.println(Thread.currentThread().getName() + "读取到值" + i); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + lock.unlock(); + } + } + } + + + public static void main(String[] args) throws InterruptedException { + + // 耗时2秒,写锁是互斥的,但读锁是并行的 + for (int j = 0; j < 2; j++) { + Thread thread = new Thread(new Write(writeLock, String.valueOf(j))); + thread.start(); + } + for (int j = 0; j < 18; j++) { + Thread thread = new Thread(new Read(readLock)); + thread.start(); + } + + + // 使用重入锁时,读锁彼此之间也是互斥的 + for (int j = 0; j < 2; j++) { + Thread thread = new Thread(new Write(reentrantLock, String.valueOf(j))); + thread.start(); + } + for (int j = 0; j < 18; j++) { + Thread thread = new Thread(new Read(reentrantLock)); + thread.start(); + } + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J1_ThreadSafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J1_ThreadSafe.java new file mode 100644 index 0000000..e11a748 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J1_ThreadSafe.java @@ -0,0 +1,38 @@ +package com.heibaiying.reentrantLock; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * 利用ReentrantLock实现线程安全 + */ +public class J1_ThreadSafe { + + private static ReentrantLock reentrantLock = new ReentrantLock(); + private static Integer i = 0; + + static class IncreaseTask implements Runnable { + @Override + public void run() { + for (int j = 0; j < 100000; j++) { + try { + reentrantLock.lock(); + i++; + } catch (Exception e) { + e.printStackTrace(); + } finally { + reentrantLock.unlock(); + } + } + } + + public static void main(String[] args) throws InterruptedException { + Thread thread1 = new Thread(new IncreaseTask()); + Thread thread2 = new Thread(new IncreaseTask()); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + System.out.println(i); + } + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J2_Reentrant.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J2_Reentrant.java new file mode 100644 index 0000000..8e5332f --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J2_Reentrant.java @@ -0,0 +1,42 @@ +package com.heibaiying.reentrantLock; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * 可重入性 + */ +public class J2_Reentrant { + + private static ReentrantLock reentrantLock = new ReentrantLock(); + private static Integer i = 0; + + static class IncreaseTask implements Runnable { + @Override + public void run() { + for (int j = 0; j < 100000; j++) { + try { + reentrantLock.lock(); + reentrantLock.lock(); + reentrantLock.lock(); + i++; + } catch (Exception e) { + e.printStackTrace(); + } finally { + reentrantLock.unlock(); + reentrantLock.unlock(); + reentrantLock.unlock(); + } + } + } + + public static void main(String[] args) throws InterruptedException { + Thread thread1 = new Thread(new IncreaseTask()); + Thread thread2 = new Thread(new IncreaseTask()); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + System.out.println(i); + } + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J3_TimeLimitedLock.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J3_TimeLimitedLock.java new file mode 100644 index 0000000..caf0f3d --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J3_TimeLimitedLock.java @@ -0,0 +1,39 @@ +package com.heibaiying.reentrantLock; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +public class J3_TimeLimitedLock { + + private static ReentrantLock reentrantLock = new ReentrantLock(); + + static class IncreaseTask implements Runnable { + @Override + public void run() { + + try { + String threadName = Thread.currentThread().getName(); + // 指定锁定时间 + if (reentrantLock.tryLock(5, TimeUnit.SECONDS)) { + System.out.println(threadName + "被执行"); + Thread.sleep(6000); + } else { + System.out.println(threadName + "获得锁失败"); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + reentrantLock.unlock(); + } + } + } + + public static void main(String[] args) { + Thread thread01 = new Thread(new IncreaseTask()); + thread01.setName("线程1"); + thread01.start(); + Thread thread02 = new Thread(new IncreaseTask()); + thread02.setName("线程2"); + thread02.start(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J4_FairLock.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J4_FairLock.java new file mode 100644 index 0000000..c515690 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/reentrantLock/J4_FairLock.java @@ -0,0 +1,32 @@ +package com.heibaiying.reentrantLock; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * 公平锁 + */ +public class J4_FairLock { + + // 参数为true,代表使用公平锁 + private static ReentrantLock fairLock = new ReentrantLock(true); + + static class IncreaseTask implements Runnable { + @Override + public void run() { + while (true) { + fairLock.lock(); + System.out.println(Thread.currentThread().getName() + "获得锁"); + fairLock.unlock(); + } + } + } + + public static void main(String[] args) { + Thread thread1 = new Thread(new IncreaseTask()); + Thread thread2 = new Thread(new IncreaseTask()); + Thread thread3 = new Thread(new IncreaseTask()); + thread1.start(); + thread2.start(); + thread3.start(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/semaphore/J1_Semaphore.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/semaphore/J1_Semaphore.java new file mode 100644 index 0000000..b5ca0ae --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/semaphore/J1_Semaphore.java @@ -0,0 +1,29 @@ +package com.heibaiying.semaphore; + +import java.util.concurrent.Semaphore; + +public class J1_Semaphore { + + private static Semaphore semaphore = new Semaphore(5); + + static class IncreaseTask implements Runnable { + @Override + public void run() { + try { + semaphore.acquire(); + System.out.println(Thread.currentThread().getId() + "获得锁!"); + Thread.sleep(5000); + semaphore.release(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static void main(String[] args) { + IncreaseTask task = new IncreaseTask(); + for (int i = 0; i < 20; i++) { + new Thread(task).start(); + } + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J1_ThreadUnsafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J1_ThreadUnsafe.java new file mode 100644 index 0000000..dc00cc3 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J1_ThreadUnsafe.java @@ -0,0 +1,33 @@ +package com.heibaiying.synchronized_; + +/** + * 线程不安全 + */ +public class J1_ThreadUnsafe { + + private static int i = 0; + + public static void main(String[] args) throws InterruptedException { + Thread thread1 = new Thread(new IncreaseTask()); + Thread thread2 = new Thread(new IncreaseTask()); + thread1.start(); + thread2.start(); + // 等待线程结束再打印返回值 + thread1.join(); + thread2.join(); + System.out.println(i); + } + + static class IncreaseTask implements Runnable { + @Override + public void run() { + for (int j = 0; j < 100000; j++) { + inc(); + } + } + + private void inc() { + i++; + } + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J2_SynchronizedUnsafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J2_SynchronizedUnsafe.java new file mode 100644 index 0000000..113a3ed --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J2_SynchronizedUnsafe.java @@ -0,0 +1,36 @@ +package com.heibaiying.synchronized_; + +public class J2_SynchronizedUnsafe { + + private static int i = 0; + + public static void main(String[] args) throws InterruptedException { + // 两个线程分别调用不同的IncreaseTask实例 + Thread thread1 = new Thread(new IncreaseTask()); + Thread thread2 = new Thread(new IncreaseTask()); + thread1.start(); + thread2.start(); + //等待结束后 才打印返回值 + thread1.join(); + thread2.join(); + //并打印返回值 + System.out.println(i); + } + + static class IncreaseTask implements Runnable { + @Override + public void run() { + for (int j = 0; j < 100000; j++) { + inc(); + } + } + + /** + * 两个线程启动的不是同一个IncreaseTask实例, + * 所以两个线程锁住的不是同一个方法对象,此时也是线程不安全的 + */ + private synchronized void inc() { + i++; + } + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J3_SynchronizedSafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J3_SynchronizedSafe.java new file mode 100644 index 0000000..cb4f279 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J3_SynchronizedSafe.java @@ -0,0 +1,33 @@ +package com.heibaiying.synchronized_; + +public class J3_SynchronizedSafe { + + private static int i = 0; + + public static void main(String[] args) throws InterruptedException { + // 两个线程调用的是同一个IncreaseTask实例,此时是线程安全的 + IncreaseTask task = new IncreaseTask(); + Thread thread1 = new Thread(task); + Thread thread2 = new Thread(task); + thread1.start(); + thread2.start(); + //等待结束后 才打印返回值 + thread1.join(); + thread2.join(); + //并打印返回值 + System.out.println(i); + } + + static class IncreaseTask implements Runnable { + @Override + public void run() { + for (int j = 0; j < 100000; j++) { + inc(); + } + } + + private synchronized void inc() { + i++; + } + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J4_SynchronizedSafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J4_SynchronizedSafe.java new file mode 100644 index 0000000..0ccf490 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J4_SynchronizedSafe.java @@ -0,0 +1,33 @@ +package com.heibaiying.synchronized_; + +public class J4_SynchronizedSafe { + + private static int i = 0; + + public static void main(String[] args) throws InterruptedException { + + Thread thread1 = new Thread(new IncreaseTask()); + Thread thread2 = new Thread(new IncreaseTask()); + thread1.start(); + thread2.start(); + //等待结束后 才打印返回值 + thread1.join(); + thread2.join(); + //并打印返回值 + System.out.println(i); + } + + static class IncreaseTask implements Runnable { + @Override + public void run() { + for (int j = 0; j < 100000; j++) { + inc(); + } + } + + // synchronized 作用在静态方法上,锁住的是类对象,此时也是线程安全的 + private static synchronized void inc() { + i++; + } + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J5_SynchronizedSafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J5_SynchronizedSafe.java new file mode 100644 index 0000000..d594e44 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/synchronized_/J5_SynchronizedSafe.java @@ -0,0 +1,30 @@ +package com.heibaiying.synchronized_; + +public class J5_SynchronizedSafe { + + private static final String s = ""; + + private static int i = 0; + + static class IncreaseTask implements Runnable { + @Override + public void run() { + for (int j = 0; j < 100000; j++) { + // 锁住的是同一个对象,此时也是线程安全的 + synchronized (s) { + i++; + } + } + } + } + + public static void main(String[] args) throws InterruptedException { + Thread thread1 = new Thread(new IncreaseTask()); + Thread thread2 = new Thread(new IncreaseTask()); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + System.out.println(i); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/threadGroup/J1_ThreadGroup.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadGroup/J1_ThreadGroup.java new file mode 100644 index 0000000..bb3803e --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadGroup/J1_ThreadGroup.java @@ -0,0 +1,21 @@ +package com.heibaiying.threadGroup; + +public class J1_ThreadGroup { + + static class Task implements Runnable { + @Override + public void run() { + Thread current = Thread.currentThread(); + System.out.println("当前线程id: " + current.getId() + " 当前所属线程组: " + current.getThreadGroup().getName()); + } + } + + public static void main(String[] args) { + ThreadGroup group = new ThreadGroup("java线程组"); + // 指定所属的线程组 + Thread thread1 = new Thread(group, new Task()); + Thread thread2 = new Thread(group, new Task()); + thread1.start(); + thread2.start(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/threadGroup/J2_Daemon.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadGroup/J2_Daemon.java new file mode 100644 index 0000000..41ec94b --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadGroup/J2_Daemon.java @@ -0,0 +1,25 @@ +package com.heibaiying.threadGroup; + +public class J2_Daemon { + + static class Task implements Runnable { + @Override + public void run() { + Thread current = Thread.currentThread(); + System.out.println("当前线程id: " + current.getId() + "当前所属线程组: " + current.getThreadGroup().getName()); + } + } + + public static void main(String[] args) { + ThreadGroup group = new ThreadGroup("java线程组"); + Thread thread1 = new Thread(group, new Task()); + Thread thread2 = new Thread(group, new Task()); + Thread thread3 = new Thread(group, new Task()); + thread1.setDaemon(true); + thread2.setDaemon(true); + thread3.setDaemon(true); + thread1.start(); // 通常不会执行 + thread2.start(); // 通常不会执行 + thread3.start(); // 通常不会执行 + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/threadLocal/J1_ThreadUnsafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadLocal/J1_ThreadUnsafe.java new file mode 100644 index 0000000..23b714f --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadLocal/J1_ThreadUnsafe.java @@ -0,0 +1,45 @@ +package com.heibaiying.threadLocal; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 线程不安全的SimpleDateFormat + */ +public class J1_ThreadUnsafe { + + private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + private static int sum = 1000; + private static CountDownLatch countDownLatch = new CountDownLatch(sum); + private static AtomicInteger atomicInteger = new AtomicInteger(0); + + static class Task implements Runnable { + + @Override + public void run() { + try { + Date parse = sdf.parse("2018-08-08 08:08:08"); + System.out.println(parse); + atomicInteger.incrementAndGet(); + } catch (ParseException e) { + e.printStackTrace(); + } finally { + countDownLatch.countDown(); + } + } + } + + public static void main(String[] args) throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(10); + for (int i = 0; i < sum; i++) { + executorService.execute(new Task()); + } + countDownLatch.await(); + System.out.println("格式化成功次数为:" + atomicInteger.get()); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/threadLocal/J2_ThreadSafe.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadLocal/J2_ThreadSafe.java new file mode 100644 index 0000000..5deb993 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadLocal/J2_ThreadSafe.java @@ -0,0 +1,49 @@ +package com.heibaiying.threadLocal; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 使用ThreadLocal来保证线程安全 + */ +public class J2_ThreadSafe { + + private static ThreadLocal threadLocal = new ThreadLocal<>(); + private static int sum = 1000; + private static CountDownLatch countDownLatch = new CountDownLatch(sum); + private static AtomicInteger atomicInteger = new AtomicInteger(0); + + static class Task implements Runnable { + + @Override + public void run() { + try { + if (threadLocal.get() == null) { + threadLocal.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); + } + Date parse = threadLocal.get().parse("2018-08-08 08:08:08"); + System.out.println(parse); + atomicInteger.incrementAndGet(); + } catch (ParseException e) { + e.printStackTrace(); + } finally { + countDownLatch.countDown(); + } + } + } + + public static void main(String[] args) throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(10); + for (int i = 0; i < sum; i++) { + executorService.execute(new Task()); + } + countDownLatch.await(); + System.out.println("格式化成功次数为:" + atomicInteger.get()); + executorService.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J1_ThreadPool.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J1_ThreadPool.java new file mode 100644 index 0000000..aed2913 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J1_ThreadPool.java @@ -0,0 +1,25 @@ +package com.heibaiying.threadPool; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 线程池的基本使用 + */ +public class J1_ThreadPool { + + static class Task implements Runnable { + @Override + public void run() { + System.out.println(Thread.currentThread().getName() + "正在执行"); + } + } + + public static void main(String[] args) { + ExecutorService executorService = Executors.newFixedThreadPool(10); + for (int i = 0; i < 100; i++) { + executorService.submit(new Task()); + } + executorService.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J2_ScheduledTask.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J2_ScheduledTask.java new file mode 100644 index 0000000..b0b2c26 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J2_ScheduledTask.java @@ -0,0 +1,46 @@ +package com.heibaiying.threadPool; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * 计划任务 + */ +public class J2_ScheduledTask { + + private static long cacheTime = System.currentTimeMillis(); + + static class Task implements Runnable { + + private String type; + + Task(String type) { + this.type = type; + } + + @Override + public void run() { + try { + Thread.sleep(5000); + long nowTime = System.currentTimeMillis(); + System.out.println(type + Thread.currentThread().getId() + "执行耗时" + (nowTime - cacheTime) + "毫秒"); + cacheTime = nowTime; + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static void main(String[] args) { + ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); + // 只执行一次 + pool.schedule(new Task("schedule"), 2, TimeUnit.SECONDS); + // 指定2秒为固定周期执行,如果项目执行耗时5秒,则项目结束后立马执行下一次任务,所以输出的时间间隔为5秒 + pool.scheduleAtFixedRate(new Task("FixedRate"), 0, 2, TimeUnit.SECONDS); + // 总是在上一次项目结束后间隔指定周期执行,所以项目耗时5秒,还需要间隔2秒执行,所以输出的时间间隔为7秒 + pool.scheduleWithFixedDelay(new Task("WithFixedDelay"), 0, 2, TimeUnit.SECONDS); + // pool.shutdown(); + } + +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J3_CustomThreadPool.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J3_CustomThreadPool.java new file mode 100644 index 0000000..9a779f7 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J3_CustomThreadPool.java @@ -0,0 +1,46 @@ +package com.heibaiying.threadPool; + +import java.util.concurrent.*; + +public class J3_CustomThreadPool { + + private static int i = 0; + private static CountDownLatch latch = new CountDownLatch(1000); + + static class Task implements Runnable { + + @Override + public void run() { + increase(); + } + + private void increase() { + synchronized (this) { + i++; + } + System.out.println(Thread.currentThread().getName() + "输出:" + i); + latch.countDown(); + } + } + + public static void main(String[] args) throws InterruptedException { + + // 自定义线程池 + ExecutorService executorService = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + r -> { + Thread thread = new Thread(r); + thread.setDaemon(true); + System.out.println("create" + thread.getName()); + return thread; + }); + + Task task = new Task(); + for (int i = 0; i < 1000; i++) { + executorService.submit(task); + } + latch.await(); + System.out.println("最后的结果是" + i); + executorService.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J4_ExtendedThreadPool.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J4_ExtendedThreadPool.java new file mode 100644 index 0000000..39869e8 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/threadPool/J4_ExtendedThreadPool.java @@ -0,0 +1,55 @@ +package com.heibaiying.threadPool; + +import java.util.concurrent.*; + +public class J4_ExtendedThreadPool { + + private static int i = 0; + private static CountDownLatch latch = new CountDownLatch(1000); + + static class Task implements Runnable { + + @Override + public void run() { + increase(); + } + + private void increase() { + synchronized (this) { + i++; + } + System.out.println(Thread.currentThread().getName() + "输出:" + i); + latch.countDown(); + } + } + + public static void main(String[] args) throws InterruptedException { + + // 自定义线程 + ExecutorService executorService = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>()) { + @Override + protected void beforeExecute(Thread t, Runnable r) { + System.out.println("线程" + t.getName() + "准备执行"); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + System.out.println("线程" + r + "执行结束"); + } + + @Override + protected void terminated() { + System.out.println("线程池退出"); + } + }; + + Task task = new Task(); + for (int i = 0; i < 1000; i++) { + executorService.submit(task); + } + latch.await(); + System.out.println("最后的结果是" + i); + executorService.shutdown(); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/yieldAndJoin/J1_Normal.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/yieldAndJoin/J1_Normal.java new file mode 100644 index 0000000..0353ef9 --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/yieldAndJoin/J1_Normal.java @@ -0,0 +1,19 @@ +package com.heibaiying.yieldAndJoin; + +/** + * 正常情况下,输出 0 + */ +public class J1_Normal { + + private static int j = 0; + + public static void main(String[] args) throws InterruptedException { + Thread thread = new Thread(() -> { + for (int i = 0; i < 100000; i++) { + j++; + } + }); + thread.start(); + System.out.println(j); + } +} diff --git a/code/Java/java-concurrency/src/main/java/com/heibaiying/yieldAndJoin/J2_Join.java b/code/Java/java-concurrency/src/main/java/com/heibaiying/yieldAndJoin/J2_Join.java new file mode 100644 index 0000000..c08bc9c --- /dev/null +++ b/code/Java/java-concurrency/src/main/java/com/heibaiying/yieldAndJoin/J2_Join.java @@ -0,0 +1,20 @@ +package com.heibaiying.yieldAndJoin; + +/** + * 使用Join让线程的并行执行换成串行执行,输出:100000 + */ +public class J2_Join { + + private static int j = 0; + + public static void main(String[] args) throws InterruptedException { + Thread thread = new Thread(() -> { + for (int i = 0; i < 100000; i++) { + j++; + } + }); + thread.start(); + thread.join(); + System.out.println(j); + } +}