我们使用 C#中的 yield 关键字可以实现迭代器,使用 async 和 await 关键字可以实现异步方法。异步流是这两种功能的结合体,它用异步方式生成和消费数据的迭代器。异步流是在 C#8 中引入的,它以IAsyncEnumerable<out T>
和IAsyncEnumerator<out T>: IAsyncDisposable
两个接口为基础,这两个接口的代码如下:
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator ();
}
public interface IAsyncEnumerator<out T>: IAsyncDisposable
{
T Current { get; }
ValueTask<bool> MoveNextAsync();
}
复制代码
在上面的代码中,从序列中获取每个元素的方法 MoveNextAsync 是一个异步操作,元素以零散的方式到达,就是异步流。IAsyncEnumerator 接口中的的 ValueTask<T>是 Task<T>类型的轻量化封装,它的类型是结构类型,使用方式和 Task<T>差不多,但是它在同步完成任务时或者在返回马上可以使用的结果时可以减少内存开销,因此它比 Task<T>更加高效。下面我们来看一看异步流的用法,首先我们定义一个计算斐波那契数列的方法 Fibonacci。其中我们使用 Thread.Sleep 模拟一个耗时操作。代码如下:
IEnumerable<int> Fibonacci(int count)
{
int prev = 1;
int curr = 1;
for (int i = 0; i < count; i++)
{
yield return prev;
Thread.Sleep(1000000);
int temp = prev + curr;
prev = curr;
curr = temp;
}
}
复制代码
在代码中 Thread.Sleep(1000000)是同步的,也就是说只有 Thread.Sleep(1000000)执行完成,后续代码才能继续执行。因此为了提高执行效率我们需要把 Thread.Sleep(1000000)改成异步的,在这里我们就可以让它生成异步流。要生成异步流就需要同用到迭代器和异步方法。要实现这个功能就必须使用 IAsyncEnumerable<T>接口作为方法的返回类型。修改后的代码如下:
async IAsyncEnumerable<int> FibonacciAsync(int count)
{
int prev = 1;
int curr = 1;
Random r = new();
for (int i = 0; i < count; i++)
{
yield return prev;
await Task.Delay(1000000);
int temp = prev + curr;
prev = curr;
curr = temp;
}
}
复制代码
经过修改后,FibonacciAsync 方法允许调用方以异步方式消费生成的数列,也就是说可以使用 await foreach 来遍历消费这个方法的返回结果。调用如下:
await foreach (var f in FibonacciAsync(200))
Console.Write("{0} ", f);
复制代码
修改后的代码虽然可以以一步的方式消费生成的数列,但是如果在 LINQ 查询语句中消费异步流是无法使用的。这时就需要引入 System.Linq.Async,调用如下:
IAsyncEnumerable<int> query =
from f in FibonacciAsync(200)
where f % 2 == 1
select f * 2;
await foreach (var number in query)
Console.WriteLine(number);
复制代码
如果想在 ASP.NET Core 的 Action 中返回异步流可以像下面这样做:
[HttpGet]
public async IAsyncEnumerable<string> Get()
{
using var dbContext = new BookContext();
await foreach (var userName in dbContext.Users
.Select(u => u.Name)
.AsAsyncEnumerable())
yield return userName;
}
复制代码
总结
异步流解决的是零散数据异步生成和消费问题。在 C#8 以前一组数据只能以整体异步的方式返回给调用者。
评论