首页 > 编程笔记

Spring WebFlux框架简介与使用

Spring 5 于 2017 年 9 月发布了通用版本(GA)。从 Spring 5 开始,提供了全新的 Web 开发框架:以 Reactor 为基础的 WebFlux 框架。这预示着 Spring 开始全面拥抱 Reactive Programming。在此之前,Spring Web 开发主要是以 Spring MVC 为主,现在 Spring WebFlux 已经与 Spring MVC 具有同等的地位。

Spring WebFlux简介

Spring MVC 是为 Servlet API 和 Servlet 容器专门构建的。而 Spring WebFlux 是异步非阻塞的,支持在 Netty、Undertow 和 Servlet 3.1+ 容器之类的服务器上运行。Spring Boot 2 默认 WebFlux 是基于 Netty 实现的。

如图1 所示,Spring WebFlux 与 Spring MVC 的 Web 注解是一致的,这样便减少了从 Spring MVC 迁移到 Spring WebFlux 的成本。WebFlux 框架开发的接口返回类型必须是 Mono<T> 或者是 Flux<T>

图1 发布者/订阅者模型
图1 发布者/订阅者模型

Flux 和 Mono 是 Reactor 框架中的两个基本概念,它们都实现了 org.reactivestreams.Publisher 接口,也就是说 Mono 与 Flux 都是发布者。Mono 代表 0~1 个元素的发布者,Flux 代表 0~N 个元素的发布者。

Mono类

上面提到 Mono 实现了 Publisher 接口,因此它也是一个发布者,只是发布 0~1个 元素。Mono 类中典型的创建数据流的方法如表 1 所示。
表1 Mono类创建数据流的方法
静态方法 说  明
Mono.just() 指定序列对象
Mono.empty() 创建一个不包含任何元素而只发布结束消息的序列
Mono.never() 创建一个不包含任何消息的序列
Mono.fromCallable() 用Callable创建
Mono.fromSupplier() 用Supplier创建
Mono.fromFuture() 用CompletableFuture创建
Mono.create() 用MonoSink创建

下面是 Mono 创建数据流的几个示例:
//just()方法创建
Mono.just("hello mono").subscribe(System.out::println);
// justOrEmpty()方法创建
Mono.justOrEmpty(Optional.of("hello mono")).subscribe(System.out::println);
Mono.justOrEmpty("hello mono").subscribe(System.out::println);
Mono.empty().subscribe(System.out::println);
Mono.error(new Throwable("error")).subscribe();
Mono.never().subscribe();
//fromCallable()方法创建
Mono.fromCallable(() -> "hello mono").subscribe(System.out::println);
//fromSupplier()方法创建
Mono.fromSupplier(() -> "hello mono").subscribe(System.out::println);
//fromFuture()方法创建
Mono.fromFuture(CompletableFuture.completedFuture("hello mno")).subscribe(System.out::println);
//create()方法创建
Mono.create(sink->{
    sink.success("hello mono");
}).subscribe(System.out::println);
创建数据流之后,还可以在响应式流上通过声明的方式添加多种不同的操作函数。典型的操作函数如表 2 所示。

表2 Mono类的操作函数
操作函数 说  明
filter() 对流中包含的元素进行过滤
zipWith() 把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并
mergeWith() 合并数据流
flatMap() 把流中的每个元素转换成一个流

下面是 Mono 数据流操作函数的例子:
//filter()方法的使用
Mono.just(10).filter(i -> i%2 == 0).subscribe(System.out::println);
//zipWith()方法的使用
Mono.just("hello").zipWith(Mono.just("Mono")).subscribe(System.out::println);
//mergeWith()方法的使用
Mono.just("hello").mergeWith(Mono.just("Mono")).subscribe(System.out::println);
//flatMap()方法的使用
Mono.just("hello").flatMap(x -> Mono.just(x + " flux")).subscribe(System.out::println);

Flux类

Flux 与 Mono 的不同之处在于,作为发布者,Flux 可以发布 0~N 个元素。Flux 类创建数据流的方法如表 3 所示。

表3  Flux创建数据流的方法
静态方法 说  明
Flux.just() 从指定序列中创建
Flux.fromArray() 从数组中创建
Flux.range() 创建从start开始的count个数量的Integer对象的序列
Flux.create() 使用FluxSink创建
Flux.generate() 同步创建

下面是创建 Flux 的几个例子:
// just()方法创建
Flux.just("hello","flux").subscribe(System.out::println);
// fromArray()方法创建
Flux.fromArray(new String[] {"hello","flux"}).subscribe(System.out::println);
//range()方法创建
Flux.range(1, 5).subscribe(System.out::println);
// create()方法创建
Flux.create(sink -> {
      for (int i = 0; i < 10; i++) {
           sink.next(i);
      }
      sink.complete();
}).subscribe(System.out::println);
//generate()方法创建
Flux.generate(sink -> {
       sink.next("hello");
       sink.complete();
}).subscribe(System.out::println);
Flux 在响应式流上的操作函数与 Mono 类似,如表 4 所示。

表4 Flux操作函数
操作函数 说  明
filter() 对流中包含的元素进行过滤
zipWith() 把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并
reduce() 对流中包含的所有元素进行累积操作
merge() 把多个流合并成一个Flux序列
flatMap() 把流中的每个元素转换成一个流

下面是 Flux 操作函数的示例:
//使用filter()方法
Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println);
//使用zipWith()方法
Flux.just("hello").zipWith(Flux.just("flux")).subscribe(System.out::
println);
// 使用reduce()方法
Flux.range(1, 10).reduce((x,y) -> x + y).subscribe(System.out::println);
// 使用merge()方法
Flux.merge(Flux.just("hello"),Flux.just("flux")).subscribe(System.
out::println);
// 使用flatMap()方法
Flux.just("hello").flatMap(x -> Flux.just(x + " flux")).subscribe
(System.out::println);

Spring WebFlux示例

学习了 Mono 与 Flux 类之后,基本上就能了解 WebFlux 开发的原理。相对于 Spring MVC 来说,WebFlux 接口返回的只能是 Mono 或Flux 类。下面给出两个接口的参考示例:
@RestController
@RequestMapping("/webFlux")
public class WebFluxApi {
    @GetMapping("/mono")
    public Mono<JSONObject> mono(){
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", "0");
        jsonObject.put("message", "hello mono!");
        return Mono.just(jsonObject); //创建Mono对象
    }
    @GetMapping("/flux")
    public Flux<JSONObject> flux(){
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", "0");
        jsonObject.put("message", "hello flux!");
        return Flux.just(jsonObject); //创建Mono对象
    }
}
在本地启动应用之后,分别访问http://localhost/webFlux/mono 接口,返回结果如下:

{
        code: "0",
        message: "hello mono!"
}

访问 http://localhost/webFlux/flux 接口,返回结果如下:

[
        {
                code: "0",
                message: "hello flux!"
        }
]

优秀文章