我有一个项目:我们从某个数据源读取数据,对其进行处理,然后将该数据的一个子集(也可能是压缩后的版本)写入磁盘。我们会写入许多(通常约200个)不同的文件,分别对应不同的数据“通道”,写入磁盘的总速率通常为几MB/s。
调用 `Write` 或者 `Flush` 大约需要500毫秒。如果可能的话,我希望把这个数字减少到50毫秒以下。
`Write` 和 `Flush` 的调用时间仍然是正常调用时间的100倍。下面是一个尝试性的MWE(最小可运行示例),它几乎完全复制了我们写入数据的方式。它的顶部有一些标志,用于选择在代码的不同位置进行刷新,我正在测试在这些位置调用 `Flush` 是否会有帮助。即使平均写入时间更慢一些也没关系,我只想要一致的行为。
package main
import (
"bufio"
"fmt"
"io/ioutil"
"os"
"os/signal"
"sync"
"time"
)
// Flush placement switches for the benchmark.
const (
	// flushWithinBlock makes every per-channel goroutine flush its own
	// buffered writer right after it has written its records for the tick.
	flushWithinBlock = true

	// flushAfterBlocks makes the main loop flush every writer sequentially
	// once all per-channel goroutines of the tick have finished.
	flushAfterBlocks = true
)
// Writer buffers the records of one data channel and writes them to a single
// file. The file is created lazily by writeHeader.
type Writer struct {
	// FileName is the path of the output file created by writeHeader.
	FileName string
	// headerWritten reports whether writeHeader has completed successfully.
	headerWritten bool
	// writer is the buffered writer on top of the output file. It is nil
	// until writeHeader has succeeded.
	writer *bufio.Writer
}

// writeHeader creates (or truncates) the file named by w.FileName, wraps it
// in a 32 KiB buffered writer and queues the header line.
//
// On success w.writer is ready for writeRecord and w.headerWritten is true.
// On failure w is left unchanged and the error is returned.
func (w *Writer) writeHeader() error {
	file, err := os.Create(w.FileName)
	if err != nil {
		return err
	}
	bw := bufio.NewWriterSize(file, 32768)
	if _, err := bw.WriteString("HEADER\n"); err != nil {
		// Best-effort cleanup: the write error is the one worth reporting,
		// so a secondary Close error is deliberately dropped.
		_ = file.Close()
		return fmt.Errorf("writing header to %q: %w", w.FileName, err)
	}
	// Publish the writer only once the header has been accepted, so a failed
	// call never leaves a half-initialised Writer behind.
	w.writer = bw
	w.headerWritten = true
	return nil
}

// writeRecord appends a zero-filled record of nBytes bytes to the buffered
// writer.
//
// It returns an error if nBytes is negative, if writeHeader has not been
// called successfully yet, or if the underlying write fails.
func (w *Writer) writeRecord(nBytes int) error {
	if nBytes < 0 {
		return fmt.Errorf("negative record length %d", nBytes)
	}
	if w.writer == nil {
		return fmt.Errorf("writing record to %q: header not written", w.FileName)
	}
	data := make([]byte, nBytes)
	nWritten, err := w.writer.Write(data)
	if err != nil {
		// Check err before the byte count: a short write always comes with
		// a non-nil error, and that error is the useful diagnostic.
		return fmt.Errorf("writing record to %q: wrote %d of %d bytes: %w", w.FileName, nWritten, nBytes, err)
	}
	if nWritten != nBytes {
		return fmt.Errorf("writing record to %q: wrote %d of %d bytes", w.FileName, nWritten, nBytes)
	}
	return nil
}
// durationStats returns the arithmetic mean and the maximum of durations.
// It returns zeros for an empty slice.
func durationStats(durations []time.Duration) (mean, longest time.Duration) {
	if len(durations) == 0 {
		return 0, 0
	}
	var sum time.Duration
	for _, d := range durations {
		sum += d
		if d > longest {
			longest = d
		}
	}
	return sum / time.Duration(len(durations)), longest
}

