我需要RX中滞后滤波器的功能。只有当先前发出的值和当前输入值相差一定量时,它才应该从源流发出值。作为一种通用扩展方法,它可能具有以下特征:
public static IObservable<T> HysteresisFilter<T>(this IObservable<t> source, Func<T, T, bool> filter)
我不知道如何在现有运营商中实现这一点。我在找类似的东西
lift
从RxJava,任何其他创建我自己的运算符的方法。我见过这个
checklist
,但我在网上没有找到任何例子。
以下方法(实际上都是相同的)对我来说似乎是可行的,但还有其他方法吗
接收方式
要做到这一点,就像不包装
subject
或者实际实现一个操作符?
async Task Main()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var rnd = new Random();
var s = Observable.Interval(TimeSpan.FromMilliseconds(10))
.Scan(0d, (a,_) => a + rnd.NextDouble() - 0.5)
.Publish()
.AutoConnect()
;
s.Subscribe(Console.WriteLine, cts.Token);
s.HysteresisFilter((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"1> {x}"), cts.Token);
s.HysteresisFilter2((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"2> {x}"), cts.Token);
await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);
}
public static class ReactiveOperators
{
public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source, Func<T, T, bool> filter)
{
return new InternalHysteresisFilter<T>(source, filter).AsObservable;
}
public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source, Func<T, T, bool> filter)
{
var subject = new Subject<T>();
T lastEmitted = default;
bool emitted = false;
source.Subscribe(
value =>
{
if (!emitted || filter(lastEmitted, value))
{
subject.OnNext(value);
lastEmitted = value;
emitted = true;
}
}
, ex => subject.OnError(ex)
, () => subject.OnCompleted()
);
return subject;
}
private class InternalHysteresisFilter<T>: IObserver<T>
{
Func<T, T, bool> filter;
T lastEmitted;
bool emitted;
private readonly Subject<T> subject = new Subject<T>();
public IObservable<T> AsObservable => subject;
public InternalHysteresisFilter(IObservable<T> source, Func<T, T, bool> filter)
{
this.filter = filter;
source.Subscribe(this);
}
public IDisposable Subscribe(IObserver<T> observer)
{
return subject.Subscribe(observer);
}
public void OnNext(T value)
{
if (!emitted || filter(lastEmitted, value))
{
subject.OnNext(value);
lastEmitted = value;
emitted = true;
}
}
public void OnError(Exception error)
{
subject.OnError(error);
}
public void OnCompleted()
{
subject.OnCompleted();
}
}
}
旁注:将有数千个这样的过滤器应用于尽可能多的流。我需要延迟的吞吐量,因此我正在寻找CPU和内存开销都最小的解决方案,即使其他人看起来更喜欢。