福田网站 建设seo信科,卓天商务跨境电商,网络营销中常用的营销策略,品牌策划公司和品牌设计公司前言
Java CompletableFuture 提供了一种异步编程的方式#xff0c;可以在一个线程中执行长时间的任务#xff0c;而不会堵塞主线程。
和Future相比#xff0c;CompletableFuture不仅实现了Future接口#xff0c;也实现了 CompletionStage接口。Future接口不用多说#…前言
Java CompletableFuture 提供了一种异步编程的方式可以在一个线程中执行长时间的任务而不会堵塞主线程。
和Future相比CompletableFuture不仅实现了Future接口也实现了 CompletionStage接口。Future接口不用多说CompletionStage接口将多个CompletionStage执行顺序依赖给抽象了出来。
有了CompletableFuture接口就能将多个异步事件的结果进行执行顺序编排。
使用
可数操作
一般使用 CompletableFuture的场景是有一个 a 操作一个 b操作还有一个 c 操作依赖 a、b两个操作的返回结果。可以直接使用 allOf()接受一长串的入参也可以使用thenCombine()针对两个操作的特定情况。
public static void main(String[] argv) {CompletableFutureString c1 CompletableFuture.supplyAsync(() - {try {Thread.sleep(second * 20);} catch (InterruptedException e) {throw new RuntimeException(e);}return 1;});CompletableFutureString c2 CompletableFuture.supplyAsync(() - {try {Thread.sleep(second * 20);} catch (InterruptedException e) {throw new RuntimeException(e);}return 2;});CompletableFuture c9 CompletableFuture.allOf(c1, c2);c9.thenApply(v - {try {c1.get();c2.get();System.out.println(Everything is all right);} catch(Exception e) {e.printStackTrace();} finally {System.out.println(Something error);}return v;});c9.join();}可变操作
当想要处理的 CompletableFuture 是可变的比如说根据数据库查出的数据每个都需要执行一个 CompletableFuture 操作也就是 n 个 CompletableFuture。 CompletableFutureVoid allFuture CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0]));CompletableFutureListT result allFuture.thenApply(v -completableFutureList.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));ListT tList result.get(50, TimeUnit.SECONDS);源码实现
CompletableFuture 类成员变量
CompletableFuture中有一个 volatile 关键词修饰的成员变量resultCompletableFuture.get()函数中的返回的就是这个变量。它会先检查result变量是否为null不为null则直接返回为null则会根据是否可中断进行一个while循环等。 根据使用get() 或者 get(long timeout, TimeUnit unit) 函数的不同最终等待result结果的函数也不同。get(long timeout, TimeUnit unit)函数会是用 timedGet(long nanos) 函数进行等待。
/*** Waits if necessary for this future to complete, and then* returns its result.** return the result value* throws CancellationException if this future was cancelled* throws ExecutionException if this future completed exceptionally* throws InterruptedException if the current thread was interrupted* while waiting*/public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r result) null ? waitingGet(true) : r);}除了代表结果的 result 之外还有一个 Completion 类 的变量 stack。从断点执行和代码的注解上看这个stack代表者从属当前CompletableFuture的操作。当前CompletableFuture操作执行完毕后(result里有结果)会引动其他Completion进行处理。
/* * A CompletableFuture may have dependent completion actions,* collected in a linked stack. It atomically completes by CASing* a result field, and then pops off and runs those actions. This* applies across normal vs exceptional outcomes, sync vs async* actions, binary triggers, and various forms of completions.*/*
可以通过截图看出 在Idea 内存中有和没有 Completion stack的CompletableFuture相比有比没有多了 1 dependents的标记
Completion stack里没有东西的CompletableFutureCompletion stack里有东西的CompletableFuture
CompletableFuture 多个操作组织结构
CompletableFuture类能够通过 CompletableFuture.allOf()或者 CompletableFuture.anyOf()将多个CompletableFuture 对象组合在一起等到满足条件时再触发之后操作的执行。
以allOf方法为例CompletableFuture.allOf(CompletableFuture?... cfs) 方法会整合作为入参的所有CompletableFuture等到他们呢所有的都完成之后才返回结果。
/* ------------- Arbitrary-arity constructions -------------- *//*** Returns a new CompletableFuture that is completed when all of* the given CompletableFutures complete. If any of the given* CompletableFutures complete exceptionally, then the returned* CompletableFuture also does so, with a CompletionException* holding this exception as its cause. Otherwise, the results,* if any, of the given CompletableFutures are not reflected in* the returned CompletableFuture, but may be obtained by* inspecting them individually. If no CompletableFutures are* provided, returns a CompletableFuture completed with the value* {code null}.** pAmong the applications of this method is to await completion* of a set of independent CompletableFutures before continuing a* program, as in: {code CompletableFuture.allOf(c1, c2,* c3).join();}.** param cfs the CompletableFutures* return a new CompletableFuture that is completed when all of the* given CompletableFutures complete* throws NullPointerException if the array or any of its elements are* {code null}*/public static CompletableFutureVoid allOf(CompletableFuture?... cfs) {return andTree(cfs, 0, cfs.length - 1);}/** Recursively constructs a tree of completions. */static CompletableFutureVoid andTree(CompletableFuture?[] cfs,int lo, int hi) {CompletableFutureVoid d new CompletableFutureVoid();if (lo hi) // emptyd.result NIL;else {CompletableFuture? a, b;int mid (lo hi) 1;if ((a (lo mid ? cfs[lo] :andTree(cfs, lo, mid))) null ||(b (lo hi ? a : (hi mid1) ? cfs[hi] :andTree(cfs, mid1, hi))) null)throw new NullPointerException();if (!d.biRelay(a, b)) {BiRelay?,? c new BiRelay(d, a, b);a.bipush(b, c);c.tryFire(SYNC);}}return d;}/** Pushes completion to this and b unless both done. */final void bipush(CompletableFuture? b, BiCompletion?,?,? c) {if (c ! null) {Object r;while ((r result) null !tryPushStack(c))lazySetNext(c, null); // clear on failureif (b ! null b ! this b.result null) {Completion q (r ! null) ? c : new CoCompletion(c);while (b.result null !b.tryPushStack(q))lazySetNext(q, null); // clear on failure}}} /** Returns true if successfully pushed c onto stack. */final boolean tryPushStack(Completion c) {Completion h stack;lazySetNext(c, h);return UNSAFE.compareAndSwapObject(this, STACK, h, c);} boolean biRelay(CompletableFuture? a, CompletableFuture? b) {Object r, s; Throwable x;if (a null || (r a.result) null ||b null || (s b.result) null)return false;if (result null) {if (r instanceof AltResult (x ((AltResult)r).ex) ! null)completeThrowable(x, r);else if (s instanceof AltResult (x ((AltResult)s).ex) ! null)completeThrowable(x, s);elsecompleteNull();}return true;}
从源码上看是是将整个CompletableFuture数组通过andTree()方法划分成了一颗二叉树这个二叉树的叶子节点是传入的CompletableFuture对象非叶子节点代表了它的子节点CompletableFuture的完成情况。
然后检测根节点的CompletableFuture的两个子节点是否完成。 cfs1、cfs2、cfs3、cfs4 是allOf的入参四个CompletableFuture对象。
代码中通过a.bipush(b, c) 将 a、b串在一起。因为涉及到UNSAFE方法不知道方法具体执行了什么操作。所以只能通过IDEA里内存里实际的值去由结果推过程。
a.bipush(b,c) 前内存各个变量实际值。 a.bipush(b,c) 后内存各个变量实际值。 tryPushStack(Completion c) 方法前
tryPushStack(Completion c) 方法后 可以看到内存中 变量b 对应的内存地址为 75bd9247的 stack被赋值了成为了Completion c。
tryFire(int mode)方法执行前 可以看到 cfs 除了 cfs1 之外其他的 cfs 中的 stack都被赋值了。通过观察IDEA中内存中对象实际值可以发现stack中 的 src 是 自己的树上的兄弟节点 snd 是自己。 CompletableFuture 多个操作执行顺序控制
CompletableFuture 一个节点要开始执行的前提是他的子节点全部执行完毕之后才能触发自己节点上的操作。
当调用CompletableFuture 异步执行方法 supplyAsync 会传递一个 Supplier 对象作为入参。这个Supplier 会被封装成为 一个Runnable 子类 AsyncSupply 对象作为其抽象方法 run 中 执行的一部分。
CompletableFutureString c2 CompletableFuture.supplyAsync(() - {try {Thread.sleep(second * 20);} catch (InterruptedException e) {throw new RuntimeException(e);}return 2;});---------------------------------------------------------------------------------/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the {link ForkJoinPool#commonPool()} with* the value obtained by calling the given Supplier.** param supplier a function returning the value to be used* to complete the returned CompletableFuture* param U the functions return type* return the new CompletableFuture*/public static U CompletableFutureU supplyAsync(SupplierU supplier) {return asyncSupplyStage(asyncPool, supplier);}static U CompletableFutureU asyncSupplyStage(Executor e,SupplierU f) {if (f null) throw new NullPointerException();CompletableFutureU d new CompletableFutureU();e.execute(new AsyncSupplyU(d, f));return d;}-----------------------------------------------------------------------------------public void run() {// fn 就是 CompletableFuture.supplyAsync 传入的 Supplier CompletableFutureT d; SupplierT f;if ((d dep) ! null (f fn) ! null) {dep null; fn null;if (d.result null) {try {// 将 Supplier 处理结果赋值给 CompletableFuture 的 resultd.completeValue(f.get());} catch (Throwable ex) {d.completeThrowable(ex);}} // Pops and tries to trigger all reachable dependents. Call only when known to be done.d.postComplete();}}
从源码中可以看到当执行了CompletableFuture.supplyAsync()他的通知机制封装在实现Runnable抽象方法run里。当你传入的Supplier 有结果返回之后会调用 CompletableFuture 中的 postComplete() 方法通知 stack中其他可达的 从属 Completion让他们各自完成自己的 action。
/*** Pops and tries to trigger all reachable dependents. Call only* when known to be done.*/final void postComplete() {/** On each step, variable f holds current dependents to pop* and run. It is extended along only one path at a time,* pushing others to avoid unbounded recursion.*/CompletableFuture? f this; Completion h;while ((h f.stack) ! null ||(f ! this (h (f this).stack) ! null)) {CompletableFuture? d; Completion t;if (f.casStack(h, t h.next)) {if (t ! null) {if (f ! this) {pushStack(h);continue;}h.next null; // detach}// 将 下一个需要执行的 Completion 弹出来后 执行 tryFiref (d h.tryFire(NESTED)) null ? this : d;}}}static final class UniApplyT,V extends UniCompletionT,V {Function? super T,? extends V fn;UniApply(Executor executor, CompletableFutureV dep,CompletableFutureT src,Function? super T,? extends V fn) {super(executor, dep, src); this.fn fn;}final CompletableFutureV tryFire(int mode) {CompletableFutureV d; CompletableFutureT a;if ((d dep) null ||// uniApply 对封装的 Supplier 进行执行!d.uniApply(a src, fn, mode 0 ? null : this))return null;dep null; src null; fn null;return d.postFire(a, mode);}} final S boolean uniApply(CompletableFutureS a,Function? super S,? extends T f,UniApplyS,T c) {Object r; Throwable x;if (a null || (r a.result) null || f null)return false;tryComplete: if (result null) {if (r instanceof AltResult) {if ((x ((AltResult)r).ex) ! null) {completeThrowable(x, r);break tryComplete;}r null;}try {if (c ! null !c.claim())return false;SuppressWarnings(unchecked) S s (S) r;// 这里实际执行 CompletableFuture 的 SuppliercompleteValue(f.apply(s));} catch (Throwable ex) {completeThrowable(ex);}}return true;}
从Idea里的 栈帧中可以看出来是由 CompletableFuture 1 执行完后的 postComplete 引发了接下来的CompletableFuture