代码之家  ›  专栏  ›  技术社区  ›  BenjaminJackman

在scala中的另一个线程上执行简单任务

  •  2
  • BenjaminJackman  · 技术社区  · 16 年前

    我想知道是否有一种方法可以在scala中的另一个线程上执行非常简单的任务,并且不会有太多开销?

    基本上,我想做一个全局“执行器”,可以处理任意数量的任务。然后,我可以使用executor构建其他构造。

    此外,如果客户端不必考虑阻塞或非阻塞的考虑,那就更好了。

    我知道scala actors库是在Doug Lea FJ的基础上构建的,而且它们在一定程度上支持我正在努力实现的目标。然而,据我所知,我必须预先分配一个“演员库”来完成。

    我希望避免为此创建一个全局线程池,因为据我所知,它并不擅长细粒度并行。

    import concurrent.SyncVar
    object SimpleExecutor {
      import actors.Actor._
      def exec[A](task:  => A) : SyncVar[A] = {
        //what goes here?
        //This is what I currently have
        val x = new concurrent.SyncVar[A]
        //The overhead of making the actor appears to be a killer
        actor {
          x.set(task)
        }
        x
      }
      //Not really sure what to stick here
      def execBlocker[A](task: => A) : SyncVar[A] = exec(task)
    
    }
    

    下面是一个使用exec的示例:

    object Examples {
      //Benchmarks a task
      def benchmark(blk : => Unit) = {
        val start = System.nanoTime
        blk
        System.nanoTime - start
      }
    
      //Benchmarks and compares 2 tasks
      def cmp(a: => Any, b: => Any) = {
        val at = benchmark(a)
        val bt = benchmark(b)
        println(at + " " + bt + " " +at.toDouble / bt)
      }
    
      //Simple example for simple non blocking comparison
      import SimpleExecutor._
      def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
      def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
    
      //Simple example for the blocking performance
      import Thread.sleep
      def paraSle(hi : Int) = (0 until hi) map (i=>exec(sleep(i))) foreach (_.get)
      def singSle(hi : Int) = (0 until hi) foreach (i=>sleep(i))
    }
    

    import Examples._
    cmp(paraAdd(10000), singAdd(10000))
    cmp(paraSle(100), singSle(100))
    
    1 回复  |  直到 16 年前
        1
  •  8
  •   Daniel C. Sobral    16 年前

    那是什么 Futures import scala.actors.Futures._ 使用 future 要创造新的未来,方法如下 awaitAll apply respond isSet 看看是否准备好了,等等。

    编辑

    将简单的整数加法并行化无法获得性能,因为这比函数调用还要快。并发只能通过避免阻塞i/o所损失的时间,以及通过使用多个CPU内核并行执行任务来提高性能。在后一种情况下,任务的计算成本必须足以抵消划分工作负载和合并结果的成本。

    使用并发的另一个原因是改进 应用程序的名称。这并不是让它更快,而是让它更快地响应用户,一种方法是将相对较快的操作卸载到另一个线程,以便线程处理用户看到或做的事情可以更快。但我离题了。

    您的代码存在严重问题:

      def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
      def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
    

    或者,转化为未来,

      def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_.apply)
      def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
    

    paraAdd 正在并行地执行任务,但它不是,因为 Range map (这取决于Scala 2.7;从Scala 2.8.0开始, 范围 是严格的)。您可以在其他Scala问题上查找它。结果是:

    1. 范围是从中创建的 0 直到 hi
    2. 范围投影是从范围的每个元素i创建到返回的函数中的 future(i+5) 打电话的时候。
    3. 对于范围投影的每个元素( i => future(i+5)) ,对元素进行求值( foreach 它叫什么名字。

    所以,因为 将来 将来

      def paraAdd(hi: Int) = (0 until hi).force map (i=>future(i+5)) foreach (_.apply)
    

    这将为您提供更好的性能,但从来没有简单的即时添加那么好。另一方面,假设您这样做:

    def repeat(n: Int, f: => Any) = (0 until n) foreach (_ => f)
    def paraRepeat(n: Int, f: => Any) = 
      (0 until n).force map (_ => future(f)) foreach (_.apply)
    

    cmp(repeat(100, singAdd(100000)), paraRepeat(100, singAdd(100000)))
    

    您可能开始看到收益(这取决于内核数量和处理器速度)。