
Could not serialize object: AttributeError: 'builtin_function_or_method' object has no attribute '__code__'

  •  1
  •  Wentao  ·  7 years ago

    I trained a DNN classifier model with TensorFlow in Python. Now I want to load it in PySpark and use the model to predict the gender for each record of an RDD. First I build the same TensorFlow graph as in the training model, then I load the trained weights and try to predict on each row of the RDD:

    """
    code to generate the tensorflow graph omitted
    """
    
    with tf.Session(graph=graph) as sess:
        # load the trained model
        saver.restore(sess, "./nonClass_gender")
        # lib is the RDD; each Row has the form Row(key=..., values=..., indices=..., shape=...)
        predictions_1 = lib.map(lambda e: Row(key=e["key"],
                                              prob=y_proba.eval(feed_dict={values: e["values"],
                                                                           indices: e["indices"],
                                                                           shape: [1, 2318]})))
        predictions_1.take(5)
    

    Note that in the RDD each row has the form Row(key=..., values=..., indices=..., shape=...). values, indices and shape correspond to the values, indices and dense_shape in this answer: Use coo_matrix in TensorFlow . They are used to build a SparseTensorValue. The difference is that in my code each row produces its own SparseTensorValue.
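
    For concreteness, here is a minimal sketch (with made-up data) of how one Row's three fields map onto a tf.SparseTensorValue, assuming each row encodes a 1 x 2318 sparse feature vector in COO form:

    import numpy as np
    import tensorflow as tf
    from pyspark.sql import Row

    # Hypothetical row: two non-zero features in a 1 x 2318 sparse vector
    row = Row(key=42,
              values=np.array([1.0, 3.0], dtype=np.float32),         # non-zero entries
              indices=np.array([[0, 5], [0, 100]], dtype=np.int64),  # their [row, col] positions
              shape=np.array([1, 2318], dtype=np.int64))

    # The three fields map one-to-one onto a SparseTensorValue
    sparse_input = tf.SparseTensorValue(indices=row["indices"],
                                        values=row["values"],
                                        dense_shape=row["shape"])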

    Then I get the following error:

    Traceback (most recent call last):
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 148, in dump
        return Pickler.dump(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 224, in dump
        self.save(obj)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 255, in save_function
        self.save_function_tuple(obj)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 292, in save_function_tuple
        save((code, closure, base_globals))
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
        save(x)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 255, in save_function
        self.save_function_tuple(obj)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 292, in save_function_tuple
        save((code, closure, base_globals))
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
        save(tmp[0])
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 249, in save_function
        self.save_function_tuple(obj)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 297, in save_function_tuple
        save(f_globals)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
        save(state)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
        save(state)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
        save(state)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
        save(tmp[0])
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
        save(state)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
        save(x)
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
        save(state)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
        save(x)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 249, in save_function
        self.save_function_tuple(obj)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 292, in save_function_tuple
        save((code, closure, base_globals))
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
        save(tmp[0])
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
        save(state)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
        save(x)
      File "/usr/lib/python2.7/pickle.py", line 331, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 600, in save_reduce
        save(state)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
        self._batch_setitems(obj.iteritems())
      File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
        save(v)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 368, in save_builtin_function
        return self.save_function(obj)
      File "/usr/local/spark/python/pyspark/cloudpickle.py", line 247, in save_function
        if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
    AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
    -------------------------------------------------------------------
    PicklingError                     Traceback (most recent call last)
    <ipython-input-210-74fa9037373f> in <module>()
          6         prob = y_proba.eval(feed_dict={values: e["values"], 
          7         indices: e["indices"], shape: [1,2318]})))
    ----> 8     predictions_1.take(5)
    
    /usr/local/spark/python/pyspark/rdd.pyc in take(self, num)
       1341 
       1342             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
    -> 1343             res = self.context.runJob(self, takeUpToNumLeft, p)
       1344 
       1345             items += res
    
    /usr/local/spark/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
        990         # SparkContext#runJob.
        991         mappedRDD = rdd.mapPartitions(partitionFunc)
    --> 992         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
        993         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
        994 
    
    /usr/local/spark/python/pyspark/rdd.pyc in _jrdd(self)
       2453 
       2454         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
    -> 2455                                       self._jrdd_deserializer, profiler)
       2456         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
       2457                                              self.preservesPartitioning)
    
    /usr/local/spark/python/pyspark/rdd.pyc in _wrap_function(sc, func, deserializer, serializer, profiler)
       2386     assert serializer, "serializer should not be empty"
       2387     command = (func, profiler, deserializer, serializer)
    -> 2388     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
       2389     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
       2390                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    
    /usr/local/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command)
       2372     # the serialized command will be compressed by broadcast
       2373     ser = CloudPickleSerializer()
    -> 2374     pickled_command = ser.dumps(command)
       2375     if len(pickled_command) > (1 << 20):  # 1M
       2376         # The broadcast will have same life cycle as created PythonRDD
    
    /usr/local/spark/python/pyspark/serializers.pyc in dumps(self, obj)
        458 
        459     def dumps(self, obj):
    --> 460         return cloudpickle.dumps(obj, 2)
        461 
        462 
    
    /usr/local/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
        702 
        703     cp = CloudPickler(file,protocol)
    --> 704     cp.dump(obj)
        705 
        706     return file.getvalue()
    
    /usr/local/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
        160                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
        161             print_exec(sys.stderr)
    --> 162             raise pickle.PicklingError(msg)
        163 
        164     def save_memoryview(self, obj):
    
    PicklingError: Could not serialize object: AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
    

    In the code above, if I change prob = y_proba.eval(feed_dict={values: e["values"], indices: e["indices"], shape: [1,2318]}) to a plain Python-defined function, e.g. proba = test(e["values"], e["indices"], [1,2318]), it works. Likewise, if I call y_proba.eval directly in Python (not inside the RDD map), it also works.
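
    That pattern points at the cause: the lambda passed to map closes over y_proba and the live Session, so Spark has to cloudpickle the whole TensorFlow graph, which contains C-level builtin_function_or_method objects without a __code__ attribute. A minimal sketch of the same failure outside Spark (the exact error message depends on the pickler and version):

    import pickle
    import tensorflow as tf

    sess = tf.Session()
    try:
        # Sessions (and the graphs/ops they hold) are not picklable
        pickle.dumps(sess)
    except Exception as exc:
        print(type(exc).__name__, exc)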

    2 Answers  |  as of 7 years ago
        1
  •  0
  •   Alper t. Turker    7 years ago
    • Distribute the model files to every machine (you can use SparkFiles ; see the sketch after this list).
    • Rewrite the prediction as a function:

      def predict(rows, worker_session_path):
          with tf.Session(graph=graph) as sess:
              # load the trained model
              saver.restore(sess, worker_session_path)
              # rows is an iterator over this partition's Rows: Row(key=..., values=..., indices=..., shape=...)
              return map(lambda e: Row(key=e["key"],
                  prob=y_proba.eval(feed_dict={values: e["values"],
                  indices: e["indices"], shape: [1, 2318]})), rows)
      
    • Use it with mapPartitions:

      lib.mapPartitions(lambda rows: predict(rows, worker_session_path))
      
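    A hedged sketch of the SparkFiles step from the first bullet; the three checkpoint file names are assumed from the "./nonClass_gender" prefix used in the question:

      from pyspark import SparkFiles

      # Driver side: ship each checkpoint file to every executor
      for f in ["nonClass_gender.index", "nonClass_gender.meta",
                "nonClass_gender.data-00000-of-00001"]:
          sc.addFile("./" + f)

      def local_checkpoint_prefix():
          # Worker side: SparkFiles.get() resolves the local copy; strip the
          # ".index" suffix to recover the prefix that saver.restore() expects
          return SparkFiles.get("nonClass_gender.index")[:-len(".index")]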
        2
  •  0
  •   Wentao    7 years ago

    Thanks to @user8371915: his answer, together with the related topic Transform map to mapPartition using pyspark , inspired the solution. The key is to build the TensorFlow graph inside the function used by mapPartitions , not outside it. Here is the working code:

    import numpy as np
    import tensorflow as tf
    from pyspark.sql import Row

    # first_layer, neuron_layer and dropout_rate come from the (omitted) training code
    def predict(rows, worker_session_path):
    
        n_inputs = 2318  # the second dimension of the input sparse matrix X
        n_hidden1 = 200  # neurons in the first hidden layer
        n_hidden2 = 20   # neurons in the second hidden layer
        n_outputs = 2    # binary classification
        # build the graph as in the training model
        graph = tf.Graph()
        with graph.as_default():
            # for sparse tensor X
            values = tf.placeholder(tf.float32) 
            indices = tf.placeholder(tf.int64)
            shape = tf.placeholder(tf.int64)
    
            y = tf.placeholder(tf.int32, shape=(None), name="y")
    
            training = tf.placeholder_with_default(False, shape=(), name='training')
    
            with tf.name_scope("dnn"):
                hidden1 = first_layer(values, indices, shape, n_hidden1, name="hidden1", 
                                      activation=tf.nn.relu, n_inputs = n_inputs)
                hidden1_drop = tf.layers.dropout(hidden1, dropout_rate, training=training)
                hidden2 = neuron_layer(hidden1_drop, n_hidden2, name="hidden2",
                                       activation=tf.nn.relu)
                hidden2_drop = tf.layers.dropout(hidden2, dropout_rate, training=training)
                logits = neuron_layer(hidden2_drop, n_outputs, name="outputs")
                y_proba = tf.nn.softmax(logits)
    
            saver = tf.train.Saver()
    
        with tf.Session(graph=graph) as sess:
            saver.restore(sess, worker_session_path)
            for e in rows:
                proba = sess.run(y_proba, feed_dict={indices:e["indices"], 
                                                 values:e["values"], shape: [1,2318]})
            # np.squeeze converts proba's shape from (1,2) to (2,)
                yield(Row(key = e['key'], proba = np.squeeze(proba)))
    
    lib2 = lib.mapPartitions(lambda rows: predict(rows, "./nonClass_gender"))
    lib2.take(5)
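
    As a small follow-up (which class index encodes the target gender is an assumption here; swap the index accordingly), each squeezed (2,) vector can be reduced to a single probability per key:

    gender_prob = lib2.map(lambda r: Row(key=r["key"],
                                         p_gender=float(r["proba"][1])))
    gender_prob.take(5)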