Why isn't my typed actor being restarted by its guardian?

  • erip Jigar Trivedi  · 6 years ago

    I'm experimenting with Akka Typed. I have a dummy actor that mimics a flaky worker:

    import akka.actor.typed.Behavior
    import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
    import akka.actor.typed.scaladsl.Behaviors
    
    import scala.util.Random
    
    object DummyActor {
      def behavior[T](serviceKey: ServiceKey[T]): Behavior[Any] = Behaviors.setup { ctx =>
    
        ctx.system.receptionist ! Receptionist.Register(serviceKey, ctx.self)
    
        ctx.log.info("Woohoo, I'm alive!")
    
        Thread.sleep(1000)
    
        if (Random.nextInt(10) > 5)
          throw new IllegalStateException("Something bad happened!")
    
        Behaviors.empty
      }
    }
    

    A guardian for it, which also spawns the router:

    import akka.actor.typed.{Behavior, SupervisorStrategy}
    import akka.actor.typed.receptionist.ServiceKey
    import akka.actor.typed.scaladsl.Behaviors
    
    object MyCluster {
    
      val serviceKey: ServiceKey[String] = ServiceKey[String]("cluster")
    
      val strategy = SupervisorStrategy.restart
    
      val behavior: Behavior[String] = Behaviors.setup { ctx =>
    
    
        (1 to 5).foreach { i =>
          ctx.log.info(s"Spawning actor #$i")
          ctx.spawn(
            Behaviors.supervise(DummyActor.behavior(serviceKey))
                     .onFailure[Throwable](strategy),
            s"actor-$i"
          )
        }
    
        val router = ctx.spawn(RandomRouter.clusterRouter(serviceKey), "router")
    
        Behaviors.empty
      }
    }
    

    My router listens for receptionist events:

    import java.util.concurrent.ThreadLocalRandom
    
    import akka.actor.Address
    import akka.actor.typed.{ActorRef, Behavior}
    import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
    import akka.actor.typed.scaladsl.Behaviors
    import akka.cluster.ClusterEvent.{ReachabilityEvent, ReachableMember, UnreachableMember}
    import akka.cluster.typed.{Cluster, Subscribe}
    
    object RandomRouter {
      private final case class WrappedReachabilityEvent(event: ReachabilityEvent)
    
      // same as above, but also subscribes to cluster reachability events and
      // avoids routees that are unreachable
      def clusterRouter[T](serviceKey: ServiceKey[T]): Behavior[T] =
        Behaviors.setup[Any] { ctx ⇒
          ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self)
    
          val cluster = Cluster(ctx.system)
          // typically you have to map such external messages into this
          // actor's protocol with a message adapter
          val reachabilityAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(WrappedReachabilityEvent.apply)
    
          cluster.subscriptions ! Subscribe(reachabilityAdapter, classOf[ReachabilityEvent])
    
          def routingBehavior(routees: Vector[ActorRef[T]], unreachable: Set[Address]): Behavior[Any] =
            Behaviors.receive { (ctx, msg) ⇒
              msg match {
                case serviceKey.Listing(services) ⇒
                  if (services.isEmpty) {
                    ctx.log.info("Found no services")
                  } else {
                    ctx.log.info(s"Found services: ${services.map(_.path).mkString(", ")}")
                  }
                  routingBehavior(services.toVector, unreachable)
                case WrappedReachabilityEvent(event) => event match {
                  case UnreachableMember(m) =>
                    ctx.log.warning(s"Member ${m.uniqueAddress.address} has become unreachable")
                    routingBehavior(routees, unreachable + m.address)
                  case ReachableMember(m) =>
                    ctx.log.info(s"Member ${m.uniqueAddress.address} has become reachable again")
                    routingBehavior(routees, unreachable - m.address)
                }
    
                case other: T @unchecked ⇒
                  // Filter out unreachable routees first so nextInt never receives 0.
                  val reachableRoutes =
                    if (unreachable.isEmpty) routees
                    else routees.filterNot { r => unreachable(r.path.address) }
    
                  if (reachableRoutes.isEmpty)
                    Behaviors.unhandled
                  else {
                    val i = ThreadLocalRandom.current.nextInt(reachableRoutes.size)
                    reachableRoutes(i) ! other
                    Behaviors.same
                  }
              }
            }
    
          routingBehavior(Vector.empty, Set.empty)
        }.narrow[T]
    }
    

    But when I start the cluster, I find that some actors eventually die (as expected) and are never restarted, leaving logs like this:

    [INFO] [06/01/2018 18:11:14.242] [cluster-system-akka.actor.default-dispatcher-4] [akka://cluster-system/user/router] Found services: akka://cluster-system/user/actor-4, akka://cluster-system/user/actor-3, akka://cluster-system/user/actor-1, akka://cluster-system/user/actor-5
    

    Why isn't MyCluster#strategy restarting the failed actors?

    1 answer  |  last active 5 years ago
  • erip Jigar Trivedi  · 6 years ago

    The code of SupervisorStrategy#restart, and the comment that accompanies it, held the answer.

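    If the actor throws an exception during setup, restarting it would be dangerous because it could create an infinite restart loop, so the actor is stopped instead. That is what happens here: DummyActor throws inside Behaviors.setup, before it ever reaches its running behavior. For supervising startup failures, restartWithBackoff is recommended instead.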
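
    As a rough sketch of that recommendation, the strategy in MyCluster could be swapped for a backoff strategy along these lines. The object name BackoffSupervision, the helper supervised, and the three backoff values are assumptions made for illustration; they are not part of the original code, and the right values depend on the workload.

```scala
import akka.actor.typed.{Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.duration._

// Hypothetical helper object, not part of the original question.
object BackoffSupervision {

  // Assumed values, chosen only for illustration: wait at least 1 second
  // before a restart, at most 30 seconds, with 20% random jitter.
  val strategy = SupervisorStrategy.restartWithBackoff(
    minBackoff = 1.second,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  )

  // Wraps a child behavior so that a failure is answered with a delayed
  // restart rather than an immediate one.
  def supervised[T](child: Behavior[T]): Behavior[T] =
    Behaviors.supervise(child).onFailure[Throwable](strategy)
}
```

    In MyCluster, the spawn call would then become something like ctx.spawn(BackoffSupervision.supervised(DummyActor.behavior(serviceKey)), s"actor-$i"). Because each retry is delayed, a behavior that keeps failing in setup cannot spin in a tight restart loop.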