代码之家  ›  专栏  ›  技术社区  ›  André

聚合上使用了错误的序列化程序

  •  0
  • André  · 技术社区  · 7 年前

    我在一个kafka流应用程序中工作,我在其中处理日志事件。在本例中,我希望将工作流输入类型聚合为工作流类型。我有问题让总的工作。

    final KStream<String, WorkflowInput> filteredStream = someStream;
    final KTable<String, Workflow> aggregatedWorkflows = filteredStream
        .peek((k, v) -> {
            if (!(v instanceof WorkflowInput)) {
                throw new AssertionError("Type not expected");
            }
        })
        .groupByKey()
        .<Workflow>aggregate(Workflow::new, (k, input, workflow) -> workflow.updateFrom(input),
                Materialized.<String, Workflow, KeyValueStore<Bytes, byte[]>>as("worflow-cache")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.serdeFrom(new JsonSerializer<Workflow>(), new JsonDeserializer<Workflow>(Workflow.class)));
    

    org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: workflowauditstreamer.WorkflowInput).

    需要注意两件事: *值序列化程序是StringSerializer,而我使用 withValueSerde . *实际值类型为 WorkflowInput 正如我所料 Workflow 因为那是我的聚合值类型。

    我对卡夫卡流还不熟悉,所以我可能遗漏了一些显而易见的东西,但我想不出来。我错过了什么?

    1 回复  |  直到 7 年前
        1
  •  1
  •   Matthias J. Sax    7 年前

    如果覆盖默认值 Serde 从配置中,它是在操作员就地覆盖。它不会传播到下游(Kafka2.0——有WIP可以改进这一点)。

    因此,您需要通过 塞德 你用在 someStream = builder.stream(...) .groupByKey(Serialized.with(...)) 也是。