代码之家  ›  专栏  ›  技术社区  ›  Varun Sharma

从Kaffa主题消费并添加到列表中

  •  0
  • Varun Sharma  · 技术社区  · 1 年前

    目前,我正试图从一个主题中消费,并附加来自一个主题的所有消息,但不幸的是,最后的列表没有打印任何内容。

    import json
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        'random_user_fetching',
        bootstrap_servers='localhost:9092',
        max_poll_records=100,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='my_grp' 
    )
    print(consumer)
    data_list=[]
    try:
        for message in consumer:
            data = message.value
            data_list.append(data)
            consumer.commit()
            
    except KeyboardInterrupt:
        pass
    finally:
        pass
    
    print(f"Processed messages: {len(data_list)}")
    print(data_list)
    consumer.close()
    

    输出如下 <kafka.consumer.group.KafkaConsumer object at 0x7f1c3379c9d0>

    有什么意见吗?

    1 回复  |  直到 1 年前
        1
  •  0
  •   TheHungryCub    1 年前

    代码看起来不错。你能在下面的循环中添加一些日志记录或打印语句吗?看看是否有任何异常。这可以帮助您确定在消息消费过程中是否存在任何问题。

    import json
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        'random_user_fetching',
        bootstrap_servers='localhost:9092',
        max_poll_records=100,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='my_grp' 
    )
    print(consumer)
    data_list = []
    
    try:
        for message in consumer:
            data = message.value
            data_list.append(data)
            print(f"Received message: {data}")
            consumer.commit()
            
    except KeyboardInterrupt:
        pass
    finally:
        pass
    
    print(f"Processed messages: {len(data_list)}")
    print(data_list)
    consumer.close()
    

    -----------更新---------

    尝试以下逻辑:循环将在消耗指定数量的消息后中断 (max_messages) 这样,一旦循环终止,您应该能够在循环外看到打印输出。根据调整条件

    import json
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
        'random_user_fetching',
        bootstrap_servers='localhost:9092',
        max_poll_records=100,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='my_grp' 
    )
    
    print(consumer)
    data_list = []
    max_messages = 100  # Set a limit on the number of messages to consume
    
    try:
        for message in consumer:
            data = message.value
            data_list.append(data)
            print(f"Received message: {data}")
            consumer.commit()
    
            # Break out of the loop after consuming a certain number of messages
            if len(data_list) >= max_messages:
                break
            
    except KeyboardInterrupt:
        pass
    finally:
        pass
    
    print(f"Processed messages: {len(data_list)}")
    print(data_list)
    consumer.close()