代码之家  ›  专栏  ›  技术社区  ›  Yoni Gibbs

带有Kafka Connect ElasticSearch连接器的消息顺序

  •  0
  • Yoni Gibbs  · 技术社区  · 6 年前

    我们在执行来自Kafka主题的消息通过Kafka Connect ElasticSearch连接器发送到ElasticSearch的顺序时遇到问题。在主题中,消息的顺序是正确的,偏移量是正确的,但是如果有两条消息具有快速连续创建的相同ID,则它们会以错误的顺序间歇性地发送到ElasticSearch。这将导致ElasticSearch具有来自第二条最后消息的数据,而不是来自最后一条消息的数据。如果我们在主题中的两条消息之间添加一到两秒的人为延迟,问题就消失了。

    文件 here 国家:

    使用分区级别可以确保文档级别的更新顺序 卡夫卡偏移量作为文档版本,并使用 version_mode=external .

    但是我在任何地方都找不到关于这个的任何文档 version_mode 设置,以及我们是否需要将自己设置到某个地方。

    在来自Kafka Connect系统的日志文件中,我们可以看到两条消息(对于相同的ID)以错误的顺序处理,间隔几毫秒。看起来像是在不同的线程中处理的,这可能很重要。还要注意,这个主题只有一个分区,所以所有消息都在同一个分区中。

    下面是日志片段,为了清晰起见,稍微编辑了一下。Kafka主题中的消息由Debezium填充,我认为这与问题无关,但很容易包含时间戳值。这表明消息的处理顺序是错误的(尽管它们在Kafka主题中的顺序是正确的,由Debezium填充):

    [2019-01-17 09:10:05,671] DEBUG http-outgoing-1 >> "
    {
      "op": "u",
      "before": {
        "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
        ... << DATA FROM BEFORE SECOND UPDATE >> ...
      },
      "after": {
        "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
        ... << DATA FROM AFTER SECOND UPDATE >> ...
      },
      "source": { ... },
      "ts_ms": 1547716205205
    }
    " (org.apache.http.wire)
    
    ...
    
    [2019-01-17 09:10:05,696] DEBUG http-outgoing-2 >> "
    {
      "op": "u",
      "before": {
        "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
        ... << DATA FROM BEFORE FIRST UPDATE >> ...
      },
      "after": {
        "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
        ... << DATA FROM AFTER FIRST UPDATE >> ...
      },
      "source": { ... },
      "ts_ms": 1547716204190
    }
    " (org.apache.http.wire)
    
    

    在将消息发送到ElasticSearch时,是否有人知道如何强制此连接器维护给定文档ID的消息顺序?

    1 回复  |  直到 6 年前
        1
  •  1
  •   Steven Frew    6 年前

    问题是我们的ElasticSearch连接器 key.ignore 配置设置为 true .

    我们在Github源中发现了这一行用于连接器的 DataConverter.java ):

    final Long version = ignoreKey ? null : record.kafkaOffset();
    

    这意味着, key.ignore=true 正在生成并发送到ElasticSearch的索引操作实际上是“无版本”的…基本上,ElasticSearch为文档接收的最后一组数据将取代任何以前的数据,即使它是“旧数据”。

    从日志文件来看,连接器似乎有几个使用者线程读取源主题,然后将转换后的消息传递给ElasticSearch,但是它们传递给ElasticSearch的顺序不一定与主题顺序相同。

    使用 key.ignore=false ,每个ElasticSearch消息现在包含一个等于kafka记录偏移量的版本值,如果ElasticSearch已经接收到更高版本的数据,则它拒绝更新文档的索引数据。

    那不是 只有 修复这个的东西。我们仍然需要对来自Kafka主题的Debezium消息进行转换,以将密钥转换为ElasticSearch满意的纯文本格式:

    "transforms": "ExtractKey",
    "transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.ExtractKey.field": "id"