代码之家  ›  专栏  ›  技术社区  ›  hotzst

在AsyncEventbus上多次发布事件

  •  0
  • hotzst  · 技术社区  · 6 年前

    在我的设置中,我定制了 Guava s(24.1版) AsyncEventBus 稍微:

    public class PausableAsyncEventBus extends AsyncEventBus implements IPausableEventBus{
        private boolean paused = false;
        private LinkedList<Object> queuedEvents = new LinkedList<>();
    
        public PausableAsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
            super(executor, subscriberExceptionHandler);
        }
        public void pause() {
            paused = true;
        }
        public void resume() {
            paused = false;
            while (!paused && !queuedEvents.isEmpty()) {
                super.post(queuedEvents.removeFirst());
            }
        }
    
        @Override
        public void post(Object event) {
            if (!paused) {
                super.post(event);
            } else {
                queuedEvents.add(event);
            }
        }
    }
    

    然后我有一个任务执行一次,发布一个事件。这个 异步事件总线 注射者 Spring 依赖注入:

    @Autowired
    private AsyncEventBus clientServerEventBus;
    
    public void run() {
        log.info("Run CelebrationTask for "+ this, new Exception("Called from"));
        // calculate number of guests
        CelebrationState state = calculateCelebrationState();
        TargetedDialogStateWrapper wrapper = new TargetedDialogStateWrapper(player, state);
        clientServerEventBus.post(wrapper);
        // update reputation
        updateReputation(state);
    }
    

    我在post方法上定义了一个AOP方面,因此我可以记录发布的事件。这篇文章的结果如下:

    2018-12-27 23:52:53,783 [pool-2-thread-4] INFO  c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
    2018-12-27 23:55:03,212 [pool-2-thread-3] INFO  c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
    2018-12-27 23:55:08,215 [pool-2-thread-3] INFO  c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
    2018-12-27 23:55:13,218 [pool-2-thread-3] INFO  c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
    2018-12-27 23:55:18,303 [pool-2-thread-3] INFO  c.s.g.o.c.l.EventBusAspect : Posted event on event bus 'PausableAsyncEventBus (default)' ch.sahits.game.openpatrician.model.ui.TargetedDialogStateWrapper 35645f14-e0d9-40f0-aa1e-e1c6f99e8d43: Thomas Pfeffersack from ch.sahits.game.openpatrician.clientserverinterface.model.task.CelebrationTask.run
    

    run方法上的日志行只出现一次,所以我知道run方法只调用一次,但是事件在多个线程上被多次发布。这又会导致对同一事件的多个处理。

    我的基本部分是这样的:

    @Pointcut("execution(public void com.google.common.eventbus.EventBus.post(Object))")
    public void syncEventBus(){}
    
    @Pointcut("execution(public void com.google.common.eventbus.EventBus.post(Object))")
    public void asyncEventBus(){}
    
    @Before("syncEventBus() || asyncEventBus()")
    public void logEvent(JoinPoint joinPoint) {
        Object event = joinPoint.getArgs()[0];
        if (!ignore(event)) {
            String name = ((EventBus) joinPoint.getTarget()).identifier();
            StringBuilder sb = ...
            log.info(sb.toString());
        }
    }
    

    我只是不知道哪里出了问题。这可能是一个方面需要做的事情,也可能是eventbus由Spring代理的事实,或者是完全不同的事情,我想不起来。有什么想法吗?

    [编辑:] 请注意,前两个日志条目位于不同的线程上,并且间隔几分钟,下面的所有条目与第二个日志条目位于同一线程上,并且间隔只有几秒。但是,在这些日志行之间,还有其他记录的事件,这些事件是为了简单起见而被删除的。

    [编辑2:]

    我找到了原因。在handler方法中 @Subscribe 我有:

    executor.schedule(() -> clientServerEventBus.post(new TargetedEvent((IHumanPlayer) player, wrappedDialogState)), 5, TimeUnit.SECONDS);
    

    wrappedDialogState 作为事件对象。 然后我有另一个方法订阅 TargetedEvent :

    @Subscribe
    public void propagate(TargetedEvent message) {
        clientServerEventBus.post(message.getEvent());
    }
    

    当然,这将调用第一个事件处理程序,它将在5秒后发布一条新消息。

    有什么方法可以创建一个体系结构测试来检查这样的事件循环吗?

    0 回复  |  直到 6 年前