
Spark Kafka Task not serializable

  •  1
  •  Brian  ·  7 years ago

    While trying to break my Spark application out into classes and also use Try, I am running into a Task not serializable error.

    The code pulls the schema from S3 and does a streaming read from Kafka (the topic is in Avro format, with a schema registry).

    I have tried it with the class and without the class... but in both cases I get a serialization error about the closure... I think something is getting pulled in when it tries to serialize. This error keeps coming back to bite me... it is a real pain to work around. If anyone can show me how to avoid this problem, that would be great. These Java classes sometimes seem to cause even more problems.
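    For context, a minimal sketch of the kind of capture that usually triggers this (the TopicHelper class here is hypothetical, not one of the classes below): a lambda that reads a field of its enclosing class compiles to a reference to `this`, so Spark has to serialize the whole instance to ship the closure, and if that instance is not serializable the task fails.

        import org.apache.spark.sql.{Dataset, SparkSession}

        // Hypothetical helper that is NOT Serializable.
        class TopicHelper(val prefix: String) {
          def tag(spark: SparkSession, ds: Dataset[String]): Dataset[String] = {
            import spark.implicits._
            // `prefix` is really `this.prefix`, so the closure drags in the whole
            // TopicHelper instance -> "Task not serializable" when Spark ships it.
            ds.map(v => prefix + v)
          }
        }

    The full code is below: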

    import java.util.Properties
    import com.databricks.spark.avro._
    import io.confluent.kafka.schemaregistry.client.rest.RestService
    import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDecoder, KafkaAvroDeserializerConfig}
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericData
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.streaming.StreamingQuery
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import scala.util.{Failure, Success, Try}
    
    case class DeserializedFromKafkaRecord(value: String)
    
    class sparkS3() extends Serializable {
      def readpeopleSchemaDF(spark: SparkSession, topicSchemaLocation: String): scala.util.Try[StructType] = {
        val read: scala.util.Try[StructType] = Try(
          spark
            .read
            .option("header", "true")
            .format("com.databricks.spark.avro")
            .load(topicSchemaLocation)
            .schema
        )
        read
      }
    
      def writeTopicDF(peopleDFstream: DataFrame,
                       peopleDFstreamCheckpoint: String,
                       peopleDFstreamLocation: String): scala.util.Try[StreamingQuery] = {
        val write: scala.util.Try[StreamingQuery] = Try(
          peopleDFstream
            .writeStream
            .option("checkpointLocation", peopleDFstreamCheckpoint)
            .format("com.databricks.spark.avro")
            .option("path", peopleDFstreamLocation)
            .start()
        )
        write
      }
    }
    
    class sparkKafka() extends Serializable {
    
      def readpeopleTopicDF(spark: SparkSession, topicSchema: StructType): scala.util.Try[DataFrame] = {
        val brokers = "URL:9092"
        val schemaRegistryURL = "URL:8081"
        val kafkaParams = Map[String, String](
          "kafka.bootstrap.servers" -> brokers,
          "key.deserializer" -> "KafkaAvroDeserializer",
          "value.deserializer" -> "KafkaAvroDeserializer",
          "group.id" -> "structured-kafka",
          //"auto.offset.reset" -> "latest",
          "failOnDataLoss" -> "false",
          "schema.registry.url" -> schemaRegistryURL
        )
        var kafkaTopic = "people"
    
        object avroDeserializerWrapper {
          val props = new Properties()
          props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
          props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
          val vProps = new kafka.utils.VerifiableProperties(props)
          val deser = new KafkaAvroDecoder(vProps)
          val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(kafkaTopic + "-value")
          val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
        }
        import spark.implicits._
    
        val read: scala.util.Try[DataFrame] = Try(
          {
            val peopleStringDF = {
              spark
                .readStream
                .format("kafka")
                .option("subscribe", kafkaTopic)
                .option("kafka.bootstrap.servers", brokers)
                .options(kafkaParams)
                .load()
                .map(x => {
                  DeserializedFromKafkaRecord(avroDeserializerWrapper.deser.fromBytes(
                    x
                      .getAs[Array[Byte]]("value"), avroDeserializerWrapper.messageSchema)
                    .asInstanceOf[GenericData.Record].toString)
                })
            }
            val peopleJsonDF = {
              peopleStringDF
                .select(
                  from_json(col("value")
                    .cast("string"), topicSchema)
                    .alias("people"))
            }
            peopleJsonDF.select("people.*")
          })
        read
      }
    }
    
    
    object peopleDataLakePreprocStage1 {
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession
          .builder
          .appName("peoplePreProcConsumerStage1")
          .getOrCreate()
    
        val topicSchemaLocation = "URL"
        val topicDFstreamCheckpoint = "URL"
        val topicDFstreamLocation = "URL"
    
        val sparkKafka = new sparkKafka()
        val sparkS3 = new sparkS3()
    
    
        sparkS3.readpeopleSchemaDF(spark, topicSchemaLocation) match {
          case Success(topicSchema) => {
            sparkKafka.readpeopleTopicDF(spark, topicSchema) match {
              case Success(df) => {
                sparkS3.writeTopicDF(df, topicDFstreamCheckpoint, topicDFstreamLocation) match {
                  case Success(query) => {
                    query.awaitTermination()
                  }
                  case Failure(f) => println(f)
                }
              }
              case Failure(f) => println(f)
            }
          }
          case Failure(f) => println(f)
        }
      }
    }
    

    Here is the error:

    java.lang.IllegalStateException: s3a://... when compacting batch 9 (compactInterval: 10)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
        at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
    18/08/10 13:04:07 ERROR MicroBatchExecution: Query [id = 2876ded4-f223-40c4-8634-0c8feec94bf6, runId = 9b9a1347-7a80-4295-bb6e-ff2de18eeaf4] terminated with error
    org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
    Caused by: java.lang.IllegalStateException: s3a://..../_spark_metadata/0 doesn't exist when compacting batch 9 (compactInterval: 10)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
        at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
        ... 17 more
    
    2 Answers  |  7 years ago
        1
  •  1
  •   Brian    7 years ago

    The resolution was one of two things... extending Serializable on the classes, and separating the files out within the same namespace. I have updated the code above to reflect this.
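    A minimal sketch of what that fix looks like (the package name and class/object names here are hypothetical, not the exact ones above): each class lives in its own file under the same package, and each extends Serializable so an instance captured by a closure can be shipped to executors.

        // file: TopicHelper.scala  (hypothetical package name)
        package com.example.preproc

        import org.apache.spark.sql.{Dataset, SparkSession}

        // Own file, same package, and Serializable so the captured instance
        // can travel with the map closure.
        class TopicHelper(val prefix: String) extends Serializable {
          def tag(spark: SparkSession, ds: Dataset[String]): Dataset[String] = {
            import spark.implicits._
            ds.map(v => prefix + v)
          }
        }

        // file: Stage1.scala  (separate file, same package)
        package com.example.preproc

        import org.apache.spark.sql.SparkSession

        object Stage1 {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder.appName("stage1").getOrCreate()
            import spark.implicits._
            val out = new TopicHelper("person: ").tag(spark, Seq("a", "b").toDS())
            out.show()
          }
        }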

        2
  •  0
  •   Richard Fuller    7 years ago

    Just a stab in the dark: in the sparkS3 class you define these values with "var" - did you mean "val"?
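    For reference, the only `var` in the code shown above is `kafkaTopic` in `sparkKafka`; if that is the value in question, an immutable `val` is the idiomatic choice since it is never reassigned:

        // before
        var kafkaTopic = "people"
        // after
        val kafkaTopic = "people"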
