代码之家  ›  专栏  ›  技术社区  ›  St.Antario

CompletableFuture的完成处理程序在哪个线程中执行?

  •  47
  • St.Antario  · 技术社区  · 8 年前

    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
    

    JavaDoc就是这么说的:

    返回一个新的CompletionStage,当此阶段完成时 通常,以该阶段的结果作为 提供的功能。有关规则,请参阅CompletionStage文档 涵盖异常完成。

    线程呢?这将在哪个线程中执行?如果未来是由线程池完成的呢?

    5 回复  |  直到 4 年前
        1
  •  52
  •   Mike Strobel    8 年前

    @nullpointer 指出,文档告诉您需要了解的内容。然而,相关文本出人意料地模糊,这里发布的一些评论(和答案)似乎依赖于文档不支持的假设。因此,我认为把它分开是值得的。具体来说,我们应该仔细阅读这一段:

    听起来很简单,但细节很简单。它似乎有意避免描述 依赖完成可以在完成线程上调用,而不是在调用完成方法期间调用,如 thenApply . 如前所述,上述段落实际上是 让我们用假设来填补空白。这是危险的,尤其是当主题涉及并发和异步编程时,我们作为程序员开发的许多期望都会迎头赶上。让我们仔细看看文档

    声称受抚养人完成登记 呼叫 complete() 可以 调用完成方法时调用,如 没有 声明将调用完成 (注意“任何其他”)。

    对于任何使用 CompletableFuture 安排和组合任务。考虑以下事件序列:

    1. 线程A通过 f.thenApply(c1)
    2. 一段时间后,线程B调用 f.complete() .
    3. f.thenApply(c2)

    完成() 做两件事:它发布未来的结果,然后尝试调用依赖完成。现在,如果线程C运行,会发生什么 之后 结果值已过帐,但 线程B开始调用 c1 f c1 c2 . 或者,线程C可以调用 c2 . 文件没有排除任何一种可能性。考虑到这一点,以下是假设:

    1. c 注册日期 f 将在调用期间调用 f、 完成()
    2. c 回报;
    3. 依赖补全将按任何特定顺序调用(例如,注册顺序);
    4. 独立完成登记 之前 完成。

    再举一个例子:

    1. f、 完成() ;
    2. 一段时间后,线程B通过 f、 然后应用(c1)
    3. 大约在同一时间,线程C通过 .

    将在 f、 然后应用(c1) c2 将在 f、 然后应用(c2) . 有人可能进一步假设 届时将完成 f、 然后应用(c1) 返回。然而,文件 没有 最终调用 二者都 c1 ,而另一个线程两者都不调用。

    tldr: 小心你的假设,当你写文档时,要尽可能清楚和深思熟虑。虽然简洁是一件美妙的事情,但要警惕人类填补空白的倾向。

        2
  •  31
  •   Naman    8 年前

    CompletableFuture 文档可以帮助您更好地理解:

    • 为非异步方法的依赖完成提供的操作可能是 由执行 , 或由完成方法的任何其他调用方执行 .

    • ForkJoinPool.commonPool() (除非不支持 并行度级别至少为两级,在这种情况下,将创建一个新线程 创建以运行每个任务 ). 简化监视、调试和 跟踪时,所有生成的异步任务都是标记的实例 CompletableFuture.AsynchronousCompletionTask .

    使现代化 this answer 由@Mike作为一个有趣的分析进一步深入到文档的细节。

        3
  •  9
  •   NPE    8 年前

    Javadoc

    为非异步方法的依赖完成提供的操作可以由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用方执行。

    更具体地说:

    • fn complete() 在任何线程调用的上下文中 完成() .

    • 如果 已经完成了 thenApply() fn 将在线程调用的上下文中运行 .

        4
  •  7
  •   John Kugelman Michael Hodel    7 年前

    Async CompletableFuture 不要自己产生新线程。工作将在现有线程下进行。

    thenApply 将在原始版本中运行 可完成的未来 complete() ,或调用 thenApply() 如果未来已经完成。如果你想控制线程,如果 fn thenApplyAsync .

        5
  •  1
  •   ALargeTom    4 年前

    我知道这个问题很老,但我想用源代码来解释这个问题。

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }
    
    private CompletableFuture<Void> uniAcceptStage(Executor e,
                                                   Consumer<? super T> f) {
        if (f == null) throw new NullPointerException();
        Object r;
        if ((r = result) != null)
            return uniAcceptNow(r, e, f);
        CompletableFuture<Void> d = newIncompleteFuture();
        unipush(new UniAccept<T>(e, d, this, f));
        return d;
    }
    

    这是来自java 16的源代码,我们可以看到,如果我们触发Accept,我们将把一个空的executor服务引用传递到我们的函数中。 从第二个函数uniAcceptStage()第二个if条件。如果结果不为null,它将触发uniAcceptNow()

    if (e != null) {
         e.execute(new UniAccept<T>(null, d, this, f));
    } else {
         @SuppressWarnings("unchecked") T t = (T) r;
         f.accept(t);
         d.result = NIL;
    }
    

    如果执行器服务为null,我们将使用lambda函数f.accept(t)执行它。如果我们从主线程触发这个应用/接受,它将使用主线程作为执行线程。

    final void postComplete() {
        /*
         * On each step, variable f holds current dependents to pop
         * and run.  It is extended along only one path at a time,
         * pushing others to avoid unbounded recursion.
         */
        CompletableFuture<?> f = this; Completion h;
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            if (STACK.compareAndSet(f, h, t = h.next)) {
                if (t != null) {
                    if (f != this) {
                        pushStack(h);
                        continue;
                    }
                    NEXT.compareAndSet(h, t, null); // try to detach
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }
    
    推荐文章