我一直在进行从Spring2.X到3.X的迁移,我的旧代码在使用者中使用了@EnableBinding(Sink::class)和@StreamListener(Sink.INPUT)。由于两者都被弃用并从使用者中删除,当使用使用者运行我的Kafka测试时,会发生以下错误:
org.opentest4j.AssertionFailedError:
expected: 1L
but was: 0L
我试着把@KafkaListener(topics=[“kafka test”],group id=“test group”)放在StreamListener所在的位置:但它导致了同样的错误。
我重写了StreamListener来自的使用者:
fun consume(message: Message<*>) {
到
fun consume(): Consumer<Message<*>> = Consumer { message ->
但这也导致了一个错误:
Too many arguments for public open fun consume(): Consumer<Message<*>>
通过的论据如下:
kafkaConsumer.consume(GenericMessage("""{"aV": 1, "dI": 3, "id": $alertId}""".toByteArray()))
将其重写为:
@KafkaListener(topics = ["test-topic"], groupId = "group-test")
fun consume(message: GenericMessage<ByteArray>) {`
导致相同的预期1L,但却是0L错误。
在迁移之前,只删除了EnableBinding和StreamListener,但所有旧的2.X依赖项都相同,也会发生相同的错误。我想知道,对于卡夫卡消费者来说,是否有与这两种功能相同的东西?
当前迁移的kafka依赖项是Spring kafka 3.1.4、Spring cloud stream 4.1.1、kafka客户端6.2.1、Spring integration 6.1.6。