代码之家  ›  专栏  ›  技术社区  ›  zella

如何将流程执行作为反应式的“可观察的”[字符串]

  •  1
  • zella  · 技术社区  · 7 年前

    我希望将外部进程执行表示为 Observable[String] 在哪里 String -流程输出的行。这里举个例子,我所做的工作是有效的:

    import monix.eval.Task
    import monix.execution.Scheduler.Implicits.global
    import monix.reactive.Observable
    
    object TestSo {
    
      def main(args: Array[String]): Unit = {
    
        val lineStream = scala.sys.process.Process("python3 test.py").lineStream
    
        val lineStreamO: Observable[String] = Observable.fromIterator(Task(lineStream.iterator))
          .doOnNext(l => Task(println(l))) //logging
          .guarantee(Task(println("clean resources")))
    
        println(lineStreamO.toListL.runSyncUnsafe())
      }
    
    }
    

    你可以看到,这个过程每秒钟都会发出一条新的线。但这并不重要。提供完整的例子, test.py :

    from time import sleep
    print(0, flush=True)
    sleep(1)
    print(1, flush=True)
    sleep(1)
    print(2, flush=True)
    sleep(1)
    print(3, flush=True)
    sleep(1)
    print(4, flush=True)
    

    输出:

    0
    1
    2
    3
    4
    5
    clean resources
    List(0, 1, 2, 3, 4, 5)
    

    问题 :

    如果进程冻结(例如 sleep 100000 )进程应在超时后终止。此外,如果流程被强制执行或失败,则应清除一些资源。( guarantee 例如)。非零退出代码应表示失败。

    如何将流程执行作为 可观测的 是否使用Propper错误处理? rx-java 欢迎提供解决方案。

    1 回复  |  直到 7 年前
        1
  •  2
  •   SergGr    7 年前

    超时的需要将迫使您重新编写 lineStream 逻辑。另一方面,通过这样的重写,你可以避免中间人 Iterator 直接将线条 Subject . 对于超时逻辑,可以使用monix timeoutOnSlowUpstream 方法,但您仍然必须处理超时错误并关闭已启动的进程。

    此外,还可以选择如何处理长输出和多个订阅服务器。在这段代码中,我决定使用 replayLimited . 根据你的需要,你可以选择一些不同的策略。下面是一个解决方案的草图:

    object ProcessHelper {
    
      import scala.sys.process.{Process, BasicIO}
      import scala.concurrent.duration.FiniteDuration
      import monix.eval.Task
      import monix.execution.Scheduler
      import monix.reactive.subjects.ConcurrentSubject
      import monix.reactive.Observable
    
      private class FinishedFlagWrapper(var finished: Boolean = false)
    
      def buildProcessLinesObservable(cmd: String, timeout: FiniteDuration, bufferLines: Int = 100)(implicit scheduler: Scheduler): Observable[String] = {
        // works both as a holder for a mutable boolean var and as a synchronization lock
        // that is required to preserve semantics of a Subject, particularly
        // that onNext is never called after onError or onComplete
        val finished = new FinishedFlagWrapper()
    
        // whether you want here replayLimited or some other logic depends on your needs
        val subj = ConcurrentSubject.replayLimited[String](bufferLines)
    
        val proc = Process(cmd).run(BasicIO(withIn = false,
          line => finished.synchronized {
            if (!finished.finished)
              subj.onNext(line)
          }, None))
    
        // unfortunately we have to block a whole thread just to wait for the exit code
        val exitThread = new Thread(() => {
          try {
            val exitCode = proc.exitValue()
            finished.synchronized {
              if (!finished.finished) {
                finished.finished = true
                if (exitCode != 0) {
                  subj.onError(new RuntimeException(s"Process '$cmd' has exited with $exitCode."))
                }
                else {
                  subj.onComplete()
                }
              }
            }
          }
          catch {
            // ignore when this is a result of our timeout
            case e: InterruptedException => if(!finished.finished) e.printStackTrace()
          }
        }, "Process-exit-wait")
        exitThread.start()
    
        subj.timeoutOnSlowUpstream(timeout)
          .guarantee(Task(finished.synchronized {
            if (!finished.finished) {
              finished.finished = true
              proc.destroy()
              exitThread.interrupt()
            }
          }))
      }
    }
    

    使用示例如下:

    def test(): Unit = {
      import monix.execution.Ack._
      import monix.reactive._
      import scala.concurrent._
      import scala.concurrent.duration._
      import monix.execution.Scheduler.Implicits.global
    
    
      val linesO = ProcessHelper.buildProcessLinesObservable("python3 test.py", 5 seconds, 2) // buffer is reduced to just 2 lines just for this example 
    
      linesO.subscribe(new Observer[String] {
        override def onNext(s: String): Future[Ack] = {
          println(s"Received '$s'")
          Future.successful(Continue)
        }
    
        override def onError(ex: Throwable): Unit = println(s"Error '$ex'")
    
        override def onComplete(): Unit = println("Complete")
      })
    
      try {
        println(linesO.toListL.runSyncUnsafe())
        println(linesO.toListL.runSyncUnsafe()) // second run will show only last 2 values because of the reduced buffer size
        println("Finish success")
      }
      catch {
        case e: Throwable => println("Failed with " + e)
      }
    }
    
    推荐文章