代码之家  ›  专栏  ›  技术社区  ›  juan garcia

高地在完全被消耗之前添加到溪流中?

  •  0
  • juan garcia  · 技术社区  · 6 年前

    有没有一种方法可以将一条结束的信息添加到高地的溪流中,并对我们的需求进行全面描述?

    少假设我们有以下情况:

    const stream = high([1,4,6,7])
    

    然后使用这个流,我想计算每个正在处理的值,并说

    sink.drain(stream.pipe(4))
    

    是数组的元素数。考虑到这可能是一个流中数千个对象的发送,我需要从该流中消费,以便能够计数。

    我不能说array.length,因为它是一个可以包含任何信息的源,并且该信息正在用流处理…如何向流中添加一个消息结尾,并对所消费的内容进行描述?

    1 回复  |  直到 6 年前
        1
  •  1
  •   amsross    6 年前

    听起来你想在你的价值观周围建立一些状态,但不会妨碍你的价值观的消费。我建议看一些涉及 h.through .

    你可以用 fork observe 并从这些值中减少一些状态:

    h([1, 2, 3, 4])
      .through(stream => {
        const length = stream.observe()
          .reduce(0, m => m + 1)
          .map(length => ({ length }))
    
        return h([stream, length])
          .sequence()
      })
      .errors(err => console.error(err))
      .each(x => console.log(x))
    

    您可以创建一些状态,并根据源流的事件返回一个新流:

    const h = require('highland')
    h([1, 2, 3, 4])
      .through(stream => {
        let length = 0
    
        return h(push => stream
          .tap(() => ++length)
          .errors(err => push(err))
          .each(x => push(null, x))
          .done(() => {
            push(null, `length: ${length}`)
            push(null, h.nil)
          }))
      })
      .errors(err => console.error(err))
      .each(x => console.log(x))
    
    推荐文章