快速了解响应式编程与Reactive

按照目前的互联网整体开发现状,响应式编程和 WebFlux 的开发已经越来越多,虽然 WebMvc 的使用和开发已经非常普遍而且大家都能熟练使用,但面对高并发和高性能的要求,响应式编程的呼声越来越高,而且自从14年jdk1.8发行后,Java就已经支持了函数式编程,这也为后续的响应式编程提供了技术基础。接下来的这几篇我们来了解一下响应式编程和 WebFlux ,以及解析 WebFlux 中的一些原理,从而让小伙伴们从更深层面理解 WebFlux ,以及它和 WebMvc 的异同。

1. 响应式编程引入

WebFlux 的核心技术是响应式编程,而对于这个概念,到目前业界还没有一个完完全全被认可的定义,那既然我们是研究 WebFlux ,那我们去 SpringFramework 的官网来了解一下:

We touched on “non-blocking” and “functional” but what does reactive mean?

The term, “reactive,” refers to programming models that are built around reacting to change — network components reacting to I/O events, UI controllers reacting to mouse events, and others. In that sense, non-blocking is reactive, because, instead of being blocked, we are now in the mode of reacting to notifications as operations complete or data becomes available.

There is also another important mechanism that we on the Spring team associate with “reactive” and that is non-blocking back pressure. In synchronous, imperative code, blocking calls serve as a natural form of back pressure that forces the caller to wait. In non-blocking code, it becomes important to control the rate of events so that a fast producer does not overwhelm its destination.

Reactive Streams is a small spec (also adopted in Java 9) that defines the interaction between asynchronous components with back pressure. For example a data repository (acting as Publisher) can produce data that an HTTP server (acting as Subscriber) can then write to the response. The main purpose of Reactive Streams is to let the subscriber to control how quickly or how slowly the publisher produces data.

我们都讲响应式编程式是“无障碍”和“功能性”的,但是响应式意味着什么呢?

“响应式”这个概念是指围绕对更改做出反应的编程模型-网络组件对I/O事件做出反应,UI控制器对鼠标事件做出反应等。从这个意义上说,响应式是非阻塞的,因为随着操作完成或数据可用,我们现在处于响应通知的模式,而不是被阻塞。

Spring团队还有另一个重要机制与“响应式”相关联,这是不阻碍背压的机制。在同步命令式代码中,阻塞调用是强制调用者等待的一种自然的背压形式。在非阻塞代码中,控制事件的速率非常重要,这样事件数据生产方就不会快速地淹没其消费方。

Reactive Streams是一个小的规范(在Java 9中也采用了),它定义了带有反压力的异步组件之间的交互。例如,数据存储库(充当发布者)可以生成数据,以便作为HTTP服务器(充当订阅者)将其写入响应中。 Reactive Streams的主要目的是让订阅者控制发布者生成数据的速度。

这段解释有好多陌生的概念,咱暂且放置一边,毕竟不好理解的内容不要强行灌输到自己脑子里,这样没什么好处的。咱先用几个简单的例子来体会一些基础概念。

2. 体会一些基础概念

2.1 异步非阻塞

之前看过一个非常好的解释异步非阻塞的例子,小册加以引用:

