响应式编程入门系列(一)

背景

在软件开发的演变过程中,我们经历了命令式、面向对象和函数式编程等范式。如今,随着异步和分布式系统的普及,响应式编程范式开始逐渐崭露头角。

简单的例子

为了直观地理解什么是响应式,我们先从一个大家都比较熟悉的类比开始。首先打开Excel,在B、C、D三列输入如下公式:

B、C和D三列每个单元格的值均依赖其左侧的单元格,当我们在A列依次输入1、2和3时,变化会自动传递到了B、C和D三列,并触发相应状态变更,如下图:

我们可以把A列从上到下想象成一个数据流,每一个数据到达时都会触发一个事件,该事件会被传播到右侧单元格,后者则会处理事件并改变自身的状态。这一系列流程其实就是响应式的核心思想。

为什么需要响应式编程

这个问题是很多人了解响应式编程会遇到的问题,因为在很多人看来响应式编程能做的事情常规编程也能做到。为什么还要响应式编程?

我们要理解响应式编程是为了解决什么问题,知道了这个核心我们就可以理解其设计思路。响应式编程的目的是为了方便创建异步编程。异步的概念就是请求的结果并不会立即返回,而是会通过回调或者推的方式来实现。

采用回调的方式在业务流程比较复杂的情况下,由于多个回调嵌套调用,导致代码可读性差、难以维护,成为回调地狱。而响应式编程就是采取了推的方式,为了方便这个"推"的效果,采取了数据流的设计,也就是所有的代码都是在定义数据的流向,当你的代码完成后,数据的流向也就确定了。那么当数据产生后,就会按照制定的流程流向订阅者。

响应式编程从设计上摒弃了原有的历史负担,为异步编程提供了很多开箱即用的功能。随着异步并行编程的复杂度增加,原有的编程方式变得愈加复杂而且难以理解和维护。而响应式编程由于采用声明式编程,代码的复杂度得到大幅的降低,无论从阅读还是维护反应式编程都具有很大的优势。

响应式宣言

用分布式系统实现的业务的主要价值在于即时响应性。而实现一个即时响应性系统意味着遵循弹性和回弹性等基本原则。最后,获得具有即时响应性、弹性和回弹性的系统的基本方法之一是采用消息驱动的通信。此外,遵循这些原则构建的系统具有高度的可维护性和可扩展性,因为系统中的所有组件都是相互独立且适当隔离的。
以上所有概念都不是新的,并且已经定义在响应式宣言中。响应式宣言是描述响应式系统基本概念的词汇表。创建此宣言是为了确保业务人员和开发人员对惯用的概念有相同的理解。需要强调的是,响应式系统以及响应式宣言与架构有关,它可以应用于大型分布式应用程序或小型单节点应用程序。

定义

wiki百科的定义:

响应式编程 (reactive programming) 是一种基于数据流 (data stream) 和 变化传递 (propagation of change) 的声明式 (declarative) 的编程范式。

响应式编程来源于数据流和变化的传播,意味着由底层的执行模型负责通过数据流来自动传播变化。比如求值一个简单的表达式 c=a+b,当 a 或者 b 的值发生变化时,传统的编程范式需要对 a+b 进行重新计算来得到 c 的值。如果使用反应式编程,当 a 或者 b 的值发生变化时,c 的值会自动更新。

在传统的编程范式中,我们一般通过迭代器(Iterator)模式来遍历一个序列。这种遍历方式是由调用者来控制节奏的,采用的是拉的方式。每次由调用者通过 next()方法来获取序列中的下一个值。使用响应式流时采用的则是推的方式,即常见的发布者-订阅者模式。当发布者有新的数据产生时,这些数据会被推送到订阅者来进行处理。在响应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。这个以声明式的方式添加的处理链只在订阅者进行订阅操作时才会真正执行。

GUI的Listener、JavaFX的Property、Vue和微信小程序的MVVM模式都属于响应式编程实践。

演进

本节将以Java上最早的响应式编程库RxJava1.0为基础介绍响应式编程的核心设计思想。

观察者模式

我们首先回想一个“古老”的、众所周知的设计模式,即观察者模式(Observer pattern)。这是GoF提出的23个著名设计模式中的一个。乍一看,观察者模式似乎与响应式编程无关。但是经过一些小修改,它定义了响应式编程的基础。

观察者模式拥有一个主题(subject),其中包含该模式的依赖者列表,这些依赖者被称为观察者(Observer)。主题通常通过调用自身的一个方法将状态变化通知观察者。在基于事件处理实现系统时,此模式至关重要。观察者模式是MVC(模型-视图-控制器)模式的重要组成部分。因此,几乎所有UI库都在内部应用它。 

典型的观察者模式由 Subject 和 Observer 这两个接口组成。在这里,Observer在 Subject 中注册并接受它的通知。 Subject 既可以自己生成事件,也可以被其他组件调用。

