代码之家  ›  专栏  ›  技术社区  ›  maiksensi

春季云流兔MQ

  •  5
  • maiksensi  · 技术社区  · 8 年前

    https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html )这基本上就是我想要做的。它创建了一个连接了2个队列的直接交换,并根据路由密钥将消息路由到Q1或Q2。

    如果你看一下教程,整个过程非常简单,你创建了所有的部分,将它们绑定在一起,然后就可以开始了。

    我看到那条小溪有一个 BinderAwareChannelResolver 这似乎做了同样的事情。但我正在努力将所有这些整合在一起,以实现RabbitMQ Spring教程中的效果。我不确定这是否是一个依赖性问题,但我似乎从根本上误解了一些东西,我想:

    spring.cloud.stream.bindings.output.destination=myDestination
    spring.cloud.stream.bindings.output.group=consumerGroup
    spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression='key'
    

    应该抓住这个把戏。

    https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html

    编辑 :

    下面是一组最小的代码,它演示了如何执行我的要求。我没有附上 build.gradle 因为这是直截了当的(但如果有人感兴趣,请告诉我)

    application.properties

    spring.cloud.stream.bindings.output.destination=tut.direct
    spring.cloud.stream.rabbit.bindings.output.producer.exchangeType=direct
    spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers.type
    

    Sources.class :设置生产者频道

    public interface Sources {
    
        String OUTPUT = "output";
    
        @Output(Sources.OUTPUT)
        MessageChannel output();
    }
    

    StatusController.class

    /**
     * Status endpoint for the health-check service.
     */
    @RestController
    @EnableBinding(Sources.class)
    public class StatusController {
    
        private int index;
    
        private int count;
    
        private final String[] keys = {"orange", "black", "green"};
    
        private Sources sources;
    
        private StatusService status;
    
        @Autowired
        public StatusController(Sources sources, StatusService status) {
            this.sources = sources;
            this.status = status;
        }
    
        /**
         * Service available, service returns "OK"'.
         * @return The Status of the service.
         */
        @RequestMapping("/status")
        public String status() {
            String status = this.status.getStatus();
    
            StringBuilder builder = new StringBuilder("Hello to ");
            if (++this.index == 3) {
                this.index = 0;
            }
            String key = keys[this.index];
            builder.append(key).append(' ');
            builder.append(Integer.toString(++this.count));
            String payload = builder.toString();
            log.info(payload);
    
            // add kv pair - routingkeyexpression (which matches 'type') will then evaluate
            // and add the value as routing key
            Message<String> msg = new GenericMessage<>(payload, Collections.singletonMap("type", key));
            sources.output().send(msg);
    
            // return rest call
            return status;
        }
    }
    

    spring.cloud.stream.bindings.input.destination=tut.direct
    spring.cloud.stream.rabbit.bindings.input.consumer.exchangeType=direct
    spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=orange
    spring.cloud.stream.bindings.inputer.destination=tut.direct
    spring.cloud.stream.rabbit.bindings.inputer.consumer.exchangeType=direct
    spring.cloud.stream.rabbit.bindings.inputer.consumer.bindingRoutingKey=black
    

    Sinks.class :

    public interface Sinks {
    
        String INPUT = "input";
    
        @Input(Sinks.INPUT)
        SubscribableChannel input();
    
        String INPUTER = "inputer";
    
        @Input(Sinks.INPUTER)
        SubscribableChannel inputer();
    }
    

    ReceiveStatus.class

    @EnableBinding(Sinks.class)
    public class ReceiveStatus {
        @StreamListener(Sinks.INPUT)
        public void receiveStatusOrange(String msg) {
           log.info("I received a message. It was orange number: {}", msg);
        }
    
        @StreamListener(Sinks.INPUTER)
        public void receiveStatusBlack(String msg) {
            log.info("I received a message. It was black number: {}", msg);
        }
    }
    
    1 回复  |  直到 8 年前
        1
  •  4
  •   Ilayaperumal Gopinathan    8 年前

    Spring Cloud Stream允许您通过使应用程序能够连接(通过)来开发事件驱动的微服务应用程序 @EnableBinding )到使用Spring云流绑定器实现的外部消息传递系统(Kafka、RabbitMQ、JMS绑定器等)。显然,Spring Cloud Stream使用Spring AMQP实现RabbitMQ绑定器。

    BinderAwareChannelResolver 适用于生产者的动态绑定支持,我认为在您的案例中,它是关于配置交换和将消费者绑定到该交换的。

    bindingRoutingKey group 对于出站通道。这个 属性仅适用于消费者(因此为入站)。

    您可能还需要检查以下内容: https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/57 正如我看到一些关于使用 routing-key-expression this 一个是使用表达式值。