代码之家  ›  专栏  ›  技术社区  ›  user25089203

替代Kafka 3.6.2中的@EnableBinding和@StreamListener

  •  0
  • user25089203  · 技术社区  · 1 年前

    我一直在进行从Spring2.X到3.X的迁移,我的旧代码在使用者中使用了@EnableBinding(Sink::class)和@StreamListener(Sink.INPUT)。由于两者都被弃用并从使用者中删除,当使用使用者运行我的Kafka测试时,会发生以下错误:

    org.opentest4j.AssertionFailedError: 
    expected: 1L
     but was: 0L
    

    我试着把@KafkaListener(topics=[“kafka test”],group id=“test group”)放在StreamListener所在的位置:但它导致了同样的错误。

    我重写了StreamListener来自的使用者:

        fun consume(message: Message<*>) {
    
    

    fun consume(): Consumer<Message<*>> = Consumer { message ->
    
    

    但这也导致了一个错误:

    Too many arguments for public open fun consume(): Consumer<Message<*>>
    

    通过的论据如下:

    kafkaConsumer.consume(GenericMessage("""{"aV": 1, "dI": 3, "id": $alertId}""".toByteArray()))
    
    

    将其重写为:

    
    @KafkaListener(topics = ["test-topic"], groupId = "group-test")
        fun consume(message: GenericMessage<ByteArray>) {`
    
    

    导致相同的预期1L,但却是0L错误。

    在迁移之前,只删除了EnableBinding和StreamListener,但所有旧的2.X依赖项都相同,也会发生相同的错误。我想知道,对于卡夫卡消费者来说,是否有与这两种功能相同的东西?

    当前迁移的kafka依赖项是Spring kafka 3.1.4、Spring cloud stream 4.1.1、kafka客户端6.2.1、Spring integration 6.1.6。

    0 回复  |  直到 1 年前
        1
  •  0
  •   Nikolai Shevchenko    1 年前

    而不是

    kafkaConsumer.consume(GenericMessage(...))
    

    你必须使用

    kafkaConsumer.consume().accept(GenericMessage(...))