【.NET跨平台】Rx.NET

概述

关于Reactive(本文统一译作响应式),有一个The Reactive Manifesto【响应式宣言】:响应式系统(Reactive System)具备以下特质:即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)。
在这里插入图片描述
Reactive Extensions(Rx)是一个为.NET应用提供响应式编程模型的库,用来构建异步基于事件流的应用,通过安装System.ReactiveNuget包进行引用。Rx将事件流抽象为Observable sequences(可观察序列)表示异步数据流,使用LINQ运算符查询异步数据流,并使用Scheduler来控制异步数据流中的并发性。简单地说:Rx = Observables + LINQ + Schedulers。

在这里插入图片描述

在软件系统中,事件是一种消息用于指示发生了某些事情。事件由Event Source(事件源)引发并由Event Handler(事件处理程序)使用。
在Rx中,事件源可以由observable表示,事件处理程序可以由observer表示。
但是应用程序使用的数据如何表示呢,例如数据库中的数据或从Web服务器获取的数据。而在应用程序中我们一般处理的数据无外乎两种:静态数据和动态数据。 但无论使用何种类型的数据,其都可以作为流来观察。换句话说,数据流本身也是可观察的。也就意味着,我们也可以用observable来表示数据流。

核心

2个核心接口:IObservable、IObserver

其中IObservable代表观察源,而IObserver是观察者(“鼠标点击”是观察源,“点击后刷新”是观察者)

IObservable接口

IObservable只有一个方法,就是【触发事件】

[NullableContextAttribute(1)]
public interface IObservable<[NullableAttribute(2)] out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

IObserver接口

[NullableContextAttribute(1)]
public interface IObserver<[NullableAttribute(2)] in T>
{
  
    void OnCompleted();

    void OnError(Exception error);

    void OnNext(T value);
}

void OnNext(T value), 序列里有新的值的时候会调用这个

void OnCompleted(), 序列结束的时候调用这个

void OnError(Exception ex), 发生错误的时候调用这个

获取事件源的方式

获取事件源

 /// <summary>
 /// 获取返回一个简单值的事件源
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetSimpleObservable()
 {
     return Observable.Return(42);
 }

 /// <summary>
 /// 返回抛出一个异常的事件源
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetThrowObservable()
 {
     return Observable.Throw<int>(new ArgumentException("Error in observable"));
 }

 /// <summary>
 /// 返回一个空的事件源
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetEmptyObservable()
 {
     return Observable.Empty<int>();
 }

 /// <summary>
 /// 返回一个任务事件源
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetTaskObservable()
 {
     return GetTask().ToObservable();
 }

 private static async Task<int> GetTask()
 {
     return 42;
 }

 /// <summary>
 /// 返回一个范围的事件源序列
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetRangeObservable()
 {
     return Observable.Range(2, 10);
 }

 /// <summary>
 /// 返回一个定时器事件源序列
 /// </summary>
 /// <returns></returns>
 private static IObservable<long> GetIntervalObservable()
 {
     return Observable.Interval(TimeSpan.FromMilliseconds(200));
 }

 /// <summary>
 /// 创建一个基本事件源序列
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetCreateObservable()
 {
     return Observable.Create<int>(observer =>
     {
         observer.OnNext(1);
         observer.OnNext(2);
         observer.OnNext(3);
         observer.OnNext(4);
         observer.OnCompleted();
         return Disposable.Empty;
     });
 }

 /// <summary>
 /// 创建一个类似for循环的事件源序列
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetGenerateObservable()
 {
     // 类似for循环
     return Observable.Generate(
         1,              // 初始值
         x => x < 10,    // 循环停止条件
         x => x + 1,     // 循环一次+1
         x => x + 20     // 循环中执行的操作
     );
 }
 

事件调用

最简单的用法应该是

class Program
{
    static void Main(string[] args)
    {
        var numbers = Observable.Range(2, 10);
        var observer = new MyConsoleObserver<int>();
        numbers.Subscribe(observer);
        Console.ReadLine();
    }
}

