代码之家  ›  专栏  ›  技术社区  ›  px5x2

Kafka将GlobalKTable同步流传输到应用程序

  •  1
  • px5x2  · 技术社区  · 7 年前

    使用正常的k流,kafka将每个应用程序的偏移量存储在其内部偏移量主题上。在应用程序重新启动时,应用程序将根据 auto.offset.reset 政策这确实得到了解释 here

    我用的是卡夫卡流 GlobalKTable 通过应用程序复制数据。然而,我对应用程序的重启有点困惑,因为 它不会在id为的应用程序上填充( StreamsConfig.APPLICATION_ID_CONFIG )不会更改 重新启动后(由于部署或崩溃)。每当我用新id启动streams应用程序的新实例时, 全局语言表 已填充。

    A. 全局语言表 没有什么不同,只是启用了日志压缩功能的主题。javadoc创建 StreamsBuilder#globalTable 国家:

    streamsBuilder.globalTable("some-topic", Materialized.as("kglobaltable-store"))
    

    请注意 全局语言表 始终应用“自动偏移重置”策略 “最早”不考虑中的指定值 StreamsConfig

    因此我想, 无论应用程序id如何 ,my streams应用程序读取 kglobaltable-store 主题,并像这样在本地填充存储 github issue 。javadoc所指的主题似乎是 some-topic 而不是 kglobaltable商店

    这是预期的行为吗 全局语言表 ?此外,对于支持的主题是否有保留策略 GlobalKTables ?

    这种行为还会导致 kglobaltable商店 主题当我们在上有保留策略时 一些话题 。示例如下:

    在时间t0,let;

    一些主题:(1,a)->(2,b)->(1,c)

    kglobaltable存储:[(1,c),(2,b)]

    在保留一段时间(2,b)后,我启动我的streams应用程序(使用新id)并 全局语言表 只有在这种情况下才存储记录(1,c)。

    编辑: 我正在使用 InMemoryKeyValueStore

    1 回复  |  直到 7 年前
        1
  •  2
  •   Matthias J. Sax    7 年前

    因为您正在使用 InMemoryKeyValueStore 我假设您遇到了以下错误: https://issues.apache.org/jira/browse/KAFKA-6711

    作为一种解决方法,您可以删除本地检查点文件(cf GlobalKTable checkpoints )对于全局存储,这将在重新启动时触发引导。或者切换回默认值 RocksDB 百货商店

    顺便说一句:如果您直接以表或全局表的形式读取主题,Kafka Streams将不会为容错创建额外的变更日志主题,而是为此使用原始输入主题(这将减少Kafka集群内的存储需求)。因此,这些输入主题应该启用日志压缩。