首页 > 编程笔记

Reactive(响应式)编程入门教程

本文首先介绍响应式编程范式,以及基于该范式的 Reactive Streams 标准 API,最后给出基于 Reactive Streams 实现的 JDK 9 的 Flow 代码示例。

响应式宣言

说到响应式编程,就不能不提 The Reactive Manifesto(响应式宣言)。几年前,一个大型应用系统可能会部署在几百台服务器上,响应时间为秒级,每天产生 GB 级的数据。

随着移动设备的普及,应用程序需要部署在数以千计或万计的云端集群上,用户对响应时间的需求也提高到了毫秒级,每天产生的数据也达到了 PB 级,这对当今的系统架构提出了新的挑战。基于此,一些组织开发出了响应式系统。响应式系统具有 4 个特性,如图1所示。

图1  响应式系统的特性
图1 响应式系统的特性

以上 4 个特性就组成了响应式宣言,为响应式编程指明了方向。响应式系统就是以事件驱动,打造可伸缩、可恢复、能实时响应的应用程序。

Reactive编程简介

关于响应式编程,百度百科中是这样解释的:

在计算机领域,响应式编程是一个专注于数据流和变化传递的异步编程范式。这意味着可以使用编程语言很容易地表示静态(如数组)或动态(如事件发射器)数据流,在执行过程中数据流之间有一定的关系,关系的存在有利于数据流的自动变更。

上面的解释是不是不太好理解?我们具体分析一下。

注意:Reactive Programming = Streams + Operations。其中,Streams 代表被处理的数据节点,Operations 代表那些异步处理函数。

Reactive Streams标准

既然有了编程规范,就需要定义一套 API 协议标准。2013 年,Netflix、Pivotal 和 Lightbend 的工程师们启动了 Reactive Streams 项目。Reactive Stream(响应式流)是一套标准,是一套基于发布/订阅模式的数据流处理规范。对于开发人员来说,它其实就是一个 API 规范,具有异步非阻塞背压特性,异步非阻塞可以在同等资源下给出更快的响应。

举个直观的例子可以帮助读者更好地理解响应式数据流。

现代前端开发框架如 Vue.js 和 React 等实现了双向数据绑定,在一个输入框内修改数据,可以同步在另一个组件中展示。也就是一个组件的数据值可以基于另一个组件的数据变化做出响应,这就是响应式。

在传统的命令式编程中,假设定义 c = a*b,那么当 a=1、b=2 时,c 的值就是 2。之后 a 变量的改变不会引起 c 变量的变化,因为它们都是确定的。如果 a、b 的值是不确定的,即 c=a*b,这个语句仅仅是定义了变量 c 与变量 a、b 的计算关系,那么 c 的值就是可变的。例如:

a=1,b=1,c=1
a=2,b=2,c=4
a=3,b=2,c=6
...

简而言之,c 需要动态地由 a、b 共同来决定,当 a、b 的值发生变化时,c 的结果需要及时地做出响应(或者叫反应),以此来保证正确性。变化的 a、b 相当于数据流,c 要根据数据流的变化做出正确的响应,这就是 Reactive Streams(响应式流)。

Java Flow API简介

基于 Reactive Streams 实现的响应式框架有 RxJava、Reactor、Akka 和 Vert.x 等。2017 年,Java JDK 9 发布,其中一个特性就是引入了基于 Reactive Streams 的 Flow 类。

Flow API 是基于发布者/订阅者模式提供的推(push)和拉(pull)的模型,如图 2 所示。

图2  发布者/订阅者模型
图2 发布者/订阅者模型

基于发布/订阅模型的 Flow 更像是迭代器模式与观察者模式的组合。迭代器模式是拉(pull)模型,告诉数据源要拉取多少数据,观察者模式是推(push)模型,将数据推送给订阅者。Flow 订阅者最初请求(拉)N 个数据,然后发布者将最多 N 个数据推送给订阅者。

Flow 类中定义了 4 个嵌套的静态接口,如表 1 所示。
表1 Flow中定义的4个静态接口
静态接口 说  明
Flow.Publisher<T> 数据项发布者和生产者
Subscriber<T> 数据项订阅者和消费者
Subscription 发布者与订阅者之间的关系,完成消息控制
Processor 数据处理器

