代码之家  ›  专栏  ›  技术社区  ›  Shawn

在ConcurrentQueue前面出现消息之前保持请求不起作用

  •  0
  • Shawn  · 技术社区  · 6 年前

    我遇到了一个问题,看起来我的单例并发队列没有按正确的顺序处理项目。我知道这是FIFO,所以我想也许内存中的队列不一样,或者我的dequeue出了问题?我测试这一点的方法是快速向我的API端点触发3个Postman请求。如果有人能帮助我理解为什么它们不互相追逐,我会非常感激的!

    到目前为止,我正倾向于排队。由于第二个和第三个请求似乎在第一个请求出列之前排队,所以Trypeek工作不正常。

    因此,当我运行下面的代码时,我在控制台中看到了下面的输出。

    Queued message: Test 1
    Sending message: Test 1
    Queued message: Test 2
    Sending message: Test 2
    Dequeuing message: Test 2
    Returning response: Test 2
    Queued message: Test 3
    Sending message: Test 3
    Dequeuing message: Test 1
    Returning response: Test 1
    Dequeuing message: Test 3
    Returning response: Test 3
    

    这是我的API控制器方法,它获取一条消息并对该消息进行排队,一旦消息排队,它将等待,直到看到前面是该请求的消息,然后发送该消息,然后将其出列。

    控制器

    [HttpPost]
    [Route("message")]
    public IActionResult SendMessageUser([FromBody]Message request)
    {
        Console.WriteLine($"Queued message: {request.Message}");
        _messageQueue.QueueAndWaitForTurn(request);
        Console.WriteLine($"Sending message: {request.Message}");
        var sendMessageResponse = _messageService.SendMessageToUser(request.Name, request.Message);
        Console.WriteLine($"Dequeuing message: {request.Message}");
        _messageQueue.DequeueMessage(request);
        Console.WriteLine($"Returning response: {request.Message}");
        return Ok(sendMessageResponse);
    }
    

    至于排队问题,我将以这样的方式约束国际奥委会:

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<IMessageQueue, MessageQueue>();
        services.AddScoped<IMessageService, MessageService>();
    
        services.AddMvc();
    }
    

    这是我的队列类singleton,我在这里使用singleton,因为在应用程序的整个生命周期中,我只希望这个队列有一个实例。

    public class MessageQueue : IMessageQueue
    {
        private Lazy<ConcurrentQueue<Message>> _queue = 
            new Lazy<ConcurrentQueue<Message>>(new ConcurrentQueue<Message>());
    
        public ConcurrentQueue<Message> Queue
        {
            get
            {
                return _queue.Value;
            }
        }
    
        public void QueueAndWaitForTurn(Message message)
        {
            Queue.Enqueue(message);
    
            WaitForTurn();
        }
    
        public bool DequeueMessage(Message message)
        {
            var messageIsDequeued = Queue.TryDequeue(out message);
    
            return messageIsDequeued;
        }
    
        public void WaitForTurn()
        {
            Message myMessage = null;
            var myMessageIsNext = Queue.TryPeek(out myMessage);
    
            while (!Queue.TryPeek(out myMessage))
            {
                Thread.Sleep(1000);
                WaitForTurn();
            }
        }
    }
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   ProgrammingLlama Raveena Sarda    6 年前

    我会创建一种FifoseMaphore:

    public class FifoSemaphore : IDisposable
    {
        private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
        private readonly Queue<TaskCompletionSource<object>> _taskQueue = new Queue<TaskCompletionSource<object>>(10);
        private readonly object _lockObject = new object();
    
        public async Task WaitAsync()
        {
            // Enqueue a task
            Task resultTask;
            lock (_lockObject)
            {
                var tcs = new TaskCompletionSource<object>();
                resultTask = tcs.Task;
                _taskQueue.Enqueue(tcs);
            }
    
            // Wait for the lock
            await _semaphore.WaitAsync();
    
            // Dequeue the next item and set it to resolved (release back to API call)
            TaskCompletionSource<object> queuedItem;
            lock (_lockObject)
            {
                queuedItem = _taskQueue.Dequeue();
            }
            queuedItem.SetResult(null);
    
            // Await our own task
            await resultTask;
        }
    
        public void Release()
        {
            // Release the semaphore so another waiting thread can enter
            _semaphore.Release();
        }
    
        public void Dispose()
        {
            _semaphore?.Dispose();
        }
    }
    

    然后像这样使用:

    [HttpPost]
    [Route("message")]
    public async Task<IActionResult> SendMessageUser([FromBody]Message request)
    {
        try
        {
            await _fifoSemaphore.WaitAsync();
            // process message code here
        }
        finally // important to have a finally to release the semaphore, so that even in the case of an exception, it can continue to process the next message
        {
            _fifoSemaphore.Release();
        }
    }
    

    其想法是,每个等待的项目都将首先排队。

    接下来,我们等待信号量让我们进入(我们的信号量一次允许一个项目)。

    然后我们将下一个等待项出列,并将其释放回API方法。

    最后,我们等待自己在队列中的位置完成,然后返回到API方法。

    在API方法中,我们异步地等待轮到我们,完成我们的任务,然后返回。其中包括一个try/finally,以确保在后续消息中释放信号量,即使在失败的情况下也是如此。