
Spark map job exception: object not serializable

  • AVIK DUTTA · 7 years ago

    The Ws class:

    package MPD1

    import java.util.UUID.randomUUID

    class Ws(typ: String, c: Float, t: Float, r: Float, a: Float, s: Float, cu: Float) {
      var styp = typ
      var uid = randomUUID().toString
      var cost: Float = c
      var time: Float = t
      var reliability: Float = r
      var availability: Float = a
      var security: Float = s
      var customAttributes: Float = cu

      // auxiliary constructor: customAttributes defaults to Float.MaxValue
      // (the original "def Ws(...)" declared an ordinary method, not a
      // constructor; Scala auxiliary constructors must be named "this")
      def this(typ: String, c: Float, t: Float, r: Float, a: Float, s: Float) =
        this(typ, c, t, r, a, s, Float.MaxValue)

      def display(): Unit = {
        println("STyp : " + styp + "| UID : " + uid + "|" +
          "cost :" + cost + "|" + "time :" + time + "|" +
          "reliability :" + reliability + "|" + "availability :" + availability + "|" +
          "security :" + security + "|" + "customAttributes :" + customAttributes + "|")
      }
    }
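
    For reference, both constructors can then be used as follows (a minimal sketch with made-up values):

    val ws1 = new Ws("REST", 1.0f, 2.0f, 0.99f, 0.95f, 0.8f, 0.7f) // primary: all seven attributes
    val ws2 = new Ws("REST", 1.0f, 2.0f, 0.99f, 0.95f, 0.8f)       // auxiliary: customAttributes = Float.MaxValue
    ws1.display()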
    

    The main function creates objects of the Ws class and then parallelizes them so that further map jobs can be run on them:

    package MPD1

    import org.apache.spark.{SparkConf, SparkContext}

    object test {
      def main(args: Array[String]) {
        try {
          val conf = new SparkConf().setAppName("Simple Application")
          val sc = new SparkContext(conf)

          println(" \nHello World from Scala!\n")
          val wsArray = new Array[MPD1.Ws](10000)
          var i: Int = 0
          val filename = "/home/nazi/Downloads/file.csv"

          val lines = sc.textFile(filename)

          // the map yields an RDD[Ws]; collect() must then serialize the Ws
          // instances from the executors back to the driver, which is the
          // step that raises the "not serializable result" error below
          val rddWsAll = lines.map(f => Functions.createdDS(f))
          rddWsAll.collect().take(10).foreach(f => f.display())

          for (line <- lines) {
            val tempStr = line.split(",")
            println(tempStr(0))
            val wsTemp: Ws = new Ws(tempStr(0), tempStr(2).toFloat,
              tempStr(3).toFloat, tempStr(4).toFloat, tempStr(5).toFloat,
              tempStr(6).toFloat, tempStr(7).toFloat)

            wsArray(i) = wsTemp
            wsTemp.display()
            i = i + 1
          }

        } catch {
          // a SparkException is not a ScriptException, so this handler
          // does not catch the failure above
          case e: javax.script.ScriptException => e.printStackTrace()
        }
      }
    }
    
    
    package MPD1

    object Functions {
      def createdDS(f: String): Ws = {
        val tempStr = f.split(",")
        // note that the field at index 1 is never read
        new Ws(tempStr(0), tempStr(2).toFloat, tempStr(3).toFloat,
          tempStr(4).toFloat, tempStr(5).toFloat, tempStr(6).toFloat,
          tempStr(7).toFloat)
      }
    }
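
    For context, createdDS assumes comma-separated rows with at least eight fields. A hypothetical row (the values are made up for illustration):

    // the field at index 1 ("svc-01" here) is skipped by createdDS
    val row = "REST,svc-01,1.5,2.0,0.99,0.95,0.8,0.7"
    MPD1.Functions.createdDS(row).display()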
    

    I get the following error:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: MPD1.Ws
    Serialization stack:
        - object not serializable (class: MPD1.Ws, value: MPD1.Ws@47acf13d)
        - element of array (index: 0)
        - array (class [LMPD1.Ws;, size 10000)
        ....
    


    I understand that using the Ws class in the map function, var rddWsAll = lines.map(f => Functions.createdDS(f)), is what causes the problem. But why are we not allowed to create an RDD from our own class? Are we only allowed to create RDDs of strings?

    I am using scalaVersion := "2.11.8" and Spark version 2.2.1.
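
    As an aside, separate from the serialization error: the for loop in main iterates the RDD directly, so its body runs on the executors, and the writes to wsArray and i happen in executor JVMs and are never visible on the driver. A driver-side variant (a sketch, only sensible for files small enough to collect) could look like this:

    // collect the lines to the driver first, then fill the local array there
    for ((line, idx) <- lines.collect().zipWithIndex) {
      val cols = line.split(",")
      wsArray(idx) = new Ws(cols(0), cols(2).toFloat, cols(3).toFloat,
        cols(4).toFloat, cols(5).toFloat, cols(6).toFloat, cols(7).toFloat)
    }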

    1 Answer  |  7 years ago
  •   Gyanendra Dwivedi · 7 years ago

    It looks like the class Ws is not serializable. Make it serializable:

    class Ws(typ: String, c: Float, t: Float, r: Float, a: Float, s: Float, cu: Float)
        extends java.io.Serializable {
      // ... class body unchanged ...
    }

    Note that RDDs themselves are serializable by default; it is the elements they carry that must also be serializable so Spark can move them between the executors and the driver.
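
    Alternatively (a suggestion beyond the original answer), a Scala case class sidesteps the problem entirely, because case classes are serializable by default. A sketch of Ws rewritten that way, with a default argument replacing the auxiliary constructor:

    case class Ws(styp: String, cost: Float, time: Float, reliability: Float,
                  availability: Float, security: Float,
                  customAttributes: Float = Float.MaxValue) {
      val uid: String = java.util.UUID.randomUUID().toString
      def display(): Unit =
        println(s"STyp : $styp| UID : $uid|cost :$cost|time :$time|" +
          s"reliability :$reliability|availability :$availability|" +
          s"security :$security|customAttributes :$customAttributes|")
    }

    Another option, if the class cannot be changed, is Kryo serialization, which does not require java.io.Serializable; registering the class on the SparkConf (assuming the conf from the question) looks like this:

    val conf = new SparkConf()
      .setAppName("Simple Application")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[MPD1.Ws]))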
