
What is the proper way to use consumer groups in Spring Cloud Stream / Data Flow and RabbitMQ?

  • user1126515 · 6 years ago

    Follow-up to:

    one SCDF source, 2 processors but only 1 processes each item

    This is the graph I'm trying to implement.

    My processor has the following application.properties:

    spring.application.name=${vcap.application.name:sample-processor}
    info.app.name=@project.artifactId@
    info.app.description=@project.description@
    info.app.version=@project.version@
    management.endpoints.web.exposure.include=health,info,bindings
    spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration
    spring.cloud.stream.bindings.input.group=input
    

    Is "spring.cloud.stream.bindings.input.group" specified correctly?
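
    For context, the delivery behavior that consumer groups are documented to provide (every group subscribed to a destination gets its own copy of each message, while instances inside one group compete for it) can be sketched as a toy model in plain Java. This is only an illustration, not Spring Cloud Stream or RabbitMQ code: the names ConsumerGroupSketch, Destination and Consumer are hypothetical, and strict round-robin within a group is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consumer-group delivery; hypothetical names, no broker involved. */
public class ConsumerGroupSketch {

    /** One consumer instance that simply records what it receives. */
    static final class Consumer {
        final String name;
        final List<String> received = new ArrayList<>();
        Consumer(String name) { this.name = name; }
    }

    /** A destination that keeps one logical queue per consumer group. */
    static final class Destination {
        private final Map<String, List<Consumer>> groups = new LinkedHashMap<>();
        private final Map<String, Integer> nextIndex = new LinkedHashMap<>();

        void subscribe(String group, Consumer consumer) {
            groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
            nextIndex.putIfAbsent(group, 0);
        }

        /** Each group gets the message once; members of a group take turns (assumed round-robin). */
        void publish(String message) {
            for (Map.Entry<String, List<Consumer>> e : groups.entrySet()) {
                List<Consumer> members = e.getValue();
                int i = nextIndex.get(e.getKey());
                members.get(i % members.size()).received.add(message);
                nextIndex.put(e.getKey(), i + 1);
            }
        }
    }

    public static void main(String[] args) {
        // Same group: the two processors split the messages between them.
        Destination shared = new Destination();
        Consumer p1 = new Consumer("processor-1");
        Consumer p2 = new Consumer("processor-2");
        shared.subscribe("input", p1);
        shared.subscribe("input", p2);

        // Different groups: each processor sees every message.
        Destination separate = new Destination();
        Consumer p3 = new Consumer("processor-3");
        Consumer p4 = new Consumer("processor-4");
        separate.subscribe("group-a", p3);
        separate.subscribe("group-b", p4);

        for (int n = 1; n <= 4; n++) {
            shared.publish("msg-" + n);
            separate.publish("msg-" + n);
        }
        System.out.println("same group:      " + p1.received + " " + p2.received);
        System.out.println("separate groups: " + p3.received + " " + p4.received);
    }
}
```

    In this model, two processors only share the work when both end up bound with the same group name. If something gives each instance a different group, both receive every item.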

    Here is the processor code:

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object transform(String inputStr) throws InterruptedException{
    
        ApplicationLog log = new ApplicationLog(this, "timerMessageSource");
    
        String message = " I AM [" + inputStr + "] AND I HAVE BEEN PROCESSED!!!!!!!";
    
        log.info("SampleProcessor.transform() incoming inputStr="+inputStr);
    
        return message;
    }
    

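    Is the @Transformer annotation the proper way to link this code with "spring.cloud.stream.bindings.input.group" from application.properties? Are any other annotations needed?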

    Here is my source:

    private String format = "EEEEE dd MMMMM yyyy HH:mm:ss.SSSZ";
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public MessageSource<String> timerMessageSource() {
        ApplicationLog log = new ApplicationLog(this, "timerMessageSource");
        String message = new SimpleDateFormat(format).format(new Date());
        log.info("SampleSource.timeMessageSource() message=["+message+"]");
        return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
    }
    

    Would including @Poller cause me any problems?

    This is how I define the 2 processor streams (del-1 and del-2) in the SCDF shell:

    stream create del-1 --definition ":split > processor-that-does-everything-sleeps5 --spring.cloud.stream.bindings.applicationMetrics.destination=metrics > :merge"
    
    stream create del-2 --definition ":split > processor-that-does-everything-sleeps5 --spring.cloud.stream.bindings.applicationMetrics.destination=metrics > :merge"
    

    Do I need to do anything differently there?

    RabbitMQ is provided by bitnami/rabbitmq:3.7.2-r1 and is configured with the following properties:

    RABBITMQ_USERNAME: user
    RABBITMQ_PASSWORD: <redacted>
    RABBITMQ_ERL_COOKIE: <redacted>
    RABBITMQ_NODE_PORT_NUMBER: 5672
    RABBITMQ_NODE_TYPE: stats
    RABBITMQ_NODE_NAME: rabbit@localhost
    RABBITMQ_CLUSTER_NODE_NAME: 
    RABBITMQ_DEFAULT_VHOST: /
    RABBITMQ_MANAGER_PORT_NUMBER: 15672
    RABBITMQ_DISK_FREE_LIMIT: "6GiB"
    

    Are any other environment variables needed?
