谢谢 @martin,解决办法很明显。我不知道为什么我没有想到这一点。
// Connectable observable that polls the native client for results and
// republishes each one both as an EventEmitter event ("error" / "update")
// and as an observable notification.
//
// Each poll runs in its own setImmediate callback instead of a synchronous
// `while` loop. A synchronous loop never hands control back to the event
// loop, so timers, I/O callbacks and promise continuations would starve, and
// the teardown below could only take effect when triggered synchronously
// from inside an emit()/next() handler.
this._loop = new Rx.Observable<TDL.Result>(subscriber =>
{
    // Cleared by the teardown (or on a fatal error) to stop rescheduling.
    let shouldLoop = true;

    const poll = () =>
    {
        // The subscription may have been torn down while this turn was queued.
        if (!shouldLoop)
            return;

        if (!this._client)
        {
            // Deliver the failure through the observable rather than throwing
            // from a scheduler callback, where it would be an uncaught exception.
            shouldLoop = false;
            subscriber.error(new Error("This client is not initialised."));
            return;
        }

        // NOTE(review): the second argument is presumably a receive timeout;
        // confirm its unit against the native binding.
        const update = this._lib.receiveSync(this._client, 5);
        if (update)
        {
            if (update._ === "error")
                this.emit("error", update);
            else
                this.emit("update", update);
            subscriber.next(update);
        }

        // A handler above may have unsubscribed synchronously; re-check
        // before queueing the next poll.
        if (shouldLoop)
            setImmediate(poll);
    };

    setImmediate(poll);

    // cancellation logic
    return () =>
    {
        shouldLoop = false;
        this._loop = null;
    };
}).pipe(RxOp.publish()) as Rx.ConnectableObservable<TDL.Result>;