代码之家  ›  专栏  ›  技术社区  ›  secretive

使用PySpark计算Jaccard距离时的对数少于其应有的对数

  •  0
  • secretive  · 技术社区  · 4 年前

    我试图用SparseVector的形式计算某些ID之间的Jaccard距离。

    from pyspark.ml.feature import MinHashLSH
    from pyspark.ml.linalg import Vectors
    from pyspark.sql.functions import col
    from pyspark.sql.functions import monotonically_increasing_id
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext
    import pyspark.sql.functions as F
    from pyspark.mllib.linalg import Vectors, VectorUDT
    from pyspark.sql.functions import udf
    sqlContext = SQLContext(sc)
    df = sqlContext.read.load("path")
    par = udf(lambda s: Vectors.parse(s), VectorUDT())
    d = df_filtered.select("id",par("vect"))
    from pyspark.ml.linalg import VectorUDT as VectorUDTML
    as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDTML())
    result = d.withColumn("<lambda>(vect)", as_ml("<lambda>(vect)"))
    mh = MinHashLSH(inputCol="<lambda>(vect)", outputCol="hashes", seed=12345, numHashTables=15)
    model = mh.fit(df)
    a = model.transform(df)
    
    jd = model.approxSimilarityJoin(a, a,1.0  , distCol="JaccardDistance").select(
         col("datasetA.id1").alias("idA"),
         col("datasetB.id1").alias("idB"),
         col("JaccardDistance"))
    

    df有两列, id sparse_vector . 身份证件 列是一个字母数字id,并且 稀疏向量 列包含这样的记录 SparseVector(243775, {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0, 6: 1.0, 7: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 11: 1.0, 12: 1.0, 13: 1.0, 14: 1.0, 15: 1.0, 16: 1.0, 24: 1.0, 30: 1.0, 31: 1.0, 32: 1.0, 61: 1.0, 88: 1.0, 90: 1.0, 96: 1.0, 104: 1.0, 153: 1.0, 155: 1.0, 159: 1.0, 160: 1.0, 161: 1.0, 162: 1.0, 169: 1.0, 181: 1.0, 194: 1.0, 212: 1.0, 220: 1.0, 222: 1.0, 232: 1.0, 303: 1.0, 390: 1.0, 427: 1.0, 506: 1.0, 508: 1.0, 509: 1.0, 518: 1.0, 554: 1.0, 568: 1.0, 798: 1.0, 1431: 1.0, 2103: 1.0, 2139: 1.0, 3406: 1.0, 3411: 1.0, 3415: 1.0, 3429: 1.0, 3431: 1.0, 3440: 1.0, 3443: 1.0, 3449: 1.0}))

    当我计算Jaccard并写下数据时,我丢失了很多id对。数据中总共有45k个标识,因此输出应该包含大约45k*45k对。

    当我把1k ID和45k ID进行比较时,我得到了所有可能的对,并以这种方式进行所有ID,有点像批处理。任何输入都会有帮助。 此外,我可以并行化代码,以便更快地批处理系统吗?我在emr集群上运行代码,并且有资源来增加集群大小。

    以下脚本可用于生成id为的样本数据和人工生成的稀疏向量。

    from random import randint
    from collections import OrderedDict
    with open('/mnt/lsh_data.csv', 'a') as the_file:
        the_file.write("id\vect\n")
        for i in range(45000):
            a = "id"
            b = a + str(i)
            num_ent = randint(101, 195) + randint(102, 200)
            lis = []
            for j in range(num_ent):
                lis.append(randint(0, 599999))
            lis.sort()
            l = list(OrderedDict.fromkeys(lis))
            data = []
            for j in range(len(l)):
                c = randint(0,1)
                if c == 0:
                    data.append(1.0)
                else:
                    data.append(0.0)
            b = b + "\t(600000,"+str(l)+","+str(data)+")\n"
            the_file.write(b)
    
    0 回复  |  直到 4 年前
        1
  •  0
  •   Paul    4 年前

    不是一个真正的答案,但太长了,无法发表评论:

    我不太确定 approxSimilarityJoin 以及预期的输出是什么。然而,我检查了文档中给出的示例( http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=minhash%20lsh#pyspark.ml.feature.MinHashLSH )它只有3x3,即使在这里,我们也不能得到完整的叉积(即使我们增加了阈值)。所以这可能不是预期的结果。。。

    from pyspark.ml.linalg import Vectors
    from pyspark.sql.functions import col
    from pyspark.ml.feature import MinHashLSH
    
    data = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
            (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
            (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
    
    df = spark.createDataFrame(data, ["id", "features"])
    
    mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
    
    model = mh.fit(df)
    model.transform(df).head()
    
    data2 = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
             (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
             (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
    
    df2 = spark.createDataFrame(data2, ["id", "features"])
    
    model.approxSimilarityJoin(df, df2, 1.0, distCol="JaccardDistance").show()
    
    
        2
  •  0
  •   Chris    4 年前

    检查 approxSimilarityJoin 源代码您可以看到函数首先在 locality sensitive hash (LSH)的每个输入向量,它“以高概率将相似的输入项散列到相同的桶中。”然后根据结果计算距离。其效果是,在计算每个向量的LSH后,仅计算最终位于同一桶中的向量之间的距离。这就是为什么在输入数据集中看不到所有对的距离,只看到最终位于同一个桶中的向量对的距离。

    此外,到LSH的输入是来自数据的输入向量和来自初始种子的随机系数,这解释了为什么改变种子会改变bucketing,从而改变输出。

    如果你通过改变 MinHashLSH seed 参数可以看到bucketing的变化。