代码之家  ›  专栏  ›  技术社区  ›  Josh Kitchens

RabbitMQ CLI状态与管理门户不同

  •  0
  • Josh Kitchens  · 技术社区  · 9 年前

    我注意到,当我从命令行运行RabbitMQ status命令时( rabbitmqctl status ),所有报道的数字都与我所知的现实大相径庭。我在管理门户网站上看到的内容证实了我的现实。

    以下是CLI状态的输出:

    [{pid,26647},
     {running_applications,
         [{rabbitmq_management,"RabbitMQ Management Console","3.5.4"},
          {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.5.4"},
          {webmachine,"webmachine","1.10.3-rmq3.5.4-gite9359c7"},
          {mochiweb,"MochiMedia Web Server","2.7.0-rmq3.5.4-git680dba8"},
          {rabbitmq_management_agent,"RabbitMQ Management Agent","3.5.4"},
          {rabbit,"RabbitMQ","3.5.4"},
          {os_mon,"CPO  CXC 138 46","2.3"},
          {inets,"INETS  CXC 138 49","5.10.4"},
          {mnesia,"MNESIA  CXC 138 12","4.12.4"},
          {amqp_client,"RabbitMQ AMQP Client","3.5.4"},
          {xmerl,"XML parser","1.3.7"},
          {sasl,"SASL  CXC 138 11","2.4.1"},
          {stdlib,"ERTS  CXC 138 10","2.3"},
          {kernel,"ERTS  CXC 138 10","3.1"}]},
     {os,{unix,linux}},
     {erlang_version,
         "Erlang/OTP 17 [erts-6.3] [source] [64-bit] [smp:4:4] [async-threads:64] [kernel-poll:true]\n"},
     {memory,
         [{total,45994136},
          {connection_readers,101856},
          {connection_writers,54800},
          {connection_channels,165968},
          {connection_other,373008},
          {queue_procs,175376},
          {queue_slave_procs,0},
          {plugins,437024},
          {other_proc,13385792},
          {mnesia,131904},
          {mgmt_db,484216},
          {msg_index,53112},
          {other_ets,1119384},
          {binary,3890640},
          {code,20097289},
          {atom,711569},
          {other_system,4812198}]},
     {alarms,[]},
     {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
     {vm_memory_high_watermark,0.4},
     {vm_memory_limit,787190579},
     {disk_free_limit,50000000},
     {disk_free,778919936},
     {file_descriptors,
         [{total_limit,924},
          {total_used,13},
          {sockets_limit,829},
          {sockets_used,11}]},
     {processes,[{limit,1048576},{used,350}]},
     {run_queue,0},
     {uptime,911}]
    

    读者、作家、频道等的数量,基本上每一个数字都是千倍的倍数。

    管理门户中的数字(见下文)是正确的。共10个连接,每个连接有两个通道

    actual number of connections

    我的所有队列都是不持久的,我只使用扇出交换发送非持久消息。据我理解,这应该意味着,如果出现问题(这对我的需求很好),就永远不会坚持下去。

    我注意到,每当我启动或关闭一个连接到代理的模块时,命令行上的读/写器数量会增加约17000,尽管门户中只会增加/减少1个。

    以下是我的代理配置代码供参考:

    private String endPoint;
    private int port;
    private String userName;
    private String password;
    private Exchange publisherExchange;
    private ExchangeType publisherExchangeType;
    private Map<Exchange, ExchangeType> subscriptionExchanges;
    
    private Channel publishChannel;
    private Channel subscriptionChannel;
    private Consumer consumer;
    
    private BrokerHandler(BrokerHandlerBuilder builder) throws ConnectException{
        this.endPoint = builder.endPoint;
        this.port = builder.port;
        this.userName = builder.userName;
        this.password = builder.password;
        this.publisherExchange = builder.publisherExchange;
        this.publisherExchangeType = builder.publisherExchangeType;
        this.subscriptionExchanges = builder.subscriptionExchanges;
    
        connect();
    }
    
    private void connect() throws ConnectException{
        ConnectionFactory factory = new ConnectionFactory();
    
        factory.setHost(this.endPoint);
        factory.setPort(this.port);
    
        if(this.userName != null && this.password != null){
            factory.setUsername(this.userName);
            factory.setPassword(this.password);
    
            factory.setAutomaticRecoveryEnabled(true);
            factory.setNetworkRecoveryInterval(RMQConstants.RABBITMQ_MAX_RETRY_DELAY);
    
        }
    
        try {
            log.info("Registering with broker on topic " + this.publisherExchange.toString() + " on " + this.endPoint + "...");
            connection = factory.newConnection();
            publishChannel = connection.createChannel();
            subscriptionChannel = connection.createChannel();
            configureConsumer();
    
            publishChannel.exchangeDeclare(this.publisherExchange.toString(), this.publisherExchangeType.toString());
    
        } catch(Exception e){
            throw new ConnectException("Unable to connect to RabbitMQ broker.");
        }
    
        if(this.subscriptionExchanges.size() > 0){
            subscribe(this.subscriptionExchanges);
        }
    }
    /**
     * Allows callers to publish a message to the broker, which will be broadcast to all listeners using a FANOUT strategy
     * @throws ConnectException if the handler is not connected to the broker
     */
    private void publishToBroker(String msg) throws ConnectException{
        try {
            publishChannel.basicPublish(this.publisherExchange.toString(), "", null, msg.getBytes());
        } 
        catch (IOException e) {
            log.error("Unable to write message to broker.", e);
        }
    }
    
    private void subscribe(Map<Exchange, ExchangeType> exchanges){
        try {
            String queueName = subscriptionChannel.queueDeclare().getQueue();
    
            exchanges.forEach((k,v) -> {
                try {
                    subscriptionChannel.exchangeDeclare(k.toString(), v.toString());
                    subscriptionChannel.queueBind(queueName, k.toString(), "");
                } catch (Exception e) {
                    log.error("Error declaring exchanges for exchange: " + k.toString(), e);
                }
            });
    
            subscriptionChannel.basicConsume(queueName, true, consumer);    
        } 
        catch(Exception e){
            log.error("Error declaring a queue for subscription channel", e);
        }
    }
    
    private void configureConsumer(){
        consumer = new DefaultConsumer(subscriptionChannel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope,
                                         AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                handleMessage(message);
              }
            };
    }
    

    客户端使用构建器模式实例化代理连接,在这一点上,他们指定了自己的发布交换和希望订阅的任意数量的交换。这个系统中总共只有19个交换。

    消息被正确发布和接收,但我一直收到报告说代理正在使服务器陷入困境。我会更密切地监视它,但我真的希望能够解释状态呼叫中的这些奇怪结果。我尝试过停止应用程序并重新设置然后重新配置代理,这会使连接计数返回到0,但一旦模块开始重新连接,数字就会开始回升。

    感谢您花时间仔细阅读。任何建议都将不胜感激!

    1 回复  |  直到 9 年前
        1
  •  1
  •   user825623 user825623    9 年前

    这个 connection_readers , connection_writers , connection_channels ,等等 memory 节,单位是字节,而不是连接数。这与您报告的管理UI部分完全不同。

    要通过CLI获取有关连接数的数据,请使用 rabbitmqctl list_connections 命令

    channels
        Number of channels using the connection.
    

    另请参见 list_exchanges , list_queues list_consumers .