代码之家  ›  专栏  ›  技术社区  ›  Matt Wonlaw

无法使缓存线程池具有大小限制?

  •  113
  • Matt Wonlaw  · 技术社区  · 15 年前

    似乎不可能使缓存线程池的线程数限制为它可以创建的线程数。

    下面是如何在标准Java库中实现静态RealCuth.NexCaseDeTeCuthCub:

     public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    

    因此,使用该模板继续创建固定大小的缓存线程池:

    new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());
    

    现在,如果你用这个提交3个任务,一切都会好起来的。提交任何进一步的任务将导致被拒绝的执行异常。

    尝试这个:

    new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());
    

    将导致所有线程按顺序执行。也就是说,线程池永远不会生成多个线程来处理您的任务。

    这是线程池执行器的执行方法中的错误?或者这是故意的?还是有别的办法?

    编辑:我想要与缓存线程池完全相同的东西(它会根据需要创建线程,然后在超时后终止线程),但对它可以创建的线程数有限制,并且在达到线程限制后可以继续对其他任务进行排队。根据Sjlee的回应,这是不可能的。查看threadpoolExecutor的execute()方法确实是不可能的。我需要子类化threadpoolexecutor并重写execute(),就像swingworker一样,但swingworker在其execute()中所做的是一个完整的黑客。

    11 回复  |  直到 15 年前
        1
  •  214
  •   sjlee    6 年前

    线程池执行器有以下几个关键行为,您的问题可以通过这些行为来解释。

    任务提交后,

    1. 如果线程池未达到核心大小,则会创建新的线程。
    2. 如果已达到核心大小,并且没有空闲线程,则它将任务排队。
    3. 如果已达到核心大小,则没有空闲线程,并且队列已满,则会创建新线程(直到达到最大大小)。
    4. 如果已达到最大大小,则不存在空闲线程,并且队列已满,则将启动拒绝策略。

    在第一个示例中,请注意,SynchronousQueue的大小基本上为0。因此,当您达到最大尺寸(3)时,拒绝策略开始(4)。

    在第二个示例中,选择的队列是大小不受限制的LinkedBlockingQueue。因此,你会被行为所困扰。

    您不能对缓存类型或固定类型进行太多的修补,因为它们的行为几乎是完全确定的。

    如果您想要有一个有界的动态线程池,您需要使用一个正的核心大小和最大大小以及一个有限大小的队列。例如,

    new ThreadPoolExecutor(10, // core size
        50, // max size
        10*60, // idle timeout
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(20)); // queue with a size
    

    补遗 :这是一个相当古老的答案,当核心大小为0时,JDK似乎改变了它的行为。由于JDK1.6,如果核心大小为0,并且池中没有任何线程,那么threadpoolExecutor将添加一个线程来执行该任务。因此,核心大小0是上述规则的一个例外。谢谢 Steve 对于 bringing 这引起了我的注意。

        2
  •  57
  •   user1046052    13 年前

    除非我遗漏了什么,否则原始问题的解决方法很简单。以下代码实现了原始海报所描述的所需行为。它将生成最多5个线程来处理一个无边界队列,空闲线程将在60秒后终止。

    tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>());
    tp.allowCoreThreadTimeOut(true);
    
        3
  •  7
  •   s.d    11 年前

    有同样的问题。 由于没有其他答案能将所有问题结合在一起,因此我将添加我的:

    现在写得很清楚了 docs :如果使用不阻塞的队列( LinkedBlockingQueue )max threads设置无效,只使用核心线程。

    所以:

    public class MyExecutor extends ThreadPoolExecutor {
    
        public MyExecutor() {
            super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            allowCoreThreadTimeOut(true);
        }
    
        public void setThreads(int n){
            setMaximumPoolSize(Math.max(1, n));
            setCorePoolSize(Math.max(1, n));
        }
    
    }
    

    执行人有:

    1. 当我们使用无边界队列时,没有max线程的概念。这是一件好事,因为如果执行器遵循其通常的策略,这样的队列可能会导致执行器创建大量的非核心、额外的线程。

    2. 最大大小的队列 Integer.MAX_VALUE . Submit() 将投掷 RejectedExecutionException 如果挂起的任务数超过 integer.max_值 . 不确定我们会先耗尽内存,否则会发生这种情况。

    3. 有4个核心线程。如果空闲5秒,空闲核心线程将自动退出。因此,是的,严格按需线程。可以使用 setThreads() 方法。

    4. 确保最小核心线程数不少于一个,否则 submit() 将拒绝所有任务。由于核心线程需要为>=max线程,因此该方法 StTeRead() 设置max线程,尽管max线程设置对于无边界队列无效。

        4
  •  6
  •   brianegge    15 年前

    在第一个示例中,后续任务将被拒绝,因为 AbortPolicy 是默认的 RejectedExecutionHandler . threadpoolExecutor包含以下策略,您可以通过 setRejectedExecutionHandler 方法:

    CallerRunsPolicy
    AbortPolicy
    DiscardPolicy
    DiscardOldestPolicy
    

    听起来您希望缓存线程池带有CallerRunsPolicy。

        5
  •  5
  •   Pops Atula    13 年前

    这里的答案都没有解决我的问题,这与使用Apache的HTTP客户端(3.x版本)创建有限数量的HTTP连接有关。由于我花了几个小时才找到一个好的设置,我将分享:

    private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
      Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
    

    这创造了一个 ThreadPoolExecutor 它从5开始,最多可同时运行10个线程,使用 CallerRunsPolicy 用于执行。

        6
  •  3
  •   Jonathan Feinberg    15 年前

    根据JavaDoc for ThreadPool Executor:

    如果运行的线程多于corepoolsize但少于maximumpoolsize,则将创建一个新线程。 仅当队列已满时 . 通过将corepoolsize和maximumpoolsize设置为相同,可以创建固定大小的线程池。

    (强调我的)

    抖动的答案是你想要的,尽管我的答案是你的另一个问题。:)

        7
  •  2
  •   Ashkrit Sharma    13 年前

    还有一个选择。您也可以使用任何其他队列,而不是使用新的SynchronousQueue,但必须确保其大小为1,这样将强制ExecutorService创建新线程。

        8
  •  2
  •   Stuart    13 年前

    看起来似乎没有任何答案能够真正回答这个问题——事实上,我看不到这样做的方法——即使您从PooledExecutorService子类,因为许多方法/属性都是私有的,例如,使AddingFunderMaximumPoolSize受到保护,您也可以执行以下操作:

    class MyThreadPoolService extends ThreadPoolService {
        public void execute(Runnable run) {
            if (poolSize() == 0) {
                if (addIfUnderMaximumPoolSize(run) != null)
                    return;
            }
            super.execute(run);
        }
    }
    

    我得到的最接近的是这个-但即使这不是一个很好的解决方案

    new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
        public void execute(Runnable command) {
            if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
                super.setCorePoolSize(super.getCorePoolSize() + 1);
            }
            super.execute(command);
        }
    
        protected void afterExecute(Runnable r, Throwable t) {
             // nothing in the queue
             if (getQueue().isEmpty() && getPoolSize() > min) {
                 setCorePoolSize(getCorePoolSize() - 1);
             }
        };
     };
    

    P.S.未进行上述测试

        9
  •  2
  •   Community CDub    8 年前

    这就是你想要的(至少我想是这样)。用于解释检查 Jonathan Feinberg answer

    Executors.newFixedThreadPool(int n)

    创建一个线程池,该线程池重用在共享的无边界队列上运行的固定数量的线程。在任何时候,最多第n个线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,则它们将在队列中等待,直到有一个线程可用。如果任何线程在关闭之前由于执行过程中的失败而终止,则在需要执行后续任务时,将替换一个新的线程。池中的线程将存在,直到显式关闭为止。

        10
  •  1
  •   Stuart    13 年前

    这是另一个解决方案。我认为这个解决方案的行为符合您的要求(尽管并不为这个解决方案感到骄傲):

    final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
        public boolean offer(Runnable o) {
            if (size() > 1)
                return false;
            return super.offer(o);
        };
    
        public boolean add(Runnable o) {
            if (super.offer(o))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    };
    
    RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            queue.add(r);
        }
    };
    
    dbThreadExecutor =
            new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
    
        11
  •  0
  •   Community CDub    8 年前
    1. 你可以使用 ThreadPoolExecutor 按照建议 @西丽

      您可以动态控制池的大小。请查看此问题以了解更多详细信息:

      Dynamic Thread Pool

    2. 你可以使用 newWorkStealingPool API,它是用Java 8引入的。

      public static ExecutorService newWorkStealingPool()
      

      使用所有可用处理器作为目标并行度级别创建窃取线程池的工作。

    默认情况下,并行度级别设置为服务器中CPU核心的数量。如果您有4个核心CPU服务器,线程池大小将为4。此API返回 ForkJoinPool 类型 ExecutorService 并允许通过从ForkJoinPool中的忙线程窃取任务来窃取空闲线程。