-
链包装在事务中
-
JMS事务需要在数据库事务之前在链的末尾提交
-
如果JMS提交失败,则不应执行数据库提交
-
在所有成功的数据库提交之后,需要将消息(及其所有操作)放到输出通道上
问题链如下
<int:poller id="myInputChannelPoller">
<int:transactional transaction-manager="myTransactionManager">
</int:poller>
<int:chain id="myChain" input-channel="myInputChannel">
<int:poller ref="myInputChannelPoller" />
<int:service-activtor id="someMessageManipulation" />
<int:service-activtor id="aDatabaseOperation" />
<int:service-activtor id="sendJmsMessage" />
<int:service-activtor id="anotherMessageManipulation" />
<int:service-activtor id="anotherDatabaseOperation" />
</int:chain>
为了在事务中包装链,我添加了
<int:transactional/>
标记投票人。
在数据库事务之前提交了所有JMS事务,我使用了
ChainedTransactionManager
@Bean
public PlatformTransactionManager myTransactionManager(JpaTransactionManager jpaTransactionManager, JmsTransactionmanager jmsTransactionManager) {
// jpa before jms, since the commits will be done in reverse order
return new ChainedTransactionManager(jpaTransactionManager, jmsTransactionmanager)
}
我不确定如何正确满足的唯一要求是after database commit操作。首先想到的是
transaction-synchronization-factory
机制如中所示
<int:transaction-synchronization-factory id="mySyncFactory">
<int:after-commit channel="myOutputChannel" />
</int:transaction-synchronization-factory>
并将其链接到现有轮询器的事务
<int:poller id="myInputChannelPoller">
<int:transactional transaction-manager="myTransactionManager" synchronization-factory="mySyncFactory">
</int:poller>
另一个想法是添加另一个
service-activator
MyJpaTransactionManagerInjector
MyJpaTransactionManager
.
接收消息和目标输出通道作为输入。创建
ForwardMessage
然后注射到
MyJpaTransactionManager
.
public class MyJpaTransactionManagerInjector {
@Inject
private MyJpaTransactionManager myJpaTransactionManager;
@ServiceActivator
public void injectMessage(Message<?> message, @Header(value = "outputChannel") MessageChannel outputChannel) {
myJpaTransactionManager.inject(new ForwardMessage(message, outputChannel));
}
}
登记
转发消息
在执行提交之前。
public class MyJpaTransactionManager extends JpaTransactionManager {
private final ThreadLocal<List<TransactionSynchronizationAdapter>> synchronizations =
new NamedThreadLocal<>("Transactional resources");
public void inject(TransactionSynchronizationAdapter synchronization) {
if (synchronizations.get() == null) {
synchronizations.set(new ArrayList<>());
}
synchronizations.get().add(synchronization);
}
@Override
protected void prepareForCommit(DefaultTransactionStatus status) {
super.prepareForCommit(status);
if (synchronizations.get() != null) {
synchronizations.get().forEach(TransactionSynchronizationManager::registerSynchronization);
}
synchronizations.remove();
}
}
转发消息
afterCommit()
到目标通道
public class ForwardMessage extends TransactionSynchronizationAdapter {
private Message<?> message;
private MessageChannel outputChannel;
public ForwardMessage(Message<?> message, MessageChannel outputChannel) {
this.message = message;
this.outputChannel = outputChannel;
}
@Override
public void afterCommit() {
outputChannel.send(message);
}
}
只要可能,我都会避免将
TransactionManager
因为很容易破坏一些工作得很好的东西。有没有比这更好的解决方案?