代码之家  ›  专栏  ›  技术社区  ›  Danylo Kuznetsov

如何在PySpark Rancher中将DataFrame转换为整数?

  •  0
  • Danylo Kuznetsov  · 技术社区  · 7 月前

    我是PySpark的新用户,目前在ViewModel中工作,比较两个具有相同列结构的DataFrames。我正在将它们相互比较(本质上是将已经加载到数据库中的文件与新文件进行比较)。在此过程中,我使用以下代码计算对每个变量所做的更改数量:

    Comparison_DF = DF1_Data_To_Compare.withColumn("Value1_Change", when(col("b.Value1") == col("a.Value1"), 0).otherwise(1))
    Comparison_DF = Comparison_DF.withColumn("Value2_Change", when(col("b.Value2") == col("a.Value2"), 0).otherwise(1))  
    
    # Summarizing the number of changes
    Change_To_Value1 = Comparison_DF.select(sum("Value1_Change"))
    Change_To_Value2 = Comparison_DF.select(sum("Value2_Change"))
    
    # Forming the change report DataFrame
    # columns=["Type of Change", "Number of Occurrences"]
    data = [("Change to Value1", Change_To_Value1), ("Change to Value2", Change_To_Value2)]
    
    rdd = spark.sparkContext.parallelize(data)
    print(data) 
    

    线 rdd = spark.sparkContext.parallelize(data) 返回错误。在检查了错误回溯后,我意识到 Change_To_Value1 Change_To_Value2 不是变量,而是DataFrames。这个 print(data) 语句给出了以下结果: [('Change to Value1', DataFrame[sum(Value1_Change): bigint]), ('Change to Value2', DataFrame[sum(Value2_Change): bigint])] .

    我需要形成这种DataFrame,将其用作更改报告,以便与SSIS包返回的结果进行比较。

    我在StackOverflow或任何其他开源软件上都没有发现类似的东西。我试图构建一个循环语句来收集这些DataFrames并将其直接馈送到一个新的DataFrames中,但我也失败了。

    是否有方法将这些DataFrames转换为int变量?或者,是否有更好的方法来形成此DataFrame?

    1 回复  |  直到 7 月前
        1
  •  0
  •   Steven    7 月前

    如果我理解正确,您想创建一个如下所示的DataFrame:

    变更类型 发生次数
    更改为值1 xxx
    更改为值2 yyy

    以下是我将如何处理它:

    # Keep the first two lines as is:
    Comparison_DF = DF1_Data_To_Compare.withColumn("Value1_Change", when(col("b.Value1") == col("a.Value1"), 0).otherwise(1))
    Comparison_DF = Comparison_DF.withColumn("Value2_Change", when(col("b.Value2") == col("a.Value2"), 0).otherwise(1))  
    
    # Then, modify the next part slightly:
    Change_To_Value1 = Comparison_DF.select(
        F.lit("Change to Value1").alias("Type of Change"),
        sum("Value1_Change").alias("Number of Occurrences"),
    )
    Change_To_Value2 = Comparison_DF.select(
        F.lit("Change to Value2").alias("Type of Change"),
        sum("Value2_Change").alias("Number of Occurrences"),
    )
    
    # Combine the two DataFrames
    data = Change_To_Value1.unionByName(Change_To_Value2)
    
    # Additionally, if you need the data as an RDD (though not recommended):
    data.rdd
    

    这种方法应该能为您提供所需的DataFrame结构。请注意,除非必要,否则通常不建议使用RDD,因为DataFrames针对PySpark操作进行了更优化。

    推荐文章