该实现解决了并发控制问题:允许同时运行多个任务(数量不超过 concurrency 设置的上限),支持动态调整仍在排队的任务的优先级,并提供取消排队中任务的方法。每次取出新任务之前都会先对队列排序,以确保任务按正确的优先级顺序执行(priority 数值越小越先执行)。
/**
 * Priority queue for asynchronous tasks with a cap on how many run at once.
 *
 * A task is a zero-argument function, normally returning a promise. Tasks with
 * a lower `priority` number are started first. Only tasks that are still
 * waiting in the queue can be re-prioritized or cancelled; once a task has
 * started it runs to completion.
 */
class FixedAsyncPriorityQueue {
  /**
   * @param {number} [concurrency=1] - Maximum number of tasks allowed to run
   *   at the same time.
   */
  constructor(concurrency = 1) {
    this.queue = [];
    this.concurrency = concurrency;
    this.currentTasks = new Set();
    this.taskCounter = 0;
  }

  /**
   * Add a task to the queue and start it as soon as a slot is free.
   *
   * @param {Function} task - Zero-argument function; may return a promise.
   * @param {number} [priority=0] - Lower numbers run first. Defaults to 0 so
   *   that an omitted priority never feeds `undefined` into the comparator.
   * @returns {number} Identifier usable with `adjustPriority` / `cancelTask`.
   */
  add(task, priority = 0) {
    const taskId = this.taskCounter++;
    this.queue.push({ task, taskId, priority });
    this.processQueue();
    return taskId;
  }

  /** Re-sort the waiting tasks in ascending priority order. */
  sortQueue() {
    this.queue.sort((a, b) => a.priority - b.priority);
  }

  /**
   * Start waiting tasks, lowest priority number first, until the queue is
   * empty or the concurrency limit is reached.
   *
   * Kept `async` so existing callers that treat the return value as a promise
   * keep working; the returned promise resolves once tasks have been started,
   * not once they have finished.
   *
   * @returns {Promise<void>}
   */
  async processQueue() {
    while (this.queue.length > 0 && this.currentTasks.size < this.concurrency) {
      this.sortQueue();
      const { task, taskId } = this.queue.shift();
      this.currentTasks.add(taskId);

      // Free the slot and pull the next task, whatever the outcome was.
      const release = () => {
        this.currentTasks.delete(taskId);
        this.processQueue();
      };

      // Calling task() inside a Promise executor still starts it synchronously,
      // but converts a synchronous throw or a non-promise return value into a
      // settled promise. Without this, such a task would keep its slot forever.
      new Promise((resolve) => resolve(task())).then(release, (error) => {
        console.error(`Task ${taskId} failed:`, error);
        release();
      });
    }
  }

  /**
   * Change the priority of a task that is still waiting in the queue.
   *
   * @param {number} taskId - Identifier returned by `add`.
   * @param {number} newPriority - New priority; lower numbers run first.
   * @returns {boolean} `true` if the task was found waiting and updated,
   *   `false` if it has already started, finished, been cancelled, or never
   *   existed.
   */
  adjustPriority(taskId, newPriority) {
    const entry = this.queue.find((item) => item.taskId === taskId);
    if (entry === undefined) {
      return false;
    }
    entry.priority = newPriority;
    this.sortQueue();
    return true;
  }

  /**
   * Remove a task that is still waiting in the queue. Running tasks are not
   * affected.
   *
   * @param {number} taskId - Identifier returned by `add`.
   * @returns {boolean} `true` if a waiting task was removed, `false` otherwise.
   */
  cancelTask(taskId) {
    const taskIndex = this.queue.findIndex((item) => item.taskId === taskId);
    if (taskIndex === -1) {
      return false;
    }
    this.queue.splice(taskIndex, 1);
    return true;
  }
}
// Demo: a queue that runs at most two tasks at a time.
const myQueue = new FixedAsyncPriorityQueue(2);

// Two sample tasks; each resolves with its label after one second.
const task1 = () =>
  new Promise((resolve) => {
    setTimeout(() => resolve('Task 1'), 1000);
  });
const task2 = () =>
  new Promise((resolve) => {
    setTimeout(() => resolve('Task 2'), 1000);
  });

// Enqueue both tasks; the returned ids can be used to re-prioritize or cancel.
const taskId1 = myQueue.add(task1, 2);
const taskId2 = myQueue.add(task2, 1);