Qida's Blog

纸上得来终觉浅,绝知此事要躬行。

Java Documentation

Standard Edition

Java Platform, Standard Edition Documentation

Roadmap

详见:Oracle Java SE Support Roadmap

Oracle Java SE Support Roadmap

参考:

一文详解|从JDK8飞升到JDK17,再到未来的JDK21 | 阿里技术

Java SE 8

Java Platform, Standard Edition (Java SE) Overview

Reference

Oracle has two products that implement Java Platform Standard Edition (Java SE) 8:

  • Java SE Development Kit (JDK) 8. JDK 8 is a superset of JRE 8, and contains everything that is in JRE 8, plus tools such as the compilers and debuggers necessary for developing applets and applications.

  • Java SE Runtime Environment (JRE) 8. JRE 8 provides the libraries, the Java Virtual Machine (JVM), and other components to run applets and applications written in the Java programming language. Note that the JRE includes components not required by the Java SE specification, including both standard and non-standard Java components.

The following conceptual diagram illustrates the components of Oracle’s Java SE products:

Description of Java Conceptual Diagram

JDK

一些术语:

  • JCP 是 Java Community Process(Java社区进程)的简称,社会各界组成的 Java 社区,规划和领导 Java 的发展。
  • JSR 是 Java Specification Requests(Java 规范请求)的简称,是 JCP 成员向委员会提交的 Java 发展议案,经过一系列流程后,如果通过会成为 JEP,最终会体现在未来的 Java 中。
  • JEP 是 JDK Enhancement Proposals (Java 增强提案)的简称,为了保证日后 JDK 研发能够更加顺利地进行,从 JDK 8 开始,Oracle 启用 JEP 来定义和管理纳入新版 JDK 发布范围的功能特性。JDK 的版本变化将从这些提案中选取。例如:

一些历史:参考《JDK 历代版本变化

JDK 从 1.5 版本开始,在官方的正式文档与宣传资料中已经不再使用类似“JDK 1.5”的名称,只有程序员内部使用的开发版本号(Developer Version,例如 java -version 的输出)才继续沿用 1.5、1.6 和 1.7 的版本号(JDK 10 之后又改为了采用年份加月份作为开发版本号,例如 18.3),而公开版本号(Product Version)则改为 JDK 5、JDK 6 和 JDK 7 的命名方式。

从 JDK 10 开始,每年的 3 月和 9 月各发布一个大版本,目的就是避免众多功能特性被集中捆绑到一个 JDK 版本上而引发交付风险。同时为了降低维护成本,每六个 JDK 大版本中才会被划出一个长期支持(Long Term Suppot,LTS)版本,只有 LTS 版的 JDK 能够获得为期三年的支持和更新,普通版的 JDK 就只有短短六个月的生命周期。

JDK 8 和 JDK 11 是 LTS 版本,再下一个就到 2021 年发布的 JDK 17。

在 2018 年发布的 Java 11, Oracle 已经让 OpenJDK 和 Oracle JDK 两者的二进制文件在功能上尽可能相互接近,尽管 OpenJDK 与 Oracle JDK 两者在一些选项之间仍然存在一些差异。参考:<Differences Between Oracle JDK and OpenJDK>。

JDK

根据 JEP-320 的内容,计划于 2018 年 9 月发布的 JDK 11 将不包括 Java EE 模块:JAX-WS( JSR-224 )、JAXB( JSR-222 )、JAF( JSR-925 )、Commons Annotations( JSR-250 )和 JTA( JSR-907 ),而这些模块已在 JDK 中存在了多年。计划在 JDK 11 中移除的四个 Java EE 模块最终将进入 EE4J。

常见 JDK 版本:《你该选择什么样的 JDK?

  • Oracle JDK 商业版

    OTN 协议下发行的传统的 Oracle JDK,个人可以免费使用,但若在生产环境中商用就必须付费,可以有三年时间的更新支持。

  • OpenJDK 开源版

    GPLv2+CE 协议下由 Oracle 发行的 OpenJDK,可以免费在开发、测试、生产环境中使用,但是只有半年时间的更新支持。

    由于 Oracle 不愿意在旧版本的 OpenJDK 上继续耗费资源,而 RedHat (IBM) 又乐意扩大自己在 Java 社区的影响力,因此 RedHat 代替 Oracle 成为了 OpenJDK 历史版本的维护者,接过了 OpenJDK 6、7、8、11 的管理权力和维护职责。

  • OpenJDK 的变种版本:

JRE

不需要考虑 JDK 与 JRE 的关系了

JDK 和 JRE 都是 Java 的重要组成部分,但它们的角色和用途是不同的。JDK 是 Java 开发工具包,主要用于开发 Java 应用。它包含了 JRE,同时还提供了一些额外的工具,如编译器(javac)、调试器(jdb)等。JRE 则是运行 Java 应用程序所需的环境。它包含了 Java 虚拟机(JVM)和 Java 类库,也就是 Java 应用程序运行时所需的核心类和其他支持文件。

在 JDK 8 及之前的版本中,Oracle 会提供独立的 JRE 和 JDK 供用户下载。也就是说,你可以只安装 JRE 来运行 Java 程序,也可以安装 JDK 来开发 Java 程序。

然而从 JDK 9 开始,Oracle 不再单独发布 JRE。取而代之的是 jlink 工具,可以使用这个工具来生成 定制的运行时镜像。这种方式简化了 Java 应用的部署,因你只需要分发包含你的应用和定制运行时镜像的包,不需要单独安装 JRE。

JVM

除了官方 HotSpotVM、GraalVM 实现,其它厂商实现如下:

参考:Java 虚拟机系列

Enterprise Edition

Roadmap

作为分水岭,2017 年 11 月,Oracle 将 Java EE 移交给 Eclipse 基金会。 2018 年 3 月 5 日,Eclipse 基金会宣布 Java EE (Enterprise Edition) 被更名为 Jakarta EE。

参考:《Java EE 规范重命名为 Jakarta EE

规范 创建组织 发布时间
Java EE 1.3 (JSR-058) Java Community Process
Java EE 1.4 (JSR-151) Java Community Process
Java EE 5 (JSR-244) Java Community Process 2005
Java EE 6 (JSR-316) Java Community Process 2007
Java EE 7 (JSR-342) Java Community Process May 28, 2013
Java EE 8 (JSR-366) Java Community Process Aug 21, 2017
Jakarta EE 8 Jakarta EE Platform Specification Project with guidance provided by the Jakarta EE Working Group 2019.9
Jakarta EE 9 Jakarta EE Platform Specification Project with guidance provided by the Jakarta EE Working Group 2020.9

Java EE

Java Platform, Enterprise Edition

Java EE 和 Spring 之间复杂的关系:

Spring 诞生于 2004 年,由 Rod Johnson 发起,作为对 J2EE(Java 2 Platform,Enterprise Edition)和 EJB 2 复杂性的反击。从那个时候开始,Spring 和 Java EE 之间就没有停止过竞争,并彼此影响对方:

  • Spring(以及 Hibernate)的出现刺激了 Java EE 社区,促使他们推出了 EJB 3 和 JAP 1.0。
  • Spring Batch 直接影响到了 Batch 规范(JSR 352)。
  • Spring Dependency Injection 启发了 CDI(Context and Dependency Injection)。
  • Spring 恰到好处地使用了 J2EE 和 Java EE 中的某些标准,如 Servlet、JMS 和 JPA。
  • Spring 5 宣称兼容 Java EE 8。

从 2006 年开始,Java EE 也将提升易用性和对开发者的友好放在首位,但在演进速度方面还是很慢,主要有两个原因:

  • JCP 制定规范需要很长时间:即使是一个轻量级的规范,也需要多方参与,需要更长的时间才能达成一致。
  • 实现和认证:在规范发布之后,需要几个月时间才能找到符合认证的应用服务器。

而最近,这方面的差距在加大:

  • Spring Boot 将“以约定代替配置(Convention Over Configuration)”的原则发挥到了极致,进一步提升易用性。
  • Spring Cloud 利用 Netflix 的开源组件解决了与云原生应用开发相关的问题,如服务注册、服务发现、弹性、负载均衡、监控……
  • Spring 5 将响应式编程(Reactive Programming)提升为一等公民。

Java EE 在这方面的速度要慢的多。在 2013 年发布 Java EE 7 之后,经历了一段消停期。2016 年,在社区的压力下,Oracle 才发布了一个新的路线图。

Java EE 8 发布于 2017 年 9 月,虽然人们对其期望甚高,但并非革命性的。人们还是把更多的目光投向了 Java EE 9,期望下一个版本会有更多的创新。

与此同时,Eclipse 基金会于 2016 年中启动 Microprofile.io 项目,旨在以微服务架构为基准来优化企业版 Java,以此来推动 Java EE 生态系统的发展。Microprofile 1.0 涵盖了 JAX-RS 2.0、CDI 1.2 和 JSON-P 1.0,1.2 版本于 2017 年 9 月发布,加入了更多特性,如配置、容错、JWT、度量指标和健康检测,2.0 版本有望与 Java EE 8 看齐。

Jakarta EE

Eclipse Foundation Projects

Architecture

Jakarta EE 平台的架构关系如下图所示。(请注意,此图显示了元素间的逻辑关系;它并不意味着将元素间物理关系为单独的机器、进程、地址空间或虚拟机。)

