代码之家  ›  专栏  ›  技术社区  ›  Tom Lous

奇怪的性能问题Spark LSH MinHash approxSimilarityJoin

  •  1
  • Tom Lous  · 技术社区  · 7 年前

    在(内部)连接之后,数据集有点倾斜,但是每次一个或多个任务需要花费大量时间才能完成。

    sparkui-1

    正如你所看到的,每个任务的中间值是6m s(我在一个较小的源数据集上运行它进行测试),但是一个任务需要10分钟,它几乎不使用任何CPU周期,它实际上是连接数据的,但是速度太慢了。 下一个最慢的任务将在14秒内运行,实际溢出到磁盘的记录将增加4倍。

    如果你看 sparkuisql

    分解哈希表:

    modelDataset.select(
          struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))
    

     override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
        val xSet = x.toSparse.indices.toSet
        val ySet = y.toSparse.indices.toSet
        val intersectionSize = xSet.intersect(ySet).size.toDouble
        val unionSize = xSet.size + ySet.size - intersectionSize
        assert(unionSize > 0, "The union of two input sets must have at least 1 elements")
        1 - intersectionSize / unionSize
      }
    

    已处理数据集的联接:

    // Do a hash join on where the exploded hash values are equal.
    val joinedDataset = explodedA.join(explodedB, explodeCols)
      .drop(explodeCols: _*).distinct()
    
    // Add a new column to store the distance of the two rows.
    val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)
    val joinedDatasetWithDist = joinedDataset.select(col("*"),
      distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol)
    )
    
    // Filter the joined datasets where the distance are smaller than the threshold.
    joinedDatasetWithDist.filter(col(distCol) < threshold)
    

    我尝试过缓存、重新分区甚至启用 spark.speculation ,都没用。

    数据由必须匹配的带状疱疹地址文本组成: 53536, Evansville, WI => 53, 35, 36, ev, va, an, ns, vi, il, ll, le, wi

    这给出了相当精确的结果,但可能是连接歪斜的原因。

    我的问题是:

    • 我怎样才能在不失去准确性的情况下防止这种偏斜呢?
    • 有没有更好的方法在规模上做到这一点?(我无法将数百万条记录与位置数据集中的所有记录进行比较)
    0 回复  |  直到 7 年前
    推荐文章