Skip to content

Commit

Permalink
多线程编程
Browse files Browse the repository at this point in the history
  • Loading branch information
heibaiying committed Nov 18, 2019
1 parent 2ffc0a3 commit 910790b
Show file tree
Hide file tree
Showing 39 changed files with 1,605 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.heibaiying.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* 原子对象
*/
public class J1_SimpleType {

private static int i = 0;
private static AtomicInteger j = new AtomicInteger(0);
/*使用AtomicReference对普通对象进行封装*/
private static AtomicReference<Integer> k = new AtomicReference<>(0);

static class Task implements Runnable {

private CountDownLatch latch;

Task(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
i++;
j.incrementAndGet();
k.getAndUpdate(x -> x + 1);
latch.countDown();
}
}

public static void main(String[] args) throws InterruptedException {
int number = 10000;
CountDownLatch latch = new CountDownLatch(number);
Semaphore semaphore = new Semaphore(10);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Task task = new Task(latch);
for (int i = 0; i < number; i++) {
semaphore.acquire();
executorService.execute(task);
semaphore.release();
}
latch.await();
System.out.println("输出i的值" + i);
System.out.println("输出j的值" + j.get());
System.out.println("输出K的值" + k.get());
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.heibaiying.atomic;

import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class J2_ArrayThreadSafe {

// 对集合本生的操作线程安全
private static LinkedBlockingQueue<Integer> LinkedBlockingQueue = new LinkedBlockingQueue<>();
//对集合本生的操作线程安全
private static Vector<Integer> vector = new Vector<>();
//普通集合
private static ArrayList<Integer> arrayList = new ArrayList<>();

static class Task implements Runnable {

private CountDownLatch latch;

Task(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
for (int i = 0; i < 1000; i++) {
LinkedBlockingQueue.add(i);
vector.add(i);
arrayList.add(i);
}
latch.countDown();
}
}

public static void main(String[] args) throws InterruptedException {
int number = 1000;
CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch));
}
latch.await();
System.out.println("集合本生的线程安全:");
System.out.println("LinkedBlockingQueue size : " + LinkedBlockingQueue.size());
System.out.println("vector size : " + vector.size());
System.out.println("arrayList size : " + arrayList.size());
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.heibaiying.atomic;

import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class J3_ArrayElementThreadUnsafe {

private static int capacity = 10;
// 保证对集合内元素的操作具有原子性
private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(capacity);
// 对集合内元素的操作线程不安全
private static Vector<Integer> vector = new Vector<>(capacity);
// 对集合内元素的操作线程不安全
private static ArrayList<Integer> arrayList = new ArrayList<>(capacity);

static {
for (int i = 0; i < capacity; i++) {
arrayList.add(0);
vector.add(0);
}
}

static class Task implements Runnable {

private CountDownLatch latch;

Task(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
for (int i = 0; i < 1000; i++) {
int num = i % capacity;
atomicIntegerArray.getAndIncrement(num);
vector.set(num, vector.get(num) + 1);
arrayList.set(num, arrayList.get(num) + 1);
}
latch.countDown();
}
}

public static void main(String[] args) throws InterruptedException {
int number = 1000;
CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch));
}
latch.await();
System.out.println("集合内元素的线程安全:");
System.out.println("atomicIntegerArray size : " + atomicIntegerArray);
System.out.println("vector size : " + vector);
System.out.println("arrayList size : " + arrayList);
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.heibaiying.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class J4_ThreadUnsafe {

static class Task implements Runnable {

private Candidate candidate;
private CountDownLatch latch;

Task(CountDownLatch latch, Candidate candidate) {
this.candidate = candidate;
this.latch = latch;
}

@Override
public void run() {
candidate.setScore(candidate.getScore() + 1);
latch.countDown();
}
}

public static void main(String[] args) throws InterruptedException {
int number = 100000;
CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Candidate candidate = new Candidate("候选人", 0);
for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch, candidate));
}
latch.await();
System.out.println(candidate.getName() + "获得票数:" + candidate.getScore());
executorService.shutdown();
}


private static class Candidate {

private String name;
private volatile int score;

Candidate(String name, int score) {
this.name = name;
this.score = score;
}

public int getScore() {
return score;
}

public void setScore(int score) {
this.score = score;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.heibaiying.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class J5_AtomicIntegerFieldUpdater {

static class Task implements Runnable {

private Candidate candidate;
private CountDownLatch latch;
private AtomicIntegerFieldUpdater fieldUpdater;

Task(CountDownLatch latch, Candidate candidate, AtomicIntegerFieldUpdater fieldUpdater) {
this.candidate = candidate;
this.latch = latch;
this.fieldUpdater = fieldUpdater;
}

@Override
public void run() {
fieldUpdater.incrementAndGet(candidate);
latch.countDown();
}
}

public static void main(String[] args) throws InterruptedException {
int number = 100000;
CountDownLatch latch = new CountDownLatch(number);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Candidate candidate = new Candidate("候选人", 0);
AtomicIntegerFieldUpdater<Candidate> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
for (int i = 0; i < number; i++) {
executorService.execute(new Task(latch, candidate, fieldUpdater));
}
latch.await();
System.out.println(candidate.getName() + "获得票数:" + candidate.getScore());
executorService.shutdown();
}


private static class Candidate {

private String name;

// 1. 不能声明为private 2. 必须用volatile关键字修饰
public volatile int score;

Candidate(String name, int score) {
this.name = name;
this.score = score;
}

public int getScore() {
return score;
}

public void setScore(int score) {
this.score = score;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.heibaiying.atomic;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class J6_SynchronousQueue {

private static SynchronousQueue<Double> queue = new SynchronousQueue<>();

static class ReadThread implements Runnable {
@Override
public void run() {
System.out.println("读线程启动");
while (true) {
try {
Double peek = queue.take();
System.out.println("读线程获取值:" + peek);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}

static class WriteThread implements Runnable {
@Override
public void run() {
System.out.println("写线程写入值");
try {
queue.put(Math.random());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


public static void main(String[] args) throws InterruptedException {
new Thread(new ReadThread()).start();
Thread.sleep(3000);
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
pool.scheduleAtFixedRate(new WriteThread(), 0, 2, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.heibaiying.condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 等待与唤醒
*/
public class AwaitAndSignal {

private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();

static class IncreaseTask implements Runnable {
@Override
public void run() {
try {
lock.lock();
String threadName = Thread.currentThread().getName();
System.out.println(threadName + "等待通知...");
condition.await();
System.out.println(threadName + "获得锁");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new IncreaseTask());
thread1.start();
Thread.sleep(2000);
lock.lock();
condition.signal();
lock.unlock();
}
}
Loading

0 comments on commit 910790b

Please sign in to comment.