System.Reactive基础——使用窗口和缓冲来分组事件数据
问题
一种情况是,假设有一个事件序列,需要在收到事件时将他们分组,比如需要响应成对的输入。另一种情况是,假设需要在两秒的窗口内响应所有的输入。
解决方案
System.Reactive有两个能够对出入的序列进行分组的运算符,分别是Buffer(缓冲)和Window(窗口)。Buffer会保留传入的事件,直至该组完成,此时他会把全部事件作为一个事件集合转发。Window会按逻辑对输入事件分组,但会在时间到来时就把他们传递出去。Buffer的返回类型是IObservable<IList<T>>(集合的事件流),Window的返回类型是IObservable<IObservable
下面的示例使用Interval运算符,,每秒创建一次OnNext通知,每次缓冲两个通知:
“`CSharp Observable.Interval(TimeSpan.FromSeconds(1)).Buffer(2).Subscribe(
x => Trace.WriteLine($"{DateTime.Now.Second}: Got {x[0]} and {x[1]}"));
<pre><code class="line-numbers">以我的电脑为例,代码每两秒生成一对输出:
“`CSharp
20: Got 0 and 1
22: Got 2 and 3
24: Got 4 and 5
26: Got 6 and 7
28: Got 8 and 9
30: Got 10 and 11
32: Got 12 and 13
下面这个类似的示例使用Window来创建亮亮一对的事件分组:
Observable.Interval(TimeSpan.FromSeconds(1)).Window(2).Subscribe(
group =>
{
Trace.WriteLine("{DateTime.Now.Second}: Starting new group");
group.Subscribe(x=>Trace.WriteLine("{DateTime.Now.Second}; Saw{x}"),
()=>Trace.WriteLine($"{DateTime.Now.Second}; Ending group"));
});
以我的计算机为例,这个Window示例生成了如下输出:
54: Starting new group
55; Saw0
56; Saw1
56; Ending group
56: Starting new group
57; Saw2
58; Saw3
58; Ending group
58: Starting new group
59; Saw4
0; Saw5
0; Ending group
0: Starting new group
1; Saw6
2; Saw7
2; Ending group
2: Starting new group
3; Saw8
4; Saw9
Buffer等待其分组中的所有事件,然后发布单个集合,windows事件分组方式与之相同,但会在世界传入时便发布他们,另外,Window会立即发布可观察对象,这个对象会为该窗口发布事件。
Buffer和Window均可用于时间跨度:
private void button7_Click(object sender, EventArgs e)
{
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler).Buffer(TimeSpan.FromSeconds(1)).Subscribe(
x => Trace.WriteLine($"{DateTime.Now.Second}:Saw {x.Count} items."));
}
讨论
利用Buffer和Window这样的工具,可以将输入塑造成期望看到的样子。另一个适用的技巧是节流
Buffer和Window都有其他的重载,可以在更高场景中使用。带有Skip参数和timeShit参数的重载既可以创建与其他分组重叠的分组,也可以在组与组之间跳过元素。还有一些重载接受委托,可用于动态定义分组的边界。