Finding the count of distinct values in an array column

  •  0
  • user2896120  ·  6 years ago

    I'm creating a DataFrame with Scala and Spark. This is my code so far:

    val df = transformedFlattenDF
      .groupBy($"market", $"city", $"carrier")
      .agg(
        count("*").alias("count"),
        min($"bandwidth").alias("bandwidth"),
        first($"network").alias("network"),
        concat_ws(",", collect_list($"carrierCode")).alias("carrierCode")
      )
      .withColumn("carrierCode", split($"carrierCode", ",").cast("array<string>"))
      .withColumn("Carrier Count", collect_set("carrierCode"))
    

    The carrierCode column becomes an array column. The data looks like this:

    CarrierCode
    1: [12,2,12]
    2: [5,2,8]
    3: [1,1,3]
    

    I want to create a column that counts the number of distinct values in each array. I tried doing this with collect_set, but it gives me an error saying grouping expressions sequence is empty. Is it possible to find the number of distinct values in each row's array? So, for the same example, there would be a column like this:

    Carrier Count
    1: 2
    2: 3
    3: 2
    
    3 Answers  |  6 years ago
        1
  •  1
  •   Leo C    6 years ago

    You can take the size of collect_set within the same groupBy-agg to compute the distinct carrierCode count per group:

    val df = transformedFlattenDF.groupBy($"market", $"city", $"carrier").agg(
        count("*").alias("count"), min($"bandwidth").alias("bandwidth"),
        first($"network").alias("network"),
        concat_ws(",", collect_list($"carrierCode")).alias("carrierCode"),
        size(collect_set($"carrierCode")).as("carrier_count")  // <-- ADDED: distinct count per group
      ).
      withColumn("carrierCode", split(($"carrierCode"), ",").cast("array<string>"))
    

    If the distinct counting instead needs to be performed on the already-built array column, a simple UDF will do:

    import org.apache.spark.sql.functions._
    import spark.implicits._  // for toDF on the sample Seq (assumes a SparkSession `spark` in scope)
    
    val codeDF = Seq(
      Array("12", "2", "12"),
      Array("5", "2", "8"),
      Array("1", "1", "3")
    ).toDF("carrier_code")
    
    // UDF that counts the distinct elements of an array column
    def distinctElemCount = udf( (a: Seq[String]) => a.toSet.size )
    
    codeDF.withColumn("carrier_count", distinctElemCount($"carrier_code")).
      show
    // +------------+-------------+
    // |carrier_code|carrier_count|
    // +------------+-------------+
    // | [12, 2, 12]|            2|
    // |   [5, 2, 8]|            3|
    // |   [1, 1, 3]|            2|
    // +------------+-------------+
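
    As a side note beyond the original answer: on Spark 2.4+ the builtin array_distinct can replace the UDF entirely. A minimal sketch:

    import org.apache.spark.sql.functions._

    // Spark 2.4+: per-row distinct-element count, no UDF needed
    codeDF.withColumn("carrier_count", size(array_distinct($"carrier_code"))).show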
    
        2
  •  0
  •   Ged    6 years ago

    import org.apache.spark.sql.functions._
    
    val df = sc.parallelize(Seq(
             ("A", 2, 100, 2), ("F", 7, 100, 1), ("B", 10, 100, 100)
             )).toDF("c1", "c2", "c3", "c4")
    
    val x = df.select("c1", "c2", "c3", "c4").rdd.map(x => (x.get(0),  List(x.get(1), x.get(2), x.get(3)))  )
    val y = x.map { case (k, vL) => (k, vL.toSet.size) }
    // Manipulate back to your DF, via conversion, join, what not.
    y.collect
    

    res15: Array[(Any, Int)] = Array((A,2), (F,3), (B,2))
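
    To follow up on the "manipulate back to your DF" comment, a minimal conversion sketch (assuming a SparkSession named spark is in scope, rendering the key as a String):

    import spark.implicits._

    // Turn the (key, distinct-count) pairs back into a DataFrame
    val resultDF = y.map { case (k, n) => (k.toString, n) }.toDF("c1", "distinct_count")
    resultDF.show()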
    

        3
  •  0
  •   Manoj Kumar Dhakad    6 years ago

    //Input
    df.show
    +-----------+
    |CarrierCode|
    +-----------+
    |1:[12,2,12]|
    |  2:[5,2,8]|
    |  3:[1,1,3]|
    +-----------+
    //udf: "<id>:[a,b,c]" -> "<id>:<distinct count>"
    val countUDF = udf { (str: String) =>
      val strArr = str.split(":")
      // strip the surrounding brackets before splitting, otherwise "[12" and "12]" count as distinct
      strArr(0) + ":" + strArr(1).stripPrefix("[").stripSuffix("]").split(",").distinct.length.toString
    }
    
    df.withColumn("Carrier Count",countUDF(col("CarrierCode"))).show
    
    //Sample Output:
    +-----------+-------------+
    |CarrierCode|Carrier Count|
    +-----------+-------------+
    |1:[12,2,12]|          1:2|
    |  2:[5,2,8]|          2:3|
    |  3:[1,1,3]|          3:2|
    +-----------+-------------+
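
    For comparison, a UDF-free sketch using only builtin functions (assumes Spark 2.4+ for array_distinct; keeps the same id:count output format):

    import org.apache.spark.sql.functions._

    val counted = df.withColumn("Carrier Count",
      concat(
        substring_index(col("CarrierCode"), ":", 1), lit(":"),
        size(array_distinct(split(regexp_extract(col("CarrierCode"), "\\[(.*)\\]", 1), ","))).cast("string")
      ))
    counted.show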