代码之家  ›  专栏  ›  技术社区  ›  jagamot

为ThreadPoolExecutor处理异常

  •  15
  • jagamot  · 技术社区  · 15 年前

    我有以下代码片段,它基本上扫描了需要执行的任务列表,然后将每个任务交给执行者执行。

    这个 JobExecutor 然后创建另一个执行器(用于执行db任务…将数据读写到队列)并完成任务。

    作业执行器 返回A Future<Boolean> 对于提交的任务。当其中一个任务失败时,我希望通过捕获所有异常来优雅地中断所有线程并关闭执行器。我需要做什么改变?

    public class DataMovingClass {
        private static final AtomicInteger uniqueId = new AtomicInteger(0);
    
      private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();   
    
      ThreadPoolExecutor threadPoolExecutor  = null ;
    
       private List<Source> sources = new ArrayList<Source>();
    
        private static class IDGenerator extends ThreadLocal<Integer> {
            @Override
            public Integer get() {
                return uniqueId.incrementAndGet();
            }
      }
    
      public void init(){
    
        // load sources list
    
      }
    
      public boolean execute() {
    
        boolean succcess = true ; 
        threadPoolExecutor = new ThreadPoolExecutor(10,10,
                    10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                    new ThreadFactory() {
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("DataMigration-" + uniqueNumber.get());
                            return t;
                        }// End method
                    }, new ThreadPoolExecutor.CallerRunsPolicy());
    
         List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();
    
         for (Source source : sources) {
                        result.add(threadPoolExecutor.submit(new JobExecutor(source)));
         }
    
         for (Future<Boolean> jobDone : result) {
                    try {
                        if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
                            // in case of successful DbWriterClass, we don't need to change
                            // it.
                            success = false;
                        }
                    } catch (Exception ex) {
                        // handle exceptions
                    }
                }
    
      }
    
      public class JobExecutor implements Callable<Boolean>  {
    
            private ThreadPoolExecutor threadPoolExecutor ;
            Source jobSource ;
            public SourceJobExecutor(Source source) {
                this.jobSource = source;
                threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                        new ThreadFactory() {
                            public Thread newThread(Runnable r) {
                                Thread t = new Thread(r);
                                t.setName("Job Executor-" + uniqueNumber.get());
                                return t;
                            }// End method
                        }, new ThreadPoolExecutor.CallerRunsPolicy());
            }
    
            public Boolean call() throws Exception {
                boolean status = true ; 
                System.out.println("Starting Job = " + jobSource.getName());
                try {
    
                            // do the specified task ; 
    
    
                }catch (InterruptedException intrEx) {
                    logger.warn("InterruptedException", intrEx);
                    status = false ;
                } catch(Exception e) {
                    logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);
                    status = false ;
                }
               System.out.println("Ending Job = " + jobSource.getName());
                return status ;
            }
        }
    }   
    
    3 回复  |  直到 8 年前
        1
  •  14
  •   Ravindra babu    9 年前

    当你向执行者提交任务时,它会返回一个 FutureTask 实例。

    FutureTask.get() 将作为 ExecutorException .

    所以当你在 List<Future> 打电话给每个人,接住 执行者例外 启动有序关闭。

        2
  •  4
  •   Community CDub    8 年前

    因为您要将任务提交给 ThreadPoolExecutor ,异常被 FutureTask .

    看看这个 code

    **Inside FutureTask$Sync**
    
    void innerRun() {
        if (!compareAndSetState(READY, RUNNING))
            return;
    
      runner = Thread.currentThread();
        if (getState() == RUNNING) { // recheck after setting thread
            V result;
           try {
                result = callable.call();
            } catch (Throwable ex) {
               setException(ex);
                return;
            }
           set(result);
        } else {
            releaseShared(0); // cancel
        }
    

    }

    protected void setException(Throwable t) {
       sync.innerSetException(t);
    }
    

    从上面的代码来看,很明显 setException 方法捕获 Throwable . 因为这个原因, 未来任务 如果你使用 submit() 方法 线程池

    按Java documentation ,您可以扩展 afterExecute() 方法在 线程池

    protected void afterExecute(Runnable r,
                                Throwable t) 
    

    文档中的示例代码:

    class ExtendedExecutor extends ThreadPoolExecutor {
       // ...
       protected void afterExecute(Runnable r, Throwable t) {
         super.afterExecute(r, t);
         if (t == null && r instanceof Future<?>) {
           try {
             Object result = ((Future<?>) r).get();
           } catch (CancellationException ce) {
               t = ce;
           } catch (ExecutionException ee) {
               t = ee.getCause();
           } catch (InterruptedException ie) {
               Thread.currentThread().interrupt(); // ignore/reset
           }
         }
         if (t != null)
           System.out.println(t);
       }
     }
    

    你可以抓住 Exceptions 在三个方面

    1. Future.get() 如公认答案所示
    2. 包整 run() call() 方法在 try{}catch{}Exceptoion{} 阻碍
    3. 重写 afterExecute 属于 线程池 方法如上所示

    要优雅地中断其他线程,请查看以下SE问题:

    How to stop next thread from running in a ScheduledThreadPoolExecutor

    How to forcefully shutdown java ExecutorService

        3
  •  2
  •   Ravindra babu    8 年前

    子类 ThreadPoolExecutor 并覆盖它 protected afterExecute (Runnable r, Throwable t) 方法。

    如果要通过 java.util.concurrent.Executors 便利类(你不是),看看它的源代码,看看它是如何调用的 ThreadPoolExecutor .