为了更好地说明我的问题,下面是一个带有注释的代码示例:
(观察者网:我使用的是golangv1.20.5)
// start to consume a queue
// deliveries is an unbuffered receiver go channel of the type <-chan Message
deliveries, err := qb.pubSubSubscriber.Consume(ctx, qName)
if err != nil {
return err
}
// infinite loop to consume the messages
for msg := range deliveries {
// and for every msg I execute a function
result := myFunc()
}
这里的想法是像一个由n个工作人员组成的池一样使用消息,如果其中一个工作人员空闲,就会得到一条消息。
更清楚地说,下面的例子不是一个有效的解决方案:
// for the workerPool is this situation i would use the
// tunny worker pool go package
workerPool := newWorkerPoolWithNWorkers()
for msg := range deliveries {
go func(){
result:=workerPool(myFunc)
}()
}
这是无效的,因为在我看来,这段代码所做的是一次获取每条消息,并让workerPool一次处理n个工作人员,但问题是,如何在无限循环中为每个“空闲”工作人员获取新消息?
假设我们有一个包含100条消息的队列,想要的解决方案是首先提取3条消息,但当处理其中一条提取的消息时,代码会在循环中获得另一条新消息,并且是无限的。
我试着做一些类似的事情
wg := new(sync.WaitGroup)
counter := 0
for msg := range deliveries {
wg.Wait()
go func(){
counter ++
if counter == n { // n could be any integer number wanted to limit the pool size
//this way a new message would be at wg.Wait() if all n goroutines are busy
wg.Add(1)
}
result:= myFunc()
count--
wg.Done()// one of the N "workers" is free, so we can ask for one more message
}()
}
但是它看起来太复杂了,我认为它不起作用。
如果有人能帮助我,我将非常感激!