代码之家  ›  专栏  ›  技术社区  ›  Terry Pang

当涉及多个通道时,select如何工作?

  •  18
  • Terry Pang  · 技术社区  · 7 年前

    我发现在多个非缓冲通道上使用select时

    select {
    case <- chana:
    case <- chanb:
    }
    

    即使两个通道都有数据,但在处理此选择时, case chana和case chanb的通话不平衡。

    package main
    
    import (
        "fmt"
        _ "net/http/pprof"
        "sync"
        "time"
    )
    
    func main() {
        chana := make(chan int)
        chanb := make(chan int)
    
        go func() {
            for i := 0; i < 1000; i++ {
                chana <- 100 * i
            }
        }()
    
        go func() {
            for i := 0; i < 1000; i++ {
                chanb <- i
            }
        }()
    
        time.Sleep(time.Microsecond * 300)
    
        acount := 0
        bcount := 0
        wg := sync.WaitGroup{}
        wg.Add(1)
        go func() {
            for {
                select {
                case <-chana:
                    acount++
                case <-chanb:
                    bcount++
                }
                if acount == 1000 || bcount == 1000 {
                    fmt.Println("finish one acount, bcount", acount, bcount)
                    break
                }
            }
            wg.Done()
        }()
    
        wg.Wait()
    }
    

    运行此演示,当其中一个chana、chanb完成读/写时,另一个可能保持999-1。

    有什么方法可以确保平衡吗?

    找到相关主题
    golang-channels-select-statement

    4 回复  |  直到 7 年前
        1
  •  24
  •   icza    6 年前

    围棋 select 声明并没有偏向任何(现成的)案例。引用规范:

    如果一个或多个通信可以继续,则通过 均匀伪随机选择 . 否则,如果存在默认情况,则选择该情况。如果没有默认情况,“select”语句阻塞,直到至少一个通信可以继续。

    如果可以进行多个通信,则随机选择一个。这不是一个完美的随机分布,规范也不能保证这一点,但它是随机的。

    你所经历的是围棋运动场 GOMAXPROCS=1 ( which you can verify here )goroutine调度程序不是先发制人的。这意味着默认情况下,goroutine不是并行执行的。如果遇到阻塞操作(例如,从网络读取,或试图从阻塞的通道接收或发送),并且另一个准备运行的继续,则goroutine将被置于停止状态。

    由于您的代码中没有阻塞操作,goroutines可能不会被放置在公园中,并且可能只有一个“producer”goroutines会运行,而另一个可能不会被调度(永远)。

    在我的本地计算机上运行你的代码 GOMAXPROCS=4 ,我有非常“现实”的结果。运行几次,输出:

    finish one acount, bcount 1000 901
    finish one acount, bcount 1000 335
    finish one acount, bcount 1000 872
    finish one acount, bcount 427 1000
    

    如果您需要确定单个案例的优先级,请查看以下答案: Force priority of go select statement

    的默认行为 选择 不保证同等优先级,但平均而言,它将接近它。如果需要保证同等优先级,则不应使用 选择 ,但您可以从2个通道执行2个非阻塞接收序列,这可能看起来像这样:

    for {
        select {
        case <-chana:
            acount++
        default:
        }
        select {
        case <-chanb:
            bcount++
        default:
        }
        if acount == 1000 || bcount == 1000 {
            fmt.Println("finish one acount, bcount", acount, bcount)
            break
        }
    }
    

    如果两个电源值都相同,则上述2个非阻塞接收将以相同的速度(具有相同的优先级)耗尽2个通道,如果其中一个没有,则另一个通道将持续接收,而不会延迟或阻塞。

    需要注意的是,如果 没有一个 在提供任何接收值的通道中,这基本上是一个“繁忙”循环,因此会消耗计算能力。为了避免这种情况,我们可能会检测到没有通道准备就绪,并且 然后 使用 选择 语句,该语句将阻止两个接收,直到其中一个准备好接收,而不会浪费任何CPU资源:

    for {
        received := 0
        select {
        case <-chana:
            acount++
            received++
        default:
        }
        select {
        case <-chanb:
            bcount++
            received++
        default:
        }
    
        if received == 0 {
            select {
            case <-chana:
                acount++
            case <-chanb:
                bcount++
            }
        }
    
        if acount == 1000 || bcount == 1000 {
            fmt.Println("finish one acount, bcount", acount, bcount)
            break
        }
    }
    

    有关goroutine调度的更多详细信息,请参阅以下问题:

    Number of threads used by Go runtime

    Goroutines 8kb and windows OS thread 1 mb

    Why does it not create many threads when many goroutines are blocked in writing file in golang?

        2
  •  2
  •   Matt Harrison    7 年前

    正如评论中提到的,如果你想确保平衡,你可以放弃使用 select 总之,在阅读goroutine中,依靠无缓冲通道提供的同步:

    go func() {
        for {
            <-chana
            acount++
            <-chanb
            bcount++
    
            if acount == 1000 || bcount == 1000 {
                fmt.Println("finish one acount, bcount", acount, bcount)
                break
            }
        }
        wg.Done()
    }()
    
        3
  •  0
  •   Ravi R    7 年前

    已编辑 :你也可以从供应方面进行平衡,但对我来说,@icza的答案似乎比这个更好&还解释了导致这种情况的最初原因。令人惊讶的是,即使在我的(虚拟)机器上,这也是片面的。

    这里有一些东西可以从供应端平衡两个例程(不知何故似乎在操场上不起作用)。

    package main
    
    import (
        "fmt"
        _ "net/http/pprof"
        "sync"
        "sync/atomic"
        "time"
    )
    
    func main() {
        chana := make(chan int)
        chanb := make(chan int)
        var balanceSwitch int32
    
        go func() {
            for i := 0; i < 1000; i++ {
                for atomic.LoadInt32(&balanceSwitch) != 0 {
                    fmt.Println("Holding R1")
                    time.Sleep(time.Nanosecond * 1)
                }
                chana <- 100 * i
                fmt.Println("R1: Sent i", i)
                atomic.StoreInt32(&balanceSwitch, 1)
    
            }
        }()
    
        go func() {
            for i := 0; i < 1000; i++ {
    
                for atomic.LoadInt32(&balanceSwitch) != 1 {
                    fmt.Println("Holding R2")
                    time.Sleep(time.Nanosecond * 1)
                }
                chanb <- i
                fmt.Println("R2: Sent i", i)
                atomic.StoreInt32(&balanceSwitch, 0)
    
            }
        }()
    
        time.Sleep(time.Microsecond * 300)
    
        acount := 0
        bcount := 0
        wg := sync.WaitGroup{}
        wg.Add(1)
        go func() {
            for {
                select {
                case <-chana:
                    acount++
                case <-chanb:
                    bcount++
                }
                fmt.Println("Acount Bcount", acount, bcount)
                if acount == 1000 || bcount == 1000 {
                    fmt.Println("finish one acount, bcount", acount, bcount)
                    break
                }
            }
            wg.Done()
        }()
    
        wg.Wait()
    }
    

    通过改变 atomic.LoadInt32(&balanceSwitch) != XX atomic.StoreInt32(&balanceSwitch, X) ,或其他机制,您可以将其映射到任意数量的例程。这可能不是最好的做法,但如果这是一个要求,那么你可能不得不考虑这样的选择。希望这有帮助。

        4
  •  0
  •   user2679859    3 年前

    似乎其他所有的评论者都忽略了真正的bug。

    这是不平衡的原因,因为它实际上永远无法与上面的代码平衡。它是单个线程,因此for循环在每次通过循环时只能处理chana或chanb。所以:一个chan总是先达到1000个。

    if语句使用||,这意味着当其中一个到达1000时,它将停止。

    这里简单的错误修复:将| |更改为&&在if语句中