下面分别描述每个矩形及其之间的关系:

  • 容器(如 Web Container)作为 Jakarta EE 运行时环境,为 Application Components(如 Server Pages、Servlet)提供所需服务;
  • 而所提供的服务,由矩形下半部分的方框表示。例如, Web Container 为 Servlet 提供了 Bean Validation API。详见 Jakarta EE 标准服务
  • Java SE 的 API 受 Java SE 运行时环境(JRE)的支持,适用于每种类型的 Application Components。
  • 箭头表示需要访问 Jakarta EE 平台的其它部分。例如,Web Container 通过 JDBC™ API 为 Server Pages、Servlet 提供数据库访问能力。

Jakarta EE Architecture Diagram

Application Components

Jakarta EE 运行时环境定义了 Jakarta EE 产品必须支持的四种 Application Components 类型:

Application Components 描述
Application clients 通常是在台式计算机上执行的 GUI 程序。提供类似于本机应用程序的用户体验,并且可以访问 Jakarta EE 中间层的所有设施。
Applets 通常是在 Web 浏览器中执行的 GUI 组件,但也可以在支持 Applet 编程模型的各种其它应用程序或设备中执行。
Web Components (Servlets, Server Pages, Server Faces Applications, Filters, and Web Event Listeners) 通常在 Web 容器中执行,并可能响应来自 Web 客户端的 HTTP 请求。
Enterprise Beans 在支持事务的托管环境中执行。可以使用 SOAP/HTTP 协议直接提供 Web 服务。

Containers

容器为 Jakarta EE Application Components 提供运行时支持。

容器为 Application Components 提供了一套底层 Jakarta EE API 的联合视图。Jakarta EE Application Components 从不直接与其它 Jakarta EE Application Components 交互。它们使用容器的协议和方法来相互交互以及与平台服务交互。在 Application Components 和 Jakarta EE 服务之间插入一个容器,可以使该容器透明地注入该组件所需的服务,例如声明式事务管理,安全检查,资源池和状态管理。

This specification requires that containers provide a Java Compatible™ runtime environment, as defined by the Java Platform, Standard Edition, v8 specification (Java SE).

Database

The Jakarta EE platform requires a database, accessible through the JDBC API, for the storage of business data. The database is accessible from:

  • Web Components
  • Enterprise Beans
  • Application clients

Jakarta EE Standard Services

https://jakarta.ee/specifications/platform/9/jakarta-platform-spec-9.html#a84

参考

教程:

2017 年

2019 年

  • InfoQ 2019 年 Java 发展趋势报告

  • 2019 中国 Java 发展趋势报告

    Java 作为使用最为广泛的语言,最近几年还是有比较大进步的,无论从语法的易用性上还是性能上都有很大程度的提升。吸收了函数式编程的思想,lambda 表达式、Parallem stream、Var 变量等提升了开发人员的效率与代码的简洁性。ZGC 无疑是一项重大的改进,在一定程度上解决了 Java 天生的 GC 延迟问题。

    Java 的编程复杂度并没有明显的降低,比如 I/O 处理、并发 / 并⾏计算,以及类加载等等。再者是 Java 与操作系统之间的交互仍不够充分,尽管 Java 9 开始提供了不少的 API,然⽽了解和使用的群体不⾜。Java 在这方面明显不及 GO 语言。

    从语⾔层⾯来看,Java 正在向主流非 Java 语⾔融合,解决其中鸿沟的关键是语法的变化,比如 Java 8 的 Lambda 表达式 和 Java 10 的局部变量类型( var )等。个人认为这是一件好事,未来前后端不分家,相互渗透,对于彼此语言都是良性。

2020 年

2021 年

2022 年

2023 年:

从 Java 8 升级到 Java 17 踩坑全过程

从 JDK8 飞升到 JDK17,再到未来的 JDK21 | 阿里技术

升级指南之 JDK 11+ 新特性和阿里巴巴 AJDK | 阿里技术

从 JDK 9 到 19,我们帮您提炼了和云原生场景有关的能力列表 | 阿里技术

从 JDK 9 到 19,认识一个新的 Java 形态(内存篇)| 阿里技术

从 JDK 9 开始,一些关键特性 | 阿里技术

如何创建调度器?

通过工厂类 Schedulers 创建调取器:

https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html

Return a shared instance Return a new instance Description Notes
immediate() / No execution context at processing time, the submitted Runnable will be directly executed, effectively running them on the current Thread (can be seen as a “null object” or no-op Scheduler).
single() newSingle(...) A single, reusable thread. Note that this method reuses the same thread for all callers, until the Scheduler is disposed.
elastic() newElastic(...) An unbounded elastic thread pool. This one is no longer preferred with the introduction of Schedulers.boundedElastic(), as it has a tendency to hide backpressure problems and lead to too many threads (see below).
boundedElastic() newBoundedElastic(...) A bounded elastic thread pool. Like its predecessor elastic(), it creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are also disposed.
Unlike its predecessor elastic(), it has a cap on the number of backing threads it can create (default is number of CPU cores x 10). Up to 100 000 tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available.
This is a better choice for I/O blocking work. While it is made to help with legacy blocking code if it cannot be avoided. Schedulers.boundedElastic() is a handy way to give a blocking process its own thread so that it does not tie up other resources. See How Do I Wrap a Synchronous, Blocking Call?, but doesn’t pressure the system too much with new threads.
parallel() newParallel(...) A fixed pool of workers that is tuned for parallel work. It creates as many workers as you have CPU cores.
fromExecutorService(ExecutorService) A Customize thread pool. Create a Scheduler out of any pre-existing ExecutorService

delayElements
Signals are delayed and continue on the parallel default Scheduler
Signals are delayed and continue on an user-specified Scheduler

如何使用调度器?

Reactor offers two means of switching the execution context (or Scheduler) in a reactive chain: publishOn and subscribeOn. Both take a Scheduler and let you switch the execution context to that scheduler. But the placement of publishOn in the chain matters, while the placement of subscribeOn does not. To understand that difference, you first have to remember that nothing happens until you subscribe.

Let’s have a closer look at the publishOn and subscribeOn operators:

例子一

演示流是运行在 subscribe() 方法调用的线程上,且大多数操作符继续在前一个操作符执行的线程中工作。

most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made. The following example runs a Mono in a new thread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// The Mono<String> is assembled in thread main.
final Mono<String> mono =
Mono.fromSupplier(() -> {
log.info("fromSupplier");
return "hello";
})
.map(msg -> {
log.info("map");
return msg + " world";
});

Thread t = new Thread(() ->
// However, it is subscribed to in thread Thread-0.
// As a consequence, all callbacks (fromSupplier, map, onNext) actually run in Thread-0
mono.subscribe(log::info)
);
t.start();
t.join();

输出结果:

1
2
3
21:02:18.436 [Thread-0] INFO FluxTest - fromSupplier
21:02:18.436 [Thread-0] INFO FluxTest - map
21:02:18.437 [Thread-0] INFO FluxTest - hello world

例子二

演示如何使用 subscribeOn 方法,简化上述例子一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CountDownLatch countDownLatch = new CountDownLatch(1);

// The Mono<String> is assembled in thread main.
Mono.fromSupplier(() -> {
log.info("fromSupplier");
return "hello";
})
.map(msg -> {
log.info("map");
return msg + " world";
})
.doOnTerminate(countDownLatch::countDown)
// However, the subscribeOn switches the whole sequence on a Thread picked from Scheduler.
.subscribeOn(Schedulers.newSingle("subscribeOn"))
.subscribe(log::info);

countDownLatch.await();

输出结果:

1
2
3
21:31:52.563 [subscribeOn-1] INFO FluxTest - fromSupplier
21:31:52.563 [subscribeOn-1] INFO FluxTest - map
21:31:52.563 [subscribeOn-1] INFO FluxTest - hello world

例子三

演示 publishOn 如何影响其后续操作符的执行线程。

publishOn takes signals from upstream and replays them downstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators execute (until another publishOn is chained in), as follows:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CountDownLatch countDownLatch = new CountDownLatch(1);

// 1、The Mono<String> is assembled in thread main.
Mono.fromSupplier(() -> {
log.info("fromSupplier");
return "hello";
})
.map(msg -> {
log.info("first map");
return msg + " world";
})
// 3、The publishOn affects where the subsequent operators execute.
.publishOn(Schedulers.newSingle("publishOn"))
.map(msg -> {
log.info("second map");
return msg + " again";
})
.doOnTerminate(countDownLatch::countDown)
// 2、However, the subscribeOn switches the whole sequence on a Thread picked from Scheduler.
.subscribeOn(Schedulers.newSingle("subscribeOn"))
.subscribe(log::info);

countDownLatch.await();

输出结果:

1
2
3
4
21:32:36.975 [subscribeOn-1] INFO FluxTest - fromSupplier
21:32:36.976 [subscribeOn-1] INFO FluxTest - first map
21:32:36.977 [publishOn-2] INFO FluxTest - second map
21:32:36.977 [publishOn-2] INFO FluxTest - hello world again

如何包装同步阻塞调用?

How Do I Wrap a Synchronous, Blocking Call?

参考

4.5. Threading and Schedulers

8. Exposing Reactor metrics

Appendix C.1. How Do I Wrap a Synchronous, Blocking Call?

Appendix C.6. How Do I Ensure Thread Affinity when I Use publishOn()?

https://www.woolha.com/tutorials/project-reactor-publishon-vs-subscribeon-difference

Reactive Streams 规范并未提供任何操作符(Operators),而 Reactor 框架的核心价值之一就是提供了丰富的操作符。从简单的转换、过滤到复杂的编排和错误处理,涉及方方面面。

