
C#: waiting for an async loop to complete does not work [duplicate]

  •  1
  • David.Warwick  · Tech Community  · 7 years ago

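    In a Metro app, I need to execute a number of WCF calls. There are a significant number of calls to be made, so I need to do them in a parallel loop. The problem is that the parallel loop exits before the WCF calls are all complete.

    How would you refactor this to work as expected?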

    var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
    var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();
    
    Parallel.ForEach(ids, async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = await repo.GetCustomer(i);
        customers.Add(cust);
    });
    
    foreach ( var customer in customers )
    {
        Console.WriteLine(customer.ID);
    }
    
    Console.ReadKey();
    
        1
  •  159
  •   bitshift    8 years ago

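    The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.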
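    A minimal sketch of that failure mode (not part of the original answer; Task.Delay stands in for the WCF call and the class name is hypothetical). Parallel.ForEach has no overload that accepts a Func<T, Task>, so the async lambda compiles to an async void Action<T>, and the loop returns as soon as each body reaches its first await, well before the delayed work finishes.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Returns how many loop bodies had finished at the moment Parallel.ForEach returned.
    public static int CountFinishedWhenForEachReturns(int itemCount)
    {
        int finished = 0;

        // Parallel.ForEach only takes Action<T> bodies, so this async lambda
        // becomes an "async void" delegate that the loop cannot await.
        Parallel.ForEach(Enumerable.Range(1, itemCount), async id =>
        {
            await Task.Delay(1000);              // stands in for the WCF call
            Interlocked.Increment(ref finished); // runs long after ForEach has returned
        });

        // At this point each body has only reached its first await.
        return Volatile.Read(ref finished);
    }
}
```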

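    You could "fix" that by blocking the ForEach() threads, but that defeats the whole point of async-await.

    What you could do is use TPL Dataflow instead of Parallel.ForEach(), since it supports asynchronous Tasks well.

    Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.

    In code: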

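    using System.Threading.Tasks.Dataflow; // TransformBlock, ActionBlock (System.Threading.Tasks.Dataflow package)
    
    var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };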
    
    var getCustomerBlock = new TransformBlock<string, Customer>(
        async i =>
        {
            ICustomerRepo repo = new CustomerRepo();
            return await repo.GetCustomer(i);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });
    var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
    getCustomerBlock.LinkTo(
        writeCustomerBlock, new DataflowLinkOptions
        {
            PropagateCompletion = true
        });
    
    foreach (var id in ids)
        getCustomerBlock.Post(id);
    
    getCustomerBlock.Complete();
    writeCustomerBlock.Completion.Wait();
    

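    You will probably want to limit the parallelism of the TransformBlock to some small constant instead of Unbounded. You could also limit the capacity of the TransformBlock (BoundedCapacity) and add the items to it asynchronously using SendAsync(), for example if the collection is too big.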
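    A sketch of those two refinements, assuming the System.Threading.Tasks.Dataflow package is referenced; fetch and sink are hypothetical stand-ins for repo.GetCustomer and the console writer, and the limits 4 and 8 are arbitrary. With BoundedCapacity set, Post() simply returns false when the block is full, whereas SendAsync() waits asynchronously until there is room.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledPipeline
{
    // fetch stands in for repo.GetCustomer(id); results are collected in sink (both hypothetical).
    public static async Task RunAsync(
        IEnumerable<string> ids, Func<string, Task<string>> fetch, ConcurrentBag<string> sink)
    {
        var fetchBlock = new TransformBlock<string, string>(fetch,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4, // a small constant instead of Unbounded
                BoundedCapacity = 8         // at most 8 ids buffered or in flight
            });
        var writeBlock = new ActionBlock<string>(result => sink.Add(result));
        fetchBlock.LinkTo(writeBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            // Waits asynchronously while the block is full; Post() would return false instead.
            await fetchBlock.SendAsync(id);
        }

        fetchBlock.Complete();
        await writeBlock.Completion; // completes after every result has been written
    }
}
```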

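    An added benefit compared with your code (if it worked) is that writing starts as soon as a single item is finished, instead of waiting until all of the processing is done.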

        2
  •  122
  •   Community Mohan Dere    9 years ago

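    svick's answer is (as usual) excellent.

    However, I find Dataflow to be more useful when you actually have large amounts of data to transfer, or when you need an async-compatible queue.

    In your case, a simpler solution is to just use async-style parallelism: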

    var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
    
    var customerTasks = ids.Select(i =>
      {
        ICustomerRepo repo = new CustomerRepo();
        return repo.GetCustomer(i);
      });
    var customers = await Task.WhenAll(customerTasks);
    
    foreach (var customer in customers)
    {
      Console.WriteLine(customer.ID);
    }
    
    Console.ReadKey();
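    For trying this outside a WCF project, here is a self-contained sketch with hypothetical stand-ins for the question's types (Customer, ICustomerRepo, and an in-memory FakeCustomerRepo whose later ids finish first). Task.WhenAll still returns the customers in the order of the input tasks, not in completion order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the question's Customer / ICustomerRepo / CustomerRepo.
public class Customer { public string ID { get; set; } = ""; }

public interface ICustomerRepo { Task<Customer> GetCustomer(string id); }

public class FakeCustomerRepo : ICustomerRepo
{
    public async Task<Customer> GetCustomer(string id)
    {
        // Higher ids get shorter delays, so they complete first.
        await Task.Delay(Math.Max(0, 100 - 5 * int.Parse(id)));
        return new Customer { ID = id };
    }
}

public static class WhenAllDemo
{
    public static async Task<Customer[]> LoadAllAsync(IEnumerable<string> ids)
    {
        // Start every call up front; WhenAll completes once all of them have.
        var customerTasks = ids.Select(id => new FakeCustomerRepo().GetCustomer(id));
        return await Task.WhenAll(customerTasks);
    }
}
```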
    
        3
  •  77
  •   Community Mohan Dere    9 years ago

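    Using Dataflow as svick suggested may be overkill, and Stephen's answer does not provide a way to control the concurrency of the operation. However, that can be achieved rather simply: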

    public static async Task RunWithMaxDegreeOfConcurrency<T>(
         int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
    {
        var activeTasks = new List<Task>(maxDegreeOfConcurrency);
        foreach (var task in collection.Select(taskFactory))
        {
            activeTasks.Add(task);
            if (activeTasks.Count == maxDegreeOfConcurrency)
            {
                await Task.WhenAny(activeTasks.ToArray());
                //observe exceptions here
                activeTasks.RemoveAll(t => t.IsCompleted); 
            }
        }
        await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
        {
            //observe exceptions in a manner consistent with the above   
        });
    }
    

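    The ToArray() calls can be optimized by using an array instead of a list and replacing completed tasks, but I doubt it would make much of a difference in most scenarios. Sample usage based on the OP's question: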

    await RunWithMaxDegreeOfConcurrency(10, ids, async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = await repo.GetCustomer(i);
        customers.Add(cust);
    });
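    The array-based optimization mentioned above might look roughly like this sketch: a fixed array of slots, where the slot of whichever task finishes first is reused for the next item. Rethrowing a failed task's exception immediately is an assumption made here; the answer leaves exception handling open.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConcurrencyHelpers
{
    // Keeps the running tasks in a fixed-size array and reuses the slot
    // of whichever task finishes first.
    public static async Task RunWithMaxDegreeOfConcurrency<T>(
        int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
    {
        if (maxDegreeOfConcurrency < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfConcurrency));

        var slots = new Task[maxDegreeOfConcurrency];
        int filled = 0;
        foreach (var item in collection)
        {
            if (filled < slots.Length)
            {
                slots[filled++] = taskFactory(item); // still filling the array
                continue;
            }
            // Every slot is busy: wait for one task to finish, rethrow its
            // exception if it failed (an assumption), then reuse its slot.
            Task finished = await Task.WhenAny(slots);
            await finished;
            slots[Array.IndexOf(slots, finished)] = taskFactory(item);
        }

        // Fewer items than slots: drop the unused (null) entries before waiting.
        if (filled < slots.Length)
            Array.Resize(ref slots, filled);
        await Task.WhenAll(slots);
    }
}
```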
    

    EDIT: Fellow SO user and TPL wiz Eli Arbel pointed me to a related article from Stephen Toub. As usual, his implementation is both elegant and efficient:

    using System.Collections.Concurrent; // Partitioner
    
    public static Task ForEachAsync<T>(
          this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current).ContinueWith(t =>
                              {
                                  //observe exceptions
                              });
            }));
    }
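    Note that the empty ContinueWith above swallows any exception from the tasks returned by body. Below is a self-contained sketch of the same partitioning approach without it, so that a failure faults the returned task, applied to the question's loop; fetch is a hypothetical stand-in for repo.GetCustomer and 4 is an arbitrary degree of parallelism.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionedForEach
{
    // dop partitions pull items from the same source; each processes its items one by one.
    // Without ContinueWith, an exception from body faults the task returned by Task.WhenAll.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    // Usage in the spirit of the question (fetch is hypothetical).
    public static async Task<List<string>> LoadAsync(IEnumerable<string> ids, Func<string, Task<string>> fetch)
    {
        var results = new ConcurrentBag<string>(); // thread-safe, unordered
        await ids.ForEachAsync(4, async id => results.Add(await fetch(id)));
        return results.ToList();
    }
}
```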
    
        4
  •  36
  •   Serge Semenov    8 years ago

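    You can use the new AsyncEnumerator NuGet Package, which didn't exist 4 years ago when the question was originally posted. It allows you to control the degree of parallelism: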

    using System.Collections.Async;
    ...
    
    await ids.ParallelForEachAsync(async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = await repo.GetCustomer(i);
        customers.Add(cust);
    },
    maxDegreeOfParallelism: 10);
    

    Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.
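    Not part of this answer: on .NET 6 and later, the base class library has a built-in equivalent, Parallel.ForEachAsync, which takes a ParallelOptions (including MaxDegreeOfParallelism) and a body returning ValueTask. A sketch applied to the question, with fetch as a hypothetical stand-in for repo.GetCustomer:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BuiltInForEachAsync
{
    // Requires .NET 6 or later; fetch is a hypothetical stand-in for repo.GetCustomer.
    public static async Task<ConcurrentBag<string>> LoadAsync(
        IEnumerable<string> ids, Func<string, CancellationToken, Task<string>> fetch)
    {
        var results = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };

        // The returned task completes only after every body has finished.
        await Parallel.ForEachAsync(ids, options, async (id, ct) =>
        {
            results.Add(await fetch(id, ct));
        });
        return results;
    }
}
```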

        5
  •  15
  •   Liam Joshua    7 years ago

    Wrap the Parallel.ForEach in a Task.Run(), and instead of the await keyword use [yourasyncmethod].Result

    (you need the Task.Run so that the UI thread is not blocked)

    Something like this:

    var yourForeachTask = Task.Run(() =>
            {
                Parallel.ForEach(ids, i =>
                {
                    ICustomerRepo repo = new CustomerRepo();
                    var cust = repo.GetCustomer(i).Result;
                    customers.Add(cust);
                });
            });
    await yourForeachTask;
    
        6
  •  7
  •   John Gietzen    10 years ago

    This should be pretty efficient, and easier than getting the whole of TPL Dataflow working:

    var customers = await ids.SelectAsync(async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    });
    
    ...
    
    public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
    {
        var results = new List<TResult>();
    
        var activeTasks = new HashSet<Task<TResult>>();
        foreach (var item in source)
        {
            activeTasks.Add(selector(item));
            if (activeTasks.Count >= maxDegreesOfParallelism)
            {
                var completed = await Task.WhenAny(activeTasks);
                activeTasks.Remove(completed);
                results.Add(completed.Result);
            }
        }
    
        results.AddRange(await Task.WhenAll(activeTasks));
        return results;
    }
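    One thing to be aware of: SelectAsync above adds results in completion order, so they may not line up with source. A sketch of an order-preserving alternative (the name SelectInOrderAsync is hypothetical) that throttles with a SemaphoreSlim and relies on Task.WhenAll returning results in input order. Unlike the original, it creates one task per item up front, each waiting for a semaphore slot before calling selector.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class OrderedSelect
{
    public static async Task<TResult[]> SelectInOrderAsync<TSource, TResult>(
        this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreeOfParallelism = 4)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

        using var throttle = new SemaphoreSlim(maxDegreeOfParallelism);
        var tasks = source.Select(async item =>
        {
            await throttle.WaitAsync();                // at most maxDegreeOfParallelism selectors run at once
            try { return await selector(item); }
            finally { throttle.Release(); }
        }).ToList();                                   // materialize so each task is created exactly once

        // Task.WhenAll keeps the order of `tasks`, i.e. the order of `source`.
        return await Task.WhenAll(tasks);
    }
}
```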
    
        7
  •  4
  •   Teoman shipahi    9 years ago

    I'm a little late to the party, but you may want to consider using GetAwaiter().GetResult() to run your async code in a synchronous context, as below:

    Parallel.ForEach(ids, i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        // Run this in thread which Parallel library occupied.
        var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
        customers.Add(cust);
    });
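    A practical difference between this and the .Result approach suggested in an earlier answer: .Result wraps a failure in an AggregateException, while GetAwaiter().GetResult() rethrows the original exception. A small sketch showing both (FailAsync is a hypothetical failing call):

```csharp
using System;
using System.Threading.Tasks;

public static class BlockingWaitDemo
{
    // Hypothetical call that fails after yielding, like a faulted WCF request.
    private static async Task<int> FailAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("lookup failed");
    }

    // .Result surfaces the failure wrapped in an AggregateException.
    public static string ExceptionFromResult()
    {
        try { _ = FailAsync().Result; return "none"; }
        catch (Exception ex) { return ex.GetType().Name; }
    }

    // GetAwaiter().GetResult() rethrows the original exception unwrapped.
    public static string ExceptionFromGetResult()
    {
        try { _ = FailAsync().GetAwaiter().GetResult(); return "none"; }
        catch (Exception ex) { return ex.GetType().Name; }
    }
}
```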
    
        8
  •  3
  •   Jay Shah    7 years ago

    An extension method that uses SemaphoreSlim for throttling and allows setting the maximum degree of parallelism:

        // Place inside a static class; needs System, System.Collections.Generic,
        // System.Linq, System.Threading and System.Threading.Tasks.

        /// <summary>
        /// Concurrently executes an async action for each item of an <see cref="IEnumerable{T}"/>.
        /// </summary>
        /// <typeparam name="T">Type of the items in the sequence</typeparam>
        /// <param name="enumerable">The <see cref="IEnumerable{T}"/> to process</param>
        /// <param name="action">An async action to execute for each item</param>
        /// <param name="maxDegreeOfParallelism">Optional. The maximum number of actions running at once;
        /// must be greater than 0. If omitted, all actions are started at once.</param>
        /// <returns>A Task representing the async operation</returns>
        /// <exception cref="ArgumentOutOfRangeException">If <paramref name="maxDegreeOfParallelism"/> is less than 1</exception>
        public static async Task ForEachAsyncConcurrent<T>(
            this IEnumerable<T> enumerable,
            Func<T, Task> action,
            int? maxDegreeOfParallelism = null)
        {
            if (maxDegreeOfParallelism.HasValue)
            {
                if (maxDegreeOfParallelism.Value < 1)
                    throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

                using (var semaphoreSlim = new SemaphoreSlim(
                    maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
                {
                    var tasksWithThrottler = new List<Task>();

                    foreach (var item in enumerable)
                    {
                        // Take a slot, waiting asynchronously while the limit is reached.
                        await semaphoreSlim.WaitAsync();

                        tasksWithThrottler.Add(Task.Run(async () =>
                        {
                            try
                            {
                                await action(item);
                            }
                            finally
                            {
                                // Action completed (or failed): free the slot. Unlike a ContinueWith,
                                // try/finally lets the action's exception reach Task.WhenAll.
                                semaphoreSlim.Release();
                            }
                        }));
                    }

                    // Wait for all tasks to complete.
                    await Task.WhenAll(tasksWithThrottler.ToArray());
                }
            }
            else
            {
                await Task.WhenAll(enumerable.Select(item => action(item)));
            }
        }
    

    Example usage:

    await enumerable.ForEachAsyncConcurrent(
        async item =>
        {
            await SomeAsyncMethod(item);
        },
        5);
    
        9
  •  3
  •   BoarGules    7 years ago

    const int DegreeOfParallelism = 10;
    IEnumerable<double> result = await Enumerable.Range(0, 1000000)
        .Split(DegreeOfParallelism)
        .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
        .ConfigureAwait(false);
    

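    What happens here: we split the source collection into 10 chunks (.Split(DegreeOfParallelism)), then run 10 tasks, each processing its items one by one (.SelectManyAsync(...)), and merge the results back into a single list. There is also a simpler approach: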

    double[] result2 = await Enumerable.Range(0, 1000000)
        .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
        .WhenAll()
        .ConfigureAwait(false);
    

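    But it needs a precaution: if your source collection is too big, it will schedule a Task for every item right away, which may cause a significant performance hit.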

    The extension methods used in the examples above are as follows:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    
    public static class CollectionExtensions
    {
        /// <summary>
        /// Splits collection into number of collections of nearly equal size.
        /// </summary>
        public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
        {
            if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));
    
            List<T> source = src.ToList();
            var sourceIndex = 0;
            for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
            {
                var list = new List<T>();
                int itemsLeft = source.Count - targetIndex;
                while (slicesCount * list.Count < itemsLeft)
                {
                    list.Add(source[sourceIndex++]);
                }
    
                yield return list;
            }
        }
    
        /// <summary>
        /// Takes collection of collections, projects those in parallel and merges results.
        /// </summary>
        public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
            this IEnumerable<IEnumerable<T>> source,
            Func<T, Task<TResult>> func)
        {
            List<TResult>[] slices = await source
                .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
                .WhenAll()
                .ConfigureAwait(false);
            return slices.SelectMany(s => s);
        }
    
        /// <summary>Runs selector and awaits results.</summary>
        public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
        {
            List<TResult> result = new List<TResult>();
            foreach (TSource source1 in source)
            {
                TResult result1 = await selector(source1).ConfigureAwait(false);
                result.Add(result1);
            }
            return result;
        }
    
        /// <summary>Wraps tasks with Task.WhenAll.</summary>
        public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
        {
            return Task.WhenAll<TResult>(source);
        }
    }
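    The slice sizes produced by Split follow from its loop condition: with N = source.Count and k = slicesCount, slice t (0 ≤ t < k) keeps taking items while k·|S_t| < N − t, so it stops at the smallest size satisfying k·|S_t| ≥ N − t:

```latex
|S_t| = \left\lceil \frac{N - t}{k} \right\rceil, \quad t = 0, \dots, k-1,
\qquad
\sum_{t=0}^{k-1} \left\lceil \frac{N - t}{k} \right\rceil = N .
```

    The sum identity (a form of Hermite's identity) is why sourceIndex never runs past the end of the list. For example, N = 10 and k = 3 give slices of 4, 3 and 3 items.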
    