Java 9 异步并发编程

Posted by NotGeek on November 2, 2018

JAVA 9 异步并发编程

主要议题

  • Reactive Streams
  • 前 Java 9 时代
  • Java 9 时代

Reactive Streams

前 Java 9 时代

  • 观察者模式
  • 事件监听模式
  • Reactor 模式

Java 9 时代

  • Flow API
    • Publisher<T>
    • Subscriber<T>
    • Processor<T,R>
    • Subscription

Reactive Streams

今天探讨 JAVA 8 到 JAVA 9 的过渡。

  • Publisher
    • 发布者,发布内容 Subscription
  • Subscriber
    • 订阅者,订阅内容 Subscription
  • Subscription
    • 订阅的内容
  • Processor
    • Publisher + Subscriber
Processor 图解:

1545584266345

Processor 是发布者也是订阅者
Spring Cloud Stream / Spring Cloud Stream Data 里边有消息服务总线。

https://github.com/reactive-streams/reactive-streams-jvm

API Components

The API consists of the following components that are required to be provided by Reactive Stream implementations:

  1. Publisher
  2. Subscriber
  3. Subscription
  4. Processor

Glossary

Term Definition
Signal As a noun: one of the onSubscribe, onNext, onComplete, onError, request(n) or cancel methods. As a verb: calling/invoking a signal.
………. ……….

SPECIFICATION

1. Publisher (Code)

1
2
3
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
1
2
3
4
5
6
7
8
类似于我们的 mq 的形式。
WebSocket
- onOpen
- onMessage
- onError
他和 stream 的信号,相似,但是一个是直连,一个不是。

一个发布者,也可能是一个订阅者,也可能十多个,可能是单一传播或者是广播。

MQ 两种模式

  • 点对点模式
    • 一对一
  • Topic 模式
    • 1 : N

他并不是一个新的东西,只是把一些相关的东西融合到一块

http://reactivex.io/intro.html

ReactiveX

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

reactiveX 关键字

  • asynchronous (异步) event-based (基于事件)
  • observable sequences(观察者)

前 JAVA 9 时代

好的架构师会注意 技术的传承

  • 观察者模式: JAVA 支持
  • 事件监听模式(观察者模式的变种): JAVA Beans != #get + #set()
  • Reactor 模式 : (设计 NIO 的东西)

JAVA 9 的最大的特性就是 模块化

句柄的操作,JAVA 追踪堆栈很麻烦的,JAVA 9 做了优化

JAVA 一直支持 Observer

1
2
3
4
5
6
7
8
9
@Deprecated(since="9")
// 观察者
public interface Observer {
    /**
     * Observerable 事件源
     * Object arg 参数
     */
    void update(Observable o, Object arg);
}

java.util.Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void notifyObservers() {
    notifyObservers(null);
}
public void notifyObservers(Object arg) {
    Object[] arrLocal;

    synchronized (this) {
        if (!changed)
            return;
        arrLocal = obs.toArray();
        clearChanged();
    }

    for (int i = arrLocal.length-1; i>=0; i--)
        ((Observer)arrLocal[i]).update(this, arg);
}

所谓的传参数,把自己传进来,

他是倒序的,从后往前的,是一种实现方式。

消息队列,并不要求,队列必须按照时间顺序去执行,除非,后边的计算单元和前边的计算单元有强烈的计算关系的时候,拼接一个 “Hello, world” ,一个一个字符必须按照顺序,才能够符合你的期望,但是实际情况下,消息的到达并不一定按顺序的。所以这点看一下,不要太深入了。。。。。

当你依赖于某一个接口,的时候,传入的参数的顺序,就是你这里 lambda 表达式的顺序。

Observable#setChanged 方法是 protected ,所以需要自己实现。

提升一下它的级别,JAVA 的编程模型是重进入的。

观察者的统治的顺序就是倒序

观察者 Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/***
 * 观察者模式 Demo
 */
public class ObserverDemo {
    public static void main(String[] args) {
        // 第一个缺点: Vector 作为底层存储(全线程安全的)
        // 第二个缺点,没有实现泛型
        // 第三个阙殆你: 同步 -> 阻塞
        MyObserable observable = new MyObserable();// 构建观察对象
        // 增加了第一个观察者
        observable.addObserver((o, arg) -> {
            System.out.println("第一个--观察到的数值:" + arg);
        });
        // 增加了第二个观察者
        observable.addObserver((o, arg) -> {
            System.out.println("第二个--观察到的数值:" + arg);
        });

        // 设置变化值
        observable.setChanged();
        observable.notifyObservers(1212321);
    }

