-
一个将主题“t1”和“t2”作为ktable读取,在其中一个中使用不同的键重新分区,然后连接到另一个中的数据
-
由于第一个侦听器执行一些连接和聚合,因此会自动创建一些主题,例如“test-1-KTABLE-AGGREGATE-STATE-STORE-0000000007-repartition-0”(不确定这是否与问题有关。)
当我设置代码时,使用
@StreamListener
,我在Spring Boot应用程序启动时得到以下错误:
Exception in thread "test-d44cb424-7575-4f5f-b148-afad034c93f4-StreamThread-2" java.lang.IllegalArgumentException: Assigned partition t1-0 for non-subscribed topic regex pattern; subscription pattern is t3
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:848)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)
我认为重要的部分是:“分配分区
t1-0级
对于非订阅主题regex模式;订阅模式为
". 这是两个不相关的主题,所以据我所知,与t3相关的东西都不应该订阅与t1相关的东西。导致问题的确切主题也会断断续续地发生变化:有时会提到自动生成的主题之一,而不是t1本身。
@StreamListener
fun listenerForT1AndT2(
@Input("t1") t1KTable: KTable<String, T1Obj>,
@Input("t2") t2KTable: KTable<String, T2Obj>) {
t2KTable
.groupBy(...)
.aggregate(
{ ... },
{ ... },
{ ... },
Materialized.with(Serdes.String(), JsonSerde(SomeObj::class.java)))
.join(t1KTable,
{ ... },
Materialized.`as`<String, SomeObj, KeyValueStore<Bytes, ByteArray>>("test")
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde(SomeObj::class.java)))
}
@StreamListener
fun listenerForT3(@Input("t3") t3KStream: KStream<String, T3Obj>) {
events.map { ... }
}
@流侦听器
,并获取所有三个主题的参数,一切正常。
@StreamListener
fun compositeListener(
@Input("t1") t1KTable: KTable<String, T1Obj>,
@Input("t2") t2KTable: KTable<String, T2Obj>,
@Input("t3") t3KStream: KStream<String, T3Obj>) {
...
}
方法。
我知道有
content-based routing
StreamListener
注释,但是如果方法定义了输入通道,那么我不确定是否需要在这里使用它-我会想到
@Input
方法参数上的注释足以告诉系统要绑定到哪些通道(以及卡夫卡主题)?如果我
做
我还尝试将这两个侦听器方法分为两个独立的类,每个类都有
@EnableBinding
我发现的与此错误消息相关的所有其他内容,例如。
here
,是关于有多个应用程序实例,但在我的例子中只有一个Spring Boot应用程序实例。