JAVA 9 异步并发编程
主要议题
- Reactive Streams
- 前 Java 9 时代
- Java 9 时代
Reactive Streams
- 核心组件
Publisher
Subscriber
Subscription
Processor
- Reactive Streams
- Reactive X
前 Java 9 时代
- 观察者模式
- 事件监听模式
- Reactor 模式
Java 9 时代
- Flow API
Publisher<T>
Subscriber<T>
Processor<T,R>
Subscription
Reactive Streams
今天探讨 JAVA 8 到 JAVA 9 的过渡。
Publisher
- 发布者,发布内容
Subscription
- 发布者,发布内容
Subscriber
- 订阅者,订阅内容
Subscription
- 订阅者,订阅内容
Subscription
- 订阅的内容
Processor
Publisher
+Subscriber
Processor 图解:
Processor
是发布者也是订阅者
Spring Cloud Stream / Spring Cloud Stream Data 里边有消息服务总线。
https://github.com/reactive-streams/reactive-streams-jvm
API Components
The API consists of the following components that are required to be provided by Reactive Stream implementations:
- Publisher
- Subscriber
- Subscription
- Processor
Glossary
Term Definition Signal As a noun: one of the onSubscribe
,onNext
,onComplete
,onError
,request(n)
orcancel
methods. As a verb: calling/invoking a signal.………. ………. SPECIFICATION
1. Publisher (Code)
1 2 3 public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
1 2 3 4 5 6 7 8 类似于我们的 mq 的形式。 WebSocket - onOpen - onMessage - onError 他和 stream 的信号,相似,但是一个是直连,一个不是。 一个发布者,也可能是一个订阅者,也可能十多个,可能是单一传播或者是广播。
MQ 两种模式
- 点对点模式
- 一对一
- Topic 模式
- 1 : N
他并不是一个新的东西,只是把一些相关的东西融合到一块
http://reactivex.io/intro.html
ReactiveX
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.
reactiveX 关键字
- asynchronous (异步) event-based (基于事件)
- observable sequences(观察者)
前 JAVA 9 时代
好的架构师会注意 技术的传承
- 观察者模式: JAVA 支持
- 事件监听模式(观察者模式的变种): JAVA Beans !=
#get
+#set()
- Reactor 模式 : (设计 NIO 的东西)
JAVA 9 的最大的特性就是 模块化
句柄的操作,JAVA 追踪堆栈很麻烦的,JAVA 9 做了优化
JAVA 一直支持 Observer
1
2
3
4
5
6
7
8
9
@Deprecated(since="9")
// 观察者
public interface Observer {
/**
* Observerable 事件源
* Object arg 参数
*/
void update(Observable o, Object arg);
}
java.util.Observable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void notifyObservers() {
notifyObservers(null);
}
public void notifyObservers(Object arg) {
Object[] arrLocal;
synchronized (this) {
if (!changed)
return;
arrLocal = obs.toArray();
clearChanged();
}
for (int i = arrLocal.length-1; i>=0; i--)
((Observer)arrLocal[i]).update(this, arg);
}
所谓的传参数,把自己传进来,
他是倒序的,从后往前的,是一种实现方式。
消息队列,并不要求,队列必须按照时间顺序去执行,除非,后边的计算单元和前边的计算单元有强烈的计算关系的时候,拼接一个 “Hello, world” ,一个一个字符必须按照顺序,才能够符合你的期望,但是实际情况下,消息的到达并不一定按顺序的。所以这点看一下,不要太深入了。。。。。
当你依赖于某一个接口,的时候,传入的参数的顺序,就是你这里 lambda 表达式的顺序。
Observable
的 #setChanged
方法是 protected
,所以需要自己实现。
提升一下它的级别,JAVA 的编程模型是重进入的。
观察者的统治的顺序就是倒序
观察者 Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/***
* 观察者模式 Demo
*/
public class ObserverDemo {
public static void main(String[] args) {
// 第一个缺点: Vector 作为底层存储(全线程安全的)
// 第二个缺点,没有实现泛型
// 第三个阙殆你: 同步 -> 阻塞
MyObserable observable = new MyObserable();// 构建观察对象
// 增加了第一个观察者
observable.addObserver((o, arg) -> {
System.out.println("第一个--观察到的数值:" + arg);
});
// 增加了第二个观察者
observable.addObserver((o, arg) -> {
System.out.println("第二个--观察到的数值:" + arg);
});
// 设置变化值
observable.setChanged();
observable.notifyObservers(1212321);
}
private static class MyObserable extends Observable {
public void setChanged() {
super.setChanged();
}
}
}
事件监听者模式 Demo
Pojo 不单单是 getter / setter
xxxEvent
xxxListener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/***
* 事件监听者模式
* {@link java.util.EventObject} 标准的事件对象
* {@link java.util.EventListener} 标准的事件监听对象
* @see java.util.EventListener
*/
public class EventListenerDemo {
public static void main(String[] args) {
Person person = new Person();
// JAVA Beans 规范 -> 内省
// propertyChangeEvent 广播源
// PropertyChangeListener 注册器
PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(person);
propertyChangeSupport.addPropertyChangeListener("name", evt -> {
Person bean = (Person) evt.getSource();
System.out.println("【Person】:" + bean + "\n,[旧值]:"
+ evt.getOldValue() + ",【新值】" + evt.getNewValue());
});
// 触发事件
propertyChangeSupport.firePropertyChange("name",
null, "darian");
}
/***
* Pojo getter / setter
*/
public static class Person {
private String name;
private int age;
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
}
java.beans.PropertyChangeSupport#addPropertyChangeListener(java.lang.String, java.beans.PropertyChangeListener)
也是一个一个添加进去,一个一个循环的去更新。数组之类的
reactor
Reactor pattern
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexesthe incoming requests and dispatches them synchronously to the associated request handlers.[1]
和 NIO 非常的像
NIO 有一个二注册的事件,有一个 SelectorKey
有一个值,读、写、连接。
Reactor
JAVA 9 时代
https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html
- 正常返回
- onComplete
- 错误返回
- flatMap
- map
- reduce
就是类似的。
Flow 的引入是开源的胜利,大家都在用
reactive
,所以 JAVA API 被迫支持,衣巷_百度汉语
作者:刘禹锡
朱雀桥边野草花,乌衣巷口夕阳斜。 旧时王谢堂前燕,飞入寻常百姓家。
Oracle 不 leader 了
要学习英文文档。多看英文文档。多写,多练。
[public class SubmissionPublisher
](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/SubmissionPublisher.html) A
Flow.Publisher
that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.
一个发布器,异步的 非 Null 的,null 就会异常。
你非得传递 null ,就传递 Optional。
我们多关注于 Comsumer
Publisher
- 1: N
区别
Reactive
推Observer
拉
代码尽量一次写好,重构的机会不多。
属性尽量用
final
修饰。
Interface Flow.Subscription
Enclosing class:Flow
Message control linking a
Flow.Publisher
andFlow.Subscriber
. Subscribers receive items only when requested, and may cancel at any time. The methods in this interface are intended to be invoked only by their Subscribers; usages in other contexts have undefined effects.
void
cancel()
Causes the Subscriber to (eventually) stop receiving messages. void
request(long n)
Adds the given number n
of items to the current unfulfilled demand for this subscription.request
1 void request(long n)Adds the given number
n
of items to the current unfulfilled demand for this subscription. Ifn
is less than or equal to zero, the Subscriber will receive anonError
signal with anIllegalArgumentException
argument. Otherwise, the Subscriber will receive up ton
additionalonNext
invocations (or fewer if terminated).
- Parameters:
n
- the increment of demand; a value ofLong.MAX_VALUE
may be considered as effectively unbounded
#cancel
取消#request
传递过去。
request(1)
相当于一个信号控制的源
每次 +1
到下一个来。
[a, b, c, d, e, f, g ]
100 100 100
a -> b -> c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/***
* {@link java.util.concurrent.SubmissionPublisher}
*/
public class SubmissionPublisherDemo {
public static void main(String[] args) throws InterruptedException {
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
publisher.subscribe(new IntegerSubscriber("A"));
publisher.subscribe(new IntegerSubscriber("B"));
publisher.subscribe(new IntegerSubscriber("C"));
CompletableFuture<Void> completableFuture = publisher.consume(integer -> {
System.out.println("[" + Thread.currentThread().getName() + "]:" + integer);
}).thenRunAsync(() -> System.out.println("[" + Thread.currentThread().getName() + "]"))
.thenRun(() -> System.out.println("[" + Thread.currentThread().getName() + "]"))
.exceptionally(throwable -> {
System.out.println(throwable);
return null;
});
// 提交数据到各个订阅器
publisher.submit(100);
}
// publisher.close();
Thread.currentThread().join(5000);
}
private static class IntegerSubscriber implements Flow.Subscriber<Integer> {
private final String name;
private Flow.Subscription subscription;
public IntegerSubscriber(String name) {
this.name = name;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Thread:[" + Thread.currentThread().getName() + "]:"
+ "Surrent Subscriber:[" + name + "]"
+ "subscribes subscription:[" + subscription + "]");
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("Thread:[" + Thread.currentThread().getName() + "]:"
+ "Surrent Subscriber:[" + name + "]"
+ "receives item:[" + item + "]");
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println(throwable);
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Thread:[" + Thread.currentThread().getName() + "]:"
+ "is complete:[" + name + "]");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
[ForkJoinPool.commonPool-worker-9]:100
[ForkJoinPool.commonPool-worker-9]
[ForkJoinPool.commonPool-worker-9]
Thread:[ForkJoinPool.commonPool-worker-5]:Surrent Subscriber:[B]subscribes subscription:[java.util.concurrent.SubmissionPublisher$BufferedSubscription@38c95372]
Thread:[ForkJoinPool.commonPool-worker-23]:Surrent Subscriber:[C]subscribes subscription:[java.util.concurrent.SubmissionPublisher$BufferedSubscription@1bea0fb]
Thread:[ForkJoinPool.commonPool-worker-19]:Surrent Subscriber:[A]subscribes subscription:[java.util.concurrent.SubmissionPublisher$BufferedSubscription@43013828]
Thread:[ForkJoinPool.commonPool-worker-5]:Surrent Subscriber:[B]receives item:[100]
Thread:[ForkJoinPool.commonPool-worker-23]:Surrent Subscriber:[C]receives item:[100]
Thread:[ForkJoinPool.commonPool-worker-19]:Surrent Subscriber:[A]receives item:[100]
Thread:[ForkJoinPool.commonPool-worker-19]:is complete:[A]
Thread:[ForkJoinPool.commonPool-worker-5]:is complete:[B]
Thread:[ForkJoinPool.commonPool-worker-23]:is complete:[C]
先 subscription 在 receives 在 complete
但是每一个观察者的顺序是随机的。说明是异步的。
try( .. ) 实现 AotoCloseable 的时候,自动会给你关闭。
1 2 3 4 5 try{ } finnally{ xxx.close(); }
CompletableFuture
太过的灵活,都是由你的代码手动的控制。- Reactive 就比较的保守。
架构师
- 理论派
- 构造与落地
QA
-
什么时候用到消息?
答: 当你用到异步的时候,
- JAVA 的异步
- 消息的异步
当你不需要立马的返回值的时候,
假如说算一个累加的任务,今天一个 5 ,明天一个 10,你算没算完,我用
1 2
{ completalbeFuture.isDone(); }
当你操作需要预热,是你的一个可以操作的对象,可以去判断它是否完成。
-
例子
我们数据库要预热,缓存预热,要缓存 100 万的数据,你用异步的方式去做,就比较好。
我用第一步的时候,还可以操作第二步,像水流一样,数据流转
-
基础 or 架构如何取舍?
答:都要,基础和 架构都要
-
异步会不会引起数据不一致问题?
答: 多线程 并发/并行,都需要关注数据一致性,高正增加合理的同步方式
CAS
、Lock
-
没有时间怎么办?
答:要反省自己。行有不得,反求诸己。
不要
把女人当男人用,把男人当牲口用。
如果一点点空都没有,要不然,最好就辞了这份工作。 -
升级 JAVA 9 以后,怎么坑?
答:需要引入。一些东西
-
事件驱动和 JAVA 9 Flow 有什么联系
答:主要看事件,谁是事件,谁去驱动。作为状态的变更。
异步方式可以提高吞吐量
吞吐量是一个总量,以时间为考量
- 同步 200ms a b c d e f g = 1200 ms
- 异步 200ms +
异步调用
- 提交任务 1ms a b c d e f g = 7ms
- 执行任务 200ms
for
->onNext
->onComplete