Reactive(响应式)编程入门教程
响应式宣言
说到响应式编程,就不能不提 The Reactive Manifesto(响应式宣言)。几年前,一个大型应用系统可能会部署在几百台服务器上,响应时间为秒级,每天产生 GB 级的数据。随着移动设备的普及,应用程序需要部署在数以千计或万计的云端集群上,用户对响应时间的需求也提高到了毫秒级,每天产生的数据也达到了 PB 级,这对当今的系统架构提出了新的挑战。基于此,一些组织开发出了响应式系统。响应式系统具有 4 个特性,如图1所示。
图1 响应式系统的特性
- 可响应:系统尽可能地响应。
- 可恢复:系统出错的情况下也可以响应。
- 可伸缩:系统在各种负载下都可以响应。
- 消息驱动:系统通过异步传递消息。
以上 4 个特性就组成了响应式宣言,为响应式编程指明了方向。响应式系统就是以事件驱动,打造可伸缩、可恢复、能实时响应的应用程序。
Reactive编程简介
关于响应式编程,百度百科中是这样解释的:在计算机领域,响应式编程是一个专注于数据流和变化传递的异步编程范式。这意味着可以使用编程语言很容易地表示静态(如数组)或动态(如事件发射器)数据流,在执行过程中数据流之间有一定的关系,关系的存在有利于数据流的自动变更。
上面的解释是不是不太好理解?我们具体分析一下。
- 首先,响应式编程是一个编程范式,是一种编程规范,和我们平时开发中的声明式编程、命令式编程、函数式编程一样。
- 其次,从过去的面向过程开发到Java提出的面向对象开发,响应式编程代表未来的发展方向——面向流开发。
- 因此我们总结出响应式编程的定义是:一种面向数据流的响应式编码方式。
注意:Reactive Programming = Streams + Operations。其中,Streams 代表被处理的数据节点,Operations 代表那些异步处理函数。
Reactive Streams标准
既然有了编程规范,就需要定义一套 API 协议标准。2013 年,Netflix、Pivotal 和 Lightbend 的工程师们启动了 Reactive Streams 项目。Reactive Stream(响应式流)是一套标准,是一套基于发布/订阅模式的数据流处理规范。对于开发人员来说,它其实就是一个 API 规范,具有异步非阻塞背压特性,异步非阻塞可以在同等资源下给出更快的响应。举个直观的例子可以帮助读者更好地理解响应式数据流。
现代前端开发框架如 Vue.js 和 React 等实现了双向数据绑定,在一个输入框内修改数据,可以同步在另一个组件中展示。也就是一个组件的数据值可以基于另一个组件的数据变化做出响应,这就是响应式。
在传统的命令式编程中,假设定义 c = a*b,那么当 a=1、b=2 时,c 的值就是 2。之后 a 变量的改变不会引起 c 变量的变化,因为它们都是确定的。如果 a、b 的值是不确定的,即 c=a*b,这个语句仅仅是定义了变量 c 与变量 a、b 的计算关系,那么 c 的值就是可变的。例如:
a=1,b=1,c=1
a=2,b=2,c=4
a=3,b=2,c=6
...
Java Flow API简介
基于 Reactive Streams 实现的响应式框架有 RxJava、Reactor、Akka 和 Vert.x 等。2017 年,Java JDK 9 发布,其中一个特性就是引入了基于 Reactive Streams 的 Flow 类。Flow API 是基于发布者/订阅者模式提供的推(push)和拉(pull)的模型,如图 2 所示。
图2 发布者/订阅者模型
基于发布/订阅模型的 Flow 更像是迭代器模式与观察者模式的组合。迭代器模式是拉(pull)模型,告诉数据源要拉取多少数据,观察者模式是推(push)模型,将数据推送给订阅者。Flow 订阅者最初请求(拉)N 个数据,然后发布者将最多 N 个数据推送给订阅者。
Flow 类中定义了 4 个嵌套的静态接口,如表 1 所示。
静态接口 | 说 明 |
---|---|
Flow.Publisher<T> | 数据项发布者和生产者 |
Subscriber<T> | 数据项订阅者和消费者 |
Subscription | 发布者与订阅者之间的关系,完成消息控制 |
Processor | 数据处理器 |
下面介绍 Flow 的相关 API,并给出一些实际的例子。
1. 基于Publisher与Subscriber的示例
Flow.Subscriber 有 4 个抽象方法:- onSubscribe():发布者调用该方法异步传递订阅。
- onNext():发布者调用该方法传递数据。
- onError():发生错误时调用。
- onComplete():数据发送完成后调用。
Sbscription 的 request() 和 cancel() 方法提供的背压特性,让订阅者可以告诉发布者能接收的最大数据量,还可以取消订阅,这样不至于因发布者速度过快而导致订阅系统崩溃。示例如下:
public class PublisherAndSubscriberDemo { public static void main(String[] args) throws InterruptedException { //发布者 SubmissionPublisher<String> publisher=new SubmissionPublisher<>(); //订阅者 Flow.Subscriber<String> subscriber=new Flow.Subscriber<String>() { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription=subscription; subscription.request(1); } //传递数据 @Override public void onNext(String item) { System.out.println("【订阅者】接收消息: " + item); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } this.subscription.request(1); } //异常处理 @Override public void onError(Throwable throwable) { System.out.println("【订阅者】数据接收出现异常," + throwable); this.subscription.cancel(); } //发送结束处理 @Override public void onComplete() { System.out.println("【订阅者】数据接收完毕"); } }; publisher.subscribe(subscriber); for (int i=0;i<5;i++){ String message = "hello flow api " + i; System.out.println("【发布者】发布消息: " + message); publisher.submit(message); } publisher.close(); Thread.currentThread().join(20000); } }控制台的打印结果如下:
【发布者】发布消息: hello flow api 0
【发布者】发布消息: hello flow api 1
【发布者】发布消息: hello flow api 2
【发布者】发布消息: hello flow api 3
【发布者】发布消息: hello flow api 4
【订阅者】接收消息: hello flow api 0
【订阅者】接收消息: hello flow api 1
【订阅者】接收消息: hello flow api 2
【订阅者】接收消息: hello flow api 3
【订阅者】接收消息: hello flow api 4
【订阅者】数据接收完毕
2. Processor示例
Processor 扩展了 Publisher 和 Subscriber,因此它可以在 Publisher 和 Subscriber 之间来回切换。Processor 的示例如下:public class MyProcessor extends SubmissionPublisher<Integer> implements Flow.Processor<Integer, Integer>{ private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("Processor 收到订阅请求"); this.subscription = subscription; this.subscription.request(1); } //传递数据 @Override public void onNext(Integer item) { System.out.println("onNext 收到发布者数据: "+item); if (item % 2 == 0) { this.submit(item); } this.subscription.request(1); } //处理异常 @Override public void onError(Throwable throwable) { this.subscription.cancel(); } //结束处理 @Override public void onComplete() { System.out.println("处理器处理完毕"); this.close(); } }ProcessorDemo代码如下:
public class ProcessorDemo { public static void main(String[] args) throws InterruptedException { SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(); MyProcessor myProcessor = new MyProcessor(); Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } //数据处理 @Override public void onNext(Integer item) { System.out.println("onNext 从Processor 接收到过滤后的数据 item : "+item); this.subscription.request(1); } //处理异常 @Override public void onError(Throwable throwable) { System.out.println("onError 出现异常"); subscription.cancel(); } //结束处理 @Override public void onComplete() { System.out.println("onComplete 所有数据接收完成"); } }; publisher.subscribe(myProcessor); //发布 myProcessor.subscribe(subscriber); //订阅 publisher.submit(1); publisher.submit(2); publisher.submit(3); publisher.submit(4); publisher.close(); TimeUnit.SECONDS.sleep(2); } }最终打印结果如下:
Processor 收到订阅请求
onNext 收到发布者数据: 1
onNext 收到发布者数据: 2
onNext 收到发布者数据: 3
onNext 收到发布者数据: 4
处理器处理完毕
onNext 从Processor 接收到过滤后的数据 item : 2
onNext 从Processor 接收到过滤后的数据 item : 4
onComplete 所有数据接收完成