您可以轻松地混合dsl和处理器api。
我理解您希望使用这两种方法构建处理图,为此,您可以调用dsl
StreamsBuilder::stream
,对于处理器api,您可以调用
StreamsBuilder::build()
得到
Topology
并应用函数添加处理器等。
源代码如下:
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input1").to("output1");
Topology topology = builder.build();
topology.addSource("inputNode","input2");
topology.addProcessor("processor1", InputProcessor::new, "inputNode");
topology.addSink("sink1", "output2", "processor1");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Eddi1:
你可以造两个
拓扑结构
使用DSL,并行运行并收听不同的主题。可以像@matthias j.sax提到的那样
KStream::transform(...)
,
KStream::transformValues(...)
和
KStream::process(...)
. 代码应该是这样的:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);