From b72cc1092de0d3bbc6c6cfcb2322c641132ac255 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E5=BF=97=E5=9D=9A?= Date: Sat, 9 May 2020 10:13:00 +0800 Subject: [PATCH 1/3] add jdk8 completablefuture implementation --- .gitignore | 25 +++++++++ pom.xml | 8 +++ .../cn/hackingwu/promise/BasicPromise.java | 10 ++++ .../cn/hackingwu/promise/jdk8/IOPromise.java | 55 +++++++++++++++++++ .../cn/hackingwu/promise/jdk8/Promise.java | 35 ++++++++++++ .../java/cn/hackingwu/promise/jdk8/readme.md | 19 +++++++ 6 files changed, 152 insertions(+) create mode 100644 .gitignore create mode 100644 src/main/java/cn/hackingwu/promise/BasicPromise.java create mode 100644 src/main/java/cn/hackingwu/promise/jdk8/IOPromise.java create mode 100644 src/main/java/cn/hackingwu/promise/jdk8/Promise.java create mode 100644 src/main/java/cn/hackingwu/promise/jdk8/readme.md diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..135dc4f --- /dev/null +++ b/.gitignore @@ -0,0 +1,25 @@ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* +.idea +*.iml diff --git a/pom.xml b/pom.xml index 1fa76c7..c1e3a2d 100644 --- a/pom.xml +++ b/pom.xml @@ -28,6 +28,14 @@ 1.7 + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + diff --git a/src/main/java/cn/hackingwu/promise/BasicPromise.java b/src/main/java/cn/hackingwu/promise/BasicPromise.java new file mode 100644 index 0000000..a68949b --- /dev/null +++ b/src/main/java/cn/hackingwu/promise/BasicPromise.java @@ -0,0 +1,10 @@ +package cn.hackingwu.promise; + +/** + * @Author: Jason Wu + * @Date: 2020/5/8 + * @Description: + */ +public interface BasicPromise { + +} diff --git a/src/main/java/cn/hackingwu/promise/jdk8/IOPromise.java b/src/main/java/cn/hackingwu/promise/jdk8/IOPromise.java new file mode 100644 index 0000000..37df518 --- /dev/null +++ b/src/main/java/cn/hackingwu/promise/jdk8/IOPromise.java @@ -0,0 +1,55 @@ +package cn.hackingwu.promise.jdk8; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @Author: Jason Wu + * @Date: 2020/5/8 + * @Description: + */ +public class IOPromise { + + static Executor executor; + + static { + + //因为Executors提供的newCachedThreadPool和singleThreadExecutor + //允许请求的队列长度最大为Integer.MAX_VALUE,可能会堆积大量的请求,导致OOM +// executor = Executors.newCachedThreadPool(); +// executor = Executors.newSingleThreadExecutor(); + executor = new ThreadPoolExecutor(5, 200, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(1024), new PromiseThreadFactory(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + } + + Executor getExecutor(){ + return this.getExecutor(); + } + + + static class PromiseThreadFactory implements ThreadFactory { + + final AtomicInteger count = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + //daemon的thread在运行不影响JVM关闭 + thread.setDaemon(true); + thread.setName("IOPromise" + count.getAndIncrement()); + thread.setPriority(Thread.NORM_PRIORITY); + return null; + } + } + +} diff --git a/src/main/java/cn/hackingwu/promise/jdk8/Promise.java b/src/main/java/cn/hackingwu/promise/jdk8/Promise.java new file mode 100644 index 0000000..6f03430 --- /dev/null +++ b/src/main/java/cn/hackingwu/promise/jdk8/Promise.java @@ -0,0 +1,35 @@ +package cn.hackingwu.promise.jdk8; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; + +/** + * @Author: Jason Wu + * @Date: 2020/5/8 + * @Description: + */ +public class Promise { + + private static CompletableFuture[] supplyAsync(Supplier... suppliers){ + CompletableFuture[] futures = new CompletableFuture[suppliers.length]; + for (int i = 0; i < suppliers.length; i++) { + futures[i] = CompletableFuture.supplyAsync(suppliers[i], getExecutor()); + } + return futures; + } + + public static CompletableFuture all(Supplier... suppliers){ + return CompletableFuture.allOf(supplyAsync(suppliers)); + } + + public static CompletableFuture any(Supplier... suppliers){ + return CompletableFuture.anyOf(supplyAsync(suppliers)); + } + + + static Executor getExecutor(){ + return ForkJoinPool.commonPool(); + } +} diff --git a/src/main/java/cn/hackingwu/promise/jdk8/readme.md b/src/main/java/cn/hackingwu/promise/jdk8/readme.md new file mode 100644 index 0000000..316a62e --- /dev/null +++ b/src/main/java/cn/hackingwu/promise/jdk8/readme.md @@ -0,0 +1,19 @@ +CompletableFuture实现了CompletionStage接口,CompletionStage提供了很多方法,有 +thenApply, thenAccept, thenRun这三个常用的返回,表示当一个future完成后继续执行的动作。 +这个三个方法后面都可以加上后缀Async,即thenApplyAsync, thenAccpetAsync, thenRunAsync 。 +他们与没有Async的区别是,xxxAsync多接受了一个参数Executor,表示继续执行的动作是在哪个Executor上执行。 +如果没有指定Executor,是在asyncPool上执行,即和stream().parallel()用的是同一个Exceutor, +其线程数受CPU核心数限制。如果你要执行的动作是IO密集型的,可以自定义Executor提供更多的线程。 +` +CompletableFuture.java +private static final Executor asyncPool = useCommonPool ? + ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); + ` + +那么thenApply, thenAccept, thenRun 三个方法有啥区别。还是看其方法签名(入参和返回值)。 +thenApply public CompletionStage thenApply(Function fn); +接受一个Function,可以拿到上一个future的返回值,并且处理完,返回一个对象,可以继续被处理。 +thenAccept public CompletionStage thenAccept(Consumer action); +接受一个Consumer,可以拿到上一个future的返回值,但是处理完没有返回值。 +thenApply public CompletionStage thenRun(Runnable action); +接受一个Runable,拿不到上一个返回值,处理完没有返回值。 \ No newline at end of file From 1b8784599b24a429a123b61eff28864e74030b07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E5=BF=97=E5=9D=9A?= Date: Tue, 12 May 2020 18:32:23 +0800 Subject: [PATCH 2/3] add some methods --- .../cn/hackingwu/promise/jdk8/IOPromise.java | 34 +++++++++++++++++++ .../cn/hackingwu/promise/jdk8/Promise.java | 17 ++++++++++ 2 files changed, 51 insertions(+) diff --git a/src/main/java/cn/hackingwu/promise/jdk8/IOPromise.java b/src/main/java/cn/hackingwu/promise/jdk8/IOPromise.java index 37df518..9bbc094 100644 --- a/src/main/java/cn/hackingwu/promise/jdk8/IOPromise.java +++ b/src/main/java/cn/hackingwu/promise/jdk8/IOPromise.java @@ -1,5 +1,6 @@ package cn.hackingwu.promise.jdk8; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -9,6 +10,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import static cn.hackingwu.promise.jdk8.Promise.all; /** * @Author: Jason Wu @@ -52,4 +56,34 @@ public Thread newThread(Runnable r) { } } + public static Supplier getSupplier(int i){ + return () -> { + System.out.println("supplier"+i+": "+Thread.currentThread().getId()); + Supplier supplier1_1 = () ->{ + System.out.println("supplier"+i+"_1: "+Thread.currentThread().getId()); + return null; + }; + Supplier supplier1_2 = () ->{ + System.out.println("supplier"+i+"_2: "+Thread.currentThread().getId()); + return null; + }; + try { + Promise.all(supplier1_1, supplier1_2).get(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + return null; + }; + } + + public static void main(String[] args){ + Supplier supplier1 = getSupplier(1); + Supplier supplier2 = getSupplier(2); + Supplier supplier3 = getSupplier(3); + + all(supplier1, supplier2, supplier3); + } + } diff --git a/src/main/java/cn/hackingwu/promise/jdk8/Promise.java b/src/main/java/cn/hackingwu/promise/jdk8/Promise.java index 6f03430..7070592 100644 --- a/src/main/java/cn/hackingwu/promise/jdk8/Promise.java +++ b/src/main/java/cn/hackingwu/promise/jdk8/Promise.java @@ -20,6 +20,23 @@ private static CompletableFuture[] supplyAsync(Supplier... suppliers){ return futures; } + private static CompletableFuture[] runAsync(Runnable... runnables){ + CompletableFuture[] futures = new CompletableFuture[runnables.length]; + for (int i = 0; i < runnables.length; i++) { + futures[i] = CompletableFuture.runAsync(runnables[i], getExecutor()); + } + return futures; + } + + public static CompletableFuture all(Runnable... runnables){ + return CompletableFuture.allOf(runAsync(runnables)); + } + + public static CompletableFuture any(Runnable... runnables){ + return CompletableFuture.anyOf(runAsync(runnables)); + } + + public static CompletableFuture all(Supplier... suppliers){ return CompletableFuture.allOf(supplyAsync(suppliers)); } From 8f198c917761cb58267d04f36e7038ac68caab6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E5=BF=97=E5=9D=9A?= Date: Mon, 23 Aug 2021 09:37:24 +0800 Subject: [PATCH 3/3] Update pom.xml --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index c1e3a2d..f01e42b 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ junit junit - 4.12 + 4.13.1 test @@ -39,4 +39,4 @@ - \ No newline at end of file +