听起来你想在你的价值观周围建立一些状态,但不会妨碍你的价值观的消费。我建议看一些涉及
h.through
.
你可以用
fork
或
observe
并从这些值中减少一些状态:
h([1, 2, 3, 4])
.through(stream => {
const length = stream.observe()
.reduce(0, m => m + 1)
.map(length => ({ length }))
return h([stream, length])
.sequence()
})
.errors(err => console.error(err))
.each(x => console.log(x))
您可以创建一些状态,并根据源流的事件返回一个新流:
const h = require('highland')
h([1, 2, 3, 4])
.through(stream => {
let length = 0
return h(push => stream
.tap(() => ++length)
.errors(err => push(err))
.each(x => push(null, x))
.done(() => {
push(null, `length: ${length}`)
push(null, h.nil)
}))
})
.errors(err => console.error(err))
.each(x => console.log(x))