代码之家  ›  专栏  ›  技术社区  ›  Michele

等待阻塞队列已满

  •  0
  • Michele  · 技术社区  · 11 年前

    我正在寻找一种同步多个异步操作的方法。我想使用大小等于我的操作的BlockingQueue,但谁能等到队列满了?

    我正在寻找类似于反向阻塞队列的东西。

    我需要 收集结果 每个线程的结束。 AsyncHandler是固定的,它已经是ThreadExecutor基础,我无法启动新线程。

    //3 Times
    makeAsync(new AsyncHandler() {
       onSuccess() {
         ..
         queue.put(result)
       }
    
       onFailure() {
         ..
       }
    });
    
    //Blocking till the Queue is full
    List<Results> = queue.takeAll()
    

    附加问题:当我的一个请求失败时,我需要一种方法来结束等待

    4 回复  |  直到 11 年前
        1
  •  2
  •   Powerlord    11 年前

    我从来没有必要做这种事,但你可能会幸运地使用 CountDownLatch CyclicBarrier 从你的各种线程。

        2
  •  1
  •   Holger    11 年前

    你用什么来描述

    //Blocking till the Queue is full
    List<Results> results = queue.takeAll();
    

    在语义上与队列容量一样多的项目没有区别。如果您知道容量,可以通过以下方式实现:

    // preferably a constant which you also use to construct the bounded queue
    int capacity;
    

    List<Results> results = new ArrayList<>(capacity);
    queue.drainTo(results, capacity);
    while(result.size()<capacity)
        queue.drainTo(results, capacity-result.size());
    

    这将阻止,直到收到与 capacity 如前所述,这与等待队列变满(大小等于其容量)并接收所有项目相同。唯一的区别是,队列变满的事件不一定会发生,例如,如果您希望异步操作 offer 项目,直到队列已满,它不会以这种方式工作。

    如果你不知道自己的能力,你就没有运气了。甚至不能保证 BlockingQueue 是有界的,读,它可能有无限的容量。

    另一方面,如果异步操作能够检测到它们何时完成,那么它们只需在本地收集列表中的项目,然后将整个列表放入 BlockingQueue<List<Results>> 一旦完成,作为单个项目。那么等待它的代码只需要一个 take 以获得整个列表。

        3
  •  1
  •   Stuart Marks    11 年前

    如果您使用的是Java 8,请执行以下操作:

    • 每次呼叫时 makeAsync ,创建 CompletableFuture<Result> 实例,并将其提供给 AsyncHandler ,并让呼叫者也保留一个引用,比如在列表中。
    • 当异步任务正常完成时,调用 complete(result) 在其上 CompletableFuture 例子
    • 当异步任务以错误完成时,调用 completeExceptionally(exception) 在其上 可完成的未来 例子
    • 启动所有异步任务后,调用调用者 CompletableFuture.allOf(cfArray).join() 。不幸的是,这需要一个数组,而不是列表,因此必须转换。这个 join() 如果任何一个任务完成时出现错误,则调用将引发异常。否则,您可以从个人收集结果 可完成的未来 通过调用 get() 方法。

    如果您没有Java8,那么您必须使用自己的机制。初始化 CountDownLatch 将每个异步任务的结果(或异常,或其他指示失败的方式)存储到线程安全的数据结构中,然后递减(“countDown”)锁存器。让调用者等待锁存器达到零,然后收集结果和错误。这并不十分困难,但您必须确定存储有效结果的方法,并记录是否发生错误,还必须手动维护计数。

        4
  •  0
  •   Alex Suo    11 年前

    如果您可以修改methodAsync(),那么在每次将一些元素放入队列并让主线程等待这样的CountDownLatch之后,就可以使用CountDownLatch了。

    如果不幸无法修改methodAsync(),那么只需包装队列并给它一个倒计时锁存器,然后重写add()方法来倒计时这个锁存器。主要方法只是等待它完成。

    如上所述,您的程序结构看起来组织不好。