代码之家  ›  专栏  ›  技术社区  ›  ahajib Shaun McHugh

Pyspark:在UDF中传递多列和一个参数

  •  1
  • ahajib Shaun McHugh  · 技术社区  · 7 年前

    我正在编写一个udf,它将获取两个dataframe列以及一个额外的参数(一个常量值),并应向dataframe添加一个新列。我的函数看起来像:

    def udf_test(column1, column2, constant_var):
        if column1 == column2:
            return column1
        else:
            return constant_var
    

    apply_test = udf(udf_test, StringType())
    df = df.withColumn('new_column', apply_test('column1', 'column2'))
    

    除非我移除 constant_var

    constant_var = 'TEST'
    apply_test = udf(lambda x: udf_test(x, constant_var), StringType())
    df = df.withColumn('new_column', apply_test(constant_var)(col('column1', 'column2')))
    

    apply_test = udf(lambda x,y: udf_test(x, y, constant_var), StringType())
    

    以上这些对我都不管用。我是根据 this this 我想很明显我的问题和这两个问题有什么不同。任何帮助都将不胜感激。

    注: 为了便于讨论,我在这里对函数进行了简化,实际函数更复杂。我知道这个手术可以用 when otherwise 声明。

    1 回复  |  直到 7 年前
        1
  •  7
  •   Pieter Eric Levieil    7 年前

    您不必使用用户定义的函数。你可以使用这些函数 when() otherwise() :

    from pyspark.sql import functions as f
    df = df.withColumn('new_column', 
                       f.when(f.col('col1') == f.col('col2'), f.col('col1'))
                        .otherwise('other_value'))
    

    另一种方法是生成用户定义的函数。但是,使用 udf

    def generate_udf(constant_var):
        def test(col1, col2):
            if col1 == col2:
                return col1
            else:
                return constant_var
        return f.udf(test, StringType())
    
    df = df.withColumn('new_column', 
                       generate_udf('default_value')(f.col('col1'), f.col('col2')))