代码之家  ›  专栏  ›  技术社区  ›  wegenmic

事务提交的顺序正确,提交后与Spring集成挂钩

  •  3
  • wegenmic  · 技术社区  · 7 年前

    • 链包装在事务中
    • JMS事务需要在数据库事务之前在链的末尾提交
    • 如果JMS提交失败,则不应执行数据库提交
    • 在所有成功的数据库提交之后,需要将消息(及其所有操作)放到输出通道上

    问题链如下

    <int:poller id="myInputChannelPoller">
      <int:transactional transaction-manager="myTransactionManager">
    </int:poller>
    
    <int:chain id="myChain" input-channel="myInputChannel">
      <int:poller ref="myInputChannelPoller" />
      <int:service-activtor id="someMessageManipulation" />
      <int:service-activtor id="aDatabaseOperation" />
      <int:service-activtor id="sendJmsMessage" />
      <int:service-activtor id="anotherMessageManipulation" />
      <int:service-activtor id="anotherDatabaseOperation" />
    </int:chain>
    

    为了在事务中包装链,我添加了 <int:transactional/> 标记投票人。

    在数据库事务之前提交了所有JMS事务,我使用了 ChainedTransactionManager

    @Bean
    public PlatformTransactionManager myTransactionManager(JpaTransactionManager jpaTransactionManager, JmsTransactionmanager jmsTransactionManager) {
            // jpa before jms, since the commits will be done in reverse order
            return new ChainedTransactionManager(jpaTransactionManager, jmsTransactionmanager)
    }
    

    我不确定如何正确满足的唯一要求是after database commit操作。首先想到的是 transaction-synchronization-factory 机制如中所示

    <int:transaction-synchronization-factory id="mySyncFactory">
      <int:after-commit channel="myOutputChannel" />
    </int:transaction-synchronization-factory>
    

    并将其链接到现有轮询器的事务

    <int:poller id="myInputChannelPoller">
      <int:transactional transaction-manager="myTransactionManager" synchronization-factory="mySyncFactory">
    </int:poller>
    

    另一个想法是添加另一个 service-activator MyJpaTransactionManagerInjector MyJpaTransactionManager .

    接收消息和目标输出通道作为输入。创建 ForwardMessage 然后注射到 MyJpaTransactionManager .

    public class MyJpaTransactionManagerInjector {
    
        @Inject
        private MyJpaTransactionManager myJpaTransactionManager;
    
        @ServiceActivator
        public void injectMessage(Message<?> message, @Header(value = "outputChannel") MessageChannel outputChannel) {
            myJpaTransactionManager.inject(new ForwardMessage(message, outputChannel));
        }
    }
    

    登记 转发消息 在执行提交之前。

    public class MyJpaTransactionManager extends JpaTransactionManager {
    
        private final ThreadLocal<List<TransactionSynchronizationAdapter>> synchronizations =
                new NamedThreadLocal<>("Transactional resources");
    
        public void inject(TransactionSynchronizationAdapter synchronization) {
            if (synchronizations.get() == null) {
                synchronizations.set(new ArrayList<>());
            }
            synchronizations.get().add(synchronization);
        }
    
        @Override
        protected void prepareForCommit(DefaultTransactionStatus status) {            
            super.prepareForCommit(status);
            if (synchronizations.get() != null) {
                synchronizations.get().forEach(TransactionSynchronizationManager::registerSynchronization);
            }
            synchronizations.remove();
        }
    }
    

    转发消息 afterCommit() 到目标通道

    public class ForwardMessage extends TransactionSynchronizationAdapter {
    
        private Message<?> message;
        private MessageChannel outputChannel;
    
        public ForwardMessage(Message<?> message, MessageChannel outputChannel) {
            this.message = message;
            this.outputChannel = outputChannel;
        }
    
        @Override
        public void afterCommit() {
            outputChannel.send(message);
        }
    }
    

    只要可能,我都会避免将 TransactionManager 因为很容易破坏一些工作得很好的东西。有没有比这更好的解决方案?

    1 回复  |  直到 7 年前
        1
  •  1
  •   Artem Bilan    7 年前

    我还是会坚持 <int:transaction-synchronization-factory> .

    PollingConsumer 注册此项:

    TransactionSynchronizationManager.bindResource(resource,
                                    integrationSynchronization.getResourceHolder());
    

    进入交易。

    哪里 resource 是一个 MessageChannel 对于消费者。对你来说是这样的 myInputChannel

    您可以使用以下内容:

    IntegrationResourceHolder integrationResourceHolder = (IntegrationResourceHolder) TransactionSynchronizationManager.getResource(myInputChannel);
    

    并称其为 addAttribute() 要在处理后存储消息,请执行以下操作:

    integrationResourceHolder.addAttribute("resultMessage", resultMessage);
    

    之后你仍然可以使用它 <int:after-commit channel="myOutputChannel" /> 并通过适当的SpEL变量访问该属性:

    <int:after-commit channel="myOutputChannel"  expression="#resultMessage"/>