我有一个程序有奇怪的并发错误。
此程序的作用:
-
EVENT_LOOP_PAUSE_DURATION_IN_MS
.
-
TaskProcessor
-
每个
500 ms
打印执行器的队列大小。
taskId
. 因此,当我在队列中添加任务时,我检查任务是否已经存在。如果没有任务,我就加上它。在任务处理结束时,我从
activeTasks
地图。
ERROR: 50
ERROR: 70
ERROR: 80
ERROR: 90
ERROR: 110
ERROR: 120
ERROR: 120
ERROR: 140
所以,有个虫子。我不知道为什么,但线程池队列的大小正在无限增加。
可以看到,我在程序的2点中删除活动任务:
-
在
finally
任务处理器
-
我在事件循环中移除过时的任务。
所以,如果我删除了代码,它删除了第(2)点的任务,那么bug就消失了。我不理解这种行为。
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Program {
private static final int NUMBER_OF_TASKS = 40;
private static final int NUMBER_OF_THREADS = 10;
private static final long EVENT_LOOP_PAUSE_DURATION_IN_MS = 40L;
class QueueSizePrinter extends Thread {
private final LinkedBlockingQueue<Runnable> workQueue;
public QueueSizePrinter(LinkedBlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
}
@Override
public void run() {
while (true) {
int qSize = workQueue.size();
if (qSize > NUMBER_OF_TASKS) {
System.out.println("ERROR: " + qSize);
}
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class TaskProcessor implements Runnable {
private final String currentTaskId;
private final ConcurrentHashMap<String, Long> activeTasks;
public TaskProcessor(String currentTaskId, ConcurrentHashMap<String, Long> activeTasks) {
this.currentTaskId = currentTaskId;
this.activeTasks = activeTasks;
}
@Override
public void run() {
try {
// emulate of useful work
Thread.sleep(300L);
} catch (Exception e) {
System.out.println("error: " + e.toString());
} finally {
activeTasks.remove(currentTaskId); // (1)
}
}
}
public void program() {
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
ExecutorService executor = new ThreadPoolExecutor(NUMBER_OF_THREADS, NUMBER_OF_THREADS, 0L, TimeUnit.MILLISECONDS, workQueue);
Set<String> initialTasks = ConcurrentHashMap.newKeySet();
for (int currentTaskIndex = 0; currentTaskIndex < NUMBER_OF_TASKS; currentTaskIndex++) {
initialTasks.add(String.valueOf(currentTaskIndex));
}
new QueueSizePrinter(workQueue).start();
ConcurrentHashMap<String, Long> activeTasks = new ConcurrentHashMap<>();
while (true) {
initialTasks.forEach((currentTaskId) -> {
if (!activeTasks.containsKey(currentTaskId)) {
activeTasks.put(currentTaskId, System.currentTimeMillis());
executor.submit(new TaskProcessor(currentTaskId, activeTasks));
}
});
// (2)
activeTasks.entrySet().removeIf(entry -> {
boolean hasDelete = System.currentTimeMillis() - entry.getValue() > 1000;
if (hasDelete) {
//System.out.println("DELETE id=" + entry.getKey());
}
return hasDelete;
});
try {
Thread.sleep(EVENT_LOOP_PAUSE_DURATION_IN_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Program main = new Program();
main.program();
}
}