代码之家  ›  专栏  ›  技术社区  ›  zella

如何实现带跳过和条件停止的流

  •  0
  • zella  · 技术社区  · 7 年前

    我试图实现批处理。我的算法:

    1)首先,我需要数据库的请求项,初始 skip = 0 是的。如果没有项目,则完全停止处理。

      case class Item(i: Int)
    
      def getItems(skip: Int): Future[Seq[Item]] = {
        Future((skip until (skip + (if (skip < 756) 100 else 0))).map(Item))
      }
    

    2)然后每件事都要做重活( parallelism = 4 )

      def heavyJob(item: Item): Future[String] = Future {
        Thread.sleep(1000)
        item.i.toString + " done"
      }
    

    3)所有项目处理完成后,转到 skip += 100

    我在尝试什么:

    val dbSource: Source[List[Item], _] = Source.fromFuture(getItems(0).map(_.toList))
    
    val flattened: Source[Item, _] = dbSource.mapConcat(identity)
    
    val procced: Source[String, _] = flattened.mapAsync(4)(item => heavyJob(item))
    
    procced.runWith(Sink.onComplete(t => println("Complete: " + t.isSuccess)))
    

    但我不知道如何实现分页

    1 回复  |  直到 7 年前
        1
  •  0
  •   Ramón J Romero y Vigil    7 年前

    这个 skip 递增可以用 Iterator 作为价值的基本来源:

    val skipIncrement = 100
    
    val skipIterator : () => Iterator[Int] = 
      () => Iterator from (0, skipIncrement)
    

    这个迭代器可以用来驱动一个akka Source 获取项目并将继续处理,直到查询返回空 Seq 以下内容:

    val databaseStillHasValues : Seq[Item] => Boolean = 
      (dbValues) => !dbValues.isEmpty
    
    val itemSource : Source[Item, _] = 
      Source.fromIterator(skipIterator)
            .mapAsync(1)(getItems)
            .takeWhile(databaseStillHasValues)
            .mapConcat(identity)
    

    这个 heavyJob 可用于 Flow 以下内容:

    val heavyParallelism = 4
    
    val heavyFlow : Flow[Item, String, _] = 
      Flow[Item].mapAsync(heavyParallelism)(heavyJob)
    

    最后,源和流可以附加到 Sink 以下内容:

    val printSink = Sink[String].foreach(t => println(s"Complete: ${t.isSuccess}"))
    
    itemSource.via(heavyFlow)
              .runWith(printSink)