我正在Scala中做spark工作,从avro文件中读取数据。
开始很简单:
val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
但后来它并不优雅,因为我需要对元组ie进行操作。
avroRDD.map(x => (x.get("value").asInstanceOf[Long],x.get("start_time").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String])).
map(x => (asDate(x._2),(x._1,x._3,x._4,x._5))).
reduceByKey((x,y) => (x._1+y._1,x._2+y._2,x._3+y._3,y._4)).
map(x => List(x._1,x._2._1,x._2._2,x._2._3,x._2._4).mkString(","))
...
我在考虑使用Map而不是tuple,但如果我有几个不同的类型,即Long和String
Map[String,Any]
并在每一次操作中铸造。
即
avroRDD.map(x => Map("value" -> x.get("value").asInstanceOf[Long],"start_time" -> x.get("start_time").asInstanceOf[Long],"level" -> x.get("level").asInstanceOf[Double],"size" -> x.get("size").asInstanceOf[Long],"category" -> x.get("category").asInstanceOf[String])).
map(x => (asDate(x.get("start_time).asInstanceOf[Long]),(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
...
另一种解决方案是使用case类并将值包装到其中,但有时这可能会导致大量case类定义,例如:
case class TestClass(value: Long, level:Double, size:Long, category:String)
avroRDD.map(x => (x.get("start_time").asInstanceOf[Long],TestClass(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
map(x => (asDate(x._1),x._2)).
reduceByKey((x,y) => (x.value+y.value,x.level+y.level,x.size+y.size,y.category)).
map(x => List(x._1,x._2.value,x._2.level,x._2.size,x._2.category).mkString(","))
...
我想知道,在这种情况下,是否有更好的方法来处理通用记录——这样,您就不需要不断地转换为特定类型,并且可以对字段的名称进行操作。类似于命名元组的东西可以完成这项工作。
你知道更好的方法吗?
你如何处理此类案件?