代码之家  ›  专栏  ›  技术社区  ›  ACPrice

Nodejs:如何优化编写多个文件?

  •  4
  • ACPrice  · 技术社区  · 7 年前

    我在Windows上的节点环境中工作。我的密码是30 Buffer 对象(每秒钟约500-900kb),我需要尽快将这些数据保存到文件系统中,而不需要进行任何阻止接收以下内容的工作 缓冲器 (即,目标是从 每一个 缓冲,约30-45分钟)。值得一提的是,这些数据是来自Kinect传感器的连续深度帧。

    我的问题是:在Node中编写文件最有效的方法是什么?

    以下是伪代码:

    let num = 0
    
    async function writeFile(filename, data) {
      fs.writeFileSync(filename, data)
    }
    
    // This fires 30 times/sec and runs for 30-45 min
    dataSender.on('gotData', function(data){
    
      let filename = 'file-' + num++
    
      // Do anything with data here to optimize write?
      writeFile(filename, data)
    }
    

    fs.writeFileSync 似乎比 fs.writeFile 这就是为什么我要用上面的。但是,有没有其他方法可以对数据进行操作或写入文件,从而加快每次保存的速度?

    2 回复  |  直到 7 年前
        1
  •  9
  •   jfriend00    7 年前

    首先,你永远不想使用 fs.writefileSync() 在处理实时请求时,因为这会阻塞整个节点。js事件循环,直到文件写入完成。

    好的,基于将每个数据块写入不同的文件,那么您希望允许同时进行多个磁盘写入,但不允许无限的磁盘写入。因此,使用队列仍然是合适的,但这一次队列不只是一次有一个写入进程,而是同时有一些写入进程:

    const EventEmitter = require('events');
    
    class Queue extends EventEmitter {
        constructor(basePath, baseIndex, concurrent = 5) {
            this.q = [];
            this.paused = false;
            this.inFlightCntr = 0;
            this.fileCntr = baseIndex;
            this.maxConcurrent = concurrent;
        }
    
        // add item to the queue and write (if not already writing)
        add(data) {
            this.q.push(data);
            write();
        }
    
        // write next block from the queue (if not already writing)
        write() {
            while (!paused && this.q.length && this.inFlightCntr < this.maxConcurrent) {
                this.inFlightCntr++;
                let buf = this.q.shift();
                try {
                    fs.writeFile(basePath + this.fileCntr++, buf, err => {
                        this.inFlightCntr--;
                        if (err) {
                            this.err(err);
                        } else {
                            // write more data
                            this.write();
                        }
                    });
                } catch(e) {
                    this.err(e);
                }
            }
        }
    
        err(e) {
            this.pause();
            this.emit('error', e)
        }
    
        pause() {
            this.paused = true;
        }
    
        resume() {
            this.paused = false;
            this.write();
        }
    }
    
    let q = new Queue("file-", 0, 5);
    
    // This fires 30 times/sec and runs for 30-45 min
    dataSender.on('gotData', function(data){
        q.add(data);
    }
    
    q.on('error', function(e) {
        // go some sort of write error here
        console.log(e);
    });
    

    需要考虑的事项:

    1. 实验 concurrent 传递给队列构造函数的值。从值5开始。然后看看提高这个值是否会让你的表现更好或更差。节点。js文件I/O子系统使用一个线程池来实现异步磁盘写入,因此有一个最大并发写入数,这将允许将并发数提高到很高的水平,这可能不会加快速度。

    2. 通过设置 UV_THREADPOOL_SIZE 在启动节点之前,请使用环境变量。js应用。

    3. 你最大的朋友是 磁盘写入速度 .所以,确保你有一个快速的磁盘和一个好的磁盘控制器。快速总线上的快速SSD最好。

    4. 如果可以将写操作分散到多个实际的物理磁盘上,那么很可能还会增加写吞吐量(更多的磁头在工作)。


    (在编辑之前,该问题的初始解释已更改)。

    由于您似乎需要按顺序进行磁盘写入(全部写入同一个文件),因此我建议您要么使用写入流,让流对象为您序列化和缓存数据,要么自己创建一个队列,如下所示:

    const EventEmitter = require('events');
    
    class Queue extends EventEmitter {
        // takes an already opened file handle
        constructor(fileHandle) {
            this.f = fileHandle;
            this.q = [];
            this.nowWriting = false;
            this.paused = false;
        }
    
        // add item to the queue and write (if not already writing)
        add(data) {
            this.q.push(data);
            write();
        }
    
        // write next block from the queue (if not already writing)
        write() {
            if (!nowWriting && !paused && this.q.length) {
                this.nowWriting = true;
                let buf = this.q.shift();
                fs.write(this.f, buf, (err, bytesWritten) => {
                    this.nowWriting = false;
                    if (err) {
                        this.pause();
                        this.emit('error', err);
                    } else {
                        // write next block
                        this.write();
                    }
                });
            }
        }
    
        pause() {
            this.paused = true;
        }
    
        resume() {
            this.paused = false;
            this.write();
        }
    }
    
    // pass an already opened file handle
    let q = new Queue(fileHandle);
    
    // This fires 30 times/sec and runs for 30-45 min
    dataSender.on('gotData', function(data){
        q.add(data);
    }
    
    q.on('error', function(err) {
        // got disk write error here
    });
    

    您可以使用writeStream而不是这个自定义队列类,但问题是writeStream可能已满,然后您必须有一个单独的缓冲区作为放置数据的地方。使用上述自定义队列可以同时解决这两个问题。

    其他可扩展性/性能评论

    1. 因为您似乎是在将数据串行写入同一个文件,所以磁盘写入不会从集群或并行运行多个操作中受益,因为它们基本上必须串行化。

    2. 如果你的节点。js服务器除了做这些写操作之外还有其他事情要做,创建第二个节点可能有一点好处(需要通过测试进行验证)。js进程,并在另一个进程中完成所有磁盘写入。你的主节点。js进程将接收数据,然后将其传递给子进程,该子进程将维护队列并进行写入。

    3. 你可以尝试的另一件事是合并写作。当队列中有多个项目时,可以将它们组合成一次写入。如果写操作已经相当大,这可能不会产生太大的影响,但如果写操作很小,这可能会产生很大的影响(将大量的小磁盘写操作合并到一个更大的写操作中通常效率更高)。

    4. 你最大的朋友是 磁盘写入速度 .所以,确保你有一个快速的磁盘和一个好的磁盘控制器。快速SSD最好。

        2
  •  1
  •   Petar    7 年前

    我已经编写了一个服务,它可以广泛地实现这一点,您可以做的最好的事情是将输入数据直接传输到文件(如果您也有输入流的话)。 下面是一个简单的示例,您可以通过以下方式下载文件:

    const http = require('http')
    
    const ostream = fs.createWriteStream('./output')
    http.get('http://nodejs.org/dist/index.json', (res) => {
        res.pipe(ostream)                                                                                                                                                                                              
    })
    .on('error', (e) => {
        console.error(`Got error: ${e.message}`);
    })
    

    因此,在本例中,不涉及整个文件的中间复制。当文件从远程http服务器分块读取时,它会写入磁盘上的文件。这比从服务器下载整个文件,保存在内存中,然后将其写入磁盘上的文件要高效得多。

    流是节点中许多操作的基础。所以你也应该研究这些。

    根据您的场景,您应该调查的另一件事是UV_THREADPOOL_SIZE,因为I/O操作使用默认设置为4的libuv线程池,如果您进行大量编写,您可能会填充该线程池。