Why does my Source produce a buffer larger than the configured size?

  •  2
  • erip Jigar Trivedi  · Tech Community  · 7 years ago

    I'm new to Akka Streams. I'm trying to build a simple distributed-worker example that performs arbitrary jobs. I created a random job generator:

    import java.util.UUID
    
    import akka.stream.{Attributes, Outlet, SourceShape}
    import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
    
    final case class Job(id: UUID)
    
    class RandomJobSource extends GraphStage[SourceShape[Job]] {
      final val out: Outlet[Job] = Outlet.create("RandomJobSource.out")
    
      final val shape = SourceShape.of(out)
    
      override type Shape = SourceShape[Job]
    
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
        setHandler(out, new OutHandler {
          override def onPull(): Unit = {
            push(out, Job(id = UUID.randomUUID()))
          }
        })
    
      }
    }
    

    I can create an infinite stream of Jobs with:

    val jobs = Source.fromGraph(new RandomJobSource)
    

    Now, to do the work off the stream, I created a balancer and a worker:

      def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
        import GraphDSL.Implicits._
    
        Flow.fromGraph(GraphDSL.create() { implicit b ⇒
          val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
          val merge = b.add(Merge[Out](workerCount))
    
          for (_ ← 1 to workerCount) {
            // for each worker, add an edge from the balancer to the worker, then wire
            // it to the merge element
            balancer ~> worker.async ~> merge
          }
    
          FlowShape(balancer.in, merge.out)
        })
      }
    
      def worker[In, Out](f: In => Out): Flow[In, Out, NotUsed] = Flow.fromFunction(f)
    
      private def work(job: Job): Unit = {
        println(s"Doing job ${job.id}...")
        Thread.sleep(5000 + Random.nextInt(1000))
      }
    

    Finally, I distribute the work. All together:

    import akka.stream._
    import akka.stream.scaladsl._
    import akka.{Done, NotUsed}
    import akka.actor.ActorSystem
    
    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.Random
    
    object ExampleMain extends App {
    
      def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
        import GraphDSL.Implicits._
    
        Flow.fromGraph(GraphDSL.create() { implicit b ⇒
          val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
          val merge = b.add(Merge[Out](workerCount))
    
          for (_ ← 1 to workerCount) {
            // for each worker, add an edge from the balancer to the worker, then wire
            // it to the merge element
            balancer ~> worker.async ~> merge
          }
    
          FlowShape(balancer.in, merge.out)
        })
      }
    
      def worker[In, Out](f: In => Out): Flow[In, Out, NotUsed] = Flow.fromFunction(f)
    
      private def work(job: Job): Unit = {
        println(s"Doing job ${job.id}...")
        Thread.sleep(5000 + Random.nextInt(1000))
      }
    
      implicit val system: ActorSystem = ActorSystem("QuickStart")
      implicit val mat: Materializer = ActorMaterializer()
      implicit val ec: ExecutionContext = system.dispatcher
    
      val jobs = Source.fromGraph(new RandomJobSource)
    
      val res: Future[Done] = jobs
        .withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
        // Work serially to avoid under-working priority jobs
        .buffer(1, OverflowStrategy.backpressure)
        .wireTap(job => println(s"Next job: ${job.id}"))   
        // Put work in a balancer with 5 workers.
        .via(balancer(worker(work), 5))
        .runWith(Sink.ignore)
    
      res.onComplete(_ => system.terminate())
    
    }
    

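    I expected that, because the buffer size is 1, a new message would be pulled from the source each time a worker finishes a job (and so I would see a "Next job: ${job.id}" line at that moment).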

    Instead, my first 5 workers get work, and then many "Next job: ${job.id}" lines are printed to the console at once:

    Next job: 4f33258a-cd0f-4d54-b782-4ae97c67f125
    Next job: 363a2b95-7cfa-4e8a-b683-17d2fccf48c5
    Next job: 82a8ca64-1d23-474e-afc4-9b6d69a7f842
    Doing job 363a2b95-7cfa-4e8a-b683-17d2fccf48c5...
    Next job: d01faedf-65c3-4b88-83c2-fd0ce98607fe
    Next job: 67f24856-a843-4df5-a87c-61f46b1f7141
    Doing job 82a8ca64-1d23-474e-afc4-9b6d69a7f842...
    Doing job 4f33258a-cd0f-4d54-b782-4ae97c67f125...
    Doing job d01faedf-65c3-4b88-83c2-fd0ce98607fe...
    Next job: 28b62312-a084-428f-ab61-d2426a242e52
    Doing job 67f24856-a843-4df5-a87c-61f46b1f7141...
    Next job: 80f9cb48-7447-494d-a1d9-e091049e7822
    Next job: 38d91ef0-cc90-4bb8-a1f2-4aedfc5f655f
    Next job: 546566b7-f7b6-4ed3-84c9-ddc8eb19c65a
    Next job: 1a3f8f0c-be5a-4f14-a054-9f7fcd8a356a
    Next job: 8f425329-2ac6-4dd0-a7b8-e77345ba125f
    Next job: d1d23777-6bf8-4c91-a61d-7bc60f97c725
    Next job: 628ddabe-25a1-47b0-9989-f963d6394f64
    Next job: 6291e210-5228-4175-9b18-76a35d82dff0
    Next job: 8c6344ad-b6f5-43a0-b709-4ddd2d080982
    Next job: dd917c8d-ff12-44e2-81a7-98f66f55170c
    Next job: 65449eb7-ea9d-4e29-b11f-d37190bb0001
    Next job: 5649871d-a4bd-4edc-8c62-167f1da41786
    Next job: af3d20cb-2e48-4668-bf42-3fabbd4cffad
    Next job: 0f5c9ec2-3b69-4e91-8d87-89b0c1580d8a
    Next job: 43f12408-a02c-4ac6-8564-a2ecc96155ac
    Next job: 20951e30-1801-4d78-9683-dc0f45dc61e6
    Next job: a95c94b9-a9ca-49b6-8ff4-eef81dcb13cd
    Next job: 76107909-6f2f-4035-b34f-6c0b10a79d76
    Next job: d4551999-f7fb-4bf6-9da9-3e18b8dfc4da
    Next job: 2df2ceee-953f-44c8-8169-91152c0a672a
    Next job: 7425e65a-89fb-45d1-9514-54b2af3c9791
    Next job: f986a2a0-a82c-4fbc-af2c-f5439856bba2
    Next job: ac165054-8718-4eb8-9161-40738c8b67b0
    Next job: 27ea10d9-34eb-4989-9d5d-c67118eaf41e
    Next job: e587fa05-2bc3-4901-8ed0-fe1efa912d3e
    Next job: 77ee6cb0-77e1-4f2b-8d81-d57907755832
    Next job: 9224f6eb-c81d-4bd5-b7f1-5bb09cc4e97e
    ...
    

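    Of course, the jobs are completed in batches. Once a batch is done, the pattern repeats. But why aren't messages pulled from the source only as each job finishes?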

    1 answer  |  as of 7 years ago
        1
  •  0
  •   erip Jigar Trivedi    7 years ago

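    By default, when the workers are marked with an async boundary, each one implicitly gets an input buffer with a maximum size of 16. The five workers therefore buffer 5 * 16 = 80 jobs. In my case that is undesirable, so I simply added an attribute to the worker that sets its input buffer size to 1: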

      def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
        import GraphDSL.Implicits._
    
        Flow.fromGraph(GraphDSL.create() { implicit b ⇒
          val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
          val merge = b.add(Merge[Out](workerCount))
    
          for (_ ← 1 to workerCount) {
            // for each worker, add an edge from the balancer to the worker, then wire
            // it to the merge element
            balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge
          }
    
          FlowShape(balancer.in, merge.out)
        })
      }
    

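    Unfortunately, the workers can buffer no fewer than one message, so my console output still shows slightly misleading messages (each worker prefetches one job while it is busy):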

    Next job: 38c6424c-a943-48bb-af9c-e6b488973781
    Doing job 38c6424c-a943-48bb-af9c-e6b488973781...
    Next job: ff5b9e0f-40c3-4e6b-ab60-89d349b5ba0f
    Doing job ff5b9e0f-40c3-4e6b-ab60-89d349b5ba0f...
    Next job: 0d260b3b-61b4-41f4-a67d-bac3190e257d
    Doing job 0d260b3b-61b4-41f4-a67d-bac3190e257d...
    Next job: 80a743a1-9c6c-4486-b836-c102a11f5503
    Doing job 80a743a1-9c6c-4486-b836-c102a11f5503...
    Next job: a10bb61b-865f-4252-96a6-3da9413c77fb
    Doing job a10bb61b-865f-4252-96a6-3da9413c77fb...
    Next job: 3a7f8aeb-a6bf-4caa-ae9b-619fb49442ca
    Next job: 209a1b78-f66a-4a54-91f1-93aaadb10522
    Next job: a56e02e5-5e7c-4505-ae44-8a9bb68d5fc6
    Next job: e218fd63-6ac2-45f5-9837-ce7bcc3c1930
    Next job: 9ce5f1ff-6ec2-4282-99d5-fcf5456f35a7
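
    As a rough check of the numbers in this answer, the sketch below models how many jobs can sit in the workers' input buffers. It is only back-of-the-envelope arithmetic under the assumption that every worker fills its buffer completely. BufferEstimate and bufferedJobs are hypothetical names used for illustration and are not part of Akka.

```scala
// Hypothetical helper, not part of Akka: a simple arithmetic model only.
object BufferEstimate {

  /** Upper bound on the jobs parked in the workers' input buffers,
    * assuming each async worker fills its buffer completely.
    */
  def bufferedJobs(workerCount: Int, maxInputBuffer: Int): Int = {
    require(workerCount > 0, "workerCount must be positive")
    // Per the answer above, a worker buffers no fewer than one message.
    require(maxInputBuffer >= 1, "maxInputBuffer must be at least 1")
    workerCount * maxInputBuffer
  }

  def main(args: Array[String]): Unit = {
    // Default maximum of 16 per async worker: 5 * 16
    println(bufferedJobs(workerCount = 5, maxInputBuffer = 16)) // prints 80
    // After adding Attributes.inputBuffer(1, 1): 5 * 1
    println(bufferedJobs(workerCount = 5, maxInputBuffer = 1))  // prints 5
  }
}
```

    With a buffer size of 1, the model gives five prefetched jobs, which matches the five extra "Next job" lines that follow the five "Doing job" lines in the output above.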