代码之家  ›  专栏  ›  技术社区  ›  gggg

golang中偶尔长时间(500+ms)写入或刷新

  •  0
  • gggg  · 技术社区  · 6 年前

    我有一个项目,我们从某个源读取数据,对其进行处理,然后将该数据的子集(可能还有压缩版本)写入磁盘。我们写入许多(通常在200个左右)不同的文件来对应不同的数据“通道”,并且经常在这些文件中写入几MB/s到磁盘。

    偶尔会有一次 Write 或 Flush 调用耗时约500毫秒。如果可能的话,我希望把这个数字降低到50毫秒以下。

    个别 Flush 调用的耗时仍然是正常调用耗时的100倍。我尝试编写的 MWE(最小可运行示例)如下,它几乎完全复制了我们写入数据的方式。代码顶部有一些标志,用于控制在代码的不同位置调用 Flush,我正在测试在不同位置调用 Flush 是否会有帮助。即使平均写入时间变慢一些,我也只想要一致的行为。

    package main
    
    import (
        "bufio"
        "fmt"
        "io/ioutil"
        "os"
        "os/signal"
        "sync"
        "time"
    )
    
    // flushWithinBlock makes each per-channel goroutine in main call Flush on its
    // own writer right after it has written that tick's records.
    const flushWithinBlock = true
    // flushAfterBlocks makes main call Flush on every writer, one after another,
    // once all per-channel goroutines of a tick have finished.
    const flushAfterBlocks = true
    
    // Writer buffers the records of a single data channel in front of one file.
    //
    // Typical use: set FileName, call writeHeader once to create the file, call
    // writeRecord any number of times, and call Close when done. A Writer is not
    // safe for concurrent use by multiple goroutines.
    type Writer struct {
        FileName      string        // path of the file that writeHeader creates
        headerWritten bool          // true once writeHeader has succeeded
        writer        *bufio.Writer // 32 KiB buffer in front of file; nil until writeHeader succeeds
        file          *os.File      // underlying file, kept so that Close can release the descriptor
    }

    // writeHeader creates (or truncates) FileName, wraps it in a 32 KiB buffered
    // writer and queues the "HEADER\n" line in that buffer.
    //
    // It returns the os.Create error unchanged, or a wrapped error if the header
    // could not be buffered. On failure the Writer is left untouched and the
    // freshly created file is closed again, so no descriptor leaks.
    func (w *Writer) writeHeader() error {
        file, err := os.Create(w.FileName)
        if err != nil {
            return err
        }
        buffered := bufio.NewWriterSize(file, 32768)
        if _, err := buffered.WriteString("HEADER\n"); err != nil {
            // The write error is the one worth reporting; a failure to close a
            // file we are abandoning anyway adds nothing for the caller.
            _ = file.Close()
            return fmt.Errorf("writing header to %q: %w", w.FileName, err)
        }
        // Publish the new state only after every step succeeded.
        w.file = file
        w.writer = buffered
        w.headerWritten = true
        return nil
    }

    // writeRecord appends one zero-filled record of nBytes bytes to the buffer.
    //
    // It returns an error if nBytes is negative, if writeHeader has not succeeded
    // yet, or if the buffered writer reports a failure (for example while
    // spilling a full buffer to disk). The underlying I/O error is wrapped, not
    // replaced, so callers can inspect it with errors.Is / errors.As.
    func (w *Writer) writeRecord(nBytes int) error {
        if nBytes < 0 {
            return fmt.Errorf("writing record to %q: negative record length %d", w.FileName, nBytes)
        }
        if w.writer == nil {
            return fmt.Errorf("writing record to %q: header not written", w.FileName)
        }
        data := make([]byte, nBytes)
        nWritten, err := w.writer.Write(data)
        // Check err first: bufio.Writer always returns a non-nil error on a short
        // write, and that error says why the write was short.
        if err != nil {
            return fmt.Errorf("writing record to %q (%d of %d bytes): %w", w.FileName, nWritten, nBytes, err)
        }
        if nWritten != nBytes {
            return fmt.Errorf("wrong number of bytes written")
        }
        return nil
    }

    // Close flushes buffered data to the file and closes it. Both steps are
    // always attempted; a flush error takes precedence over a close error.
    // Calling Close on a Writer that was never opened, or calling it again, is a
    // no-op. The Writer must not be used for further writes afterwards: they fail
    // as soon as the buffer has to reach the closed file.
    func (w *Writer) Close() error {
        if w.file == nil {
            return nil
        }
        flushErr := w.writer.Flush()
        closeErr := w.file.Close()
        w.file = nil
        if flushErr != nil {
            return fmt.Errorf("flushing %q: %w", w.FileName, flushErr)
        }
        if closeErr != nil {
            return fmt.Errorf("closing %q: %w", w.FileName, closeErr)
        }
        return nil
    }
    
    // main runs a synthetic workload that mimics the project's disk writes.
    //
    // On every 50 ms tick it writes recordsPerChanPerTick zero-filled records of
    // recordLength bytes to each of numberOfChannels files in a fresh temporary
    // directory, using one goroutine per file, and measures how long the write
    // phase and the flush phase took per file. A report is printed on every
    // 100th loop iteration and whenever more than 75 ms passed since the
    // previous tick was handled. An interrupt signal (Ctrl-C) ends the run.
    func main() {
        dirname, err0 := ioutil.TempDir("", "")
        if err0 != nil {
            panic(err0)
        }
        fmt.Println(dirname)
        recordLength := 500
        numberOfChannels := 240
        recordsPerChanPerTick := 5
        writers := make([]*Writer, numberOfChannels)
        abortChan := make(chan struct{})
        for i := range writers {
            writers[i] = &Writer{FileName: fmt.Sprintf("%v/%v.ljh", dirname, i)}
        }
        go func() {
            // signal.Notify does not block when delivering a signal, so with an
            // unbuffered channel the interrupt is dropped if this goroutine is
            // not already waiting on the receive. One slot is enough here.
            signalChan := make(chan os.Signal, 1)
            signal.Notify(signalChan, os.Interrupt)
            <-signalChan
            close(abortChan)
        }()

        tickDuration := 50 * time.Millisecond
        ticker := time.NewTicker(tickDuration)
        defer ticker.Stop()
        z := 0
        tLast := time.Now()
        fmt.Printf("recordsPerChanPerTick %v, Chans %v, tickDuration %v\n", recordsPerChanPerTick, numberOfChannels, tickDuration)
        fmt.Printf("records/second/chan %v, records/second total %v\n", float64(recordsPerChanPerTick)/tickDuration.Seconds(), float64(recordsPerChanPerTick*numberOfChannels)/tickDuration.Seconds())
        fmt.Printf("megabytes/second total %v\n", float64(recordLength)*float64(recordsPerChanPerTick*numberOfChannels)/tickDuration.Seconds()*1e-6)
        fmt.Printf("flushWithinBlock %v, flushAfterBlocks %v\n", flushWithinBlock, flushAfterBlocks)
        for {
            // 1. here we would get data from data source
            z++
            select {
            case <-abortChan:
                // Push out whatever is still buffered so the files are complete
                // even when both flush flags are disabled.
                if err := flushAll(writers); err != nil {
                    fmt.Printf("flush on exit failed: %v\n", err)
                    return
                }
                fmt.Println("clean exit")
                return
            case <-ticker.C:
                var wg sync.WaitGroup
                // Each goroutine writes only its own index, so no locking is needed.
                writeDurations := make([]time.Duration, numberOfChannels)
                flushDurations := make([]time.Duration, numberOfChannels)
                for i, w := range writers {
                    wg.Add(1)
                    go func(w *Writer, i int) {
                        defer wg.Done()
                        tStart := time.Now()

                        for j := 0; j < recordsPerChanPerTick; j++ {
                            if !w.headerWritten {
                                if err := w.writeHeader(); err != nil {
                                    panic(fmt.Sprintf("failed create file and write header: %v\n", err))
                                }
                            }
                            if err := w.writeRecord(recordLength); err != nil {
                                panic(fmt.Sprintf("failed to write record to %v: %v\n", w.FileName, err))
                            }
                        }
                        tWrite := time.Now()
                        if flushWithinBlock && w.writer != nil {
                            if err := w.writer.Flush(); err != nil {
                                panic(fmt.Sprintf("failed to flush %v: %v\n", w.FileName, err))
                            }
                        }
                        writeDurations[i] = tWrite.Sub(tStart)
                        flushDurations[i] = time.Since(tWrite)
                    }(w, i)
                }
                wg.Wait()
                if flushAfterBlocks {
                    if err := flushAll(writers); err != nil {
                        panic(fmt.Sprintf("failed to flush after tick: %v\n", err))
                    }
                }
                writeSum, writeMax := sumAndLongest(writeDurations)
                flushSum, flushMax := sumAndLongest(flushDurations)
                // Measure once so the threshold test and the printed value agree.
                elapsed := time.Since(tLast)
                if z%100 == 0 || elapsed > 75*time.Millisecond {
                    fmt.Printf("z %v, time.Now().Sub(tLast) %v\n", z, elapsed)
                    fmt.Printf("writeMean %v, flushMean %v, writeMax %v, flushMax %v\n", writeSum/time.Duration(numberOfChannels), flushSum/time.Duration(numberOfChannels), writeMax, flushMax)
                }
                tLast = time.Now()
            }
        }
    }

    // flushAll flushes every writer whose buffer has been created, in slice
    // order, and stops at the first failure, returning it wrapped with the file
    // name. Writers that have not been opened yet are skipped.
    func flushAll(writers []*Writer) error {
        for _, w := range writers {
            if w.writer == nil {
                continue
            }
            if err := w.writer.Flush(); err != nil {
                return fmt.Errorf("flushing %v: %w", w.FileName, err)
            }
        }
        return nil
    }

    // sumAndLongest returns the total and the largest of the given durations;
    // both are zero for an empty slice.
    func sumAndLongest(durations []time.Duration) (total, longest time.Duration) {
        for _, d := range durations {
            total += d
            if d > longest {
                longest = d
            }
        }
        return total, longest
    }
    

    示例输出(在我们的整个项目中会出现长时间的 Flush):

    /tmp/296105809
    recordsPerChanPerTick 5, Chans 240, tickDuration 50ms
    records/second/chan 100, records/second total 24000
    megabytes/second total 12
    flushWithinBlock true, flushAfterBlocks true
    z 1, time.Now().Sub(tLast) 75.96973ms
    writeMean 14.017745ms, flushMean 7.847µs, writeMax 24.761147ms, flushMax 420.896µs
    z 100, time.Now().Sub(tLast) 50.13856ms
    writeMean 1.71µs, flushMean 4.213µs, writeMax 12.271µs, flushMax 32.133µs
    z 200, time.Now().Sub(tLast) 50.006063ms
    writeMean 1.651µs, flushMean 3.032µs, writeMax 79.006µs, flushMax 7.246µs
    z 300, time.Now().Sub(tLast) 50.151421ms
    writeMean 1.685µs, flushMean 4.542µs, writeMax 10.429µs, flushMax 14.087µs
    z 400, time.Now().Sub(tLast) 50.059208ms
    

    带有SSD的MacBook Pro上的示例输出。你可以看到 z 81 那一行的 flushMax 明显更长(约30 ms),而 z 100 那一行的 flushMax 则是更典型的约500 µs。

    /var/folders/_0/25kp6h7x25v6vyjv2yjlcnkm000wrm/T/934618054
    recordsPerChanPerTick 5, Chans 240, tickDuration 50ms
    records/second/chan 100, records/second total 24000
    megabytes/second total 12
    flushWithinBlock true, flushAfterBlocks true
    z 1, time.Now().Sub(tLast) 84.897446ms
    writeMean 10.265068ms, flushMean 464.53µs, writeMax 24.752873ms, flushMax 3.528286ms
    ... some output removed
    ... NOTE, line 81 was printed because it took longer than normal
    z 81, time.Now().Sub(tLast) 75.804358ms
    writeMean 15.056µs, flushMean 18.324892ms, writeMax 408.406µs, flushMax 30.765425ms
    z 100, time.Now().Sub(tLast) 54.753448ms
    writeMean 3.25µs, flushMean 84.963µs, writeMax 74.152µs, flushMax 499.322µs
    
    0 回复  |  直到 6 年前