代码之家  ›  专栏  ›  技术社区  ›  Yoni Gibbs

在数据库和Kafka producer之间同步事务

  •  6
  • Yoni Gibbs  · 技术社区  · 6 年前

    我们正在考虑使用 spring-kafka (在一个springbootwebflux服务中),我可以看到它有一个 KafkaTransactionManager 但据我所知,这更多的是关于卡夫卡事务本身(确保卡夫卡生产者和消费者之间的一致性),而不是跨两个系统同步事务(参见 here reactive-pg-client ,因此手动处理事务,而不是使用Springs框架。)

    我能想到的一些选择:

    1. 不要将数据写入数据库:只将其写入卡夫卡。然后使用使用者(在服务a中)更新数据库。这似乎不是最有效的,并且会有问题,因为用户调用的服务不能立即看到它应该刚刚创建的数据库更改。
    2. 不要直接写卡夫卡:只写数据库,然后使用 Debezium 向卡夫卡报告这一变化。这里的问题是,更改是基于单个数据库记录的,而要存储在Kafka中的业务重要事件可能涉及来自多个表的数据的组合。
    3. 首先写入数据库(如果失败,什么也不做,只是抛出异常)。然后,在给卡夫卡写信时,假设写操作可能会失败。使用内置的自动重试功能,让它继续尝试一段时间。如果最终完全失败,请尝试写入死信队列,并为管理员创建某种手动机制来解决问题。如果写入DLQ失败(即Kafka完全关闭),只需以其他方式将其记录(例如,记录到数据库),然后再次创建某种手动机制供管理员进行排序。

    有没有人对上述问题有什么想法或建议,或者有没有人能够纠正我上述假设中的任何错误?

    提前谢谢!

    2 回复  |  直到 6 年前
        1
  •  23
  •   Yan Khonski    6 年前

    我建议使用方法2的一个稍微改变的变体。

    只写入数据库,但除了实际的表写入外,还要将“事件”写入同一数据库中的特殊表;这些事件记录将包含所需的聚合。最简单的方法是,只需插入另一个实体,例如由JPA映射的实体,该实体包含一个带有聚合负载的JSON属性。当然,这可以通过事务侦听器/框架组件的某种方式实现自动化。

    这些是帖子

    https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/

    https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/

        2
  •  11
  •   Jonas    6 年前

    首先,我不得不说,我不是卡夫卡,也不是Spring专家,但我认为在编写独立资源时,这更像是一个概念上的挑战,解决方案应该适合您的技术堆栈。此外,我应该说,这个解决方案试图在没有像Debezium这样的外部组件的情况下解决这个问题,因为在我看来,每个额外的组件都会给测试、维护和运行应用程序带来挑战,而在选择这样的选项时,这些挑战常常被低估。也不是每个数据库都可以用作Debezium源。

    为了确保我们谈论的是相同的目标,让我们用一个简化的航空公司示例来说明情况,在这个示例中,客户可以购买机票。订单成功后,客户将收到由外部消息传递系统(我们必须与之交谈的系统)发送的消息(邮件、推送通知)。

    当您将订单存储在数据库中时,您还可以将消息(连同聚合数据)存储在您希望随后发送给Kafka的同一数据库中(例如,作为CLOB列中的JSON)。同样的资源保证,目前一切正常。现在,您需要一种机制来轮询KafkaTasks表,以查找应发送到Kafka主题的新任务(例如,使用计时器服务,可能可以在Spring中使用@Scheduled annotation)。消息成功发送到Kafka后,您可以删除任务条目。这样可以确保只有在订单也成功地存储在应用程序数据库中时,才会向Kafka发送消息。我们是否实现了与使用XA事务时相同的保证?不幸的是,没有,因为仍然有可能写信给卡夫卡工作,但删除任务失败。在这种情况下,重试机制(您需要一个问题中提到的机制)将重新处理任务并发送消息两次。如果您的商业案例对此感到满意,至少有一次可以保证您在这里完成了imho半复杂的解决方案,该解决方案可以作为框架功能轻松实现,因此不是每个人都需要关注细节。

    如果只需要一次,则不能将状态存储在应用程序数据库中(在本例中,删除任务就是状态),而是必须将其存储在Kafka中(假设两个Kafka主题之间有ACID保证)。例如:假设表中有100个任务(IDs 1到100),任务作业处理前10个任务。你把你的卡夫卡信息写在他们的主题上,另一条ID为10的信息写在你的主题上。都在同一个卡夫卡交易。在下一个循环中,您使用您的主题(值为10),并使用此值来获取下10个任务(并删除已处理的任务)。

    如果有更容易(在应用中)的解决方案与相同的保证我期待着听到你!

    很抱歉回答得太长了,但我希望能有所帮助。

        3
  •  2
  •   user3107673    4 年前

    上面描述的所有方法都是解决问题的最佳方法,并且都是定义良好的模式。您可以在下面提供的链接中探索这些。

    模式:事务发件箱

    通过将事件或消息保存在数据库的发件箱中,将其作为数据库事务的一部分发布。 http://microservices.io/patterns/data/transactional-outbox.html

    模式:轮询发布者

    通过轮询数据库中的发件箱来发布消息。 http://microservices.io/patterns/data/polling-publisher.html

    通过跟踪事务日志来发布对数据库所做的更改。 http://microservices.io/patterns/data/transaction-log-tailing.html