代码之家  ›  专栏  ›  技术社区  ›  Rohanil

常规生产者消费者模式恐慌

  •  0
  • Rohanil  · 技术社区  · 7 年前

    我已经实现了本文中提到的goroutine的生产者-消费者模式 answer . 但有时会出现恐慌,错误是:“恐慌:同步:负等待组计数器”。我的示例代码如下:

    package main
    
    import (
        "bytes"
        "encoding/gob"
        "log"
        _ "net/http/pprof"
        "sync"
    )
    
    // Test ...
    type Test struct {
        PropA []int
        PropB []int
    }
    
    // Clone deep-copies a to b
    func Clone(a, b interface{}) {
    
        buff := new(bytes.Buffer)
        enc := gob.NewEncoder(buff)
        dec := gob.NewDecoder(buff)
        enc.Encode(a)
        dec.Decode(b)
    }
    
    func main() {
        test := Test{
            PropA: []int{211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222},
            PropB: []int{111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124},
        }
        var wg, wg2 sync.WaitGroup
        ch := make(chan int, 5)
        results := make(chan Test, 5)
    
        // start consumers
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func(ch <-chan int, results chan<- Test) {
                defer wg.Done()
                for propA := range ch {
                    var temp Test
                    Clone(&test, &temp)
                    temp.PropA = []int{propA}
                    results <- temp
                }
            }(ch, results)
        }
    
        // start producing
        go func(ch chan<- int) {
            defer wg.Done()
            for _, propA := range test.PropA {
                ch <- propA
            }
            close(ch)
        }(ch)
    
        wg2.Add(1)
        go func(results <-chan Test) {
            defer wg2.Done()
            for tt := range results {
                log.Printf("finished propA %+v\n", tt.PropA[0])
            }
        }(results)
    
        wg.Wait() // Wait all consumers to finish processing jobs
    
        // All jobs are processed, no more values will be sent on results:
        close(results)
    
        wg2.Wait()
    }
    

    当我运行代码4-5次以上时,它至少会恐慌一次。有时,错误消息是“恐慌:在封闭通道上发送”。我不明白在制作人完成发送之前,频道是如何被关闭的,以及为什么Waitgroup计数器达到负值。谁能给我解释一下吗?

    编辑 panic的堆栈跟踪如下:(上述代码的文件名为 mycode.go )

    panic: send on closed channel
        panic: sync: negative WaitGroup counter
    
    goroutine 21 [running]:
    sync.(*WaitGroup).Add(0xc420134020, 0xffffffffffffffff)
        /usr/local/go/src/sync/waitgroup.go:75 +0x134
    sync.(*WaitGroup).Done(0xc420134020)
        /usr/local/go/src/sync/waitgroup.go:100 +0x34
    panic(0x7622e0, 0x80ffa0)
        /usr/local/go/src/runtime/panic.go:491 +0x283
    main.main.func1(0xc420134020, 0xc420136090, 0xc420148000, 0xc42014a000)
        /home/mycode.go:45 +0x80
    created by main.main
        /home/mycode.go:39 +0x21d
    exit status 2
    
    2 回复  |  直到 7 年前
        1
  •  4
  •   sberry    7 年前

    你的簿记 wg 因为制片人打来电话 wg.Done() 但是没有 Add()

    修复很简单,只需添加一个 wg.Add() 在你开始制作之前。

    ...
    
    wg.Add(1)
    // start producing
    go func(ch chan<- int) {
        defer wg.Done()
        for _, propA := range test.PropA {
            ch <- propA
        }
        close(ch)
    }(ch)
    
    ...
    

    将来,当您看到“负WaitGroup计数器”时,可以保证您不会匹配1:1的 Add Done .

        2
  •  0
  •   newbie master    7 年前

    看来你有两点工作组。Done()这可能就是问题所在。 我对你的代码进行了重构,使其只包含一个工作组点。Done()和我删除了工作组。从您放置的循环中添加(1)。 密码 here