代码之家  ›  专栏  ›  技术社区  ›  P Varga

如何“组播”异步可迭代对象?

  •  0
  • P Varga  · 技术社区  · 4 年前

    能不能 async 生成器是否以某种方式广播或多播,以便其所有迭代器(“消费者”?订阅者?)接收所有值?

    考虑这个例子:

    const fetchMock = () => "Example. Imagine real fetch";
    async function* gen() {
      for (let i = 1; i <= 6; i++) {
        const res = await fetchMock();
        yield res.slice(0, 2) + i;
      }
    }
    const ait = gen();
    
    (async() => {
      // first "consumer"
      for await (const e of ait) console.log('e', e);
    })();
    (async() => {
      // second...
      for await (const é of ait) console.log('é', é);
    })();

    迭代“消耗”一个值,所以只有一个或另一个得到它。 我希望他们俩(以及后来的任何一个)都能得到 yield ed值,如果这样的生成器可以以某种方式创建。(类似于a Observable .)

    0 回复  |  直到 4 年前
        1
  •  6
  •   Bergi    4 年前

    这不容易做到。您需要明确 tee 它。这类似于 the situation for synchronous iterators ,只是稍微复杂一点:

    const AsyncIteratorProto = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype));
    function teeAsync(iterable) {
        const iterator = iterable[Symbol.asyncIterator]();
        const buffers = [[], []];
        function makeIterator(buffer, i) {
            return Object.assign(Object.create(AsyncIteratorProto), {
                next() {
                    if (!buffer) return Promise.resolve({done: true, value: undefined});
                    if (buffer.length) return buffer.shift();
                    const res = iterator.next();
                    if (buffers[i^1]) buffers[i^1].push(res);
                    return res;
                },
                async return() {
                    if (buffer) {
                        buffer = buffers[i] = null;
                        if (!buffers[i^1]) await iterator.return();
                    }
                    return {done: true, value: undefined};
                },
            });
        }
        return buffers.map(makeIterator);
    }
    

    您应该确保两个迭代器的消耗速度大致相同,这样缓冲区就不会增长得太大。

        2
  •  0
  •   surj    3 年前

    这是一个解决方案,使用 Highland 作为中介。从文档中:

    分叉到多个消费者的流将一次从其源中提取一个值,速度只有最慢的消费者能够处理的速度。

    import _ from 'lodash'
    import H from 'highland'
    
    export function fork<T>(generator: AsyncGenerator<T>): [
        AsyncGenerator<T>,
        AsyncGenerator<T>
    ] {
        const source = asyncGeneratorToHighlandStream(generator).map(x => _.cloneDeep(x));
        return [
            highlandStreamToAsyncGenerator<T>(source.fork()),
            highlandStreamToAsyncGenerator<T>(source.fork()),
        ];
    }
    
    async function* highlandStreamToAsyncGenerator<T>(
        stream: Highland.Stream<T>
    ): AsyncGenerator<T> {
        for await (const row of stream.toNodeStream({ objectMode: true })) {
            yield row as unknown as T;
        }
    }
    
    function asyncGeneratorToHighlandStream<T>(
        generator: AsyncGenerator<T>
    ): Highland.Stream<T> {
        return H(async (push, next) => {
            try {
                const result = await generator.next();
                if (result.done) return push(null, H.nil);
                push(null, result.value);
                next();
            } catch (error) {
                return push(error);
            }
        });
    }
    

    用途:

    const [copy1, copy2] = fork(source);
    

    适用于Node,浏览器未经测试。

        3
  •  -1
  •   Tom Jenkinson    2 年前

    我在这里建了一个图书馆: https://github.com/tjenkinson/forkable-iterator

    意味着您可以执行以下操作:

    import { buildForkableIterator, fork } from 'forkable-iterator';
    
    function* Source() {
      yield 1;
      yield 2;
      return 'return';
    }
    
    const forkableIterator = buildForkableIterator(Source());
    
    console.log(forkableIterator.next()); // { value: 1, done: false }
    
    const child1 = fork(forkableIterator);
    // { value: 2, done: false }
    console.log(child1.next());
    // { value: 2, done: false }
    console.log(forkableIterator.next());
    
    // { value: 'return', done: true }
    console.log(child1.next());
    // { value: 'return', done: true }
    console.log(forkableIterator.next());
    

    如果你不再需要继续从一个提供松散引用的fork中消费,那么也不应该出现内存泄漏。