代码之家  ›  专栏  ›  技术社区  ›  Fedor

Java CompletableFuture比Scala Future更简洁、更快

  •  -1
  • Fedor  · 技术社区  · 7 年前

    我是Java开发人员,目前正在学习Scala。人们普遍承认,Java比Scala更冗长。我只需要同时调用2个或多个方法,然后合并结果。官方Scala文档位于docs。scala-lang.org/overview/core/futures。html建议使用 for-comprehention 为此。所以我直接使用了现成的解决方案。然后我想我该怎么做 CompletableFuture 令人惊讶的是,它产生了比Scala更简洁、更快的代码 Future

    让我们考虑一个基本的并发情况:对数组中的值求和。为了简单起见,让我们将数组分成两部分(因此它将是两个工作线程)。Java的 sumConcurrently 仅接受 4. LOC,而Scala的版本需要 12 地址:。Java的版本也是 15% 在我的电脑上更快。

    完整的代码, 基准优化。 Java实现:

    public class CombiningCompletableFuture {
        static int sumConcurrently(List<Integer> numbers) throws ExecutionException, InterruptedException {
            int mid = numbers.size() / 2;
            return CompletableFuture.supplyAsync( () -> sumSequentially(numbers.subList(0, mid)))
                    .thenCombine(CompletableFuture.supplyAsync( () -> sumSequentially(numbers.subList(mid, numbers.size())))
                    , (left, right) -> left + right).get();
        }
    
        static int  sumSequentially(List<Integer> numbers) {
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(1));
            } catch (InterruptedException ignored) {     }
            return numbers.stream().mapToInt(Integer::intValue).sum();
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            List<Integer> from1toTen = IntStream.rangeClosed(1, 10).boxed().collect(toList());
            long start = System.currentTimeMillis();
            long sum = sumConcurrently(from1toTen);
            long duration = System.currentTimeMillis() - start;
            System.out.printf("sum is %d in %d ms.", sum, duration);
        }
    }
    

    Scala的建议:

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    
    
    object CombiningFutures extends App {
      def sumConcurrently(numbers: Seq[Int]) = {
        val splitted = numbers.splitAt(5)
        val leftFuture = Future {
          sumSequentally(splitted._1)
        }
        val rightFuture = Future {
          sumSequentally(splitted._2)
        }
        val totalFuture = for {
          left <- leftFuture
          right <- rightFuture
        } yield left + right
        Await.result(totalFuture, Duration.Inf)
      }
      def sumSequentally(numbers: Seq[Int]) = {
        Thread.sleep(1000)
        numbers.sum
      }
    
      val from1toTen = 1 to 10
      val start = System.currentTimeMillis
      val sum = sumConcurrently(from1toTen)
      val duration = System.currentTimeMillis - start
      println(s"sum is $sum in $duration ms.")
    }
    

    如何在不太影响可读性的情况下改进Scala代码,有什么解释和建议吗?

    2 回复  |  直到 7 年前
        1
  •  4
  •   sarveshseri    7 年前

    SumConcurrent的详细scala版本,

    def sumConcurrently(numbers: List[Int]): Future[Int] = {
      val (v1, v2) = numbers.splitAt(numbers.length / 2)
      for {
        sum1 <- Future(v1.sum)
        sum2 <- Future(v2.sum)
      } yield sum1 + sum2
    }
    

    更简洁的版本

    def sumConcurrently2(numbers: List[Int]): Future[Int] = numbers.splitAt(numbers.length / 2) match {
      case (l1, l2) => Future.sequence(List(Future(l1.sum), Future(l2.sum))).map(_.sum)
    }
    

    所有这些都是因为我们必须对列表进行分区。假设我们必须编写一个函数,该函数使用多个并发计算获取几个列表并返回其总和,

    def sumConcurrently3(lists: List[Int]*): Future[Int] =
      Future.sequence(lists.map(l => Future(l.sum))).map(_.sum)
    

    如果上面看起来很神秘。。。那么让我把它解密,

    def sumConcurrently3(lists: List[Int]*): Future[Int] = {
      val listOfFuturesOfSums = lists.map { l => Future(l.sum) }
      val futureOfListOfSums = Future.sequence(listOfFuturesOfSums)
      futureOfListOfSums.map { l => l.sum }
    }
    

    现在,无论何时使用 Future (假设未来在 t1 )在计算中,这意味着该计算必须在时间之后进行 t1级 . 在Scala中,你可以像这样进行阻塞,

    val sumFuture = sumConcurrently(List(1, 2, 3, 4))
    
    val sum = Await.result(sumFuture, Duration.Inf)
    
    val anotherSum = sum + 100
    
    println("another sum = " + anotherSum)
    

    但这一切的意义是什么,你是 blocking 当前线程,而这些线程上的计算将完成。为什么不把整个计算转移到未来呢。

    val sumFuture = sumConcurrently(List(1, 2, 3, 4))
    
    val anotherSumFuture = sumFuture.map(s => s + 100)
    
    anotherSumFuture.foreach(s => println("another sum = " + s))
    

    现在,您没有在任何地方阻塞,线程可以在任何需要的地方使用。

    将来 Scala中的实现和api旨在使您能够编写程序,尽可能避免阻塞。

        2
  •  1
  •   Viktor Klang    7 年前

    对于手头的任务,以下可能也不是最简单的选择:

    def sumConcurrently(numbers: Vector[Int]): Future[Int] = {
      val (v1, v2) = numbers.splitAt(numbers.length / 2)
      Future(v1.sum).zipWith(Future(v2.sum))(_ + _)
    }
    

    正如我在 comment 您的示例有几个问题。