你在家里烧水,烧水的壶有两种:没有哨的普通水壶、壶盖上带哨的响水壶。对于烧水的动作,有四种情景:

  • 你先用普通水壶烧水,但你不放心什么时候水壶里的水会烧开,于是你就搬个小马扎坐在壶旁边盯着,直到壶冒热气,你知道水开了,拎下来,烧水结束。此谓:同步阻塞 (人会一直盯着壶,期间不干别的事情,烧水占据了你的注意力和时间,构成同步阻塞)

  • 你觉得这种烧水方法太浪费自己时间了,于是下一次烧水的时候你就不搬小马扎坐壶旁边干等着了,而是一边玩手机,玩几分钟就看一眼壶里的水开了没,水没开就继续玩,发现水开了就拎下来。此谓:同步非阻塞 (人不再一直盯着壶了,但还在惦记着壶,只是不再被壶一直占用着时间,构成同步非阻塞)

  • 烧了几次你发现这普通水壶用着太不爽了,于是去市场买了把带哨的,这次好使了,水开了壶会响。于是你屁颠屁颠的拿响水壶烧水,但第一次你也不知道这哨好不好使,就跟第一次一样,搬个小马扎坐在水壶旁边,但你也不干别的,就干等着水烧开。等哨响了,你把水壶拎下来,烧水结束。此谓:异步阻塞 (你没再惦记水壶,但这期间你没干别的事,相当于还是被烧水这个事情占据了时间,构成异步阻塞)

  • 烧完水你突然觉得你跟个二愣子似的,这玩意都带哨了我还等个毛线?于是以后再烧水你直接准备好就跑路玩手机了,也不回去看壶了,水烧开了,哨自然响,你自然就去把水壶拎下来。此谓:异步非阻塞 (你没再惦记水壶,而且烧水的过程也没再耽误你干别的事,构成异步非阻塞)

2.2 观察者模式

这个最基础的设计模式之一,小伙伴们已经都很熟悉吧。这里小册只举几个简单例子,帮助小伙伴们回忆一下吧:

  • 你要去面试,面试前准备出发时手机推送了天气预报今天有雨,于是你带了伞出门

    • 观察者(监听器):你 ; 主题(事件源):天气 ; 事件派发器:手机天气预报

  • 面试完成后,面试官会对你说:请留下你的联系方式,有消息我们会通知你的

    • 观察者(监听器):你 ; 主题(事件源&事件广播器):面试官

  • 面试后你出门叫车回家,由于外头还在下雨,你希望司机快一些来,于是给司机发消息让司机快点,司机告诉你快到的时候会给你打电话的

    • 观察者(监听器):你 ; 主题(事件源&事件广播器):司机

2.3 Vue等前端框架的双向绑定

在Vue、React、AngularJS中有一个很基础的概念:双向绑定。下面用一张动图来演示:

这就是响应式!

2.4 【思考】双向绑定中的玄机

仔细思考双向绑定,咱会产生一种感觉:

  1. 上面动图中,文本框上面的 span 部分应该是“观察着”下面的文本框内容,当文本框中内容发生改变时, span 部分就跟着变化。这不也是观察者模式吗? 好,这里引出第一个响应式的概念:变化传递。文本框的内容变化引起了 span 部分的变化。

  2. 再观察图,有注意到文本框中每发生一次变化,都会引起上面的 span 部分变化!如果把这一系列变化都列出来,会形成一组变化动作的事件记录。 好,这里再引出第二个响应式的概念:数据流。事件源的每一个变化连起来就是一个事件流。

  3. 这次不看图了,回想一下Vue是怎么做这种双向绑定的:

<div id="app">
    <p>{{ message }}</p>
    <input v-model="message">
</div>

除了这些以外没再写别的代码来绑定关系了! 由此可以引出第三个响应式的概念:声明式。我只告诉了你(声明)这个插值表达式里面的内容,以及一个 input 中的 model ,它就能帮我关联起来。

2.5 小总结

由上面的体会,咱来总结一下响应式的关键点:变化传递、数据流、声明式。这里面一直有一个很关键的核心围绕着响应式编程:事件。咱们也知道,事件是观察者模式的核心,咱又说响应式也是观察者模式,那自然响应式编程也要依赖事件。

3. 体会一些响应式的新概念

3.1 响应式流

前面咱总结出了数据流的概念,那响应式编程中的数据流自然就有些讲究,它与之前咱在 Java8 中熟悉的 Stream 有些不一样:普通的 Stream 还是同步阻塞的,对于高并发场景下还是不能解决问题,而响应式流可以做到异步非阻塞。另外,Stream 的一个特性是一旦有了消费型方法,它就会将这个流中的所有方法处理完毕,如果这期间的数据量很大,Stream 是无法对这些庞大数据量有一个妥善的处理,而响应式流可以对这些数据进行流量控制,保证数据的接收速度在处理能力之内。

所以总结下来,响应式流的关键点:异步非阻塞、数据流速控制

3.2 背压

