代码之家  ›  专栏  ›  技术社区  ›  Tomasz Jaskuλa

TPL数据流无法处理ActionBlock中的异常

  •  2
  • Tomasz Jaskuλa  · 技术社区  · 7 年前

    我正试图从一个 ActionBlock<int> 对于多个消费者 . 这很好,但是如果其中一个目标块引发异常,则这似乎不会传播到源块。在这里,我试图处理异常,但它从未转到 catch

    static void Main(string[] args)
    {
        var t1 = new ActionBlock<int>(async i =>
        {
            await Task.Delay(2000);
            Trace.TraceInformation($"target 1 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
    
        var t2 = new ActionBlock<int>(async i =>
        {
            await Task.Delay(1000);
            Trace.TraceInformation($"target 2 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
    
        var t3 = new ActionBlock<int>(async i =>
        {
            await Task.Delay(100);
            Trace.TraceInformation($"target 3 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
            if (i > 5)
                throw new Exception("Too big number");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
    
        var targets = new [] { t1, t2, t3};
    
        var broadcaster = new ActionBlock<int>(
            async item =>
            {
                var processingTasks = targets.Select(async t =>
                {
                    try
                    {
                        await t.SendAsync(item);
                    }
                    catch
                    {
                        Trace.TraceInformation("handled in select"); // never goes here
                    }
                });
    
                try
                {
                    await Task.WhenAll(processingTasks);
                }
                catch
                {
                    Trace.TraceInformation("handled"); // never goes here
                }
            });
    
        for (var i = 1; i <= 10; i++)
            broadcaster.Post(i);
    }
    

    我不确定我在这里遗漏了什么,但我希望能够检索异常以及哪个目标块出现故障。

    1 回复  |  直到 7 年前
        1
  •  2
  •   JSteward    7 年前

    如果块进入故障状态,它将不再接受新项目,并且 Exception 它将连接到其 Completion 例外 你可以 await

    var processingTasks = targets.Select(async t =>
    {
        try
        {
            if(!await t.SendAsync(item))
                await t.Completion;
        }
        catch
        {
            Trace.TraceInformation("handled in select"); // never goes here
        }
    });