响应式编程系列(三)Reactor 操作符总结
Reactive Streams 规范并未提供任何操作符(Operators),而 Reactor 框架的核心价值之一就是提供了丰富的操作符。从简单的转换、过滤到复杂的编排和错误处理,涉及方方面面。
推荐通过参考文档而不是 JavaDoc 来学习 Mono/Flux API 和 Operator 操作符。参考:Appendix A: Which operator do I need?
注意点
使用 Reactor API 时,有以下几个注意点:
一、每个 Operator API 都会返回新实例
在 Reactor 中,操作符(Operators)好比流水线中的工作站。 每个操作符都会向
Publisher
添加行为,并将上一步的Publisher
包装成新实例。因而,整个链就被串起来,数据从第一个Publisher
开始,沿着链向下移动,通过每个链接进行转换。最终,Subscriber
结束该流程。 注意,直到Subscriber
订阅Publisher
为止,什么都不会发生。理解操作符会创建新实例这一行为,有助于避免一些常见错误,详见:I Used an Operator on my Flux but it Doesn’t Seem to Apply. What Gives?
二、Nothing Happens Until You subscribe()
Reactor 中,当您编写
Publisher
链时,仅用于描述异步处理的抽象过程,默认情况下数据不会开始处理。只有通过订阅,将Publisher
与Subscriber
绑定在一起时,才能触发整个链中的数据流处理。这是由其内部实现方式决定的:Subscriber
发出请求信号并向上传播,直到源Publisher
。
详见:C.2. I Used an Operator on my Flux
but it Doesn’t Seem to Apply. What Gives?
三、使用背压(Backpressure)进行流量控制
Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.
The real mechanism defined by the Reactive Streams specification is pretty close to the analogy: A subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate or it can use the
request
mechanism to signal the source that it is ready to process at mostn
elements.Intermediate operators can also change the request in-transit. Imagine a
buffer
operator that groups elements in batches of ten. If the subscriber requests one buffer, it is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies, which avoidrequest(1)
round-trips and is beneficial if producing the elements before they are requested is not too costly.This transforms the push model into a push-pull hybrid, where the downstream can pull n elements from upstream if they are readily available. But if the elements are not ready, they get pushed by the upstream whenever they are produced.
我该使用哪个操作符?
Creating a New Sequence…
Mono
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html
for [0|1] elements
创建 Mono 流:
Flux
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
for [N] elements
普通创建:
遍历创建:
定时创建:
合并创建:
编程式创建:
其它:
中间操作
Each operator adds behavior to a
Publisher
and wraps the previous step’sPublisher
into a new instance. The whole chain is thus linked, such that data originates from the firstPublisher
and moves down the chain, transformed by each link. Eventually, aSubscriber
finishes the process. Remember that nothing happens until aSubscriber
subscribes to aPublisher
.While the Reactive Streams specification does not specify operators at all, one of the best added values of reactive libraries, such as Reactor, is the rich vocabulary of operators that they provide. These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.
Transforming an Existing Sequence
常用的如下:
I want to transform existing data:
on a 1-to-1 basis (eg. strings to their length):
map
(Flux|Mono)- …by just casting it:
cast
(Flux|Mono) - …in order to materialize each source value’s index: Flux#index
- …by just casting it:
on a 1-to-n basis (eg. strings to their characters):
flatMap
(Flux|Mono) + use a factory methodrunning an asynchronous task for each source item (eg. urls to http request):
flatMap
(Flux|Mono) + an async Publisher-returning method…ignoring some data: conditionally return a Mono.empty() in the flatMap lambda
…retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)
…where the async task can return multiple values, from a Mono source: Mono#flatMapMany
1
2
3// Mono 转 Flux
// Create a Flux that emits the items contained in the provided Iterable. A new iterator will be created for each subscriber.
Mono#flatMapMany(Flux::fromIterable)
I want to add pre-set elements to an existing sequence:
- at the start: Flux#startWith(T…)
- at the end: Flux#concatWithValues(T…)
I want to aggregate a Flux: (the
Flux#
prefix is assumed below)into a List: collectList, collectSortedList
into a Map: collectMap, collectMultiMap
into an arbitrary container: collect
into the size of the sequence: count (
Mono.empty()
不计入count
)by applying a function between each element (eg. running sum): reduce
1
2
3
4Flux.range(0, 5)
.reduce(Integer::sum) // 两两相加
.map(Objects::toString)
.subscribe(log::info);- …but emitting each intermediary value: scan
into a boolean value from a predicate:
- applied to all values (AND): all
- applied to at least one value (OR): any
- testing the presence of any value: hasElements (there is a Mono equivalent in hasElement)
- testing the presence of a specific value: hasElement(T)
I want to combine publishers…
- in sequential order: Flux#concat or
.concatWith(other)
(Flux|Mono)- …but delaying any error until remaining publishers have been emitted: Flux#concatDelayError
- …but eagerly subscribing to subsequent publishers: Flux#mergeSequential
- in emission order (combined items emitted as they come): Flux#merge /
.mergeWith(other)
(Flux|Mono)- …with different types (transforming merge): Flux#zip / Flux#zipWith
- by pairing values:
- from 2 Monos into a Tuple2: Mono#zipWith
- from n Monos when they all completed: Mono#zip
- by coordinating their termination:
- from 1 Mono and any source into a Mono: Mono#and
- from n sources when they all completed: Mono#when
- into an arbitrary container type:
- each time all sides have emitted: Flux#zip (up to the smallest cardinality)
- each time a new value arrives at either side: Flux#combineLatest
- selecting the first publisher which…
- triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)
- triggered by the start of the next publisher in a sequence of publishers: switchOnNext
- in sequential order: Flux#concat or
I want to repeat an existing sequence:
repeat
(Flux|Mono)- …
I have an empty sequence but…
I have a sequence but I am not interested in values:
ignoreElements
(Flux.ignoreElements()|Mono.ignoreElement())- …and I want the completion represented as a Mono:
then
(Flux|Mono) - …and I want to wait for another task to complete at the end:
thenEmpty
(Flux|Mono) - …and I want to switch to another Mono at the end: Mono#then(mono)
- …and I want to emit a single value at the end: Mono#thenReturn(T)
- …and I want to switch to a Flux at the end:
thenMany
(Flux|Mono)
- …and I want the completion represented as a Mono:
…
Filtering a Sequence
- I want to filter a sequence:
- based on an arbitrary criteria:
filter
(Flux|Mono) - restricting on the type of the emitted objects:
ofType
(Flux|Mono) - by ignoring the values altogether:
ignoreElements
(Flux.ignoreElements()|Mono.ignoreElement()) - by ignoring duplicates:
- in the whole sequence (logical set): Flux#distinct
- between subsequently emitted items (deduplication): Flux#distinctUntilChanged
- based on an arbitrary criteria:
- I want to keep only a subset of the sequence:
- by taking N elements:
- at the beginning of the sequence: Flux#take(long, true)
- …requesting an unbounded amount from upstream: Flux#take(long, false)
- …based on a duration: Flux#take(Duration)
- …only the first element, as a Mono: Flux#next()
- …using request(N) rather than cancellation: Flux#limitRequest(long)
- at the end of the sequence: Flux#takeLast
- until a criteria is met (inclusive): Flux#takeUntil (predicate-based), Flux#takeUntilOther (companion publisher-based)
- while a criteria is met (exclusive): Flux#takeWhile
- at the beginning of the sequence: Flux#take(long, true)
- by taking at most 1 element:
- at a specific position: Flux#elementAt
- at the end: .takeLast(1)
- …and emit an error if empty: Flux#last()
- …and emit a default value if empty: Flux#last(T)
- by skipping elements:
- at the beginning of the sequence: Flux#skip(long)
- …based on a duration: Flux#skip(Duration)
- at the end of the sequence: Flux#skipLast
- until a criteria is met (inclusive): Flux#skipUntil (predicate-based), Flux#skipUntilOther (companion publisher-based)
- while a criteria is met (exclusive): Flux#skipWhile
- at the beginning of the sequence: Flux#skip(long)
- by sampling items:
- by duration: Flux#sample(Duration)
- but keeping the first element in the sampling window instead of the last: sampleFirst
- by a publisher-based window: Flux#sample(Publisher)
- based on a publisher “timing out”: Flux#sampleTimeout (each element triggers a publisher, and is emitted if that publisher does not overlap with the next)
- by duration: Flux#sample(Duration)
- by taking N elements:
- I expect at most 1 element (error if more than one)…
- and I want an error if the sequence is empty: Flux#single()
- and I want a default value if the sequence is empty: Flux#single(T)
- and I accept an empty sequence as well: Flux#singleOrEmpty
方法 | 注释 |
---|---|
take(long n) |
Take only the first N values from this Flux , if available. |
takeLast(long n) |
Emit the last N values this Flux emitted before its completion. |
last() |
Emit the last element observed before complete signal as a Mono , or emit NoSuchElementException error if the source was empty. |
last(T defaultValue) |
Emit the last element observed before complete signal as a Mono , or emit the defaultValue if the source was empty. |
Peeking into a Sequence
方法 | 入参 | 注释 |
---|---|---|
doOnSubscribe |
Consumer<? super Subscription> |
Add behavior triggered when the Mono is subscribed. |
doOnCancel |
Runnable |
Add behavior triggered when the Mono is cancelled. |
doOnRequest |
LongConsumer |
Add behavior triggering a LongConsumer when the Mono receives any request. |
doOnNext |
Consumer<? super T> |
Add behavior triggered when the Mono emits a data successfully. |
do on Complete … | ||
doOnComplete |
Runnable |
Add behavior triggered when the Flux completes successfully. |
doOnSuccess |
Consumer<? super T> |
Add behavior triggered when the Mono completes successfully.* null : completed without data* T : completed with data |
doOnError |
Consumer<? super Throwable> Class<E>, Consumer<? super E> Predicate<? super Throwable>, Consumer<? super Throwable> |
Add behavior triggered when the Mono completes with an error. |
doOnTerminate |
Runnable |
completion or error |
doAfterTerminate |
Runnable |
completion or error but after it has been propagated downstream |
doFinally |
Consumer<SignalType> |
any terminating condition (complete, error, cancel). |
doOnSuccessOrError |
Deprecated, will be removed in 3.5.0. Prefer using doOnNext(Consumer) , doOnError(Consumer) , doOnTerminate(Runnable) or doOnSuccess(Consumer) . Add behavior triggered when the Mono terminates, either by completing successfully or with an error.* nul l, null : completing without data* T , null : completing with data* null , Throwable : failing with/without data |
|
doAfterSuccessOrError |
Deprecated, will be removed in 3.5.0. Prefer using doAfterTerminate(Runnable) or doFinally(Consumer) .Add behavior triggered after the Mono terminates, either by completing downstream successfully or with an error. The arguments will be null depending on success, success with data and error:* null , null : completed without data* T , null : completed with data* null , Throwable : failed with/without data |
|
all events … | ||
doOnEach |
Consumer<? super Signal<T>> |
I want to know of all events each represented as Signal object in a callback outside the sequence: doOnEach |
调试类:
方法 | 注释 |
---|---|
log |
Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead. |
timestamp |
If this Mono is valued, emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the parallel Scheduler) and T2 the emitted data (as a T). |
elapsed |
Handling Errors
对于异常处理,Reactor 除了默认的立刻抛出异常的处理方式之外,还提供三类处理方式:
- 简单记录日志(
doOnError
) - recover from errors by falling back (
onErrorReturn
、onErrorResume
) - recover from errors by retrying (
retry
、retryWhen
)
方法 | 注释 | 描述 |
---|---|---|
error |
Create a Mono that terminates with the specified error immediately after being subscribed to. |
创建异常流。 |
onErrorMap |
Transform any error emitted by this Mono by synchronously applying a function to it.Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through. |
catching an exception and wrapping and re-throwing |
onErrorReturn |
Simply emit a captured fallback value when any error is observed on this Mono .Simply emit a captured fallback value when an error of the specified type is observed on this Mono .Simply emit a captured fallback value when an error matching the given predicate is observed on this Mono . |
catching an exception and falling back to a default value |
onErrorResume |
Subscribe to a fallback publisher when any error occurs, using a function to choose the fallback depending on the error. Subscribe to a fallback publisher when an error matching the given type occurs. Subscribe to a fallback publisher when an error matching a given predicate occurs. |
catching an exception and falling back to another Mono |
onErrorContinue |
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. | https://devdojo.com/ketonemaniac/reactor-onerrorcontinue-vs-onerrorresume |
retry() retry(long) |
Re-subscribes to this Mono sequence if it signals any error, indefinitely.Re-subscribes to this Mono sequence if it signals any error, for a fixed number of times. |
retrying with a simple policy (max number of attempts) |
retryWhen |
Mono
Flux
受检异常处理:
非受检异常会被 Reactor 传播,而受检异常必须被用户代码 try catch ,为了让受检异常被 Reactor 的异常传播机制和异常处理机制支持,可以使用如下步骤处理:
- try catch 之后,使用
Exceptions.propagate
将受检异常包装为非受检异常并重新抛出传播出去。onError
回调等异常处理操作获取到异常之后,可以调用Exceptions.unwrap
取得原受检异常。
参考:https://projectreactor.io/docs/core/release/reference/index.html#error.handling
Sorting a Flux
Splitting a Flux
https://projectreactor.io/docs/core/release/reference/index.html#advanced-three-sorts-batching
延迟处理
转成并行流
终结操作
Subscribe a Sequence
Mono
、Flux
订阅后可以使用 Disposable
API 停止 FLux 流。
Going Back to the Synchronous World
Note: all of these methods except Mono#toFuture will throw an UnsupportedOperatorException if called from within a Scheduler marked as “non-blocking only” (by default parallel() and single()).
- I have a Flux and I want to:
- block until I can get the first element: Flux#blockFirst
- …with a timeout: Flux#blockFirst(Duration)
- block until I can get the last element (or null if empty): Flux#blockLast
- …with a timeout: Flux#blockLast(Duration)
- synchronously switch to an Iterable: Flux#toIterable
- synchronously switch to a Java 8 Stream: Flux#toStream
- block until I can get the first element: Flux#blockFirst
- I have a Mono and I want:
- to block until I can get the value: Mono#block
- …with a timeout: Mono#block(Duration)
- a CompletableFuture: Mono#toFuture
- to block until I can get the value: Mono#block
参考
Reactor 框架,实现 Reactive Streams 规范,并扩展大量特性