如前所述,如果没有剩余写入程序,则命名管道读取器将收到EOF。
然而,我发现@JimB的解决方案不是最优的:
-
-
如果重新启动,平均会丢失50ms的数据。再说一次,没有什么好理由。
-
io.CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error)
会是更好的解决方案,伊姆霍。但这甚至没有必要,因为
io.Copy
我的方法
github.com/rjeczalik/notify
可以用于访问我们感兴趣的事件,因为写入事件在大多数重要操作系统上跨平台工作。我们感兴趣的另一个事件是删除命名管道,因为我们将没有任何可读取的内容。
因此,我的解决方案是:
package main
import (
"flag"
"io"
"log"
"os"
"github.com/rjeczalik/notify"
)
const (
MAX_CONCURRENT_WRITERS = 5
)
var (
pipePath string
filePath string
)
func init() {
flag.StringVar(&pipePath, "pipe", "", "/path/to/named_pipe to read from")
flag.StringVar(&filePath, "file", "out.txt", "/path/to/output file")
log.SetOutput(os.Stderr)
}
func main() {
flag.Parse()
var p, f *os.File
var err error
var e notify.EventInfo
// The usual stuff: checking wether the named pipe exists etc
if p, err = os.Open(pipePath); os.IsNotExist(err) {
log.Fatalf("Named pipe '%s' does not exist", pipePath)
} else if os.IsPermission(err) {
log.Fatalf("Insufficient permissions to read named pipe '%s': %s", pipePath, err)
} else if err != nil {
log.Fatalf("Error while opening named pipe '%s': %s", pipePath, err)
}
// Yep, there and readable. Close the file handle on exit
defer p.Close()
// Do the same for the output file
if f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); os.IsNotExist(err) {
log.Fatalf("File '%s' does not exist", filePath)
} else if os.IsPermission(err) {
log.Fatalf("Insufficient permissions to open/create file '%s' for appending: %s", filePath, err)
} else if err != nil {
log.Fatalf("Error while opening file '%s' for writing: %err", filePath, err)
}
// Again, close the filehandle on exit
defer f.Close()
// Here is where it happens. We create a buffered channel for events which might happen
// on the file. The reason why we make it buffered to the number of expected concurrent writers
// is that if all writers would (theoretically) write at once or at least pretty close
// to each other, it might happen that we loose event. This is due to the underlying implementations
// not because of go.
c := make(chan notify.EventInfo, MAX_CONCURRENT_WRITERS)
// Here we tell notify to watch out named pipe for events, Write and Remove events
// specifically. We watch for remove events, too, as this removes the file handle we
// read from, making reads impossible
notify.Watch(pipePath, c, notify.Write|notify.Remove)
// We start an infinite loop...
for {
// ...waiting for an event to be passed.
e = <-c
switch e.Event() {
case notify.Write:
// If it a a write event, we copy the content of the named pipe to
// our output file and wait for the next event to happen.
// Note that this is idempotent: Even if we have huge writes by multiple
// writers on the named pipe, the first call to Copy will copy the contents.
// The time to copy that data may well be longer than it takes to generate the events.
// However, subsequent calls may copy nothing, but that does not do any harm.
io.Copy(f, p)
case notify.Remove:
// Some user or process has removed the named pipe,
// so we have nothing left to read from.
// We should inform the user and quit.
log.Fatalf("Named pipe '%s' was removed. Quitting", pipePath)
}
}
}