推荐通过参考文档而不是 JavaDoc 来学习 Mono/Flux API 和 Operator 操作符。参考:Appendix A: Which operator do I need?

注意点

使用 Reactor API 时,有以下几个注意点:

一、每个 Operator API 都会返回新实例

在 Reactor 中,操作符(Operators)好比流水线中的工作站。 每个操作符都会向 Publisher 添加行为,并将上一步的 Publisher 包装成新实例。因而,整个链就被串起来,数据从第一个 Publisher 开始,沿着链向下移动,通过每个链接进行转换。最终,Subscriber 结束该流程。 注意,直到 Subscriber 订阅 Publisher 为止,什么都不会发生。

理解操作符会创建新实例这一行为,有助于避免一些常见错误,详见:I Used an Operator on my Flux but it Doesn’t Seem to Apply. What Gives?

二、Nothing Happens Until You subscribe()

Reactor 中,当您编写 Publisher 链时,仅用于描述异步处理的抽象过程,默认情况下数据不会开始处理。只有通过订阅,将 PublisherSubscriber 绑定在一起时,才能触发整个链中的数据流处理。这是由其内部实现方式决定的:Subscriber 发出请求信号并向上传播,直到源 Publisher

org.reactivestream_api

详见:C.2. I Used an Operator on my Flux but it Doesn’t Seem to Apply. What Gives?

三、使用背压(Backpressure)进行流量控制

Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.

The real mechanism defined by the Reactive Streams specification is pretty close to the analogy: A subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate or it can use the request mechanism to signal the source that it is ready to process at most n elements.

Intermediate operators can also change the request in-transit. Imagine a buffer operator that groups elements in batches of ten. If the subscriber requests one buffer, it is acceptable for the source to produce ten elements. Some operators also implement prefetching strategies, which avoid request(1) round-trips and is beneficial if producing the elements before they are requested is not too costly.

This transforms the push model into a push-pull hybrid, where the downstream can pull n elements from upstream if they are readily available. But if the elements are not ready, they get pushed by the upstream whenever they are produced.

我该使用哪个操作符?

Creating a New Sequence…

Mono

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html

for [0|1] elements

mono

创建 Mono 流:

mono_create

Flux

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

for [N] elements

flux

普通创建:

flux_create

遍历创建:

定时创建:

flux_create_interval

合并创建:

flux_create_combine

编程式创建:

flux_create_2

其它:

flux_create_switchOnNext

中间操作

Each operator adds behavior to a Publisher and wraps the previous step’s Publisher into a new instance. The whole chain is thus linked, such that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process. Remember that nothing happens until a Subscriber subscribes to a Publisher.

While the Reactive Streams specification does not specify operators at all, one of the best added values of reactive libraries, such as Reactor, is the rich vocabulary of operators that they provide. These cover a lot of ground, from simple transformation and filtering to complex orchestration and error handling.

Transforming an Existing Sequence

常用的如下:

  • I want to transform existing data:

    • on a 1-to-1 basis (eg. strings to their length): map (Flux|Mono)

      • …by just casting it: cast (Flux|Mono)
      • …in order to materialize each source value’s index: Flux#index
    • on a 1-to-n basis (eg. strings to their characters): flatMap (Flux|Mono) + use a factory method

    • running an asynchronous task for each source item (eg. urls to http request): flatMap (Flux|Mono) + an async Publisher-returning method

      • …ignoring some data: conditionally return a Mono.empty() in the flatMap lambda

      • …retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)

      • …where the async task can return multiple values, from a Mono source: Mono#flatMapMany

        1
        2
        3
        // Mono 转 Flux
        // Create a Flux that emits the items contained in the provided Iterable. A new iterator will be created for each subscriber.
        Mono#flatMapMany(Flux::fromIterable)
  • I want to add pre-set elements to an existing sequence:

  • I want to aggregate a Flux: (the Flux# prefix is assumed below)

    • into a List: collectList, collectSortedList

    • into a Map: collectMap, collectMultiMap

    • into an arbitrary container: collect

    • into the size of the sequence: count (Mono.empty() 不计入 count)

    • by applying a function between each element (eg. running sum): reduce

      1
      2
      3
      4
      Flux.range(0, 5)
      .reduce(Integer::sum) // 两两相加
      .map(Objects::toString)
      .subscribe(log::info);
      • …but emitting each intermediary value: scan
    • into a boolean value from a predicate:

      • applied to all values (AND): all
      • applied to at least one value (OR): any
      • testing the presence of any value: hasElements (there is a Mono equivalent in hasElement)
      • testing the presence of a specific value: hasElement(T)
  • I want to combine publishers…

    • in sequential order: Flux#concat or .concatWith(other) (Flux|Mono)
    • in emission order (combined items emitted as they come): Flux#merge / .mergeWith(other) (Flux|Mono)
    • by pairing values:
    • by coordinating their termination:
      • from 1 Mono and any source into a Mono: Mono#and
      • from n sources when they all completed: Mono#when
      • into an arbitrary container type:
        • each time all sides have emitted: Flux#zip (up to the smallest cardinality)
        • each time a new value arrives at either side: Flux#combineLatest
    • selecting the first publisher which…
      • produces a value (onNext): firstWithValue (Flux|Mono)
      • produces any signal: firstWithSignal (Flux|Mono)
    • triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)
    • triggered by the start of the next publisher in a sequence of publishers: switchOnNext
  • I want to repeat an existing sequence: repeat (Flux|Mono)

  • I have an empty sequence but…

    • I want a value instead: defaultIfEmpty (Flux|Mono)

    • I want another sequence instead: switchIfEmpty (Flux|Mono)

      1
      2
      3
      4
      5
      6
      7
      Flux.just(0, 1, 2, 3)
      .filter(i -> i < 0)
      .next()
      .doOnNext(i -> log.info("Exist a item: {}", i))
      .switchIfEmpty(Mono.just(-1))
      .map(Objects::toString)
      .subscribe(log::info);
  • I have a sequence but I am not interested in values: ignoreElements (Flux.ignoreElements()|Mono.ignoreElement())

    • …and I want the completion represented as a Mono: then (Flux|Mono)
    • …and I want to wait for another task to complete at the end: thenEmpty (Flux|Mono)
    • …and I want to switch to another Mono at the end: Mono#then(mono)
    • …and I want to emit a single value at the end: Mono#thenReturn(T)
    • …and I want to switch to a Flux at the end: thenMany (Flux|Mono)

Filtering a Sequence

方法 注释
take(long n) Take only the first N values from this Flux, if available.
takeLast(long n) Emit the last N values this Flux emitted before its completion.
last() Emit the last element observed before complete signal as a Mono, or emit NoSuchElementException error if the source was empty.
last(T defaultValue) Emit the last element observed before complete signal as a Mono, or emit the defaultValue if the source was empty.

Peeking into a Sequence

Reactive callback

方法 入参 注释
doOnSubscribe Consumer<? super Subscription> Add behavior triggered when the Mono is subscribed.
doOnCancel Runnable Add behavior triggered when the Mono is cancelled.
doOnRequest LongConsumer Add behavior triggering a LongConsumer when the Mono receives any request.
doOnNext Consumer<? super T> Add behavior triggered when the Mono emits a data successfully.
do on Complete …
doOnComplete Runnable Add behavior triggered when the Flux completes successfully.
doOnSuccess Consumer<? super T> Add behavior triggered when the Mono completes successfully.
* null : completed without data
* T: completed with data
doOnError Consumer<? super Throwable>
Class<E>, Consumer<? super E>
Predicate<? super Throwable>, Consumer<? super Throwable>
Add behavior triggered when the Mono completes with an error.
doOnTerminate Runnable completion or error
doAfterTerminate Runnable completion or error but after it has been propagated downstream
doFinally Consumer<SignalType> any terminating condition (complete, error, cancel).
doOnSuccessOrError Deprecated, will be removed in 3.5.0. Prefer using doOnNext(Consumer), doOnError(Consumer), doOnTerminate(Runnable) or doOnSuccess(Consumer).
Add behavior triggered when the Mono terminates, either by completing successfully or with an error.
* null, null : completing without data
* T, null : completing with data
* null, Throwable : failing with/without data
doAfterSuccessOrError Deprecated, will be removed in 3.5.0. Prefer using doAfterTerminate(Runnable) or doFinally(Consumer).

Add behavior triggered after the Mono terminates, either by completing downstream successfully or with an error. The arguments will be null depending on success, success with data and error:
* null, null : completed without data
* T, null : completed with data
* null, Throwable : failed with/without data
all events …
doOnEach Consumer<? super Signal<T>> I want to know of all events each represented as Signal object in a callback outside the sequence: doOnEach

调试类:

方法 注释
log Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
timestamp If this Mono is valued, emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the parallel Scheduler) and T2 the emitted data (as a T).
elapsed

Handling Errors

