代码之家  ›  专栏  ›  技术社区  ›  Himanshu Yadav

Spark数据帧:选择不同的行

  •  0
  • Himanshu Yadav  · 技术社区  · 6 年前

    我试过两种方法从拼花地板中找出不同的行,但似乎不起作用。
    尝试1: Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();
    但是抛出

    Cannot have map type columns in DataFrame which calls set operations
    (intersect, except, etc.), 
    but the type of column canvasHashes is map<string,string>;;
    

    尝试2: 已尝试运行sql查询:

    Dataset<Row> df = sqlContext.read().parquet("location.parquet");
        rawLandingDS.createOrReplaceTempView("df");
        Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");
    

    我得到的错误:

    = SQL ==
    SELECT distinct on timestamp * from df
    -----------------------------^^^
    

    有没有办法在读取拼花文件时获得不同的记录?任何我可以使用的阅读选项。

    1 回复  |  直到 5 年前
        1
  •  5
  •   Community CDub    6 年前

    您面临的问题在异常消息中有明确的说明-因为 MapType 列既不可哈希也不可排序不能用作分组或分区表达式的一部分。

    您采用的SQL解决方案在逻辑上与 distinct Dataset . 如果要基于一组兼容列消除重复数据,则应使用 dropDuplicates :

    df.dropDuplicates("timestamp")
    

    相当于

    SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
           first(canvasHashes) AS canvasHashes
    FROM df GROUP BY timestamp
    

    不幸的是如果你的目标是真的 DISTINCT 不会那么容易的。一个可行的解决方案是利用Scala* Map 散列。你可以定义 斯卡拉 udf 这样地:

    spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)
    

    然后在Java代码中使用它来派生可用于 删除重复项 :

     df
      .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
      .dropDuplicates(
        // All columns excluding canvasHashes / hash_of_canvas_hashes
        "timestamp",  "c1", "c2", ..., "cn" 
        // Hash used as surrogate of canvasHashes
        "hash_of_canvas_hashes"         
      )
    

    与SQL等效

    SELECT 
      timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
      first(canvasHashes) AS canvasHashes
    FROM df GROUP BY
      timestamp, c1, c2, ..., cn    -- All columns excluding canvasHashes
    

    *请注意 java.util.Map 用它的 hashCode 也不管用 哈希码 不一致。

        2
  •  2
  •   Andronicus    6 年前

    是的,语法不正确,应该是:

    Dataset<Row> landingDF = sqlContext.sql("SELECT distinct * from df");
    
        3
  •  2
  •   vaquar khan    6 年前

    1) 如果您想基于colun进行区分,可以使用它

    val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("no", "age")
    
    
    scala> df.show
    +---+---+
    | no|age|
    +---+---+
    |  1|  2|
    |  3|  4|
    |  1|  6|
    +---+---+
    
    val distinctValuesDF = df.select(df("no")).distinct
    
    scala> distinctValuesDF.show
    +---+
    | no|
    +---+
    |  1|
    |  3|
    +---+
    

    2) 如果您想在所有列上都是唯一的,请使用dropduplicate

    scala> val df = sc.parallelize(Array((1, 2), (3, 4),(3, 4), (1, 6))).toDF("no", "age")
    
    
    
    scala> df.show
    
    +---+---+
    | no|age|
    +---+---+
    |  1|  2|
    |  3|  4|
    |  3|  4|
    |  1|  6|
    +---+---+
    
    
    scala> df.dropDuplicates().show()
    +---+---+
    | no|age|
    +---+---+
    |  1|  2|
    |  3|  4|
    |  1|  6|
    +---+---+