
Java UDF replicating the same value in Spark SQL [duplicate]

  •  1
  • marjun  · Tech Community  · 6 years ago

    I have a dataframe read from a parquet file and I have to add a new column with some random data, but I need those random values to be different from each other. This is my actual code, and the current version of Spark is 1.5.1-CDH-5.5.2:

    val mydf = sqlContext.read.parquet("some.parquet")
    // mydf.count()
    // 63385686 
    mydf.cache
    
    val r = scala.util.Random
    import org.apache.spark.sql.functions.udf
    def myNextPositiveNumber :String = { (r.nextInt(Integer.MAX_VALUE) + 1 ).toString.concat("D")}
    val myFunction = udf(myNextPositiveNumber _)
    val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))
    

    With this code I get data like this:

    scala> myNewDF.select("myNewColumn").show(10,false)
    +-----------+
    |myNewColumn|
    +-----------+
    |889488717D |
    |889488717D |
    |889488717D |
    |889488717D |
    |889488717D |
    |889488717D |
    |889488717D |
    |889488717D |
    |889488717D |
    |889488717D |
    +-----------+
    

    It looks like the UDF myNextPositiveNumber is only invoked once, isn't it?

    UPDATE: confirmed, there is only one distinct value:

    scala> myNewDF.select("myNewColumn").distinct.show(50,false)
    17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    ...
    
    +-----------+                                                                   
    |myNewColumn|
    +-----------+
    |889488717D |
    +-----------+
    

    What am I doing wrong?
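    A likely explanation, sketched here assuming the session above: lit takes an already-computed Scala value, so myNextPositiveNumber is evaluated exactly once on the driver and its single result is embedded as a constant column; the registered UDF myFunction is never applied to any row. A minimal sketch of the difference:

    // lit() wraps the one String produced by a single driver-side call,
    // so every row receives the same constant value:
    val constantCol = mydf.withColumn("myNewColumn", lit(myNextPositiveNumber))
    
    // invoking the registered UDF instead references the function per row
    // (subject to the determinism caveats discussed in the answer below):
    val perRowCol = mydf.withColumn("myNewColumn", myFunction())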

    UPDATE 2: finally, with the help of @user6910411, I have the following code:

    val mydf = sqlContext.read.parquet("some.parquet")
    // mydf.count()
    // 63385686 
    mydf.cache
    
    val r = scala.util.Random
    
    import org.apache.spark.sql.functions.udf
    
    val accum = sc.accumulator(1)
    
    def myNextPositiveNumber():String = {
       accum+=1
       accum.value.toString.concat("D")
    }
    
    val myFunction = udf(myNextPositiveNumber _)
    
    val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))
    
    myNewDF.select("myNewColumn").count
    
    // 63385686
    

    UPDATE 3

    The actual code generates data like this:

    scala> mydf.select("myNewColumn").show(5,false)
    17/02/22 11:01:57 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    +-----------+
    |myNewColumn|
    +-----------+
    |2D         |
    |2D         |
    |2D         |
    |2D         |
    |2D         |
    +-----------+
    only showing top 5 rows
    

    It looks like the udf function is still only called once, isn't it? I need a new random element for each row.

    UPDATE 4 @user6910411

    I now have this code; it increments the id, but it does not concatenate the final character, which is odd. This is my code:

    import org.apache.spark.sql.functions.udf
    
    
    val mydf = sqlContext.read.parquet("some.parquet")
    
    mydf.cache
    
    def myNextPositiveNumber():String = monotonically_increasing_id().toString().concat("D")
    
    val myFunction = udf(myNextPositiveNumber _)
    
    val myNewDF = mydf.withColumn("myNewColumn",expr(myNextPositiveNumber))
    
    scala> myNewDF.select("myNewColumn").show(5,false)
    17/02/22 12:00:02 WARN Executor: 1 block locks were not released by TID = 1:
    [rdd_4_0]
    +-----------+
    |myNewColumn|
    +-----------+
    |0          |
    |1          |
    |2          |
    |3          |
    |4          |
    +-----------+
    

    I need something like this:

    +-----------+
    |myNewColumn|
    +-----------+
    |1D         |
    |2D         |
    |3D         |
    |4D         |
    +-----------+
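
    A sketch of one way to produce exactly that shape without a UDF, using only built-in functions (monotonically_increasing_id yields unique ids, but they are not guaranteed to be consecutive across partitions, so the sequence may contain gaps):

    import org.apache.spark.sql.functions.{monotonically_increasing_id, concat, lit}
    
    // unique (not random) 64-bit ids, shifted to start at 1,
    // cast to string and suffixed with "D"
    val myNewDF = mydf.withColumn(
      "myNewColumn",
      concat((monotonically_increasing_id() + 1).cast("string"), lit("D"))
    )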
    
    0 replies  |  6 years ago
        1
  •  12
  •   user6910411    7 years ago

    Spark >= 2.3

    You can use the asNondeterministic method:

    import org.apache.spark.sql.expressions.UserDefinedFunction
    
    val f: UserDefinedFunction = ???
    val fNonDeterministic: UserDefinedFunction = f.asNondeterministic
    

    Make sure you understand the guarantees before using this option.
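
    For illustration, a minimal sketch (Spark >= 2.3; the generator, mydf and the column name follow the question and are otherwise hypothetical) of a random-string UDF marked non-deterministic so the optimizer does not fold it into a constant:

    import org.apache.spark.sql.functions.udf
    
    // hypothetical generator: positive random Int rendered as "<n>D"
    val nextPositive = udf(() =>
      (scala.util.Random.nextInt(Integer.MAX_VALUE) + 1).toString + "D"
    ).asNondeterministic()
    
    // evaluated per row instead of being collapsed to a single literal
    val myNewDF = mydf.withColumn("myNewColumn", nextPositive())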

    Spark < 2.3

    A function passed to udf should be deterministic (with the possible exception of SPARK-20586), and calls to nullary functions can be replaced by constants. If you want to generate random numbers, use one of the built-in functions:

    • rand - Generates a random column with independent and identically distributed (i.i.d.) samples from U[0.0, 1.0].
    • randn - Generates a column with independent and identically distributed (i.i.d.) samples from the standard normal distribution.

    and transform the output to obtain the required distribution, for example:

    (rand * Integer.MAX_VALUE).cast("bigint").cast("string")
    
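    Applied to the question's dataframe (mydf and the column name come from the question), a sketch of this Spark < 2.3 approach with the trailing "D" appended via concat; the values are i.i.d. random, so collisions are unlikely but not impossible:

    import org.apache.spark.sql.functions.{rand, concat, lit}
    
    val myNewDF = mydf.withColumn(
      "myNewColumn",
      // random non-negative bigint rendered as a string and suffixed with "D"
      concat((rand() * Integer.MAX_VALUE).cast("bigint").cast("string"), lit("D"))
    )
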
        2
  •  0
  •   Avik Aggarwal    7 years ago

    You can make use of monotonically_increasing_id to generate unique values.

    Then you can define a UDF that appends the "D" to it after casting it to a String, since monotonically_increasing_id returns a Long by default.

    scala> import org.apache.spark.sql.functions.monotonically_increasing_id
    scala> var df = Seq(("Ron"), ("John"), ("Steve"), ("Brawn"), ("Rock"), ("Rick")).toDF("names")
    scala> df.show
    +-----+
    |names|
    +-----+
    |  Ron|
    | John|
    |Steve|
    |Brawn|
    | Rock|
    | Rick|
    +-----+
    
    scala> val appendD = spark.sqlContext.udf.register("appendD", (s: String) => s.concat("D"))
    
    scala> df = df.withColumn("ID",monotonically_increasing_id).selectExpr("names","cast(ID as String) ID").withColumn("ID",appendD($"ID"))
    scala> df.show
    +-----+---+
    |names| ID|
    +-----+---+
    |  Ron| 0D|
    | John| 1D|
    |Steve| 2D|
    |Brawn| 3D|
    | Rock| 4D|
    | Rick| 5D|
    +-----+---+