对于异常处理,Reactor 除了默认的立刻抛出异常的处理方式之外,还提供三类处理方式:

  • 简单记录日志(doOnError
  • recover from errors by falling back (onErrorReturnonErrorResume)
  • recover from errors by retrying (retryretryWhen)
方法 注释 描述
error Create a Mono that terminates with the specified error immediately after being subscribed to. 创建异常流。
onErrorMap Transform any error emitted by this Mono by synchronously applying a function to it.
Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.
Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through.
catching an exception and wrapping and re-throwing
onErrorReturn Simply emit a captured fallback value when any error is observed on this Mono.
Simply emit a captured fallback value when an error of the specified type is observed on this Mono.
Simply emit a captured fallback value when an error matching the given predicate is observed on this Mono.
catching an exception and falling back to a default value
onErrorResume Subscribe to a fallback publisher when any error occurs, using a function to choose the fallback depending on the error.
Subscribe to a fallback publisher when an error matching the given type occurs.
Subscribe to a fallback publisher when an error matching a given predicate occurs.
catching an exception and falling back to another Mono
onErrorContinue Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. https://devdojo.com/ketonemaniac/reactor-onerrorcontinue-vs-onerrorresume
retry()
retry(long)
Re-subscribes to this Mono sequence if it signals any error, indefinitely.
Re-subscribes to this Mono sequence if it signals any error, for a fixed number of times.
retrying with a simple policy (max number of attempts)
retryWhen

Mono

mono_error

Flux

flux_error

受检异常处理:

非受检异常会被 Reactor 传播,而受检异常必须被用户代码 try catch ,为了让受检异常被 Reactor 的异常传播机制和异常处理机制支持,可以使用如下步骤处理:

  1. try catch 之后,使用 Exceptions.propagate 将受检异常包装为非受检异常并重新抛出传播出去。
  2. onError 回调等异常处理操作获取到异常之后,可以调用 Exceptions.unwrap 取得原受检异常。

参考:https://projectreactor.io/docs/core/release/reference/index.html#error.handling

Sorting a Flux

flux_sort

Splitting a Flux

https://projectreactor.io/docs/core/release/reference/index.html#advanced-three-sorts-batching

延迟处理

flux_delay

转成并行流

flux_parallel

终结操作

Subscribe a Sequence

MonoFlux

mono_subscribe

订阅后可以使用 Disposable API 停止 FLux 流。

Going Back to the Synchronous World

Note: all of these methods except Mono#toFuture will throw an UnsupportedOperatorException if called from within a Scheduler marked as “non-blocking only” (by default parallel() and single()).

参考

Reactor 框架,实现 Reactive Streams 规范,并扩展大量特性

Appendix A: Which operator do I need?

https://zhuanlan.zhihu.com/p/35964846

Reactor 是一款基于 JVM 的完全非阻塞的响应式编程框架。它实现了 Reactive Streams 规范,具有高效的流量控制(以管理背压的形式),并扩展了大量特性,例如提供了丰富的 Operators 运算符。

先决条件

Reactor Core 需要在 Java 8 及以上版本运行。因为 Reactor 直接集成了 Java 8 的函数式 API,特别是:

  • java.util.CompletableFuture
  • java.util.stream.Stream
  • java.time.Duration

依赖安装

自 Reactor 3 开始(since reactor-core 3.0.4, with the Aluminium release train),Reactor 使用 BOM (Bill of Materials) 模型来管理依赖。BOM 将一组相关的、可以良好协作的构建(Maven Artifact)组合在一起,提供版本管理。避免构件间潜在的版本不兼容风险。

为项目引入该 BOM:

1
2
3
4
5
6
7
8
9
10
11
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>${reactor.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

然后就可以为项目添加相关依赖。注意忽略版本号,以便由 BOM 统一管理版本号(除非你想覆盖 BOM 管理的版本号):

1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
<!-- Reactor -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

这里 Reactor Core 传递依赖于 Reactive Stream 规范,如下:

reactor_dependencies

1
2
3
4
5
6
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.2</version>
<scope>compile</scope>
</dependency>

响应式编程

Reactor Core 提供了两个可组合式异步串行 API

  • reactor.core.publisher.Mono (for [0|1] elements)
  • reactor.core.publisher.Flux (for [N] elements)

这两个类都是 org.reactivestreams.Publisher 接口的实现类:

Publisher

Reactor Core 还提供了 org.reactivestreams.Subscriber 接口的实现类,如下(还有其它子类,此处不一一例举):

Subscriber

LambdaSubscriber 的构造方法如下:

LambdaSubscriber 的构造方法

不过一般不会直接使用该实现类,而是使用 MonoFlux 提供的 subscribe 方法(如下图),并传入 Lambda 表达式语句(代码即参数),由方法的实现负责将参数封装为 Subscriber 接口的实现类,供消费使用:

mono_subscribe

响应式编程,如 Reactor 旨在解决 JVM 上传统异步编程带来的缺点、以及编程范式上从命令式过渡到响应式编程:

阻塞带来的性能浪费

现代的应用程序通常有大量并发请求。即使现代的硬件性能不断提高,软件性能仍然是关键瓶颈。

广义上讲,有两种方法可以提高程序的性能:

  • 利用并行(parallel)使用更多 CPU 线程和更多硬件资源。
  • 提升现有资源的利用率。

通常,Java 开发者使用阻塞方式来编写程序。除非达到性能瓶颈,否则这种做法可行。之后,通过增加线程数,运行类似的阻塞代码。 但这种方式很快就会导致资源争用和并发问题

更糟糕的是,阻塞会浪费资源。试想一下,程序一旦遇到一些延迟(特别是 I/O 操作,例如数据库请求或网络请求),就会挂起线程,从而导致资源浪费,因为大量线程处于空闲状态,等待数据,甚至导致资源耗尽。尽管使用池化技术可以提升资源利用率、避免资源耗尽,但只能缓解而不能解决根本问题,而且池内资源同样有耗尽的问题。

因此,并行技术并非银弹。

传统异步编程带来的缺点

Java 提供了两种异步编程模型:

  • Callbacks: 异步方法没有返回值,但是带有一个额外的 callback 回调参数(值为 Lambda 表达式或匿名类),回调参数在结果可用时被调用。
  • Futures: 异步方法调用后立即返回一个 Future<T> 对象。异步方法负责计算出结果值 T,由 Future 对象包装起来。该结果值并非立即可用,可以轮询该 Future 对象,直到结果值可用为止。这种方式顺序执行代码,将异步编程模型转为同步编程模型。ExecutorService 提供了方法 <T> Future<T> submit(Callable<T> task)

两种模型都有一个共同的缺点:难以组合代码,从而导致代码可读性差、难以维护。例如,当业务逻辑复杂,步骤存在依赖关系时,会导致回调嵌套过深,从而导致著名的 Callback Hell 问题。

详见代码例子。

从命令式过渡到响应式编程

from_imperative_to_reactive_programming

参考

Reactor 框架,实现 Reactive Streams 规范,并扩展大量特性

历史

响应式编程(Reactive Programming)概念最早于上世纪九十年代被提出,微软为 .NET 生态开发了 Reactive Extensions (Rx) 库用于支持响应式编程,后来 Netflix 开发了 RxJava,为 JVM 生态实现了响应式编程。随着时间的推移,2015 年 Reactive Stream(响应式流)规范诞生,为 JVM 上的响应式编程定义了一组接口和交互规则。RxJava 从 RxJava 2 开始实现 Reactive Stream 规范。同时 MongoDB、Reactor、Slick 等也相继实现了 Reactive Stream 规范。

Spring Framework 5 推出了响应式 Web 框架

Java 9 引入了响应式编程的 API,将 Reactive Stream 规范定义的四个接口集成到了 java.util.concurrent.Flow 类中。Java 9 提供了 SubmissionPublisherConsumerSubscriber 两个默认实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
+--------------------------+     +-------------+     +------------------+     +-------------------------------+
| Reactive Extensions (Rx) | | RxJava 1.x | | Reactive Streams | | RxJava 2 |
| by Microsoft +-----> by Netflix +-----> Specification +-----> (Supporting Reactive Streams) |
| for .NET | | for Java 6+ | | for JVM | | for Java 6+ |
+--------------------------+ +-------------+ +------------------+ +-------------------------------+
|
|
|
+-----------------------+ +--------------------+ +---------------v---------------+
| Java 9 Standard | | Spring Framework 5 | | Project Reactor |
| (JEP-266 by Doug Lea) <-----+ Reactive Stack <-----+ (Supporting Reactive Streams) |
| | | | | for Java 8+ |
+-----------------------+ +--------------------+ +-------------------------------+

定义

响应式编程(Reactive Programing)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式。

设计思想

响应式编程范式通常在面向对象语言中作为观察者模式的扩展出现。可以将其与大家熟知的迭代器模式作对比,主要区别在于:

迭代器(Iterator) 响应式流(Reactive Stream)
设计模式 迭代器模式 观察者模式
数据方向 拉模式(PULL) 推模式(PUSH)
获取数据 T next() onNext(T)
处理完成 hasNext() onCompleted()
异常处理 throws Exception onError(Exception)

Java 8 引入了 Stream 用于流的操作,Java 9 引入的 Flow 也是数据流的操作。相比之下:

  • Stream 更侧重于流的过滤、映射、整合、收集,使用的是 PULL 模式。
  • 而 Flow/RxJava/Reactor 更侧重于流的产生与消费,使用的是 PUSH 模式 。

迭代器模式

参考《Iterator API 总结

观察者模式

观察者模式是一种行为型设计模式,允许你定义一种订阅机制,可在对象事件发生时主动通知多个 “观察” 该对象的其它对象。

Observer

在响应式流中,上述操作由 Publisher-Subscriber 负责。由 Publisher 生产新值并推送给 Subscriber,这个“推送”就是响应式的关键,亦即“变化传递(propagation of change)”。另外,应用于被推送值的操作(Operator)是“声明式”而不是“命令式”的:开发者表达的是计算逻辑,而不是描述其具体的控制流程。

流程如下:

1
onNext x 0..N [onError | onComplete]

Reactive Streams 规范

依赖

Reactive Stream(响应式流)规范的 Maven 依赖如下:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.2</version>
</dependency>

核心接口

整个依赖包中,仅仅定义了四个核心接口:

  • org.reactivestreams.Subscription 接口定义了连接发布者和订阅者的方法;
  • org.reactivestreams.Publisher<T> 接口定义了发布者的方法;
  • org.reactivestreams.Subscriber<T> 接口定义了订阅者的方法;
  • org.reactivestreams.Processor<T,R> 接口定义了处理器;

org.reactivestreams

接口交互流程

简要交互如下:

process_ofreactive_stream

API 交互如下:

process_of_reactive_stream_2

参考

《Reactive Java Programming》

Reactive Streams 规范

ReactiveX 系列

Reactor 框架

Spring

  • https://spring.io/reactive
  • Web on Reactive Stack - Spring
    • 基于 Reactor 框架实现
    • 默认基于 Netty 作为应用服务器
    • 好处:能够以固定的线程来处理高并发(充分发挥机器的性能)
    • 提供 API:
      • Spring WebFlux
      • WebClient
      • WebSockets
      • Testing
      • RSocket
      • Reactive Libraries

http://openjdk.java.net/jeps/266

Timer & TimerTask

java.util.Timer 是 JDK 中提供的一个定时器工具类,使用的时候会在主线程之外起一个单独的线程执行指定的定时任务 java.util.TimerTask,可以指定执行一次或者反复执行多次。

java.util.TimerTask 是一个实现了 java.lang.Runnable 接口的抽象类,代表一个可以被 Timer 执行的任务。

TimerTask

用法如下(参考 ZooKeeper 源码):

TimerTask usage

ScheduledExecutorService

⚠️ 注意:尽量别用 java.util.TimerTask,如要用一定要捕获处理好异常,一般建议使用 java.util.concurrent.ScheduledExecutorService 代替:

Executor

参考《阿里巴巴 Java 开发手册》:

TimerTask problem

用法如下(参考 Ribbon 源码 com.netflix.loadbalancer.PollingServerListUpdater):

1
2
3
4
5
6
7
8
9
10
11
ScheduledThreadPoolExecutor _serverListRefreshExecutor = Executors.newScheduledThreadPool(POOL_SIZE, new ThreadFactoryBuilder()
.setNameFormat("PollingServerListUpdater-%d")
.setDaemon(true)
.build()
);
_serverListRefreshExecutor.scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);

参考

JDK 中的 Timer 和 TimerTask 详解

面试官:知道时间轮算法吗?在 Netty 和 Kafka 中如何应用的?

背景

为什么要用线程池?

线程的创建和销毁是有代价的。

如果请求的到达率非常高且请求的处理过程是轻量级的,那么为每个请求创建一个新线程将消耗大量的计算资源。

活跃的线程会消耗系统资源,尤其是内存。大量空闲线程会占用许多内存,给垃圾回收器带来压力,而且大量线程竞争 CPU 资源还会产生其它的性能开销。

可创建线程的数量上存在限制,如果创建太多线程,会使系统饱和甚至抛出 OutOfMemoryException

问题如下:

no_thread_pool_design

为了解决以上问题,从 Java 5 开始 JDK 并发 API 提供了 Executor Framework,用于将任务的创建与执行分离,避免使用者直接与 Thread 对象打交道,通过池化设计与阻塞队列保护系统资源:

thread_pool_design

使用 Executor Framework 的第一步就是创建一个 ThreadPoolExecutor 类的对象。你可以使用这个类提供的 四个构造方法Executors 工厂类来创建 ThreadPoolExecutor 。一旦有了执行器,你就可以提交 RunnableCallable 对象给执行器来执行。

自定义线程池

继承关系

Executor 接口的实现类如下:

subtypes_of_Executor

其中,ThreadPoolExecutor 类实现了两个核心接口 ExecutorExecutorService,方法如下:

ThreadPoolExecutor

成员变量

ThreadPoolExecutor 类的成员变量如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 线程池使用一个int变量存储线程池状态和工作线程数
* int4个字节,32位,用高三位存储线程池状态,低29位存储工作线程数
* 为什么使用一个变量来同时表示线程状态和线程数?就是节省空间。咨询了一下写c的朋友,他们经常这么写
**/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
//理论上线程池最大线程数量CAPACITY=(2^29)-1,即 536,870,911
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
//初始化ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

/**
* 线程池状态转换
* RUNNING -> SHUTDOWN
* RUNNING or SHUTDOWN -> STOP
* SHUTDOWN or STOP -> TIDYING
* TIDYING -> TERMINATED terminated()执行完后变为该TERMINATED
*/
//接受新任务,可以处理阻塞队列里的任务
private static final int RUNNING = -1 << COUNT_BITS;
//不接受新任务,可以处理阻塞队列里的任务。执行shutdown()会变为SHUTDOWN
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务。执行shutdownNow()会变为STOP
private static final int STOP = 1 << COUNT_BITS;
//临时过渡状态,所有的任务都执行完了,当前线程池有效的线程数量为0,这个时候线程池的状态是TIDYING,执行terminated()变为TERMINATED
private static final int TIDYING = 2 << COUNT_BITS;
//终止状态,terminated()调用完成后的状态
private static final int TERMINATED = 3 << COUNT_BITS;

//重入锁,更新线程池核心大小、线程池最大大小等都有用到
private final ReentrantLock mainLock = new ReentrantLock();
//用于存储woker
private final HashSet<Worker> workers = new HashSet<Worker>();
//用于终止线程池
private final Condition termination = mainLock.newCondition();
//记录线程池中曾经出现过的最大线程数
private int largestPoolSize;
//完成任务数量
private long completedTaskCount;

/**
* 核心线程数
* 核心线程会一直存活,即使没有任务需要处理,当线程数小于核心线程数时。
* 即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。
* 核心线程数在初始化时不会创建,只有提交任务的时候才会创建。核心线程在allowCoreThreadTimeout为true的时候超时会退出。
*/
private volatile int corePoolSize;
/** 最大线程数
* 当线程数大于或者等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。
* 如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会采取拒绝操作。
*/
private volatile int maximumPoolSize;
/**
* 线程空闲时间
* 当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。
* 如果allowCoreThreadTimeout设置为true,则所有线程均会退出。
*/
private volatile long keepAliveTime;
//是否允许核心线程空闲超时退出,默认值为false。
private volatile boolean allowCoreThreadTimeOut;
//线程工厂
private volatile ThreadFactory threadFactory;
//用于保存等待执行的任务的阻塞队列。比如LinkedBlockQueue,SynchronousQueue等
private final BlockingQueue<Runnable> workQueue;
/**
* rejectedExecutionHandler:任务拒绝策略
* DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
* AbortPolicy:抛出异常。这也是默认的策略
* CallerRunsPolicy:用调用者所在线程来运行任务
* DiscardPolicy:不处理,丢弃掉
*/
private volatile RejectedExecutionHandler handler;
//默认的拒绝策略:抛出异常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");

ctl

作为一个线程池,首先有两个关键属性:

  • 线程池状态 runState
  • 工作线程数 workerCnt

这两个关键属性保存在名为 ctlAtomicInteger 类型属性之中,高 3 位表示 runState,低 29 位表示 workerCnt,如下:

ctl

为什么要用 3 位来表示线程池的状态呢,原因是线程池一共有 5 种状态,而 2 位只能表示出 4 种情况,所以至少需要 3 位才能表示得了 5 种状态,如下:

1
2
3
4
5
6
7
runState workerCnt                       runState workerCnt
000 00000000000000000000000000000 SHUTDOWN empty
‭‭ 001 00000000000000000000000000000 STOP empty
010 00000000000000000000000000000 TIDYING empty
‭011 00000000000000000000000000000‬ TERMINATED empty
111 00000000000000000000000000000 RUNNING empty
‭ 111 11111111111111111111111111111 RUNNING full

通过 ctlOf 方法初始化 ctl 属性:

1
2
3
4
5
6
7
8
9
10
11
// 初始化ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 或运算符(|)规则:1|1=1
// 1|0=1
// 0|1=1
// 0|0=0
// 以初始化参数 ctlOf(RUNNING, 0) 为例:
11100000000000000000000000000000
| 00000000000000000000000000000000
= 11100000000000000000000000000000

通过 runStateOf 方法获取线程池状态 runState

1
2
3
4
5
6
7
8
9
10
// 获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }

// 取反运算符(~)规则:~1=0
// ~0=1
// 以 c = 111 11111111111111111111111111111(RUNNING full)为例:
~11111111111111111111111111111
= 00000000000000000000000000000
& 111 11111111111111111111111111111
= 111

通过 workerCountOf 方法获取工作线程数 workerCnt

1
2
3
4
5
6
7
8
9
10
11
// 获取工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }

// 与运算符(&)规则:1&1=1
// 1&0=0
// 0&1=0
// 0&0=0
// 以 c = 111 11111111111111111111111111111(RUNNING full)为例:
111 11111111111111111111111111111
& 11111111111111111111111111111
= 11111111111111111111111111111

线程池状态

线程池状态用于标识线程池内部的一些运行情况,线程池的开启到关闭的过程就是线程池状态的一个流转的过程。

线程池共有五种状态:

run_state

状态 runState 含义
RUNNING 111 运行状态,该状态下线程池可以接受新的任务,也可以处理阻塞队列中的任务。
执行 shutdown 方法可进入 SHUTDOWN 状态。
执行 shutdownNow 方法可进入 STOP 状态。
SHUTDOWN 000 待关闭状态,不再接受新的任务,继续处理阻塞队列中的任务。
当阻塞队列中的任务为空,并且工作线程数为 0 时,进入 TIDYING 状态。
STOP 001 停止状态,不接收新任务,也不处理阻塞队列中的任务,并且会尝试结束执行中的任务。
当工作线程数为 0 时,进入 TIDYING 状态。
TIDYING 010 整理状态,此时任务都已经执行完毕,并且也没有工作线程 执行 terminated 方法后进入 TERMINATED 状态。
TERMINATED 011 终止状态,此时线程池完全终止了,并完成了所有资源的释放。

工作线程数

尽管理论上线程池最大线程数量可达 CAPACITY 数,但是实际上都会通过 maximumPoolSize 限制最大线程数。因此工作线程数 workerCnt 的个数可能在 0 至 maximumPoolSize 之间变化。

当工作线程的空闲时间达到 keepAliveTime,该工作线程会退出,直到工作线程数 workerCnt 等于 corePoolSize。如果 allowCoreThreadTimeout 设置为 true,则所有工作线程均会退出。

worker_count

注意:

  • 整个线程池的基本执行过程:创建核心线程(Core Thread) > 任务排队 > 创建临时线程(Temp Thread)。
  • 如果将 maximumPoolSize 设置为无界值(如 Integer.MAX_VALUE),可能会创建大量的线程,从而导致 OOM。因此务必要限定 maximumPoolSize 的大小。
  • 如果将 corePoolSizemaximumPoolSize 设置为相同值,则创建了 Fixed 固定大小的线程池,无法弹性扩容,只能排队。

线程工厂

通过提供不同的 ThreadFactory 接口实现,可以定制被创建线程 Thread属性ThreadFactory 有几种创建方式:

1、完全自定义方式。缺点是需要在 newThread 方法中实现的代码较多:

1
2
3
4
5
6
7
8
ThreadFactory threadFactory = runnable -> {
Thread t = new Thread(runnable);
t.setName("wechat-notify-1");
t.setDaemon(false);
t.setPriority(1);
t.setUncaughtExceptionHandler((thread, exception) -> {});
return t;
};

2、使用 Executors 工具类提供的方法:

1
ThreadFactory threadFactory = Executors.defaultThreadFactory();

这也是 Executors 工具类提供的几种默认线程池所使用的 ThreadFactory

  • 缺点:只能使用默认属性,不提供任何定制参数,无法修改。
  • 优点:实现了基本的线程名称自增。

3、使用 Guava 提供的 ThreadFactoryBuilder。优点是可以轻松定制四个线程属性,且支持线程名称自增:

1
2
3
4
5
6
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("wechat-notify-%d")
.setDaemon(false)
.setPriority(1)
.setUncaughtExceptionHandler((thread, exception) -> {})
.build();

该实现如下,如果未提供自定义的 ThreadFactory,将基于 Executors.defaultThreadFactory() 进行二次修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static ThreadFactory build(ThreadFactoryBuilder builder) {
final String nameFormat = builder.nameFormat;
final Boolean daemon = builder.daemon;
final Integer priority = builder.priority;
final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
final ThreadFactory backingThreadFactory = (builder.backingThreadFactory != null)
? builder.backingThreadFactory
: Executors.defaultThreadFactory();
final AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;

return new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = backingThreadFactory.newThread(runnable);
if (nameFormat != null) {
thread.setName(format(nameFormat, count.getAndIncrement()));
}
if (daemon != null) {
thread.setDaemon(daemon);
}
if (priority != null) {
thread.setPriority(priority);
}
if (uncaughtExceptionHandler != null) {
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
return thread;
}
};
}