Java定义Subject 接口如下:

public interface Subject<T> {
    void registerObserver(Observer<T> observer);
    void unregisterObserver(Observer<T> observer);
    void notifyObservers(T event);
}

 Oberver接口如下:

public interface Observer<T> {
    void observe(T event);
}

Observer 接口的实现可能负责订阅过程, Observer 实例也可能根本不知道 Subject 的存在。在后一种情况下,第三方组件可能负责查找所有Subject 实例和所有注册程序。例如,依赖注入(Dependency Injection)容器可能扮演类似的角色。它将使用 @EventListener 注解和正确的签名扫描每个 Observer 的类路径。然后,它会将找到的组件注册到 Subject 。 

两个非常简单的观察者的实现,它们只接收 String 消息并将消息打印到输出流。 

public class ConcreteObserverA implements Observer<String> {
    @Override
    public void observe(String event) {
        System.out.println("Observer A: " + event);
    }
}
public class ConcreteObserverB implements Observer<String> {
    @Override
    public void observe(String event) {
        System.out.println("Observer B: " + event);
    }
}

Subject<String> 的实现,它生成 String 事件:

public class ConcreteSubject implements Subject<String> {
    private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();
    public void registerObserver(Observer<String> observer) {
        observers.add(observer);
    }
    public void unregisterObserver(Observer<String> observer) {
        observers.remove(observer);
    }
    public void notifyObservers(String event) {
        observers.forEach(observer -> observer.observe(event));
    }
}

Spring现在为事件处理提供了@EventListener 注解,为事件发布提供了ApplicationEventPublisher 类。即 @EventListener 和 ApplicationEventPublisher实现了发布-订阅模式(Publish-Subscribe pattern),它可以被视为观察者模式的变体。与观察者模式相反,在发布-订阅模式中,发布者和订阅者不需要彼此了解。

观察者加迭代器等于响应式流

我们再把之前的Subject 和 Observer接口做个升级,生产者要能够发出数据流结束信号,生产者在消费者出现之前不生成事件。在同步世界中,有一个设计模式可以用于这一场景,即迭代器(Iterator)模式。我们可以使用以下代码来描述该模式:

public interface Iterator<T> {
    T next();
    boolean hasNext();
}

为了逐个获取元素, Iterator 提供了 next() 方法,并且它还可以通过hasNext() 方法返回 false 值,从而发出序列结束的信号。那么如果我们试图将这个想法与观察者模式提供的异步执行进行混合:

public interface RxObserver<T> {
    void onNext(T next);
    void onComplete();
}

RxObserver非常类似于Iterator ,但它不是调用 Iterator 的 next()方法,而是通过onNext()回调将一个新值通知到RxObserver 。这里不是检查 hasNext() 方法的结果是否为正,而是通过调用onComplete()方法通知RxObserver流的结束。错误如何处理呢?因为Iterator 可能在处理 next() 方法时抛出 Exception ,所以如果有一个从生产者到 RxObserver 的错误传播机制会很棒。为此添加一个特殊的回调,即 onError() 。因此,最终解决方案如下所示: 

public interface RxObserver<T> {
    void onNext(T next);
    void onComplete();
    void onError(Exception e);
}

Observer接口是RxJava的基本概念。此接口定义了数据如何在响应式流的每个部分之间进行流动。作为库的最小组成部分, Observer 接口随处可见。 RxObserver类似于前面介绍的观察者模式中的 Observer 。

Observable 响应式类是观察者模式中 Subject 的对应类。因此,Observable 扮演事件源的角色,它会发出元素。它有数百种流转换方法以及几十种初始化响应式流的工厂方法。
Subscriber 抽象类不仅实现 Observer 接口并消费元素,还被用作Subscriber 的实际实现的基础。 Observable 和 Subscriber 之间的运行时关系由 Subscription 控制, Subscription 可以检查订阅状态并在必要时取消它。

RxJava定义了有关发送元素的规则,使Observable能发送任意数量的元素(包括零个)。然后它通过声明成功或引发错误来指示执行结束。因此,Observable会为与它关联的每个Subscriber多次调用onNext,然后再调用 onComplete或 onError(但不能同时调用两者)。所以在onComplete() 或 onError() 之后调用 onNext() 是不可行的。 

生产和消费流数据

现在,我们应该对RxJava库有了足够的了解并能够创建一个小应用示例。让我们定义一个由 Observable 类表示的流。目前,我们可以假设 Observable 是一种知道如何在订阅时为订阅者传播事件的生成器:

Observable<String> observale = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, reactive world!");
            sub.onCompleted();
        }
    }
);

