代码之家  ›  专栏  ›  技术社区  ›  paradigmatic

对未来作出反应

  •  7
  • paradigmatic  · 技术社区  · 14 年前

    我正在尝试使用分而治之(aka fork/join)方法来解决数字运算问题。代码如下:

    import scala.actors.Futures.future
    
    private def compute( input: Input ):Result = {
      if( pairs.size < SIZE_LIMIT ) {
        computeSequential()
      } else {
        val (input1,input2) = input.split
        val f1 = future( compute(input1) )
        val f2 = future( compute(input2) )
        val result1 = f1()
        val result2 = f2()
        merge(result1,result2)
      }
    }
    

    它运行起来(速度很快),但是未来的apply方法似乎阻塞了一个线程,线程池大大增加。当创建太多线程时,计算就会受阻。

    有没有一种 反应

    编辑: 我使用的是scala 2.8.0.final

    1 回复  |  直到 13 年前
        1
  •  8
  •   Community CDub    5 年前

    不要提出你的要求 Future s、 因为这会迫使他们阻塞并等待答案;正如您所看到的,这会导致死锁。相反,用它们来告诉他们完成后要做什么。而不是:

    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
    

    试试这个:

    for {
      result1 <- f1
      result2 <- f2
    } yield merge(result1, result2)
    

    其结果将是 Responder[Result] Future[Result] )包含合并结果;您可以使用 respond() foreach() ,或者你可以 map() flatMap() 把它传给另一个人 Responder[T] . 无需阻塞,只需继续为将来安排计算!

    好的,签名 compute 功能必须更改为 响应者[结果] 那么,这对递归调用有什么影响呢?让我们试试这个:

    private def compute( input: Input ):Responder[Result] = {
      if( pairs.size < SIZE_LIMIT ) {
        future(computeSequential())
      } else {
        val (input1,input2) = input.split
        for {
          result1 <- compute(input1)
          result2 <- compute(input2)
        } yield merge(result1, result2)
      }
    }
    

    现在您不再需要将呼叫打包到 具有 future(...) 因为他们已经回来了 Responder (一个超类) 未来 ).

    编辑2:

    最初——不再阻塞。如果是从 main() block 对所有这些未来,但只有一次,在最高水平,只有对结果 计算,而不是任何中间的计算。

    正在归还的东西 compute() 不再有阻塞 apply() 未来 做。我不知道为什么 s生成泛型 应答器 而不是 未来 ;这似乎是一个API错误。但在任何情况下,您都应该能够自己制作:

    def claim[A](r:Responder[A]):A = {
      import java.util.concurrent.ArrayBlockingQueue
      import scala.actors.Actor.actor
    
      val q = new ArrayBlockingQueue[A](1)
      // uses of 'respond' need to be wrapped in an actor or future block
      actor { r.respond(a => q.put(a)) } 
      return q.take
    }
    

    现在您可以创建一个阻塞调用来计算 main 方法如下:

    val finalResult = claim(compute(input))
    
    推荐文章