    private static class MyObserable extends Observable {
        public void setChanged() {
            super.setChanged();
        }
    }
}
事件监听者模式 Demo

Pojo 不单单是 getter / setter

  • xxxEvent
  • xxxListener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/***
 * 事件监听者模式
 * {@link java.util.EventObject} 标准的事件对象
 * {@link java.util.EventListener} 标准的事件监听对象
 * @see java.util.EventListener
 */
public class EventListenerDemo {
    public static void main(String[] args) {
        Person person = new Person();
        // JAVA Beans 规范 -> 内省
        // propertyChangeEvent 广播源
        // PropertyChangeListener 注册器
        PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(person);
        propertyChangeSupport.addPropertyChangeListener("name", evt -> {
            Person bean = (Person) evt.getSource();
            System.out.println("【Person】:" + bean + "\n,[旧值]:"
                    + evt.getOldValue() + ",【新值】" + evt.getNewValue());
        });

        // 触发事件
        propertyChangeSupport.firePropertyChange("name",
                null, "darian");
    }

    /***
     * Pojo getter / setter
     */
    public static class Person {
        private String name;
        private int age;

        @Override
        public String toString() {
            return "Person{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }
}

java.beans.PropertyChangeSupport#addPropertyChangeListener(java.lang.String, java.beans.PropertyChangeListener)

也是一个一个添加进去,一个一个循环的去更新。数组之类的

reactor

Reactor pattern

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexesthe incoming requests and dispatches them synchronously to the associated request handlers.[1]

和 NIO 非常的像

NIO 有一个二注册的事件,有一个 SelectorKey 有一个值,读、写、连接。

Reactor

JAVA 9 时代

https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html

  • 正常返回
    • onComplete
    • 错误返回
  • flatMap
  • map
  • reduce

就是类似的。

Flow 的引入是开源的胜利,大家都在用 reactive ,所以 JAVA API 被迫支持,

衣巷_百度汉语

作者:刘禹锡

朱雀桥边野草花,乌衣巷口夕阳斜。 旧时王谢堂前燕,飞入寻常百姓家。

Oracle 不 leader 了

要学习英文文档。多看英文文档。多写,多练。

[public class SubmissionPublisher](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/SubmissionPublisher.html)

Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.

一个发布器,异步的 非 Null 的,null 就会异常。

你非得传递 null ,就传递 Optional。

我们多关注于 Comsumer

Publisher

  • 1: N

区别

  • Reactive
  • Observer

代码尽量一次写好,重构的机会不多。

属性尽量用 final 修饰。

Interface Flow.Subscription

Enclosing class:Flow

Message control linking a Flow.Publisher and Flow.Subscriber. Subscribers receive items only when requested, and may cancel at any time. The methods in this interface are intended to be invoked only by their Subscribers; usages in other contexts have undefined effects.

void cancel() Causes the Subscriber to (eventually) stop receiving messages.
void request(long n) Adds the given number n of items to the current unfulfilled demand for this subscription.

request

1
void request(long n)

Adds the given number n of items to the current unfulfilled demand for this subscription. If n is less than or equal to zero, the Subscriber will receive an onError signal with an IllegalArgumentException argument. Otherwise, the Subscriber will receive up to n additional onNext invocations (or fewer if terminated).

  • Parameters:

n - the increment of demand; a value of Long.MAX_VALUE may be considered as effectively unbounded

  • #cancel 取消
  • #request 传递过去。

request(1) 相当于一个信号控制的源

每次 +1 到下一个来。

[a, b, c, d, e, f, g ]

100 100 100

a -> b -> c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/***
 * {@link java.util.concurrent.SubmissionPublisher}
 */
public class SubmissionPublisherDemo {
    public static void main(String[] args) throws InterruptedException {
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new IntegerSubscriber("A"));
            publisher.subscribe(new IntegerSubscriber("B"));
            publisher.subscribe(new IntegerSubscriber("C"));

            CompletableFuture<Void> completableFuture = publisher.consume(integer -> {
                System.out.println("[" + Thread.currentThread().getName() + "]:" + integer);
            }).thenRunAsync(() -> System.out.println("[" + Thread.currentThread().getName() + "]"))
                    .thenRun(() -> System.out.println("[" + Thread.currentThread().getName() + "]"))
                    .exceptionally(throwable -> {
                        System.out.println(throwable);
                        return null;
                    });

