代码之家  ›  专栏  ›  技术社区  ›  Robo

Akka ActorSystem处理同一事件两次

  •  1
  • Robo  · 技术社区  · 8 年前

    我们有一个scala-akka-actorsystem设置,每天处理数百万个事件,历史上我们很少发现有1到2个事件被处理了两次,但最近重复的事件在某些天增加到了100个左右。

    我们的设置简化如下:

    // EventJob runs once every 10 seconds
    class EventJob extends Actor {
      val EventListnerPoolOfActors = ActorSystem().actorOf(
        RoundRobinPool(10)
          .props(Props(classOf[EventHandler])),
        "InjectorActorID"
      )
    
      override def preStart(): Unit = {
        self ! ReceivedJobStart()
      }
    
      def receive: Actor.Receive = {
        case ReceivedJobStart() =>
          doWork()
          context.system.scheduler.scheduleOnce(10, self, ReceivedJobStart())
      }
    
      def doWork(): Future[Unit] = {
        // returns Future[Seq[Event]]
        getUnprocessEvents().map { x =>
          {
            // pass each Event to an EventHandler Actor to process
            for (a <- 0 to x.size) {
              EventListnerPoolOfActors ! x(a)
            }
          }
        }
      }
    }
    
    class EventHandler extends Actor {
      def receive = {
        ...
      }
    }
    

    每个事件都有一个唯一的id,在我们的日志中,它显示某个事件在相互之间的毫秒内被处理了两次(转到eventhandler.receive)。所有演员都是本地演员。

    假设默认的消息传递可靠性是最多一次的,那么为什么越来越多的消息似乎被传递了不止一次,并且如何减少这个问题呢?

    我们的系统是用来处理重复的,我们只是不知道为什么它最近似乎在增加,并希望减少它。

    1 回复  |  直到 8 年前
        1
  •  2
  •   Jeffrey Chung    8 年前

    假设您的系统不会产生具有相同id的重复工作单元,则系统有时处理一条消息不止一次的可能原因是事件被分发到 EventHandler actors;它与akka的消息传递保证无关。

    想想你的 getUnprocessEvents() 方法。它返回一个 Future[Seq[Event]] 并且在参与者的常规消息处理之外运行,并且不能保证 m 已从中删除 Seq 之前 GetUnprocessEvents() 再次调用。信息被推给工人,而不考虑他们是否可以做更多的工作。工作进程可能仍在处理消息 在随后的通话中 GetUnprocessEvents() ,在这种情况下 再次发送到工作人员的邮箱。使用调度程序周期性地调用此方法(即指定一个时间窗口,以便为工作参与者提供足够的时间来处理其消息)是协调工作的一种有缺陷的方法。

    更好的方法是使工作队列成为工作协调器actor状态的一部分(即,使队列成为actor中的一个内部变量,并通过actor消息改变队列),并使用 work pulling pattern . 另外,考虑使用 Akka Streams .

    作为旁注, EventJob 创建新的 ActorSystem :

    val EventListnerPoolOfActors = ActorSystem().actorOf(...)
    

    应该只有一个 Actor系统 每个应用程序。使用 context 相反:

    val EventListnerPoolOfActors = context.actorOf(...)
    
    推荐文章