代码之家  ›  专栏  ›  技术社区  ›  Avius

如何在异步调用后管道化流而不丢失数据?

  •  1
  • Avius  · 技术社区  · 6 年前

    在我的应用程序中,我希望能够执行以下步骤:

    1. 获取读取流;
    2. 等待异步函数完成;
    3. 将水流输送至目的地1;
    4. 等待另一个异步函数完成;
    5. 将目的地1连接到目的地2。

    我期望以下内容:

    1. 流处理仅在步骤5之后开始
    2. 数据未丢失
    3. 当流处理结束时,整个逻辑完全解决。( .on("finish") )

    在提出任何问题之前,下面是一个代码示例:

    return new Promise(resolve => {
        logger.debug("Creating a stream");
        const stream = fs.createReadStream("/home/username/dev/resources/ex.tar.bz2");
    
        setTimeout(() => {
            logger.debug("Attaching pipe 1");
            const pipe1 = stream.pipe(
                through(
                    function(data) {
                        logger.info("DATA in PIPE 1");
                        this.queue(data);
                    },
                    function() {
                        logger.info("END in PIPE 1");
                        this.queue(null);
                    }
                )
            );
    
            stream.pause(); // LINE 1
    
            setTimeout(() => {
                logger.debug("Attaching pipe 2");
                const pipe2 = pipe1.pipe(
                    through(
                        function() {
                            logger.info("DATA in PIPE 2");
                        },
                        function() {
                            logger.info("END in PIPE 2");
                            resolve();
                        }
                    )
                )
    
                pipe2.resume(); // LINE 2
            }, 1000);
        }, 1000);
    });
    

    在此代码中,如果删除了第1行和第2行,代码将不起作用(打印 管道1中的数据 管端1 ,从不解决),因为:

    • 附加目的地1启动数据流;
    • 如果我理解正确,在附加目的地2时,数据已被消耗。

    如果第1行和第2行都存在,则代码 出现 工作(印刷) 管道1中的数据 , 管道2中的数据 , 管端1 , 管端2 并解决)因为:

    • 第1行停止数据流 stream ;
    • 附加目的地2(有些混乱)不会从原始源开始流;
    • 第2行启动数据流。

    根据NODEJS文件:

    如果存在管道目的地,那么调用stream.pause()将不能保证在这些目的地耗尽并请求更多数据后,流将保持暂停状态。

    这就引出了我的主要问题: 可以吗 可靠地 以我尝试的方式实现这个(在管道之间使用异步调用)?

    奖金问题:

    1. 我想使用管道的正确方法可能是确保在同时构建整个管道之前完成所有必需的异步调用。 我的猜测正确吗?
    2. 为什么附加目的地2不触发流,而附加目的地1触发流?
    3. 如果我把2号线换成 pipe1.resume() stream.resume() 代码也同样有效。我想这可以延伸到无限多的管道上。 为什么我可以通过拨打 .resume() 任何 那些管道?这个简历和管道连接时应该出现的简历有什么不同(显然这两种方式不一样)?
    1 回复  |  直到 6 年前
        1
  •  2
  •   Elliot Nelson    6 年前

    您正在体验 节点流变量 属于 Heisenberg's uncertainty principle -观察水流的行为改变了水流的行为。

    在执行任何其他操作之前,请删除 through 流(虽然非常简单,但这本身会影响行为)。让我们使用内置的 Passthrough 溪流,我们知道没有副作用:

    logger.debug("Attaching pipe 1");
    const pipe1 = new PassThrough();
    stream.pipe(pipe1);
    pipe1.on('data', data => logger.info('DATA in PIPE 1')); 
    pipe1.on('end', () => logger.info('END in PIPE 1')); 
    
    
    // ...
    
    logger.debug("Attaching pipe 2");
    const pipe2 = new PassThrough();
    pipe1.pipe(pipe2);
    pipe2.on('data', data => logger.info('DATA in PIPE 2')); 
    pipe2.on('end', () => {
        logger.info('END in PIPE 2');
        resolve();
    }); 
    

    输出:

    Creating a stream
    Attaching pipe 1
    DATA in PIPE 1
    END in PIPE 1
    Attaching pipe 2
    END in PIPE 2
    

    因此,如果没有停顿/恢复语句,这是有效的(它不应该永远挂起,我不确定您为什么看到这种行为);但是 没有数据 在PIPE2中。它当然不会等待或缓冲任何东西。

    问题是通过附加 on('data') 处理程序(即 通过 同样如此),您正在通知流它有一种使用数据的方法——它不需要缓冲任何东西。当我们将管道添加到 pipe2 立即开始管道-管道中没有数据了,因为我们已经消耗了数据。

    尝试 述评 这个 data 处理程序 pipe1 :

    //pipe1.on('data', data => logger.info('DATA in PIPE 1'));
    

    现在我们得到了我们所期望的:

    Creating a stream
    Attaching pipe 1
    Attaching pipe 2
    DATA in PIPE 2
    END in PIPE 1
    END in PIPE 2
    

    现在,当我们创建读取流时,它立即开始读取(进入缓冲区);我们附加 PIPE1 立即开始将数据(导入 PIPE1 的内部缓冲);然后我们附加 PIPE2 立即开始将数据(导入 PIPE2 的内部缓冲区)。您可以无限期地继续这一过程,最终通过管道进入写入流并将数据泵送到磁盘或HTTP响应等。