代码之家  ›  专栏  ›  技术社区  ›  Wolfspirit

使用系统订阅/收听流的“附带事件”。反应性

  •  2
  • Wolfspirit  · 技术社区  · 7 年前

    我使用Observable创建并使用事件代理的侦听器。网我创建了一个“IncomingMessage”类,其中包含来自eventbroker的消息,然后我开始在Observable中创建侦听器。创建函数。这很有效。

    现在,我还想从侦听器获取状态通知,如“连接…”,“已连接。”,“结束……”这不是IncomingMessage,所以我创建了一个类“BrokerEvent”,它有一个“Message”属性和一个“IncomingMessage”和“BrokerEvent”接口。现在我通过observer发送这两个消息。OnNext(…)当它们发生时。到目前为止,这也很有效。

    我愿意:

    GetObservable().Where(x => x is BrokerEvent ||
                            (x is IncomingMessage msg &&
                                msg.User == "test")).Subscribe(...)
    

    在尝试了一点之后,我现在终于这样做了。。。

    var observable = GetObservable().Publish();
    
    observable.OfType<BrokerEvent>().Subscribe(...);
    observable.OfType<IncomingMessage>().Where(x=>x.User == "test").Subscribe(...);
    
    var disposable = observable.Connect();
    

    这似乎也管用,但由于我是反应式扩展的新手,我不太确定这是否有任何不必要的副作用。我也不确定在流中包含状态消息是否“正确”。有没有更好的方法来处理这个问题(不使用Publish也可以)或者这是一种方法?

    2 回复  |  直到 7 年前
        1
  •  2
  •   Shlomo    7 年前

    GetObservable 返回 IObservable<object> ,这并不理想。

    执行上述代码的最佳方法如下:

    var observable = GetObservable().Publish();
    
    var sub1 = observable.OfType<BrokerEvent>().Subscribe(_ => { });
    var sub2 = observable.OfType<IncomingMessage>().Where(x => x.User == "test").Subscribe(_ => { });
    var connectSub = observable.Connect();
    var disposable = new CompositeDisposable(sub1, sub2, connectSub);
    

    在这种情况下,您可能需要创建一个 Discriminated-Union 输入你的可观察,这可能会使处理更容易。

        2
  •  0
  •   Sentinel    7 年前

    m=>processor.Process((dynamic)m)
    

    如果要在调用流程之前进行筛选,可以引入一个公共类(ProcessiableMesage或类似的类),也可以调用。在OfType streams上进行合并,或者通过使用动态MessageFilter类,采用与上述相同的方法。

    推荐文章