我在跟随
https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example
用于将mosquitto和kafka与mqtt源连接器连接。我正在将mosquitto发行商发送的数据发送到mosquitto订户和kafka消费者。但是Kafka Consumer的ConsumerRecord对象中的key和value字段有一些预先准备好的字节字符。
下面是我得到的代码片段和输出。
mqttpublisher.py版
while v3 < 3:
data3 = {
"time": str(datetime.datetime.now().time()),
"val": v3
}
client.publish("sensor/dist", json.dumps(data3), qos=2)
v3 += 1
time.sleep(2)
mqtt订阅服务器.py
def on_message_print(client, userdata, message):
print(message.topic,message.payload)
subscribe.callback(on_message_print, "sensor/#", hostname="localhost")
卡夫卡消费者.py
consumer = KafkaConsumer('mqtt.',
bootstrap_servers=['localhost:9092'])
for message in consumer:
print(message)
输出:mqttsubscriber.py
传感器/距离b'“时间”:“12:44:30.817462”,“val”:0”
传感器/距离b'“时间”:“12:44:32.820040”,“val”:1”
传感器/距离b'“时间”:“12:44:34.822657”,“val”:2”
输出:kafkaconsumer.py
ConsumerRecord(topic='mqtt'',partition=0,offset=225,timestamp=1545117270870,timestamp_type=0,
key=b'\x00\x00\x00\x00\x01\x16传感器/距离'
,
值=b'\x00\x00\x00\x00\x02j“time”:“12:44:30.817462”,“val”:0
,headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],checksum=none,serialized_key_size=17,serialized_value_size=43,serialized_header_size=62)
ConsumerRecord(topic='mqtt'',partition=0,offset=226,timestamp=1545117272821,timestamp_type=0,
key=b'\x00\x00\x00\x00\x01\x16传感器/距离'
,
值=b'\x00\x00\x00\x00\x02j“time”:“12:44:32.820040”,“val”:1”
,headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],checksum=none,serialized_key_size=17,serialized_value_size=43,serialized_header_size=62)
ConsumerRecord(topic='mqtt'',partition=0,offset=227,timestamp=1545117274824,timestamp_type=0,
key=b'\x00\x00\x00\x00\x01\x16传感器/距离'
,
值=b'\x00\x00\x00\x00\x02j“time”:“12:44:34.822657”,“val”:2”
,headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],checksum=none,serialized_key_size=17,serialized_value_size=43,serialized_header_size=62)
是什么导致了卡夫卡消费者在上面预先准备了额外的字节?
事先谢谢。