异步非阻塞的概念咱在前面已经体会过了,下面通过一个模拟情景来体会数据流速控制的策略:背压

  • 你在一个知名手机生产大厂中,你的职位是生产流水线上的一名普通工人,你的工作是负责流水线上的一个关键部分,这部分需要的时间比较多,而恰好这段时间跟你一起干活的伙计都陪老婆生孩子去了,剩下你单枪匹马仍然战斗在一线。

  • 但是你的上游工人似乎并不知道跟你一起干活的伙计都陪老婆生孩子了,而且不知道咋回事他们天天跟打了鸡血似的干劲十足,搞得你这边压了好多上游同事给你的件儿,但你的这部分工序耗时长,积压这么多件你也hold不住,于是忍无可忍的你向你的上游同事发飙了:你们慢点,我处理不过来了!你的上游同事听到了你的怒吼,于是他们处理好的件儿就先不给你了,暂时放在他们自己那儿,等你告诉他你这边的处理差不多了,他们再塞件儿给你。

  • 后来过了一段时间,厂子的工头发现你的成绩非常好,于是阴差阳错你就不干流水线的工作了,转行当经销商来卖这个手机了。这你开心啊,于是经销商定期给你发一批一批的手机,你就拿来售卖。

  • 但很不幸,这批手机在售卖后的一段时间后传出电池爆炸的坏新闻,市面上买这款手机的人急剧下降,你作为经销商自然也就不想卖这款手机了,于是你会跟厂商反映:别给我供这款手机的货了。厂商也非常无奈,这边生产线还正在运作,而且也有产出的成品机,但经销商都不要了,于是只好将这部分成品都废弃掉

体会这个情景中的两个关键部分:下游向上游反馈,上游将数据暂时缓存/直接废弃


上面咱都是聊一些概念,下面咱用一些简单的Demo来体会真正的响应式编程应该是什么样子。

4. 快速体会Reactor

之所以选择 Reactor 作为响应式编程的框架支撑,是因为 WebFlux 的底层库就是 Reactor

咱直接导入 spring-boot-starter-webflux 依赖,Reactor 会一起引入进来。

咱上面也看到了,响应式编程是一种观察者模式,自然就有发布者(事件源)和接收者(监听器)。在 Reactor 中,发布者和接收者对应的接口分别是 PublisherSubscriber

下面先快速体会一个最简单的发布-订阅的实现:

public class QuickDemo {
    public static void main(String[] args) {
        Flux<Integer> flux = Flux.just(1, 2, 3);
        flux.subscribe(System.out::println);
    }
}

代码非常的简单,首先创建了一个 Publisher ,这里为了编码方便,我选用 Reactor 中的实现类 Flux 作为发布者,接收者的类型是 Consumer ,故可以传入Lambda表达式或者方法引用。这其中,发布者与订阅者建立订阅关系(消费关系)的时机是 Publishersubscribe 方法。一旦触发 subscribe 方法,接收者就可以向发布者拉取数据,等拉取到的数据处理完成后再继续拉取,直到发布者的数据全部处理完成,或者出现异常终止。

这里面简单介绍下这里面会涉及到的几个核心概念和组件:

4.1 Reactive中的核心概念

4.1.1 Publisher

Publisher 作为数据发布者,它只有一个方法:subcribe

public void subscribe(Subscriber<? super T> s);

它会接收一个 Subscriber ,构成“订阅”关系。看一眼它的文档注释:

Request Publisher to start streaming data. This is a "factory method" and can be called multiple times, each time starting a new Subscription. Each Subscription will work for only a single Subscriber. A Subscriber should only subscribe once to a single Publisher. If the Publisher rejects the subscription attempt or otherwise fails it will signal the error via Subscriber.onError.

请求发布者开始流式传输数据。 这是一种“工厂方法”,可以多次调用,每次启动一个新的订阅。 每个订阅仅适用于单个订阅者。 订阅者只能订阅一个发布者。 如果发布者拒绝订阅,或以其他方式失败,它将通过 Subscriber.onError 指示错误。

文档注释已经解释的比较清楚了,它可以产生多个订阅,一个订阅归属一个发布者和一个接收者。

