代码之家  ›  专栏  ›  技术社区  ›  sobychacko

卡夫卡流中的聚集和状态存储保留

  •  3
  • sobychacko  · 技术社区  · 6 年前

    我有一个如下的用例。对于每个即将到来的事件,我想看看 某个字段,查看其状态是否从A更改为B,如果是,则将其发送到 输出主题。流程如下:带有“xyz”键的事件进入状态A,一段时间后 另一个事件带有状态B的键“xyz”。我有使用高级DSL的代码。

    final KStream<String, DomainEvent> inputStream....
    
    final KStream<String, DomainEvent> outputStream = inputStream
              .map((k, v) -> new KeyValue<>(v.getId(), v))
                        .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                        .aggregate(DomainStatusMonitor::new,
                                (k, v, aggregate) -> {
                                    aggregate.updateStatusMonitor(v);
                                    return aggregate;
                                }, Materialized.with(Serdes.String(), jsonSerde))
                        .toStream()
                        .filter((k, v) -> v.isStatusChangedFromAtoB())
                        .map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));
    

    有没有更好的方法来使用DSL编写这个逻辑?

    关于由上述代码中的聚合创建的状态存储的几个问题。

    1. 它是否在默认情况下创建内存状态存储?
    2. 如果我有无限数量的唯一传入密钥,会发生什么? 如果它在默认情况下使用内存中的存储,我不需要切换到持久存储吗? 在DSL中,我们如何处理这种情况?
    3. 如果状态存储非常大(内存中或持久性),它将如何影响 启动时间?如何使流处理等待,以便完全初始化存储? 或者,在处理任何传入事件之前,Kafka流是否会确保状态存储已完全初始化?

    事先谢谢!

    1 回复  |  直到 6 年前
        1
  •  8
  •   Matthias J. Sax    6 年前
    1. 默认情况下,将使用持久RockSDB存储。如果你想使用内存中的存储,你可以输入 Materialized.as(Stores.inMemoryKeyValueStore(...))

    2. 如果您有无限多的唯一键,那么您最终会耗尽主内存或磁盘,应用程序也会死掉。根据您的语义,您可以通过使用带有大“间隙”参数的会话窗口聚合来终止旧密钥,从而获得“ttl”。

    3. 在处理新数据之前,将始终还原状态。如果在内存存储中使用,则会使用底层的changelog主题。根据您所在州的大小,这可能需要一段时间。如果使用Persistent RockSDB Store,则状态将从磁盘加载,因此不需要恢复,应立即进行处理。只有当您释放本地磁盘上的状态时,才会从changelog主题进行还原。