代码之家  ›  专栏  ›  技术社区  ›  ZorgoZ

如何在rx中实现我自己的操作员。网

  •  0
  • ZorgoZ  · 技术社区  · 5 年前

    我需要RX中滞后滤波器的功能。只有当先前发出的值和当前输入值相差一定量时,它才应该从源流发出值。作为一种通用扩展方法,它可能具有以下特征:

    public static IObservable<T> HysteresisFilter<T>(this IObservable<t> source, Func<T/*previously emitted*/, T/*current*/, bool> filter)
    

    我不知道如何在现有运营商中实现这一点。我在找类似的东西 lift 从RxJava,任何其他创建我自己的运算符的方法。我见过这个 checklist ,但我在网上没有找到任何例子。

    以下方法(实际上都是相同的)对我来说似乎是可行的,但还有其他方法吗 接收方式 要做到这一点,就像不包装 subject 或者实际实现一个操作符?

    async Task Main()
    {
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    
        var rnd = new Random();
        var s = Observable.Interval(TimeSpan.FromMilliseconds(10))
                .Scan(0d, (a,_) => a + rnd.NextDouble() - 0.5)
                .Publish()
                .AutoConnect()
                ;
    
        s.Subscribe(Console.WriteLine, cts.Token);
    
        s.HysteresisFilter((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"1> {x}"), cts.Token);
        s.HysteresisFilter2((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"2> {x}"), cts.Token);
    
        await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);
    }
    
    public static class ReactiveOperators
    {
        public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source, Func<T, T, bool> filter)
        {
            return new InternalHysteresisFilter<T>(source, filter).AsObservable; 
        }
    
        public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source, Func<T, T, bool> filter)
        {
            var subject = new Subject<T>();
            T lastEmitted = default;
            bool emitted = false;
    
            source.Subscribe(
                value =>
                {
                    if (!emitted || filter(lastEmitted, value))
                    {
                        subject.OnNext(value);
                        lastEmitted = value;
                        emitted = true;
                    }
                } 
                , ex => subject.OnError(ex)
                , () => subject.OnCompleted()
            );
    
            return subject;
        }
    
        private class InternalHysteresisFilter<T>: IObserver<T>
        {
            Func<T, T, bool> filter;
            T lastEmitted;
            bool emitted;
    
            private readonly Subject<T> subject = new Subject<T>();
    
            public IObservable<T> AsObservable => subject;
    
            public InternalHysteresisFilter(IObservable<T> source, Func<T, T, bool> filter)
            {
                this.filter = filter;
                source.Subscribe(this);
            }
    
            public IDisposable Subscribe(IObserver<T> observer)
            {
                return subject.Subscribe(observer);
            }
    
            public void OnNext(T value)
            {
                if (!emitted || filter(lastEmitted, value))
                {
                    subject.OnNext(value);
                    lastEmitted = value;
                    emitted = true;
                }
            }
    
            public void OnError(Exception error)
            {
                subject.OnError(error);
            }
    
            public void OnCompleted()
            {
                subject.OnCompleted();
            }
        }
    }
    

    旁注:将有数千个这样的过滤器应用于尽可能多的流。我需要延迟的吞吐量,因此我正在寻找CPU和内存开销都最小的解决方案,即使其他人看起来更喜欢。

    0 回复  |  直到 5 年前
        1
  •  1
  •   Theodor Zoulias    5 年前

    我在书中看到的大多数例子 Introduction to Rx 我们正在使用这种方法 Observable.Create 用于创建新的操作员。

    这个 创造 工厂方法是实现定制可观察序列的首选方法。受试者的使用应该主要停留在样本和测试领域。( citation )

    public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source,
        Func<T, T, bool> predicate)
    {
        return Observable.Create<T>(observer =>
        {
            T lastEmitted = default;
            bool emitted = false;
            return source.Subscribe(value =>
            {
                if (!emitted || predicate(lastEmitted, value))
                {
                    observer.OnNext(value);
                    lastEmitted = value;
                    emitted = true;
                }
            }, observer.OnError, observer.OnCompleted);
        });
    }
    
        2
  •  1
  •   Shlomo    5 年前

    这个答案与@Theodor的答案相同,但它避免使用 Observable.Create ,这是我通常会避免的。

    public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source,
        Func<T, T, bool> predicate)
    {
        return source
            .Scan((emitted: default(T), isFirstItem: true, emit: false), (state, newItem) => state.isFirstItem || predicate(state.emitted, newItem)
                ? (newItem, false, true)
                : (state.emitted, false, false)
            )
            .Where(t => t.emit)
            .Select(t => t.emitted);
    }
    

    .Scan 是在可观察范围内跟踪项目状态时要使用的。

    推荐文章