代码之家  ›  专栏  ›  技术社区  ›  baju

如何用Scala优雅地处理Spark中的Avro

  •  1
  • baju  · 技术社区  · 11 年前

    我正在Scala中做spark工作,从avro文件中读取数据。 开始很简单:

    val path = "hdfs:///path/to/your/avro/folder"
    val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
    

    但后来它并不优雅,因为我需要对元组ie进行操作。

    avroRDD.map(x => (x.get("value").asInstanceOf[Long],x.get("start_time").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String])).
    map(x => (asDate(x._2),(x._1,x._3,x._4,x._5))).
    reduceByKey((x,y) => (x._1+y._1,x._2+y._2,x._3+y._3,y._4)).
    map(x => List(x._1,x._2._1,x._2._2,x._2._3,x._2._4).mkString(","))
    ...
    

    我在考虑使用Map而不是tuple,但如果我有几个不同的类型,即Long和String Map[String,Any] 并在每一次操作中铸造。 即

    avroRDD.map(x => Map("value" -> x.get("value").asInstanceOf[Long],"start_time" -> x.get("start_time").asInstanceOf[Long],"level" -> x.get("level").asInstanceOf[Double],"size" -> x.get("size").asInstanceOf[Long],"category" -> x.get("category").asInstanceOf[String])).
    map(x => (asDate(x.get("start_time).asInstanceOf[Long]),(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
    ...
    

    另一种解决方案是使用case类并将值包装到其中,但有时这可能会导致大量case类定义,例如:

    case class TestClass(value: Long, level:Double, size:Long, category:String)
    
    avroRDD.map(x => (x.get("start_time").asInstanceOf[Long],TestClass(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))).
    map(x => (asDate(x._1),x._2)).
    reduceByKey((x,y) => (x.value+y.value,x.level+y.level,x.size+y.size,y.category)).
    map(x => List(x._1,x._2.value,x._2.level,x._2.size,x._2.category).mkString(","))
    ...
    

    我想知道,在这种情况下,是否有更好的方法来处理通用记录——这样,您就不需要不断地转换为特定类型,并且可以对字段的名称进行操作。类似于命名元组的东西可以完成这项工作。

    你知道更好的方法吗?

    你如何处理此类案件?

    1 回复  |  直到 11 年前
        1
  •  2
  •   Alexey Romanov    11 年前

    使用模式匹配:

    map { case (value, startTime, level, size, category) => 
      (asDate(startTime), (value,level,size,category)) 
    }.reduceByKey { case ((value1, level1, size1, category1), (value2, level2, size2, category2)) => 
      (value1+value2, level1+level2, size1+size2, category2) 
    }.map { case (startTime, (value, level, size, category)) => 
      List(startTime, value, level, size, category).mkString(","))
    }
    

    如果您有一些经常被重用的元组,请使用它们的case类。

    推荐文章