文章翻译整理自 Nikola Malovic 两篇博文:

当 Task.WhenAll 遇见 Task.WhenAny

在 TPL (Task Parallel Library) 中,有两种通过非阻塞方式等待 Task 数组任务结束的方式:Task.WhenAllTask.WhenAny

它们的工作方式是:

  • WhenAll 当每项任务都完成时为完成。
  • WhenAny 当任意项任务完成时为完成。

现在我们需要一项功能,完成 Task 数组中的所有任务,并且当有任务完成时汇报状态。

我们称这个扩展方法为:Task.WhileAll

扩展方法实现

     public static class TaskExtensions
{
public static async Task<IList<T>> WhileAll<T>(this IList<Task<T>> tasks, IProgress<T> progress)
{
var result = new List<T>(tasks.Count);
var done = new List<Task<T>>(tasks); while (done.Count > )
{
await Task.WhenAny(tasks); var spinning = new List<Task<T>>(done.Count - );
for (int i = ; i < done.Count; i++)
{
if (done[i].IsCompleted)
{
result.Add(done[i].Result);
progress.Report(done[i].Result);
}
else
{
spinning.Add(done[i]);
}
} done = spinning;
} return result;
}
}

代码实现很简单:

  • 其是 IList<Task<T>> 的一个 async 扩展方法
  • 方法返回完整的 IList<T> 结果
  • 方法会接受一个 IProgress<T> 类型的参数,用于向订阅者发布 Task 完成信息
  • 在方法体内,我们使用一个循环来检测,直到所有 Task 完成
  • 通过使用 Task.WhenAny 来异步等待 Task 完成

单元测试

     [TestClass]
public class UnitTest1
{
[TestMethod]
public async Task TestTaskExtensionsWhileAll()
{
var task1 = Task.Run(() => );
var task2 = Task.Run(() => );
var tasks = new List<Task<int>>() { task1, task2 }; List<int> result = new List<int>();
var listener = new Progress<int>(
taskResult =>
{
result.Add(taskResult);
}); var actual = await tasks.WhileAll(listener);
Thread.Sleep(); // wait a bit for progress reports to complete Assert.AreEqual(, result.Count);
Assert.IsTrue(result.Contains());
Assert.IsTrue(result.Contains()); Assert.AreEqual(, actual.Count);
Assert.IsTrue(actual.Contains());
Assert.IsTrue(actual.Contains());
}
}

同样,测试代码也不复杂:

  • 创建两个哑元 Task,并存到数组中
  • 定义进度侦听器 Progress<T>,来监测每个任务运行的结果
  • 通过 await 方式来调用方法
  • 使用 Thread.Sleep 来等待 50ms ,以便 Progress 可以来得及处理结果
  • 检查所有 Task 执行完毕后均已上报 Progress
  • 检查所有 Task 均已执行完毕

我知道每当使用 Thread.Sleep 时绝不是件好事,所以我决定摆脱它。

实现IProgressAsync<T>

问题实际上是因为 IProgress<T> 接口定义的是 void 委托,因此无法使用 await 进行等待。

因此我决定定义一个新的接口,使用同样的 Report 行为,但会返回 Task ,用以实现真正的异步。

     public interface IProgressAsync<in T>
{
Task ReportAsync(T value);
}

有了异步版本的支持,将使订阅者更容易处理 await 调用。当然也可以使用 async void 来达成,但我认为 async void 总会延伸出更差的设计。所以,我还是选择通过定义 Task 返回值签名的接口来达成这一功能。

如下为接口实现:

     public class ProgressAsync<T> : IProgressAsync<T>
{
private readonly Func<T, Task> handler; public ProgressAsync(Func<T, Task> handler)
{
this.handler = handler;
} public async Task ReportAsync(T value)
{
await this.handler.InvokeAsync(value);
}
}

显然也没什么特别的:

  • 使用 Func<T, Task> 来代替 Action<T>,以便可以使用 await
  • ReportAsync 通过使用 await 方式来提供 Task

有了这些之后,我们来更新扩展方法:

     public static class TaskExtensions
{
public static async Task<IList<T>> WhileAll<T>(this IList<Task<T>> tasks, IProgressAsync<T> progress)
{
var result = new List<T>(tasks.Count);
var remainingTasks = new List<Task<T>>(tasks); while (remainingTasks.Count > )
{
await Task.WhenAny(tasks);
var stillRemainingTasks = new List<Task<T>>(remainingTasks.Count - );
for (int i = ; i < remainingTasks.Count; i++)
{
if (remainingTasks[i].IsCompleted)
{
result.Add(remainingTasks[i].Result);
await progress.ReportAsync(remainingTasks[i].Result);
}
else
{
stillRemainingTasks.Add(remainingTasks[i]);
}
} remainingTasks = stillRemainingTasks;
} return result;
} public static Task InvokeAsync<T>(this Func<T, Task> task, T value)
{
return Task<Task>.Factory.FromAsync(task.BeginInvoke, task.EndInvoke, value, null);
}
}

所有都就绪后,我们就可以将 Thread.Sleep 从单元测试中移除了。

     [TestClass]
public class UnitTest1
{
private List<int> result = new List<int>();
private async Task OnProgressAsync(int arg)
{
result.Add(arg);
} [TestMethod]
public async Task TestTaskExtensionsWhileAll()
{
var task1 = Task.Run(() => );
var task2 = Task.Run(() => );
var tasks = new List<Task<int>>() { task1, task2 }; var listener = new ProgressAsync<int>(this.OnProgressAsync);
var actual = await tasks.WhileAll(listener); Assert.AreEqual(, this.result.Count);
Assert.IsTrue(this.result.Contains());
Assert.IsTrue(this.result.Contains()); Assert.AreEqual(, actual.Count);
Assert.IsTrue(actual.Contains());
Assert.IsTrue(actual.Contains());
}
}

最新文章

  1. GO语言练习:channel select 超时机制
  2. WinJs项目介绍
  3. Python学习总结17:exec和eval执行求值字符串
  4. ubuntu下卸载vmware
  5. MongoDB(2):入门
  6. 【转】iOS中流(Stream)的使用
  7. IList, ICollection ,IEnumerable AND IEnumerator in C#
  8. Android Api 检查參数状态Api
  9. 管道设计CAD系统中重量重心计算
  10. 零基础学Python--------第7章 面向对象程序设计
  11. MySQL中myisam与innodb的区别
  12. windows安装tomcat
  13. Homebrew&amp;Mongod
  14. 在Eclipse中安装python插件的方法
  15. kafka 数据一致性-leader,follower机制与zookeeper的区别;
  16. Codeforces.997C.Sky Full of Stars(容斥 计数)
  17. Javascript常见性能优化
  18. Latex 箭头、下标、符号上下写文字、正方形和三角形
  19. 如果从excel表中导出insert-sql
  20. vue+element 页面输入框--回车导致页面刷新的问题

热门文章

  1. 远程桌面Default.rdp 中各个参数的含义
  2. centos7.x/RedHat7.x重命名网卡名称
  3. DIOCP之注册编码解码器与ClientContext
  4. Delphi 字符数组存入文件
  5. Python随机森林算法的使用
  6. .net批量插入SqlServer数据库的方法:
  7. [付费视频]Delphi视频Android开发使用静态库(A)和动态库(SO)
  8. 数据库(表)的逻辑备份与恢复&lt;四&gt;
  9. How to Use JUnit With JMeter
  10. 简单介绍智能DNS解析+双线路接入