异步事件驱动 web 开发

Posted by NotGeek on November 5, 2018

异步/事件驱动 Web 开发

异步 和 事件驱动 很相似,有点区别。

关键技术:

  • Reactive
  • Reactor
  • Async

Reactive

Reactive programming

This article is about Reactive Programming theory. For Reactive Extensions, see Reactive extensions. For Functional Reactive Programming, see Functional reactive programming.

Reactive 就是异步的 Reactor

Reactor 是 NIO 的模式

Reactor pattern

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.[1]

它是处理事件的一个服务请求的并发的。。。

多工

NIO = Noblocking I/O

NIO就是非阻塞,只是多工,写事件,读事件。

Reactive 是有具体的编程模型。

reactive-streams/reactive-streams-jvm

Reactive Streams Specification for the JVM http://www.reactive-streams.org/

https://github.com/reactive-streams/reactive-streams-jvm

Reactive-streams 是一个规范。

是一个 事件回调的机制。

  • Publisher
  • Subscriber

发布订阅模式: 主要是: 1对多

1
2
3
4
5
6
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

JAVA 默认的就有 观察者模式

Reactive 并不是一个很新的技术,它只是一种延伸。新瓶装老酒。

java.util.EventListener 事件监听,标记接口

java.util.EventObject 监听对象

windows 叫做事件循环

JAVA 的 AWT 和 Swing 也是这个样子。

java.util.Observer

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Observer {
    /**
     * This method is called whenever the observed object is changed. An
     * application calls an {@code Observable} object's
     * {@code notifyObservers} method to have all the object's
     * observers notified of the change.
     *
     * @param   o     the observable object.
     * @param   arg   an argument passed to the {@code notifyObservers}
     *                 method.
     */
    void update(Observable o, Object arg);
}

Observable

1
private Vector<Observer> obs;

全部都是同步,

看这个事件包含不包含和添加是两个操作,所以需要同步。

J.U.C = Java.util.concurrency

发布订阅模式 就是观察者模式的变种

Event / Listener 也是观察者模式的变种。

java.beans.PropertyChangeEvent

java.beans.PropertyChangeListener

我的属性值的改变以后, PropertyChangeListener 会把我的事件传递过来。相当于源头,监听可以有多个监听,监听其实就是一个订阅者,触发事件的就是所谓的发布者。

在 Spring 用到了很多。

外国人比较注重 JAVA Beans

内省

事件。

  • begin
  • end
  • complete

Reactive 也是类似的。

since 8

java.util.concurrent.CompletableFuture

代替了 Future ,类似于 Reactive

Spring 5 WebFlux

Web on Reactive Stack

  • Annotated Controllers: Consistent with Spring MVC and based on the same annotations from the spring-web module. Both Spring MVC and WebFlux controllers support reactive (Reactor and RxJava) return types, and, as a result, it is not easy to tell them apart. One notable difference is that WebFlux also supports reactive @RequestBody arguments.
  • Functional Endpoints: Lambda-based, lightweight, and functional programming model. You can think of this as a small library or a set of utilities that an application can use to route and handle requests. The big difference with annotated controllers is that the application is in charge of request handling from start to finish versus declaring intent through annotations and being called back.

两种开发方式,一种是注解式的编程方式,一种是函数式。

基于Servlet 3.1 的 NIO 的编程方式。

前身就是 Tomcat

CometEvent 技术 彗星技术

在 WebSocket 之前,没有所谓的推,都是拉模式。

AIO 技术

《Servlet 3.1 》 规范。

异步。

javax.servlet.ServletRequest

1
2
3
4
5
6
7
8
9
AsyncContext startAsync() throws IllegalStateException;

AsyncContext startAsync(ServletRequest var1, ServletResponse var2) throws IllegalStateException;

boolean isAsyncStarted();

boolean isAsyncSupported();

AsyncContext getAsyncContext();
1
2
@WebServlet
public class SimpleAsyncServlet extends HttpServlet {

标记 @WebServlet 必须实现 HttpServlet

Spring Boot + Servlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/***
 * 访问 "/simple/async"
 */
@WebServlet(value = "/simple/async", asyncSupported = true)
public class SimpleAsyncServlet extends HttpServlet {

    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        response.setCharacterEncoding("UTF-8");
        response.setContentType("text/html;charset=UTF-8");
        // 启动异步上下文
        AsyncContext asyncContext = request.startAsync();
        PrintWriter writer = response.getWriter();
        writer.println("[" + Thread.currentThread().getName() + "]:当前的线程开始执行....");

        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent asyncEvent) throws IOException {
                writer.println("[" + Thread.currentThread().getName() + "]:请求完成了!");
            }

            @Override
            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
                writer.println("[" + Thread.currentThread().getName() + "]:请求超时了!");
            }

            @Override
            public void onError(AsyncEvent asyncEvent) throws IOException {
                writer.println("[" + Thread.currentThread().getName() + "]:请求错误了!");
            }

            @Override
            public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
                writer.println("[" + Thread.currentThread().getName() + "]:异步请求开始了!");
            }
        });
        asyncContext.complete();
    }
}

1
2
3
4
5
6
7
@SpringBootApplication
@ServletComponentScan("com.darian.asyncweb.servlet")
public class AsyncWebApplication {
   public static void main(String[] args) {
      SpringApplication.run(AsyncWebApplication.class, args);
   }
}

EventListener 是一个标记接口。

javax.servlet.AsyncListener

1
2
3
4
5
6
7
8
9
public interface AsyncListener extends EventListener {
    void onComplete(AsyncEvent var1) throws IOException;

    void onTimeout(AsyncEvent var1) throws IOException;

