我在一个kafka流应用程序中工作,我在其中处理日志事件。在本例中,我希望将工作流输入类型聚合为工作流类型。我有问题让总的工作。
final KStream<String, WorkflowInput> filteredStream = someStream;
final KTable<String, Workflow> aggregatedWorkflows = filteredStream
.peek((k, v) -> {
if (!(v instanceof WorkflowInput)) {
throw new AssertionError("Type not expected");
}
})
.groupByKey()
.<Workflow>aggregate(Workflow::new, (k, input, workflow) -> workflow.updateFrom(input),
Materialized.<String, Workflow, KeyValueStore<Bytes, byte[]>>as("worflow-cache")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.serdeFrom(new JsonSerializer<Workflow>(), new JsonDeserializer<Workflow>(Workflow.class)));
org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: workflowauditstreamer.WorkflowInput).
需要注意两件事:
*值序列化程序是StringSerializer,而我使用
withValueSerde
.
*实际值类型为
WorkflowInput
正如我所料
Workflow
因为那是我的聚合值类型。
我对卡夫卡流还不熟悉,所以我可能遗漏了一些显而易见的东西,但我想不出来。我错过了什么?