代码示例:
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException{FutureTask<String> futureTask = new FutureTask<>(() -> {System.out.println("-----come in FutureTask");try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }return ThreadLocalRandom.current().nextInt(100);});Thread t1 = new Thread(futureTask,"t1");t1.start();//3秒钟后才出来结果,还没有计算你提前来拿(只要一调用get方法,对于结果就是不见不散,会导致阻塞)//System.out.println(Thread.currentThread().getName()+"t"());//3秒钟后才出来结果,我只想等待1秒钟,过时不候System.out.println(Thread.currentThread().getName()+"t"(1L,TimeUnit.SECONDS));System.out.println(Thread.currentThread().getName()+"t"+" here");}
public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(() -> {System.out.println("-----come in FutureTask");try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }return ""+ ThreadLocalRandom.current().nextInt(100);});new Thread(futureTask,"t1").start();System.out.println(Thread.currentThread().getName()+"t"+"线程完成任务");/*** 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果*/while (true){if(futureTask.isDone()){System.out.());break;}}}
CompletableFuture实现了Future,CompletionStage两个接口,故CompletableFuture功能更加强大
CompletableFuture
CompletionStage
CompletionStage 接口中的方法比较多,CompletableFuture 的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 10L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
private static void testSimpleCompletableFuture() throws InterruptedException, ExecutionException {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName() + "t is coming in ...");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() + "t task was finished!");});// },poolExecutor); // 如果使用了自行创建的线程池,则使用自定义的或者特别指定的线程池执行异步代码System.out.()); // get方法返回null,因为runAsync方法无返回值
}
ForkJoinPoolmonPool-worker-1 is coming in ...
ForkJoinPoolmonPool-worker-1 task was finished!
null
同上,将runAsync方法改为supplyAsync方法,并返回一个整数类型的变量
static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 10L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
private static void testSupplyAsync() throws InterruptedException, ExecutionException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "t is coming in ... supplyAsync");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() + "t task was finished!");// return ThreadLocalRandom.current().nextInt(100);Runtime().availableProcessors();});// },poolExecutor);System.out.()); // get方法返回null,因为runAsync方法无返回值
}
ForkJoinPoolmonPool-worker-1 is coming in ... supplyAsync
ForkJoinPoolmonPool-worker-1 task was finished!
8
private static void testWhenCompleteAndExceptionally() {CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {//主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程long result = ThreadLocalRandom.current().nextLong(100);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("异步任务计算完成,结果为:" + result);if (result > 5) {int i = 1 / 0;}return result;}).whenComplete((res, err) -> {if (err == null) {System.out.println("异步任务执行正常,其结果为:" + res);}}).exceptionally(e -> {System.out.println("exceptionally,异步任务执行异常:" + e.getCause() + "< --- >" + e.getMessage());Runtime().freeMemory();});System.out.println(Thread.currentThread().getName() + " task was finished! ...");// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}}
输出结果:
main task was finished! ...
异步任务计算完成,结果为:39
exceptionally,异步任务执行异常:java.lang.ArithmeticException: / by zero< --- >java.lang.ArithmeticException: / by zero
class Emall {@Getterprivate String emallName;public Emall(String emallName) {allName = emallName;}// 生成随机数,模拟价格计算public double computePrice(String productName) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);}}static List<Emall> list = Arrays.asList(new Emall("jdong"),new Emall("dangdang"),new Emall("taobao"),new Emall("pdd"),new Emall("tmall"));// 实现基本的功能需求// List<Emall> --> map --> List<String>public static List<String> getPrice(List<Emall> list, String productName) {return list.stream().map(emall ->String.format(productName + " in %s price is %.2f",EmallName(), emallputePrice(productName))).List());}
public static List<String> getPriceByCompletableFuture(List<Emall> list, String productName) {List<String> collect = list.stream().map(emall -> // 将每个商城对象emall映射成一个CompletableFuture异步任务,即将每个emall放到异步任务的线程中CompletableFuture.supplyAsync(() ->String.format(productName + " in %s price is %.2f",EmallName(), emallputePrice(productName)))).List()).stream().map(s -> s.join()).List());return collect;}
public static void main(String[] args) {long startTime = System.currentTimeMillis();List<String> list2 = getPriceByCompletableFuture(list, "mysql");for (String element : list2) {System.out.println(element);}long endTime = System.currentTimeMillis();System.out.println("异步处理任务 ----costTime: "+(endTime - startTime) +" 毫秒");long startTime2 = System.currentTimeMillis();List<String> ansList = getPrice(list, "mysql");for (String ele : ansList) {System.out.println(ele);}long endTime2 = System.currentTimeMillis();System.out.println("同步处理任务 ----costTime: "+(endTime2 - startTime2) +" 毫秒");}
输出结果:
mysql in jdong price is 110.77mysql in dangdang price is 110.44mysql in taobao price is 109.06mysql in pdd price is 109.14mysql in tmall price is 109.60异步处理任务 ----costTime: 1220 毫秒mysql in jdong price is 110.81mysql in dangdang price is 110.86mysql in taobao price is 110.08mysql in pdd price is 109.36mysql in tmall price is 109.12同步处理任务 ----costTime: 5050 毫秒
// 不见不散
public T get()// 过时不候
public T get(long timeout, TimeUnit unit)// 没有计算完成的情况下,给我一个替代结果
// 立即获取结果不阻塞 计算完,返回计算完成后的结果 没算完,返回设定的valueIfAbsent值
public T getNow(T valueIfAbsent)
// 和get方法功能类似,但join方法不需要抛出异常
public T join()
private static void testGetAndGetNow() {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "111";});// try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }// 注释掉如上的Sleep方法,getNow返回的结果是方法中设置的值"2222"// 不注释如上的sleep方法,getNow方法的结果是future返回的结果值"111"System.out.Now("2222"));
}
System.out.println(CompletableFuture.supplyAsync(() -> "abc").thenApply(r -> r + "123").join());
// abc123
public static void main(String[] args) {// 注意: 这里最好不要使用 ExecutorService service = wFixedThreadPool(3);的方式来创建线程池(见阿里开发手册)ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy()//new ThreadPoolExecutor.AbortPolicy()//new ThreadPoolExecutor.DiscardPolicy()//new ThreadPoolExecutor.DiscardOldestPolicy());// 当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,CompletableFuture.supplyAsync(()->{try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("异步任务操作步骤 111");return 1;},threadPool).thenApply(res ->{System.out.println("异步任务操作步骤 222");return res + 2;}).thenApply(res ->{System.out.println("异步任务操作步骤 333");// int i = 10 / 0; // 异常情况:哪步出错就停在哪步return res + 3;}).whenComplete((result,err)->{if (err == null){System.out.println("异步任务处理正常,计算结果为:" + result); // result = 1 + 2 + 3 = 6}}).exceptionally(e->{e.getStackTrace();System.out.Message());return 404;});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭,即主线程执行太快,导致异步任务无法执行,可以让主线程Sleep几秒// 除了sleep的方法,还可以使用线程池来处理,注意,最后一定关闭线程池 threadPool.shutdown(); 否则,程序会一直处于运行状态System.out.println(Thread.currentThread().getName() + "t" + " --- 主线程先去忙其他任务");// try {// TimeUnit.SECONDS.sleep(1);// } catch (InterruptedException e) {// throw new RuntimeException(e);// }threadPool.shutdown();}
// 当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,// 异常情况:有异常也可以往下一步走,handle会根据异常参数在最后抛出对应的异常CompletableFuture.supplyAsync(() -> {//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }System.out.println("111");return 1024;}).handle((f,e) -> {// int age = 10/0;System.out.println("222");return f + 1;}).handle((f,e) -> {System.out.println("333");return f + 1;}).whenCompleteAsync((res,e) -> {System.out.println("任务处理的结果为: "+res);}).exceptionally(e -> {e.printStackTrace();return null;});System.out.println("-----主线程结束, END");// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
CompletableFuture.supplyAsync(()->{return 1;}).thenApply(num ->{return num + 2;}).thenApply(num ->{return num + 3;}).thenAccept(num -> System.out.println(num));System.out.println("--------------------");System.out.println(CompletableFuture.supplyAsync(()-> "the resultA of thenRun").thenRun(()->{System.out.println("thenRun,B不需要A的结果");}).join());System.out.println(CompletableFuture.supplyAsync(()-> "任务A和B依次先后执行,B需要A的结果,thenAccept").thenAccept(res -> System.out.println(res + ",任务B无返回值")).join());System.out.println(CompletableFuture.supplyAsync(()-> "任务A和B先后执行,B需要A的结果").thenApply(str -> str + " --> 任务B有返回值").join());
applyToEither方法:任务谁执行快,就用谁(选用执行速度最快的任务 返回的结果作为最后的总结果)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
使用实例:
CompletableFuture<String> planA = CompletableFuture.supplyAsync(() -> {System.out.println("A ");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return "planA";
});CompletableFuture<String> planB = CompletableFuture.supplyAsync(() -> {System.out.println("B ");try {TimeUnit.SECONDS.sleep(3);// TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return "planB";
});
CompletableFuture<String> result = planA.applyToEither(planB, p -> {return p + " is winner!"; // 两个任务planA和planB,谁执行得更快,p就是对应的返回结果
});// 补充:
// join()和get()方法类似,其不同在于使用get方法需要抛出异常 或 进行异常捕获,
// join()则不需要抛出异常,也不用进行异常捕获,但出现异常时,会直接抛异常
System.out.println(Thread.currentThread().getName() + "t" +result.join());
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "t --- 开始执行任务1...");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}return 20;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + "t -- 开始执行任务2..");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}return 30;});CompletableFuture<Integer> result = future1.thenCombine(future2, (x, y) -> {System.out.println("---> 合并两个任务的计算结果");return x * y;});System.out.println(result.join());//System.out.()); // get方法需要抛出异常或进行异常捕获
参考资料:尚硅谷2022版JUC并发编程
本文发布于:2024-01-28 08:49:45,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/17064029866232.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |