代码之家  ›  专栏  ›  技术社区  ›  Kevin

Akka:在邮箱中保留不匹配的邮件

  •  0
  • Kevin  · 技术社区  · 6 年前

    我熟悉erlang/elixir,其中流程邮箱中的邮件在匹配之前一直保留在邮箱中:

    模式 Pattern 与邮箱中第一封邮件按时间顺序顺序进行顺序匹配,然后与第二封邮件进行顺序匹配,依此类推。如果匹配成功并且可选的保护序列 GuardSeq 是真的,相应的 Body 被评估。匹配的邮件将被使用,即从邮箱中删除,而邮箱中的任何其他邮件将保持不变。

    ( http://erlang.org/doc/reference_manual/expressions.html#receive )

    但是,与akka actors不匹配的消息将从邮箱中删除。 例如,在就餐哲学家模拟中实现forks时,这很烦人:

    import akka.actor._
    
    object Fork {
      def props(id: Int): Props = Props(new Fork(id))
      final case class Take(philosopher: Int)
      final case class Release(philosopher: Int)
      final case class TookFork(fork: Int)
      final case class ReleasedFork(fork: Int)
    }
    
    class Fork(val id: Int) extends Actor {
      import Fork._
    
      object Status extends Enumeration {
        val FREE, TAKEN = Value
      }
    
      private var _status: Status.Value = Status.FREE
      private var _held_by: Int = -1
    
      def receive = {
        case Take(philosopher) if _status == Status.FREE => {
          println(s"\tPhilosopher $philosopher takes fork $id.")
          take(philosopher)
          sender() ! TookFork(id)
          context.become(taken, false)
        }
        case Release(philosopher) if _status == Status.TAKEN && _held_by == philosopher => {
          println(s"\tPhilosopher $philosopher puts down fork $id.")
          release()
          sender() ! ReleasedFork(id)
          context.unbecome()
        }
      }
    
      def take(philosopher: Int) = {
        _status  = Status.TAKEN
        _held_by = philosopher
      }
    
      def release() = {
        _status  = Status.FREE
        _held_by = -1
      }
    }
    

    当A Take(<philosopher>) 消息被发送到fork, 我们希望邮件保留在邮箱中,直到释放分叉并匹配邮件。然而,在阿克卡 接受(哲学家) 如果当前使用fork,则邮件将从邮箱中删除,因为没有匹配项。

    目前,我通过重写 unhandled fork actor的方法并再次将消息转发到fork:

    override def unhandled(message: Any): Unit = {
      self forward message
    }
    

    我相信这是非常低效的,因为它一直将消息发送到fork,直到它被匹配。有没有另一种方法来解决这个问题,不涉及不断转发不匹配的消息?

    我相信在最坏的情况下,我必须实现一个模仿erlang邮箱的自定义邮箱类型,如下所述: http://ndpar.blogspot.com/2010/11/erlang-explained-selective-receive.html


    编辑:我根据tim的建议修改了我的实现,并按照建议使用了stash特性。我的 Fork 演员现在看起来如下:

    class Fork(val id: Int) extends Actor with Stash {
      import Fork._
    
      // Fork is in "taken" state
      def taken(philosopher: Int): Receive = {
        case Release(`philosopher`) => {
          println(s"\tPhilosopher $philosopher puts down fork $id.")
          sender() ! ReleasedFork(id)
          unstashAll()
          context.unbecome()
        }
        case Take(_) => stash()
      }
    
      // Fork is in "free" state
      def receive = {
        case Take(philosopher) => {
          println(s"\tPhilosopher $philosopher takes fork $id.")
          sender() ! TookFork(id)
          context.become(taken(philosopher), false)
        }
      }
    }
    

    但是,我不想写 stash() unstashAll() 到处打电话。相反,我想实现一个自定义邮箱类型来实现这一点,即在参与者处理邮件时隐藏未处理的邮件并取消对它们的灰化。这可能吗?

    我尝试实现一个自定义邮箱,但无法确定邮件是否与接收块匹配。

    2 回复  |  直到 6 年前
        1
  •  1
  •   Tim    6 年前

    问题在于 forward 如果有多条消息等待处理,它可能会对消息重新排序,这可能不是一个好主意。

    这里最好的解决方案似乎是在提供所需语义的actor内部实现自己的队列。如果不能立即处理消息,则将其放入队列中,当下一条消息到达时,可以处理尽可能多的队列。这还允许您检测发件人何时发送不一致的邮件(例如 Release 在他们没有的叉子上 Take )否则将在传入邮箱中累积。

    在您证明这是一个问题之前,我不会担心效率,但是如果每个接收函数只处理在该特定状态下相关的消息,效率会更高。


    我会避免使用 var 通过将状态放入 receive 方法。以及 _status 值在接收处理程序的选择中是隐式的,不需要作为值存储。这个 taken 接收处理程序只需要处理 释放 消息和主接收处理程序只需要处理 采取 信息。

        2
  •  0
  •   Jeffrey Chung    6 年前

    存在 a sample project 在包含多个 "Dining Philosophers" 问题。你的方法和他们的方法的一个关键区别是,他们把器具和哲学家都当作演员来实现,而你只把器具定义为演员。示例实现展示了如何在不处理未处理邮件或使用自定义邮箱的情况下对问题建模。