代码之家  ›  专栏  ›  技术社区  ›  Stella

Apache Beam代码中插入语法错误

  •  0
  • Stella  · 技术社区  · 7 年前

    我正在编写用于测试的Apache Beam代码。请参考下面的代码。我创建了示例simplefunction并应用代码并尝试编译。

        public static void main(String[] args) throws IOException {
            System.out.println("Test Log");
    
            PipelineOptions options = PipelineOptionsFactory.create();
            //options.setRunner(SparkRunner.class);
            options.setRunner(SparkRunner.class);
    
            Pipeline p = Pipeline.create(options);
    
            p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
                // withCompression can be omitted - by default compression is detected from the filename.
                .apply(FileIO.readMatches())
                .apply(MapElements
                    // uses imports from TypeDescriptors
                    .via(
                        new SimpleFunction <ReadableFile, KV<String,String>>() {
    
                            private static final long serialVersionUID = -7867677L;
    
                            @SuppressWarnings("unused")
                            public KV<String,String> createKV(ReadableFile f) {
                                String temp = null;
                                try{
                                temp = f.readFullyAsUTF8String();
                                }catch(IOException e){
    
                                }
                                return KV.of(f.getMetadata().resourceId().toString(), temp);
                            }
    
                            @Override
                            public String apply(ReadableFile element, KV<String, String> input) {
                                StringBuilder str = new StringBuilder();
                                return str.toString();
                            }
                        }
    
                ))
                .apply(FileIO.write());
            p.run();
        }
    

    但编译器正在抛出 syntax error at public String apply(ReadableFile

    我试过但没有成功解决这个问题,有人能指导我吗?

    1 回复  |  直到 7 年前
        1
  •  1
  •   Anton    7 年前

    SimpleFunction<InputT, OutputT> 取的值为 InputT 并返回 OutputT . 签字 apply 在这种情况下是 OutputT apply(InputT input); here .

    在你的情况下,你的类型 SimpleFunction 必须看起来像这样:

    new SimpleFunction <ReadableFile, KV<String,String>>() {
       ...
       @Override
       public KV<String,String> apply(ReadableFile input) {
          ...
       }
    }
    

    例如,看看它是如何使用的 here .

    在你的情况下,你需要更多的逻辑 readMatches() here 例如,如何将其应用于分析avros,以及 this 是的实施细节 PTransform 从那个密码。