我有一个如下的用例。对于每个即将到来的事件,我想看看
某个字段,查看其状态是否从A更改为B,如果是,则将其发送到
输出主题。流程如下:带有“xyz”键的事件进入状态A,一段时间后
另一个事件带有状态B的键“xyz”。我有使用高级DSL的代码。
final KStream<String, DomainEvent> inputStream....
final KStream<String, DomainEvent> outputStream = inputStream
.map((k, v) -> new KeyValue<>(v.getId(), v))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.aggregate(DomainStatusMonitor::new,
(k, v, aggregate) -> {
aggregate.updateStatusMonitor(v);
return aggregate;
}, Materialized.with(Serdes.String(), jsonSerde))
.toStream()
.filter((k, v) -> v.isStatusChangedFromAtoB())
.map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));
有没有更好的方法来使用DSL编写这个逻辑?
关于由上述代码中的聚合创建的状态存储的几个问题。
-
它是否在默认情况下创建内存状态存储?
-
如果我有无限数量的唯一传入密钥,会发生什么?
如果它在默认情况下使用内存中的存储,我不需要切换到持久存储吗?
在DSL中,我们如何处理这种情况?
-
如果状态存储非常大(内存中或持久性),它将如何影响
启动时间?如何使流处理等待,以便完全初始化存储?
或者,在处理任何传入事件之前,Kafka流是否会确保状态存储已完全初始化?
事先谢谢!