代码之家  ›  专栏  ›  技术社区  ›  Joe Bloggr

如何将Dataframe类型的函数参数传递给SparkSQL查询

  •  0
  • Joe Bloggr  · 技术社区  · 11 月前

    我有一个封装在函数中的spark.sql查询。我想向查询传递一个函数参数,该参数是一个数据帧,但出现了一些错误。有人能看出我做错了什么吗?

    功能:

    1  def my_function(df_table: DataFrame) -> DataFrame:
    2 
    3    sql_query = f"""
    4    SELECT DISTINCT dt.CountryId,
    5    Cast(RIGHT(dt.RegionIdentifier, 2) as Integer) as RegionID
    6    FROM {df_table} dt 
    7    WHERE dt.CountryCode = 23
    8    """
    9
    10   df = spark.sql(sql_query)
    11   return df
    

    这就是我在笔记本中的称呼:

    df_table = spark.table('path_to_table/_location/')
    my_function(df_table)
    

    我收到错误消息:

    [PARSE_SYNTAX_ERROR] Syntax error at or near '['. SQLSTATE: 42601
    

    如果我删除 {df_table} 在LINE-6上,将表名硬编码,它就可以工作了。是否有方法将表名作为Dataframe传递,并将其作为arg传递?

    当我打印sql_query时,它显示:

    SELECT DISTINCT dt.CountryId,
    Cast(RIGHT(dt.RegionIdentifier, 2) as Integer) as RegionID
    FROM DataFrame[CountryId: bigint, RegionID: int, RegionIdentifier: string, TimeOf: timestamp] dt
    WHERE dt.CountryCode = 23
    
    2 回复  |  直到 11 月前
        1
  •  2
  •   Czaporka    11 月前

    为了能够在SQL查询中访问DataFrame,请使用方法将其注册为临时视图 DataFrame.createOrReplaceTempView :

    def my_function(df_table: DataFrame) -> DataFrame:
        temp_view_name = "some_temp_view"
        df_table.createOrReplaceTempView(temp_view_name)
    
        sql_query = f"""
        SELECT DISTINCT dt.CountryId,
        Cast(RIGHT(dt.RegionIdentifier, 2) as Integer) as RegionID
        FROM {temp_view_name} dt 
        WHERE dt.CountryCode = 23
        """
    
        df = spark.sql(sql_query)
        return df
    
        2
  •  1
  •   Yaroslav Fyodorov    11 月前

    您的SQL语句需要一个表的名称——您不能放DataFrame对象,它们不等效。DataFrame不一定有名称,因为它可能是您加载的DataFrame上的某些操作的结果。

    如果您希望此函数始终接受表作为参数,那么只需传递名称(字符串)即可。

    如果你预计有时它可能会得到一个DataFrame,这是其他操作的结果,那么你不能将select构建为SQL语句,但你可以通过应用pyspark函数来添加它,就像这样

     import pyspark.sql.functions as F
    
     df = df_table.where(F.col("CountryCode") == 23).\
                   select(F.countDistinct(F.col("CountryId")), 
                          F.right(F.col("RegionIdentifier"),2).\
                                    cast("integer").alias("RegionID"))