所以,在这里我们创建一个 Observable 并使其带有一个回调,该回调将在订阅者出现时立即被触发。此时, Observer 将产生一个字符串值,并将流的结束信号发送给订阅者。我们还可以使用Java 8 lambda改进此代码:

Observable<String> observable = Observable.create(
    sub -> {
        sub.onNext("Hello, reactive world!");
        sub.onCompleted();
    }
);

与Java StreamAPI相比, Observable 是可重用的,而且每个订阅者都将在订阅之后立即收到 Hello, reactive world! 事件。

所以,现在我们需要一个 Subscriber ,代码如下所示:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
    @Override
    public void onError(Throwable e) {
        System.err.println(e);
    }
};

可以看到, Subscriber 必须实现 Observer 方法并定义对新事件、流完成和错误的响应。现在,将 observable 和 subscriber 实例串接在一起: 

observable.subscribe(subscriber);

运行上述代码,程序会生成以下输出:
Hello, reactive world!
Done!
至此我们刚刚编写了一个小而简单的响应式Hello-World应用程序!我们还可以使用lambda重写此示例,代码如下所示: 

Observable.create(
    sub -> {
        sub.onNext("Hello, reactive world!");
        sub.onCompleted();
    }
).subscribe(
    System.out::println,
    System.err::println,
    () -> System.out.println("Done!")
);

还可以有一些东西可以控制观察者-订阅者之间的协作。这就是Subscription ,该接口声明如下:

interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
} 

unsubscribe() 方法能让 Subscriber 通知 Observable 不需要再发送新事件。换句话说,上述方法代表的是订阅取消。另外, Observable 使用isUnsubscribed() 来检查 Subscriber 是否仍在等待事件。 

以下代码演示了在定义响应式流时如何获取一个 Subscription,还展示了如何取消对流的订阅:

CountDownLatch externalSignal = ...;
Subscription subscription = Observable
    .interval(100, MILLISECONDS)
    .subscribe(System.out::println);
externalSignal.await();
subscription.unsubscribe();

此时,我们已经了解到,响应式编程包含一个Observable流、一个Subscriber,以及某种 Subscription。该Subscription会传达Subscriber从Observable生产者处接收事件的意图。

RxJava库为创建 Observable 实例和 Subscriber 实例提供了很大的灵活性。我们可以使用 just 来引用元素、使用旧式数组,或者使用 from 通过 Iterable 集合,也可以引用 Callable及Future来创建 Observable 实例:

Observable.just("1", "2", "3", "4");
Observable.from(new String[]{"A", "B", "C"});
Observable.from(Collections.emptyList());

Observable<String> hello = Observable.fromCallable(() ->"Hello");
Future<String> future = Executors.newCachedThreadPool().submit(() -> "World");
Observable<String> world = Observable.from(future);

RxJava不仅可以生成一个未来的事件,还可以基于时间间隔等生成一个异步事件序列:

Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(e -> System.out.println("Received: " + e));
Thread.sleep(5000);

上面的代码如果删除 Thread.sleep(...),那么应用程序将在不输出任何内容的情况下退出。发生这种情况是因为生成事件并进行消费的过程发生在一个单独的守护线程中。因此,为了防止主线程完成执行,我们可以调用 sleep() 方法或执行一些其他有用的任务。 

流转换与弹珠图

使用 Observable 和 Subscriber 可以实现大量基础的工作流程,但RxJava的强大功能还包含在它的大量操作符中。操作符用于调整流的元素或更改流结构本身。对于操作符的描述人们发明了弹珠图(marble diagram),弹珠图能将流转换以可视化方式呈现出来。

下面介绍几个最基础的操作符。

map操作符

RxJava中最常用的操作符是 map

<R> Observable<R> map(Func1<T, R> func)

func 函数可以将 T 对象类型转换为 R 对象类型,并且应用 map 将 Observable<T> 转换为 Observable<R> 。

从图中可以看出,map 执行一对一的转换,输出流具有与输入流相同数量的元素。 

filter操作符

与 map 操作符相比, filter 操作符所产生的元素可能少于它所接收的元素。它只发出那些已成功通过谓词测试的元素。

count操作符

count 操作符自描述性很强,它发出的唯一值代表输入流中的元素数量。但是, count 操作符只在原始流结束时发出结果,因此,在处理无限流时, count 操作符将不会完成或返回任何内容。

zip操作符

zip操作符具有更复杂的行为,因为它会通过应用 zip 函数来组合来自两个并行流的值。它通常用于填充数据,且特别适用于部分预期结果从不同源获取的情况。

 一个简单的示例:

Observable.zip(
    Observable.just("A", "B", "C"),
    Observable.just("1", "2", "3"),
    (x, y) -> x + y
).forEach(System.out::println);
//输出为:
//A1
//B2
//C3

