
Extracting Aggregator values in a batch execution

Fernando Aguilar · 9 years ago

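    Is there any way to programmatically extract the final values of aggregators after a Dataflow batch execution has finished?

    The following is based on the DirectPipelineRunner.

    If it helps, I'm assuming the aggregators are based on Long values, with a sum combine function.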

    public static Map<String, Object> extractAllCounters(Pipeline p, PipelineResult pr)
    {
        AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(p);
        Map<String, Object> results = new HashMap<>();
    
        for (Map.Entry<Aggregator<?, ?>, Collection<PTransform<?, ?>>> e :
                aggregatorExtractor.getAggregatorSteps().entrySet()) {
            Aggregator agg = e.getKey();
            try {
                results.put(agg.getName(), pr.getAggregatorValues(agg).getTotalValue(agg.getCombineFn()));
            } catch(AggregatorRetrievalException|IllegalArgumentException aggEx) {
                // Skip aggregators whose values the runner cannot report.
                //System.err.println("Can't extract " + agg.getName() + ": " + aggEx.getMessage());
            }
        }
    
        return results;
    }
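With a sum combine function, the total that `getTotalValue` returns amounts to adding up the aggregator's value at every step that used it. The sketch below shows that folding in plain Java with no Dataflow dependency, assuming `Long` values as in the question. `AggregatorTotals`, `combineAcrossSteps`, and the step names are hypothetical; this is not the SDK's own implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical helper: folds per-step aggregator values with a combine function,
// approximating what a total over all steps looks like for a sum-based aggregator.
final class AggregatorTotals {

    // Applies the combine function to every per-step value, starting from the identity.
    // With no steps, the identity value itself is returned.
    static <T> T combineAcrossSteps(Map<String, T> valuesAtSteps, T identity, BinaryOperator<T> combine) {
        T total = identity;
        for (T value : valuesAtSteps.values()) {
            total = combine.apply(total, value);
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up step names and counts, for illustration only.
        Map<String, Long> valuesAtSteps = new LinkedHashMap<>();
        valuesAtSteps.put("StepA", 10L);
        valuesAtSteps.put("StepB", 32L);
        long total = combineAcrossSteps(valuesAtSteps, 0L, Long::sum);
        System.out.println(total); // prints 42
    }
}
```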
    
Ben Chambers · 9 years ago

    The aggregator values should be available from the PipelineResult. For example:

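    CountOddsFn countOdds = new CountOddsFn();
    pipeline
      .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
      .apply(ParDo.of(countOdds));
    // With the non-blocking DataflowPipelineRunner, run() returns before the job finishes;
    // you may need the BlockingDataflowPipelineRunner so the aggregator values are final.
    PipelineResult result = pipeline.run();

    // getAggregatorValues can throw AggregatorRetrievalException; catch or declare it.
    AggregatorValues<Integer> values =
        result.getAggregatorValues(countOdds.aggregator);
    // Maps each step name to the aggregator's value at that step.
    Map<String, Integer> valuesAtSteps = values.getValuesAtSteps();
    // Now read the values from the step...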
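Since `getValuesAtSteps()` returns a map from step name to the aggregator's value at that step, reading it is ordinary `Map` iteration. A minimal sketch with no Dataflow dependency; `ReadStepValues`, `describe`, and the step names are hypothetical, and real step names depend on the pipeline and the runner.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: reading per-step aggregator values, modeled as the
// Map<String, Integer> that getValuesAtSteps() returns in the answer's snippet.
final class ReadStepValues {

    // Formats each "step -> value" pair on its own line, sorted by step name.
    static String describe(Map<String, Integer> valuesAtSteps) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : new TreeMap<>(valuesAtSteps).entrySet()) {
            sb.append(e.getKey()).append(" -> ").append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Made-up step name, for illustration only.
        Map<String, Integer> valuesAtSteps = new HashMap<>();
        valuesAtSteps.put("StepA", 4);
        System.out.print(describe(valuesAtSteps));
        // Looking up a single step: getOrDefault avoids a null when the step is absent.
        System.out.println(valuesAtSteps.getOrDefault("StepB", 0));
    }
}
```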
    

    An example DoFn that reports the aggregator:

    private static class CountOddsFn extends DoFn<Integer, Void> {
    
      Aggregator<Integer, Integer> aggregator =
        createAggregator("odds", new SumIntegerFn());
    
      @Override
      public void processElement(ProcessContext c) throws Exception {
        if (c.element() % 2 != 0) { // != 0 also counts negative odd numbers
          aggregator.addValue(1);
        }
      }
    }
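For the input in the example, and assuming each element is processed exactly once, the `odds` aggregator should end up at 4, since only 1, 3, 5 and 7 are odd. The sketch below checks that count locally in plain Java without running a pipeline; `OddCountCheck` and `countOdds` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for CountOddsFn: counts odd elements locally, without Dataflow,
// to show the value the "odds" aggregator is expected to report for this input.
final class OddCountCheck {

    // Uses != 0 so negative odd numbers (where % 2 yields -1) are counted too.
    static int countOdds(List<Integer> input) {
        int odds = 0;
        for (int x : input) {
            if (x % 2 != 0) {
                odds++;
            }
        }
        return odds;
    }

    public static void main(String[] args) {
        // Same elements as the Create.of(...) call in the answer.
        List<Integer> input = Arrays.asList(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100);
        System.out.println(countOdds(input)); // prints 4
    }
}
```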