
TypeError when submitting a custom UDF with spark-submit

  •  marjun  ·  asked 7 years ago

    I get a TypeError when I submit the UDF with spark-submit --py-files:

    TypeError: 'in <string>' requires string as left operand, not NoneType

    I have written all the UDFs in proj.py.

    from pyspark.sql.types import BooleanType

    group_1 =['EAST','NORTH','SOUTH','SOUTHEAST','SOUTHWEST']
    group_2 =['AUTORX','CAREWORKS','CHIROSPORT']
    
    mearged_list = group_1 + group_2
    str1 = ''.join(mearged_list)
    
    def search_list(column):
        return any(column in item for item in str1)
    
    sqlContext.udf.register("search_list_udf", search_list, BooleanType())
    

    This function does not throw any error when called from the pyspark shell. When I run it with spark-submit, I get the following error.

    Error:

      File "/hd_data/disk23/hadoop/yarn/local/usercache/hscrsawd/appcache/application_1530205632093_12027/container_1530205632093_12027_01_000007/pyspark.zip/pyspark/worker.py", line 177, in main
        process()
      File "/hd_data/disk23/hadoop/yarn/local/usercache/hscrsawd/appcache/application_1530205632093_12027/container_1530205632093_12027_01_000007/pyspark.zip/pyspark/worker.py", line 172, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/hd_data/disk23/hadoop/yarn/local/usercache/hscrsawd/appcache/application_1530205632093_12027/container_1530205632093_12027_01_000007/pyspark.zip/pyspark/worker.py", line 104, in <lambda>
        func = lambda _, it: map(mapper, it)
      File "<string>", line 1, in <lambda>
      File "/hd_data/disk23/hadoop/yarn/local/usercache/hscrsawd/appcache/application_1530205632093_12027/container_1530205632093_12027_01_000007/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
        return lambda *a: f(*a)
      File "NAM_Udfs.py", line 115, in search_list
        return any(column in item for item in str1)
      File "NAM_Udfs.py", line 115, in <genexpr>
        return any(column in item for item in str1)
    TypeError: 'in <string>' requires string as left operand, not NoneType
    
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
            at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
            at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
            at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
            at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
            at org.apache.spark.scheduler.Task.run(Task.scala:108)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    
    1 Answer

  •   Ryan Widmaier  ·  7 years ago

    You just need to change your UDF to handle null values, as shown below. You may also want to account for empty strings in the column values.

    def search_list(column):
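        # A null column value cannot be the left operand of 'in', so treat it as no match.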
        if column is None:
            return False
        return any(column in item for item in str1)
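
    For completeness, a minimal sketch of how the null-safe UDF could be re-registered and invoked through Spark SQL; the temporary view name "regions" and column name "region" are assumptions for illustration:

    from pyspark.sql.types import BooleanType

    # Re-register the null-safe version under the same name used in the question.
    sqlContext.udf.register("search_list_udf", search_list, BooleanType())

    # Hypothetical usage: "regions" is an assumed temporary view with a "region" column.
    result = sqlContext.sql(
        "SELECT region, search_list_udf(region) AS matched FROM regions"
    )
    result.show()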