
How to read data from a file and pass it to the FPGrowth algorithm in Spark/PySpark

  • Jeff  · Tech community  · 7 years ago

    I am trying to read data from a file (items separated by commas) and pass this data to the FPGrowth algorithm using PySpark.

    My code so far is as follows:

    import pyspark
    from pyspark import SparkContext
    
    sc = SparkContext("local", "Assoc Rules", pyFiles=[])
    
    txt = sc.textFile("step3.basket")
    data =  txt.map(lambda line: line.split(",")).collect()
    rdd = sc.parallelize(data, 2)
    
    from pyspark.ml.fpm import FPGrowth
    
    fpg = FPGrowth(minSupport=0.02, minConfidence=0.6)
    model = fpg.fit(rdd)
    

    However, when I try to run the code, I get the following error:

    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    <ipython-input-3-d34039dccad5> in <module>()
          2 
          3 fpg = FPGrowth(minSupport=0.02, minConfidence=0.6)
    ----> 4 model = fpg.fit(rdd)
    
    ~/local/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
         62                 return self.copy(params)._fit(dataset)
         63             else:
    ---> 64                 return self._fit(dataset)
         65         else:
         66             raise ValueError("Params must be either a param map or a list/tuple of param maps, "
    
    ~/local/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
        263 
        264     def _fit(self, dataset):
    --> 265         java_model = self._fit_java(dataset)
        266         return self._create_model(java_model)
        267 
    
    ~/local/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
        260         """
        261         self._transfer_params_to_java()
    --> 262         return self._java_obj.fit(dataset._jdf)
        263 
        264     def _fit(self, dataset):
    

    AttributeError: 'RDD' object has no attribute '_jdf'

    What am I doing wrong, and how can I fix it?

    1 Answer  |  7 years ago
  •   mayank agrawal  ·  7 years ago

    FPGrowth from pyspark.ml.fpm takes a PySpark DataFrame, not an RDD. Convert the RDD to a DataFrame and then pass it in. See http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.fpm.FPGrowth.fit

    Or import FPGrowth from mllib:

    from pyspark.mllib.fpm import FPGrowth
    

    Edit: there are two ways to proceed.

    1. Using the RDD approach

    Read directly from the file:

    from pyspark.mllib.fpm import FPGrowth

    txt = sc.textFile("step3.basket").map(lambda line: line.split(","))
    # txt is already an RDD -- no need to collect it and parallelize again

    model = FPGrowth.train(txt, minSupport=0.2, numPartitions=10)
    # change the parameters according to your needs; the model is ready
    

    2. Using a DataFrame (which I think is the better approach)

    from pyspark.sql import SparkSession
    from pyspark.ml.fpm import FPGrowth

    spark = SparkSession.builder.getOrCreate()  # .toDF() needs an active SparkSession
    df = (sc.textFile("step3.basket")
            .map(lambda line: (line.split(","),))  # wrap each basket in a one-item tuple
            .toDF(["items"]))

    fp = FPGrowth(minSupport=0.2, minConfidence=0.7)
    model = fp.fit(df)  # model is ready!