在我的设置中,我定制了
Guava
s(24.1版)
AsyncEventBus
稍微:
public class PausableAsyncEventBus extends AsyncEventBus implements IPausableEventBus{
private boolean paused = false;
private LinkedList<Object> queuedEvents = new LinkedList<>();
public PausableAsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
super(executor, subscriberExceptionHandler);
}
public void pause() {
paused = true;
}
public void resume() {
paused = false;
while (!paused && !queuedEvents.isEmpty()) {
super.post(queuedEvents.removeFirst());
}
}
@Override
public void post(Object event) {
if (!paused) {
super.post(event);
} else {
queuedEvents.add(event);
}
}
}
然后我有一个任务执行一次,发布一个事件。这个
异步事件总线
注射者
Spring
依赖注入:
@Autowired
private AsyncEventBus clientServerEventBus;
public void run() {
log.info("Run CelebrationTask for "+ this, new Exception("Called from"));
// calculate number of guests
CelebrationState state = calculateCelebrationState();
TargetedDialogStateWrapper wrapper = new TargetedDialogStateWrapper(player, state);
clientServerEventBus.post(wrapper);
// update reputation
updateReputation(state);
}
我在post方法上定义了一个AOP方面,因此我可以记录发布的事件。这篇文章的结果如下:
2018-12-27 23:52:53,783 [pool-2-thread-4] INFO c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:03,212 [pool-2-thread-3] INFO c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:08,215 [pool-2-thread-3] INFO c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:13,218 [pool-2-thread-3] INFO c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
2018-12-27 23:55:18,303 [pool-2-thread-3] INFO c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
run方法上的日志行只出现一次,所以我知道run方法只调用一次,但是事件在多个线程上被多次发布。这又会导致对同一事件的多个处理。
我的基本部分是这样的:
@Pointcut("execution(public void com.google.common.eventbus.EventBus.post(Object))")
public void syncEventBus(){}
@Pointcut("execution(public void com.google.common.eventbus.EventBus.post(Object))")
public void asyncEventBus(){}
@Before("syncEventBus() || asyncEventBus()")
public void logEvent(JoinPoint joinPoint) {
Object event = joinPoint.getArgs()[0];
if (!ignore(event)) {
String name = ((EventBus) joinPoint.getTarget()).identifier();
StringBuilder sb = ...
log.info(sb.toString());
}
}
我只是不知道哪里出了问题。这可能是一个方面需要做的事情,也可能是eventbus由Spring代理的事实,或者是完全不同的事情,我想不起来。有什么想法吗?
[编辑:]
请注意,前两个日志条目位于不同的线程上,并且间隔几分钟,下面的所有条目与第二个日志条目位于同一线程上,并且间隔只有几秒。但是,在这些日志行之间,还有其他记录的事件,这些事件是为了简单起见而被删除的。
[编辑2:]
我找到了原因。在handler方法中
@Subscribe
我有:
executor.schedule(() -> clientServerEventBus.post(new TargetedEvent((IHumanPlayer) player, wrappedDialogState)), 5, TimeUnit.SECONDS);
与
wrappedDialogState
作为事件对象。
然后我有另一个方法订阅
TargetedEvent
:
@Subscribe
public void propagate(TargetedEvent message) {
clientServerEventBus.post(message.getEvent());
}
当然,这将调用第一个事件处理程序,它将在5秒后发布一条新消息。
有什么方法可以创建一个体系结构测试来检查这样的事件循环吗?