下面介绍 Flow 的相关 API,并给出一些实际的例子。

1. 基于Publisher与Subscriber的示例

Flow.Subscriber 有 4 个抽象方法:

Sbscription 的 request() 和 cancel() 方法提供的背压特性,让订阅者可以告诉发布者能接收的最大数据量,还可以取消订阅,这样不至于因发布者速度过快而导致订阅系统崩溃。示例如下:
public class PublisherAndSubscriberDemo {
public static void main(String[] args) throws InterruptedException {
    //发布者
        SubmissionPublisher<String> publisher=new SubmissionPublisher<>();
        //订阅者
        Flow.Subscriber<String> subscriber=new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription=subscription;
                subscription.request(1);
            }
            //传递数据
            @Override
            public void onNext(String item) {
                System.out.println("【订阅者】接收消息: " + item);
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                this.subscription.request(1);
            }
            //异常处理
            @Override
            public void onError(Throwable throwable) {
                System.out.println("【订阅者】数据接收出现异常," + throwable);
                this.subscription.cancel();
            }
            //发送结束处理
            @Override
            public void onComplete() {
                System.out.println("【订阅者】数据接收完毕");
            }
        };
        publisher.subscribe(subscriber);
        for (int i=0;i<5;i++){
            String message = "hello flow api " + i;
            System.out.println("【发布者】发布消息: " + message);
            publisher.submit(message);
        }
        publisher.close();
        Thread.currentThread().join(20000);
    }
}

控制台的打印结果如下:

【发布者】发布消息: hello flow api 0
【发布者】发布消息: hello flow api 1
【发布者】发布消息: hello flow api 2
【发布者】发布消息: hello flow api 3
【发布者】发布消息: hello flow api 4
【订阅者】接收消息: hello flow api 0
【订阅者】接收消息: hello flow api 1
【订阅者】接收消息: hello flow api 2
【订阅者】接收消息: hello flow api 3
【订阅者】接收消息: hello flow api 4
【订阅者】数据接收完毕

2. Processor示例

Processor 扩展了 Publisher 和 Subscriber,因此它可以在 Publisher 和 Subscriber 之间来回切换。Processor 的示例如下:
public class MyProcessor extends SubmissionPublisher<Integer>
implements Flow.Processor<Integer, Integer>{
    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Processor 收到订阅请求");
        this.subscription = subscription;
        this.subscription.request(1);
    }
    //传递数据
    @Override
    public void onNext(Integer item) {
        System.out.println("onNext 收到发布者数据: "+item);
        if (item % 2 == 0) {
            this.submit(item);
        }
        this.subscription.request(1);
}
//处理异常
    @Override
    public void onError(Throwable throwable) {
        this.subscription.cancel();
}
//结束处理
    @Override
    public void onComplete() {
        System.out.println("处理器处理完毕");
        this.close();
    }
}
ProcessorDemo代码如下:
public class ProcessorDemo {
    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        MyProcessor myProcessor = new MyProcessor();
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                this.subscription.request(1);
            }
            //数据处理
            @Override
            public void onNext(Integer item) {
                System.out.println("onNext 从Processor 接收到过滤后的数据 item : "+item);
                this.subscription.request(1);
            }
            //处理异常
            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError 出现异常");
                subscription.cancel();
            }
            //结束处理
            @Override
            public void onComplete() {
                System.out.println("onComplete 所有数据接收完成");
            }
        };
        publisher.subscribe(myProcessor); //发布
        myProcessor.subscribe(subscriber); //订阅
        publisher.submit(1);
        publisher.submit(2);
        publisher.submit(3);
        publisher.submit(4);
        publisher.close();
        TimeUnit.SECONDS.sleep(2);
    }
}
最终打印结果如下:

Processor 收到订阅请求
        onNext 收到发布者数据: 1
        onNext 收到发布者数据: 2
        onNext 收到发布者数据: 3
        onNext 收到发布者数据: 4
        处理器处理完毕
        onNext 从Processor 接收到过滤后的数据 item : 2
        onNext 从Processor 接收到过滤后的数据 item : 4
        onComplete 所有数据接收完成

优秀文章