我正在尝试将数据从第三方API摄取到数据流管道中。因为第三方不提供webhook,所以我编写了一个自定义脚本,不断地轮询它们的端点以获取更多的数据。
数据每15分钟刷新一次,但由于我不想错过任何数据点,而且我想在新数据可用时立即使用,我的“爬虫”每1分钟运行一次。然后脚本将数据发送到PubSub主题。很容易看到PubSub将为源中的每个数据点接收大约15条重复消息。
eventid
),从源代码的[ID+updated\u time]哈希创建。
const attributes = {
eventid: Buffer.from(`${item.lastupdate}|${item.segmentid}`).toString('base64'),
timestamp: item.timestamp.toString()
};
const dataBuffer = Buffer.from(JSON.stringify(item))
publisher.publish(dataBuffer, attributes)
然后我用一个
withIdAttribute()
idLabel()
,基于
Record IDs
).
PCollection<String> input = p
.apply("ReadFromPubSub", PubsubIO
.readStrings()
.fromTopic(String.format("projects/%s/topics/%s", options.getProject(), options.getIncomingDataTopic()))
.withTimestampAttribute("timestamp")
.withIdAttribute("eventid"))
.apply("OutputToBigQuery", ...)
会是一样的,消息会被丢弃。但由于某些原因,我仍然在输出数据集中看到重复项。
-
-
-
我知道数据流上的重复数据消除有10分钟的限制,但即使在第二次插入(2分钟)时也会看到重复数据。
任何帮助将不胜感激!