![]() |
1
23
我建议使用方法2的一个稍微改变的变体。 只写入数据库,但除了实际的表写入外,还要将“事件”写入同一数据库中的特殊表;这些事件记录将包含所需的聚合。最简单的方法是,只需插入另一个实体,例如由JPA映射的实体,该实体包含一个带有聚合负载的JSON属性。当然,这可以通过事务侦听器/框架组件的某种方式实现自动化。
这些是帖子 https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/ https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/ |
![]() |
2
11
首先,我不得不说,我不是卡夫卡,也不是Spring专家,但我认为在编写独立资源时,这更像是一个概念上的挑战,解决方案应该适合您的技术堆栈。此外,我应该说,这个解决方案试图在没有像Debezium这样的外部组件的情况下解决这个问题,因为在我看来,每个额外的组件都会给测试、维护和运行应用程序带来挑战,而在选择这样的选项时,这些挑战常常被低估。也不是每个数据库都可以用作Debezium源。 为了确保我们谈论的是相同的目标,让我们用一个简化的航空公司示例来说明情况,在这个示例中,客户可以购买机票。订单成功后,客户将收到由外部消息传递系统(我们必须与之交谈的系统)发送的消息(邮件、推送通知)。
当您将订单存储在数据库中时,您还可以将消息(连同聚合数据)存储在您希望随后发送给Kafka的同一数据库中(例如,作为CLOB列中的JSON)。同样的资源保证,目前一切正常。现在,您需要一种机制来轮询KafkaTasks表,以查找应发送到Kafka主题的新任务(例如,使用计时器服务,可能可以在Spring中使用@Scheduled annotation)。消息成功发送到Kafka后,您可以删除任务条目。这样可以确保只有在订单也成功地存储在应用程序数据库中时,才会向Kafka发送消息。我们是否实现了与使用XA事务时相同的保证?不幸的是,没有,因为仍然有可能写信给卡夫卡工作,但删除任务失败。在这种情况下,重试机制(您需要一个问题中提到的机制)将重新处理任务并发送消息两次。如果您的商业案例对此感到满意,至少有一次可以保证您在这里完成了imho半复杂的解决方案,该解决方案可以作为框架功能轻松实现,因此不是每个人都需要关注细节。 如果只需要一次,则不能将状态存储在应用程序数据库中(在本例中,删除任务就是状态),而是必须将其存储在Kafka中(假设两个Kafka主题之间有ACID保证)。例如:假设表中有100个任务(IDs 1到100),任务作业处理前10个任务。你把你的卡夫卡信息写在他们的主题上,另一条ID为10的信息写在你的主题上。都在同一个卡夫卡交易。在下一个循环中,您使用您的主题(值为10),并使用此值来获取下10个任务(并删除已处理的任务)。 如果有更容易(在应用中)的解决方案与相同的保证我期待着听到你! 很抱歉回答得太长了,但我希望能有所帮助。 |
![]() |
3
2
上面描述的所有方法都是解决问题的最佳方法,并且都是定义良好的模式。您可以在下面提供的链接中探索这些。 模式:事务发件箱 通过将事件或消息保存在数据库的发件箱中,将其作为数据库事务的一部分发布。 http://microservices.io/patterns/data/transactional-outbox.html 模式:轮询发布者 通过轮询数据库中的发件箱来发布消息。 http://microservices.io/patterns/data/polling-publisher.html
通过跟踪事务日志来发布对数据库所做的更改。 http://microservices.io/patterns/data/transaction-log-tailing.html |
![]() |
bohunn · Kafka Streams-无法搜索状态存储 1 年前 |
![]() |
Annu · 使用Spring Kafka添加自定义标题 7 年前 |
![]() |
Alex Kamornikov · 春季云流确定主题卡夫卡消息来源 7 年前 |
|
yokv · 卡夫卡制作人始终重试连接 7 年前 |