4.1.2 Subscriber

Subscriber 作为数据接收者,它的接口方法有4个:

public void onSubscribe(Subscription s);

public void onNext(T t);

public void onError(Throwable t);

public void onComplete();

方法前面都带有 on ,代表它属于事件形式(联想 JavaScript 中的 onclick 等)。那上面的四个方法就可以分别解释:

  • onSubscribe:当触发订阅时

  • onNext:当接收到下一个数据时

  • onComplete:当发布者的数据都接受处理完成时

  • onError:当出现异常时

4.1.3 Subscription

Subscription 可以看做一个订阅“关系”,它归属于一个发布者和一个接收者(可以类似的理解为关系型数据库的多对多中间表的一条数据)。它有两个方法:

public void request(long n);

public void cancel();

很明显上面是请求/拉取数据,下面是放弃/停止拉取,它完成了数据接收者对发布者的交互,背压也是基于此来实现。

4.1.4 Processor

Processor 字面意思可以翻译为处理器,咱之前也看到过IOC容器中的好多后置处理器。这些处理器的特点都是有输入,有输出,那对应到Reactor的概念中,就应该是发布者和接收者的合体:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R>

在接口定义中它就是直接继承了 PublisherSubscriber 接口,它一般用于数据的中间处理。

4.2 Reactor中常用组件

4.2.1 Flux

Flux 可以简单理解为“非阻塞的Stream”,它实现了 Publisher 接口:

public abstract class Flux<T> implements Publisher<T>

那既然它是发布者,那它必然与 subscribe 方法。Flux 和下面的 Mono 重载的 subscribe 方法有很多:

这里面要注意的是,它在实现 Publisher 原生的 subscribe 方法时,还扩展了几个方法,以简化操作。比如上面咱在快速体会中的简单示例,它就是使用了上面图中的第二个重载的方法,只传入一个 Consumer (只处理正常情况下的数据接收)。

之所以称它可以理解为 Stream ,是因为它拥有 Stream 中的中间操作,并且更多更全面,举几个例子吧:

public class FluxDemo {
    public static void main(String[] args) throws Exception {
        Flux<Integer> flux = Flux.just(1, 2, 3);
        flux.map(num -> num * 5) // 将所有数据扩大5倍
            .filter(num -> num > 10) // 只过滤出数值中超过10的数
            .map(String::valueOf) // 将数据转为String类型
            .publishOn(Schedulers.elastic()) // 使用弹性线程池来处理数据
            .subscribe(System.out::println); // 消费数据
    }
}

更具体的方法的使用,小伙伴们可以参照API文档,或者网络资料进行了解和练习。

4.2.2 Mono

Mono 可以简单理解为“非阻塞的Optional”,它也实现了 Publisher 接口:

public abstract class Mono<T> implements Publisher<T>

它跟 Optional 类似,都是里面要么有一个对象,要么是空的。它的操作与 Flux 相似,不再赘述。

4.2.3 Scheduler

Scheduler 可以简单理解为“线程池”,从上面的示例中可以看到线程池需要由 Schedulers 工具类产生(Scheduler 不是 public 类型)。它有几种类型:

  • immediate:与主线程一致

  • single:只有一个线程的线程池(可类比 Executors.newSingleThreadExecutor()

  • elastic:弹性线程池,线程池中的线程数量原则上没有上限(底层创建线程池时指定了最大容量为 Integer.MAX_VALUE

  • parallel:并行线程池,线程池中的线程数量等于CPU处理器的数量(jdk中的 Runtime 类可以调用 availableProcessors 方法来获取CPU处理器数量)

5. 小结

  1. 响应式编程的三个关键点:变化传递、数据流、声明式

  2. 响应式编程不同于观察者模式的关键点在于:接收者(监听器)可以反向与发布者(事件源)交互。

  3. Reactor 是实现 Reactive 编程规范的框架,核心组件包括 FluxMonoScheduler

前一篇咱快速的了解了响应式编程,以及 Reactor 框架的基本使用,接下来咱要开始正式接触 WebFlux 了。

最后更新于