代码之家  ›  专栏  ›  技术社区  ›  job

Java并行工作迭代器?

  •  5
  • job  · 技术社区  · 16 年前

    我正在寻找一个类,在这个类中,我可以重写一个方法来完成工作,并像迭代器一样返回结果。大概是这样的:

    ParallelWorkIterator<Result> itr = new ParallelWorkIterator<Result>(trials,threads) {
    
      public Result work() {
        //do work here for a single trial...
        return answer;
      }
    
    };
    while (itr.hasNext()) {
      Result result = itr.next();
      //process result...
    }
    

    有希望地 实现了这一点,但我没有足够的信心,我想我应该检查一下这样的东西是否已经存在。

    3 回复  |  直到 16 年前
        1
  •  13
  •   Tim    16 年前

    看一看这本书 ExecutorCompletionService . 它做你想做的一切。

       void solve(Executor e, Collection<Callable<Result>> solvers)
         throws InterruptedException, ExecutionException {
           //This class will hold and execute your tasks
           CompletionService<Result> ecs
               = new ExecutorCompletionService<Result>(e);
           //Submit (start) all the tasks asynchronously
           for (Callable<Result> s : solvers)
               ecs.submit(s);
           //Retrieve completed task results and use them
           int n = solvers.size();
           for (int i = 0; i < n; ++i) {
               Result r = ecs.take().get();
               if (r != null)
                   use(r);
           }
       }
    

    使用CompletionService的好处是它总是返回第一个完成的结果。这可以确保您不必等待任务完成,并允许未完成的任务在后台运行。

        2
  •  2
  •   Brian Agnew    16 年前

    我建议您看看Java Executors

    您提交了许多任务并获得了 Future 为每个对象返回对象。您的工作是在后台处理的,您可以在未来的对象中进行迭代(就像您在上面所做的那样)。每个Future在可用时返回一个结果(通过调用 get() -这将阻塞,直到在单独的线程中生成结果为止)

        3
  •  1
  •   Adamski    16 年前

    我能想到的最贴切的事情就是使用 CompletionService 在完成时积累结果。

    简单的例子:

    ExecutorService executor = Executors.newSingleThreadExecutor(); // Create vanilla executor service.
    CompletionService<Result> completionService = new ExecutorCompletionService<Result>(executor); // Completion service wraps executor and is notified of results as they complete.
    Callable<Result> callable = new MyCallable();
    
    executor.submit(callable); // Do not store handle to Future here but rather obtain from CompletionService when we *know* the result is complete.
    
    Future<Result> fut = completionService.take(); // Will block until a completed result is available.
    Result result = fut.get(); // Will not block as we know this future represents a completed result.
    

    Iterator 接口作为 Future get() 方法可以引发两个可能的已检查异常: ExecutionException InterruptedException 因此,你需要捕捉它们,要么吞下它们,要么将它们重新吸出来 RuntimeException s、 这两件事都不是一件好事。此外,您的 迭代器 hasNext() next() 如果有正在进行的任务,则可能需要阻止方法,这对于使用 迭代器 . 相反,我将实现自己更具描述性的接口;例如

    public interface BlockingResultSet {
      /**
       * Returns next result when it is ready, blocking is required.
       * Returns null if no more results are available.
       */  
      Result take() throws InterruptedException, ExecutionException;
    }
    

    take() 通常表示 java.util.concurrent 包装)。