代码之家  ›  专栏  ›  技术社区  ›  Witold Kupś

kotlin协程-在运行阻塞中使用主线程

  •  5
  • Witold Kupś  · 技术社区  · 6 年前

    我正在尝试执行以下代码:

     val jobs = listOf(...)
     return runBlocking(CommonPool) {
        val executed = jobs.map {
            async { it.execute() }
        }.toTypedArray()
        awaitAll(*executed)
     }
    

    在哪里? jobs 是一些 Supplier 在synchronus世界中,这应该只是创建一个int列表。 一切正常,但问题是主线程没有被利用。下面是您工具包中的屏幕截图: enter image description here

    所以,问题是-我怎样才能利用主线呢?

    我想 runBlocking 问题就在这里,但有没有其他方法可以得到同样的结果呢?使用Java并行流看起来更好,但是主线程仍然没有完全利用(任务是完全独立的)。

    更新

    好吧,也许我告诉你的太少了。 我的问题是在看了Vankant Subramaniam的演讲后不久提出的: https://youtu.be/0hQvWIdwnw4 是的。 我需要最大的性能,没有IO,没有UI等,只有计算。只有请求,我需要使用所有可用的资源。

    我的一个想法是将paralleizm设置为线程数+1,但我认为这相当愚蠢。

    4 回复  |  直到 6 年前
        1
  •  1
  •   Marko Topolnik    6 年前

    我用Java 8并行流测试了解决方案:

    jobs.parallelStream().forEach { it.execute() }
    

    我发现CPU的利用率是可靠的100%作为参考,我使用了这个计算作业:

    class MyJob {
        fun execute(): Double {
            val rnd = ThreadLocalRandom.current()
            var d = 1.0
            (1..rnd.nextInt(1_000_000)).forEach { _ ->
                d *= 1 + rnd.nextDouble(0.0000001)
            }
            return d
        }
    }
    

    注意,它的持续时间从零到执行100000000 FP乘法所需的时间随机变化。

    出于好奇,我还研究了您添加到问题中的代码,作为对您有效的解决方案。我发现了一些问题,例如:

    • 将所有结果累积到一个列表中,而不是在它们可用时进行处理
    • 在提交最后一个作业后立即关闭结果通道,而不是等待所有结果

    我自己编写了一些代码,并添加了一行代码来对Stream API进行基准测试这里是:

    const val NUM_JOBS = 1000
    val jobs = (0 until NUM_JOBS).map { MyJob() }
    
    
    fun parallelStream(): Double =
            jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })
    
    fun channels(): Double {
        val resultChannel = Channel<Double>(UNLIMITED)
    
        val mainComputeChannel = Channel<MyJob>()
        val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
            GlobalScope.actor<MyJob>(Dispatchers.Default) {
                for (job in channel) {
                    job.execute().also { resultChannel.send(it) }
                }
            }
        }
        val allComputeChannels = poolComputeChannels + mainComputeChannel
    
        // Launch a coroutine that submits the jobs
        GlobalScope.launch {
            jobs.forEach { job ->
                select {
                    allComputeChannels.forEach { chan ->
                        chan.onSend(job) {}
                    }
                }
            }
        }
    
        // Run the main loop which takes turns between running a job
        // submitted to the main thread channel and receiving a result
        return runBlocking {
            var completedCount = 0
            var sum = 0.0
            while (completedCount < NUM_JOBS) {
                select<Unit> {
                    mainComputeChannel.onReceive { job ->
                        job.execute().also { resultChannel.send(it) }
                    }
                    resultChannel.onReceive { result ->
                        sum += result
                        completedCount++
                    }
                }
            }
            sum
        }
    }
    
    fun main(args: Array<String>) {
        measure("Parallel Stream", ::parallelStream)
        measure("Channels", ::channels)
        measure("Parallel Stream", ::parallelStream)
        measure("Channels", ::channels)
    }
    
    fun measure(task: String, measuredCode: () -> Double) {
        val block = { print(measuredCode().toString().substringBefore('.')) }
        println("Warming up $task")
        (1..20).forEach { _ -> block() }
        println("\nMeasuring $task")
        val average = (1..20).map { measureTimeMillis(block) }.average()
        println("\n$task took $average ms")
    }
    

    下面是我的典型结果:

    Parallel Stream took 396.85 ms
    Channels took 398.1 ms
    

    结果是相似的,但是一行代码仍然胜过50行代码:)

        2
  •  1
  •   Pawel    6 年前

    仅仅因为在这个显式线程上没有运行任何工作并不意味着设备没有在同一个内核上运行其他线程。

    其实最好是你的 MainThread 空闲,这将使您的用户界面更具响应性。

        3
  •  1
  •   Aivean    6 年前

    首先,我想感同身受的是,利用主线程通常没有任何实际用途。

    如果您的应用程序是完全异步的,那么您将只有一个(主)线程被阻塞。这个线程确实消耗了一些内存,并给调度程序增加了一些额外的压力,但是对性能的额外影响可以忽略不计,甚至无法测量。

    在实际的java世界中,jvm中几乎不可能有固定数量的线程。有系统线程(gc)、nio线程等。

    一根线没什么区别只要应用程序中的线程数不会随着负载的增加而变得不受约束,就可以了。


    回到原来的问题。

    我认为在这种并行处理任务中没有一种简洁的方法来利用主线程。

    例如,可以执行以下操作:

    data class Job(val res: Int) {
        fun execute(): Int {
            Thread.sleep(100)
            println("execute $res in ${Thread.currentThread().name}")
            return res
        }
    }
    
    fun main() {
        val jobs = (1..100).map { Job(it) }
        val resultChannel = Channel<Int>(Channel.UNLIMITED)
        val mainInputChannel = Channel<Job>()
    
        val workers = (1..10).map {
            actor<Job>(CommonPool) {
                for (j in channel) {
                    resultChannel.send(j.execute())
                }
            }
        }
    
        val res: Deferred<List<Int>> = async(CommonPool) {
            val allChannels = (listOf(mainInputChannel) + workers)
    
            jobs.forEach { job ->
                select {
                    allChannels.forEach {
                        it.onSend(job) {}
                    }
                }
            }
    
            allChannels.forEach { it.close() }
            (1..jobs.size).map { resultChannel.receive() }
        }
    
        runBlocking {
            for (j in mainInputChannel) {
                resultChannel.send(j.execute())
            }
        }
    
        runBlocking {
            res.await().forEach { println(it) }
        }
    }
    

    基本上,是一个简单的生产者/消费者实现,其中主线程充当消费者之一。但这也导致了很多样板。

    输出:

    execute 1 in main @coroutine#12
    execute 5 in ForkJoinPool.commonPool-worker-1 @coroutine#4
    execute 6 in ForkJoinPool.commonPool-worker-2 @coroutine#5
    execute 7 in ForkJoinPool.commonPool-worker-7 @coroutine#6
    execute 2 in ForkJoinPool.commonPool-worker-6 @coroutine#1
    execute 8 in ForkJoinPool.commonPool-worker-4 @coroutine#7
    execute 4 in ForkJoinPool.commonPool-worker-5 @coroutine#3
    execute 3 in ForkJoinPool.commonPool-worker-3 @coroutine#2
    execute 12 in main @coroutine#12
    execute 10 in ForkJoinPool.commonPool-worker-7 @coroutine#9
    execute 15 in ForkJoinPool.commonPool-worker-5 @coroutine#6
    execute 11 in ForkJoinPool.commonPool-worker-3 @coroutine#10
    execute 16 in ForkJoinPool.commonPool-worker-6 @coroutine#1
    execute 9 in ForkJoinPool.commonPool-worker-1 @coroutine#8
    execute 14 in ForkJoinPool.commonPool-worker-4 @coroutine#5
    execute 13 in ForkJoinPool.commonPool-worker-2 @coroutine#4
    execute 20 in main @coroutine#12
    execute 17 in ForkJoinPool.commonPool-worker-5 @coroutine#2
    execute 18 in ForkJoinPool.commonPool-worker-3 @coroutine#3
    execute 24 in ForkJoinPool.commonPool-worker-1 @coroutine#6
    execute 23 in ForkJoinPool.commonPool-worker-4 @coroutine#5
    execute 22 in ForkJoinPool.commonPool-worker-2 @coroutine#4
    execute 19 in ForkJoinPool.commonPool-worker-7 @coroutine#7
    execute 21 in ForkJoinPool.commonPool-worker-6 @coroutine#1
    execute 25 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 28 in main @coroutine#12
    execute 29 in ForkJoinPool.commonPool-worker-2 @coroutine#2
    execute 30 in ForkJoinPool.commonPool-worker-7 @coroutine#3
    execute 27 in ForkJoinPool.commonPool-worker-4 @coroutine#10
    execute 26 in ForkJoinPool.commonPool-worker-1 @coroutine#9
    execute 32 in ForkJoinPool.commonPool-worker-3 @coroutine#4
    execute 31 in ForkJoinPool.commonPool-worker-6 @coroutine#1
    execute 36 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 35 in ForkJoinPool.commonPool-worker-4 @coroutine#7
    execute 33 in ForkJoinPool.commonPool-worker-2 @coroutine#5
    execute 38 in ForkJoinPool.commonPool-worker-3 @coroutine#2
    execute 37 in main @coroutine#12
    execute 34 in ForkJoinPool.commonPool-worker-7 @coroutine#6
    execute 39 in ForkJoinPool.commonPool-worker-6 @coroutine#3
    execute 40 in ForkJoinPool.commonPool-worker-1 @coroutine#1
    execute 44 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 41 in ForkJoinPool.commonPool-worker-4 @coroutine#4
    execute 46 in ForkJoinPool.commonPool-worker-1 @coroutine#2
    execute 47 in ForkJoinPool.commonPool-worker-6 @coroutine#1
    execute 45 in main @coroutine#12
    execute 42 in ForkJoinPool.commonPool-worker-2 @coroutine#9
    execute 43 in ForkJoinPool.commonPool-worker-7 @coroutine#10
    execute 48 in ForkJoinPool.commonPool-worker-3 @coroutine#3
    execute 52 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 49 in ForkJoinPool.commonPool-worker-1 @coroutine#5
    execute 54 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 53 in main @coroutine#12
    execute 50 in ForkJoinPool.commonPool-worker-4 @coroutine#6
    execute 51 in ForkJoinPool.commonPool-worker-6 @coroutine#7
    execute 56 in ForkJoinPool.commonPool-worker-3 @coroutine#3
    execute 55 in ForkJoinPool.commonPool-worker-7 @coroutine#2
    execute 60 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 61 in ForkJoinPool.commonPool-worker-1 @coroutine#5
    execute 57 in ForkJoinPool.commonPool-worker-4 @coroutine#4
    execute 59 in ForkJoinPool.commonPool-worker-3 @coroutine#10
    execute 64 in ForkJoinPool.commonPool-worker-7 @coroutine#2
    execute 58 in ForkJoinPool.commonPool-worker-6 @coroutine#9
    execute 62 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 63 in main @coroutine#12
    execute 68 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 65 in ForkJoinPool.commonPool-worker-1 @coroutine#3
    execute 66 in ForkJoinPool.commonPool-worker-4 @coroutine#6
    execute 67 in ForkJoinPool.commonPool-worker-7 @coroutine#7
    execute 69 in ForkJoinPool.commonPool-worker-6 @coroutine#4
    execute 70 in ForkJoinPool.commonPool-worker-3 @coroutine#2
    execute 74 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 75 in main @coroutine#12
    execute 71 in ForkJoinPool.commonPool-worker-5 @coroutine#5
    execute 76 in ForkJoinPool.commonPool-worker-7 @coroutine#3
    execute 73 in ForkJoinPool.commonPool-worker-6 @coroutine#10
    execute 78 in ForkJoinPool.commonPool-worker-4 @coroutine#6
    execute 72 in ForkJoinPool.commonPool-worker-1 @coroutine#9
    execute 77 in ForkJoinPool.commonPool-worker-3 @coroutine#8
    execute 79 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 83 in main @coroutine#12
    execute 84 in ForkJoinPool.commonPool-worker-4 @coroutine#3
    execute 85 in ForkJoinPool.commonPool-worker-5 @coroutine#5
    execute 82 in ForkJoinPool.commonPool-worker-1 @coroutine#7
    execute 81 in ForkJoinPool.commonPool-worker-6 @coroutine#4
    execute 80 in ForkJoinPool.commonPool-worker-7 @coroutine#2
    execute 89 in ForkJoinPool.commonPool-worker-3 @coroutine#8
    execute 90 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 91 in main @coroutine#12
    execute 86 in ForkJoinPool.commonPool-worker-5 @coroutine#6
    execute 88 in ForkJoinPool.commonPool-worker-6 @coroutine#10
    execute 87 in ForkJoinPool.commonPool-worker-1 @coroutine#9
    execute 92 in ForkJoinPool.commonPool-worker-7 @coroutine#2
    execute 93 in ForkJoinPool.commonPool-worker-4 @coroutine#3
    execute 99 in main @coroutine#12
    execute 97 in ForkJoinPool.commonPool-worker-3 @coroutine#8
    execute 98 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 95 in ForkJoinPool.commonPool-worker-1 @coroutine#5
    execute 100 in ForkJoinPool.commonPool-worker-4 @coroutine#6
    execute 94 in ForkJoinPool.commonPool-worker-5 @coroutine#4
    execute 96 in ForkJoinPool.commonPool-worker-7 @coroutine#7
    1
    5
    6
    7
    2
    8
    4
    3
    12
    10
    15
    11
    16
    9
    14
    13
    20
    17
    18
    24
    23
    22
    19
    21
    25
    28
    29
    30
    27
    26
    32
    31
    36
    35
    33
    38
    37
    34
    39
    40
    44
    41
    46
    47
    45
    42
    43
    48
    52
    49
    54
    53
    50
    51
    56
    55
    60
    61
    57
    59
    64
    58
    62
    63
    68
    65
    66
    67
    69
    70
    74
    75
    71
    76
    73
    78
    72
    77
    79
    83
    84
    85
    82
    81
    80
    89
    90
    91
    86
    88
    87
    92
    93
    99
    97
    98
    95
    100
    94
    96
    
        4
  •  0
  •   DmitryBorodin    6 年前

    不带任何参数的async()使用DefaultDispatcher并将从父池中获取池,因此所有异步调用都在CommonPool中执行。如果希望不同的线程集运行代码,请创建自己的池。 虽然通常不使用主线程进行计算是很好的做法,但这取决于您的用例。