代码之家  ›  专栏  ›  技术社区  ›  Hearen

如何在不丢失通知的情况下正确使用条件来等待和通知所有人?

  •  0
  • Hearen  · 技术社区  · 7 年前

    我有一组任务,它们分为两部分 enque deque 以确保 有资格的 (不止一次)先跑然后 然后 其他按顺序限定的(类似于优先级队列)。

    恩克 ,将检查任务,并且仅 qualified 任务将被执行,而另一个 unqualified 此路不通。

    德克 ,已完成的任务将从 queue 然后 notifyAll 阻塞的(实际上所有其他线程都要选择 有资格的 )中。

    下面是一个简单的演示,演示我要实现的目标:

    class MyTaskQueue {
        private static final Object THE_QUEUE_LOCK = new Object();
        public static Map<String, ReentrantLock> taskGroupLock = new HashMap<>();
        public static Map<String, Condition> taskGroupCondition = new HashMap<>();
    
        public static void enque(String name, String taskId) {
            synchronized (THE_QUEUE_LOCK) {
                taskGroupLock.putIfAbsent(name, new ReentrantLock());
                taskGroupCondition.putIfAbsent(name, taskGroupLock.get(name).newCondition());
            }
            synchronized (taskGroupLock.get(name)) {
                while (true) {
                    if (isValid(taskId)) {
                        break; // Go!!;
                    } else {
                        try {
                            taskGroupCondition.get(name).wait(); // blocked if it's not allowed;
                        } catch (InterruptedException ignored) {
                            ignored.printStackTrace();
                        }
                    }
                }
            }
        }
    
        public static void deque(String name, String taskId) {
            if (taskGroup.containsKey(name) && taskGroup.get(name).contains(taskId)) {
                synchronized (THE_QUEUE_LOCK) {
                    taskGroup.get(name).remove(taskId);
                    if (taskGroup.get(name).isEmpty()) {
                        taskGroup.remove(name);
                    }
                    synchronized (taskGroupLock.get(name)) {
                        taskGroupCondition.get(name).notifyAll();
                    }
                }
            }
        }
    
    }
    

    目前,只有第一个任务将被执行,尽管我检查了所有其他任务(至少大多数任务)都被正确阻止。

    但当我检查 taskGroupCondition.get(name) ,和 firstWaiter lastWaiter 两者都是 null 是的。

    我错过了什么?

    任何帮助都将不胜感激。

    1 回复  |  直到 7 年前
        1
  •  1
  •   gthanop    7 年前

    如果我理解正确,你会问:

    1. 从您的 MyTaskQueue 开始运行的权限(通过 enque 方法)。
    2. 恩克 方法 我的任务队列 块,直到请求运行的任务获得限定为止。
    3. 每个限定线程声明为 我的任务队列 它已经结束运行,通过调用 deque 方法。
    4. 德克 方法通知所有其他任务,以便检查哪些任务是合格的,而哪些任务则开始运行。

    然后我可以看到以下解决方案:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Random;
    import java.util.Set;
    
    public class MyTaskQueue {
    
        private final Map<String, Set<String>> runningTasks;
        private String qualifiedTaskId;
    
        public MyTaskQueue(String initialQualifiedTaskId) {
            runningTasks = new HashMap<>();
            qualifiedTaskId = initialQualifiedTaskId;
        }
    
        private synchronized boolean isValid(String taskId) {
            return qualifiedTaskId != null && taskId != null && taskId.equals(qualifiedTaskId); //Do your qualification tests here...
        }
    
        public synchronized void setQualifiedTaskId(String qualifiedTaskId) {
            this.qualifiedTaskId = qualifiedTaskId;
            notifyAll(); //Now that the qualification test changed, is time to notify every blocked task.
            //This way, all new qualified tasks will also be started. This "notifyAll()" operation is optional.
        }
    
        public synchronized void enque(String task, String taskId) {
            while (!isValid(taskId)) { //Reentrant lock.
                System.out.println("Blocking unqualified task {\"" + task + "\", \"" + taskId + "\"}...");
                try { wait(); } catch (InterruptedException ie) { /*Handle the exception...*/ }
            }
            runningTasks.putIfAbsent(task, new HashSet<>());
            runningTasks.get(task).add(taskId);
            System.out.println("Starting qualified task {\"" + task + "\", \"" + taskId + "\"}...");
        }
    
        //Optional method. Might be needed for example if a Thread
        //wants to check if another task is currently running...
        public synchronized boolean isRunning(String task, String taskId) {
            return runningTasks.containsKey(task) && runningTasks.get(task).contains(taskId);
        }
    
        public synchronized void deque(String task, String taskId) {
            if (isRunning(task, taskId)) { //Reentrant lock.
    
                //Cleanup:
                runningTasks.get(task).remove(taskId);
                if (runningTasks.get(task).isEmpty())
                    runningTasks.remove(task);
    
                //Notify all blocked tasks:
                notifyAll();
            }
        }
    
        public static void main(final String[] args) {
            MyTaskQueue q = new MyTaskQueue("qualified");
            Random rand = new Random();
            new MyThread(q, "Task1", "qualified222", 2500 + rand.nextInt(500)).start();
            new MyThread(q, "Task2", "qualified222", 2500 + rand.nextInt(500)).start();
            new MyThread(q, "Task3", "qualified", 2500 + rand.nextInt(500)).start();
            new MyThread(q, "Task4", "qualified", 2500 + rand.nextInt(500)).start();
            new MyThread(q, "Task5", "foreverBlocked", 2500 + rand.nextInt(500)).start();
            try { Thread.sleep(3000); } catch (InterruptedException ie) { /*Handle the exception...*/ }
            synchronized (q) {
                System.out.println("Qualifying tasks of id \"qualified222\"...");
                q.setQualifiedTaskId("qualified222"); //Reentrant lock.
            }
            //Execution of main method never ends, because of the forever blocked task "Task5".
            //The "Task5" still runs while waiting for permission... See MyThread for details...
        }
    }
    

    然后 MyThread 以下内容:

    public class MyThread extends Thread {
        private final String task, taskId;
        private final int actionTime; //Dummy uptime to simulate.
        private final MyTaskQueue q;
    
        public MyThread(MyTaskQueue q, String task, String taskId, int actionTime) {
            this.q = q;
            this.task = task;
            this.taskId = taskId;
            this.actionTime = actionTime;
        }
    
        @Override
        public void run() {
            q.enque(task, taskId); //Wait for permission to run...
            System.out.println("Task {\"" + task + "\", \"" + taskId + "\"} is currently running...");
    
            //Now lets actually execute the task of the Thread:
            try { Thread.sleep(actionTime); } catch (InterruptedException ie) { /*Handle the exception.*/ }
    
            q.deque(task, taskId); //Declare Thread ended.
        }
    }
    

    神话 是执行所需实际操作的类。

    为了简单起见,我假设一个任务的id等于一个变量(即 qualifiedTaskId )中。

    还有一个 main 方法来测试代码。

    遵循示例输出(我对行进行了编号):

    1. 阻止非限定任务{“task1”,“qualified222”}
    2. 阻止非限定任务{“task5”,“blocked”}
    3. 启动限定任务{“task4”,“qualified”}
    4. 任务{“task4”,“qualified”}当前正在运行…
    5. 正在启动限定任务{“task3”,“qualified”}
    6. 任务{“task3”,“qualified”}当前正在运行…
    7. 阻塞非限定任务{“task2”,“qualified222”}
    8. 阻止非限定任务{“task2”,“qualified222”}
    9. 阻塞非限定任务{“task5”,“blocked”}
    10. 阻止非限定任务{“task1”,“qualified222”}
    11. 阻止非限定任务{“task1”,“qualified222”}
    12. 阻止非限定任务{“task5”,“blocked”}
    13. 阻止非限定任务{“task2”,“qualified222”}
    14. ID为“qualified222”的限定任务…
    15. 正在启动限定任务{“task2”,“qualified222”}
    16. 任务{“task2”,“qualified222”}当前正在运行…
    17. 阻塞非限定任务{“task5”,“blocked”}
    18. 正在启动限定任务{“task1”,“qualified222”}
    19. 任务{“task1”,“qualified222”}当前正在运行…
    20. 阻塞非限定任务{“task5”,“blocked”}
    21. 阻塞非限定任务{“task5”,“blocked”}

    如您所见,第1行到第7行是每个线程的初始消息。
    然后,调用第8行到第10行,因为一个合格的任务结束了(所以它们被重新阻塞)。
    然后,调用第11行到第13行,因为另一个合格的任务结束了(所以它们被重新阻塞)。
    然后,在第14行到第19行中,鉴定测试发生变化,新的鉴定任务开始运行。还有一项任务( Task5 )还不合格。
    最后,由于任务id等于 "qualified222" 结束。