代码之家  ›  专栏  ›  技术社区  ›  Mark

在一组文档上使用ForkJoinPool

  •  2
  • Mark  · 技术社区  · 7 年前

    我从未使用过ForkJoinPool,我是通过这个代码片段来访问的。

    Set<Document> docs . 文档具有写入方法。如果我执行以下操作,是否需要get或join来确保集合中的所有文档都正确完成了它们的写入方法?

    ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);
    pool.submit(() -> docs.parallelStream().forEach(
        doc -> {
            doc.write();
        })
    );
    

    1 回复  |  直到 7 年前
        1
  •  1
  •   Didier L    6 年前

    ForkJoinPool.submit(Runnable) 返回 ForkJoinTask 表示任务的挂起完成。如果希望等待处理所有文档,则需要与该任务进行某种形式的同步,例如调用 its get() method Future 接口)。

    关于异常处理,通常流处理期间的任何异常都会停止它。但是您必须参考 Stream.forEach(Consumer)

    此操作的行为显式不确定。对于并行流管道,此操作不能保证尊重流的相遇顺序,因为这样做会牺牲并行的好处。对于任何给定的元素,可以在库选择的任何时间和线程中执行操作。[]

    这意味着,如果发生异常,您无法保证将写入哪个文档。处理将停止,但您无法控制仍将处理哪个文档。

    如果您想确保处理完剩余的文档,我建议使用两种解决方案:

    • document.write() 用一个 try / catch 以确保没有异常传播,但这使得很难检查哪个文档成功或是否有任何失败;或
    • CompletableFuture API . 如评论中所述,由于实现细节,您当前的解决方案是有效的,因此最好做一些更干净的事情。

    完全未来 ,您可以执行以下操作:

    List<CompletableFuture<Void>> futures = docs.stream()
                        .map(doc -> CompletableFuture.runAsync(doc::write, pool))
                        .collect(Collectors.toList());
    

    推荐文章