代码之家  ›  专栏  ›  技术社区  ›  florins

Kafka:监视分配给分区主题的消费者的延迟

  •  1
  • florins  · 技术社区  · 9 年前

    我正在使用Kafka 0.9.1新的消费者API。使用者被手动分配到分区。对于这个消费者,我希望看到它的进步(即滞后)。由于我添加了group-id消费者教程作为属性,所以我假设我可以使用命令

    bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial --bootstrap-server localhost:9092

    (如此处所述 http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client )

    不幸的是,我的消费者组详细信息没有使用上述命令显示。因此,我无法监控消费者的进度(这是滞后的)。如何监控上述场景中的延迟(手动分配分区)?

    代码是:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "consumer-tutorial");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    
            String topic = "my-topic";
            TopicPartition topicPartition = new TopicPartition(topic, 0);
            consumer.assign(Arrays.asList(topicPartition));
            consumer.seekToBeginning(topicPartition);
    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
         System.out.println(record.offset() + ": " + record.value());
      consumer.commitSynch();
      }
    } finally {
      consumer.close();
    }
    
    5 回复  |  直到 9 年前
        1
  •  5
  •   Otis Gospodnetic    9 年前

    如果您不想编写代码来获取此信息,也不想临时运行工具/shell脚本之类的命令,那么有N个工具可以捕获Kafka指标,包括Consumer Lag。在我的脑海中: Burrow SPM for Kafka 做好工作这里有一些关于Kafka抵消、消费者滞后的背景信息,以及一些源自Kafka通过JMX公开的指标。高温高压。

        2
  •  1
  •   Vladimir Nabokov    5 年前

    如果你对JMX暴露的消费群体滞后感兴趣,我写的代理如下: https://github.com/peterkovgan/kafka9.offsets

    您可以在某些Kafka节点上运行此代理,并向外部读取器公开偏移滞后统计信息。

    下面是您如何在Telegraf中使用此代理的示例 ( https://influxdata.com/time-series-platform/telegraf/ ).

    最后(结合例如telegraf、influxdb和grafana),您可以看到几个消费群体的偏移滞后的漂亮图形。

        3
  •  0
  •   Kamal Chandraprakash    9 年前

    kafka-consumer-groups.sh 命令,您的组名不正确 --group consumer-tutorial 消费者教程组

        4
  •  0
  •   Stanley Ambrose    4 年前

    代码中的问题与手动将使用者分配给主题分区直接相关。

    您可以在 group.id 属性,但是,组ID仅在您通过 KafkaConsumer.subscribe() 应用程序编程接口。在您的示例中,您正在使用 .assign() 方法,它手动将客户端附加到指定的主题分区对,而不使用底层消费者组原语。正是由于这个原因,你无法看到消费者的滞后。像Burrow这样的工具在这种情况下不起作用,因为它们将查询消费者组的补偿,而消费者组不存在。

    有两个选项可供选择:

    1. 正确使用消费者组功能,使用 subscribe() 应用程序编程接口。这是Kafka的主要用例。然而 seekToBeginning() 在这种情况下也不起作用,因为补偿将完全由消费者群体管理。
    2. 完全删除使用者组,并手动管理分区分配和偏移。这给了你最大的灵活性,但需要做很多工作,你可能会发现自己正在重新发明轮子。大多数人不会走这条路,除非卡夫卡的消费群体特征不适合你的需求。

    选择将完全取决于您的用例。对于传统的流处理,#1是惯用的方法。这就是卡夫卡的设计目的#2意味着您知道自己在做什么,并将所有集团管理职责转移到您的应用程序上。

    注:卡夫卡没有一种“部分”模式,你可以做一些团队管理,卡夫卡可以做其余的工作。它要么全在,要么根本没有。

        5
  •  0
  •   Nafees Ahmed    4 年前

    您可以使用简单而强大的延迟监控工具,称为

    普罗米修斯·卡夫卡消费集团出口商

    请参阅以下url:

    https://github.com/braedon/prometheus-kafka-consumer-group-exporter

    安装完成后,运行以下命令在您所需的端口Prometheus Kafka Consumer Group Exporter上导出Consumer矩阵

    /usr/bin/python3/usr/local/bin/prometheus kafka消费群导出器-p PORT-b kafka_CLUSTER_IP_PORT

    运行以上命令后,验证http url YOUR-SERVER-IP:PORT上的数据,如127.0.0.1:9208

    现在,您可以将任何JMX刮板用于仪表板和警报系统。我正在使用普罗米修斯;格拉法纳

    这可以在任何共享服务器上运行,如[kafka broker、zookeeper server、prometheus server或任何],因为它对系统资源的开销非常低。