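The code looks fine. Can you add some logging or print statements inside the loop below and check whether anything unusual shows up? That can help you determine whether something is going wrong while the messages are being consumed.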

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'random_user_fetching',
    bootstrap_servers='localhost:9092',
    max_poll_records=100,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my_grp'
)
print(consumer)

data_list = []
try:
    # Iterating the consumer blocks while waiting for messages; stop with Ctrl+C.
    for message in consumer:
        data = message.value
        data_list.append(data)
        print(f"Received message: {data}")
        consumer.commit()  # explicit commit; auto-commit is also enabled above
except KeyboardInterrupt:
    pass
finally:
    # Close the consumer even if the loop exits with an error.
    consumer.close()

print(f"Processed messages: {len(data_list)}")
print(data_list)
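
If you prefer logging over print statements, the same check could be sketched roughly as below. This is only an illustration: the logger name `consumer_debug`, the helper `consume_with_logging`, and the rule of skipping records whose value is `None` are assumptions, not part of your code. The stand-in records mimic the `topic`, `partition`, `offset` and `value` attributes of the records kafka-python yields, so the sketch runs without a broker.

```python
import logging
from types import SimpleNamespace

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("consumer_debug")  # hypothetical logger name


def consume_with_logging(messages):
    """Collect message values, logging the metadata of every record.

    `messages` is any iterable of objects exposing topic, partition,
    offset and value (a KafkaConsumer yields such records).
    """
    collected = []
    for message in messages:
        log.info("topic=%s partition=%s offset=%s",
                 message.topic, message.partition, message.offset)
        if message.value is None:
            # Assumption: an empty payload is worth flagging and skipping.
            log.warning("Empty payload at offset %s", message.offset)
            continue
        collected.append(message.value)
    return collected


# Stand-in records so the sketch runs without Kafka.
fake_records = [
    SimpleNamespace(topic="random_user_fetching", partition=0, offset=0,
                    value={"name": "a"}),
    SimpleNamespace(topic="random_user_fetching", partition=0, offset=1,
                    value=None),
    SimpleNamespace(topic="random_user_fetching", partition=0, offset=2,
                    value={"name": "c"}),
]
result = consume_with_logging(fake_records)
log.info("Processed messages: %d", len(result))
```

With a real consumer you would pass `consumer` in place of `fake_records`; the offsets in the log make it easier to see where consumption stalls.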
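
----------- Update -----------
Try the following logic: the loop breaks after consuming a specified number of messages (max_messages). Once the loop terminates, you should see the print output that sits outside the loop. Adjust the condition to suit your needs.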

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'random_user_fetching',
    bootstrap_servers='localhost:9092',
    max_poll_records=100,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my_grp'
)
print(consumer)

data_list = []
max_messages = 100  # stop after this many messages

try:
    for message in consumer:
        data = message.value
        data_list.append(data)
        print(f"Received message: {data}")
        consumer.commit()  # explicit commit; auto-commit is also enabled above
        # Leave the loop once enough messages have been collected.
        if len(data_list) >= max_messages:
            break
except KeyboardInterrupt:
    pass
finally:
    # Close the consumer even if the loop exits with an error.
    consumer.close()

print(f"Processed messages: {len(data_list)}")
print(data_list)
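
The same "stop after N messages" idea can also be written with `itertools.islice`, which may read more cleanly than a manual counter. The sketch below is a hedged illustration: `collect_messages` and the fake stream are hypothetical names used only so the example runs without a broker. Note that with a real consumer, either version keeps waiting if the topic holds fewer than `max_messages` messages; as far as I know, kafka-python's `consumer_timeout_ms` option can end the iteration after a period with no new messages, but check the documentation for your version.

```python
from itertools import islice
from types import SimpleNamespace


def collect_messages(messages, max_messages):
    """Return the values of at most `max_messages` records from `messages`."""
    # islice stops pulling from the iterable after max_messages items.
    return [m.value for m in islice(messages, max_messages)]


# Stand-in for an endless-looking stream of records (no Kafka needed).
fake_stream = (SimpleNamespace(value={"id": i}) for i in range(1000))

batch = collect_messages(fake_stream, 100)
print(f"Processed messages: {len(batch)}")
```

With a real consumer the call would be `collect_messages(consumer, max_messages)`, followed by `consumer.close()`.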