如果我理解正确,你会问:
-
从您的
MyTaskQueue
开始运行的权限(通过
enque
方法)。
-
恩克
方法
我的任务队列
块,直到请求运行的任务获得限定为止。
-
每个限定线程声明为
我的任务队列
它已经结束运行,通过调用
deque
方法。
-
德克
方法通知所有其他任务,以便检查哪些任务是合格的,而哪些任务则开始运行。
然后我可以看到以下解决方案:
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
public class MyTaskQueue {
private final Map<String, Set<String>> runningTasks;
private String qualifiedTaskId;
public MyTaskQueue(String initialQualifiedTaskId) {
runningTasks = new HashMap<>();
qualifiedTaskId = initialQualifiedTaskId;
}
private synchronized boolean isValid(String taskId) {
return qualifiedTaskId != null && taskId != null && taskId.equals(qualifiedTaskId); //Do your qualification tests here...
}
public synchronized void setQualifiedTaskId(String qualifiedTaskId) {
this.qualifiedTaskId = qualifiedTaskId;
notifyAll(); //Now that the qualification test changed, is time to notify every blocked task.
//This way, all new qualified tasks will also be started. This "notifyAll()" operation is optional.
}
public synchronized void enque(String task, String taskId) {
while (!isValid(taskId)) { //Reentrant lock.
System.out.println("Blocking unqualified task {\"" + task + "\", \"" + taskId + "\"}...");
try { wait(); } catch (InterruptedException ie) { /*Handle the exception...*/ }
}
runningTasks.putIfAbsent(task, new HashSet<>());
runningTasks.get(task).add(taskId);
System.out.println("Starting qualified task {\"" + task + "\", \"" + taskId + "\"}...");
}
//Optional method. Might be needed for example if a Thread
//wants to check if another task is currently running...
public synchronized boolean isRunning(String task, String taskId) {
return runningTasks.containsKey(task) && runningTasks.get(task).contains(taskId);
}
public synchronized void deque(String task, String taskId) {
if (isRunning(task, taskId)) { //Reentrant lock.
//Cleanup:
runningTasks.get(task).remove(taskId);
if (runningTasks.get(task).isEmpty())
runningTasks.remove(task);
//Notify all blocked tasks:
notifyAll();
}
}
public static void main(final String[] args) {
MyTaskQueue q = new MyTaskQueue("qualified");
Random rand = new Random();
new MyThread(q, "Task1", "qualified222", 2500 + rand.nextInt(500)).start();
new MyThread(q, "Task2", "qualified222", 2500 + rand.nextInt(500)).start();
new MyThread(q, "Task3", "qualified", 2500 + rand.nextInt(500)).start();
new MyThread(q, "Task4", "qualified", 2500 + rand.nextInt(500)).start();
new MyThread(q, "Task5", "foreverBlocked", 2500 + rand.nextInt(500)).start();
try { Thread.sleep(3000); } catch (InterruptedException ie) { /*Handle the exception...*/ }
synchronized (q) {
System.out.println("Qualifying tasks of id \"qualified222\"...");
q.setQualifiedTaskId("qualified222"); //Reentrant lock.
}
//Execution of main method never ends, because of the forever blocked task "Task5".
//The "Task5" still runs while waiting for permission... See MyThread for details...
}
}
然后
MyThread
以下内容:
public class MyThread extends Thread {
private final String task, taskId;
private final int actionTime; //Dummy uptime to simulate.
private final MyTaskQueue q;
public MyThread(MyTaskQueue q, String task, String taskId, int actionTime) {
this.q = q;
this.task = task;
this.taskId = taskId;
this.actionTime = actionTime;
}
@Override
public void run() {
q.enque(task, taskId); //Wait for permission to run...
System.out.println("Task {\"" + task + "\", \"" + taskId + "\"} is currently running...");
//Now lets actually execute the task of the Thread:
try { Thread.sleep(actionTime); } catch (InterruptedException ie) { /*Handle the exception.*/ }
q.deque(task, taskId); //Declare Thread ended.
}
}
神话
是执行所需实际操作的类。
为了简单起见,我假设一个任务的id等于一个变量(即
qualifiedTaskId
)中。
还有一个
main
方法来测试代码。
遵循示例输出(我对行进行了编号):
-
阻止非限定任务{“task1”,“qualified222”}
-
阻止非限定任务{“task5”,“blocked”}
-
启动限定任务{“task4”,“qualified”}
-
任务{“task4”,“qualified”}当前正在运行…
-
正在启动限定任务{“task3”,“qualified”}
-
任务{“task3”,“qualified”}当前正在运行…
-
阻塞非限定任务{“task2”,“qualified222”}
-
阻止非限定任务{“task2”,“qualified222”}
-
阻塞非限定任务{“task5”,“blocked”}
-
阻止非限定任务{“task1”,“qualified222”}
-
阻止非限定任务{“task1”,“qualified222”}
-
阻止非限定任务{“task5”,“blocked”}
-
阻止非限定任务{“task2”,“qualified222”}
-
ID为“qualified222”的限定任务…
-
正在启动限定任务{“task2”,“qualified222”}
-
任务{“task2”,“qualified222”}当前正在运行…
-
阻塞非限定任务{“task5”,“blocked”}
-
正在启动限定任务{“task1”,“qualified222”}
-
任务{“task1”,“qualified222”}当前正在运行…
-
阻塞非限定任务{“task5”,“blocked”}
-
阻塞非限定任务{“task5”,“blocked”}
如您所见,第1行到第7行是每个线程的初始消息。
然后,调用第8行到第10行,因为一个合格的任务结束了(所以它们被重新阻塞)。
然后,调用第11行到第13行,因为另一个合格的任务结束了(所以它们被重新阻塞)。
然后,在第14行到第19行中,鉴定测试发生变化,新的鉴定任务开始运行。还有一项任务(
Task5
)还不合格。
最后,由于任务id等于
"qualified222"
结束。