代码之家  ›  专栏  ›  技术社区  ›  user1126515

关于复合处理器和接收器的问题

  •  1
  • user1126515  · 技术社区  · 6 年前

    我正在使用SpringCan数据流1.7.2.release,并尝试遵循 this blog post

    创建一个“处理器+接收器到单个应用程序的组合:一个新的接收器”。

    当我将我的代码作为博客的示例进行结构化时,我遇到了一些问题,我认为这是因为博客的示例使用了类似于处理器的java.util.function。

    我猜我应该使用java.util.consumer,因为我正试图将现有的接收器改为处理器-接收器混合型。

    我的班级如下:

    @EnableBinding(Sink.class)
    public class SampleCombinedSink extends Something {
    
        String modifiedPayload;
        Logger log;
    
        Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };
    
        public void accept(String s){
          log.info("SampleCombinedSink.accept() s="+s);
        }
    
        @StreamListener(Sink.INPUT)
        public void doSink(String payload) {
    
          consumer.accept(payload);
          log.info("SampleCombinedSink.doSink() Payload received.");
          log.info("SampleCombinedSink.doSink() payload="+ payload);
          log.info("SampleCombinedSink.doSink() modifiedPayload="+ modifiedPayload);
        }
    }
    

    我的输出如下:

    SampleCombinedSink.doSink() Payload received.
    SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:53.330+0000
    SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:53.330+0000]
    SampleCombinedSink.doSink() Payload received.
    SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:54.332+0000
    SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:54.332+0000]
    SampleCombinedSink.doSink() Payload received.
    SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:55.333+0000
    SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:55.333+0000]
    SampleCombinedSink.doSink() Payload received.
    SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:56.313+0000
    SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:56.313+0000]
    

    我的源每秒发出一个时间戳。

    我对我的消费者感到困惑。

    Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };
    

    我想我可以做一些像:

    Consumer<String> consumer = i -> { i="STUFF ADDED BY CONSUMER i=["+i+"]"; };
    

    然后有效载荷进入

    @StreamListener(Sink.INPUT)
    public void doSink(String payload) {
    

    将包含“由消费者i添加的内容”=[ 时间戳

    它没有。

    我想将输入更改为dosink,并通过添加“由消费者添加的内容”来更改它,这样当输入达到dosink(字符串有效负载)时,有效负载将包含“由消费者添加的内容i=[ 时间戳

    我怎样才能做到?

    1 回复  |  直到 6 年前
        1
  •  2
  •   Ilayaperumal Gopinathan    6 年前

    在这种情况下,您不必更改 Sink 相反,只需在 应用程序端。

    例如,使:

    a combination of processor + sink into a single application: “a new sink”."
    
    

    你只需要拥有你的 function 豆子是 应用或甚至拥有 功能 豆子在一个单独的人工制品中,但在 classpath 属于 应用。一旦你有了这个,你就可以定义 spring.cloud.stream.function.definition 对于 应用。

    你可以看到这个的样品 here . 应用程序 log-composed has 函数beans已定义。

    运行示例:

    dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
    Successfully registered application 'source:http-transformer'
    
    dataflow:>app register --name log-composed --type sink --uri file:///Users/igopinathan/.m2/repository/org/springframework/cloud/stream/app/log-composed/2.1.0.BUILD-SNAPSHOT/log-composed-2.1.0.BUILD-SNAPSHOT.jar
    Successfully registered application 'sink:log-composed'
    
    dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log-composed"
    Created new stream 'helloComposed'
    
    
    dataflow:>stream deploy helloComposed --properties "app.log-composed.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
    Deployment request has been sent for stream 'helloComposed'
    
    dataflow:>http post --data "friend" --target "http://localhost:9001"
    
    

    stream deploy 命令,您可以看到用于指定 function composition 日志合成 .