代码之家  ›  专栏  ›  技术社区  ›  cdarrigo

.NET Core 1.1接收RabbitMQ消息

  •  1
  • cdarrigo  · 技术社区  · 8 年前

    问题是,我可以很好地将消息排队,但所有读取消息的尝试都会导致没有消息被读取。很明显,我没有正确使用队列,希望能得到一些指导。

    谢谢

        using System;
    using System.Collections.Generic;
    using System.Text;
    using Newtonsoft.Json;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace RabbitMqDemo
    {
        class Program
        {
            static void Main(string[] args)
            {
                var client = new MessagingClient();
                if (args.Length == 1 && args[0].ToLower() == "-r")
                {
                    Console.WriteLine("Reading Messages from Queue.");
                    var messages = client.ReceiveMessages();
                    Console.WriteLine($"Read {messages.Length} message(s) from queue.");
                    foreach(var msg in messages)
                        Console.WriteLine(msg);
                }
                else
                {
                    foreach (var msg in args)
                    {
                        client.SendMessage(msg);
                    }
                    Console.WriteLine($"Enqueued {args.Length} Message.");
                }
            }
        }
    
        internal class MessagingClient
        {
            private readonly ConnectionFactory connectionFactory;
            private string ExchangeName => "defaultExchange";
            private string RoutingKey => "";
            private string QueueName => "Demo";
    
            private string HostName => "localhost";
    
    
            public MessagingClient()
            {
                this.connectionFactory = new ConnectionFactory {HostName = this.HostName};
            }
    
            public void SendMessage(string message)
            {
                using (var connection = this.connectionFactory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        this.QueueDeclare(channel, this.QueueName);
                        var properties = this.SetMessageProperties(channel, message);
    
                        string messageJson = JsonConvert.SerializeObject(message);
                        var body = Encoding.UTF8.GetBytes(messageJson);
    
                        channel.BasicPublish(exchange: this.ExchangeName, routingKey: this.RoutingKey, basicProperties: properties, body: body);
                    }
                }
            }
    
            public string[] ReceiveMessages()
            {
                var messages = new List<string>();
                using (var connection = this.connectionFactory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        this.QueueDeclare(channel, this.QueueName);
                        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
    
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {                        
                            string bodystring = Encoding.UTF8.GetString(ea.Body);
                            messages.Add(bodystring);
    
                            // ReSharper disable once AccessToDisposedClosure
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        };
                        channel.BasicConsume(queue: this.QueueName, autoAck: false, consumer: consumer);
                    }
                }
                return messages.ToArray();
            }
    
            private void QueueDeclare(IModel channel, string queueName)
            {
                channel.ExchangeDeclare(ExchangeName, type: ExchangeType.Direct,
                    durable: true,
                    autoDelete: false,
                    arguments: null);
    
                var queueDeclared = channel.QueueDeclare(queue: queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
    
                channel.QueueBind(queueName, ExchangeName, RoutingKey);            
            }
    
            private IBasicProperties SetMessageProperties(IModel channel, object message)
            {
                var properties = channel.CreateBasicProperties();
                properties.ContentType = "application/json";
                properties.Persistent = true;
                return properties;
            }
    
        }
    }
    
    1 回复  |  直到 8 年前
        1
  •  1
  •   Luke Bakken    8 年前
    • 首先,使用管理UI确保正确设置了exchange和队列,并且消息已发布到其中。
    • ReceiveMessages() 因此,在事件有机会触发之前,您的读者可能会立即返回一个空数组。当使用者从RabbitMQ接收消息时,您没有代码等待。中的通知 the tutorial 怎样 Console.ReadLine() 已使用。在您的示例中,可以使用同步对象( ManualResetEvent ReceiveMessages()