代码之家  ›  专栏  ›  技术社区  ›  zangw

Beam:将数据写入GCP Pub/Sub时,处理卡在step SplittableProcess中

  •  0
  • zangw  · 技术社区  · 5 年前

    我尝试从 GCS(Google Cloud Storage)读取数据,并在处理后将结果写入 Pub/Sub。但是,出现了以下错误:

    Processing stuck in step s12/SplittableProcess for at least 05m00s without outputting or completing in state windmill-read at 
    java.lang.StringCoding.decode(StringCoding.java:215) at java.lang.String.<init>(String.java:463) at 
    java.lang.String.<init>(String.java:515) at org.apache.beam.sdk.coders.StringUtf8Coder.readString(StringUtf8Coder.java:61) at 
    org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:100) at org.apache.beam.sdk.coders.StringUtf8Coder.decode(
    StringUtf8Coder.java:90) at org.apache.beam.sdk.coders.StringUtf8Coder.decode(StringUtf8Coder.java:37) at 
    org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:47) at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(
    MetadataCoder.java:62) at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58) at 
    org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36) at 
    org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116) at 
    org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88) at 
    org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124) at org.apache.beam.sdk.coders.IterableLikeCoder.decode
    (IterableLikeCoder.java:60) at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) at org.apache.beam.sdk.coders.KvCoder.decode(
    KvCoder.java:82) at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36) at 
    org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592) at 
    org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529) at 
    org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeTagValue(WindmillStateReader.java:633) at 
    org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeResponse(WindmillStateReader.java:523) at 
    org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:420) at 
    org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:313) at 
    org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillValue.read(WindmillStateInternals.java:385) at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(
    SplittableParDoViaKeyedWorkItems.java:375) at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.Splittab
    leParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    

    这是我的代码:

        // Read text lines from files matching sourcePath + "/gcspath". watchForNewFiles is given a
        // 3-minute Duration and Watch.Growth.never() as its termination condition, so the read is
        // configured to keep watching for new files rather than matching the pattern once.
        PCollection<String> lines = pipeline.apply("readDataFromGCS",
                TextIO.read().from(sourcePath + "/gcspath")
                        .watchForNewFiles(Duration.standardMinutes(3), Watch.Growth.never()));
    
        // Turn each line into a KV<String, Map<String, String>> via ParseAndFilterFn (defined outside
        // this snippet). Presumably the key is a user id and the map carries "level" and "ts" entries,
        // as read further below -- confirm against ParseAndFilterFn.
        PCollection<KV<String, Map<String, String>>> filter_event = lines.apply("ParseAndFilterFn", ParDo.of(new ParseAndFilterFn()));
        // Assign elements to fixed 3-minute windows. The trigger is processing-time based: 2 minutes
        // after the first element in a pane. Fired panes are discarded and allowed lateness is
        // 1 minute.
        PCollection<KV<String, Map<String, String>>> minute_window_events = filter_event.apply("MinuteFixwindow",
                Window.<KV<String, Map<String, String>>>into(FixedWindows.of(Duration.standardMinutes(3)))
                        .triggering(AfterProcessingTime
                                .pastFirstElementInPane()
                                .plusDelayOf(Duration.standardMinutes(2)))
                        .discardingFiredPanes()
                        .withAllowedLateness(Duration.standardMinutes(1))
        );
    
        // Per key, combine the windowed values with MaxFn, then format each result as a
        // ("userid:<key>,level:<level>,ts:<ts>", <ts>) pair, convert it with SimpleStamp (defined
        // outside this snippet) and write the resulting messages to the Pub/Sub topic TOPICSTR.
        minute_window_events.apply("GroupByUserId", Combine.perKey(new MaxFn()))
                .apply("AssembleUserMsg", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                        .via((KV<String, Map<String, String>> kv) ->
                                KV.of(String.format("userid:%s,level:%s,ts:%s", kv.getKey(), kv.getValue().get("level"), kv.getValue().get("ts")), kv.getValue().get("ts"))))
                .apply("ConvertToSimpleMsg", ParDo.of(new SimpleStamp()))
                .apply("WriteToPubSub",
                        PubsubIO.writeMessages()
                                .to(TOPICSTR)
    
                );
    
    ...
    
        /**
         * Combine function that keeps the {@code "level"} and {@code "ts"} entries of the event with
         * the highest level.
         *
         * <p>Levels are stored as strings, parsed with {@link Float#parseFloat(String)} and truncated
         * to {@code int} before being compared, so {@code "3.2"} and {@code "3.9"} rank equally and the
         * first one seen wins. An event or accumulator without a {@code "level"} entry is treated as
         * level {@code 0}; because the comparison is strict, events whose level is not greater than
         * {@code 0} are never recorded and the output may be an empty map.
         *
         * <p>A {@code "level"} value that is present but not a parsable number still raises
         * {@link NumberFormatException}.
         */
        static class MaxFn extends Combine.CombineFn<Map<String, String>, Map<String, String>, Map<String, String>> {
            private static final String LEVEL_KEY = "level";
            private static final String TS_KEY = "ts";

            /** Returns a fresh, empty accumulator (no level recorded yet). */
            @Override
            public Map<String, String> createAccumulator() {
                return new HashMap<>();
            }

            /**
             * Folds one event into the accumulator.
             *
             * @param mutableAccumulator accumulator to update in place
             * @param input event map; only its {@code "level"} and {@code "ts"} entries are read
             * @return the same accumulator instance, updated if {@code input} has a strictly higher level
             */
            @Override
            public Map<String, String> addInput(Map<String, String> mutableAccumulator, Map<String, String> input) {
                // A missing "level" counts as 0 here, exactly as in mergeAccumulators, instead of
                // failing with a NullPointerException inside Float.parseFloat.
                if (levelOf(input) > levelOf(mutableAccumulator)) {
                    copyLevelAndTs(input, mutableAccumulator);
                }
                return mutableAccumulator;
            }

            /**
             * Merges partial accumulators into a new one holding the highest level among them.
             *
             * @param accumulators partial results to merge
             * @return a new map with the winning {@code "level"} and {@code "ts"}; empty if no
             *     accumulator has a level greater than {@code 0}
             */
            @Override
            public Map<String, String> mergeAccumulators(Iterable<Map<String, String>> accumulators) {
                Map<String, String> merged = new HashMap<>();
                for (Map<String, String> next : accumulators) {
                    if (levelOf(merged) < levelOf(next)) {
                        copyLevelAndTs(next, merged);
                    }
                }
                return merged;
            }

            /** Returns the accumulator itself as the combined result. */
            @Override
            public Map<String, String> extractOutput(Map<String, String> accumulator) {
                return accumulator;
            }

            /** Parses the map's "level" entry as a float truncated to int; absent or null means 0. */
            private static int levelOf(Map<String, String> event) {
                String raw = event.get(LEVEL_KEY);
                return raw == null ? 0 : (int) Float.parseFloat(raw);
            }

            /** Copies the "level" and "ts" entries from {@code source} into {@code target}. */
            private static void copyLevelAndTs(Map<String, String> source, Map<String, String> target) {
                target.put(LEVEL_KEY, source.get(LEVEL_KEY));
                target.put(TS_KEY, source.get(TS_KEY));
            }
        }
    

    以及 Dataflow 中的管道:

    enter image description here

    管道好像卡在了 GroupByUserId 这一步?该怎么解决这个问题,或者我遗漏了什么?

    Beam 版本:2.16.0

    0 回复  |  直到 5 年前