响应式架构重构
本期议题
- 现代应用架构瓶颈
- 阻塞式应用架构重构
- 响应式应用架构重构
现代应用架构瓶颈
- 网络
- 设计架构
- 应用性能
阻塞式应用架构重构
- 并发
- 并发
- 并发
分布式架构
分布式应用都是网络
RPC(远程过程调用)
- dubbo
- netty NIO + Async
- grpc
MOM(消息中间件)
- RabbitMQ
- Mq类似
DB(数据库)
- JDBC 连接
关键字:网络
- 网卡带宽
- 1 GB
- 100 MB
- 网络运用
- < 1GB
你没有用好。你带宽有 1GB 但是没有用到。
设计架构
业务架构
因系统而异
系统架构
- 非阻塞不一定是响应式
- 响应式一定是非阻塞
应用性能
- 阻塞式 + 不合理同步 (大多数)
阻塞式
- 同步
- 等待结果返回
假如说:用户下单
用户服务 + 下单服务
100 ms
+ 100 ms
= > 200 ms
1
2
1. 通过认证 token 查询 User
2. User.id 执行下单服务
两种模式
依赖模式
C = A + B
A => B => C
并行模式
C = A | B |
例子:小明需要从数据库 A 和 数据库 B 拉去两份用户名单
原始模型 >250ms
-
阻塞
重构模型 >150ms
-
并行
Demo阻塞和并行代码对比:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class BlockingQueryDB {
public static void main(String[] args) throws ExecutionException, InterruptedException {
BlockingQueryDB blockingQueryDB = new BlockingQueryDB();
blockingMode(blockingQueryDB);
cocurrentMode(blockingQueryDB);
}
public static void cocurrentMode(BlockingQueryDB blockingQueryDB) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(2);
// futureA 保存 queryA 的结果
Future<Integer[]> futureA = executorService.submit(blockingQueryDB::queryA);
// futureB 保存 queryB 的结果
Future<Integer[]> futureB = executorService.submit(blockingQueryDB::queryB);
// 阻塞
futureA.get();
futureB.get();
executorService.shutdown();
long costTime = System.currentTimeMillis() - start;
System.out.println("【cocurrentMode】 queryA[100 ms] + queryB[150 ms] 时间:" + costTime + " ms");
}
public static void blockingMode(BlockingQueryDB blockingQueryDB) {
long start = System.currentTimeMillis();
blockingQueryDB.queryA();
blockingQueryDB.queryB();
long costTime = System.currentTimeMillis() - start;
System.out.println("【blockingMode】queryA[100 ms] + queryB[150 ms] 时间:" + costTime + " ms");
}
public Integer[] queryA() {
sleep(100);
return of(1, 2, 3, 4);
}
public Integer[] queryB() {
sleep(150);
return of(2, 3, 4, 5, 6);
}
public static void sleep(long times) {
try {
Thread.sleep(times);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static <T> T[] of(T... values) {
return values;
}
}
1
2
【blockingMode】queryA[100 ms] + queryB[150 ms] 时间:276 ms
【cocurrentMode】 queryA[100 ms] + queryB[150 ms] 时间:187 ms
Reactive Streams 框架
-
JAVA 9 Flow API
-
Spring Reactor
-
RxJava
Java 8 Stream API
先通过一段代码了解一下 JAVA 8 Stream API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class StreamAPIDemo {
public static void main(String[] args) {
// Stream => 1,2,3,4,5
// Stream 管道
// Stream 运算模式
// 穿行
Stream.of(1, 2, 3, 4, 5, 6).forEach(System.out::print);
System.out.println("\n========================");
// 并行
Stream.of(1, 2, 3, 4, 5, 6).parallel().forEach(System.out::print);
// 并行加顺序
System.out.println("\n===================");
Stream.of(1, 2, 3, 4, 5, 6).parallel().forEachOrdered(System.out::print);
}
}
1
2
3
4
5
123456
========================
465231
===================
123456
http://reactivex.io/
ReactiveX
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.
Observables fill the gap by being the ideal way to access asynchronous sequences of multiple items
single items multiple items synchronous T getData()
Iterable<T> getData()
asynchronous Future<T> getData()
Observable<T> getData()
它是一个框架,使用观察者系列,异步处理和基于异步事件的 编程范式。
Reactive Streams JVM
Reactive Streams
The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
Reactive Streams 是提供一种 异步的 Streams 的计算的标准 是 背压的。
我们就把 Reactive Streams 理解为 Streams 的 Reactive。
Reactor
3.3. From Imperative to Reactive Programming
Reactive libraries such as Reactor aim to address these drawbacks of “classic” asynchronous approaches on the JVM while also focusing on a few additional aspects:
- Composability and readability
- Data as a flow manipulated with a rich vocabulary of operators
- Nothing happens until you subscribe
- Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high
- High level but high value abstraction that is concurrency-agnostic
Imperative : 驱使式 就是迭代器
理解 背压:
代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class IteratorDemo {
public static void main(String[] args) {
List<Integer> valueList = Arrays.asList(1, 2, 3, 4, 5);
// 包含 5 个完整的数据
Iterator<Integer> iterator = valueList.iterator();
while (iterator.hasNext()) { // 被动的内容
Integer value = iterator.next(); // 客户端主动请求
if (value % 2 == 0) { // 判断是否为偶数
System.out.println(value);
} else {
// 奇数仍然还是产生了
}
}
}
/****
* 服务端推送数据 1,2,3,4,5
* 客户端只要 1,2,3
* 客户端 cancel 命令通知服务端推送 4,5
*/
public void update(Integer value , Request request) {
if(value == 3){
request.concel();
}
}
}
图解 迭代器模式:
假如说我现在现在需要一个场景:【 1,2,3,4,5】 但是客户端只需要【 1,2】 图片:
问题的所在:传统
我们可以看到消费者的速度明显小于生产者。
背压:当消费者消费积压的时候,反向的推送告诉生成者,我不需要数据了,这个叫背压。
背压的使用场景:
当消费者消费不了的时候,消费者通知服务端。推送给另一个消费者。
SPECIFICATION
1. Publisher (Code)
1 2 3 public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }2. Subscriber (Code)
1 2 3 4 5 6 7 8 9 10 public interface Subscriber<T> { // public void onSubscribe(Subscription s); // 数据到达 public void onNext(T t); // 数据错误 public void onError(Throwable t); // 数据完成 public void onComplete(); }3. Subscription (Code)
1 2 3 4 public interface Subscription { public void request(long n); public void cancel(); }4.Processor (Code)
1 2 public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
关键字:
-
standard:技术标准
-
asynchronous stream processing:异步 Stream 处理
-
Java 8 Stream API
-
1 2
{ Stream.of(1,2,3,4,5).parallel().forEachOrdered(System.out::println); }
-
-
-
non-blocking backpressure:非阻塞式背压
-
观察者模式扩展
-
异步 / 同步 混搭模式
- 同步非阻塞
- 异步非阻塞
-
非阻塞并非异步
- NIO VS AIO
Spring 5 Web Flux
- Reactor
- Reactive Streams JVM
观察者模式 VS 迭代器模式
JAVA API 表述
- 观察者模式:
Observable
/Observer
- 迭代器模式:
Iterator<T>
使用场景
- 观察者模式:推(Push-Based)模式
- 迭代器模式:拉(Pull-Based)模式
Java 8 Lambda 表达式
Function
Consumer
Supplier
Reactive-streams
- Publisher
- 数据发布者
- Subscriber
- 数据消费者
- Subscription
- 数据订阅内容
- Processor
- Publisher + Subscriber
1 2 3 4 5 6 7 8 9 10 11 Observable.create(emitter -> { while (!emitter.isDisposed()) { long time = System.currentTimeMillis(); emitter.onNext(time); if (time % 2 != 0) { emitter.onError(new IllegalStateException("Odd millisecond!")); break; } } }) .subscribe(System.out::println, Throwable::printStackTrace);数据的产生
1 2 3 4 5 6 7 8 9 Flowable.fromCallable(() -> { Thread.sleep(1000); // imitate expensive computation return "Done"; }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.single()) .subscribe(System.out::println, Throwable::printStackTrace); Thread.sleep(2000); // <--- wait for the flow to finish
Reactive 编程核心代码
#blockingMode()
阻塞模式#cocurrentMode()
并发模式#reactiveModeMistake()
响应模式 (写法错误, 仍然是迭代器模式)#reactiveMode()
响应模式(没有消费)#reactiveModeSubscribe()
响应模式 (生产数据在同一个线程里边#reactiveModeSubscribeAsync()
响应模式 (生产数据在不同的线程里边) 真正的异步非阻塞式的编程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package com.darian.demos;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class BlockingQueryDB {
public static void main(String[] args) throws ExecutionException, InterruptedException {
BlockingQueryDB blockingQueryDB = new BlockingQueryDB();
blockingMode(blockingQueryDB);
cocurrentMode(blockingQueryDB);
reactiveModeMistake(blockingQueryDB);
reactiveMode(blockingQueryDB);
reactiveModeSubscribe(blockingQueryDB);
reactiveModeSubscribeAsync(blockingQueryDB);
Thread.currentThread().join();
}
/***
* 响应模式 (生产数据在不同的线程里边)
* 真正的异步非阻塞式的编程
*/
public static void reactiveModeSubscribeAsync(BlockingQueryDB blockingQueryDB){
long start = System.currentTimeMillis();
Flowable<Integer[]> flowableA = Flowable.fromCallable(blockingQueryDB::queryA);
Observable<Integer[]> observableb = Observable.fromCallable(blockingQueryDB::queryB);
flowableA.subscribeOn(Schedulers.newThread())
.subscribe(); // 没有订阅就没有执行
observableb.subscribeOn(Schedulers.newThread()).subscribe();
long costTime = System.currentTimeMillis() - start;
System.out.println("【reactiveModeSubscribeAsync】 queryA [ 100 ms ] + queryB [ 150 ms ] 时间:" + costTime + " ms");
}
/***
* 响应模式 (生产数据在同一个线程里边)
*/
public static void reactiveModeSubscribe(BlockingQueryDB blockingQueryDB) {
long start = System.currentTimeMillis();
Flowable<Integer[]> flowableA = Flowable.fromCallable(blockingQueryDB::queryA);
Observable<Integer[]> observableb = Observable.fromCallable(blockingQueryDB::queryB);
flowableA.subscribe();
observableb.subscribe();
long costTime = System.currentTimeMillis() - start;
System.out.println("【reactiveModeSubscribe】 queryA [ 100 ms ] + queryB [ 150 ms ] 时间:" + costTime + " ms");
}
/***
* 响应模式(没有消费)
*/
public static void reactiveMode(BlockingQueryDB blockingQueryDB) {
long start = System.currentTimeMillis();
// Publisher
// 发布 query A 操作
Flowable<Integer[]> flowableA = Flowable.fromCallable(blockingQueryDB::queryA);
// 发布 query B 数据操作
Observable<Integer[]> observableb = Observable.fromCallable(blockingQueryDB::queryB);
long costTime = System.currentTimeMillis() - start;
System.out.println("【reactiveMode】 queryA [ 100 ms ] + queryB [ 150 ms ] 时间:" + costTime + " ms");
}
/***
* 响应模式 (写法错误, 仍然是迭代器模式)
*/
public static void reactiveModeMistake(BlockingQueryDB blockingQueryDB) {
long start = System.currentTimeMillis();
Integer[] a = blockingQueryDB.queryA();
Integer[] b = blockingQueryDB.queryB();
// Publisher
// 发布 query A 操作
Flowable<Integer[]> flowableA = Flowable.just(a);
// 发布 query B 数据操作
Observable<Integer[]> observableb = Observable.just(b);
long costTime = System.currentTimeMillis() - start;
System.out.println("【reactiveModeMistake】 queryA [ 100 ms ] + queryB [ 150 ms ] 时间:" + costTime + " ms");
// just 是完全调用 queryA() + queryB() >= 阻塞模式
}
/***
* 并发模式
*/
public static void cocurrentMode(BlockingQueryDB blockingQueryDB) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(2);
// futureA 保存 queryA 的结果
Future<Integer[]> futureA = executorService.submit(blockingQueryDB::queryA);
// futureB 保存 queryB 的结果
Future<Integer[]> futureB = executorService.submit(blockingQueryDB::queryB);
// 阻塞
futureA.get();
futureB.get();
// 还有优化的写法,没必要阻塞,再阻塞。
executorService.shutdown();
long costTime = System.currentTimeMillis() - start;
System.out.println("【cocurrentMode】 queryA [ 100 ms ] + queryB [ 150 ms ] 时间:" + costTime + " ms");
}
/***
* 阻塞模式
*/
public static void blockingMode(BlockingQueryDB blockingQueryDB) {
long start = System.currentTimeMillis();
blockingQueryDB.queryA();
blockingQueryDB.queryB();
long costTime = System.currentTimeMillis() - start;
System.out.println("【blockingMode】queryA [ 100 ms ] + queryB [ 150 ms ] 时间:" + costTime + " ms");
}
public Integer[] queryA() {
sleep(100);
System.out.println("Thread[" + Thread.currentThread().getName() + "]: queryA() .....");
return of(1, 2, 3, 4);
}
public Integer[] queryB() {
sleep(150);
System.out.println("Thread[" + Thread.currentThread().getName() + "]: queryB() .....");
return of(2, 3, 4, 5, 6);
}
public static void sleep(long times) {
try {
Thread.sleep(times);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static <T> T[] of(T... values) {
return values;
}
}
日志的输出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Thread[main]: queryA() .....
Thread[main]: queryB() .....
【blockingMode】queryA [ 100 ms ] + queryB [ 150 ms ] 时间:266 ms
Thread[pool-1-thread-1]: queryA() .....
Thread[pool-1-thread-2]: queryB() .....
【cocurrentMode】 queryA [ 100 ms ] + queryB [ 150 ms ] 时间:187 ms
Thread[main]: queryA() .....
Thread[main]: queryB() .....
【reactiveModeMistake】 queryA [ 100 ms ] + queryB [ 150 ms ] 时间:344 ms
【reactiveMode】 queryA [ 100 ms ] + queryB [ 150 ms ] 时间:0 ms
Thread[main]: queryA() .....
Thread[main]: queryB() .....
【reactiveModeSubscribe】 queryA [ 100 ms ] + queryB [ 150 ms ] 时间:281 ms
【reactiveModeSubscribeAsync】 queryA [ 100 ms ] + queryB [ 150 ms ] 时间:0 ms
Thread[RxNewThreadScheduler-1]: queryA() .....
Thread[RxNewThreadScheduler-2]: queryB() .....
理解Reactive和传统方式区别:
和线程池的代码上的区别:
线程池代码:
1
2
3
4
5
6
7
8
9
10
11
12
public static void executorService(BlockingQueryDB blockingQueryDB) {
Future<Integer[]> futureA = Executors.newFixedThreadPool(2)
.submit(blockingQueryDB::queryA);
try {
// 阻塞
futureA.get();
// 正常的逻辑
} catch (Exception e) {
// 异常的逻辑
}
// 结束后的逻辑
}
Reactive 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void reactiveFluent(BlockingQueryDB blockingQueryDB) {
// 流式处理
// 开发人员无需关注并发或者线程池
Observable.fromCallable(blockingQueryDB::queryA)
.subscribeOn(Schedulers.newThread()) // 决定同步/异步
.doOnNext(integers -> {
// 数据消费正常逻辑
})
.doOnError(throwable -> {
// 数据消费异常逻辑
})
.doOnComplete(() -> {
// 执行结束逻辑
})
.doFinally(() -> {
// 最终执行......
});
}
稍微标准一点的方法设计
1
2
3
4
public static void reactivefromObservable(Observable<BlockingQueryDB> blockingQueryDBObservable){
blockingQueryDBObservable.subscribeOn(Schedulers.newThread())
.subscribe();
}
把两个合并。Observable
1
2
3
observableA.mergeWith(observableB).map(integers -> Arrays.asList(integers))
.doOnNext(integers -> integers.forEach(System.out::println)).subscribe();
流式的处理,类似于 Stream 又称之为 Flowable API
注意事项
- Reactive 是高端玩家的游戏
- Java 8 Lambda
- Java 并发
- 观察者模式
优势
- 流式处理,面向数据
- 数据生成 generate
- 数据合并 merge
- 数据扁平化 map
- 数据 reduce
( BiConsumer<..> )
- 并行处理,无需关注线程池
- 线程池无感
- Future 无感
- Callable / Runnable 无感
- 提高吞吐量
- 并不意味处理事件变短
- 执行时间变长
- 处理的数量变多
- 执行任务后台化
劣势
- 学习难度复杂
- Java 8 Lambda
- Java 并发
- 观察者模式
- 不容易调试
- 容易出错
- API 组合太多
Java 高并发 误区
普遍性来讲:高并发 = 高血压并发症
- happens-before
- happen-before
- Lock-Free
JDBC Reactive 化
相关:技术
-
Observable
- 事件 / 监听
- 观察者模式
Publisher -> 消息管道 : publish 10ms
Publisher -> 消息管道 : publish 20ms
消息管道 -> Subscriber【1】 : subscribe 30ms
消息管道 -> Subscriber【1】 : subscribe 40ms
Subscriber【1】 ->Publisher: 这里已经消费积压了!!!
note left of Publisher: 发送数据给 Subscriber[2]
Publisher -> 消息管道 : publish 10ms
消息管道 -> Subscriber【2】: subscribe 40ms
####