代码之家  ›  专栏  ›  技术社区  ›  Ben

Spring 2.1.0.M4 rabbitmq在运行时声明队列并将它们绑定到侦听器

  •  0
  • Ben  · 技术社区  · 6 年前

    假设我有一个声明的侦听器:

    Listener.java

    @RabbitListener(id = "test listener 1")
        public String test2(String req) {
            return req + " result";
    }
    

    ListenerTest.java

    Queue declaredQueue = new Queue("new.queue", false);
    
    admin.declareQueue(declaredQueue);
    
    SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer) 
                registry.getListenerContainer("test listener 1");
    listener.addQueues(declaredQueue);
    

    String result = template.convertSendAndReceiveAsType("new.queue", "req", ParameterizedTypeReference.forType(String.class));
    

    但它只是超时并返回null。

    在调试器中检查侦听器时,我看不到任何绑定到新队列的使用者 enter image description here

    here 剩下的人来测试这个 here .

    值得注意的是,这种精确的设置在spring引导版本中有效

    2 回复  |  直到 6 年前
        1
  •  1
  •   Gary Russell    6 年前

    在运行时添加队列将导致容器回收其使用者(相当于停止并重新启动容器)。看到了吗 https://github.com/spring-projects/spring-amqp/blob/master/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java#L687 这是由于消费者的设计方式;每个使用者线程从多个队列中消费。

    https://github.com/spring-projects/spring-amqp/blob/master/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java#L168

    DirectMessageListenerContainer 添加队列时不需要重新启动其使用者(每个队列至少有一个使用者)。

        2
  •  0
  •   Ben    6 年前

    在2.1.0版本之后 SimpleMessageListenerContainer AbstractMessageListenerContainer#addQueues 导致 SimpleMessageListenerContainer#queuesChanged SimpleMessageListenerContainer#addQueueNames 被覆盖并工作。所以要么改成

    SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer) 
                registry.getListenerContainer("test listener 1");
    listener.addQueueNames(declaredQueue.getName());
    

    queuesChanged 必须手动调用

    SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer) 
                registry.getListenerContainer("test listener 1");
    listener.addQueues(declaredQueue);
    Method method = SimpleMessageListenerContainer.class.getDeclaredMethod("queuesChanged");
    method.setAccessible(true);
    method.invoke(listener);