代码之家  ›  专栏  ›  技术社区  ›  Yoni Gibbs

多个streamlistener与连接到Kafka的Spring云流

  •  0
  • Yoni Gibbs  · 技术社区  · 6 年前

    • 一个将主题“t1”和“t2”作为ktable读取,在其中一个中使用不同的键重新分区,然后连接到另一个中的数据

    由于第一个侦听器执行一些连接和聚合,因此会自动创建一些主题,例如“test-1-KTABLE-AGGREGATE-STATE-STORE-0000000007-repartition-0”(不确定这是否与问题有关。)

    当我设置代码时,使用 @StreamListener ,我在Spring Boot应用程序启动时得到以下错误:

    Exception in thread "test-d44cb424-7575-4f5f-b148-afad034c93f4-StreamThread-2" java.lang.IllegalArgumentException: Assigned partition t1-0 for non-subscribed topic regex pattern; subscription pattern is t3
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:848)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:805)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)
    

    我认为重要的部分是:“分配分区 t1-0级 对于非订阅主题regex模式;订阅模式为 ". 这是两个不相关的主题,所以据我所知,与t3相关的东西都不应该订阅与t1相关的东西。导致问题的确切主题也会断断续续地发生变化:有时会提到自动生成的主题之一,而不是t1本身。

    @StreamListener
    fun listenerForT1AndT2(
            @Input("t1") t1KTable: KTable<String, T1Obj>,
            @Input("t2") t2KTable: KTable<String, T2Obj>) {
    
        t2KTable
            .groupBy(...)
            .aggregate(
                    { ... },
                    { ... },
                    { ... },
                    Materialized.with(Serdes.String(), JsonSerde(SomeObj::class.java)))
            .join(t1KTable,
                    { ... },
                    Materialized.`as`<String, SomeObj, KeyValueStore<Bytes, ByteArray>>("test")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(JsonSerde(SomeObj::class.java)))
    }
    
    @StreamListener
    fun listenerForT3(@Input("t3") t3KStream: KStream<String, T3Obj>) {
        events.map { ... }
    }
    

    @流侦听器 ,并获取所有三个主题的参数,一切正常。

    @StreamListener
    fun compositeListener(
            @Input("t1") t1KTable: KTable<String, T1Obj>,
            @Input("t2") t2KTable: KTable<String, T2Obj>,
            @Input("t3") t3KStream: KStream<String, T3Obj>) {
        ...
    }
    

    方法。

    我知道有 content-based routing StreamListener 注释,但是如果方法定义了输入通道,那么我不确定是否需要在这里使用它-我会想到 @Input 方法参数上的注释足以告诉系统要绑定到哪些通道(以及卡夫卡主题)?如果我

    我还尝试将这两个侦听器方法分为两个独立的类,每个类都有 @EnableBinding

    我发现的与此错误消息相关的所有其他内容,例如。 here ,是关于有多个应用程序实例,但在我的例子中只有一个Spring Boot应用程序实例。

    1 回复  |  直到 6 年前
        1
  •  4
  •   sobychacko    6 年前

    每个应用程序都需要单独的应用程序id StreamListener

    spring.cloud.stream.kafka.streams.bindings.t1.consumer.application-id=processor1-application-id spring.cloud.stream.kafka.streams.bindings.t2.consumer.application-id=processor1-application-id spring.cloud.stream.kafka.streams.bindings.t3.consumer.application-id=processor2-application-id

    请查看更新 here 更多细节。 这是一个 working sample 多重的 流侦听器 是Kafka流处理器的方法。