代码之家  ›  专栏  ›  技术社区  ›  Tom

编写UPDATE_BEFORE消息以更新卡夫卡

  •  0
  • Tom  · 技术社区  · 4 年前

    我在阅读 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/ .

    上面写着:

    作为接收器,upstart kafka连接器可以使用变更日志流。 它将写入INSERT/UPDATE_AFTER数据作为正常的Kafka消息值, 并将DELETE数据写入具有空值的Kafka消息(指示 键的tombstone)。

    它没有提到,如果UPDATE_BEFORE消息被写入upstart kafka,那么会发生什么?

    在同一个链接中( https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/#full-example ),文档提供了一个完整的示例:

    INSERT INTO pageviews_per_region
    SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCT user_id)
    FROM pageviews
    GROUP BY user_region;
    

    通过上述INSERT/SELECT操作,将生成INSERT/UPDATE_BEFORE/UPDATE_AFTER消息,并将其发送到upsert-kafka接收器,我想问当upsert-kafka遇到UPDATE_BEFORE消息时会发生什么。

    0 回复  |  直到 4 年前
        1
  •  1
  •   Ran Lupovich    4 年前

    来自源代码上的注释

            / /   partial code
            // During the Upsert mode during the serialization process, if the operation is executed is Rowkind.delete or Rowkind.Update_before                 
            // set it to NULL (corresponding to Kafka tomb news)
    

    https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=165221669#content/view/165221669

    Upsert kafka sink不需要计划器发送UPDATE_BEFORE消息(在某些情况下,计划器仍然可以发送UPDATE-BEFORE消息),并且会将INSERT/UPDATE_AFTER消息作为带有关键部分的普通kafka记录写入,并将DELETE消息作为带有空值的kafka记录写入(指示密钥的逻辑删除)。Flink将通过主键列值上的分区数据来保证主键上的消息排序。

    Upsert-kafka源是一种变更日志源。变更日志源上的主键语义意味着物化变更日志(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE)在主键约束上是唯一的。Flink假设所有消息都在主键上按顺序排列。

    实施详细信息 由于upstart,kafka连接器只生成不包含UPDATE_BEFORE消息的upstart流。但是,有些操作需要UPDATE_BEFORE消息才能正确处理,例如聚合。因此,我们需要有一个物理节点来实现upstart流,并生成具有完整更改消息的变更日志流。在物理操作符中,我们将使用state来知道密钥是否是第一次被看到。操作员将生成INSERT行,或者为上一个图像额外生成UPDATE_BEFORE行,或者生成DELETE行,其中所有列都填充了值。