代码之家  ›  专栏  ›  技术社区  ›  kan

卡夫卡消费者偏移自动复位等参数

  •  0
  • kan  · 技术社区  · 3 年前

    我有一个KafkaConsumer,它需要订阅两个主题 topicA topicB 。但是,我需要一些不同的参数。例如,如果我需要 auto.offset.reset 对于 topicA earilest 而对于 主题B 应该是 latest 。我看不出有什么简单的方法可以做到这一点。一种选择是运行两个使用者,但在这种情况下,我需要为它们提供两个轮询线程,因此应该处理多线程。有什么更简单的方法吗?

    0 回复  |  直到 3 年前
        1
  •  1
  •   OneCricketeer Gabriele Mariotti    3 年前

    创建两个(或多个)线程是正确的。

    使用者不是线程安全的,无论如何都应该与其他进程隔离和分离。

    您可以使用更高级别的Kafka库(例如Vert.x/Spring)来简化这一过程。

        2
  •  0
  •   sawim    3 年前

    如果你需要使用一个消费者,那么我认为你可以设置 auto.reset.offset latest 然后移动的偏移量 topicA 手动(如果需要)。为此,在订阅和轮询循环之间,您可以:

    1. 获取分配给使用者的分区(方法 assignment )
    2. 检查主题的承诺偏移量 topicA 通过筛选的分区 topicA 从上一点(方法 committed ). 如果结果是 null ,然后打电话 seekToBeginning 方法