代码之家  ›  专栏  ›  技术社区  ›  Shahid Ghafoor

spring引导和阻止队列侦听器

  •  1
  • Shahid Ghafoor  · 技术社区  · 7 年前

    我已经用spring boot实现了jms,我正在使用@JmsListener来听这个主题

      @Component
        public class AMQListner {
            BlockingQueue<MessageBO> queue = new ArrayBlockingQueue<>(1024);
            @JmsListener(destination = "${spring.activemq.topic}")
            public void Consume(TextMessage message) {
                try {
                    String json = message.getText();
                    MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
                    queue.add(bo);
                } catch (JMSException e) {
                    e.printStackTrace();
                } catch (JsonParseException e) {
                    e.printStackTrace();
                } catch (JsonMappingException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    

    现在,我需要一个侦听器来侦听阻塞队列,如果它有值,则进行处理。我们可以在spring boot中使用注释来实现这一点吗?

    1 回复  |  直到 7 年前
        1
  •  1
  •   daniu    7 年前

    首先,正确的方法是创建处理程序bean,而不是在接收方类中创建消息队列成员。

    public interface MessageHandler extends Consumer<MessageBO> {
        public default void handle(MessageBO msg) { accept(msg); }
    }
    
    @Component
    public class AMQListener {
        @Resource("multiplexer")
        MessageHandler handler;
    
        @JmsListener(destination = "${spring.activemq.topic}")
        public void Consume(TextMessage message) {
            try {
                String json = message.getText();
                MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
                handler.handle(bo);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    然后,您将在处理程序bean中拥有队列

    @Component("multiplexer")
    public class MessageMultiplexer implements MessageHandler {
        @Autowired
        MessageHandler actualConsumer;
    
        ExecutorService executor = Executors.newFixedThreadPool(4);
        public void accept(MessageBO msg) {
            executor.submit(msg -> actualConsumer.handle(msg));
        }
    }
    

    在这种情况下,执行者几乎就是队列。

    警告:您没有这样的1024限制。您可以使用 ThreadPoolExecutor 构造函数,并向其传递一个有限队列。