代码之家  ›  专栏  ›  技术社区  ›  Homde

是否更改接收操作员的间隔?

  •  6
  • Homde  · 技术社区  · 15 年前

    这可能是个愚蠢的问题,因为我对RX有点陌生。)

    我正在采样一个事件(Rx for.NET 4.0):

    eventsobservable.sample(timespan.fromseconds(1)).timestamp().subscribe(x=>console.writeline(“测试:”+x.value.eventargs.str));

    问题是采样时间需要能够即时更改,我想我可以创建一些属性来删除现有的处理程序,并在其更改时创建一个新的处理程序,但它看起来有点混乱,更容易受到时间问题的影响。有没有办法简单地改变时间间隔?

    例如:假设某人正在编写一个字符串,当检测到某个序列时,您希望在不丢失事件的情况下更改采样时间,最好是不获取一个以上的事件。

    4 回复  |  直到 10 年前
        1
  •  8
  •   Jon Skeet    15 年前

    我不知道如何改变现有的采样间隔,但是你 能够 do是以您需要的最高频率采样,然后用 Where 使用变量的子句 可以 改变。

    例如:

    static IObservable<T> SampleEvery<T>(this IObservable<T> source,
        Func<int> multipleProvider)
    {
        int counter = 0;
        Func<T, bool> predicate = ignored => {
            counter++;
            if (counter >= multipleProvider())
            {
                counter = 0;
            }
            return counter == 0;
        };
        return source.Where(predicate);
    }
    

    你可以这样称呼它:

    // Keep this somewhere you can change it
    int multiple = 1;
    
    eventAsObservable.Sample(TimeSpan.FromSeconds(1))
                     .SampleEvery(() => multiple)
                     .Timestamp()
                     .Subscribe(x => Console.WriteLine("testing:" + 
                                                       x.Value.EventArgs.str));
    

    现在,更改 multiple 将改变有效采样频率。

    这是一个相当丑陋的黑客,但我认为它应该起作用。

        2
  •  6
  •   Richard Szalay    15 年前

    我知道这个问题已经被回答了,但我想我还可以增加一些方法来解决它。

    你可以使用 Switch 在一系列 TimeSpan S:

    private Subject<TimeSpan> sampleFrequencies = new Subject<TimeSpan>();
    
    sampleFrequencies
        .Select(x => eventAsObservable.Sample(Observable.Interval(x)).Timestamp())
        .Switch()
        .Subscribe(x => .WriteLine("testing:" + x.Value.EventArgs.str));
    
    // To change:
    // sampleFrequencies.OnNext(TimeSpan.FromSeconds(5));
    

    或者,也可以使用 Defer , TakeUntil Repeat (这一个有点疯狂,作为一个思想练习包括在内):

    private TimeSpan sampleFrequency = TiemSpan.FromSeconds(2);
    private Subject<Unit> frequencyChanged = new Subject<Unit>();
    
    (Observable
        .Defer(() => eventAsObservable
           .Sample(Observable.Interval(sampleFrequency)
        )
        .Timestamp()
        .TakeUntil(frequencyChanged)
    ).Repeat()
    .Subscribe(x => .WriteLine("testing:" + x.Value.EventArgs.str));
    
    // To change: 
    // sampleFrequency = TimeSpan.FromSeconds(5);
    // frequencyChanged.OnNext(new Unit());
    
        3
  •  2
  •   Igor Dvorkin    10 年前

    DR: 使用ObservableFromIntervalFunctor创建一个Observable,如下所示:

    void Main()
    {
        // Pick an initial period, it can be changed later.
        var intervalPeriod = TimeSpan.FromSeconds(1);
    
        // Create an observable using a functor that captures the interval period.
        var o = ObservableFromIntervalFunctor(() => intervalPeriod);
    
        // Log every value so we can visualize the observable.
        o.Subscribe(Console.WriteLine);
    
        // Sleep for a while so you can observe the observable.
        Thread.Sleep(TimeSpan.FromSeconds(5.0));
    
        // Changing the interval period will takes effect on next tick.
        intervalPeriod = TimeSpan.FromSeconds(0.3);
    
    }
    
    IObservable<long> ObservableFromIntervalFunctor(Func<TimeSpan> intervalPeriodFunctor)
    {
        return Observable.Generate(0L, s => true, s => s + 1, s => s, s => intervalPeriodFunctor());
    }
    

    说明: generate有一个重载,允许您指定通过函数生成下一个值的时间。通过传递已捕获TimeSpan变量的函数,可以通过更改捕获的TimeSpan变量来更改observable.interval周期。

    Linqpad片段 here

        4
  •  0
  •   Ana Betts    15 年前

    你为什么不订阅两次呢?

    Observable.Merge(
        eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Timestamp().SelectMany(x => doLocalLookup(x)),
        eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Timestamp().SelectMany(x => doRemoteLookup(x)),
    ).Subscribe(Console.WriteLine);
    

    或者,如果搜索仅基于某种前缀或限定符(如google chrome的'?)才处于活动状态。操作员:

    Observable.Merge(
        eventAsObservable.Sample(TimeSpan.FromSeconds(1)).Where(x => isLocal(x)).SelectMany(x => doLocalLookup(x)),
        eventAsObservable.Sample(TimeSpan.FromSeconds(10)).Where(x => isARemoteQuery(x).SelectMany(x => doRemoteLookup(x)),
    ).Subscribe(Console.WriteLine);