Reactive Programming Series (Part 3): A Summary of Reactor Operators

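The Reactive Streams specification does not define any operators, and one of the core values of the Reactor framework is the rich set of operators it provides. They cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.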

It is recommended to learn the Mono/Flux API and the operators from the reference documentation rather than from the JavaDoc. See: Appendix A: Which operator do I need?

Things to Note

When using the Reactor API, keep the following points in mind:

1. Every operator API returns a new instance

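In Reactor, operators are like the workstations on an assembly line. Each operator adds behavior to a Publisher and wraps the previous step's Publisher into a new instance. The whole chain is thus linked: data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process. Note that nothing happens until a Subscriber subscribes to a Publisher.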

Understanding that operators create new instances helps you avoid some common mistakes. See: I Used an Operator on my Flux but it Doesn’t Seem to Apply. What Gives?
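The pitfall mentioned above can be sketched as follows. This is a minimal illustration that assumes reactor-core is on the classpath; the class name `NewInstanceDemo` and its methods are made up for this example.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class NewInstanceDemo {

    // Mistake: the Flux returned by map() is discarded, so the operator never applies.
    static List<String> unchained() {
        Flux<String> flux = Flux.just("foo", "bar");
        flux.map(String::toUpperCase); // result ignored
        return flux.collectList().block();
    }

    // Fix: keep working with the instance that the operator returns.
    static List<String> chained() {
        return Flux.just("foo", "bar")
                .map(String::toUpperCase)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(unchained()); // [foo, bar]
        System.out.println(chained());   // [FOO, BAR]
    }
}
```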

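2. Nothing Happens Until You subscribe()

In Reactor, when you write a Publisher chain, you only describe an abstract asynchronous process; by default, no data starts flowing. Only by subscribing, which ties a Publisher to a Subscriber, do you trigger the flow of data through the whole chain. This follows from the internal implementation: the Subscriber emits a request signal that propagates upstream until it reaches the source Publisher.

[Figure: org.reactivestream_api]

See: C.2. I Used an Operator on my Flux but it Doesn’t Seem to Apply. What Gives?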
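To make the laziness concrete, here is a small sketch (assuming reactor-core is available; `LazyDemo` is a hypothetical name). It counts how often the source callable runs before and after subscribing.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class LazyDemo {

    // Returns the invocation count before and after subscribing.
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();
        // Assembling the chain only describes the work; the callable is not invoked yet.
        Mono<Integer> mono = Mono.fromCallable(calls::incrementAndGet).map(i -> i * 10);
        int before = calls.get();
        mono.subscribe(); // subscribing triggers the data flow
        int after = calls.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0]); // 0
        System.out.println(counts[1]); // 1
    }
}
```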

3. Use backpressure for flow control

Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.

The real mechanism defined by the Reactive Streams specification is pretty close to the analogy: A subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate or it can use the request mechanism to signal the source that it is ready to process at most n elements.

Intermediate operators can also change the request in-transit. Imagine a buffer operator that groups elements in batches of ten. If the subscriber requests one buffer, it is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies, which avoid request(1) round-trips and are beneficial if producing the elements before they are requested is not too costly.

This transforms the push model into a push-pull hybrid, where the downstream can pull n elements from upstream if they are readily available. But if the elements are not ready, they get pushed by the upstream whenever they are produced.
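The request mechanism can be observed with a custom subscriber. The following is a sketch under the assumption that reactor-core is on the classpath; `BackpressureDemo` is an illustrative name. The subscriber asks for one element at a time and cancels after three, while `doOnRequest` records every request that reaches the source.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureDemo {

    static final List<Long> requests = new ArrayList<>();
    static final List<Integer> received = new ArrayList<>();

    static void run() {
        requests.clear();
        received.clear();
        Flux.range(1, 100)
            .doOnRequest(n -> requests.add(n)) // record each request seen upstream
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1); // bounded demand instead of the default unbounded request
                }

                @Override
                protected void hookOnNext(Integer value) {
                    received.add(value);
                    if (received.size() == 3) {
                        cancel();   // stop after three elements
                    } else {
                        request(1); // pull the next element
                    }
                }
            });
    }

    public static void main(String[] args) {
        run();
        System.out.println(received); // [1, 2, 3]
        System.out.println(requests); // [1, 1, 1]
    }
}
```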

Which Operator Do I Need?

Creating a New Sequence…

Mono

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html

for [0|1] elements

[Figure: mono]

Creating a Mono:

[Figure: mono_create]
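A few of the Mono factory methods, as a hedged sketch (reactor-core assumed; `MonoCreateDemo` and `describe` are names invented for this example):

```java
import reactor.core.publisher.Mono;

public class MonoCreateDemo {

    // Collapses the three possible outcomes of a Mono into a printable string.
    static String describe(Mono<String> mono) {
        return mono.defaultIfEmpty("<empty>")
                .onErrorResume(e -> Mono.just("<error: " + e.getMessage() + ">"))
                .block();
    }

    public static void main(String[] args) {
        String nullable = null;
        System.out.println(describe(Mono.just("a")));                          // a
        System.out.println(describe(Mono.empty()));                            // <empty>
        System.out.println(describe(Mono.justOrEmpty(nullable)));              // <empty>
        System.out.println(describe(Mono.fromCallable(() -> "computed")));     // computed
        System.out.println(describe(Mono.defer(() -> Mono.just("deferred")))); // deferred
        System.out.println(describe(Mono.error(new IllegalStateException("boom")))); // <error: boom>
    }
}
```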

Flux

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

for [N] elements

[Figure: flux]

Basic creation:

[Figure: flux_create]
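The basic Flux factories could look like this in practice. It is a sketch that assumes reactor-core; `FluxCreateDemo` and `toList` are hypothetical helper names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

public class FluxCreateDemo {

    // Blocks until the Flux completes and returns everything it emitted.
    static <T> List<T> toList(Flux<T> flux) {
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(toList(Flux.just("a", "b", "c")));                  // [a, b, c]
        System.out.println(toList(Flux.fromArray(new Integer[] {1, 2, 3})));   // [1, 2, 3]
        System.out.println(toList(Flux.fromIterable(Arrays.asList("x", "y")))); // [x, y]
        System.out.println(toList(Flux.fromStream(() -> Stream.of(1, 2))));    // [1, 2]
        System.out.println(toList(Flux.range(5, 3)));                          // [5, 6, 7]
        System.out.println(toList(Flux.empty()));                              // []
    }
}
```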

Creating by iteration:

Creating on a timer:

[Figure: flux_create_interval]
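Timer-based creation can be sketched with `Flux.interval`, which emits increasing `Long` values starting at 0. The example assumes reactor-core; the 50 ms period and the name `IntervalDemo` are arbitrary choices for illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class IntervalDemo {

    // Collects the first n ticks of a periodic sequence.
    static List<Long> firstTicks(int n) {
        return Flux.interval(Duration.ofMillis(50)) // infinite on its own
                .take(n)                            // so bound it before collecting
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3)); // [0, 1, 2]
    }
}
```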

Creating by combining:

[Figure: flux_create_combine]
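Two of the combining factories, `concat` and `zip`, can be sketched as below (reactor-core assumed; `CombineDemo` is an illustrative name). `Flux.merge` has the same shape as `concat` but subscribes to all sources eagerly and emits items as they arrive, so its output order depends on timing.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class CombineDemo {

    // concat: subscribe to the sources one after another, in order.
    static List<Integer> concatenated() {
        return Flux.concat(Flux.just(1, 2), Flux.just(3, 4))
                .collectList()
                .block();
    }

    // zip: pair elements by position; stops with the shortest source.
    static List<String> zipped() {
        return Flux.zip(Flux.just("a", "b", "c"), Flux.just(1, 2), (s, i) -> s + i)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(concatenated()); // [1, 2, 3, 4]
        System.out.println(zipped());       // [a1, b2]
    }
}
```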

Programmatic creation:

[Figure: flux_create_2]
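Programmatic creation is usually done with `Flux.generate` (synchronous, one element per round, with state) or `Flux.create` (push-style sink). The following is a minimal sketch assuming reactor-core; `ProgrammaticDemo` and the sample values are made up.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ProgrammaticDemo {

    // generate: the state starts at 0 and each round emits exactly one value.
    static List<Integer> squares() {
        Flux<Integer> flux = Flux.<Integer, Integer>generate(
                () -> 0,
                (state, sink) -> {
                    sink.next(state * state);
                    if (state == 4) {
                        sink.complete();
                    }
                    return state + 1; // next state
                });
        return flux.collectList().block();
    }

    // create: the sink may emit any number of values, also from other threads.
    static List<String> created() {
        Flux<String> flux = Flux.<String>create(sink -> {
            for (String s : new String[] {"x", "y"}) {
                sink.next(s);
            }
            sink.complete();
        });
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(squares()); // [0, 1, 4, 9, 16]
        System.out.println(created()); // [x, y]
    }
}
```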

Others:

[Figure: flux_create_switchOnNext]

Intermediate Operations

Each operator adds behavior to a Publisher and wraps the previous step’s Publisher into a new instance. The whole chain is thus linked, such that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process. Remember that nothing happens until a Subscriber subscribes to a Publisher.

While the Reactive Streams specification does not specify operators at all, one of the best added values of reactive libraries, such as Reactor, is the rich vocabulary of operators that they provide. These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.

Transforming an Existing Sequence

The commonly used ones are as follows:

  • I want to transform existing data:

    • on a 1-to-1 basis (eg. strings to their length): map (Flux|Mono)

      • …by just casting it: cast (Flux|Mono)
      • …in order to materialize each source value’s index: Flux#index
    • on a 1-to-n basis (eg. strings to their characters): flatMap (Flux|Mono) + use a factory method

    • running an asynchronous task for each source item (eg. urls to http request): flatMap (Flux|Mono) + an async Publisher-returning method

      • …ignoring some data: conditionally return a Mono.empty() in the flatMap lambda

      • …retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)

      • …where the async task can return multiple values, from a Mono source: Mono#flatMapMany

        // Convert a Mono to a Flux.
        // Create a Flux that emits the items contained in the provided Iterable. A new iterator will be created for each subscriber.
        Mono#flatMapMany(Flux::fromIterable)
  • I want to add pre-set elements to an existing sequence:

  • I want to aggregate a Flux: (the Flux# prefix is assumed below)

    • into a List: collectList, collectSortedList

    • into a Map: collectMap, collectMultiMap

    • into an arbitrary container: collect

    • into the size of the sequence: count (a Mono.empty() does not add to the count)

    • by applying a function between each element (eg. running sum): reduce

      Flux.range(0, 5)
          .reduce(Integer::sum) // accumulate pairwise: 0 + 1 + 2 + 3 + 4
          .map(Objects::toString)
          .subscribe(log::info);
      • …but emitting each intermediary value: scan
    • into a boolean value from a predicate:

      • applied to all values (AND): all
      • applied to at least one value (OR): any
      • testing the presence of any value: hasElements (there is a Mono equivalent in hasElement)
      • testing the presence of a specific value: hasElement(T)
  • I want to combine publishers…

    • in sequential order: Flux#concat or .concatWith(other) (Flux|Mono)
    • in emission order (combined items emitted as they come): Flux#merge / .mergeWith(other) (Flux|Mono)
    • by pairing values:
    • by coordinating their termination:
      • from 1 Mono and any source into a Mono<Void>: Mono#and
      • from n sources when they all completed: Mono#when
      • into an arbitrary container type:
        • each time all sides have emitted: Flux#zip (up to the smallest cardinality)
        • each time a new value arrives at either side: Flux#combineLatest
    • selecting the first publisher which…
      • produces a value (onNext): firstWithValue (Flux|Mono)
      • produces any signal: firstWithSignal (Flux|Mono)
    • triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)
    • triggered by the start of the next publisher in a sequence of publishers: switchOnNext
  • I want to repeat an existing sequence: repeat (Flux|Mono)

  • I have an empty sequence but…

    • I want a value instead: defaultIfEmpty (Flux|Mono)

    • I want another sequence instead: switchIfEmpty (Flux|Mono)

      Flux.just(0, 1, 2, 3)
          .filter(i -> i < 0) // nothing matches, so the sequence is empty
          .next()
          .doOnNext(i -> log.info("Found an item: {}", i))
          .switchIfEmpty(Mono.just(-1))
          .map(Objects::toString)
          .subscribe(log::info);
  • I have a sequence but I am not interested in values: ignoreElements (Flux.ignoreElements()|Mono.ignoreElement())

    • …and I want the completion represented as a Mono: then (Flux|Mono)
    • …and I want to wait for another task to complete at the end: thenEmpty (Flux|Mono)
    • …and I want to switch to another Mono at the end: Mono#then(mono)
    • …and I want to emit a single value at the end: Mono#thenReturn(T)
    • …and I want to switch to a Flux at the end: thenMany (Flux|Mono)
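Several entries from the list above can be sketched together. The example assumes reactor-core; `TransformDemo` and the helper `slowUpper` (a stand-in for an asynchronous call) are hypothetical.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TransformDemo {

    // Hypothetical async task: "a" deliberately takes longer than the other inputs.
    static Mono<String> slowUpper(String s) {
        long delayMs = s.equals("a") ? 100 : 10;
        return Mono.just(s.toUpperCase()).delayElement(Duration.ofMillis(delayMs));
    }

    // 1-to-1 transformation with map.
    static List<Integer> lengths() {
        return Flux.just("a", "bb", "ccc").map(String::length).collectList().block();
    }

    // Async task per element, keeping the source order with flatMapSequential.
    static List<String> ordered() {
        return Flux.just("a", "b", "c")
                .flatMapSequential(TransformDemo::slowUpper)
                .collectList()
                .block();
    }

    // A Mono holding a collection, expanded into a Flux with flatMapMany.
    static List<Integer> expanded() {
        return Mono.just(Arrays.asList(1, 2, 3))
                .flatMapMany(Flux::fromIterable)
                .collectList()
                .block();
    }

    // scan is like reduce but emits every intermediate value.
    static List<Integer> runningSum() {
        return Flux.range(0, 5).scan(Integer::sum).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(lengths());    // [1, 2, 3]
        System.out.println(ordered());    // [A, B, C]
        System.out.println(expanded());   // [1, 2, 3]
        System.out.println(runningSum()); // [0, 1, 3, 6, 10]
    }
}
```

With plain `flatMap` instead of `flatMapSequential`, the results would be emitted in completion order, so "A" would typically arrive last.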

Filtering a Sequence

| Method | Description |
| --- | --- |
| take(long n) | Take only the first N values from this Flux, if available. |
| takeLast(int n) | Emit the last N values this Flux emitted before its completion. |
| last() | Emit the last element observed before complete signal as a Mono, or emit NoSuchElementException error if the source was empty. |
| last(T defaultValue) | Emit the last element observed before complete signal as a Mono, or emit the defaultValue if the source was empty. |
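A short sketch of these filtering operators, assuming reactor-core (`FilterDemo` is an illustrative name):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class FilterDemo {

    static List<Integer> firstThree() {
        return Flux.range(1, 10).take(3).collectList().block();
    }

    static List<Integer> lastTwo() {
        return Flux.range(1, 10).takeLast(2).collectList().block();
    }

    static Integer last() {
        return Flux.range(1, 10).last().block();
    }

    // On an empty source, last(defaultValue) falls back instead of failing.
    static Integer lastOrDefault() {
        return Flux.<Integer>empty().last(-1).block();
    }

    public static void main(String[] args) {
        System.out.println(firstThree());    // [1, 2, 3]
        System.out.println(lastTwo());       // [9, 10]
        System.out.println(last());          // 10
        System.out.println(lastOrDefault()); // -1
    }
}
```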

Peeking into a Sequence

Reactive callback

| Method | Argument | Description |
| --- | --- | --- |
| doOnSubscribe | Consumer<? super Subscription> | Add behavior triggered when the Mono is subscribed. |
| doOnCancel | Runnable | Add behavior triggered when the Mono is cancelled. |
| doOnRequest | LongConsumer | Add behavior triggering a LongConsumer when the Mono receives any request. |
| doOnNext | Consumer<? super T> | Add behavior triggered when the Mono emits data successfully. |
| On completion … | | |
| doOnComplete | Runnable | Add behavior triggered when the Flux completes successfully. |
| doOnSuccess | Consumer<? super T> | Add behavior triggered when the Mono completes successfully. null: completed without data; T: completed with data. |
| doOnError | Consumer<? super Throwable>; or Class<E>, Consumer<? super E>; or Predicate<? super Throwable>, Consumer<? super Throwable> | Add behavior triggered when the Mono completes with an error. |
| doOnTerminate | Runnable | completion or error |
| doAfterTerminate | Runnable | completion or error but after it has been propagated downstream |
| doFinally | Consumer<SignalType> | any terminating condition (complete, error, cancel). |
| doOnSuccessOrError | | Deprecated, will be removed in 3.5.0. Prefer using doOnNext(Consumer), doOnError(Consumer), doOnTerminate(Runnable) or doOnSuccess(Consumer). Add behavior triggered when the Mono terminates, either by completing successfully or with an error. null, null: completing without data; T, null: completing with data; null, Throwable: failing with/without data. |
| doAfterSuccessOrError | | Deprecated, will be removed in 3.5.0. Prefer using doAfterTerminate(Runnable) or doFinally(Consumer). Add behavior triggered after the Mono terminates, either by completing downstream successfully or with an error. The arguments will be null depending on success, success with data and error. null, null: completed without data; T, null: completed with data; null, Throwable: failed with/without data. |
| All events … | | |
| doOnEach | Consumer<? super Signal<T>> | I want to know of all events each represented as Signal object in a callback outside the sequence: doOnEach |
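The order in which these callbacks fire can be seen by recording them. This is a sketch that assumes reactor-core and a synchronous source; `PeekDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class PeekDemo {

    // Records each side-effect callback in the order it is invoked.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.just("a", "b")
            .doOnSubscribe(s -> events.add("subscribe"))
            .doOnNext(v -> events.add("next:" + v))
            .doOnComplete(() -> events.add("complete"))
            .doFinally(type -> events.add("finally:" + type.name()))
            .blockLast();
        return events;
    }

    public static void main(String[] args) {
        // [subscribe, next:a, next:b, complete, finally:ON_COMPLETE]
        System.out.println(run());
    }
}
```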

Debugging:

| Method | Description |
| --- | --- |
| log | Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead. |
| timestamp | If this Mono is valued, emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the parallel Scheduler) and T2 the emitted data (as a T). |
| elapsed | |

Handling Errors

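For error handling, besides the default behavior of propagating the error immediately, Reactor offers three kinds of handling:

  • simply logging the error (doOnError)
  • recover from errors by falling back (onErrorReturn, onErrorResume)
  • recover from errors by retrying (retry, retryWhen)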
| Method | Javadoc | Notes |
| --- | --- | --- |
| error | Create a Mono that terminates with the specified error immediately after being subscribed to. | Creates an error sequence. |
| onErrorMap | (1) Transform any error emitted by this Mono by synchronously applying a function to it. (2) Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through. (3) Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through. | catching an exception and wrapping and re-throwing |
| onErrorReturn | (1) Simply emit a captured fallback value when any error is observed on this Mono. (2) Simply emit a captured fallback value when an error of the specified type is observed on this Mono. (3) Simply emit a captured fallback value when an error matching the given predicate is observed on this Mono. | catching an exception and falling back to a default value |
| onErrorResume | (1) Subscribe to a fallback publisher when any error occurs, using a function to choose the fallback depending on the error. (2) Subscribe to a fallback publisher when an error matching the given type occurs. (3) Subscribe to a fallback publisher when an error matching a given predicate occurs. | catching an exception and falling back to another Mono |
| onErrorContinue | Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. | https://devdojo.com/ketonemaniac/reactor-onerrorcontinue-vs-onerrorresume |
| retry(), retry(long) | retry(): Re-subscribes to this Mono sequence if it signals any error, indefinitely. retry(long): Re-subscribes to this Mono sequence if it signals any error, for a fixed number of times. | retrying with a simple policy (max number of attempts) |
| retryWhen | | |

Mono

[Figure: mono_error]

Flux

[Figure: flux_error]
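The fallback and retry operators can be compared on one failing source. The sketch below assumes reactor-core; `ErrorDemo` and the division-by-zero source are invented for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ErrorDemo {

    // Fails with an ArithmeticException on the third element (division by zero).
    static Flux<Integer> failing() {
        return Flux.just(1, 2, 0, 4).map(i -> 10 / i);
    }

    // Replace the error with a single default value.
    static List<Integer> withDefault() {
        return failing().onErrorReturn(-1).collectList().block();
    }

    // Switch to a fallback publisher when the error occurs.
    static List<Integer> withFallback() {
        return failing().onErrorResume(e -> Flux.just(-1, -2)).collectList().block();
    }

    // Re-subscribe once, then give up and fall back to a default.
    static List<Integer> withRetry() {
        return failing().retry(1).onErrorReturn(-1).collectList().block();
    }

    // Wrap the original error in another exception type.
    static String mappedErrorType() {
        return failing()
                .onErrorMap(e -> new IllegalStateException("wrapped", e))
                .then(Mono.just("no error"))
                .onErrorResume(e -> Mono.just(e.getClass().getSimpleName()))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withDefault());     // [10, 5, -1]
        System.out.println(withFallback());    // [10, 5, -1, -2]
        System.out.println(withRetry());       // [10, 5, 10, 5, -1]
        System.out.println(mappedErrorType()); // IllegalStateException
    }
}
```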

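Handling checked exceptions:

Unchecked exceptions are propagated by Reactor, whereas checked exceptions must be caught with try/catch in user code. To make a checked exception work with Reactor's error propagation and error handling mechanisms, handle it as follows:

  1. After catching it, use Exceptions.propagate to wrap the checked exception in an unchecked exception and re-throw it.
  2. Once an error-handling operation such as an onError callback receives the exception, call Exceptions.unwrap to obtain the original checked exception.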
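The two steps above can be sketched as follows, loosely modeled on the example in the Reactor reference guide. It assumes reactor-core; `CheckedExceptionDemo` and `convert` are hypothetical names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;

public class CheckedExceptionDemo {

    static final List<String> received = new ArrayList<>();

    // Hypothetical method that throws a checked exception.
    static String convert(int i) throws IOException {
        if (i > 2) {
            throw new IOException("boom " + i);
        }
        return "OK " + i;
    }

    // Returns the original checked exception as seen by the error callback.
    static Throwable run() {
        received.clear();
        AtomicReference<Throwable> original = new AtomicReference<>();
        Flux.range(1, 10)
            .map(i -> {
                try {
                    return convert(i);
                } catch (IOException e) {
                    // Step 1: wrap the checked exception and re-throw it.
                    throw Exceptions.propagate(e);
                }
            })
            .subscribe(
                received::add,
                // Step 2: recover the original checked exception.
                e -> original.set(Exceptions.unwrap(e)));
        return original.get();
    }

    public static void main(String[] args) {
        Throwable t = run();
        System.out.println(received);                 // [OK 1, OK 2]
        System.out.println(t instanceof IOException); // true
    }
}
```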

Reference: https://projectreactor.io/docs/core/release/reference/index.html#error.handling

Sorting a Flux

[Figure: flux_sort]
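Sorting can be sketched with `Flux#sort`, which has to wait for the source to complete before it can emit. The example assumes reactor-core; `SortDemo` is an illustrative name.

```java
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;

public class SortDemo {

    // Natural ordering.
    static List<Integer> ascending() {
        return Flux.just(3, 1, 2).sort().collectList().block();
    }

    // Explicit comparator.
    static List<Integer> descending() {
        return Flux.just(3, 1, 2).sort(Comparator.reverseOrder()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(ascending());  // [1, 2, 3]
        System.out.println(descending()); // [3, 2, 1]
    }
}
```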

Splitting a Flux

https://projectreactor.io/docs/core/release/reference/index.html#advanced-three-sorts-batching
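The linked section covers batching with `buffer`, `window`, and `groupBy`. The first two can be sketched like this (reactor-core assumed; `SplitDemo` is a made-up name):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class SplitDemo {

    // buffer: collect elements into Lists of at most three.
    static List<List<Integer>> buffered() {
        return Flux.range(1, 7).buffer(3).collectList().block();
    }

    // window: split into inner Flux instances, here reduced to one sum per window.
    static List<Integer> windowSums() {
        return Flux.range(1, 7)
                .window(3)
                .concatMap(w -> w.reduce(0, Integer::sum))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(buffered());   // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(windowSums()); // [6, 15, 7]
    }
}
```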

Delayed Processing

[Figure: flux_delay]

Converting to a Parallel Flux

[Figure: flux_parallel]
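A parallel pipeline might look like the sketch below. It assumes reactor-core; `ParallelDemo` and the choice of two rails are arbitrary. Because the rails run concurrently, the merged order is not guaranteed, so the result is sorted before it is returned.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ParallelDemo {

    static List<Integer> squares() {
        return Flux.range(1, 8)
                .parallel(2)                  // split into two rails
                .runOn(Schedulers.parallel()) // run each rail on a worker thread
                .map(i -> i * i)
                .sequential()                 // merge back into a single Flux
                .collectSortedList()          // order across rails is not deterministic
                .block();
    }

    public static void main(String[] args) {
        System.out.println(squares()); // [1, 4, 9, 16, 25, 36, 49, 64]
    }
}
```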

Terminal Operations

Subscribing to a Sequence

Mono, Flux

[Figure: mono_subscribe]

After subscribing, you can use the Disposable API to stop the Flux.
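Both the callback-style `subscribe` overload and cancellation through `Disposable` can be sketched as follows. The example assumes reactor-core; `SubscribeDemo`, the 10 ms period, and the 100 ms sleep are illustrative choices.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class SubscribeDemo {

    // subscribe with value, error, and completion callbacks.
    static List<String> signals() {
        List<String> log = new ArrayList<>();
        Flux.just(1, 2).subscribe(
                v -> log.add("next:" + v),
                e -> log.add("error"),
                () -> log.add("complete"));
        return log;
    }

    // Stop an infinite Flux through the Disposable returned by subscribe.
    static boolean cancelInterval() {
        List<Long> ticks = new CopyOnWriteArrayList<>();
        Disposable disposable = Flux.interval(Duration.ofMillis(10)).subscribe(ticks::add);
        try {
            Thread.sleep(100); // let a few ticks arrive
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        disposable.dispose(); // cancels the subscription
        return disposable.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(signals());        // [next:1, next:2, complete]
        System.out.println(cancelInterval()); // true
    }
}
```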

Going Back to the Synchronous World

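Note: all of these methods except Mono#toFuture will throw an exception (called UnsupportedOperatorException in the reference guide; in reactor-core 3.x, block() and its siblings actually throw an IllegalStateException) if called from within a Scheduler marked as “non-blocking only” (by default parallel() and single()).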
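The usual bridges back to blocking code are `block`, `blockFirst`, `blockLast`, `toIterable`, `toStream`, and `Mono#toFuture`. Here is a sketch, assuming reactor-core and a caller on an ordinary (blocking-capable) thread such as `main`; `BlockingDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BlockingDemo {

    static String single() {
        return Mono.just("a").map(String::toUpperCase).block();
    }

    static Integer first() {
        return Flux.just(1, 2, 3).blockFirst();
    }

    static Integer last() {
        return Flux.just(1, 2, 3).blockLast();
    }

    // toIterable lets a plain for-each loop consume the Flux.
    static List<Integer> iterated() {
        List<Integer> out = new ArrayList<>();
        for (Integer i : Flux.just(1, 2, 3).toIterable()) {
            out.add(i);
        }
        return out;
    }

    // toFuture hands the result to the CompletableFuture API.
    static String viaFuture() {
        return Mono.just("a").toFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(single());    // A
        System.out.println(first());     // 1
        System.out.println(last());      // 3
        System.out.println(iterated());  // [1, 2, 3]
        System.out.println(viaFuture()); // a
    }
}
```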

References

The Reactor framework, which implements the Reactive Streams specification and extends it with many features

Appendix A: Which operator do I need?

https://zhuanlan.zhihu.com/p/35964846