代码之家  ›  专栏  ›  技术社区  ›  Benjamin.E

从命名管道连续读取

  •  0
  • Benjamin.E  · 技术社区  · 8 年前

    我想知道我还有什么其他选择,以便使用golang连续读取命名管道。我当前的代码依赖于在gorutine中运行的无限for循环;但hat使一个CPU保持100%的使用率。

    func main() {
    ....
    
    var wg sync.WaitGroup
    fpipe, _ := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
    defer fpipe.Close()
    
    f, _ := os.Create("dump.txt")
    defer f.Close()
    var buff bytes.Buffer
    
    wg.Add(1)
    go func() {
            for {
              io.Copy(&buff, fpipe)
              if buff.Len() > 0 {
                  buff.WriteTo(f)
               }
             }
        }()
    
        wg.Wait()
    }
    
    3 回复  |  直到 8 年前
        1
  •  3
  •   Markus W Mahlberg    8 年前

    如前所述,如果没有剩余写入程序,则命名管道读取器将收到EOF。

    然而,我发现@JimB的解决方案不是最优的:

    1. 如果重新启动,平均会丢失50ms的数据。再说一次,没有什么好理由。
    2. io.CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) 会是更好的解决方案,伊姆霍。但这甚至没有必要,因为 io.Copy

    我的方法

    github.com/rjeczalik/notify 可以用于访问我们感兴趣的事件,因为写入事件在大多数重要操作系统上跨平台工作。我们感兴趣的另一个事件是删除命名管道,因为我们将没有任何可读取的内容。

    因此,我的解决方案是:

    package main
    
    import (
        "flag"
        "io"
        "log"
        "os"
    
        "github.com/rjeczalik/notify"
    )
    
    const (
        MAX_CONCURRENT_WRITERS = 5
    )
    
    var (
        pipePath string
        filePath string
    )
    
    func init() {
        flag.StringVar(&pipePath, "pipe", "", "/path/to/named_pipe to read from")
        flag.StringVar(&filePath, "file", "out.txt", "/path/to/output file")
        log.SetOutput(os.Stderr)
    }
    
    func main() {
        flag.Parse()
    
        var p, f *os.File
        var err error
        var e notify.EventInfo
    
        // The usual stuff: checking wether the named pipe exists etc
        if p, err = os.Open(pipePath); os.IsNotExist(err) {
            log.Fatalf("Named pipe '%s' does not exist", pipePath)
        } else if os.IsPermission(err) {
            log.Fatalf("Insufficient permissions to read named pipe '%s': %s", pipePath, err)
        } else if err != nil {
            log.Fatalf("Error while opening named pipe '%s': %s", pipePath, err)
        }
        // Yep, there and readable. Close the file handle on exit
        defer p.Close()
    
        // Do the same for the output file
        if f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); os.IsNotExist(err) {
            log.Fatalf("File '%s' does not exist", filePath)
        } else if os.IsPermission(err) {
            log.Fatalf("Insufficient permissions to open/create file '%s' for appending: %s", filePath, err)
        } else if err != nil {
            log.Fatalf("Error while opening file '%s' for writing: %err", filePath, err)
        }
        // Again, close the filehandle on exit
        defer f.Close()
    
        // Here is where it happens. We create a buffered channel for events which might happen
        // on the file. The reason why we make it buffered to the number of expected concurrent writers
        // is that if all writers would (theoretically) write at once or at least pretty close
        // to each other, it might happen that we loose event. This is due to the underlying implementations
        // not because of go.
        c := make(chan notify.EventInfo, MAX_CONCURRENT_WRITERS)
    
        // Here we tell notify to watch out named pipe for events, Write and Remove events
        // specifically. We watch for remove events, too, as this removes the file handle we
        // read from, making reads impossible
        notify.Watch(pipePath, c, notify.Write|notify.Remove)
    
        // We start an infinite loop...
        for {
            // ...waiting for an event to be passed.
            e = <-c
    
            switch e.Event() {
    
            case notify.Write:
                // If it a a write event, we copy the content of the named pipe to
                // our output file and wait for the next event to happen.
                // Note that this is idempotent: Even if we have huge writes by multiple
                // writers on the named pipe, the first call to Copy will copy the contents.
                // The time to copy that data may well be longer than it takes to generate the events.
                // However, subsequent calls may copy nothing, but that does not do any harm.
                io.Copy(f, p)
    
            case notify.Remove:
                // Some user or process has removed the named pipe,
                // so we have nothing left to read from.
                // We should inform the user and quit.
                log.Fatalf("Named pipe '%s' was removed. Quitting", pipePath)
            }
        }
    }
    
        2
  •  2
  •   JimB    8 年前

    当没有写入程序时,命名管道读取器将收到EOF。此代码之外的解决方案是确保始终有一个writer进程保存文件描述符,尽管它不需要写任何东西。

    io.Reader

    for {
        err := io.Copy(&buff, fpipe)
        if buff.Len() > 0 {
            buff.WriteTo(f)
        }
    
        if err != nil {
            // something other than EOF happened
            return
        }
    
        time.Sleep(100 * time.Millisecond)
    }
    
        3
  •  0
  •   user2679859    6 年前

    问题:当“最后一个编写器”关闭管道时,即使稍后可能会出现新的编写器,也会得到EOF。

    解决方案:自己打开管道进行写入,不要关闭。现在,您可以将读取端视为永无止境的读取,而不会获得EOF。将以下内容直接放在打开管道阅读的位置之后:

    nullWriter, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
    if err != nil {
      logger.Crit("Error opening pipe for (placeholder) write", "err", err)
    }
    defer nullWriter.Close()