我的kafka streams应用程序正在使用使用以下键值布局的kafka主题:
String.class -> HistoryEvent.class
打印当前主题时,可以确认:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-event-stream-file-service-test-instance --property print.key=true --property key.separator=" -- " --from-beginning
flow1 -- SUCCESS
“流1”是
String
键和后面的部分
--
是序列化值。
我的流程设置如下:
KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
historyEventSerde));
eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
.groupByKey()
.reduce((e1, e2) -> e2,
Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
.withKeySerde(new HistoryEventKeySerde()));
据我所知,我告诉它使用
弦
和
HistoryEvent
因为这就是主题。然后我“重新键入”它以使用组合键,该组合键应使用提供的SERDE存储在本地
HistoryEventKey.class
. 据我所知,这将导致使用新密钥创建一个附加主题(可以在Kafka容器中的主题列表中看到)。这很好。
现在的问题是,即使是在一个干净的环境中,应用程序也无法启动,主题中只有一个文档:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=flow-event-stream-file-service-test-instance, partition=0, offset=0
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: HistoryEventSerializer) is not compatible to the actual key or value type (key type: HistoryEventKey / value type: HistoryEvent). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
从信息中很难看出问题的确切位置。它在我的基本主题中说,但这是不可能的,因为密钥不属于类型
HistoryEventKey
. 因为我为
历史事件密钥
在
reduce
它也不能与本地商店一起使用。
唯一对我有意义的是它与
selectKey
导致重新排列和新主题的操作。然而,我不知道如何为这个行动提供SERDE。我不想将其设置为默认值,因为它不是默认的键serde。