代码之家  ›  专栏  ›  技术社区  ›  David Pfeffer

如果在超时内没有下一个(…)调用,则重试(重新订阅)到源Observable

  •  0
  • David Pfeffer  · 技术社区  · 6 年前

    我试图获取一个rxjs source observate,表示一个推送我数据的网络连接,如果在超时时间内没有收到数据,则重新连接(通过重新订阅source observate)。我当然可以用一种有点老套的方式来写这篇文章,但是有没有一种用rxjs简洁地写这篇文章的好方法呢?

    1 回复  |  直到 6 年前
        1
  •  0
  •   David Pfeffer    6 年前

    我最终写了一个接线员。我认为有更好的方法可以做到这一点,但鉴于其他人也不知道,下面是我写的可管道运算符:

    import { Observable, Subscription } from "rxjs";
    
    export function retryAfterTimeout<T>(timeout: number, allowCompletion = false): (obs: Observable<T>) => Observable<T> {
        return source => new Observable<T>(observer => {
            let sub: Subscription | undefined;
            let timer: number | undefined;
    
            function resetTimer() {
                if (timer) clearTimeout(timer);
                timer = window.setTimeout(() => resub(), timeout);
            }
    
            function resub() {
                if (sub) sub.unsubscribe();
                sub = source.subscribe({
                    next(x) {
                        resetTimer();
                        observer.next(x);
                    },
                    error(err) {
                        observer.error(err);
                    },
                    complete() {
                        if (allowCompletion)
                            observer.complete();
                        else
                            resub();
                    }
                });
            }
    
            resub();
            resetTimer();
    
            return () => {
                if (sub) sub.unsubscribe();
                if (timer) window.clearTimeout(timer);
            };
        });
    }