/// <summary>
/// 自定义观察者对象
/// </summary>
/// <typeparam name="T"></typeparam>
public class MyConsoleObserver<T> : IObserver<T>
{
    public void OnNext(T value)
    {
        Console.WriteLine("接收到 value {0}", value);
    }
    public void OnError(Exception error)
    {
        Console.WriteLine("出现异常! {0}", error);
    }
    public void OnCompleted()
    {
        Console.WriteLine("关闭观察行为");
    }
}

RX 操作符

在这里插入图片描述

多播传输靠:Subject

基于以上示例,我们了解到,借助Rx可以简化事件模型的实现,而其实质上就是对观察者模式的扩展。提到观察者模式,我们知道一个Subject可以被多个观察者订阅,从而完成消息的多播。同样,在Rx中,也引入了Subject用于多播消息传输,不过Rx中的Subject具有双重身份——即是观察者也是被观察者。

interface ISubject<in TSource, out TResult> : IObserver<TSource>,IObservable<TResult>
{
}

Rx中默认提供了以下四种实现:

Subject - 向所有观察者广播每个通知
在这里插入图片描述
AsyncSubject - 当可观察序列完成后有且仅发送一个通知
在这里插入图片描述
ReplaySubject - 缓存指定通知以对后续订阅的观察者进行重放
在这里插入图片描述
BehaviorSubject - 推送默认值或最新值给观察者
在这里插入图片描述
一种Subject有一点需要指出,当其有多个观察者序列时,一旦其中一个停止发送消息,则Subject就停止广播所有其他序列后续发送的任何消息
在这里插入图片描述

有温度的可观察者序列

对于Observable,它们是有温度的,有冷热之分。它们的区别如下图所示:
在这里插入图片描述
Cold Observable:有且仅当有观察者订阅时才发送通知,且每个观察者独享一份完整的观察者序列。
Hot Observable:不管有无观察者订阅都会发送通知,且所有观察者共享同一份观察者序列。

Scheduler

在Rx中,使用Scheduler来控制并发。而对于Scheduler我们可以理解为程序调度,通过Scheduler来规定在什么时间什么地点执行什么事情。Rx提供了以下几种Scheduler:

  1. NewThreadScheduler:即在新线程上执行
  2. ThreadPoolScheduler:即在线程池中执行
  3. TaskPoolScheduler:与ThreadPoolScheduler类似
  4. CurrentThreadScheduler:在当前线程执行
  5. ImmediateScheduler:在当前线程立即执行
  6. EventLoopScheduler:创建一个后台线程按序执行所有操作

举例而言:

Observable.Return("Hello",NewThreadScheduler.Default)
.Subscribe(str=>Console.WriteLine($"{str} on ThreadId:{Thread.CurrentThread.ManagedThreadId}")
);
Console.WriteLine($"Current ThreadId:{Thread.CurrentThread.ManagedThreadId}");
 
以上输出:
Current ThreadId:1
Hello on ThreadId:4

来源

Rx.NET基础使用
Rx .Net 简介
响应式编程知多少 | Rx.NET 了解下

相关推荐

  1. c++ui

    2024-07-19 10:18:01       52 阅读
  2. 探索UI框架Maui

    2024-07-19 10:18:01       42 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-19 10:18:01       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-19 10:18:01       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-19 10:18:01       57 阅读
  4. Python语言-面向对象

    2024-07-19 10:18:01       68 阅读

热门阅读

  1. 深入解析`Arrays.asList`的用法与潜在陷阱

    2024-07-19 10:18:01       19 阅读
  2. Kubernetes面试整理-ELK和EFK的区别?

    2024-07-19 10:18:01       18 阅读
  3. 智能合约中重放攻击

    2024-07-19 10:18:01       21 阅读
  4. 【19】读感 - 架构整洁之道(一)

    2024-07-19 10:18:01       18 阅读
  5. [C++]运算符重载

    2024-07-19 10:18:01       19 阅读
  6. 每天一个数据分析题(四百三十七)- 统计量

    2024-07-19 10:18:01       22 阅读
  7. 缓存机制如何帮助减轻雪崩效应:

    2024-07-19 10:18:01       22 阅读
  8. 接近50个实用编程相关学习资源网站

    2024-07-19 10:18:01       20 阅读
  9. Seata 隔离级别问题

    2024-07-19 10:18:01       19 阅读
  10. 深入理解TCP/IP协议:三次握手与四次挥手

    2024-07-19 10:18:01       24 阅读