首先,正确的方法是创建处理程序bean,而不是在接收方类中创建消息队列成员。
public interface MessageHandler extends Consumer<MessageBO> {
public default void handle(MessageBO msg) { accept(msg); }
}
@Component
public class AMQListener {
@Resource("multiplexer")
MessageHandler handler;
@JmsListener(destination = "${spring.activemq.topic}")
public void Consume(TextMessage message) {
try {
String json = message.getText();
MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
handler.handle(bo);
} catch (Exception e) {
e.printStackTrace();
}
}
}
然后,您将在处理程序bean中拥有队列
@Component("multiplexer")
public class MessageMultiplexer implements MessageHandler {
@Autowired
MessageHandler actualConsumer;
ExecutorService executor = Executors.newFixedThreadPool(4);
public void accept(MessageBO msg) {
executor.submit(msg -> actualConsumer.handle(msg));
}
}
在这种情况下,执行者几乎就是队列。
警告:您没有这样的1024限制。您可以使用
ThreadPoolExecutor
构造函数,并向其传递一个有限队列。