阻塞队列

阻塞队列的使用详见另一篇《Java 集合框架系列(八)并发实现总结》。

work_queue

拒绝策略

拒绝策略,默认有四种实现:

  • AbortPolicy:抛出异常,默认的策略。
  • DiscardPolicy:不处理,丢弃掉。
  • DiscardOldestPolicy:丢弃队列中最近的一个任务,并执行该任务。
  • CallerRunsPolicy:用调用者所在线程来执行该任务。

RejectedExecutionHandler

通过 RejectedExecutionHandler 接口可以实现更多策略,例如记录日志或持久化不能处理的任务,或者计数并发出告警。

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

⚠️ 注意:rejectedExecution 方法是在主线程中执行的。

构造方法

java.util.concurrent.ThreadPoolExecutor 提供了四个构造方法,以参数最多的为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 参数校验
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();

this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

钩子方法

java.util.concurrent.ThreadPoolExecutor 提供了三个钩子方法,参考 Hook methods

This class provides protected overridable beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated() can be overridden to perform any special processing that needs to be done once the Executor has fully terminated.

钩子方法在源码中的调用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final void runWorker(Worker w) {

...

beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}

...

}

例子:利用钩子方法清理 MDC 上下文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MdcAwareThreadPoolExecutor extends ThreadPoolExecutor {

public MdcAwareThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

/**
* Use ThreadPoolExecutor hooks and perform necessary cleanups after each execution.
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("Cleaning the MDC context");
MDC.clear();
org.apache.log4j.MDC.clear();
ThreadContext.clearAll();
}
}

执行流程

execute 方法的整体执行流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/

work_flow_of_execute_method

统计指标

虽然我们用上了线程池,但是该如何了解线程池的运行情况,例如有多少线程在执行、多少在队列中等待?下表提供了方法:

Modifier and Type Method and Description 备注
long getTaskCount()
Returns the approximate total number of tasks that have ever been scheduled for execution.
任务总数(已完成任务数 + 当前活跃线程数)
long getCompletedTaskCount()
Returns the approximate total number of tasks that have completed execution.
已完成任务数
int getActiveCount()
Returns the approximate number of threads that are actively executing tasks.
当前活跃中的工作线程数
int getPoolSize()
Returns the current number of threads in the pool.
当前工作线程数
int getLargestPoolSize()
Returns the largest number of threads that have ever simultaneously been in the pool.
最大工作线程数
BlockingQueue<Runnable> getQueue()
Returns the task queue used by this executor. Access to the task queue is intended primarily for debugging and monitoring.
当前排队数
1
2
3
4
5
6
7
logger.info("{}, taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
threadPoolExecutor.getThreadNamePrefix(),
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size()
);

使用工厂类创建线程池

java.util.concurrent.ThreadPoolExecutor 提供了四个不同的构造方法,但由于它们的复杂性(参数较多),Java 并发 API 提供了 java.util.concurrent.Executors 工厂类来简化线程池的构造,常用方法如下:

1
2
3
4
5
6
7
8
// 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(...) {...}
// 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ExecutorService newSingleThreadExecutor(...) {...}
// 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool(...) {...}
// 创建一个定长线程池,支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(...) {...}

但是这种方式并不推荐使用,参考《阿里巴巴 Java 开发手册》:

principal of executors

java.util.concurrent.Executors 源码分析如下,首先是 newFixedThreadPool(...)newSingleThreadExecutor(...)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Fixed 限定 corePoolSize 和 maximumPoolSize 为相同大小,即线程池大小固定(意味着无法扩展)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

// Single 其实就是 Fixed 为 1 的变种
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

上述方法中,关键在于对 java.util.concurrent.LinkedBlockingQueue 的构造,使用了默认的无参构造方法:

1
2
3
4
// ⚠️ 允许的请求队列长度(capacity)为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

然后是 newCachedThreadPool(...)newScheduledThreadPool(...)

1
2
3
4
5
6
7
8
9
10
11
// ⚠️ 允许的创建线程数量(maximumPoolSize)为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

// 问题在于 ScheduledThreadPoolExecutor 构造方法的默认参数
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

看下 java.util.concurrent.ScheduledThreadPoolExecutor 的构造方法:

1
2
3
4
5
// ⚠️ 允许的创建线程数量(maximumPoolSize)为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

⚠️ 注意:ScheduledThreadPoolExecutor 使用的是 DelayedWorkQueue 队列,这个队列是无界的(无法设置 capacity),也就是说 maximumPoolSize 的设置其实是没有什么意义的。

  • corePoolSize 设置的太小,会导致并发任务量大时,延迟任务得不到及时处理,造成阻塞。
  • corePoolSize 设置的太大,会导致并发任务量少时,造成大量的线程资源浪费。

参考

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html

如何合理估算 Java 线程池大小:综合指南

如何合理地估算线程池大小?

线程池执行的任务抛出异常会怎样?

Java 线程池实现原理,及其在美团业务中的实践

美团动态线程池实践思路及代码

案例分析|线程池相关故障梳理&总结 | 阿里技术

线程操纵术之更优雅的并行策略——Fork/Join | 阿里技术

工作中常用到一些并发编程类,这里做一些总结。

JDK 中涉及到线程的包如下:

java.lang

内含基础并发类。

Runnable

无返回结果的异步任务。

Thread

程序中的执行线程。

属性

Thread 对象中保存了一些属性能够帮助我们来辨别每一个线程,知道它的状态,调整控制其优先级等:

ID

每个线程的独特标识。

Name

线程的名称。

Priority

线程对象的优先级。优先级别在 1-10 之间,1 是最低级,10 是最高级。不建议改变它们的优先级。

Daemon

是否为守护线程。

Java 有一种特别的线程叫做守护线程。这种线程的优先级非常低,通常在程序里没有其他线程运行时才会执行它。当守护线程是程序里唯一在运行的线程时,JVM 会结束守护线程并终止程序。

根据这些特点,守护线程通常用于在同一程序里给普通线程(也叫使用者线程)提供服务。它们通常无限循环的等待服务请求或执行线程任务。它们不能做重要的任务,因为我们不知道什么时候会被分配到 CPU 时间片,并且只要没有其他线程在运行,它们可能随时被终止。JAVA中最典型的这种类型代表就是垃圾回收器 GC

只能在 start() 方法之前可以调用 setDaemon() 方法。一旦线程运行了,就不能修改守护状态。

可以使用 isDaemon() 方法来检查线程是否是守护线程。

Thread.UncaughtExceptionHandler

用于捕获和处理线程对象抛出的 Unchecked Exception 来避免程序终结。

Thread.State

线程的状态,共六种:
NEW
RUNNABLE
BLOCKED
WAITING
TIME_WAITING
TERMINATED

方法

Thread 类提供了以下几类方法

  • 线程睡眠 Thread.sleep(...)
  • 线程中断 Thread.interrupt()
  • 线程让步 Thread.yield()
  • 线程合并 Thread.join(...)
  • ……

Object 提供了一组线程协作方法:

  • 线程协作 Object.wait/notify

Thread state

ThreadLocal<T>

ThreadLocal 存放的值是线程内共享的,线程间互斥的,主要用于在线程内共享一些数据。

try-with-resources

可以通过实现 AutoCloseable 以使用 try-with-resources 语法简化 ThreadLocal 资源清理:

1
2
3
try (ChannelContext ctx = new ChannelContext(channel)) {
...
}

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class ChannelContext implements AutoCloseable {

private static final ThreadLocal<Channel> CTX = new ThreadLocal<>();

public ChannelContext(FundChannelDTO dto) {
Channel channel = Channel.builder()
.appId(dto.getAppId().toString())
.build();
CTX.set(channel);
}

public ChannelContext(Channel channel) {
CTX.set(channel);
}

public static Channel get() {
return CTX.get();
}

@Override
public void close() {
try {
CTX.remove();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

@Getter
@Builder
static class Channel {
private final String appId;
}

}

父子线程的值传递

https://docs.oracle.com/javase/8/docs/api/java/lang/InheritableThreadLocal.html

异步执行的上下文传递

https://github.com/alibaba/transmittable-thread-local

java.util.concurrent

JDK 5 引入的 Executor Framework ,用于取代传统的并发编程。

Package concurrent

Callable

有返回结果的异步任务。

Executor Framework 的一个重要优点是提供了 java.util.concurrent.Callable<V> 接口用于返回异步任务的结果。它的用法跟 Runnable 接口很相似,但它提供了两种改进:

  • Callable 接口中主要的方法叫 call() ,可以返回结果。

    Callable

  • 当你将 Callable 对象 submitExecutor 执行者,你可以获取一个实现 Future 对象,你可以用这个对象来控制和获取 Callable 对象的状态和结果。

    ThreadPoolExecutor

工具类

CountDownLatch

CyclicBarrier

Phaser

CompletableFuture

Semaphore

Exchanger

Executors

线程池

参考另一篇《并发编程系列(三)Java 线程池总结》。

并发集合

详见另一个篇《Java 集合框架系列(九)并发实现总结》。

显式锁

java.util.concurrent.locks

用于实现线程安全与通信。

Package locks

原子类

java.util.concurrent.atomic

使用这些数据结构可以避免在并发程序中使用同步代码块(synchronized 或 Lock)。

Package atomic

JDK 5 新增的原子类,底层基于魔术类 Unsafe 进行 CAS 无锁操作。实现类按功能分组如下:

Integer Long Boolean 引用类型
基本类 AtomicInteger AtomicLong AtomicBoolean
引用类型 AtomicReference
AtomicStampedReference
AtomicMarkableReference
数组类型 AtomicIntegerArray AtomicLongArray AtomicReferenceArray
属性原子修改器 AtomicIntegerFieldUpdater AtomicLongFieldUpdater AtomicReferenceFieldUpdater

JDK 8 新增 Striped64 累加计数器这个并发组件,64 指的是计数 64 bit 的数,即 LongDouble 类型。其实现类如下:

Long Double
LongAdder DoubleAdder
LongAccumulator DoubleAccumulator

性能对比参考:http://www.manongjc.com/article/105666.html

Spring 包简介

Task Execution and Scheduling - Spring Framework

org.springframework.scheduling

Spring Framework 中并发编程相关的类主要位于 spring-context 下的 org.springframework.scheduling,例如其子包 concurrent

org.springframework.scheduling.concurrent

其中,顶层的 org.springframework.scheduling.concurrent.CustomizableThreadFactory 结构如下:

org.springframework.util.CustomizableThreadFactory

  • CustomizableThreadFactory 实现了 java.util.concurrent.ThreadFactory 线程工厂接口,源码如下:

    1
    2
    3
    4
    // Executors.defaultThreadFactory 方法提供了一个实用的简单实现,为新线程设置了上下文,详见源码
    public interface ThreadFactory {
    Thread newThread(Runnable r);
    }
  • CustomizableThreadFactory 继承了 org.springframework.util.CustomizableThreadCreator 类,用于创建新线程,并提供各种线程属性自定义配置(如线程名前缀、线程优先级等)。

然后重点看下最常用的 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 类,提供的方法列表如下:

ThreadPoolTaskExecutor 方法列表

当我们在实例化 ThreadPoolTaskExecutor 对象之后,其变量如下:

ThreadPoolTaskExecutor variables

其调用堆栈如下:

可见,实际上是先调用了抽象父类 ExecutorConfigurationSupportafterPropertiesSet()initialize() 方法,最后再调用 ThreadPoolTaskExecutor#initializeExecutor(...),该方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
super.execute(taskDecorator.decorate(command));
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);

}

if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}

this.threadPoolExecutor = executor;
return executor;
}

实际上就是通过构造方法实例化 java.util.concurrent.ThreadPoolExecutor 对象,并设置相应参数。

数据特征

数据具有两种特征:

静态特征,指的是数据结构、数据间的联系、对数据取值范围的约束。

动态特征,指的是对数据可以进行符合一定规则的操作

数据模型

组成要素

composition_of_data_model

模型分类

classification_of_data_model

关系数据模型

关系数据结构

data_structure_of_relational_data_model

关系的完整性约束

data_check_of_relational_data_model

关系数据语言

关系语言是一种声明式的查询语言,基于声明式编程范式(Declarative),有别于命令式编程范式(Imperative),特点(优点)是:高度非过程化

relational_language_of_relational_data_model

关系数据库的规范化理论

normalized_form

normalized_form

normalized_form

参考

数学符号表(维基百科)

关系代数(维基百科)

关系代数(百度百度)

SQL 能完成哪方面的计算?一文详解关系代数和 SQL 语法 | 阿里技术

雪花算法介绍

雪花算法(Snowflake)是 Twitter 提出来的一个算法,其目的是生成一个 64 bit 的二进制数:

Snowflake

转换为十进制和十六进制分别如下图。其中十进制的最大长度为 19 位:

该二进制数分解如下:

  • 42 bit:用来记录时间戳,表示自 1970-01-01T00:00:00Z 之后经过的毫秒数(millisecond),其中 1 bit 是符号位。由于精确到毫秒(millisecond),相比有符号 32 bit 所存储的精度为秒(second)的时间戳需要多 10 bit 来记录毫秒数(0-999)。42 bit 可以记录 69 年,如果设置好起始时间例如 2018 年,那么可以用到 2087 年。42 bit 时间戳范围如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 0
    long a = 0b00_00000000_00000000_00000000_00000000_00000000L;
    // 999
    long b = 0b00_00000000_00000000_00000000_00000011_11100111L;
    // 2^41-1, 2199023255551
    long c = 0b01_11111111_11111111_11111111_11111111_11111111L;

    // 1970-01-01T00:00:00.000Z
    Instant.ofEpochMilli(a).atZone(ZoneOffset.of("-00:00")).toLocalDateTime();
    // 1970-01-01T00:00:00.999Z
    Instant.ofEpochMilli(b).atZone(ZoneOffset.of("-00:00")).toLocalDateTime();
    // 2039-09-07T15:47:35.551Z
    Instant.ofEpochMilli(c).atZone(ZoneOffset.of("-00:00")).toLocalDateTime();
  • 10 bit:用来记录机器 ID,最多可以记录 2^10-1=1023 台机器,一般用前 5 位代表数据中心,后面 5 位是某个数据中心的机器 ID

  • 12 bit:循环位,用来对同一个毫秒之内产生的 ID,12 位最多可以记录 2^12-1=4095 个,也就是在同一个机器同一毫秒最多记录 4095 个,多余的需要进行等待下个毫秒。

上面只是一个将 64 bit 划分的标准,当然也不一定这么做,可以根据不同业务的具体场景来划分,比如下面给出一个业务场景:

  • 服务目前 QPS10 万,预计几年之内会发展到百万。
  • 当前机器三地部署,上海,北京,深圳都有。
  • 当前机器 10 台左右,预计未来会增加至百台。

这个时候我们根据上面的场景可以再次合理的划分 64 bit,QPS几年之内会发展到百万,那么每毫秒就是千级的请求,目前10台机器那么每台机器承担百级的请求(100 W / 1000 ms / 10 台)。为了保证扩展,后面的循环位可以限制到 1024,也就是 2^10,那么循环位 10 位就足够了。

机器三地部署我们可以用 3 bit 总共 8 来表示机房位置;当前机器 10 台,为了保证能够扩展到百台那么可以用 7 bit 总共 128 来表示;时间位依然是 42 bit。那么还剩下 64-10-3-7-42 = 2 bit,剩下 2 bit 可以用来进行扩展。

雪花算法实现

下面是我的对雪花算法的实现,涉及几点思考:

  1. 为了节省空间、提升运算性能,主要使用到位运算,而不是字符串操作。
  2. 为了提高可配置性,将每一部分的位数抽取成了常量,以便自定义。
  3. 解决时间回拨问题:
    • 如果回拨时长较短(可配置,代码中配了 5 ms),线程等待并重试即可;
    • 如果回拨时长较长,则利用扩展位避免生成重复 ID(扩展位可配,代码中配置了 3 位,即最多支持 3 次回拨,超出则抛异常)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;

import static org.junit.Assert.assertEquals;

@Slf4j
public class IdGeneratorTest {

@Test
public void test() {
int initialCapacity = 1000;
Set<Long> ids = new HashSet<>(initialCapacity);
IdGenerator idGenerator = new IdGenerator(3, 31);

for (int i = 0; i < initialCapacity; i++) {
long id = idGenerator.nextId();
ids.add(id);

IdGenerator.log(id);
}
assertEquals(ids.size(), initialCapacity);
}

}

@Slf4j
class IdGenerator {

// 时间戳
private long timestamp;
// 数据中心
private int dataCenterNo;
// 机器
private int workerNo;
// 序列号
private int seqNo;
// 扩展位
private int ext;

// 数据中心 取值范围(十进制):0~3
private static final int DATA_CENTER_NO_BITS = 2;
// 机器 取值范围(十进制):0~31
private static final int WORKER_NO_BITS = 5;
// 序列号 取值范围(十进制):0~4095
private static final int SEQ_NO_BITS = 12;
// 扩展位 取值范围(十进制):0~7
private static final int EXT_BITS = 3;

// 数据中心 最大值(十进制):3
private static final int MAX_DATA_CENTER_NO = (1 << DATA_CENTER_NO_BITS) - 0B1;
// 机器 最大值(十进制):31
private static final int MAX_WORKER_NO = (1 << WORKER_NO_BITS) - 0B1;
// 序列号 最大值(十进制):4095
private static final int MAX_SEQ_NO = (1 << SEQ_NO_BITS) - 0B1;
// 扩展位 最大值(十进制):7
private static final int MAX_EXT = (1 << EXT_BITS) - 0B1;

// 时间戳 左移 22 位
private static final int TIMESTAMP_SHIFT = DATA_CENTER_NO_BITS + WORKER_NO_BITS + SEQ_NO_BITS + EXT_BITS;
// 数据中心 左移 20 位
private static final int DATA_CENTER_NO_SHIFT = WORKER_NO_BITS + SEQ_NO_BITS + EXT_BITS;
// 机器 左移 15 位
private static final int WORKER_NO_SHIFT = SEQ_NO_BITS + EXT_BITS;
// 序列号 左移 3 位
private static final int SEQ_NO_SHIFT = EXT_BITS;

// 最大回拨毫秒数
private static final int MAX_BACKWARD_MILLIS = 5;

/**
*
* @param dataCenterNo 数据中心编号
* @param workerNo 机器编号
*/
public IdGenerator(int dataCenterNo, int workerNo) {
if (Integer.toBinaryString(dataCenterNo).length() > DATA_CENTER_NO_BITS) {
throw new IllegalArgumentException(String.format("当前数据中心编号 %s 超限,最大支持 %s", dataCenterNo, MAX_DATA_CENTER_NO));
} else if (Integer.toBinaryString(workerNo).length() > WORKER_NO_BITS) {
throw new IllegalArgumentException(String.format("当前机器编号 %s 超限,最大支持 %s", workerNo, MAX_WORKER_NO));
}
this.dataCenterNo = dataCenterNo;
this.workerNo = workerNo;
}

/**
* 生成分布式 ID
* @return 分布式 ID
*/
public synchronized long nextId() {
long now = System.currentTimeMillis();
// init or reset
if (timestamp == 0 || timestamp < now) {
timestamp = now;
seqNo = 0;
}
// seqNo increment
else if (timestamp == now) {
if (seqNo < MAX_SEQ_NO) {
seqNo++;
} else {
log.warn("序列号已耗尽,等待重新生成。seqNo = {}, MAX_SEQ_NO = {}", seqNo, MAX_SEQ_NO);
return sleepAndNextId(1);
}
}
// clock backward
else {
return nextIdForBackward(now);
}

return timestamp << TIMESTAMP_SHIFT
| dataCenterNo << DATA_CENTER_NO_SHIFT
| workerNo << WORKER_NO_SHIFT
| seqNo << SEQ_NO_SHIFT
| ext;
}

private long nextIdForBackward(long now) {
log.warn("发生时间回拨,timestamp = {}, now = {}", timestamp, now);

long duration = timestamp - now;
// 回拨不多,直接等待并重试
if (duration <= MAX_BACKWARD_MILLIS) {
return sleepAndNextId(duration);
}
// 回拨过多,则使用扩展位
else {
if (ext < MAX_EXT) {
// 将时间戳修正为回拨时间,为防止重复生成 ID,扩展位加一,并重试
ext++;
timestamp = now;
return nextId();
} else {
throw new IllegalStateException(String.format("扩展位已耗尽。ext = %s, MAX_EXT = %s", ext, MAX_EXT));
}
}
}

@SneakyThrows
private long sleepAndNextId(long millis) {
Thread.sleep(millis);
return nextId();
}

public static void log(long id) {
long timestamp = id >> TIMESTAMP_SHIFT;
long dataCenterNo = id >> DATA_CENTER_NO_SHIFT & MAX_DATA_CENTER_NO;
long workerNo = id >> WORKER_NO_SHIFT & MAX_WORKER_NO;
long seqNo = id >> SEQ_NO_SHIFT & MAX_SEQ_NO;
long ext = id & MAX_EXT;

log.info("Binary is {}, id is {}", Long.toBinaryString(id), id);
log.info("Binary is {}, time is {}", Long.toBinaryString(timestamp), getLocalDateTime(timestamp));
log.info("Binary is {}, dataCenterNo is {}", Long.toBinaryString(dataCenterNo), dataCenterNo);
log.info("Binary is {}, workerNo is {}", Long.toBinaryString(workerNo), workerNo);
log.info("Binary is {}, seqNo is {}", Long.toBinaryString(seqNo), seqNo);
log.info("Binary is {}, ext is {}", Long.toBinaryString(ext), ext);
}

private static LocalDateTime getLocalDateTime(long timestamp) {
return Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).toLocalDateTime();
}

}

