代码之家  ›  专栏  ›  技术社区  ›  SunilS

Kafka流:单个应用程序中的多个拓扑

  •  0
  • SunilS  · 技术社区  · 6 年前

    我想在一个kafka流应用程序中同时使用处理器api和dsl。此外,如何在一个应用程序中构建和运行多个拓扑(比如1个使用处理器api,其他使用dsl)。

    0 回复  |  直到 6 年前
        1
  •  3
  •   Bartosz Wardziński    6 年前

    您可以轻松地混合dsl和处理器api。 我理解您希望使用这两种方法构建处理图,为此,您可以调用dsl StreamsBuilder::stream ,对于处理器api,您可以调用 StreamsBuilder::build() 得到 Topology 并应用函数添加处理器等。

    源代码如下:

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input1").to("output1");
    Topology topology = builder.build();
    topology.addSource("inputNode","input2");
    topology.addProcessor("processor1", InputProcessor::new, "inputNode");
    topology.addSink("sink1", "output2", "processor1");
    
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    

    Eddi1:

    你可以造两个 拓扑结构 使用DSL,并行运行并收听不同的主题。可以像@matthias j.sax提到的那样 KStream::transform(...) , KStream::transformValues(...) KStream::process(...) . 代码应该是这样的:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
    KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);