代码之家  ›  专栏  ›  技术社区  ›  Himanshu Yadav

如何有效地读写拼花文件?

  •  0
  • Himanshu Yadav  · 技术社区  · 7 年前

    我正在开发一个实用程序,它一次读取多个拼花文件,并将它们写入一个输出文件。 实现非常简单。这个实用程序从目录中读取拼花文件,读取 Group 然后使用ParquetWrite将所有这些组写入一个文件。
    在读取600mb之后,它抛出Java堆空间的内存不足错误。读写500mb的数据也需要15-20分钟。

    有没有办法让这个行动更有效率?

    Read方法如下所示:

    ParquetFileReader reader = new ParquetFileReader(conf, path, ParquetMetadataConverter.NO_FILTER);
              ParquetMetadata readFooter = reader.getFooter();
              MessageType schema = readFooter.getFileMetaData().getSchema();
              ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
              reader.close();
              PageReadStore pages = null;
              try {
                while (null != (pages = r.readNextRowGroup())) {
                  long rows = pages.getRowCount();
                  System.out.println("Number of rows: " + pages.getRowCount());
    
                  MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                  RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                  for (int i = 0; i < rows; i++) {
                    Group g = (Group) recordReader.read();
                    //printGroup(g);
                    groups.add(g);
                  }
                }
              } finally {
                System.out.println("close the reader");
    
                r.close();
              }
    

    写入方法如下:

    for(Path file : files){
                groups.addAll(readData(file));
            }
    
            System.out.println("Number of groups from the parquet files "+groups.size());
    
            Configuration configuration = new Configuration();
            Map<String, String> meta = new HashMap<String, String>();
            meta.put("startkey", "1");
            meta.put("endkey", "2");
            GroupWriteSupport.setSchema(schema, configuration);
            ParquetWriter<Group> writer = new ParquetWriter<Group>(
                    new Path(outputFile),
                    new GroupWriteSupport(),
                    CompressionCodecName.SNAPPY,
                    2147483647,
                    268435456,
                    134217728,
                    true,
                    false,
                    ParquetProperties.WriterVersion.PARQUET_2_0,
                    configuration);
            System.out.println("Number of groups to write:"+groups.size());
            for(Group g : groups) {
                writer.write(g);
            }
            writer.close();
    
    4 回复  |  直到 7 年前
        1
  •  2
  •   Shafi Rasulov    6 年前

    我使用这些函数来合并拼花文件,但它在Scala中。不管怎样,这可能会给你一个很好的起点。

    import java.util
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter}
    import org.apache.parquet.hadoop.util.{HadoopInputFile, HadoopOutputFile}
    import org.apache.parquet.schema.MessageType
    
    import scala.collection.JavaConverters._
    
    object ParquetFileMerger {
        def mergeFiles(inputFiles: Seq[Path], outputFile: Path): Unit = {
            val conf = new Configuration()
            val mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles.asJava, conf).getFileMetaData
            val writer = new ParquetFileWriter(conf, mergedMeta.getSchema, outputFile, ParquetFileWriter.Mode.OVERWRITE)
    
            writer.start()
            inputFiles.foreach(input => writer.appendFile(HadoopInputFile.fromPath(input, conf)))
            writer.end(mergedMeta.getKeyValueMetaData)
        }
    
        def mergeBlocks(inputFiles: Seq[Path], outputFile: Path): Unit = {
            val conf = new Configuration()
            val parquetFileReaders = inputFiles.map(getParquetFileReader)
            val mergedSchema: MessageType =
                parquetFileReaders.
                  map(_.getFooter.getFileMetaData.getSchema).
                  reduce((a, b) => a.union(b))
    
            val writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outputFile, conf), mergedSchema, ParquetFileWriter.Mode.OVERWRITE, 64*1024*1024, 8388608)
    
            writer.start()
            parquetFileReaders.foreach(_.appendTo(writer))
            writer.end(new util.HashMap[String, String]())
        }
    
        def getParquetFileReader(file: Path): ParquetFileReader = {
            ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))
        }
    }
    
    
        2
  •  1
  •   Zoltan    7 年前

    您正在尝试实现的目标已经可以使用 merge 指挥 parquet-tools . 但是,不建议合并小文件,因为它实际上并不合并行组,而是将它们一个接一个地放置(这正是您在问题中描述的方式)。生成的文件可能具有不好的性能特征。

    如果你想自己实现它,你可以 increase the heap size ,或者修改代码,使其在写入新文件之前不会将所有文件都读入内存,而是逐个读取它们(甚至更好,是逐行组读取),并立即将它们写入新文件。这样,您只需要在内存中保留一个文件或行组。

        3
  •  1
  •   ruslangm    6 年前

    我也面临着同样的问题。对于不太大的文件(高达100兆字节),写入时间可能长达20分钟。 尝试使用kite-sdkapi。我知道它看起来像是被遗弃了,但其中有些事情做得非常有效率。如果您喜欢Spring,也可以尝试springdatahadoop(这是kitesdkapi上的某种包装器)。在我的例子中,这个库的使用将写作时间减少到了2分钟。

    final DatasetRepositoryFactory repositoryFactory = new DatasetRepositoryFactory();
    repositoryFactory.setBasePath(basePath);
    repositoryFactory.setConf(configuration);
    repositoryFactory.setNamespace("my-parquet-file");
    
    DatasetDefinition datasetDefinition = new DatasetDefinition(targetClass, true, Formats.PARQUET.getName());
    try (DataStoreWriter<T> writer = new ParquetDatasetStoreWriter<>(clazz, datasetRepositoryFactory, datasetDefinition)) {
         for (T record : records) {
            writer.write(record);
         }
         writer.flush();
    }
    

    当然,您需要向您的项目添加一些依赖项(在我的示例中,这是spring data hadoop):

         <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop</artifactId>
            <version>${spring.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-boot</artifactId>
            <version>${spring.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-config</artifactId>
            <version>${spring.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-store</artifactId>
            <version>${spring.hadoop.version}</version>
        </dependency>
    

        4
  •  0
  •   Ajay Kharade    6 年前

    我已经用Spark和pyspark脚本实现了一些解决方案,下面是相同的示例代码,这里从目录位置加载多个parquet文件,如果parquet文件模式在文件中有点不同,我们也会合并这些文件。

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
            .appName("App_name") \
            .getOrCreate() 
    
    dataset_DF = spark.read.option("mergeSchema", "true").load("/dir/parquet_files/")
    
    dataset_DF.write.parquet("file_name.parquet")
    

    希望这是一个简短的解决方案。

    推荐文章