代码之家  ›  专栏  ›  技术社区  ›  jlyh

消息批处理Apache BEAM管道立即触发,而不是在固定窗口之后触发

  •  0
  • jlyh  · 技术社区  · 3 年前

    我有一个Apache BEAM管道,我想从Google PubSub主题中读取,应用重复数据消除,并在15分钟固定窗口(结束时)将消息发送到另一个PubSub主题。然而,为了让它与重复数据消除一起工作,问题是消息似乎会立即发送到主题,而不是等待15分钟的结束。

    即使在申请之后 Window.triggering(AfterWatermark.pastEndOfWindow()) 它似乎不起作用(即消息会立即发出)。(参考: https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/windowing/Window.html ).

    寻求我的代码有什么问题的帮助?完整代码如下:

    此外,假设重复数据消除以固定窗口为界是正确的,还是我需要单独设置重复数据消除的时域(参考: https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/Deduplicate.html 似乎是说它将默认为时域,这将是定义的固定窗口)

    pipeline
      .apply("Read from source topic", PubsubIO.<String>readStrings().fromTopic(SOURCE_TOPIC))
      .apply("De-duplication ", 
          Deduplicate.<String>values()
      )
      .apply(windowDurationMins + " min Window", 
          Window.<String>into(FixedWindows.of(Duration.standardMinutes(windowDurationMins)))
             .triggering(
                  AfterWatermark.pastEndOfWindow()
              )
              .withAllowedLateness(Duration.ZERO)
              .discardingFiredPanes()
      )
      .apply("Format to JSON", ParDo.of(new DataToJson()))
      .apply("Emit to sink topic",
          PubsubIO.writeStrings().to(SINK_TOPIC)
      );
    

    [更新]

    • 删除重复数据消除
    • 改为 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
    • 从具有时间戳属性的主题中读取: PubsubIO.<String>readStrings().fromTopic(SOURCE_TOPIC).withTimestampAttribute("publishTime"))

    窗口化似乎需要与每个数据元素相关联的某种时间戳。然而 .withTimestampAttribute("publishTime") 来自PubsubIO的消息似乎不起作用。还有什么我可以尝试为我的数据添加时间戳以进行窗口处理的吗?

    [更新2] 尝试基于此引用手动附加时间戳( https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements )如下所示——但它仍然不起作用

    .apply("Add timestamp", ParDo.of(new ApplyTimestamp()))

    public class ApplyTimestamp extends DoFn<String, String> {
        @ProcessElement
        public void addTimestamp(ProcessContext context) {
            try {
                String data = context.element();
                Instant timestamp = new Instant();
                context.outputWithTimestamp(data, timestamp);
            } catch(Exception e) {
                LOG.error("Error timestamping", e);
            }
        }
    }
    

    在这一点上,我觉得我要发疯了哈哈。。。

    0 回复  |  直到 3 年前
        1
  •  0
  •   ningk    3 年前

    在从源代码读取后立即打开窗口和重复数据消除逻辑之间需要进行GBK转换。窗口应用于 下一个 GroupByKey,包括复合变换中的一个。GBK将通过组合键和窗口对元素进行分组。另外,请注意,默认触发器已经是AfterWatermark,允许延迟为0