RxJava的 Observable 提供了数十个流转换操作符并涵盖了大量使用场景。RxJava不限制开发人员使用库提供的操作符以外的操作符。我们也可以通过实现从Observable.Transformer<T,R> 派生的类来编写自定义操作符。通过应用 Observable.compose(transformer) 操作符,我们可以将这样的操作符逻辑包括在工作流中。

RxJava1.x

以上是Java平台上用于响应式编程的标准库,即RxJava1. x 的核心实现。该库为Java响应式编程奠定了基础。目前,它不是唯一的响应式库,我们还有Akka Streams和Project Reactor。随着2. x版的发布,RxJava本身也发生了很大的变化。

虽然不同响应库的API可能稍有差异,其实现细节也各种各样,但其基本概念保持不变,即订阅者订阅可观察流,该流又反过来触发事件生成的异步过程。在生产者和订阅者之间通常存在一些订阅信息,这些信息使打破生产者-消费者关系成为可能。这种方式非常灵活,并使我们可以控制生产和消费的事件数量,节省CPU时间(CPU时间通常会浪费在创建永远不会用到的数据上)。

奇怪的是,我们今天所知道的RxJava历史和响应式编程的历史始于微软内部。2005年,Erik Meijer和他的云可编程性团队正在实验适合构建大规模异步数据密集型互联网服务架构的编程模型。经过几年的实验,Rx库的第一个版本诞生于2007年夏天。此后,Erik Meijer及其团队又花了两年时间专门讨论了该库的不同方面,包括多线程和协同性重调度。Rx.NET的第一个公开版本于2009年11月18日发布。不久之后,微软将该库移植到不同的语言中,例如JavaScript、C++、Ruby和Objective-C,以及Windows Phone平台。随着Rx开始普及,微软在2012年秋季开源了Rx.NET。

在某个时间点,Rx的想法传播到微软之外,Paul Betts与GitHub公司的Justin Spahr-Summers在2012年为Objective-C实现并发布了ReactiveCocoa。与此同时,Netflix的Ben Christensen将Rx.NET移植到Java平台,并于2013年初在GitHub上开源了RxJava库。

当时,Netflix面临着处理流媒体产生的大量互联网流量这一非常复杂的问题。一个名为RxJava的异步响应式库帮助他们构建了响应式系统,该系统在2015年拥有北美37%的互联网流量份额!现在,系统中的很大一部分流量由RxJava处理。为了承受这些巨大的负载,Netflix不得不发明新的架构模式并在库中实现它们,其中最著名的有Hystrix、Ribbon、Zuul、RxNetty等等(熟悉微服务的开发人员大概都知道这几个模块,这是集成在Spring Cloud Netflix微服务框架中的组件)。

小结

本篇简述了响应式编程的基础概念及RxJava1.x的基本实现。

在RxJava1.x版本后,各种不同的响应式编程库被开发出来,库之间的自然演化和竞争是正常的,但很明显,一旦我们尝试在一个Java应用程序中组合一些不同的响应式库或框架,就会出现问题。此外,我们会发现响应式库的行为总体上非常类似,但细节略有不同。此时,整个响应式编程很明显需要一些标准或通用API,为各种不同的库实现之间的兼容性提供保证。这样的标准已经被设计出来了,称为响应式流(Reactive Streams),下一篇中将进行详细介绍。

相关推荐

  1. Vue的响应编程

    2024-04-21 11:16:01       63 阅读
  2. 响应编程-数据劫持

    2024-04-21 11:16:01       26 阅读
  3. 响应编程Reactor API大全(中)

    2024-04-21 11:16:01       43 阅读
  4. 响应编程WebFlux基础实战练习

    2024-04-21 11:16:01       44 阅读
  5. 响应编程WebFlux基础API

    2024-04-21 11:16:01       50 阅读
  6. 响应编程Reactor API大全(下)

    2024-04-21 11:16:01       38 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-21 11:16:01       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-21 11:16:01       106 阅读
  3. 在Django里面运行非项目文件

    2024-04-21 11:16:01       87 阅读
  4. Python语言-面向对象

    2024-04-21 11:16:01       96 阅读

热门阅读

  1. 【计算机网络】面经

    2024-04-21 11:16:01       38 阅读
  2. 如何使用Python进行Web开发,如Flask或Django?

    2024-04-21 11:16:01       38 阅读
  3. 华为海思数字芯片设计笔试第八套

    2024-04-21 11:16:01       29 阅读
  4. LlamaIndex 组件 - Prompts

    2024-04-21 11:16:01       49 阅读
  5. 汽车牌照-C++

    2024-04-21 11:16:01       27 阅读
  6. 什么是Transformer架构的自注意力机制?

    2024-04-21 11:16:01       38 阅读
  7. HTML5声明与编码设置

    2024-04-21 11:16:01       35 阅读
  8. 【vim】折叠代码

    2024-04-21 11:16:01       35 阅读