响应式编程系列(一)Java 响应式编程总结

历史

响应式编程(Reactive Programming)概念最早于上世纪九十年代被提出,微软为 .NET 生态开发了 Reactive Extensions (Rx) 库用于支持响应式编程,后来 Netflix 开发了 RxJava,为 JVM 生态实现了响应式编程。随着时间的推移,2015 年 Reactive Stream(响应式流)规范诞生,为 JVM 上的响应式编程定义了一组接口和交互规则。RxJava 从 RxJava 2 开始实现 Reactive Stream 规范。同时 MongoDB、Reactor、Slick 等也相继实现了 Reactive Stream 规范。

Spring Framework 5 推出了响应式 Web 框架

Java 9 引入了响应式编程的 API,将 Reactive Stream 规范定义的四个接口集成到了 java.util.concurrent.Flow 类中。Java 9 提供了 SubmissionPublisherConsumerSubscriber 两个默认实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
+--------------------------+     +-------------+     +------------------+     +-------------------------------+
| Reactive Extensions (Rx) | | RxJava 1.x | | Reactive Streams | | RxJava 2 |
| by Microsoft +-----> by Netflix +-----> Specification +-----> (Supporting Reactive Streams) |
| for .NET | | for Java 6+ | | for JVM | | for Java 6+ |
+--------------------------+ +-------------+ +------------------+ +-------------------------------+
|
|
|
+-----------------------+ +--------------------+ +---------------v---------------+
| Java 9 Standard | | Spring Framework 5 | | Project Reactor |
| (JEP-266 by Doug Lea) <-----+ Reactive Stack <-----+ (Supporting Reactive Streams) |
| | | | | for Java 8+ |
+-----------------------+ +--------------------+ +-------------------------------+

定义

响应式编程(Reactive Programing)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式。

设计思想

响应式编程范式通常在面向对象语言中作为观察者模式的扩展出现。可以将其与大家熟知的迭代器模式作对比,主要区别在于:

迭代器(Iterator) 响应式流(Reactive Stream)
设计模式 迭代器模式 观察者模式
数据方向 拉模式(PULL) 推模式(PUSH)
获取数据 T next() onNext(T)
处理完成 hasNext() onCompleted()
异常处理 throws Exception onError(Exception)

Java 8 引入了 Stream 用于流的操作,Java 9 引入的 Flow 也是数据流的操作。相比之下:

  • Stream 更侧重于流的过滤、映射、整合、收集,使用的是 PULL 模式。
  • 而 Flow/RxJava/Reactor 更侧重于流的产生与消费,使用的是 PUSH 模式 。

迭代器模式

参考《Iterator API 总结

观察者模式

观察者模式是一种行为型设计模式,允许你定义一种订阅机制,可在对象事件发生时主动通知多个 “观察” 该对象的其它对象。

Observer

在响应式流中,上述操作由 Publisher-Subscriber 负责。由 Publisher 生产新值并推送给 Subscriber,这个“推送”就是响应式的关键,亦即“变化传递(propagation of change)”。另外,应用于被推送值的操作(Operator)是“声明式”而不是“命令式”的:开发者表达的是计算逻辑,而不是描述其具体的控制流程。

流程如下:

1
onNext x 0..N [onError | onComplete]

Reactive Streams 规范

依赖

Reactive Stream(响应式流)规范的 Maven 依赖如下:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.2</version>
</dependency>

核心接口

整个依赖包中,仅仅定义了四个核心接口:

  • org.reactivestreams.Subscription 接口定义了连接发布者和订阅者的方法;
  • org.reactivestreams.Publisher<T> 接口定义了发布者的方法;
  • org.reactivestreams.Subscriber<T> 接口定义了订阅者的方法;
  • org.reactivestreams.Processor<T,R> 接口定义了处理器;

org.reactivestreams

接口交互流程

简要交互如下:

process_ofreactive_stream

API 交互如下:

process_of_reactive_stream_2

参考

《Reactive Java Programming》

Reactive Streams 规范

ReactiveX 系列

Reactor 框架

Spring

  • https://spring.io/reactive
  • Web on Reactive Stack - Spring
    • 基于 Reactor 框架实现
    • 默认基于 Netty 作为应用服务器
    • 好处:能够以固定的线程来处理高并发(充分发挥机器的性能)
    • 提供 API:
      • Spring WebFlux
      • WebClient
      • WebSockets
      • Testing
      • RSocket
      • Reactive Libraries

http://openjdk.java.net/jeps/266