Jakarta Bean Validation 2.0 was published in August 2019. There are no changes between Jakarta Bean Validation 2.0 and Bean Validation 2.0 except for the GAV: it is now jakarta.validation:jakarta.validation-api.
It’s part of Jakarta EE 8 (but can of course be used with plain Java SE, just as the previous releases could).
hibernate-validator is entirely separate from the persistence aspects of Hibernate. So, by adding it as a dependency, we’re not adding these persistence aspects into the project.
Jakarta Bean Validation specifies that the constraints of a composed constraint are all combined via a logical AND. This means all of the composing constraints need to return true to obtain an overall successful validation.
Hibernate Validator offers an extension to this and allows you to compose constraints via a logical OR or NOT. To do so, you have to use the @ConstraintComposition annotation and the enum CompositionType with its values AND, OR and ALL_FALSE.
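As a sketch of what this looks like (assuming Hibernate Validator on the classpath; the constraint name PatternOrSize and its message are made up for illustration), a composed constraint whose parts are combined with OR could be declared like this:

```java
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import jakarta.validation.Constraint;
import jakarta.validation.Payload;
import jakarta.validation.ReportAsSingleViolation;
import jakarta.validation.constraints.Pattern;
import jakarta.validation.constraints.Size;

import org.hibernate.validator.constraints.CompositionType;
import org.hibernate.validator.constraints.ConstraintComposition;

// Valid when the value is EITHER all lower-case letters OR 2-3 characters long.
@ConstraintComposition(CompositionType.OR)
@Pattern(regexp = "[a-z]+")
@Size(min = 2, max = 3)
@ReportAsSingleViolation
@Constraint(validatedBy = { })
@Target({ METHOD, FIELD })
@Retention(RUNTIME)
public @interface PatternOrSize {
    String message() default "must be lower-case or between 2 and 3 characters";
    Class<?>[] groups() default { };
    Class<? extends Payload>[] payload() default { };
}
```

Without @ConstraintComposition, the same two composing constraints would be combined via the default logical AND.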
This means the dependency is provided at runtime by the Java EE container, so it does not need to be declared again. A Spring Boot application, however, must add this dependency itself. If it is missing, the following error is reported:
HV000183: Unable to load 'javax.el.ExpressionFactory'. Check that you have the EL dependencies on the classpath, or use ParameterMessageInterpolator instead
Cause: a missing implementation of the Unified Expression Language (EL) specification, such as the GlassFish one.
Solution:
Hibernate Validator also requires an implementation of the Unified Expression Language (JSR 341) for evaluating dynamic expressions in constraint violation messages.
When your application runs in a Java EE container such as WildFly, an EL implementation is already provided by the container.
In a Java SE environment, however, you have to add an implementation as dependency to your POM file. For instance, you can add the following dependency to use the JSR 341 reference implementation:
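For instance, the GlassFish implementation can be declared in Maven as follows (the version shown is illustrative; use a current release):

```xml
<dependency>
    <groupId>org.glassfish</groupId>
    <artifactId>jakarta.el</artifactId>
    <version>3.0.3</version>
</dependency>
```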
Jakarta Annotations defines a collection of annotations representing common semantic concepts that enable a declarative style of programming that applies across a variety of Java technologies.
With the addition of JSR 175 (A Metadata Facility for the Java Programming Language) to the Java platform, it was envisioned that various technologies would use annotations to enable a declarative style of programming. It would be unfortunate if these technologies each independently defined their own annotations for common concepts. Consistency within the Jakarta EE and Java SE component technologies is valuable, but consistency between Jakarta EE and Java SE is valuable as well.
The purpose of this specification is to define a small set of common annotations that can be used by other specifications. Hopefully this will help avoid unnecessary redundancy or duplication between annotations defined in different Jakarta EE specifications. It allows the common annotations all to be defined in one place, and lets the technologies reference this specification rather than specifying them in multiple places. This way, all technologies can use the same version of the annotations, and annotations used across platforms will stay consistent.
Oracle has two products that implement Java Platform Standard Edition (Java SE) 8:
Java SE Development Kit (JDK) 8. JDK 8 is a superset of JRE 8, and contains everything that is in JRE 8, plus tools such as the compilers and debuggers necessary for developing applets and applications.
Java SE Runtime Environment (JRE) 8. JRE 8 provides the libraries, the Java Virtual Machine (JVM), and other components to run applets and applications written in the Java programming language. Note that the JRE includes components not required by the Java SE specification, including both standard and non-standard Java components.
The following conceptual diagram illustrates the components of Oracle’s Java SE products:
(Diagram: the JDK, i.e. the JRE plus development tools such as compilers and debuggers.)
Some terminology:
JCP is short for the Java Community Process, a community drawn from across the Java ecosystem that plans and leads the evolution of Java.
The container provides Application Components with a federated view of the underlying Jakarta EE APIs. Jakarta EE Application Components never interact directly with other Jakarta EE Application Components. They use the protocols and methods of the container for interacting with each other and with platform services. Interposing a container between the Application Components and the Jakarta EE services allows the container to transparently inject the services required by the component, such as declarative transaction management, security checks, resource pooling, and state management.
This specification requires that containers provide a Java Compatible™ runtime environment, as defined by the Java Platform, Standard Edition, v8 specification (Java SE).
Database
The Jakarta EE platform requires a database, accessible through the JDBC API, for the storage of business data. The database is accessible from:
immediate()
No execution context: at processing time, the submitted Runnable will be directly executed, effectively running it on the current Thread (can be seen as a “null object” or no-op Scheduler).
single()
newSingle(...)
A single, reusable thread.
Note that this method reuses the same thread for all callers, until the Scheduler is disposed.
elastic()
newElastic(...)
An unbounded elastic thread pool.
This one is no longer preferred with the introduction of Schedulers.boundedElastic(), as it has a tendency to hide backpressure problems and lead to too many threads (see below).
boundedElastic()
newBoundedElastic(...)
A bounded elastic thread pool.
Like its predecessor elastic(), it creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are also disposed. Unlike its predecessor elastic(), it has a cap on the number of backing threads it can create (default is number of CPU cores x 10). Up to 100 000 tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available. This makes it a better choice for I/O blocking work, and for legacy blocking code if it cannot be avoided: Schedulers.boundedElastic() is a handy way to give a blocking process its own thread so that it does not tie up other resources (see How Do I Wrap a Synchronous, Blocking Call?), without pressuring the system too much with new threads.
parallel()
newParallel(...)
A fixed pool of workers that is tuned for parallel work.
It creates as many workers as you have CPU cores.
fromExecutorService(ExecutorService)
A custom thread pool.
Create a Scheduler out of any pre-existing ExecutorService
delayElements
Signals are delayed and continue on the parallel default Scheduler, or on a user-specified Scheduler.
How to use a Scheduler?
Reactor offers two means of switching the execution context (or Scheduler) in a reactive chain: publishOn and subscribeOn. Both take a Scheduler and let you switch the execution context to that scheduler. But the placement of publishOn in the chain matters, while the placement of subscribeOn does not. To understand that difference, you first have to remember that nothing happens until you subscribe.
Let’s have a closer look at the publishOn and subscribeOn operators:
most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made. The following example runs a Mono in a new thread:
// The Mono<String> is assembled in thread main.
final Mono<String> mono = Mono
        .fromSupplier(() -> {
            log.info("fromSupplier");
            return "hello";
        })
        .map(msg -> {
            log.info("map");
            return msg + " world";
        });

// However, it is subscribed to in thread Thread-0.
// As a consequence, all callbacks (fromSupplier, map, onNext) actually run in Thread-0.
Thread t = new Thread(() ->
        mono.subscribe(log::info)
);
t.start();
t.join();
Output:
21:02:18.436 [Thread-0] INFO FluxTest - fromSupplier
21:02:18.436 [Thread-0] INFO FluxTest - map
21:02:18.437 [Thread-0] INFO FluxTest - hello world
// The Mono<String> is assembled in thread main.
Mono.fromSupplier(() -> {
        log.info("fromSupplier");
        return "hello";
    })
    .map(msg -> {
        log.info("map");
        return msg + " world";
    })
    .doOnTerminate(countDownLatch::countDown)
    // However, the subscribeOn switches the whole sequence on a Thread picked from the Scheduler.
    .subscribeOn(Schedulers.newSingle("subscribeOn"))
    .subscribe(log::info);
countDownLatch.await();
Output:
21:31:52.563 [subscribeOn-1] INFO FluxTest - fromSupplier
21:31:52.563 [subscribeOn-1] INFO FluxTest - map
21:31:52.563 [subscribeOn-1] INFO FluxTest - hello world
Example 3
This demonstrates how publishOn affects the thread on which subsequent operators execute.
publishOn takes signals from upstream and replays them downstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators execute (until another publishOn is chained in), as follows:
// 1. The Mono<String> is assembled in thread main.
Mono.fromSupplier(() -> {
        log.info("fromSupplier");
        return "hello";
    })
    .map(msg -> {
        log.info("first map");
        return msg + " world";
    })
    // 3. The publishOn affects where the subsequent operators execute.
    .publishOn(Schedulers.newSingle("publishOn"))
    .map(msg -> {
        log.info("second map");
        return msg + " again";
    })
    .doOnTerminate(countDownLatch::countDown)
    // 2. However, the subscribeOn switches the whole sequence on a Thread picked from the Scheduler.
    .subscribeOn(Schedulers.newSingle("subscribeOn"))
    .subscribe(log::info);
countDownLatch.await();
Output:
21:32:36.975 [subscribeOn-1] INFO FluxTest - fromSupplier
21:32:36.976 [subscribeOn-1] INFO FluxTest - first map
21:32:36.977 [publishOn-2] INFO FluxTest - second map
21:32:36.977 [publishOn-2] INFO FluxTest - hello world again
Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.
The real mechanism defined by the Reactive Streams specification is pretty close to the analogy: A subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate or it can use the request mechanism to signal the source that it is ready to process at most n elements.
Intermediate operators can also change the request in-transit. Imagine a buffer operator that groups elements in batches of ten. If the subscriber requests one buffer, it is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies, which avoid request(1) round-trips and is beneficial if producing the elements before they are requested is not too costly.
This transforms the push model into a push-pull hybrid, where the downstream can pull n elements from upstream if they are readily available. But if the elements are not ready, they get pushed by the upstream whenever they are produced.
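The push-pull hybrid can be sketched with the JDK's built-in java.util.concurrent.Flow interfaces (a mirror of the Reactive Streams types). Everything below, including the class and method names, is a made-up minimal illustration rather than Reactor code: a subscriber pulls two elements at a time, and the source pushes only as much as was requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Minimal sketch of the request mechanism (names are illustrative, not Reactor APIs).
public class BackpressureSketch {

    // A synchronous Publisher over 0..count-1 that never emits more than requested.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = 0;           // next element to emit
            long demand = 0;        // outstanding requests
            boolean emitting = false, done = false;

            @Override
            public void request(long n) {
                demand += n;
                if (emitting) return;          // re-entrant call: just record the extra demand
                emitting = true;
                while (demand > 0 && next < count) {
                    demand--;
                    subscriber.onNext(next++); // push only what was pulled
                }
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                next = count;
            }
        });
    }

    // A subscriber that requests `batch` elements at a time instead of Long.MAX_VALUE.
    static List<Integer> consume(int count, int batch) {
        List<Integer> received = new ArrayList<>();
        range(count).subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(batch);              // initial demand: "I can process `batch` items"
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                if (received.size() % batch == 0) {
                    subscription.request(batch); // signal readiness for the next batch
                }
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consume(5, 2)); // [0, 1, 2, 3, 4]
    }
}
```

Requesting Long.MAX_VALUE in onSubscribe would switch the same subscriber to unbounded (pure push) mode.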
Each operator adds behavior to a Publisher and wraps the previous step’s Publisher into a new instance. The whole chain is thus linked, such that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process. Remember that nothing happens until a Subscriber subscribes to a Publisher.
While the Reactive Streams specification does not specify operators at all, one of the best added values of reactive libraries, such as Reactor, is the rich vocabulary of operators that they provide. These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.
Transforming an Existing Sequence
The commonly used ones are:
I want to transform existing data:
on a 1-to-1 basis (eg. strings to their length): map (Flux|Mono)
…in order to materialize each source value’s index: Flux#index
on a 1-to-n basis (eg. strings to their characters): flatMap (Flux|Mono) + use a factory method
running an asynchronous task for each source item (eg. urls to http request): flatMap (Flux|Mono) + an async Publisher-returning method
…ignoring some data: conditionally return a Mono.empty() in the flatMap lambda
…retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)
…where the async task can return multiple values, from a Mono source: Mono#flatMapMany
// Mono -> Flux
// Create a Flux that emits the items contained in the provided Iterable.
// A new iterator will be created for each subscriber.
Mono#flatMapMany(Flux::fromIterable)
I want to add pre-set elements to an existing sequence:
based on a publisher “timing out”: Flux#sampleTimeout (each element triggers a publisher, and is emitted if that publisher does not overlap with the next)
I expect at most 1 element (error if more than one)…
and I want an error if the sequence is empty: Flux#single()
and I want a default value if the sequence is empty: Flux#single(T)
take(long n)
Take only the first N values from this Flux, if available.
takeLast(long n)
Emit the last N values this Flux emitted before its completion.
last()
Emit the last element observed before complete signal as a Mono, or emit NoSuchElementException error if the source was empty.
last(T defaultValue)
Emit the last element observed before complete signal as a Mono, or emit the defaultValue if the source was empty.
Peeking into a Sequence
Method
Argument
Description
doOnSubscribe
Consumer<? super Subscription>
Add behavior triggered when the Mono is subscribed.
doOnCancel
Runnable
Add behavior triggered when the Mono is cancelled.
doOnRequest
LongConsumer
Add behavior triggering a LongConsumer when the Mono receives any request.
doOnNext
Consumer<? super T>
Add behavior triggered when the Mono emits a data successfully.
on complete …
doOnComplete
Runnable
Add behavior triggered when the Flux completes successfully.
doOnSuccess
Consumer<? super T>
Add behavior triggered when the Mono completes successfully.
* null : completed without data
* T : completed with data
doOnError
Consumer<? super Throwable>
Class<E>, Consumer<? super E>
Predicate<? super Throwable>, Consumer<? super Throwable>
Add behavior triggered when the Mono completes with an error.
doOnTerminate
Runnable
Add behavior triggered when the Mono terminates, either by completing successfully or failing with an error.
doAfterTerminate
Runnable
Add behavior triggered when the Mono terminates, either by completion or error, but after the terminal signal has been propagated downstream.
doFinally
Consumer<SignalType>
any terminating condition (complete, error, cancel).
doOnSuccessOrError
Deprecated, will be removed in 3.5.0. Prefer using doOnNext(Consumer), doOnError(Consumer), doOnTerminate(Runnable) or doOnSuccess(Consumer). Add behavior triggered when the Mono terminates, either by completing successfully or with an error.
* null, null : completing without data
* T, null : completing with data
* null, Throwable : failing with/without data
doAfterSuccessOrError
Add behavior triggered after the Mono terminates, either by completing downstream successfully or with an error. The arguments will be null depending on success, success with data, and error:
* null, null : completed without data
* T, null : completed with data
* null, Throwable : failed with/without data
all events …
doOnEach
Consumer<? super Signal<T>>
I want to know of all events, each represented as a Signal object, in a callback outside the sequence: doOnEach
Debugging helpers:
Method
Description
log
Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
timestamp
If this Mono is valued, emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the parallel Scheduler) and T2 the emitted data (as a T).
elapsed
Map this Mono into a Tuple2<Long, T> of timemillis and source data. The timemillis corresponds to the elapsed time between the subscribe and the first next signal, as measured by the parallel Scheduler.
Handling Errors
For error handling, in addition to the default behavior of immediately propagating the exception, Reactor provides three further kinds of strategies:
simply logging the error (doOnError)
recover from errors by falling back (onErrorReturn、onErrorResume)
recover from errors by retrying (retry、retryWhen)
Method
Javadoc
Description
error
Create a Mono that terminates with the specified error immediately after being subscribed to.
creating a sequence that terminates with an error
onErrorMap
Transform any error emitted by this Mono by synchronously applying a function to it.
Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.
Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through.
catching an exception and wrapping and re-throwing
onErrorReturn
Simply emit a captured fallback value when any error is observed on this Mono.
Simply emit a captured fallback value when an error of the specified type is observed on this Mono.
Simply emit a captured fallback value when an error matching the given predicate is observed on this Mono.
catching an exception and falling back to a default value
onErrorResume
Subscribe to a fallback publisher when any error occurs, using a function to choose the fallback depending on the error.
Subscribe to a fallback publisher when an error matching the given type occurs.
Subscribe to a fallback publisher when an error matching a given predicate occurs.
catching an exception and falling back to another Mono
onErrorContinue
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.
retry
Re-subscribes to this Mono sequence if it signals any error, indefinitely.
Re-subscribes to this Mono sequence if it signals any error, for a fixed number of times.
retrying with a simple policy (max number of attempts)
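Putting the fallback operators together (a sketch assuming Reactor on the classpath; callRemoteService() and fetchFromCache() are hypothetical Mono<String> sources), a defensive chain could combine retrying and falling back like this:

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import reactor.core.publisher.Mono;

Mono<String> result = callRemoteService()      // hypothetical remote call returning Mono<String>
        .timeout(Duration.ofSeconds(2))        // fail with TimeoutException if too slow
        .retry(3)                              // re-subscribe up to 3 times on any error
        .onErrorResume(TimeoutException.class,
                e -> fetchFromCache())         // fall back to another Mono on timeout
        .onErrorReturn("default");             // last resort: emit a captured fallback value
```

Note that retry works by re-subscribing, so a source with side effects will execute them again on each attempt.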