响应式编程系列(一)Java 响应式编程总结
历史
响应式编程(Reactive Programming)概念最早于上世纪九十年代被提出,微软为 .NET 生态开发了 Reactive Extensions (Rx) 库用于支持响应式编程,后来 Netflix 开发了 RxJava,为 JVM 生态实现了响应式编程。随着时间的推移,2015 年 Reactive Stream(响应式流)规范诞生,为 JVM 上的响应式编程定义了一组接口和交互规则。RxJava 从 RxJava 2 开始实现 Reactive Stream 规范。同时 MongoDB、Reactor、Slick 等也相继实现了 Reactive Stream 规范。
Spring Framework 5 推出了响应式 Web 框架。
Java 9 引入了响应式编程的 API,将 Reactive Stream 规范定义的四个接口集成到了 java.util.concurrent.Flow
类中。Java 9 提供了 SubmissionPublisher
和 ConsumerSubscriber
两个默认实现。
1 | +--------------------------+ +-------------+ +------------------+ +-------------------------------+ |
定义
响应式编程(Reactive Programing)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式。
设计思想
响应式编程范式通常在面向对象语言中作为观察者模式的扩展出现。可以将其与大家熟知的迭代器模式作对比,主要区别在于:
迭代器(Iterator) | 响应式流(Reactive Stream) | |
---|---|---|
设计模式 | 迭代器模式 | 观察者模式 |
数据方向 | 拉模式(PULL) | 推模式(PUSH) |
获取数据 | T next() |
onNext(T) |
处理完成 | hasNext() |
onCompleted() |
异常处理 | throws Exception |
onError(Exception) |
Java 8 引入了 Stream 用于流的操作,Java 9 引入的 Flow 也是数据流的操作。相比之下:
- Stream 更侧重于流的过滤、映射、整合、收集,使用的是 PULL 模式。
- 而 Flow/RxJava/Reactor 更侧重于流的产生与消费,使用的是 PUSH 模式 。
迭代器模式
参考《Iterator API 总结》
观察者模式
观察者模式是一种行为型设计模式,允许你定义一种订阅机制,可在对象事件发生时主动通知多个 “观察” 该对象的其它对象。
在响应式流中,上述操作由 Publisher-Subscriber
负责。由 Publisher
生产新值并推送给 Subscriber
,这个“推送”就是响应式的关键,亦即“变化传递(propagation of change)”。另外,应用于被推送值的操作(Operator)是“声明式”而不是“命令式”的:开发者表达的是计算逻辑,而不是描述其具体的控制流程。
流程如下:
1 | onNext x 0..N [onError | onComplete] |
Reactive Streams 规范
依赖
Reactive Stream(响应式流)规范的 Maven 依赖如下:
1 | <!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams --> |
核心接口
整个依赖包中,仅仅定义了四个核心接口:
org.reactivestreams.Subscription
接口定义了连接发布者和订阅者的方法;org.reactivestreams.Publisher<T>
接口定义了发布者的方法;org.reactivestreams.Subscriber<T>
接口定义了订阅者的方法;org.reactivestreams.Processor<T,R>
接口定义了处理器;
接口交互流程
简要交互如下:
API 交互如下:
参考
《Reactive Java Programming》
Reactive Streams 规范
- https://www.reactive-streams.org/
- https://en.wikipedia.org/wiki/Reactive_Streams
- https://zhuanlan.zhihu.com/p/41342507
ReactiveX 系列
- http://reactivex.io/
- https://github.com/ReactiveX
- RxJava、RxKotlin、RxSwift、RxAndroid、RxJS、RxPY、RxGo、…
- 《Reactive Programming with RxJava》
- 《如何选择操作符? - RxSwift》
- RxJava、RxKotlin、RxSwift、RxAndroid、RxJS、RxPY、RxGo、…
Reactor 框架
- https://projectreactor.io/
- 实现 Reactive Streams 规范,并扩展大量特性
Spring
- https://spring.io/reactive
- Web on Reactive Stack - Spring
- 基于 Reactor 框架实现
- 默认基于 Netty 作为应用服务器
- 好处:能够以固定的线程来处理高并发(充分发挥机器的性能)
- 提供 API:
- Spring WebFlux
- WebClient
- WebSockets
- Testing
- RSocket
- Reactive Libraries