
Scala/Spark: Converting zero-inflated data in a DataFrame to libsvm

  • Jake · Tech Community · 8 years ago

    I'm very new to Scala (I usually do this kind of work in R).

    I have imported a large DataFrame (2,000+ columns, 100,000+ rows) that is zero-inflated, i.e. it contains mostly zeros.

    Goal: convert the data to libsvm format.

    Steps

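    1. Make sure the feature columns are DoubleType and the target is Int
    2. Iterate over each row, keeping every value > 0 in one array and its column index in another array
    3. Convert to RDD[LabeledPoint]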
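Step 2 can be checked in plain Scala, without Spark, to make sure the index bookkeeping is right: keep each positive value together with its 0-based column position, and keep the total number of features, because that (not the count of non-zeros) is what a sparse vector's size has to be. This is only a sketch; `SparseRow` and `SparseSketch.toSparse` are hypothetical names, not part of Spark.

```scala
// Hypothetical container for one sparse row: total feature count plus parallel arrays.
final case class SparseRow(size: Int, indices: Array[Int], values: Array[Double])

object SparseSketch {
  /** Step 2: keep values > 0 together with their 0-based column positions. */
  def toSparse(dense: Array[Double]): SparseRow = {
    // Pair every value with its column position, then drop the non-positive ones.
    val kept = dense.zipWithIndex.filter { case (v, _) => v > 0.0 }
    // size stays dense.length so the stored positions still refer to the full feature space.
    SparseRow(dense.length, kept.map(_._2), kept.map(_._1))
  }
}
```

For example, `SparseSketch.toSparse(Array(0.0, 3.5, 0.0, 0.0, 1.0))` gives size 5, indices `[1, 4]` and values `[3.5, 1.0]`.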

    I'm stuck on step 3, but that may be because I did step 2 wrong.

    Main function:

    @Test
    def testSpark(): Unit =
    {
    try
    {
    
      val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
    
    
      val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)
    
      val indexer = new StringIndexer()
        .setInputCol("Majors_Final")
        .setOutputCol("Majors_Final_Indexed")
      val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
      val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)
    
    
    
      // A sparse vector only holds doubles, so keep only the DoubleType feature columns
      val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)
    
      val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)
    
    
      // This line fails to compile with the encoder error shown below
      val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
    
    
      assertTrue(true)
    }
    catch
    {
      case ex: Exception =>
      {
    
        println(s"There has been an Exception. Message is ${ex.getMessage} and ${ex}")
        fail()
      }
      }
    }
    

    // Converts one row into a LabeledPoint, keeping only the feature values > 0.
    // Requires: import scala.collection.immutable.ListMap
    //           import scala.collection.mutable.ArrayBuffer
    private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label: Int): LabeledPoint =
    {
      val values: Map[String, Double] = rowIn.getValuesMap[Double](fieldNameSeq)
    
      // Sort by column name so that every row uses the same feature ordering
      val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
      val rowValuesItr: Iterable[Double] = sortedValuesMap.values
    
      val positionsArray = ArrayBuffer[Int]()
      val valuesArray = ArrayBuffer[Double]()
      var currentPosition: Int = 0
      rowValuesItr.foreach
      {
        kv =>
          // Only positive values are stored (negative values are dropped as well)
          if (kv > 0)
          {
            valuesArray += kv
            positionsArray += currentPosition
          }
          currentPosition = currentPosition + 1
      }
    
      // The vector size must be the total number of features, not the number of
      // stored entries; otherwise the stored indices can exceed the declared size.
      new LabeledPoint(label, org.apache.spark.mllib.linalg.Vectors.sparse(fieldNameSeq.size, positionsArray.toArray, valuesArray.toArray))
    }
    

    Problem

    val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()
    

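    SparkTest.scala:285: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
    [info] val labeled:DataFrame = mDFFinal.map(row => convertRowToLabeledPoint(row,fieldNameSeq,row.getAs("Majors_Final_Indexed"))).toDF()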
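`Dataset.map` needs an implicit `Encoder` for its result type, and that implicit is looked up at compile time, which is why this is a compiler error. The message points at `import spark.implicits._`; whether that import alone is enough here depends on the setup, so the sketch below sidesteps encoders entirely by mapping over the underlying RDD (`df.rdd`), which needs no `Encoder`, and returns an `RDD[LabeledPoint]` that can go straight to `MLUtils.saveAsLibSVMFile`. It assumes the feature columns are non-null `DoubleType` and the label column is an `Int`; the names `LibSvmSketch`, `rowToLabeledPoint` and `toLabeledPoints` are hypothetical.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical helper; assumes non-null DoubleType features and an Int label column.
object LibSvmSketch {

  /** Converts one row, storing only feature values > 0 (as in the question). */
  def rowToLabeledPoint(row: Row, featureIdx: Array[Int], labelIdx: Int): LabeledPoint = {
    // Pair each feature value with its 0-based position in the feature vector.
    val kept = featureIdx.zipWithIndex
      .map { case (colIdx, pos) => (pos, row.getDouble(colIdx)) }
      .filter { case (_, v) => v > 0.0 }
    // Size is the total number of features; positions are already strictly increasing.
    val features = Vectors.sparse(featureIdx.length, kept.map(_._1), kept.map(_._2))
    LabeledPoint(row.getInt(labelIdx).toDouble, features)
  }

  /** Maps over the underlying RDD[Row], so no Dataset Encoder is required. */
  def toLabeledPoints(df: DataFrame, featureCols: Seq[String], labelCol: String): RDD[LabeledPoint] = {
    // Resolve column positions once on the driver; the closure only captures plain Ints.
    val featureIdx = featureCols.map(name => df.schema.fieldIndex(name)).toArray
    val labelIdx = df.schema.fieldIndex(labelCol)
    df.rdd.map(row => rowToLabeledPoint(row, featureIdx, labelIdx))
  }
}
```

With the question's variables this would be used as `MLUtils.saveAsLibSVMFile(LibSvmSketch.toLabeledPoints(mDFFinal, fieldNameSeq, "Majors_Final_Indexed"), "./output/libsvm")`. Unlike a `collect()`-based approach, the conversion runs on the executors, so the 100,000 × 2,000 table never has to fit in driver memory. Note that this sketch keeps the order of `featureCols` instead of sorting by column name.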

    1 answer
  • Jake · 8 years ago

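    OK, I skipped the DataFrame route: I built an array of LabeledPoints on the driver, which is easy to turn into an RDD. The rest is straightforward.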

      import org.apache.spark.ml.feature.StringIndexer
      import org.apache.spark.mllib.regression.LabeledPoint
      import org.apache.spark.mllib.util.MLUtils
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.DataFrame
      import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField}
    
      val mDF: DataFrame = spark.read.option("header", "true").option("inferSchema", "true").csv("src/test/resources/knimeMergedTRimmedVariables.csv")
      // castAllTypedColumnsTo and castColumnTo are the author's own helpers (not shown in the post)
      val mDFTyped = castAllTypedColumnsTo(mDF, IntegerType, DoubleType)
    
      val indexer = new StringIndexer()
        .setInputCol("Majors_Final")
        .setOutputCol("Majors_Final_Indexed")
      val mDFTypedIndexed = indexer.fit(mDFTyped).transform(mDFTyped)
      val mDFFinal = castColumnTo(mDFTypedIndexed,"Majors_Final_Indexed", IntegerType)
    
      mDFFinal.show()
      // A sparse vector only holds doubles, so keep only the DoubleType feature columns
      val fieldSeq: scala.collection.Seq[StructField] = mDFFinal.schema.fields.toSeq.filter(f => f.dataType == DoubleType)
      val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)
    
      // Collect all rows to the driver and convert each one locally
      // (every row must fit in driver memory for this to work)
      val labeledPoints: Array[LabeledPoint] = mDFFinal.collect().map
      {
        row => convertRowToLabeledPoint(row, fieldNameSeq, row.getAs[Int]("Majors_Final_Indexed"))
      }
    
      val mRdd: RDD[LabeledPoint] = spark.sparkContext.parallelize(labeledPoints.toSeq)
    
      MLUtils.saveAsLibSVMFile(mRdd, "./output/libsvm")
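For spot-checking the output, each line that `MLUtils.saveAsLibSVMFile` writes is, as far as I know, the label followed by space-separated `index:value` pairs for the stored entries, with the 0-based vector indices shifted to libsvm's 1-based convention. The plain-Scala sketch below reproduces that line format under this assumption; `LibSvmLine.toLibSvmLine` is a hypothetical name, not a Spark API.

```scala
object LibSvmLine {
  /** Formats a label and 0-based sparse entries as one libsvm line with 1-based indices. */
  def toLibSvmLine(label: Double, indices: Array[Int], values: Array[Double]): String = {
    require(indices.length == values.length, "indices and values must have the same length")
    // Shift each 0-based index to libsvm's 1-based convention.
    val pairs = indices.zip(values).map { case (i, v) => s"${i + 1}:$v" }
    (label.toString +: pairs).mkString(" ")
  }
}
```

For example, `LibSvmLine.toLibSvmLine(2.0, Array(1, 4), Array(3.5, 1.0))` returns `"2.0 2:3.5 5:1.0"`. Also keep in mind that `saveAsLibSVMFile` writes through `saveAsTextFile`, so `./output/libsvm` ends up as a directory of `part-*` files rather than a single file.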