代码之家  ›  专栏  ›  技术社区  ›  Konstantin Spirin

使用带类型化消息代理的RX

  •  0
  • Konstantin Spirin  · 技术社区  · 14 年前

    我有一个类似于Caliburn提供的类型化消息代理:

    public interface IMessageBroker
    {
        void Publish<T>(T message);
        IDisposable Subscribe<T>(Action<T> subscriber);
    }
    

    如何将订阅转换为iobservable?

    我想要一个扩展方法,类似于:

    public static IObservable<T> Subscribe<T>(this IMessageBroker messageBroker)
    {
        var subject = new Subject<T>();
        messageBroker.Subscribe<T>(subject.OnNext);
        return subject;
    }
    

    这个实现中的问题是我不能取消订阅,所以它泄漏了。

    订阅方法的更好名称也受到欢迎。

    2 回复  |  直到 14 年前
        1
  •  3
  •   Benjol    14 年前

    public static IObservable<T> ToObservable<T>(this IMessageBroker messageBroker)
    {
        IObservable<T> observable = Observable.CreateWithDisposable<T>(o =>
            {
                return messageBroker.Subscribe<T>(o.OnNext);
            });
        return observable;
    }
    

    var observableBroker = messageBroker.ToObservable<int>();
    var subject = new Subject<int>();
    observableBroker.Subscribe(subject.OnNext);
    
    //alternatively, there are overloads of Observerable.Subscribe which take lambdas:
    observableBroker.Subscribe(t => DoSomethingWith(t));
    
        2
  •  0
  •   Ahmed Alejo    8 年前

    Observable.Create

    public static IObservable<T> AsObservable<T>(this IMessageBroker messageBroker)
    {
        return Observable.Create<T>(observer => messageBroker.Subscribe<T>(observer.OnNext));
    }
    

    System.Reactive Observable.CreateWithDisposable

    Rx

    public static IObservable<T> AsObservable<T>(this IMessageBroker messageBroker)
    {
        return new DelegateObservable(observer => messageBroker.Subscribe<T>(observer.OnNext));
    }
    
    public class DelegateObservable<T> : IObservable<T>
    {
        private Func<IObserver<T>, IDisposable> subscriber;
    
        public DelegateObservable(Func<IObserver<T>, IDisposable> subscriber)
        {
            this.subscriber = subscriber;
        }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return this.subscriber(observer);
        }
    }