代码之家  ›  专栏  ›  技术社区  ›  Nagesh Singh Chauhan

在apache beam中读取csv文件时跳过头

  •  3
  • Nagesh Singh Chauhan  · 技术社区  · 7 年前

    我想跳过csv文件的标题行。从现在起,我将在加载到google存储之前手动删除头。

    以下是我的代码:

    PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));        
        PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
    
            private static final long serialVersionUID = 1L;
            @ProcessElement
            public void processElement(ProcessContext c) {
                String[] strArr = c.element().split(",");
                ClassFinance fin = new ClassFinance();
                fin.setBeneficiaryFinance(strArr[0]);
                fin.setCatlibCode(strArr[1]);
                fin.set_rNR_(Double.valueOf(strArr[2]));
                fin.set_rNCS_(Double.valueOf(strArr[3]));
                fin.set_rCtb_(Double.valueOf(strArr[4]));
                fin.set_rAC_(Double.valueOf(strArr[5]));
                c.output(fin);
            }
        }));
    

    我已经检查了StackOverflow中的现有问题,但我发现它没有前途: Skipping header rows - is it possible with Cloud DataFlow?

    有什么帮助吗?

    编辑: 我试过下面这样的方法,效果很好:

    PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));       
    
        PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
    
            private static final long serialVersionUID = 1L;
            @ProcessElement
            public void processElement(ProcessContext c) {  
                String[] strArr2 = c.element().split(",");
                String header = Arrays.toString(strArr2);
                ClassFinance fin = new ClassFinance();
    
                    if(header.contains("Beneficiary"))
                    System.out.println("Header");
                    else {
                fin.setBeneficiaryFinance(strArr2[0].trim());
                fin.setCatlibCode(strArr2[1].trim());
                fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
                fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
                fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
                fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
                c.output(fin);
                }
            }
        }));
    
    3 回复  |  直到 7 年前
        1
  •  2
  •   dsesto    7 年前

    您共享的旧堆栈溢出日志( Skipping header rows - is it possible with Cloud DataFlow? )包含你问题的答案。

    这个选项是 当前不可用 在apache beam sdk中,虽然 开放功能请求 在apache beam jira问题跟踪程序中, BEAM-123 . 请注意,在撰写本文时,此功能请求仍处于打开状态,尚未解决,并且已经有两年的时间了。不过,从这个意义上讲,似乎正在做一些努力,最新的更新是从2018年2月开始的,所以我建议您继续了解jira问题的最新情况,因为它最后一次被转移到 sdk-java-core 组件,可能会引起更多的注意。

    考虑到这些信息,我想说你正在使用的方法(在将文件上传到gcs之前删除头)是你的最佳选择。我将避免手动操作,因为您可以轻松编写脚本并自动执行 删除页眉 阿维 上传档案 过程。


    编辑:

    我已经能够想出一个简单的过滤器使用 DoFn . 它可能不是最优雅的解决方案(我自己也不是apache be am专家),但它确实可以工作,而且您可以根据自己的需要调整它。它要求您事先知道要上载的csv文件的头(因为它将按元素内容进行筛选),但同样,将此作为模板,您可以根据需要进行修改:

    public class RemoveCSVHeader {
      // The Filter class
      static class FilterCSVHeaderFn extends DoFn<String, String> {
        String headerFilter;
    
        public FilterCSVHeaderFn(String headerFilter) {
          this.headerFilter = headerFilter;
        }
    
        @ProcessElement
        public void processElement(ProcessContext c) {
          String row = c.element();
          // Filter out elements that match the header
          if (!row.equals(this.headerFilter)) {
            c.output(row);
          }
        }
      }
    
      // The main class
      public static void main(String[] args) throws IOException {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);
    
        PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));
    
        String header = "col1,col2,col3,col4";
    
        vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
            .apply(TextIO.write().to("out"));
    
        p.run().waitUntilFinish();
      }
    }
    
        2
  •  2
  •   ankur agrawal    6 年前

    这个密码对我有效。我使用filter.by()从csv文件中筛选出标题行。

    static void run(GcsToDbOptions options) {
    
    Pipeline p = Pipeline.create(options);
    // Read the CSV file from GCS input file path
    p.apply("Read Rows from " + options.getInputFile(), TextIO.read()
        .from(options.getInputFile()))
        // filter the header row
        .apply("Remove header row",
            Filter.by((String row) -> !((row.startsWith("dwid") || row.startsWith("\"dwid\"")
                || row.startsWith("'dwid'")))))
        // write the rows to database using prepared statement
        .apply("Write to Auths Table in Postgres", JdbcIO.<String>write()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource(options)))
            .withStatement(INSERT_INTO_MYTABLE)
            .withPreparedStatementSetter(new StatementSetter()));
    PipelineResult result = p.run();
    try {
      result.getState();
      result.waitUntilFinish();
    } catch (UnsupportedOperationException e) {
      // do nothing
    } catch (Exception e) {
      e.printStackTrace();
    }}
    
        3
  •  -2
  •   Raj Rajen    6 年前

    https://medium.com/@baranitharan/the-textio-write-1be1c07fbef0 这个 TextIO.Write 在数据流中现在有 withHeader 函数向数据添加标题行。这个函数是在verison中添加的 1.7.0 .

    因此,您可以将标题添加到csv中,如下所示:

    TextIO.Write.named("WriteToText")
                .to("/path/to/the/file")
                .withHeader("col_name1,col_name2,col_name3,col_name4")
                .withSuffix(".csv"));
    

    WithHeader函数自动在标题行的末尾添加换行符。