代码之家  ›  专栏  ›  技术社区  ›  Shubham

mqtt-kafka源连接器:有趣的字节字符

  •  3
  • Shubham  · 技术社区  · 6 年前

    我在跟随 https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example 用于将mosquitto和kafka与mqtt源连接器连接。我正在将mosquitto发行商发送的数据发送到mosquitto订户和kafka消费者。但是Kafka Consumer的ConsumerRecord对象中的key和value字段有一些预先准备好的字节字符。 下面是我得到的代码片段和输出。

    mqttpublisher.py版

    while v3 < 3:
                 data3 = {
                          "time": str(datetime.datetime.now().time()),
                           "val": v3
                          }
                 client.publish("sensor/dist", json.dumps(data3), qos=2)
    
                 v3 += 1
                 time.sleep(2)
    

    mqtt订阅服务器.py

    def on_message_print(client, userdata, message):
                print(message.topic,message.payload)
    
    subscribe.callback(on_message_print, "sensor/#", hostname="localhost")
    

    卡夫卡消费者.py

    consumer = KafkaConsumer('mqtt.',
                         bootstrap_servers=['localhost:9092'])
    
    for message in consumer:
       print(message)
    

    输出:mqttsubscriber.py

    传感器/距离b'“时间”:“12:44:30.817462”,“val”:0”

    传感器/距离b'“时间”:“12:44:32.820040”,“val”:1”

    传感器/距离b'“时间”:“12:44:34.822657”,“val”:2”

    输出:kafkaconsumer.py

    ConsumerRecord(topic='mqtt'',partition=0,offset=225,timestamp=1545117270870,timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16传感器/距离' , 值=b'\x00\x00\x00\x00\x02j“time”:“12:44:30.817462”,“val”:0 ,headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],checksum=none,serialized_key_size=17,serialized_value_size=43,serialized_header_size=62)

    ConsumerRecord(topic='mqtt'',partition=0,offset=226,timestamp=1545117272821,timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16传感器/距离' , 值=b'\x00\x00\x00\x00\x02j“time”:“12:44:32.820040”,“val”:1” ,headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],checksum=none,serialized_key_size=17,serialized_value_size=43,serialized_header_size=62)

    ConsumerRecord(topic='mqtt'',partition=0,offset=227,timestamp=1545117274824,timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16传感器/距离' , 值=b'\x00\x00\x00\x00\x02j“time”:“12:44:34.822657”,“val”:2” ,headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],checksum=none,serialized_key_size=17,serialized_value_size=43,serialized_header_size=62)

    是什么导致了卡夫卡消费者在上面预先准备了额外的字节? 事先谢谢。

    1 回复  |  直到 6 年前
        1
  •  0
  •   OneCricketeer Gabriele Mariotti    6 年前

    作为演示的一部分,您正在启动一个模式注册表

    启动Kafka连接和依赖项(Kafka、ZooKeeper、架构注册表):

    confluent start connect

    如果查看前5个字节,您将看到它们以0开头,然后再多4个字节表示一个整数。

    Schema Registry Wire Format 试着做一个 curl localhost:8081/subjects 看看它是否列出了你的主题名称 mqtt-key mqtt-value .

    如果不想使用AVRO,则需要配置和编辑Kafka Connect属性文件以使用不同的转换器,而不是使用 confluent start 除了让卡夫卡和动物园管理员跑

    或者,如果希望python对avro进行反序列化,可以参考github上的融合kafka python repo。

    推荐文章