代码之家  ›  专栏  ›  技术社区  ›  tyron

在Apache Beam/Dataflow输入上重复,即使在使用withIdAttribute时也是如此

  •  2
  • tyron  · 技术社区  · 7 年前

    我正在尝试将数据从第三方API摄取到数据流管道中。因为第三方不提供webhook,所以我编写了一个自定义脚本,不断地轮询它们的端点以获取更多的数据。

    数据每15分钟刷新一次,但由于我不想错过任何数据点,而且我想在新数据可用时立即使用,我的“爬虫”每1分钟运行一次。然后脚本将数据发送到PubSub主题。很容易看到PubSub将为源中的每个数据点接收大约15条重复消息。

    eventid ),从源代码的[ID+updated\u time]哈希创建。

    const attributes = {
             eventid: Buffer.from(`${item.lastupdate}|${item.segmentid}`).toString('base64'),
             timestamp: item.timestamp.toString()
          };
    const dataBuffer = Buffer.from(JSON.stringify(item))
    publisher.publish(dataBuffer, attributes)
    

    然后我用一个 withIdAttribute() idLabel() ,基于 Record IDs ).

    PCollection<String> input = p
        .apply("ReadFromPubSub", PubsubIO
           .readStrings()
           .fromTopic(String.format("projects/%s/topics/%s", options.getProject(), options.getIncomingDataTopic()))
           .withTimestampAttribute("timestamp")
           .withIdAttribute("eventid"))
       .apply("OutputToBigQuery", ...)
    

    会是一样的,消息会被丢弃。但由于某些原因,我仍然在输出数据集中看到重复项。

      • 我知道数据流上的重复数据消除有10分钟的限制,但即使在第二次插入(2分钟)时也会看到重复数据。

    任何帮助将不胜感激!

    1 回复  |  直到 7 年前
        1
  •  0
  •   Nathan Nasser    7 年前

    我认为你的思路是对的,而不是我建议使用时间戳的散列。更好的方法是使用windows。回顾一下这个 document 它过滤窗口外的数据。

    关于额外的重复数据,如果您使用请求订阅,并且在处理数据之前到达了确认截止日期,则将根据重新发送消息 at-least-once delivery

    推荐文章