
Correct join of DataFrames in Spark?

Nurzhan Nogerbek · 7 years ago

    I am new to the Spark framework and need help!

    Suppose the first DataFrame ( df1 ) stores the times when users contacted a call center.

    +---------+-------------------+
    |USER_NAME|       REQUEST_DATE|
    +---------+-------------------+
    |     Mark|2018-02-20 00:00:00|
    |     Alex|2018-03-01 00:00:00|
    |      Bob|2018-03-01 00:00:00|
    |     Mark|2018-07-01 00:00:00|
    |     Kate|2018-07-01 00:00:00|
    +---------+-------------------+
    

    The second DataFrame stores information about whether a person is a member of the organization. OUT means the user left the organization. IN means the user joined the organization. START_DATE and END_DATE refer to the start and end of the corresponding process.

    For example, you can see that Alex left the organization at 2018-01-01 00:00:00 and that this process ended at 2018-02-01 00:00:00. Note that a single user can join and leave the organization at different times, as Mark does.

    +---------+---------------------+---------------------+--------+
    |NAME     | START_DATE          | END_DATE            | STATUS |
    +---------+---------------------+---------------------+--------+
    |     Alex| 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
    |      Bob| 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
    |     Mark| 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
    |     Mark| 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
    |    Meggy| 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
    +---------+---------------------+---------------------+--------+
    

    In the end I want to get a DataFrame like the one below. It must contain all records from the first DataFrame, plus a column indicating whether the person was a member of the organization at the time of the request ( REQUEST_DATE ) or not.

    +---------+-------------------+----------------+
    |USER_NAME|       REQUEST_DATE| USER_STATUS    |
    +---------+-------------------+----------------+
    |     Mark|2018-02-20 00:00:00| Our user       |
    |     Alex|2018-03-01 00:00:00| Not our user   |
    |      Bob|2018-03-01 00:00:00| Our user       |
    |     Mark|2018-07-01 00:00:00| Not our user   |
    |     Kate|2018-07-01 00:00:00| No Information |
    +---------+-------------------+----------------+
    
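    The lookup being asked for can be stated independently of Spark: for each request, find the membership record with the latest START_DATE that is not after REQUEST_DATE, then map its STATUS to a label ("No Information" when the user has no records). As a sanity check against the sample tables, here is a plain-Python sketch of that rule; ISO-formatted timestamps compare correctly as strings, so no date parsing is needed:

    ```python
    # df1: (USER_NAME, REQUEST_DATE)
    requests = [
        ("Mark", "2018-02-20 00:00:00"),
        ("Alex", "2018-03-01 00:00:00"),
        ("Bob",  "2018-03-01 00:00:00"),
        ("Mark", "2018-07-01 00:00:00"),
        ("Kate", "2018-07-01 00:00:00"),
    ]
    # df2: (NAME, START_DATE, END_DATE, STATUS)
    records = [
        ("Alex",  "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
        ("Bob",   "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
        ("Mark",  "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
        ("Mark",  "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
        ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ]

    def user_status(name, request_date):
        # membership records for this user that started on or before the request
        started = [r for r in records if r[0] == name and r[1] <= request_date]
        if not started:
            return "No Information"
        latest = max(started, key=lambda r: r[1])  # latest START_DATE wins
        return "Our user" if latest[3] == "IN" else "Not our user"

    result = [(name, date, user_status(name, date)) for name, date in requests]
    ```

    Running this over the sample data reproduces the expected table above.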

    Code:

    // assumes spark-shell, where import spark.implicits._ (needed for .toDF)
    // is done automatically; in a standalone app add it explicitly, along
    // with import org.apache.spark.sql.DataFrame
    val df1: DataFrame  = Seq(
        ("Mark", "2018-02-20 00:00:00"),
        ("Alex", "2018-03-01 00:00:00"),
        ("Bob", "2018-03-01 00:00:00"),
        ("Mark", "2018-07-01 00:00:00"),
        ("Kate", "2018-07-01 00:00:00")
    ).toDF("USER_NAME", "REQUEST_DATE")
    
    df1.show()
    
    val df2: DataFrame  = Seq(
        ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
        ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
        ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
        ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
        ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
    ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")
    
    df2.show()
    
    1 Answer  |  7 years ago
  •   Etienne Herlaut    7 years ago
    import org.apache.spark.sql.Dataset
    import org.apache.spark.sql.functions._
    
    // Timestamps are kept as strings: in "yyyy-MM-dd HH:mm:ss" form they
    // compare correctly as plain strings, so the string columns need no
    // cast to java.sql.Date before .as[].
    case class UserAndRequest(
                               USER_NAME: String,
                               REQUEST_DATE: String,
                               START_DATE: String,
                               END_DATE: String,
                               STATUS: String,
                               REQUEST_ID: Long
                             )
    
    val joined : Dataset[UserAndRequest] = df1.withColumn("REQUEST_ID", monotonically_increasing_id).
      join(df2, $"USER_NAME" === $"NAME", "left").
      as[UserAndRequest]
    
    // Collapse the join back to one row per request: keep the membership
    // record with the latest START_DATE that is not after REQUEST_DATE.
    // (Unmatched users such as Kate form single-element groups, so the
    // reducer never sees their null fields.)
    val lastRowByRequestId = joined.
      groupByKey(_.REQUEST_ID).
      reduceGroups { (x, y) =>
        val xValid = x.START_DATE != null && x.START_DATE <= x.REQUEST_DATE
        val yValid = y.START_DATE != null && y.START_DATE <= y.REQUEST_DATE
        if (xValid && (!yValid || x.START_DATE > y.START_DATE)) x else y
      }.map(_._2)
    
    def logic(status: String): String = {
      if (status == "IN") "Our user"
      else if (status == "OUT") "Not our user"  // casing matches the expected output
      else "No Information"                     // STATUS is null when the left join found no match
    }
    
    val logicUDF = udf(logic _)
    
    // The UDF must be applied to STATUS, not REQUEST_DATE
    val finalDF = lastRowByRequestId.withColumn("USER_STATUS", logicUDF($"STATUS"))
    

    Yields:

    (screenshot: result)
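    The groupByKey/reduceGroups pattern in the answer collapses the left join's multiple matches back to one row per request by applying a pairwise rule repeatedly within each group. One reasonable rule (keep the record with the latest START_DATE not after the request date) can be checked in isolation; this plain-Python sketch uses a few hypothetical post-join rows, with None standing in for an unmatched user's null columns:

    ```python
    from functools import reduce
    from itertools import groupby

    # rows after the left join: (request_id, request_date, start_date, status),
    # one row per matching membership record; start_date/status are None when
    # the join found no match (as for Kate)
    rows = [
        (0, "2018-02-20 00:00:00", "2018-02-01 00:00:00", "IN"),
        (0, "2018-02-20 00:00:00", "2018-05-01 00:00:00", "OUT"),
        (4, "2018-07-01 00:00:00", None, None),
    ]

    def pick(x, y):
        # keep the record with the latest start date not after the request date
        def valid(r):
            return r[2] is not None and r[2] <= r[1]
        return x if valid(x) and (not valid(y) or x[2] > y[2]) else y

    # one surviving row per request_id (rows must be sorted by key for groupby)
    collapsed = [reduce(pick, group) for _, group in groupby(rows, key=lambda r: r[0])]
    ```

    Single-element groups (unmatched users) pass through untouched, which is why the reducer never has to compare null fields.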