代码之家  ›  专栏  ›  技术社区  ›  gbalduzzi

同步从不同线程中的繁重操作写入文件

  •  1
  • gbalduzzi  · 技术社区  · 7 年前

    我需要一次细化一个文件(可能是一个大文件)一个块,并将结果写入一个新文件。 简单地说,我有一个基本的功能来阐述一个块:

    func elaborateBlock(block []byte) []byte { ... }

    每个块都需要详细说明,然后按顺序写入输出文件(保持原始顺序)。

    单线程实现很简单:

    for {
            buffer := make([]byte, BlockSize)
            _, err := inputFile.Read(buffer)
    
            if err == io.EOF {
                break
            }
            processedData := elaborateBlock(buffer)
            outputFile.Write(processedData)
    }
    

    但是精化可能很繁重,每个块都可以单独处理,所以多线程实现是自然的进化。

    我想出的解决方案是创建一个通道数组,计算不同线程中的每个块,并通过循环通道数组来同步最终写入:

    效用函数:

    func blockThread(channel chan []byte, block []byte) {
        channel <- elaborateBlock(block)
    }
    

    在主程序中:

    chans = []chan []byte {}
    
    for {
        buffer := make([]byte, BlockSize)
        _, err := inputFile.Read(buffer)
    
        if err == io.EOF {
            break
        }
    
        channel := make(chan []byte)
        chans = append(chans, channel)
    
        go blockThread(channel, buffer)
    }
    
    for i := range chans {
        data := <- chans[i]
        outputFile.Write(data)
    }
    

    这种方法可以工作,但对于大文件可能会有问题,因为它需要在开始写入输出之前将整个文件加载到内存中。

    你认为有没有更好的解决方案,同时也有更好的整体性能?

    2 回复  |  直到 7 年前
        1
  •  1
  •   icza    7 年前

    如果块确实需要按顺序写出

    如果要同时处理多个块,显然需要同时在内存中保存多个块。

    您可以决定要同时处理多少个块,这就足够同时将多个块读入内存。你可以说你想同时处理5个块。这将限制内存使用,并且仍然可能最大限度地利用您的CPU资源。建议您根据可用的CPU内核选择一个数字(如果处理块尚未使用多核)。可以使用 runtime.GOMAXPROCS(0) .

    您应该有一个按顺序读取输入文件的goroutine,并用作业(也包含块索引)包装块。

    您应该有多个worker goroutine,最好有多个核心(但也要尝试更小和更高的值)。每个worker goroutine只接收作业和调用 elaborateBlock() 在数据上,并在结果通道上传递。

    应该有一个指定的使用者来接收已完成的作业,并将它们按顺序写入输出文件。由于goroutine是并发运行的,并且我们无法控制块的完成顺序,因此使用者应该跟踪下一个要写入输出的块的索引。无序到达的块只应被存储,并且只有在后续块到达时才继续写入。

    这是一个(不完整的)示例,说明如何执行所有这些操作:

    const BlockSize = 1 << 20 // 1 MB
    
    func elaborateBlock(in []byte) []byte { return in }
    
    type Job struct {
        Index int
        Block []byte
    }
    
    func producer(jobsCh chan<- *Job) {
        // Init input file:
        var inputFile *os.File
    
        for index := 0; ; index++ {
            job := &Job{
                Index: index,
                Block: make([]byte, BlockSize),
            }
    
            _, err := inputFile.Read(job.Block)
            if err != nil {
                break
            }
    
            jobsCh <- job
        }
    }
    
    func worker(jobsCh <-chan *Job, resultCh chan<- *Job) {
        for job := range jobsCh {
            job.Block = elaborateBlock(job.Block)
            resultCh <- job
        }
    }
    
    func consumer(resultCh <-chan *Job) {
        // Init output file:
        var outputFile *os.File
    
        nextIdx := 0
        jobMap := map[int]*Job{}
    
        for job := range resultCh {
            jobMap[job.Index] = job
    
            // Write out all blocks we have in contiguous index range:
            for {
                j := jobMap[nextIdx]
                if j == nil {
                    break
                }
                if _, err := outputFile.Write(j.Block); err != nil {
                    // handle error, maybe terminate?
                }
                delete(nextIdx) // This job is written out
                nextIdx++
            }
        }
    }
    
    func main() {
        jobsCh := make(chan *Job)
        resultCh := make(chan *Job)
    
        for i := 0; i < 5; i++ {
            go worker(jobsCh, resultCh)
        }
    
        wg := sync.WaitGroup{}
        wg.Add(1)
        go func() {
            defer wg.Done()
            consumer(resultCh)
        }()
    
        // Start producing jobs:
        producer(jobsCh)
        // No more jobs:
        close(jobsCh)
    
        // Wait for consumer to complete:
        wg.Wait()
    }
    

    这里要注意的一点是:单凭这一点不能保证限制使用的内存。想象一下,第一个块需要大量时间来计算,而随后的块则不需要。会发生什么?第一个区块将占用一个工人,其他工人将“快速”完成后续区块。使用者将所有内容存储在内存中,等待第一个块完成(因为必须先写出该块)。这可能会增加内存使用量。

    我们怎么能避免这个呢?

    通过引入一个工作池。新的工作岗位不能随意创造,只能从一个池子里拿出来。如果池是空的,制作人必须等待。所以当制作人需要一个新的 Job ,从游泳池中取出一个。当消费者写出 工作 ,将其放回池中。就这么简单。这也会减少垃圾收集器的压力,因为作业(和 []byte 缓冲区)不会被创建和丢弃,它们可以被重用。

    为了简单 工作 池实现可以使用缓冲通道。有关详细信息,请参见 How to implement Memory Pooling in Golang .

    如果块可以按任何顺序写入

    另一个选择是提前分配输出文件。如果输出块的大小也是确定的,可以这样做(例如 outsize := (insize / blocksize) * outblockSize )

    到什么时候?

    如果预先分配了输出文件,则使用者不需要按顺序等待输入块。一旦计算出一个输入块,你就可以计算出它在输出中的位置,寻找到那个位置,然后写下来。你可以用这个 File.Seek() .

    这个解决方案仍然需要将块索引从生产者发送给消费者,但是消费者不需要存储无序到达的块,因此消费者可以更简单,在后续块到达之前不需要存储已完成的块,以便继续写入输出文件。

    请注意,此解决方案自然不会对内存造成威胁,因为已完成的作业永远不会累积/缓存,而是按完成顺序写出。


    有关详细信息和技巧,请参见相关问题:

    Is this an idiomatic worker thread pool in Go?

    How to collect values from N goroutines executed in a specific order?

        2
  •  0
  •   ramrunner    7 年前

    下面是一个工作示例,它应该工作,并且尽可能接近原始代码。

    其思想是将数组转换为字节通道。然后

    • 首先启动一个消费者,它将读取这个通道的字节数,从中读取并写入结果。

    • 回到主线程上,您创建一个字节通道,将其写入通道通道(现在消费者按顺序读取结果),然后启动将在分配的通道(生产者)上执行工作和写入的进程。

    现在将发生的是,一旦从使用者读取并写入生成的块,与之相关联的资源将被释放,procuders和使用者之间将发生“竞争”。这可能是对原来设计的改进。

    以下是代码和操场链接:

    package main
    
    import (
        "bytes"
        "fmt"
        "io"
        "sync"
    )
    
    func elaborateBlock(b []byte) []byte {
        return []byte("werkwerkwerk")
    }
    
    func blockThread(channel chan []byte, block []byte, wg *sync.WaitGroup) {
        channel <- elaborateBlock(block)
        wg.Done()
    }
    
    func main() {
        chans := make(chan chan []byte)
        BlockSize := 3
        inputBytes := bytes.NewBuffer([]byte("transmutemetowerkwerkwerk"))
    
        producewg := sync.WaitGroup{}
        consumewg := sync.WaitGroup{}
        consumewg.Add(1)
        go func() {
            chancount := 0
            for ch := range chans {
                data := <-ch
                fmt.Printf("got %d block, result:%s\n", chancount, data)
                chancount++
            }
            fmt.Printf("done receiving\n")
            consumewg.Done()
        }()
        for {
            buffer := make([]byte, BlockSize)
            _, err := inputBytes.Read(buffer)
    
            if err == io.EOF {
                go func() {
                    //wait for all the procuders to finish
                    producewg.Wait()
                    //then close the main channel to notify the consumer
                    close(chans)
                }()
                break
            }
    
            channel := make(chan []byte)
            chans <- channel //give the channel that we return the result to the receiver
    
            producewg.Add(1)
            go blockThread(channel, buffer, &producewg)
        }
    
        consumewg.Wait()
        fmt.Printf("main exiting")
    }
    

    playground link

    作为一个小问题,我对“将整个文件读入内存”语句感觉不太好,因为您只是每次从读取器读取一个块,也许“将整个计算的结果保存在内存中”更合适?