输出结果:

1
2
3
4
5
6
Binary is 110000011011010011001111001101001100000111000011000000000100000, id is 6979004485312020512
Binary is 11000001101101001100111100110100110000011, time is 2022-09-23T17:12:12.931
Binary is 10, dataCenterNo is 2
Binary is 11, workerNo is 3
Binary is 100, seqNo is 4
Binary is 0, ext is 0

位运算过程如下:

  • timestamp = id >> TIMESTAMP_SHIFT

    1
    2
       01100000 11011010 01100111 10011010 01100000 11100001 10000000 00100000
    >> 01100000 11011010 01100111 10011010 01100000 11
  • dataCenterNo = id >> DATA_CENTER_NO_SHIFT & MAX_DATA_CENTER_NO

    1
    2
    3
    4
       01100000 11011010 01100111 10011010 01100000 11100001 10000000 00100000
    >> 01100000 11011010 01100111 10011010 01100000 1110
    & 11
    = 10
  • workerNo = id >> WORKER_NO_SHIFT & MAX_WORKER_NO

    1
    2
    3
    4
       01100000 11011010 01100111 10011010 01100000 11100001 10000000 00100000
    >> 01100000 11011010 01100111 10011010 01100000 11100001 1
    & 1111 1
    = 1 1
  • seqNo = id >> SEQ_NO_SHIFT & MAX_SEQ_NO

    1
    2
    3
    4
       01100000 11011010 01100111 10011010 01100000 11100001 10000000 00100000
    >> 01100000 11011010 01100111 10011010 01100000 11100001 10000000 00100
    & 1111111 11111
    = 100
  • ext = id & MAX_EXT

    1
    2
    3
      01100000 11011010 01100111 10011010 01100000 11100001 10000000 00100000
    & 111
    = 0

延伸阅读

位运算

位运算符如下:

1
2
3
4
5
6
7
&:按位与
|:按位或
~:按位非
^:按位异或
<<:左位移(M << n = M * 2^n)
>>:右位移(M >> n = M / 2*n)
>>>:无符号右移

例如,Java 中求 key 应当放到散列表的哪个位置(offset):

1
2
3
4
5
6
7
static final int getOffset(Object key, int length) 
int hashcode;
int hash = (hashcode = key.hashCode()) ^ (hashcode >>> 16);
int offset = hash & (length - 1);
log.info("key: {}, hashcode: {}, hash: {}, offset: {}", key, Integer.toBinaryString(hashcode), Integer.toBinaryString(hash), offset);
return offset;
}

参考:

UNIX 时间戳

参考:UNIX 时间戳总结