
Analysis of Movie Ratings Percentages across Occupation and Movie Genre

  • Vinod  ·  asked 7 years ago

    I have just started learning Spark and Python programming. Could you help me understand the error in my code?

    I am running the code in interactive mode in a Jupyter notebook.

    1. The test code below, in which I tried out the concept, runs fine:

       rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))])
      
      result = rdd.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]), (x[1][4]+y[1][4]), (x[1][5]+y[1][5]), (x[1][6]+y[1][6]), (x[1][7]+y[1][7]), (x[1][8]+y[1][8]), (x[1][9]+y[1][9]), (x[1][10]+y[1][10]), (x[1][11]+y[1][11])))
      print (result.top(3))
      

      Output:

      [('librarian', (2, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0))]
      
    2. The following also works fine:

      #[(movieid, genre_list)]
      
      aggregateRDD = movieRDD.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1])))
      print (aggregateRDD.top(3))
      

      Output:

      [(1682, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1681, [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (1680, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])]
      
    3. However, when I use the same approach in my full program, it fails. What am I doing wrong?

      ##############################################################################
      ### Analysis of Movie Ratings percentages across Occupation and Movie Genre
      ##############################################################################
      
      from pyspark import SparkConf, SparkContext
      
      conf = SparkConf().setMaster("local").setAppName("popularMovie")
      sc = SparkContext(conf =conf)
      
      ###import movie ratings into RDD
      ratingLines = sc.textFile("C:/SparkCourse/ml-100k/u.data")
      ###import user details into RDD
      userLines = sc.textFile("///SparkCourse/ml-100k/u.user")
      ###import movie data into RDD
      movieLines = sc.textFile("C:/SparkCourse/ml-100k/u.item")
      ###import genre data into RDD
      genreLines = sc.textFile("C:/SparkCourse/ml-100k/u.genre")
      
      ###split on delimiter functions
      def splitRatingTab(line):
          line = line.split('\t')
          return (int(line[0]), int(line[1]), int(line[2])) #(movieid, user, rating)
      def splitUserPipe(line):
          line = line.split('|')
          return (int(line[0]), line[3]) #(user, occupation)
      def splitMoviePipe(line):
          line = line.split('|')
          return (int(line[0]), list(listToIntElements(line[5:]))) #(movieid, genre_list[])
      
      
      def listToIntElements(lst):
          """conver the boolean text ('0', '1') genre value to integers (0, 1)"""
          for cnt, _ in enumerate(lst):
              lst[cnt] = int(_)
          return lst
      
      ###create dictionary object for genreid and genre
      def loadMovieGenre():
          """
          create dictionary object for genreid and genre
          """
          genre = {}
          with open('C:/SparkCourse/ml-100k/u.genre') as file:
              for line in file:
                  #each line is of type [genere, genreid]
                  line = line.split('|')
                  #convert genreid to int, to remove new line '\n' at the end of string
                  genre[int(line[1])] = line[0]
              return genre
      
      
      ### Transform to RDD as [(movieid, user, rating)] for movies, which are reviewed by viewers
      ratingRDD = ratingLines.map(lambda line: splitRatingTab(line))
      
      ### Transform to RDD as [(user, occupation)]
      occupationRDD = userLines.map(splitUserPipe)
      
      ### Transform to RDD as [(movieid, genre_list)], genre is boolean value, movie can be in multiple genres
      movieRDD = movieLines.map(splitMoviePipe)
      
      ###join Transformed rating RDD [(movieid, (user, rating))] and movieRDD [(movieid, genre] to get all genres; 
      ###then Transform to [(movieid,((userid, rating), genre) )]
      joinRatingMovieGenres = ratingRDD.map(lambda line: (line[0], (line[1], line[2]))).join(movieRDD)
      
      
      ###Transform joinRatingMovieGenres to RDD [userid, (rating, genre)] and join with occupationRDD [(userid, occupation)]
      ###to Transform to [(occupation, ((1, genre)))]
      transRatingMovieGenres = joinRatingMovieGenres.map(lambda line: (line[1][0][0], (line[1][0][1], line[1][1])))
      joinRatingGenresOccup = transRatingMovieGenres.join(occupationRDD).map(lambda line: (line[1][1], (1, line[1][0][1])))
      print (joinRatingGenresOccup.take(2))
      
      
      ###Transform by Aggregating the ratingCount and genreCount to [(occupation, (totalRatings, {cntGenresRating}))]
      totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][12]), (x[1][3]+y[1][3])))
      print (totalRatingGenreCntByOccupation.take(2))
      

      Error:

      [('librarian', (1, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('librarian', (1, [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]))]
      ---------------------------------------------------------------------------
      Py4JJavaError                             Traceback (most recent call last)
      <ipython-input-137-a156d8bbfde9> in <module>()
      ----> 1 get_ipython().run_cell_magic('time', '', '\n##############################################################################\n### Analysis of Movie Ratings percentages across Occupation and Movie Genre\n##############################################################################\n\n#import movie ratings into RDD\nratingLines = sc.textFile("C:/SparkCourse/ml-100k/u.data")\n#import user details into RDD\nuserLines = sc.textFile("///SparkCourse/ml-100k/u.user")\n#import movie data into RDD\nmovieLines = sc.textFile("C:/SparkCourse/ml-100k/u.item")\n#import genre data into RDD\ngenreLines = sc.textFile("C:/SparkCourse/ml-100k/u.genre")\n\n#split on delimiter functions\ndef splitRatingTab(line):\n    line = line.split(\'\\t\')\n    return (int(line[0]), int(line[1]), int(line[2])) #(movieid, user, rating)\ndef splitUserPipe(line):\n    line = line.split(\'|\')\n    return (int(line[0]), line[3]) #(user, occupation)\ndef splitMoviePipe(line):\n    line = line.split(\'|\')\n    return (int(line[0]), list(listToIntElements(line[5:]))) #(movieid, genre_list[])\n\n\ndef listToIntElements(lst):\n    """conver the boolean text (\'0\', \'1\') genre value to integers (0, 1)"""\n    for cnt, _ in enumerate(lst):\n        lst[cnt] = int(_)\n    return lst\n\n#create dictionary object for genreid and genre\ndef loadMovieGenre():\n    """\n    create dictionary object for genreid and genre\n    """\n    genre = {}\n    with open(\'C:/SparkCourse/ml-100k/u.genre\') as file:\n        for line in file:\n            #each line is of type [genere, genreid]\n            line = line.split(\'|\')\n            #convert genreid to int, to remove new line \'\\n\' at the end of string\n            genre[int(line[1])] = line[0]\n        return genre\n\n    \n# Transform to RDD as [(movieid, user, rating)] for movies, which are reviewed by viewers\nratingRDD = ratingLines.map(lambda line: splitRatingTab(line))\n#print (\'ratingRDD:\\n\',ratingRDD.top(5))\n\n# Transform to RDD as [(user, occupation)]\noccupationRDD = userLines.map(splitUserPipe)\n#print (\'occupationRDD:\\n\',occupationRDD.top(3))\n\n# Transform to RDD as [(movieid, genre_list)], genre is boolean value, movie can be in multiple genres\nmovieRDD = movieLines.map(splitMoviePipe)\n#print (\'movieRDD:\\n\',movieRDD.top(3))\n\n#join Transformed rating RDD [(movieid, (user, rating))] and movieRDD [(movieid, genre] to get all genres; \n#then Transform to [(movieid,((userid, rating), genre) )]\njoinRatingMovieGenres = ratingRDD.map(lambda line: (line[0], (line[1], line[2]))).join(movieRDD)\n#print (joinRatingMovieGenres.take(2))\n\n#Transform joinRatingMovieGenres to RDD [userid, (rating, genre)] and join with occupationRDD [(userid, occupation)]\n#to Transform to [(occupation, ((1, genre)))]\ntransRatingMovieGenres = joinRatingMovieGenres.map(lambda line: (line[1][0][0], (line[1][0][1], line[1][1])))\njoinRatingGenresOccup = transRatingMovieGenres.join(occupationRDD).map(lambda line: (line[1][1], (1, line[1][0][1])))\nprint (joinRatingGenresOccup.take(2))\n\n\n#Transform by Aggregating the ratingCount and genreCount to [(occupation, (totalRatings, {cntGenresRating}))]\ntotalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(lambda x, y: ((x[0]+y[0]), (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][12]), (x[1][3]+y[1][3])))\nprint (totalRatingGenreCntByOccupation.take(2))')
      
      C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
         2113             magic_arg_s = self.var_expand(line, stack_depth)
         2114             with self.builtin_trap:
      -> 2115                 result = fn(magic_arg_s, cell)
         2116             return result
         2117 
      
      <decorator-gen-60> in time(self, line, cell, local_ns)
      
      C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\magic.py in <lambda>(f, *a, **k)
          186     # but it's overkill for just that one bit of state.
          187     def magic_deco(arg):
      --> 188         call = lambda f, *a, **k: f(*a, **k)
          189 
          190         if callable(arg):
      
      C:\Users\vmatcha\AppData\Local\Enthought\Canopy\edm\envs\User\lib\site-packages\IPython\core\magics\execution.py in time(self, line, cell, local_ns)
         1183         else:
         1184             st = clock2()
      -> 1185             exec(code, glob, local_ns)
         1186             end = clock2()
         1187             out = None
      
      <timed exec> in <module>()
      
      C:\spark\python\pyspark\rdd.py in take(self, num)
         1356 
         1357             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
      -> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)
         1359 
         1360             items += res
      
      C:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
          999         # SparkContext#runJob.
         1000         mappedRDD = rdd.mapPartitions(partitionFunc)
      -> 1001         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
         1002         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
         1003 
      
      C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
         1158         answer = self.gateway_client.send_command(command)
         1159         return_value = get_return_value(
      -> 1160             answer, self.gateway_client, self.target_id, self.name)
         1161 
         1162         for temp_arg in temp_args:
      
      C:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
           61     def deco(*a, **kw):
           62         try:
      ---> 63             return f(*a, **kw)
           64         except py4j.protocol.Py4JJavaError as e:
           65             s = e.java_exception.toString()
      
      C:\spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
          318                 raise Py4JJavaError(
          319                     "An error occurred while calling {0}{1}{2}.\n".
      --> 320                     format(target_id, ".", name), value)
          321             else:
          322                 raise Py4JError(
      
      Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
      : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 239.0 failed 1 times, most recent failure: Lost task 1.0 in stage 239.0 (TID 447, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
        File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
        File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
        File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "C:\spark\python\pyspark\rdd.py", line 362, in func
          return f(iterator)
        File "C:\spark\python\pyspark\rdd.py", line 1857, in combineLocally
          merger.mergeValues(iterator)
        File "C:\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
          d[k] = comb(d[k], v) if k in d else creator(v)
        File "<timed exec>", line 73, in <lambda>
      TypeError: 'int' object is not subscriptable
      
          at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
          at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
          at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
          at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
          at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
          at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
          at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
          at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
          at org.apache.spark.scheduler.Task.run(Task.scala:109)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:748)
      
      Driver stacktrace:
          at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
          at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
          at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
          at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
          at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
          at scala.Option.foreach(Option.scala:257)
          at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
          at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
          at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
          at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
          at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
          at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
          at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
          at sun.reflect.GeneratedMethodAccessor90.invoke(Unknown Source)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
          at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
          at py4j.Gateway.invoke(Gateway.java:282)
          at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
          at py4j.commands.CallCommand.execute(CallCommand.java:79)
          at py4j.GatewayConnection.run(GatewayConnection.java:214)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
        File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
        File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
        File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "C:\spark\python\pyspark\rdd.py", line 2438, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "C:\spark\python\pyspark\rdd.py", line 362, in func
          return f(iterator)
        File "C:\spark\python\pyspark\rdd.py", line 1857, in combineLocally
          merger.mergeValues(iterator)
        File "C:\spark\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
          d[k] = comb(d[k], v) if k in d else creator(v)
        File "<timed exec>", line 73, in <lambda>
      TypeError: 'int' object is not subscriptable
      
          at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
          at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
          at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
          at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
          at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
          at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
          at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
          at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
          at org.apache.spark.scheduler.Task.run(Task.scala:109)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          ... 1 more 
      
    1 Answer  |  7 years ago
  •   user3689574  ·  answered 7 years ago

    When you reduce, you have to return the same structure that you receive; otherwise, the next time a value arrives for the same key and your function tries to reduce it with the previous result, it will fail.

    You only tested with two elements, so you did not see it, but if you try with three...:

     rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])),\
                          ('librarian', (1, [0, 1, 0, 0]))])
    
    result = rdd.reduceByKey(lambda x, y: ((x[0]+y[0]),\
                                (x[1][0]+y[1][0]), (x[1][1]+y[1][1]), (x[1][2]+y[1][2]), (x[1][3]+y[1][3]) ))
    

     ..... File "/home/hado/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1784, in _mergeCombiners
         merger.mergeCombiners(iterator)
       File "/home/hado/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/shuffle.py", line 272, in mergeCombiners
         d[k] = comb(d[k], v) if k in d else v
       File "<stdin>", line 3, in <lambda>
     TypeError: 'int' object has no attribute '__getitem__' ......
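
    To make the failure concrete, here is a small standalone illustration (my own, not part of the original answer) of what happens on the second merge for the same key: the flat tuple returned by the first merge no longer holds a list at position 1, so indexing it fails.

     # what the flat-tuple lambda returns after the first merge for a key
     first_merge = (2, 0, 1, 1, 0)
     # the next (count, genre_list) value arriving for the same key
     next_value = (1, [0, 1, 0, 0])
     x, y = first_merge, next_value
     try:
         x[1][0] + y[1][0]   # x[1] is now the int 0, so x[1][0] fails
     except TypeError as e:
         # Python 3 reports: 'int' object is not subscriptable
         # Python 2 reports: 'int' object has no attribute '__getitem__'
         print(e)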

    The correct way to do the reduceByKey in your code is to return the same kind of tuple: the count plus a list of the same size:

     rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])),\
                          ('librarian', (1, [0, 1, 0, 0]))])
    
    result = rdd.reduceByKey(lambda x, y: ( x[0] + y[0],\
                                [x[1][0]+y[1][0], x[1][1]+y[1][1], x[1][2]+y[1][2], x[1][3]+y[1][3] ] ))
    print (result.collect())
    

    [('librarian', (3, [0, 2, 1, 0]))]
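
    Applied back to the program in the question, a minimal sketch of that fix (my own adaptation, reusing the joinRatingGenresOccup RDD from the question) would sum the 19 genre flags element-wise with zip instead of spelling out every index:

     totalRatingGenreCntByOccupation = joinRatingGenresOccup.reduceByKey(
         lambda x, y: (x[0] + y[0],                             # total number of ratings
                       [a + b for a, b in zip(x[1], y[1])]))    # element-wise genre counts
     print (totalRatingGenreCntByOccupation.take(2))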

    You could also do this with a `combineByKey`, as described here: `combineByKey`, pyspark
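
    For reference, a rough sketch (mine, not from the linked answer) of the same aggregation with combineByKey on the small example data above, using its three standard arguments createCombiner, mergeValue and mergeCombiners:

     rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0])), ('librarian', (1, [0, 1, 0, 0])),
                           ('librarian', (1, [0, 1, 0, 0]))])

     def create_combiner(v):
         # the first (count, genre_list) seen for a key becomes the accumulator
         return (v[0], list(v[1]))

     def merge_value(acc, v):
         # fold one more (count, genre_list) value into the accumulator
         return (acc[0] + v[0], [a + b for a, b in zip(acc[1], v[1])])

     def merge_combiners(a, b):
         # merge two partial accumulators built on different partitions
         return (a[0] + b[0], [m + n for m, n in zip(a[1], b[1])])

     result = rdd.combineByKey(create_combiner, merge_value, merge_combiners)
     print (result.collect())    # [('librarian', (3, [0, 2, 1, 0]))]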

    Also note that (in the next-to-last line) `(x[1][2]+y[1][12])` looks like a typo.