代码之家  ›  专栏  ›  技术社区  ›  leeeroy

ExecutorService(特别是ThreadPoolExecutor)是线程安全的吗?

  •  85
  • leeeroy  · 技术社区  · 16 年前

    ExecutorService

    我将从不同线程向同一ThreadPoolExecutor提交作业,在交互/提交任务之前,我必须同步对执行器的访问吗?

    6 回复  |  直到 7 年前
        1
  •  55
  •   Luke Usherwood Jigar Joshi    11 年前

    (与其他答案相反)线程安全合同 interface javadoc(与javadoc的方法相对)。例如,在底部 ExecutorService 您找到的javadoc:

    向ExecutorService提交可运行或可调用的任务 发生之前 发生之前 通过Future.get()检索结果。

    这足以回答这个问题:

    “在交互/提交任务之前,我必须同步对执行器的访问吗?”

    ExecutorService

    是一个 并发 实用程序,也就是说,它的设计目的是在不需要同步的情况下最大限度地运行,以提高性能。(同步会导致线程争用,这会降低多线程效率,特别是在扩展到大量线程时。)

    无法保证未来任务将在什么时候执行或完成(有些甚至可能在提交任务的同一线程上立即执行),但可以保证工作线程已经看到提交线程执行的所有效果 因此,(运行的线程)您的任务还可以安全地读取为其使用而创建的任何数据,而无需同步、线程安全类或任何其他形式的“安全发布”。提交任务的行为本身就足以将输入数据“安全发布”到任务中。您只需要确保在任务运行时不会以任何方式修改输入数据。

    同样,当您通过以下方式获取任务结果时 Future.get()

    该合同还意味着任务本身可以提交更多任务。

    “ExecutorService保证线程安全吗?”

    现在,这部分问题要笼统得多。例如,找不到关于该方法的线程安全契约的任何声明 shutdownAndAwaitTermination

    顺便说一句,我推荐《Java并发实践》一书,作为并发编程世界的良好基础。

        2
  •  31
  •   Kevin Bourrillion Gergely    16 年前

    确实,所讨论的JDK类似乎没有明确保证线程安全的任务提交。然而,在实践中,库中的所有ExecutorService实现确实以这种方式是线程安全的。我认为依赖这个是合理的。由于实现这些功能的所有代码都放在公共领域,因此任何人都没有动机以不同的方式完全重写它。

        3
  •  9
  •   Adamski    16 年前

    你的问题相当开放:所有 ExecutorService 接口确实保证某个地方的某个线程会处理提交的 Runnable Callable

    如果提交 可运行的 / 可调用的 引用可从其他人访问的共享数据结构 可运行的 / 可调用的 以确保跨此数据结构的线程安全。

    BlockingQueue<Runnable> workQ = new LinkedBlockingQueue<Runnable>();
    ExecutorService execService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, workQ);
    ...
    execService.submit(new Callable(...));
    

    编辑

    根据Brian的评论,如果我误解了你的问题:将多个生产者线程的任务提交给 通常是线程安全的(尽管据我所知,在接口的API中没有明确提到)。任何不提供线程安全的实现在多线程环境中都是无用的(因为多个生产者/多个消费者是一种相当常见的范式),这就是 java.util.concurrent

        4
  •  6
  •   Scott S. McCoy    15 年前

    ThreadPoolExecutor 答案很简单 . ExecutorService 两者都被明确记录为线程安全的。此外, 使用以下方式管理其作业队列 java.util.concurrent.BlockingQueue java.util.concurrent.* 实施 BlockingQueue

    所以你的标题问题的答案很明显 可能

        5
  •  2
  •   Community Mohan Dere    8 年前

    与答案相反 Luke Usherwood ExecutorService ThreadPoolExecutor 具体来说,请参阅其他答案。

    先行发生 指定了关系,但这并不意味着方法本身的线程安全性,正如注释所述 Miles .In 卢克·乌斯伍德 他的回答是,前者足以证明后者,但没有提出实际的论点。

    “线程安全”可能意味着各种各样的东西,但这里有一个简单的反例 Executor 执行器服务 先行发生 关系,但由于对的访问不同步,因此不是线程安全的 count

    class CountingDirectExecutor implements Executor {
    
        private int count = 0;
    
        public int getExecutedTaskCount() {
            return count;
        }
    
        public void execute(Runnable command) {
            command.run();
        }
    }
    

    免责声明:我不是专家,我发现这个问题是因为我自己在寻找答案。

        6
  •  2
  •   salexinx    7 年前

    对于ThreadPoolExecutor,它的提交是线程安全的。您可以在jdk8中看到源代码。添加新任务时,它使用mainLock来确保线程安全。

    private boolean addWorker(Runnable firstTask, boolean core) {
                retry:
                for (;;) {
                    int c = ctl.get();
                    int rs = runStateOf(c);
    
                    // Check if queue empty only if necessary.
                    if (rs >= SHUTDOWN &&
                        ! (rs == SHUTDOWN &&
                           firstTask == null &&
                           ! workQueue.isEmpty()))
                        return false;
    
                    for (;;) {
                        int wc = workerCountOf(c);
                        if (wc >= CAPACITY ||
                            wc >= (core ? corePoolSize : maximumPoolSize))
                            return false;
                        if (compareAndIncrementWorkerCount(c))
                            break retry;
                        c = ctl.get();  // Re-read ctl
                        if (runStateOf(c) != rs)
                            continue retry;
                        // else CAS failed due to workerCount change; retry inner loop
                    }
                }
    
                boolean workerStarted = false;
                boolean workerAdded = false;
                Worker w = null;
                try {
                    w = new Worker(firstTask);
                    final Thread t = w.thread;
                    if (t != null) {
                        final ReentrantLock mainLock = this.mainLock;
                        mainLock.lock();
                        try {
                            // Recheck while holding lock.
                            // Back out on ThreadFactory failure or if
                            // shut down before lock acquired.
                            int rs = runStateOf(ctl.get());
    
                            if (rs < SHUTDOWN ||
                                (rs == SHUTDOWN && firstTask == null)) {
                                if (t.isAlive()) // precheck that t is startable
                                    throw new IllegalThreadStateException();
                                workers.add(w);
                                int s = workers.size();
                                if (s > largestPoolSize)
                                    largestPoolSize = s;
                                workerAdded = true;
                            }
                        } finally {
                            mainLock.unlock();
                        }
                        if (workerAdded) {
                            t.start();
                            workerStarted = true;
                        }
                    }
                } finally {
                    if (! workerStarted)
                        addWorkerFailed(w);
                }
                return workerStarted;
            }