代码之家  ›  专栏  ›  技术社区  ›  TR1096

在java中实现优雅取消运行异步作业的最佳方式

  •  0
  • TR1096  · 技术社区  · 8 年前

    假设我有一个这样的接口,用于应用程序中的组件运行作业-

    IJob {
        IResult execute();
        void cancel();
    }
    

    我想设置我的应用程序,以便异步运行这些作业。预期调用cancel应该立即返回执行结果,结果表明它已被取消。

    设置这个的最佳方法是什么?我可以只创建一个Thread对象来运行它,它有其他要取消的方法,但我也在查看Future接口,这是我新接触的。

    FutureTask的问题是cancel不合适,不允许我调用job.cancel()。扩展FutureTask并实现我自己的处理是个好主意吗?

    3 回复  |  直到 8 年前
        1
  •  1
  •   Joe C    8 年前

    当你打电话时 cancel 在您的任务中,它将向运行任务的线程发送中断信号。您的任务需要定期检查该信号是否已发送,并在出现以下情况时做出相应反应:

    if (Thread.interrupted()) {
        performNecessaryCleanup();
        return;
    }
    
        2
  •  1
  •   Spotted    8 年前

    当使用并发时,使用语言提供的内容,而不是手工实现。

    据我所知, ExecutorService 应该是适合您的工具,因为您可以:

    • 提供将异步运行并可能返回结果的it作业
    • 关闭执行器,以便取消所有正在运行的作业

    实例

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Future<IResult>> results = new ArrayList<>();
    
        for (int i = 0; i < 10; i++) {
            results.add(executor.submit(new Job(i))); //start jobs
        }
    
        executor.shutdownNow(); //attempts to stop all running jobs
    
        //program flow immediatly continues
    }
    

    正如@JoeC在回答中解释的那样,保证所有工作停止的条件是 中断 管理 在…内 每个线程之后的每个作业都将标记为 呼叫时 shutdownNow() .

    if (Thread.interrupted()) {
        //return result cancelled
    }
    
        3
  •  0
  •   Santos Zatarain Vera    8 年前

    使命感 FutureTask.run() 将阻塞当前线程,您需要进行筛选 IJob 未来任务 .

    行程安排 未来任务 是最好的选择,消费者可以致电 FutureTask.get() 等待结果(即使你打电话 IJob.cancel() ).

    我做了一个小小的嘲弄 IJob ,它使用普通线程进行调度,在生产中您应该有一个 执行服务 就像前面的帖子示例一样。

    如您所见,主线程可以检查状态调用 未来任务.isDone() ,基本上您正在检查结果是否已设置。设置结果意味着 IJob 的线程已完成。

    你可以打电话 IJob.cancel() 几乎可以随时完成包裹 IJob 在里面 未来任务 ,如果方法的行为与注释中的相同。

    模拟作业:

    public class MockJob implements IJob {
    
        private boolean cancelled;
    
        public MockJob() {
        }
    
        @Override
        public IResult execute() {
            int count = 0;
            while (!cancelled) {
                try {
                    count++;
                    System.out.println("Mock Job Thread: count = " + count);
                    if (count >= 10) {
                        break;
                    }
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    cancelled = true;
                }
            }
            return new MockResult(cancelled, count);
        }
    
        @Override
        public void cancel() {
            cancelled = true;
        }
    }
    

    模拟结果:

    public class MockResult implements IResult {
    
        private boolean cancelled;
        private int result;
    
        public MockResult(boolean cancelled, int result) {
            this.cancelled = cancelled;
            this.result = result;
        }
    
        public boolean isCancelled() {
            return cancelled;
        }
    
        public int getResult() {
            return result;
        }
    }
    

    主要类别:

    public class Main {
    
        public static void main(String[] args) throws InterruptedException {
            // Job
            IJob mockJob = new MockJob();
    
            // Async task
            FutureTask<IResult> asyncTask = new FutureTask<>(mockJob::execute);
            Thread mockJobThread = new Thread(asyncTask);
    
            // Show result
            Thread showResultThread = new Thread(() -> {
                try {
                    IResult result = asyncTask.get();
                    MockResult mockResult = (MockResult) result;
                    Thread thread = Thread.currentThread();
                    System.out.println(String.format("%s: isCancelled = %s, result = %d",
                            thread.getName(),
                            mockResult.isCancelled(),
                            mockResult.getResult()
                    ));
                } catch (InterruptedException | ExecutionException ex) {
                    // NO-OP
                }
            });
    
            // Check status
            Thread monitorThread = new Thread(() -> {
                try {
                    while (!asyncTask.isDone()) {
                        Thread thread = Thread.currentThread();
                        System.out.println(String.format("%s: asyncTask.isDone = %s",
                                thread.getName(),
                                asyncTask.isDone()
                        ));
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException ex) {
                    // NO-OP
                }
                Thread thread = Thread.currentThread();
                System.out.println(String.format("%s: asyncTask.isDone = %s",
                        thread.getName(),
                        asyncTask.isDone()
                ));
            });
    
            // Async cancel
            Thread cancelThread = new Thread(() -> {
                try {
                    // Play with this Thread.sleep, set to 15000
                    Thread.sleep(5000);
                    if (!asyncTask.isDone()) {
                        Thread thread = Thread.currentThread();
                        System.out.println(String.format("%s: job.cancel()",
                                thread.getName()
                        ));
                        mockJob.cancel();
                    }
                } catch (InterruptedException ex) {
                    // NO-OP
                }
            });
    
            monitorThread.start();
            showResultThread.start();
            cancelThread.setDaemon(true);
            cancelThread.start();
            mockJobThread.start();
        }
    }
    

    输出(线程休眠(5000)):

    Thread-2: asyncTask.isDone = false
    Thread-0: count = 1
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 2
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 3
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 4
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 5
    Thread-3: job.cancel()
    Thread-2: asyncTask.isDone = false
    Thread-1: isCancelled = true, result = 5
    Thread-2: asyncTask.isDone = true
    

    输出(Thread.sleep(15000)):

    Thread-2: asyncTask.isDone = false
    Thread-0: count = 1
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 2
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 3
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 4
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 5
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 6
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 7
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 8
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 9
    Thread-2: asyncTask.isDone = false
    Thread-0: count = 10
    Thread-1: isCancelled = false, result = 10
    Thread-2: asyncTask.isDone = true