代码之家  ›  专栏  ›  技术社区  ›  Raman

KafkaConsumer:消费所有可用消息一次,然后退出

  •  1
  • Raman  · 技术社区  · 7 年前

    KafkaConsumer ,使用Kafka 2.0.0,它一次消耗所有可用消息,并立即退出。这与标准控制台用户实用程序略有不同,因为该实用程序 waits for a specified timeout 对于新邮件,并且仅在超时过期后退出。

    这个看似简单的任务似乎很难使用 卡夫卡消费品 . 我的直觉反应是以下伪代码:

    consumer.assign(all partitions)
    consumer.seekToBeginning(all partitions)
    do
      result = consumer.poll(Duration.ofMillis(0))
      // onResult(result)
    while result is not empty
    

    poll 始终返回空集合,即使主题中有许多消息。

    对此进行研究,一个原因可能是分配/订阅 considered lazy 投票 循环已经完成(尽管我在文档中找不到对这个断言的任何支持)。但是,下面的伪代码 每次调用时返回空集合

    consumer.assign(all partitions)
    consumer.seekToBeginning(all partitions)
    // returns nothing
    result = consumer.poll(Duration.ofMillis(0))
    // returns nothing
    result = consumer.poll(Duration.ofMillis(0))
    // returns nothing
    result = consumer.poll(Duration.ofMillis(0))
    // deprecated poll also returns nothing
    result = consumer.poll(0)
    // returns nothing
    result = consumer.poll(0)
    // returns nothing
    result = consumer.poll(0)
    ...
    

    所以很明显“懒惰”不是问题所在。

    javadoc声明:

    唯一有效的方法是指定一个非零超时 投票 ,而不仅仅是任何非零值 1 不起作用。这表明内部发生了一些不确定的行为 投票 投票 seems to confirm this 通过各种调用来检查超时是否过期 投票

    因此,对于幼稚的方法,显然需要更长的超时时间(理想情况下也是如此) Long.MAX_VALUE

    2 回复  |  直到 7 年前
        1
  •  0
  •   Adam Kotwasinski    7 年前

    endOffsets

    所以,在伪代码中:

    long currentOffset = -1
    long endOffset = consumer.endOffset(partition)
    while (currentOffset < endOffset) {
      records = consumer.poll(NONTRIVIAL_TIMEOUT) // discussed in your answer
      currentOffset = records.offsets().max()
    }
    

    这样我们就避免了最终的非零挂断,因为我们总是确信有东西可以接收。

    另外,您可能需要设置 max.poll.records 之后

        2
  •  0
  •   Raman    7 年前

    实现这一点的唯一方法似乎是使用一些自我管理偏移量的附加逻辑。以下是伪代码:

    consumer.assign(all partitions)
    consumer.seekToBeginning(all partitions)
    // record the current ending offsets and poll until we get there
    endOffsets = consumer.endOffsets(all partitions)
    
    do
      result = consumer.poll(NONTRIVIAL_TIMEOUT)
      // onResult(result)
    while given any partition p, consumer.position(p) < endOffsets[p]
    

    以及在Kotlin的实现:

    val topicPartitions = consumer.partitionsFor(topic).map { TopicPartition(it.topic(), it.partition()) }
    consumer.assign(topicPartitions)
    consumer.seekToBeginning(consumer.assignment())
    
    val endOffsets = consumer.endOffsets(consumer.assignment())
    fun pendingMessages() = endOffsets.any { consumer.position(it.key) < it.value }
    
    do {
      records = consumer.poll(Duration.ofMillis(1000))
      onResult(records)
    } while(pendingMessages())
    

    现在可以将轮询持续时间设置为一个合理的值(例如1s),而不必担心丢失的消息,因为循环将继续,直到使用者达到循环开始时确定的结束偏移量。

    它也不能设置得太高(否则使用者在检索

    推荐文章