代码之家  ›  专栏  ›  技术社区  ›  guilhebl

如何在围棋中安全地关上一辆车?

  •  1
  • guilhebl  · 技术社区  · 7 年前

    我正在实现一个简单的工作者池算法,其中1个发送者(调度器)将作业发送给m个(工作者)go例程。为此,它使用一个通道通道来为第一个空闲工作线程分配一个可用的作业:

    // builds the pool
    func NewWorkerPool(maxWorkers int) WorkerPool {
        pool := make(chan chan Job, maxWorkers)
        workers := make([]Worker, 0)
        return WorkerPool{
            WorkerPool: pool,
            Workers: workers,
            maxWorkers: maxWorkers,
            waitGroup: sync.WaitGroup{}}
    }
    
    // Starts the WorkerPool
    func (p *WorkerPool) Run(queue chan Job) {
        w := p.waitGroup
    
        // starting n number of workers
        for i := 0; i < p.maxWorkers; i++ {
            worker := NewWorker(p.WorkerPool)
            p.Workers = append(p.Workers, worker)
            w.Add(1)
            worker.Start(&w)
        }
    
        go p.dispatch(queue)
    }
    
    // dispatches a job to be handled by an idle Worker of the pool
    func (p *WorkerPool) dispatch(jobQueue chan Job) {
        for {
            select {
            case job := <-jobQueue:
                // a model request has been received
                go func(job Job) {
                    // try to obtain a worker model channel that is available.
                    // this will block until a worker is idle
                    jobChannel := <-p.WorkerPool
    
                    // dispatch the model to the worker model channel
                    jobChannel <- job
                }(job)
            }
        }
    }
    
    
    // checks if a Worker Pool is open or closed - If we can recieve on the channel then it is NOT closed
    func (p *WorkerPool) IsOpen() bool {
        _, ok := <-p.WorkerPool
        return ok
    }
    

    // Start method starts the run loop for the worker, listening for a quit channel in
    // case we need to stop it
    func (w Worker) Start(wg *sync.WaitGroup) {
        go func() {
            defer wg.Done()
            for {
                // register the current worker into the worker queue.
                w.WorkerPool <- w.JobChannel
    
                select {
                case job := <-w.JobChannel:
                    // we have received a work request.
                    result := job.Run()
                    job.ReturnChannel <- result
    
                    // once result is returned close the job output channel
                    close(job.ReturnChannel)
    
                case <-w.quit:
                    // we have received a signal to stop
                    return
                }
            }
        }()
    }
    
    // Stop signals the worker to stop listening for work requests.
    func (w Worker) Stop() {
        go func() {
            w.quit <- true
        }()
    }
    

    现在,我尝试使用以下方法关闭池,我使用了同步。等待组,以等待所有工人关闭:

    // stops the Pool
    func (p *WorkerPool) Stop() bool {
        // stops all workers
        for _, worker := range p.Workers {
            worker.Stop()
        }
        p.waitGroup.Wait() //Wait for the goroutines to shutdown
    
        close(p.WorkerPool)
    
        more := p.IsOpen()
    
        fmt.Printf(" more? %t", more)
    
        return ok
    }
    

    //打印更多?符合事实的

    即使我等待worker退出,然后调用close(p.WorkerPool),我仍然打开了通道,在这种情况下缺少什么,如何相应地关闭通道?

    1 回复  |  直到 7 年前
        1
  •  2
  •   Sarath Sadasivan Pillai    7 年前

    关闭通道表示不再向其发送值。这有助于将完成情况传达给信道接收器。

    通道中的数据仍然存在,您可能需要关闭通道,然后按以下方式删除其中的所有通道

    // Stop stops the Pool and free all the channels
    func (p *WorkerPool) Stop() bool {
        // stops all workers
        for _, worker := range p.Workers {
            worker.Stop()
        }
        p.waitGroup.Wait() //Wait for the goroutines to shutdown
        close(p.WorkerPool)
        for channel := range p.WorkerPool {
            fmt.Println("Freeing channel") //remove all the channels
        }
        more := p.IsOpen()
        fmt.Printf(" more? %t", more)
    
        return ok
    }
    

    顺便说一句,你不能使用 _, ok <-