我有一个带拼花文件的数据框,我必须添加一个包含一些随机数据的新列,但我需要这些随机数据彼此不同。这是我的实际代码,当前版本的Spark是1.5.1-CDH-5.5.2:
val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686
mydf.cache
val r = scala.util.Random
import org.apache.spark.sql.functions.udf
def myNextPositiveNumber :String = { (r.nextInt(Integer.MAX_VALUE) + 1 ).toString.concat("D")}
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))
有了这个代码,我就有了这个数据:
scala> myNewDF.select("myNewColumn").show(10,false)
+-----------+
|myNewColumn|
+-----------+
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
+-----------+
看起来udf mynextpositivenumber只调用一次,不是吗?
更新
确认,只有一个不同的值:
scala> myNewDF.select("myNewColumn").distinct.show(50,false)
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
...
+-----------+
|myNewColumn|
+-----------+
|889488717D |
+-----------+
我做错什么了?
更新2:最后,在@user6910411的帮助下,我得到了以下代码:
val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686
mydf.cache
val r = scala.util.Random
import org.apache.spark.sql.functions.udf
val accum = sc.accumulator(1)
def myNextPositiveNumber():String = {
accum+=1
accum.value.toString.concat("D")
}
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))
myNewDF.select("myNewColumn").count
// 63385686
更新3
实际代码生成如下数据:
scala> mydf.select("myNewColumn").show(5,false)
17/02/22 11:01:57 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+-----------+
|myNewColumn|
+-----------+
|2D |
|2D |
|2D |
|2D |
|2D |
+-----------+
only showing top 5 rows
看起来udf函数只调用一次,不是吗?我需要一个新的随机元素。
更新4@user6910411
我有这个实际的代码,增加了id,但它没有连接最后的字符,这很奇怪。这是我的代码:
import org.apache.spark.sql.functions.udf
val mydf = sqlContext.read.parquet("some.parquet")
mydf.cache
def myNextPositiveNumber():String = monotonically_increasing_id().toString().concat("D")
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",expr(myNextPositiveNumber))
scala> myNewDF.select("myNewColumn").show(5,false)
17/02/22 12:00:02 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_4_0]
+-----------+
|myNewColumn|
+-----------+
|0 |
|1 |
|2 |
|3 |
|4 |
+-----------+
我需要这样的东西:
+-----------+
|myNewColumn|
+-----------+
|1D |
|2D |
|3D |
|4D |
+-----------+