不要提出你的要求
Future
s、 因为这会迫使他们阻塞并等待答案;正如您所看到的,这会导致死锁。相反,用它们来告诉他们完成后要做什么。而不是:
val result1 = f1()
val result2 = f2()
merge(result1,result2)
试试这个:
for {
result1 <- f1
result2 <- f2
} yield merge(result1, result2)
其结果将是
Responder[Result]
Future[Result]
)包含合并结果;您可以使用
respond()
foreach()
,或者你可以
map()
或
flatMap()
把它传给另一个人
Responder[T]
. 无需阻塞,只需继续为将来安排计算!
好的,签名
compute
功能必须更改为
响应者[结果]
那么,这对递归调用有什么影响呢?让我们试试这个:
private def compute( input: Input ):Responder[Result] = {
if( pairs.size < SIZE_LIMIT ) {
future(computeSequential())
} else {
val (input1,input2) = input.split
for {
result1 <- compute(input1)
result2 <- compute(input2)
} yield merge(result1, result2)
}
}
现在您不再需要将呼叫打包到
具有
future(...)
因为他们已经回来了
Responder
(一个超类)
未来
).
编辑2:
最初——不再阻塞。如果是从
main()
block
对所有这些未来,但只有一次,在最高水平,只有对结果
计算,而不是任何中间的计算。
正在归还的东西
compute()
不再有阻塞
apply()
未来
做。我不知道为什么
s生成泛型
应答器
而不是
未来
;这似乎是一个API错误。但在任何情况下,您都应该能够自己制作:
def claim[A](r:Responder[A]):A = {
import java.util.concurrent.ArrayBlockingQueue
import scala.actors.Actor.actor
val q = new ArrayBlockingQueue[A](1)
// uses of 'respond' need to be wrapped in an actor or future block
actor { r.respond(a => q.put(a)) }
return q.take
}
现在您可以创建一个阻塞调用来计算
main
方法如下:
val finalResult = claim(compute(input))