代码之家  ›  专栏  ›  技术社区  ›  Max

这个程序中并发的错误是什么?

  •  0
  • Max  · 技术社区  · 6 年前

    我有一个程序有奇怪的并发错误。

    此程序的作用:

    1. EVENT_LOOP_PAUSE_DURATION_IN_MS .
    2. TaskProcessor
    3. 每个 500 ms 打印执行器的队列大小。

    taskId . 因此,当我在队列中添加任务时,我检查任务是否已经存在。如果没有任务,我就加上它。在任务处理结束时,我从 activeTasks 地图。

    ERROR: 50
    ERROR: 70
    ERROR: 80
    ERROR: 90
    ERROR: 110
    ERROR: 120
    ERROR: 120
    ERROR: 140
    

    所以,有个虫子。我不知道为什么,但线程池队列的大小正在无限增加。

    可以看到,我在程序的2点中删除活动任务:

    1. finally 任务处理器
    2. 我在事件循环中移除过时的任务。

    所以,如果我删除了代码,它删除了第(2)点的任务,那么bug就消失了。我不理解这种行为。

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Program {
    
        private static final int NUMBER_OF_TASKS = 40;
        private static final int NUMBER_OF_THREADS = 10;
        private static final long EVENT_LOOP_PAUSE_DURATION_IN_MS = 40L;
    
        class QueueSizePrinter extends Thread {
    
            private final LinkedBlockingQueue<Runnable> workQueue;
    
            public QueueSizePrinter(LinkedBlockingQueue<Runnable> workQueue) {
                this.workQueue = workQueue;
            }
    
            @Override
            public void run() {
                while (true) {
                    int qSize = workQueue.size();
                    if (qSize > NUMBER_OF_TASKS) {
                        System.out.println("ERROR: " + qSize);
                    }
    
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        class TaskProcessor implements Runnable {
            private final String currentTaskId;
            private final ConcurrentHashMap<String, Long> activeTasks;
    
            public TaskProcessor(String currentTaskId, ConcurrentHashMap<String, Long> activeTasks) {
                this.currentTaskId = currentTaskId;
                this.activeTasks = activeTasks;
            }
    
            @Override
            public void run() {
                try {
                    // emulate of useful work
                    Thread.sleep(300L);
                } catch (Exception e) {
                    System.out.println("error: " + e.toString());
                } finally {
                    activeTasks.remove(currentTaskId); // (1)
                }
            }
        }
    
        public void program() {
    
            LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
            ExecutorService executor = new ThreadPoolExecutor(NUMBER_OF_THREADS, NUMBER_OF_THREADS, 0L, TimeUnit.MILLISECONDS, workQueue);
    
            Set<String> initialTasks = ConcurrentHashMap.newKeySet();
            for (int currentTaskIndex = 0; currentTaskIndex < NUMBER_OF_TASKS; currentTaskIndex++) {
                initialTasks.add(String.valueOf(currentTaskIndex));
            }
    
            new QueueSizePrinter(workQueue).start();
    
            ConcurrentHashMap<String, Long> activeTasks = new ConcurrentHashMap<>();
    
            while (true) {
    
                initialTasks.forEach((currentTaskId) -> {
                    if (!activeTasks.containsKey(currentTaskId)) {
                        activeTasks.put(currentTaskId, System.currentTimeMillis());
    
                        executor.submit(new TaskProcessor(currentTaskId, activeTasks));
                    }
                });
    
                // (2)
                activeTasks.entrySet().removeIf(entry -> {
                    boolean hasDelete = System.currentTimeMillis() - entry.getValue() > 1000;
                    if (hasDelete) {
                        //System.out.println("DELETE id=" + entry.getKey());
                    }
                    return hasDelete;
                });
    
                try {
                    Thread.sleep(EVENT_LOOP_PAUSE_DURATION_IN_MS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            Program main = new Program();
            main.program();
        }
    }
    
    0 回复  |  直到 6 年前
        1
  •  0
  •   miskender    6 年前

    问题在第(2)点,您正在从activeTasks映射中删除过时的任务。但他们仍然被提交给遗嘱执行服务。由于已将其从映射中移除,因此当while循环执行另一个周期时,相同的任务将重新提交给ExecutorService。这导致任务数增加。