    void onError(AsyncEvent var1) throws IOException;

    void onStartAsync(AsyncEvent var1) throws IOException;
}

Servlet Listener 和 Reactive 类似。

WebSocket 和 ServletListener、Reactive 都是类似的。

  • Servlet <3.0 同步
  • Servlet > 3.1 支持异步了

为了兼容,默认关闭。

注意:

  • asyncContext.complete();
  • response.setCharacterEncoding("UTF-8");
  • response.setContentType("text/html;charset=UTF-8");

需要 主动的触发事件,

还需要设置编码和 ContentType

1
2
[http-nio-8081-exec-3]:当前的线程开始执行.... 
[http-nio-8081-exec-3]:请求完成了!

事件过来了,但是是一个同步的事件。

要做异步事件的话呢?

java 代码
1
2
3
4
5
6
// 异步调用
asyncContext.start(() -> {
    writer.println("[" + Thread.currentThread().getName() + "]:执行中!....\n");
    // 异步调用完成仍然需要通知
    asyncContext.complete();
});
1
2
3
[http-nio-8081-exec-5]:当前的线程开始执行.... 
[http-nio-8081-exec-6]:执行中!.... 
[http-nio-8081-exec-5]:请求完成了!

刷新网页,数字一直在变,

初始化 10 个线程。

java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(........)

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         thre adFactory, defaultHandler);
}

异步 Web 编程 是有场景的。

  • 不再需要自己写线程池。不断地 ExecutorService. #execute()
  • 不需要马上返回值
  • 数据的 dump ,

你直接丢过来就好了,它异步的慢慢的去执行就好了。 dump 1G 数据,100s 返回,然后,1s 直接响应你,比消息靠谱的是,它告诉你是不是执行。异步就是 Future

java.util.concurrent.ExecutorService#submit(java.util.concurrent.Callable<T>)

1
<T> Future<T> submit(Callable<T> task);

#submit 的时候,会返回一个 Future<T>

文件上传也可以。

运行结果需要二次校验。

消息,是自定义协议的。

异步Web,是走 HTTP 的。可以直接告诉你是否响应

还是一个 ResponseRequest

只是告诉你我接受了,但是,执行完成的结果 需要二次校验。

comet 技术,实现聊天室

comet

  • EventType.BEGIN
  • EventType.END
  • EventType.ETTOR
  • EventSubType.TIMEOUT
和上边的非常像

Tomcat 的

1
<connector connectionTimeout="2000" port="8080" protocol="HTTP/1.1" redirectPort="8443"/>

更改为 :

1
<connector connectionTimeout="2000" port="8080" protocol="org.apache.coyote.http11.http1NioProtocol" redirectPort="8443"/>

Tomcat 的 XML 是 visitor 模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@WebServlet("/chat/ajax")
public class ChatAjaxServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /***
     * 1 个服务器,对应多个客户端
     */
    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String userName = request.getParameter("userName");
        String message = request.getParameter("message");

        String content = "User : [" + userName + "]::--" + message;
        // 通过上下文去传递

        ServletContext servletContext = request.getServletContext();
        BlockingQueue<String> messages = (BlockingQueue<String>) servletContext.getAttribute("messages");
        messages.offer(content);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@WebServlet("/chat/comet")
public class ChatCometServlet extends HttpServlet
        implements CometProcessor {
    private List<Writer> streams;
    private BlockingQueue<String> messages;

    @Override
    public void init(ServletConfig servletConfig) throws ServletException {
        // Servlet 是线程不安全的,我要并发,
        // 所有的用户的连接
        streams = new CopyOnWriteArrayList<Writer>();
        messages = new LinkedBlockingQueue<String>();

        ServletContext servletContext = servletConfig.getServletContext();
        servletContext.setAttribute("messages", messages);
        // 我需要异步的通知所有的用户
        Thread thread = new Thread(()->{
            try {
                // 获取最新的消息
                final String message  = messages.take();

                // 广播所有的客户端
                streams.forEach(writer -> {
                    try {
                        writer.write(message);
                        writer.flush();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            } catch (InterruptedException e) {
                // 标记个状态,让线程停止。
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        },"聊天广播线程");

        thread.start();
    }

    public void event(CometEvent cometEvent) throws IOException, ServletException {
        EventType eventType = cometEvent.getEventType();
        switch (eventType) {
            // Reactor 模式
            case BEGIN:
                onBegin(cometEvent);
                break;
            case READ:
                onRead(cometEvent);
                break;
            case END:
                onEnd(cometEvent);
                break;
            case ERROR:
                onError(cometEvent);
                break;
            default:
                break;
        }
    }

    private void onBegin(CometEvent cometEvent)
            throws IOException, ServletException {
        HttpServletResponse response = cometEvent.getHttpServletResponse();
        PrintWriter writer = response.getWriter();
        // 增加一个用户
        streams.add(writer);
        System.out.println("当前在线的用户的数量:" + streams.size());
    }

    private void onRead(CometEvent cometEvent)
            throws IOException, ServletException {

    }

    private void onEnd(CometEvent cometEvent)
            throws IOException, ServletException {
        HttpServletResponse response = cometEvent.getHttpServletResponse();
        PrintWriter writer = response.getWriter();
        // 完成的时候移除
        streams.remove(writer);
        System.out.println("当前在线的用户的数量:" + streams.size());
    }

    private void onError(CometEvent cometEvent)
            throws IOException, ServletException {
        // 错误的时候也要移除
        onEnd(cometEvent);
    }
}

聊天代码没有跑起来

web 程序添加 jar 包

1545630492059

1545630453107