代码之家  ›  专栏  ›  技术社区  ›  K K

修复动态、自调整的优先级队列

  •  1
  • K K  · 技术社区  · 1 年前

    我试图在JavaScript中实现一个动态的、自调整的优先级队列,该队列与异步任务一起操作。目标是创建一个优先级队列,在该队列中可以随时添加具有给定优先级的任务。然而,复杂性源于任务的优先级可以根据外部异步事件动态更改的要求。

    以下是具体要求:

    动态任务添加: 队列应允许在任何时候添加具有指定优先级的任务(功能对象)。

    异步任务执行: 应使用async/await异步执行任务。

    优先级调整: 应该有一种方法可以根据外部异步触发器(如从API获取数据)调整队列中已存在任务的优先级。队列应根据新的优先级自动重新排序任务。

    并发控制: 系统最多应同时处理N个任务,其中N是可配置的参数。

    事件处理: 系统应该在每个任务完成后发出一个事件,并在不停止队列处理的情况下优雅地处理错误。

    任务取消: 提供一种取消排队任务的机制。

    我已经做了一些初步研究,并了解到这可能涉及异步/等待、承诺的组合,也许还有某种形式的事件处理或反应式编程,但我正在努力找出如何以可扩展和高效的方式构建这个系统。

    有人能为这样一个系统提供指导或参考实施吗?任何对相关模式或库的帮助或指示都将不胜感激。

    class AsyncPriorityQueue {
        constructor() {
            this.queue = [];
            this.processing = false;
        }
    
        // Add task to the queue with a given priority
        add(task, priority) {
            this.queue.push({ task, priority });
            this.queue.sort((a, b) => a.priority - b.priority);
            this.processQueue();
        }
    
        // Process the queue
        async processQueue() {
            if (this.processing || this.queue.length === 0) {
                return;
            }
            this.processing = true;
    
            const { task } = this.queue.shift();
            try {
                await task();
            } catch (error) {
                console.error("Task failed:", error);
            }
    
            this.processing = false;
            this.processQueue();
        }
    
        
        adjustPriority(taskId, newPriority) {}
    }
    
    // Usage Example
    const myQueue = new AsyncPriorityQueue();
    
    // Example tasks
    const task1 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 1'));
    const task2 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 2'));
    
    myQueue.add(task1, 2);
    myQueue.add(task2, 1);
    
    
    1 回复  |  直到 1 年前
        1
  •  0
  •   Mohammad Basit    1 年前

    该实现通过允许并行处理多个任务(达到并发设置的限制)来解决并发控制问题,动态调整任务优先级,并提供取消任务的方法。队列总是在新任务出列之前进行排序,以确保按正确的优先级顺序处理任务。

    class FixedAsyncPriorityQueue {
        constructor(concurrency = 1) {
            this.queue = [];
            this.concurrency = concurrency;
            this.currentTasks = new Set();
            this.taskCounter = 0;
        }
    
        // Add task to the queue with a given priority
        add(task, priority) {
            const taskId = this.taskCounter++;
            this.queue.push({ task, taskId, priority });
            this.processQueue();
            return taskId;
        }
    
        // Re-sort the queue when priorities change
        sortQueue() {
            this.queue.sort((a, b) => a.priority - b.priority);
        }
    
        // Process the queue with concurrency control
        async processQueue() {
            while (this.queue.length > 0 && this.currentTasks.size < this.concurrency) {
                this.sortQueue();
                const { task, taskId } = this.queue.shift();
                this.currentTasks.add(taskId);
    
                task().then(() => {
                    this.currentTasks.delete(taskId);
                    this.processQueue();
                }).catch(error => {
                    console.error(`Task ${taskId} failed:`, error);
                    this.currentTasks.delete(taskId);
                    this.processQueue();
                });
            }
        }
    
        // Adjust the priority of a specific task
        adjustPriority(taskId, newPriority) {
            const taskIndex = this.queue.findIndex(task => task.taskId === taskId);
            if (taskIndex > -1) {
                this.queue[taskIndex].priority = newPriority;
                this.sortQueue();
            }
        }
    
        // Cancel a task
        cancelTask(taskId) {
            const taskIndex = this.queue.findIndex(task => task.taskId === taskId);
            if (taskIndex > -1) {
                this.queue.splice(taskIndex, 1);
            }
        }
    }
    
    // Usage example
    const myQueue = new FixedAsyncPriorityQueue(2); // Set concurrency to 2
    
    // Example tasks
    const task1 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 1'));
    const task2 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 2'));
    
    const taskId1 = myQueue.add(task1, 2);
    const taskId2 = myQueue.add(task2, 1);