异步流——异步流及其取消操作
问题
假设需要取消异步流,应该怎么做呢?
解决方案
并非所有的异步流都需要取消。当满足条件时简单地停止枚举也是可行的。如下所示,如果这种类型的“取消”可以满足需要,便不再需要真正意义上的取消操作:
await foreach(int result in SlowRange())
{
Console.WriteLine(result);
if(result>=8)
{
break;
}
}
async IAsyncEnumerable<int> SlowRange()
{
for(int i=0;i!=10;i++)
{
await Task.Delay(i*100);
yield return i;
}
}
取消异步流往往是有用的。这是因为某些运算符会对他们的源流传递取消令牌,就需要用CancellationToken从外部停止 wawait foreach
通过定义标记了EnumeratorCancellation属性的参数,返回IAsyncEnumerable<T>的async方法也许可以接收取消令牌
using var cts=new CancellationTokenSource(500);
CancellationToken token=cts.Token;
await foreach(int result in SlowRange(token))
{
Console.WriteLine(result);
}
async IAsyncEnumerable<int> SlowRange([EnumeratorCancellation]CancellationToken token=default)
{
for(int i=0;i!=10;i++)
{
await Task.Delay(i*100,token);
yield resturn i;
}
}
讨论
这里的解决方案将CancellationToken直接传给了返回异步枚举的方法,这是很常见的用法。
在其他场景中,代码会受到异步枚举,并把CancellationToken应用到其使用的枚举器中。在对可枚举对象开启新的枚举时,因为需要使用取消令牌,所以如此应用CancellationToken是合理的。在某些场景,应该把不同的取消令牌传入针对可枚举对象的不同枚举中。
简而言之,可取消的并非可枚举对象,而是由可枚举对象创建的枚举器。异步流支持WithCancellation扩展方法。
async Task ConsumeSequence(IAsyncEnumerable<int> items)
{
using var cts=new CancellationTokenSource(500);
CancellationToken token=cts.Token;
await foreach(int result in items.WithCancellation(token))
{
Console.WriteLine(result);
}
}
async IAsyncEnumerable<int> SlowRange([EnumeratorCancellation] CancellationToken token=defualt)
{
for(int i=0;i!=10;i++)
{
await Task.Delay(i*100,token);
yield return i;
}
}
await ConsumeSequence(slowRange());
由于存在EnumeratorCancellation参数属性,因此编译器会负责将令牌从WithCancellation传递到由EnumeratorCancellation标记的token参数,此时取消请求会在处理完前几项之后,让await foreach抛出OperationCanceledException。
WithCancellation扩展方法不会阻碍ConfigureAwait(false),可以将这两种扩展方法串联在一起。
async Task ConsumeSequence(IAsyncEnumerable<int> items)
{
using var cts=new CancellationTokenSource(500);
CancellationToken token=cts.Token;
await foreach(int result in items.WithCancellation(token).ConfigureAwait(false))
{
Console.WriteLine(result);
}
}