代码之家  ›  专栏  ›  技术社区  ›  Yanick Salzmann

卡夫卡流重新分区后不使用serde

  •  0
  • Yanick Salzmann  · 技术社区  · 6 年前

    我的kafka streams应用程序正在使用使用以下键值布局的kafka主题: String.class -> HistoryEvent.class

    打印当前主题时,可以确认:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-event-stream-file-service-test-instance --property print.key=true --property key.separator=" -- " --from-beginning
    flow1 --  SUCCESS     #C:\Daten\file-service\in\crypto.p12
    

    “流1”是 String 键和后面的部分 -- 是序列化值。

    我的流程设置如下:

        KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
                historyEventSerde));
    
    
        eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
                .groupByKey()
                .reduce((e1, e2) -> e2,
                        Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                                .withKeySerde(new HistoryEventKeySerde()));
    

    据我所知,我告诉它使用 HistoryEvent 因为这就是主题。然后我“重新键入”它以使用组合键,该组合键应使用提供的SERDE存储在本地 HistoryEventKey.class . 据我所知,这将导致使用新密钥创建一个附加主题(可以在Kafka容器中的主题列表中看到)。这很好。

    现在的问题是,即使是在一个干净的环境中,应用程序也无法启动,主题中只有一个文档:

    org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=flow-event-stream-file-service-test-instance, partition=0, offset=0
    Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: HistoryEventSerializer) is not compatible to the actual key or value type (key type: HistoryEventKey / value type: HistoryEvent). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    

    从信息中很难看出问题的确切位置。它在我的基本主题中说,但这是不可能的,因为密钥不属于类型 HistoryEventKey . 因为我为 历史事件密钥 reduce 它也不能与本地商店一起使用。

    唯一对我有意义的是它与 selectKey 导致重新排列和新主题的操作。然而,我不知道如何为这个行动提供SERDE。我不想将其设置为默认值,因为它不是默认的键serde。

    1 回复  |  直到 6 年前
        1
  •  3
  •   Yanick Salzmann    6 年前

    在对执行进行更多的调试之后,我能够发现新主题是在 groupByKey 步骤。您可以提供 Grouped 提供了指定 Serde 用于键和值:

        eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
                .groupByKey(Grouped.<HistoryEventKey, HistoryEvent>as(null)
                        .withKeySerde(new HistoryEventKeySerde())
                        .withValueSerde(new HistoryEventSerde())
                )
                .reduce((e1, e2) -> e2,
                        Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                                .withKeySerde(new HistoryEventKeySerde()));
    
    推荐文章