// main runs the write-latency experiment. On every tick it writes a fixed
// number of records to each channel file from a dedicated goroutine,
// flushes according to flushWithinBlock / flushAfterBlocks, and prints
// write/flush timing statistics every 100 iterations or whenever an
// iteration took longer than 75ms. It runs until interrupted (Ctrl-C).
func main() {
	dirname, err0 := ioutil.TempDir("", "")
	if err0 != nil {
		panic(err0)
	}
	fmt.Println(dirname)

	recordLength := 500
	numberOfChannels := 240
	recordsPerChanPerTick := 5
	writers := make([]*Writer, numberOfChannels)
	abortChan := make(chan struct{})
	for i := range writers {
		writers[i] = &Writer{FileName: fmt.Sprintf("%v/%v.ljh", dirname, i)}
	}

	go func() {
		// signal.Notify never blocks when delivering a signal, so the
		// channel needs a buffer or an interrupt arriving before the
		// receive below is ready would be dropped.
		signalChan := make(chan os.Signal, 1)
		signal.Notify(signalChan, os.Interrupt)
		<-signalChan
		close(abortChan)
	}()

	tickDuration := 50 * time.Millisecond
	ticker := time.NewTicker(tickDuration)
	defer ticker.Stop()

	z := 0
	tLast := time.Now()
	fmt.Printf("recordsPerChanPerTick %v, Chans %v, tickDuration %v\n", recordsPerChanPerTick, numberOfChannels, tickDuration)
	fmt.Printf("records/second/chan %v, records/second total %v\n", float64(recordsPerChanPerTick)/tickDuration.Seconds(), float64(recordsPerChanPerTick*numberOfChannels)/tickDuration.Seconds())
	fmt.Printf("megabytes/second total %v\n", float64(recordLength)*float64(recordsPerChanPerTick*numberOfChannels)/tickDuration.Seconds()*1e-6)
	fmt.Printf("flushWithinBlock %v, flushAfterBlocks %v\n", flushWithinBlock, flushAfterBlocks)

	for {
		// 1. here we would get data from data source
		z++
		select {
		case <-abortChan:
			// Push out whatever is still buffered so that a clean exit
			// does not lose data. Writers that never wrote a header have
			// no buffer yet.
			for _, w := range writers {
				if w.writer == nil {
					continue
				}
				if err := w.writer.Flush(); err != nil {
					fmt.Fprintf(os.Stderr, "flush %v on exit: %v\n", w.FileName, err)
				}
			}
			fmt.Println("clean exit")
			return
		case <-ticker.C:
			var wg sync.WaitGroup
			// Each goroutine writes only its own index, so the slices
			// need no locking.
			writeDurations := make([]time.Duration, numberOfChannels)
			flushDurations := make([]time.Duration, numberOfChannels)
			for i, w := range writers {
				wg.Add(1)
				go func(w *Writer, i int) {
					defer wg.Done()
					tStart := time.Now()
					for j := 0; j < recordsPerChanPerTick; j++ {
						if !w.headerWritten {
							if err := w.writeHeader(); err != nil {
								panic(fmt.Sprintf("failed create file and write header: %v\n", err))
							}
						}
						// A bufio.Writer keeps failing after its first
						// error, so continuing would only produce
						// meaningless timings.
						if err := w.writeRecord(recordLength); err != nil {
							panic(fmt.Sprintf("failed to write record to %v: %v\n", w.FileName, err))
						}
					}
					tWrite := time.Now()
					if flushWithinBlock {
						if err := w.writer.Flush(); err != nil {
							panic(fmt.Sprintf("failed to flush %v: %v\n", w.FileName, err))
						}
					}
					writeDurations[i] = tWrite.Sub(tStart)
					flushDurations[i] = time.Since(tWrite)
				}(w, i)
			}
			wg.Wait()

			if flushAfterBlocks {
				for _, w := range writers {
					if err := w.writer.Flush(); err != nil {
						panic(fmt.Sprintf("failed to flush %v: %v\n", w.FileName, err))
					}
				}
			}

			writeMean, writeMax := durationStats(writeDurations)
			flushMean, flushMax := durationStats(flushDurations)
			if z%100 == 0 || time.Since(tLast) > 75*time.Millisecond {
				fmt.Printf("z %v, time.Now().Sub(tLast) %v\n", z, time.Since(tLast))
				fmt.Printf("writeMean %v, flushMean %v, writeMax %v, flushMax %v\n", writeMean, flushMean, writeMax, flushMax)
			}
			tLast = time.Now()
		}
	}
}
`Write`
`Flush`
在我们的整个项目中:
/tmp/296105809
recordsPerChanPerTick 5, Chans 240, tickDuration 50ms
records/second/chan 100, records/second total 24000
megabytes/second total 12
flushWithinBlock true, flushAfterBlocks true
z 1, time.Now().Sub(tLast) 75.96973ms
writeMean 14.017745ms, flushMean 7.847µs, writeMax 24.761147ms, flushMax 420.896µs
z 100, time.Now().Sub(tLast) 50.13856ms
writeMean 1.71µs, flushMean 4.213µs, writeMax 12.271µs, flushMax 32.133µs
z 200, time.Now().Sub(tLast) 50.006063ms
writeMean 1.651µs, flushMean 3.032µs, writeMax 79.006µs, flushMax 7.246µs
z 300, time.Now().Sub(tLast) 50.151421ms
writeMean 1.685µs, flushMean 4.542µs, writeMax 10.429µs, flushMax 14.087µs
z 400, time.Now().Sub(tLast) 50.059208ms
带有SSD的MacBook Pro上的示例输出。你可以看到第81行(z 81)的耗时和 `flushMax` 都明显更长,而第100行(z 100)的 `flushMax` 则是更典型的约500 µs。
/var/folders/_0/25kp6h7x25v6vyjv2yjlcnkm000wrm/T/934618054
recordsPerChanPerTick 5, Chans 240, tickDuration 50ms
records/second/chan 100, records/second total 24000
megabytes/second total 12
flushWithinBlock true, flushAfterBlocks true
z 1, time.Now().Sub(tLast) 84.897446ms
writeMean 10.265068ms, flushMean 464.53µs, writeMax 24.752873ms, flushMax 3.528286ms
... some output removed
... NOTE, line 81 was printed because it took longer than normal
z 81, time.Now().Sub(tLast) 75.804358ms
writeMean 15.056µs, flushMean 18.324892ms, writeMax 408.406µs, flushMax 30.765425ms
z 100, time.Now().Sub(tLast) 54.753448ms
writeMean 3.25µs, flushMean 84.963µs, writeMax 74.152µs, flushMax 499.322µs