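I have an Apache Beam pipeline that reads from a Google PubSub topic, applies deduplication, and emits the messages to another PubSub topic at the end of each 15-minute fixed window. However, while trying to get this working together with deduplication, I found that the messages seem to be sent to the sink topic immediately instead of waiting for the end of the 15-minute window.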
Even after applying

    Window.triggering(AfterWatermark.pastEndOfWindow())

it does not seem to work (i.e., the messages are still emitted immediately). (Reference: https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/windowing/Window.html)
Can anyone help me figure out what is wrong with my code? The full code is below.
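Also, is it correct to assume that the deduplication is bounded by the fixed window, or do I need to set the deduplication time domain separately? (Reference: https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/Deduplicate.html, which seems to say that it defaults to a time domain, which I assume would be the fixed window defined in the pipeline.)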
    pipeline
        .apply("Read from source topic", PubsubIO.<String>readStrings().fromTopic(SOURCE_TOPIC))
        .apply("De-duplication ", Deduplicate.<String>values())
        .apply(windowDurationMins + " min Window",
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(windowDurationMins)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
        .apply("Format to JSON", ParDo.of(new DataToJson()))
        .apply("Emit to sink topic", PubsubIO.writeStrings().to(SINK_TOPIC));
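On the time-domain question: as far as I can tell from the Deduplicate Javadoc, its defaults are Deduplicate.DEFAULT_TIME_DOMAIN (processing time) and Deduplicate.DEFAULT_DURATION (10 minutes); neither is derived from a window. Beam state is always scoped per key and window, so where Deduplicate sits relative to Window.into also matters, and as I understand it Deduplicate passes the first occurrence of each value through as soon as it arrives, so on its own it does not delay output. A minimal sketch of setting both options explicitly; the class DedupConfig, the helper dedupFor and the choice of EVENT_TIME are hypothetical illustrations, not the only valid configuration.

```java
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.Deduplicate;
import org.joda.time.Duration;

/** Hypothetical holder for an explicitly configured Deduplicate transform. */
public class DedupConfig {

  /**
   * Remembers each value for {@code remember}, measured in event time, instead of
   * relying on the defaults (processing time, 10 minutes). The "seen" state is kept
   * per key and window, so applying this after Window.into gives per-window dedup.
   */
  static Deduplicate.Values<String> dedupFor(Duration remember) {
    return Deduplicate.<String>values()
        .withTimeDomain(TimeDomain.EVENT_TIME) // assumption: event time is what you want
        .withDuration(remember);
  }
}
```

It would replace the bare Deduplicate.<String>values() step, for example with DedupConfig.dedupFor(Duration.standardMinutes(15)).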
[Update]

- Removed the deduplication.
- Changed the trigger to:

    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))

- Read from the topic with a timestamp attribute:

    PubsubIO.<String>readStrings().fromTopic(SOURCE_TOPIC).withTimestampAttribute("publishTime")
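Changing the trigger alone cannot hold messages back: triggers only control when aggregating transforms such as GroupByKey and Combine emit their results, while element-wise steps such as ParDo and the PubSub write process each element as soon as it arrives, regardless of the window or trigger. A minimal sketch, assuming the goal is one copy of each distinct payload per 15-minute window: apply the window first, then Beam's Distinct transform (swapped in here for Deduplicate), which groups identical elements and therefore only emits once the watermark passes the end of each window. The class name WindowedDistinct is hypothetical.

```java
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

/**
 * Hypothetical composite: fixed windows first, then Distinct. Distinct is built on a
 * per-key Combine (an aggregation), so the watermark trigger holds its output until
 * each window closes; a ParDo directly after Window.into would not be held back.
 */
public class WindowedDistinct extends PTransform<PCollection<String>, PCollection<String>> {
  private final Duration windowSize;

  public WindowedDistinct(Duration windowSize) {
    this.windowSize = windowSize;
  }

  public Duration getWindowSize() {
    return windowSize;
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        .apply("Fixed window", Window.<String>into(FixedWindows.of(windowSize))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        // One output per distinct payload per window, emitted when the window closes.
        .apply("Distinct per window", Distinct.<String>create());
  }
}
```

In the pipeline above it would take the place of both the Deduplicate and the Window steps, e.g. .apply(new WindowedDistinct(Duration.standardMinutes(windowDurationMins))) between the read and "Format to JSON". Distinct compares elements by their encoded bytes, so two messages count as duplicates only if their payload strings are identical.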
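Windowing seems to require some kind of timestamp associated with each data element. However, calling .withTimestampAttribute("publishTime") on the PubsubIO read does not seem to work. Is there anything else I can try to timestamp my data for windowing?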
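On the timestamp question: as I understand the PubsubIO Javadoc, when withTimestampAttribute is not called, PubsubIO already assigns each element a timestamp on its own (the PubSub publish time, as far as I know). withTimestampAttribute("publishTime") instead tells PubsubIO to read the timestamp from a message attribute literally named publishTime, which the publisher has to set, as epoch milliseconds or an RFC 3339 string. A minimal sketch of both sides, assuming the publisher can be changed; the attribute name eventTime and the class and helper names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.joda.time.Instant;

/** Hypothetical helpers for carrying an event time in a PubSub message attribute. */
public class TimestampedMessages {
  /** Hypothetical attribute name; must match what the reader passes to withTimestampAttribute. */
  static final String TS_ATTR = "eventTime";

  /** Publisher side: stores the event time as epoch milliseconds in TS_ATTR. */
  static PubsubMessage withEventTime(String payload, Instant eventTime) {
    return new PubsubMessage(
        payload.getBytes(StandardCharsets.UTF_8),
        Collections.singletonMap(TS_ATTR, String.valueOf(eventTime.getMillis())));
  }

  /** Reader side: element timestamps are taken from the TS_ATTR attribute. */
  static PubsubIO.Read<String> readWithEventTime(String topic) {
    return PubsubIO.readStrings().fromTopic(topic).withTimestampAttribute(TS_ATTR);
  }
}
```

If the publisher is itself a Beam pipeline, PubsubIO.writeMessages() can publish such PubsubMessage objects; any other PubSub client only needs to set the same attribute.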
[Update 2]

I tried attaching the timestamps manually, based on this reference (https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements), as shown below, but it still does not work:
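    .apply("Add timestamp", ParDo.of(new ApplyTimestamp()))

    import org.apache.beam.sdk.transforms.DoFn;
    import org.joda.time.Instant;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ApplyTimestamp extends DoFn<String, String> {
        private static final Logger LOG = LoggerFactory.getLogger(ApplyTimestamp.class);

        @ProcessElement
        public void addTimestamp(ProcessContext context) {
            try {
                String data = context.element();
                // Joda Instant with no arguments: the current (processing) time
                Instant timestamp = new Instant();
                context.outputWithTimestamp(data, timestamp);
            } catch (Exception e) {
                LOG.error("Error timestamping", e);
            }
        }
    }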
At this point I feel like I'm going crazy, haha...
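To check which timestamp and window each element actually ends up with (for example right after the read, and again after Window.into), a small debugging DoFn can print them. A minimal sketch; DescribeTimestamp and its describe helper are hypothetical names, and the DoFn relies only on the standard @Element, @Timestamp, BoundedWindow and OutputReceiver process-method parameters.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.joda.time.Instant;

/** Hypothetical debugging DoFn: appends each element's timestamp and window bounds. */
public class DescribeTimestamp extends DoFn<String, String> {

  /** Pure formatting helper, kept separate so it can be checked without a pipeline. */
  static String describe(String payload, Instant timestamp, BoundedWindow window) {
    String bounds = (window instanceof IntervalWindow)
        ? "[" + ((IntervalWindow) window).start() + ", " + ((IntervalWindow) window).end() + ")"
        : window.toString(); // e.g. the global window, before Window.into is applied
    return payload + " @ " + timestamp + " in " + bounds;
  }

  @ProcessElement
  public void processElement(
      @Element String payload,
      @Timestamp Instant timestamp,
      BoundedWindow window,
      OutputReceiver<String> out) {
    out.output(describe(payload, timestamp, window));
  }
}
```

Inserted as .apply("Describe", ParDo.of(new DescribeTimestamp())) after the window step, each output line shows the 15-minute window the element was assigned to; an element stamped at minute 20 should land in the window [minute 15, minute 30).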