
Encoder issue - Spark Structured Streaming - only works in the REPL

  Brian · 7 years ago

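    I have a working process that ingests Kafka Avro messages and deserializes them using the Schema Registry. It works fine in the REPL, but when I try to compile it, I get: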

    Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    [error]       .map(x => {
    

    I'm not sure whether I need to modify the object, but if it works in the REPL, why would I need to?

    object AgentDeserializerWrapper {
          val props = new Properties()
          props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
          props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
          val vProps = new kafka.utils.VerifiableProperties(props)
          val deser = new KafkaAvroDecoder(vProps)
          val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(subjectValueNameAgentRead)
          val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
        }
    
        case class DeserializedFromKafkaRecord( value: String)
    
        import spark.implicits._
    
        val agentStringDF = spark
          .readStream
          .format("kafka")
          .option("subscribe", "agent")
          .options(kafkaParams)
          .load()
          .map(x => {
            DeserializedFromKafkaRecord(AgentDeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), AgentDeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
          })
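
One likely explanation for the REPL/compiled difference, assuming the snippet above sits inside a main method (the question does not show the enclosing code): the Spark shell wraps every definition in generated objects, but in a compiled application a case class declared inside a method is local, the compiler cannot produce a TypeTag for it, and spark.implicits._ therefore cannot derive an Encoder. A minimal sketch that moves the case class to the top level of the file is below. It is an illustration under assumptions, not the original program: the names EncoderPlacementSketch and decodeValues are hypothetical, a batch DataFrame with a binary "value" column stands in for the Kafka source, and a UTF-8 decode stands in for the Avro deserialization.

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Top-level case class: the compiler can supply the TypeTag that
// spark.implicits._ needs to derive Encoder[DeserializedFromKafkaRecord].
// Declaring it inside main (or any other method) is what typically triggers
// "Unable to find encoder for type stored in a Dataset" in compiled code.
case class DeserializedFromKafkaRecord(value: String)

object EncoderPlacementSketch {

  // Same Row-based map as in the question. Decoding the bytes as UTF-8 is a
  // hypothetical stand-in for the Avro deserialization step.
  def decodeValues(spark: SparkSession, raw: DataFrame): Dataset[DeserializedFromKafkaRecord] = {
    import spark.implicits._
    raw.map { row =>
      DeserializedFromKafkaRecord(
        new String(row.getAs[Array[Byte]]("value"), StandardCharsets.UTF_8))
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("encoder-placement-sketch")
      .getOrCreate()
    import spark.implicits._

    // Batch stand-in for the Kafka source: a single binary "value" column.
    val raw = Seq("a", "b").map(_.getBytes(StandardCharsets.UTF_8)).toDF("value")

    decodeValues(spark, raw).show()
    spark.stop()
  }
}
```

In the original program, the corresponding change would be to move case class DeserializedFromKafkaRecord out of the method body, either to the top level of the file or into an enclosing object, while keeping import spark.implicits._ where the SparkSession is in scope.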
    
  Quentin Geff · 7 years ago

    Add .as[DeserializedFromKafkaRecord] so that the Dataset is statically typed:

    val agentStringDF = spark
          .readStream
          .format("kafka")
          .option("subscribe", "agent")
          .options(kafkaParams)
          .load()
          .as[DeserializedFromKafkaRecord]
          .map(x => {
            DeserializedFromKafkaRecord(AgentDeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), AgentDeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
          })
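
Once a Dataset is statically typed, the lambda passed to map receives a typed value rather than a Row, so fields are read directly instead of through getAs. A minimal sketch of that style, assuming only the binary "value" column of Spark's Kafka source is needed: select that column, view it as Dataset[Array[Byte]], and build the case class from the raw bytes. The names TypedValueSketch and decodeValues are hypothetical, a batch DataFrame stands in for the Kafka stream, and a UTF-8 decode stands in for the Avro deserialization. The case class is still declared at the top level so that an Encoder can be derived for it.

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Declared at the top level so spark.implicits._ can derive its Encoder.
case class DeserializedFromKafkaRecord(value: String)

object TypedValueSketch {

  // Keep only the binary "value" column and view it as Dataset[Array[Byte]],
  // so the lambda receives the raw bytes instead of a Row.
  // The UTF-8 decode is a hypothetical stand-in for the Avro decoding step.
  def decodeValues(spark: SparkSession, raw: DataFrame): Dataset[DeserializedFromKafkaRecord] = {
    import spark.implicits._
    raw.select("value")
      .as[Array[Byte]]
      .map(bytes => DeserializedFromKafkaRecord(new String(bytes, StandardCharsets.UTF_8)))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("typed-value-sketch")
      .getOrCreate()
    import spark.implicits._

    // Batch stand-in for the Kafka source: a single binary "value" column.
    val raw = Seq("a", "b").map(_.getBytes(StandardCharsets.UTF_8)).toDF("value")

    decodeValues(spark, raw).show()
    spark.stop()
  }
}
```

Typing only the column that is actually decoded keeps the other Kafka columns (key, topic, partition, offset and so on) out of the mapping from columns to case-class fields.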
    