
Integrating Spark Structured Streaming with the Confluent Schema Registry

  • Souhaib Guitouni · 8 years ago

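    I'm using the Kafka source in Spark Structured Streaming to receive Confluent-encoded Avro records. I intend to use the Confluent Schema Registry, but integrating it with Spark Structured Streaming seems to be impossible.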

    I have seen this question, but I couldn't get it to work with the Confluent Schema Registry: Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)
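    A likely reason a plain Avro decoder (as used in answers to the linked question) fails on these records: Confluent's serializers do not write bare Avro. Each message starts with a magic byte 0, then the 4-byte big-endian schema ID, then the Avro binary payload without an embedded schema. The sketch below only splits such a message into its parts; ConfluentWireFormat, ConfluentMessage and split are hypothetical names, not part of any Confluent library, and decoding the payload (which needs the writer schema looked up by ID) is left out.

```scala
import java.nio.ByteBuffer

// Hypothetical container for a Confluent-framed Kafka message (not a Confluent class).
final case class ConfluentMessage(schemaId: Int, payload: Array[Byte])

// Hypothetical helper: splits the Confluent wire format
// [magic byte 0][4-byte big-endian schema ID][Avro binary payload].
object ConfluentWireFormat {
  val MagicByte: Byte = 0
  val HeaderSize: Int = 5 // 1 magic byte + 4 bytes of schema ID

  def split(message: Array[Byte]): ConfluentMessage = {
    require(message != null && message.length >= HeaderSize,
      "message too short to carry a Confluent header")
    val buffer = ByteBuffer.wrap(message) // ByteBuffer reads big-endian by default
    val magic = buffer.get()
    require(magic == MagicByte, s"unknown magic byte: $magic")
    val schemaId = buffer.getInt()
    // Everything after the 5-byte header is the Avro-encoded record.
    val payload = new Array[Byte](buffer.remaining())
    buffer.get(payload)
    ConfluentMessage(schemaId, payload)
  }
}
```

    With the schema ID in hand, the writer schema can be fetched from the registry (GET /schemas/ids/{id}) and the payload decoded with Avro; KafkaAvroDeserializer does both steps for you, with caching.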

    9 answers  |  last active 7 years ago
  • RvdV · 3 years ago

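    I spent months reading the source code and testing. In short, Spark's Kafka source only hands you the key and value as binary columns (which you can cast to strings); it does not decode Confluent Avro for you, so you have to deserialize the data manually. In Spark, create a Confluent RestService object to fetch the schema, and use the Avro parser to turn the schema string in the response into an Avro Schema. Next, read the Kafka topic as usual. Then map over the binary "value" column with Confluent's KafkaAvroDeserializer. I strongly recommend digging into the source code of these classes, because a lot is going on here and I'll leave out many details for brevity.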

    //Used Confluent version 3.2.2 to write this.
    //Requires the spark-sql-kafka-0-10 package on the classpath for format("kafka").
    import io.confluent.kafka.schemaregistry.client.rest.RestService
    import io.confluent.kafka.serializers.KafkaAvroDeserializer
    import org.apache.avro.Schema
    import org.apache.spark.sql.SparkSession
    import scala.collection.JavaConverters._  //for props.asJava
    
    case class DeserializedFromKafkaRecord(key: String, value: String)
    
    val spark = SparkSession.builder.appName("SchemaRegistryExample").getOrCreate()
    import spark.implicits._  //provides the Encoder for DeserializedFromKafkaRecord used by .map below
    
    val schemaRegistryURL = "http://127.0.0.1:8081"
    
    val topicName = "Schema-Registry-Example-topic1"
    val subjectValueName = topicName + "-value"
    
    //create RestService object
    val restService = new RestService(schemaRegistryURL)
    
    //.getLatestVersion returns an io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
    
    //Keep the schemas as JSON strings on the driver and parse them into Avro Schemas on the executors,
    //because org.apache.avro.Schema may not be Serializable depending on the Avro version.
    val valueSchemaString: String = valueRestResponseSchema.getSchema
    
    //key schema is typically just string, but you can do the same process for the key as for the value
    val keySchemaString = "\"string\""
    
    //Create a map with the Schema Registry URL.
    //This is the only required configuration for Confluent's KafkaAvroDeserializer.
    val props = Map("schema.registry.url" -> schemaRegistryURL)
    
    //Declare SerDe and schema vars before using the Spark Structured Streaming map. Avoids non-serializable class exceptions.
    var keyDeserializer: KafkaAvroDeserializer = null
    var valueDeserializer: KafkaAvroDeserializer = null
    var keySchema: Schema = null
    var topicValueAvroSchema: Schema = null
    
    //Create structured streaming DF to read from the topic.
    val rawTopicMessageDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", topicName)
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 20)  //remove for prod
      .load()
    
    //instantiate the SerDe classes and schemas if not already, then deserialize!
    val deserializedTopicMessageDS = rawTopicMessageDF.map {
      row =>
        if (keyDeserializer == null) {
          keyDeserializer = new KafkaAvroDeserializer
          keyDeserializer.configure(props.asJava, true)  //isKey = true
        }
        if (valueDeserializer == null) {
          valueDeserializer = new KafkaAvroDeserializer
          valueDeserializer.configure(props.asJava, false) //isKey = false
        }
        if (topicValueAvroSchema == null) {
          keySchema = new Schema.Parser().parse(keySchemaString)
          topicValueAvroSchema = new Schema.Parser().parse(valueSchemaString)
        }
    
        //The Kafka source exposes key and value as binary (Array[Byte]) columns.
        val keyBytes = row.getAs[Array[Byte]]("key")
        val valueBytes = row.getAs[Array[Byte]]("value")
    
        //Pass the Avro reader schema. The topic name is actually unused by this overload in the source code, just required by the signature. Weird, right?
        //deserialize returns null for null input (e.g. records without a key), so don't call toString on it directly.
        val deserializedKeyString = Option(keyDeserializer.deserialize(topicName, keyBytes, keySchema)).map(_.toString).orNull
        val deserializedValueString = Option(valueDeserializer.deserialize(topicName, valueBytes, topicValueAvroSchema)).map(_.toString).orNull
    
        DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
    }
    
    val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start()
    
    deserializedDSOutputStream.awaitTermination()  //keep the driver alive while the query runs
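    The restService.getLatestVersion(subjectValueName) call in the code above reads the same information the registry serves over plain HTTP at GET /subjects/{subject}/versions/latest, and the "-value" suffix follows the default subject naming (TopicNameStrategy). A quick way to check that the subject exists before starting the stream is to build that URL yourself. The sketch below assumes the default naming strategy; SchemaRegistryUrls, subjectFor and latestVersionUrl are hypothetical helpers, not part of the Confluent client.

```scala
// Hypothetical helper (not part of the Confluent client): builds the subject
// name and the Schema Registry REST URL for the latest registered schema,
// assuming the default TopicNameStrategy ("<topic>-key" / "<topic>-value").
object SchemaRegistryUrls {
  // Kafka topic names only contain [a-zA-Z0-9._-], so no URL encoding is applied.
  def subjectFor(topic: String, isKey: Boolean): String =
    topic + (if (isKey) "-key" else "-value")

  // A GET on this URL returns JSON that includes "subject", "version", "id" and "schema".
  def latestVersionUrl(registryUrl: String, subject: String): String =
    s"${registryUrl.stripSuffix("/")}/subjects/$subject/versions/latest"
}
```

    For example, curl http://127.0.0.1:8081/subjects/Schema-Registry-Example-topic1-value/versions/latest should return the schema the stream will use; an error response usually means the subject name does not match what the producer registered.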
    