
Unable to serialize SparkContext in foreachRDD

  •  2
  • Suresh  ·  8 years ago

    I'm trying to save streaming data from Kafka into Cassandra. I can read and parse the data, but when I call the lines below to save it I get a Task not Serializable exception. My class extends Serializable, but I can't see why I'm getting this error; after three hours of googling I haven't gotten much further. Can anyone give me some pointers?

    val collection = sc.parallelize(Seq((obj.id, obj.data)))
    collection.saveToCassandra("testKS", "testTable ", SomeColumns("id", "data"))` 
    
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils
    import com.datastax.spark.connector._
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.rdd.RDD
    import com.datastax.spark.connector.SomeColumns
    import java.util.Formatter.DateTime
    
    object StreamProcessor extends Serializable {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor")
          .set("spark.cassandra.connection.host", "127.0.0.1")
    
        val sc = new SparkContext(sparkConf)
    
        val ssc = new StreamingContext(sc, Seconds(2))
    
        val sqlContext = new SQLContext(sc)
    
        val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    
        val topics = args.toSet
    
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)
    
        stream.foreachRDD { rdd =>
    
          if (!rdd.isEmpty()) {
            try {
    
              rdd.foreachPartition { iter =>
                iter.foreach {
                  case (key, msg) =>
    
                    val obj = msgParseMaster(msg)
    
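                    // NOTE: sc (the SparkContext) is captured by this executor-side closure,
                    // which is what triggers the Task not serializable error below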
                    val collection = sc.parallelize(Seq((obj.id, obj.data)))
                    collection.saveToCassandra("testKS", "testTable ", SomeColumns("id", "data"))
    
                }
              }
    
            }
    
          }
        }
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    
      import org.json4s._
      import org.json4s.native.JsonMethods._
      case class wordCount(id: Long, data: String) extends Serializable
      implicit val formats = DefaultFormats
      def msgParseMaster(msg: String): wordCount = {
        val m = parse(msg).extract[wordCount]
        return m
    
      }
    
    }
    

    I'm getting:

    org.apache.spark.SparkException: Task not serializable

    Below is the full log:

    16/08/06 10:24:52 ERROR JobScheduler: Error running job streaming job 1470504292000 ms.0
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
        at ...

    2 Answers  |  last active 2 years ago
        1
  •  2
  •   Yuval Itzchakov    8 years ago

    SparkContext is not serializable, so you can't use it in code that runs on the executors, such as the foreachPartition closure inside your foreachRDD. Map the stream into (id, data) pairs instead and save each resulting RDD directly:

    stream
      .map {
        case (_, msg) =>
          val result = msgParseMaster(msg)
          (result.id, result.data)
      }
      .foreachRDD(rdd => if (!rdd.isEmpty)
                           rdd.saveToCassandra("testKS",
                                               "testTable",
                                               SomeColumns("id", "data")))
    
        2
  •  2
  •   Tzach Zohar    8 years ago

    You can't call sc.parallelize inside foreachPartition - that function has to be serialized and sent to every executor, and SparkContext is (intentionally) not serializable: it should only live in the driver application, not on the executors.
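
    To make that concrete, here is a minimal sketch of how per-record writes could be done from inside foreachPartition without touching SparkContext, by capturing the connector's serializable CassandraConnector handle instead. It reuses stream, sc, msgParseMaster and the testKS.testTable schema from the question; treat the CQL (and the identifier quoting) as illustrative rather than a drop-in fix:

    import com.datastax.spark.connector.cql.CassandraConnector

    // Created once on the driver from the Spark configuration; unlike
    // SparkContext, CassandraConnector is serializable, so the closures
    // below can capture it safely.
    val connector = CassandraConnector(sc.getConf)

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // Runs on the executor: borrow one Cassandra session per partition.
        connector.withSessionDo { session =>
          iter.foreach {
            case (_, msg) =>
              val obj = msgParseMaster(msg)
              // Plain CQL insert; adjust the keyspace/table quoting to match
              // how the table was actually created.
              session.execute(
                "INSERT INTO testKS.testTable (id, data) VALUES (?, ?)",
                java.lang.Long.valueOf(obj.id), obj.data)
          }
        }
      }
    }

    That said, the map-then-saveToCassandra approach from the first answer is usually simpler, since the connector takes care of batching the writes for you.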