            // 提交数据到各个订阅器
            publisher.submit(100);
        }
//        publisher.close();
        Thread.currentThread().join(5000);
    }

    private static class IntegerSubscriber implements Flow.Subscriber<Integer> {
        private final String name;
        private Flow.Subscription subscription;

        public IntegerSubscriber(String name) {
            this.name = name;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("Thread:[" + Thread.currentThread().getName() + "]:"
                    + "Surrent Subscriber:[" + name + "]"
                    + "subscribes subscription:[" + subscription + "]");
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("Thread:[" + Thread.currentThread().getName() + "]:"
                    + "Surrent Subscriber:[" + name + "]"
                    + "receives item:[" + item + "]");
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println(throwable);
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Thread:[" + Thread.currentThread().getName() + "]:"
                    + "is complete:[" + name + "]");
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
[ForkJoinPool.commonPool-worker-9]:100
[ForkJoinPool.commonPool-worker-9]
[ForkJoinPool.commonPool-worker-9]
Thread:[ForkJoinPool.commonPool-worker-5]:Surrent Subscriber:[B]subscribes subscription:[java.util.concurrent.SubmissionPublisher$BufferedSubscription@38c95372]
Thread:[ForkJoinPool.commonPool-worker-23]:Surrent Subscriber:[C]subscribes subscription:[java.util.concurrent.SubmissionPublisher$BufferedSubscription@1bea0fb]
Thread:[ForkJoinPool.commonPool-worker-19]:Surrent Subscriber:[A]subscribes subscription:[java.util.concurrent.SubmissionPublisher$BufferedSubscription@43013828]
Thread:[ForkJoinPool.commonPool-worker-5]:Surrent Subscriber:[B]receives item:[100]
Thread:[ForkJoinPool.commonPool-worker-23]:Surrent Subscriber:[C]receives item:[100]
Thread:[ForkJoinPool.commonPool-worker-19]:Surrent Subscriber:[A]receives item:[100]
Thread:[ForkJoinPool.commonPool-worker-19]:is complete:[A]
Thread:[ForkJoinPool.commonPool-worker-5]:is complete:[B]
Thread:[ForkJoinPool.commonPool-worker-23]:is complete:[C]

先 subscription 在 receives 在 complete

但是每一个观察者的顺序是随机的。说明是异步的。

try( .. ) 实现 AotoCloseable 的时候,自动会给你关闭。

1
2
3
4
5
try{
   
} finnally{
    xxx.close();
}
  • CompletableFuture 太过的灵活,都是由你的代码手动的控制。
  • Reactive 就比较的保守。

架构师

  • 理论派
  • 构造与落地

QA

  1. 什么时候用到消息?

    答: 当你用到异步的时候,

    • JAVA 的异步
    • 消息的异步

    当你不需要立马的返回值的时候,

    假如说算一个累加的任务,今天一个 5 ,明天一个 10,你算没算完,我用

    1
    2
    
    {   completalbeFuture.isDone();
    }
    

    当你操作需要预热,是你的一个可以操作的对象,可以去判断它是否完成。

    • 例子

      我们数据库要预热,缓存预热,要缓存 100 万的数据,你用异步的方式去做,就比较好。

    我用第一步的时候,还可以操作第二步,像水流一样,数据流转

  2. 基础 or 架构如何取舍?

    答:都要,基础和 架构都要

  3. 异步会不会引起数据不一致问题?

    答: 多线程 并发/并行,都需要关注数据一致性,高正增加合理的同步方式

    CASLock

  4. 没有时间怎么办?

    答:要反省自己。行有不得,反求诸己。

    不要 把女人当男人用,把男人当牲口用。 如果一点点空都没有,要不然,最好就辞了这份工作。

  5. 升级 JAVA 9 以后,怎么坑?

    答:需要引入。一些东西

  6. 事件驱动和 JAVA 9 Flow 有什么联系

    答:主要看事件,谁是事件,谁去驱动。作为状态的变更。

异步方式可以提高吞吐量

吞吐量是一个总量,以时间为考量

  • 同步 200ms a b c d e f g = 1200 ms
  • 异步 200ms +

异步调用

  • 提交任务 1ms a b c d e f g = 7ms
  • 执行任务 200ms for -> onNext -> onComplete