代码之家  ›  专栏  ›  技术社区  ›  laptou

如何创建基于while循环的可观察对象?

  •  1
  • laptou  · 技术社区  · 7 年前

    我正在尝试创建一个Observable,它不断向外部服务查询更新,如果有新的服务,则会发出更新:

    this._loop = new Rx.Observable<TDL.Result>(subscriber =>
    {
        let shouldLoop = true;
    
        while (shouldLoop)
        {
            if (!this._client)
                throw new Error("This client is not initialised.");
    
            const update = this._lib.receiveSync(this._client, 5);
    
            if (!update)
                continue;
    
            if (update._ === "error")
                this.emit("error", update);
            else
                this.emit("update", update);
    
            subscriber.next(update);
        }
    
        // never gets here b/c of while loop, so subscribing to this Observable
        // causes everything to block
    
        // cancellation logic
        return () =>
        {
            shouldLoop = false;
            this._loop = null;
        };
    }).pipe(RxOp.publish()) as Rx.ConnectableObservable<TDL.Result>;
    
    this._loopSubscription = this._loop.connect();
    

    然而,subscribe函数是阻塞的,这意味着当我调用 connect()

    1 回复  |  直到 7 年前
        1
  •  0
  •   laptou    7 年前

    谢谢@martin,解决办法很明显。我不知道为什么我没有想到这一点

    this._loop = new Rx.Observable<TDL.Result>(subscriber =>
    {
        let shouldLoop = true;
    
        process.nextTick(() =>
        {
            while (shouldLoop)
            {
                if (!this._client)
                    throw new Error("This client is not initialised.");
    
                const update = this._lib.receiveSync(this._client, 5);
    
                if (!update)
                    continue;
    
                if (update._ === "error")
                    this.emit("error", update);
                else
                    this.emit("update", update);
    
                subscriber.next(update);
            }
        });
    
        // cancellation logic
        return () =>
        {
            shouldLoop = false;
            this._loop = null;
        };
    }).pipe(RxOp.publish()) as Rx.ConnectableObservable<TDL.Result>;