代码之家  ›  专栏  ›  技术社区  ›  Larsenal

预排序输入的Spark特征向量变换

  •  1
  • Larsenal  · 技术社区  · 9 年前

    HDFS上的制表符分隔文件中有一些数据,如下所示:

    label | user_id | feature
    ------------------------------
      pos | 111     | www.abc.com
      pos | 111     | www.xyz.com
      pos | 111     | Firefox
      pos | 222     | www.example.com
      pos | 222     | www.xyz.com
      pos | 222     | IE
      neg | 333     | www.jkl.com
      neg | 333     | www.xyz.com
      neg | 333     | Chrome
    

    我需要对其进行转换,为每个user_id创建一个特征向量,以训练 org.apache.spark.ml.classification.NaiveBayes 模型

    我目前的做法基本如下:

    1. 将原始数据加载到DataFrame
    2. 使用StringIndexer索引功能
    3. 转到RDD和Group by user_id,并将特征索引映射到稀疏向量中。

    关键是 数据已按user_id预先排序 。利用这一点的最佳方法是什么?想想可能会发生多少不必要的工作,我很痛苦。

    如果一点代码有助于理解我当前的方法,下面是地图的要点:

    val featurization = (vals: (String,Iterable[Row])) => {
      // create a Seq of all the feature indices
      // Note: the indexing was done in a previous step not shown
      val seq = vals._2.map(x => (x.getDouble(1).toInt,1.0D)).toSeq
    
      // create the sparse vector
      val featureVector = Vectors.sparse(maxIndex, seq)
    
      // convert the string label into a Double
      val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0
    
      (label, vals._1, featureVector)
    }
    
    d.rdd
      .groupBy(_.getString(1))
      .map(featurization)
      .toDF("label","user_id","features")
    
    1 回复  |  直到 9 年前
        1
  •  1
  •   Community CDub    8 年前

    让我们从开始 your other question

    如果我在磁盘上的数据保证按密钥进行预排序,该密钥将用于组聚合或缩减,那么Spark有没有办法利用这一点?

    这要看情况。如果您应用的操作可以从映射端聚合中受益,那么您可以通过预排序数据而无需对代码进行任何进一步干预,从而获得很多好处。共享同一密钥的数据应该位于相同的分区上,并且可以在洗牌之前在本地聚合。

    不幸的是,在这种特殊情况下,它不会有太大帮助。即使启用地图端聚合( groupBy(Key) 不使用is,所以您需要自定义实现)或聚合特征向量(您可以在我的回答中找到一些示例 How to define a custom aggregation function to sum a column of Vectors? )没有太多收获。您可以在这里和那里保存一些工作,但仍然需要在节点之间传输所有索引。

    如果你想获得更多,你就必须做更多的工作。我可以看到两种利用现有订单的基本方法:

    1. 使用自定义Hadoop输入格式只生成完整的记录(标签、id、所有特性),而不是逐行读取数据。如果您的数据每个id有固定的行数,您甚至可以尝试使用 NLineInputFormat 并应用 mapPartitions 以在之后聚合记录。

      这绝对是一个更详细的解决方案,但不需要在Spark中进行额外的洗牌。

    2. 照常读取数据,但使用自定义分区器 groupBy 。据我所知,使用 rangePartitioner 应该可以正常工作,但要确保您可以尝试以下步骤:

      • 使用 mapPartitionsWithIndex 查找每个分区的最小/最大id。
      • 创建保持最小值的分区器<=ids<最大电流( 第i个 )分区并将最大值推送到分区 i+1
      • 使用此分区器 groupBy(键)

      这可能是一个更友好的解决方案,但至少需要一些洗牌。如果要移动的预期记录数很低(每个分区<<#个记录),您甚至可以使用 映射分区 broadcast *尽管在实践中进行分区可能更有用,也更便宜。


    *您可以使用类似的方法: https://stackoverflow.com/a/33072089/1560062

    推荐文章