文章目录
概述
关于Reactive(本文统一译作响应式),有一个The Reactive Manifesto【响应式宣言】:响应式系统(Reactive System)具备以下特质:即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)。
Reactive Extensions(Rx)是一个为.NET应用提供响应式编程模型的库,用来构建异步基于事件流的应用,通过安装System.ReactiveNuget包进行引用。Rx将事件流抽象为Observable sequences(可观察序列)表示异步数据流,使用LINQ运算符查询异步数据流,并使用Scheduler来控制异步数据流中的并发性。简单地说:Rx = Observables + LINQ + Schedulers。
在软件系统中,事件是一种消息用于指示发生了某些事情。事件由Event Source(事件源)引发并由Event Handler(事件处理程序)使用。
在Rx中,事件源可以由observable表示,事件处理程序可以由observer表示。
但是应用程序使用的数据如何表示呢,例如数据库中的数据或从Web服务器获取的数据。而在应用程序中我们一般处理的数据无外乎两种:静态数据和动态数据。 但无论使用何种类型的数据,其都可以作为流来观察。换句话说,数据流本身也是可观察的。也就意味着,我们也可以用observable来表示数据流。
核心
2个核心接口:IObservable、IObserver
其中IObservable代表观察源,而IObserver是观察者(“鼠标点击”是观察源,“点击后刷新”是观察者)
IObservable接口
IObservable只有一个方法,就是【触发事件】
[NullableContextAttribute(1)]
public interface IObservable<[NullableAttribute(2)] out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
IObserver接口
[NullableContextAttribute(1)]
public interface IObserver<[NullableAttribute(2)] in T>
{
void OnCompleted();
void OnError(Exception error);
void OnNext(T value);
}
void OnNext(T value), 序列里有新的值的时候会调用这个
void OnCompleted(), 序列结束的时候调用这个
void OnError(Exception ex), 发生错误的时候调用这个
获取事件源的方式
获取事件源
/// <summary>
/// 获取返回一个简单值的事件源
/// </summary>
/// <returns></returns>
private static IObservable<int> GetSimpleObservable()
{
return Observable.Return(42);
}
/// <summary>
/// 返回抛出一个异常的事件源
/// </summary>
/// <returns></returns>
private static IObservable<int> GetThrowObservable()
{
return Observable.Throw<int>(new ArgumentException("Error in observable"));
}
/// <summary>
/// 返回一个空的事件源
/// </summary>
/// <returns></returns>
private static IObservable<int> GetEmptyObservable()
{
return Observable.Empty<int>();
}
/// <summary>
/// 返回一个任务事件源
/// </summary>
/// <returns></returns>
private static IObservable<int> GetTaskObservable()
{
return GetTask().ToObservable();
}
private static async Task<int> GetTask()
{
return 42;
}
/// <summary>
/// 返回一个范围的事件源序列
/// </summary>
/// <returns></returns>
private static IObservable<int> GetRangeObservable()
{
return Observable.Range(2, 10);
}
/// <summary>
/// 返回一个定时器事件源序列
/// </summary>
/// <returns></returns>
private static IObservable<long> GetIntervalObservable()
{
return Observable.Interval(TimeSpan.FromMilliseconds(200));
}
/// <summary>
/// 创建一个基本事件源序列
/// </summary>
/// <returns></returns>
private static IObservable<int> GetCreateObservable()
{
return Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnNext(4);
observer.OnCompleted();
return Disposable.Empty;
});
}
/// <summary>
/// 创建一个类似for循环的事件源序列
/// </summary>
/// <returns></returns>
private static IObservable<int> GetGenerateObservable()
{
// 类似for循环
return Observable.Generate(
1, // 初始值
x => x < 10, // 循环停止条件
x => x + 1, // 循环一次+1
x => x + 20 // 循环中执行的操作
);
}
事件调用
最简单的用法应该是
class Program
{
static void Main(string[] args)
{
var numbers = Observable.Range(2, 10);
var observer = new MyConsoleObserver<int>();
numbers.Subscribe(observer);
Console.ReadLine();
}
}
/// <summary>
/// 自定义观察者对象
/// </summary>
/// <typeparam name="T"></typeparam>
public class MyConsoleObserver<T> : IObserver<T>
{
public void OnNext(T value)
{
Console.WriteLine("接收到 value {0}", value);
}
public void OnError(Exception error)
{
Console.WriteLine("出现异常! {0}", error);
}
public void OnCompleted()
{
Console.WriteLine("关闭观察行为");
}
}
RX 操作符
多播传输靠:Subject
基于以上示例,我们了解到,借助Rx可以简化事件模型的实现,而其实质上就是对观察者模式的扩展。提到观察者模式,我们知道一个Subject可以被多个观察者订阅,从而完成消息的多播。同样,在Rx中,也引入了Subject用于多播消息传输,不过Rx中的Subject具有双重身份——即是观察者也是被观察者。
interface ISubject<in TSource, out TResult> : IObserver<TSource>,IObservable<TResult>
{
}
Rx中默认提供了以下四种实现:
Subject - 向所有观察者广播每个通知
AsyncSubject - 当可观察序列完成后有且仅发送一个通知
ReplaySubject - 缓存指定通知以对后续订阅的观察者进行重放
BehaviorSubject - 推送默认值或最新值给观察者
一种Subject有一点需要指出,当其有多个观察者序列时,一旦其中一个停止发送消息,则Subject就停止广播所有其他序列后续发送的任何消息
有温度的可观察者序列
对于Observable,它们是有温度的,有冷热之分。它们的区别如下图所示:
Cold Observable:有且仅当有观察者订阅时才发送通知,且每个观察者独享一份完整的观察者序列。
Hot Observable:不管有无观察者订阅都会发送通知,且所有观察者共享同一份观察者序列。
Scheduler
在Rx中,使用Scheduler来控制并发。而对于Scheduler我们可以理解为程序调度,通过Scheduler来规定在什么时间什么地点执行什么事情。Rx提供了以下几种Scheduler:
- NewThreadScheduler:即在新线程上执行
- ThreadPoolScheduler:即在线程池中执行
- TaskPoolScheduler:与ThreadPoolScheduler类似
- CurrentThreadScheduler:在当前线程执行
- ImmediateScheduler:在当前线程立即执行
- EventLoopScheduler:创建一个后台线程按序执行所有操作
举例而言:
Observable.Return("Hello",NewThreadScheduler.Default)
.Subscribe(str=>Console.WriteLine($"{str} on ThreadId:{Thread.CurrentThread.ManagedThreadId}")
);
Console.WriteLine($"Current ThreadId:{Thread.CurrentThread.ManagedThreadId}");
以上输出:
Current ThreadId:1
Hello on ThreadId:4