代码之家  ›  专栏  ›  技术社区  ›  GihanDB

Spark流-“utf8”编解码器无法解码字节

  •  0
  • GihanDB  · 技术社区  · 6 年前

    我想连接卡夫卡流到火花流。我用了下面的代码。

    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1]) 
    

    返回s.decode('utf-8') 文件“/usr/lib64/python2.7/encodings/utf\u 8.py”,第16行,解码 UnicodeDecodeError:“utf8”编解码器无法解码位置57-58中的字节:无效的连续字节

    我是否需要指定卡夫卡使用Avro,上面是否有错误?如果是这样的话,我该怎么说呢?

    2 回复  |  直到 6 年前
        1
  •  2
  •   OneCricketeer Gabriele Mariotti    6 年前

    对,问题是流的反序列化。你可以用 confluent-kafka-python 库并指定 值解码器 在:

    from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient`
    from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
    
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, valueDecoder=MessageSerializer.decode_message)`
    

    https://stackoverflow.com/a/49179186/6336337

        2
  •  0
  •   maxime G    6 年前

    是的,你应该指定它。

    创建流:

    final JavaInputDStream<ConsumerRecord<String, avroType>> stream =
                    KafkaUtils.createDirectStream(
                            jssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.Subscribe(topics, kafkaParams));
    

    kafkaParams.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
            kafkaParams.put("value.deserializer", SpecificAvroDeserializer.class);
    
    推荐文章