代码之家  ›  专栏  ›  技术社区  ›  Alon

创建在总线上包装订阅和发布的异步方法

  •  3
  • Alon  · 技术社区  · 11 年前

    我有一种实现此接口的总线:

    public interface IBus
    {
        void Publish<T>(T t);
        void Subscribe<T>(Guid subscriptionId, Action<T> action);
        void Unsubscribe<T>(Guid subscriptionId);
    }
    

    下面是我如何使用它的示例:

    public void PrintName()
    {
        IBus bus = new Bus();
        var id = Guid.NewGuid();
        bus.Subscribe<ReplyUserName>(id, replyUserName =>
        {
            bus.Unsubscribe<ReplyUserName>(id);
            Console.WriteLine(replyUserName.UserName);
        });
    
        Bus.Publish(new RequestUserName());
    }
    

    下面是RequestUserName和ReplyUserName类:

    public class RequestUserName {}
    
    public class ReplyUserName
    {
        public string UserName { get; set; }
    }
    

    然而,我想写一个扩展方法,用异步来包装它:

    public static class BusExtension
    {
        public static async Task<TResult> Request<TRequest, TResult>(this IBus bus, TRequest request)
        {
            // TODO...
        }
    }
    

    因此,我将能够以这样的方式编写前面的代码:

    public async void PrintName()
    {
        IBus bus = new Bus();
        var replyUserName = await bus.Request<RequestUserName, ReplyUserName>(new RequestUserName());
        Console.WriteLine(replyUserName.UserName);
    }
    

    我应该写什么来代替TODO?

    2 回复  |  直到 11 年前
        1
  •  6
  •   Stephen Cleary    11 年前

    您可以使用 TaskCompletionSource<T> wrap anything 进入 await -兼容方法。

    public static Task<TResult> Request<TRequest, TResult>(this IBus bus, TRequest request)
    {
      var tcs = new TaskCompletionSource<TResult>();
      var id = Guid.NewGuid();
      bus.Subscribe<TResult>(id, result =>
      {
        bus.Unsubscribe<TResult>(id);
        tcs.TrySetResult(result);
      });
      bus.Publish(request);
      return tcs.Task;
    }
    

    然而,请注意,您应该 ensure that the task is completed 。如果总线有任何可能不响应请求,您应该有一个计时器或使 TaskCompletionSource .

        2
  •  5
  •   Theon    10 年前

    您可以按如下方式实现:

    var taskCompletionSource = new TaskCompletionSource<TResult>();
    bus.Subscribe<TResult>(id, result =>
    {
        bus.Unsubscribe<TResult>(id);
        taskCompletionSource.SetResult(result);
    });
    bus.Publish(request);
    return taskCompletionSource.Task;
    

    您可能还想查看Reactive Extensions(Rx),因为IBus接口看起来与ISubject接口类似( http://msdn.microsoft.com/en-us/library/hh211669.aspx ). Reactive Extensions库已经提供了与您尝试实现的扩展方法类似的方便扩展方法。