代码之家  ›  专栏  ›  技术社区  ›  Lukas Müller

如何以“池化”方式使用无缓冲的go通道

  •  0
  • Lukas Müller  · 技术社区  · 1 年前

    为了更好地说明我的问题,下面是一个带有注释的代码示例:

    (观察者网:我使用的是golangv1.20.5)

    // start to consume a queue
    // deliveries is an unbuffered receiver go channel of the type <-chan Message
    deliveries, err := qb.pubSubSubscriber.Consume(ctx, qName)
    if err != nil {
        return err
    }
    
    // infinite loop to consume the messages
    for msg := range deliveries {
        // and for every msg I execute a function
        result := myFunc()
    }
    

    这里的想法是像一个由n个工作人员组成的池一样使用消息,如果其中一个工作人员空闲,就会得到一条消息。

    更清楚地说,下面的例子不是一个有效的解决方案:

    // for the workerPool is this situation i would use the
    // tunny worker pool go package
    workerPool := newWorkerPoolWithNWorkers()
    for msg := range deliveries {
        go func(){
             result:=workerPool(myFunc)
        }()   
    }
    
    

    这是无效的,因为在我看来,这段代码所做的是一次获取每条消息,并让workerPool一次处理n个工作人员,但问题是,如何在无限循环中为每个“空闲”工作人员获取新消息?

    假设我们有一个包含100条消息的队列,想要的解决方案是首先提取3条消息,但当处理其中一条提取的消息时,代码会在循环中获得另一条新消息,并且是无限的。

    我试着做一些类似的事情

    wg := new(sync.WaitGroup)
    counter := 0 
    for msg := range deliveries {
        wg.Wait()
        go func(){
             counter ++
             if counter == n { // n could be any integer number wanted to limit the pool size
                 //this way a new message would be at wg.Wait() if all n goroutines are busy
                 wg.Add(1)
             }
             result:= myFunc()
             count--
             wg.Done()// one of the N "workers" is free, so we can ask for one more message
        }()   
    }
    
    

    但是它看起来太复杂了,我认为它不起作用。

    如果有人能帮助我,我将非常感激!

    1 回复  |  直到 1 年前
        1
  •  2
  •   Burak Serdar    1 年前

    我觉得你想得太多了。要使用工作池使用通道中的消息,可以:

    for i:=0;i<nWorkers;i++ {
       go func() {
          for msg:=range deliveries {
             myFunc(msg)
          }
       }()
    }
    

    换句话说,你创造 n goroutines都在同一个频道收听。运行时处理调度哪个goroutine接收它 deliveries 通道关闭,所有工人终止工作。