异步/事件驱动 Web 开发
异步 和 事件驱动 很相似,有点区别。
关键技术:
- Reactive
- Reactor
- Async
Reactive
Reactive programming
This article is about Reactive Programming theory. For Reactive Extensions, see Reactive extensions. For Functional Reactive Programming, see Functional reactive programming.
Reactive 就是异步的 Reactor
Reactor 是 NIO 的模式
Reactor pattern
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.[1]
它是处理事件的一个服务请求的并发的。。。
多工
NIO = Noblocking I/O
NIO就是非阻塞,只是多工,写事件,读事件。
Reactive 是有具体的编程模型。
reactive-streams/reactive-streams-jvm
Reactive Streams Specification for the JVM http://www.reactive-streams.org/
https://github.com/reactive-streams/reactive-streams-jvm
Reactive-streams 是一个规范。
是一个 事件回调的机制。
Publisher
Subscriber
发布订阅模式: 主要是: 1对多
1
2
3
4
5
6
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
JAVA 默认的就有 观察者模式
Reactive 并不是一个很新的技术,它只是一种延伸。新瓶装老酒。
java.util.EventListener
事件监听,标记接口
java.util.EventObject
监听对象windows 叫做事件循环
JAVA 的 AWT 和 Swing 也是这个样子。
java.util.Observer
1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Observer {
/**
* This method is called whenever the observed object is changed. An
* application calls an {@code Observable} object's
* {@code notifyObservers} method to have all the object's
* observers notified of the change.
*
* @param o the observable object.
* @param arg an argument passed to the {@code notifyObservers}
* method.
*/
void update(Observable o, Object arg);
}
Observable
1
private Vector<Observer> obs;
全部都是同步,
看这个事件包含不包含和添加是两个操作,所以需要同步。
J.U.C
= Java.util.concurrency
发布订阅模式 就是观察者模式的变种
Event / Listener 也是观察者模式的变种。
java.beans.PropertyChangeEvent
java.beans.PropertyChangeListener
我的属性值的改变以后, PropertyChangeListener
会把我的事件传递过来。相当于源头,监听可以有多个监听,监听其实就是一个订阅者,触发事件的就是所谓的发布者。
在 Spring 用到了很多。
外国人比较注重 JAVA Beans
内省
事件。
begin
end
complete
Reactive 也是类似的。
since 8
java.util.concurrent.CompletableFuture
代替了 Future
,类似于 Reactive
Spring 5 WebFlux
Web on Reactive Stack
- Annotated Controllers: Consistent with Spring MVC and based on the same annotations from the
spring-web
module. Both Spring MVC and WebFlux controllers support reactive (Reactor and RxJava) return types, and, as a result, it is not easy to tell them apart. One notable difference is that WebFlux also supports reactive@RequestBody
arguments.- Functional Endpoints: Lambda-based, lightweight, and functional programming model. You can think of this as a small library or a set of utilities that an application can use to route and handle requests. The big difference with annotated controllers is that the application is in charge of request handling from start to finish versus declaring intent through annotations and being called back.
两种开发方式,一种是注解式的编程方式,一种是函数式。
基于Servlet 3.1 的 NIO 的编程方式。
前身就是 Tomcat
CometEvent 技术 彗星技术
在 WebSocket 之前,没有所谓的推,都是拉模式。
AIO 技术
《Servlet 3.1 》 规范。
异步。
javax.servlet.ServletRequest
1 2 3 4 5 6 7 8 9 AsyncContext startAsync() throws IllegalStateException; AsyncContext startAsync(ServletRequest var1, ServletResponse var2) throws IllegalStateException; boolean isAsyncStarted(); boolean isAsyncSupported(); AsyncContext getAsyncContext();
1
2
@WebServlet
public class SimpleAsyncServlet extends HttpServlet {
标记 @WebServlet
必须实现 HttpServlet
Spring Boot + Servlet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/***
* 访问 "/simple/async"
*/
@WebServlet(value = "/simple/async", asyncSupported = true)
public class SimpleAsyncServlet extends HttpServlet {
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
response.setCharacterEncoding("UTF-8");
response.setContentType("text/html;charset=UTF-8");
// 启动异步上下文
AsyncContext asyncContext = request.startAsync();
PrintWriter writer = response.getWriter();
writer.println("[" + Thread.currentThread().getName() + "]:当前的线程开始执行....");
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
writer.println("[" + Thread.currentThread().getName() + "]:请求完成了!");
}
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
writer.println("[" + Thread.currentThread().getName() + "]:请求超时了!");
}
@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
writer.println("[" + Thread.currentThread().getName() + "]:请求错误了!");
}
@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
writer.println("[" + Thread.currentThread().getName() + "]:异步请求开始了!");
}
});
asyncContext.complete();
}
}
1
2
3
4
5
6
7
@SpringBootApplication
@ServletComponentScan("com.darian.asyncweb.servlet")
public class AsyncWebApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncWebApplication.class, args);
}
}
EventListener 是一个标记接口。
javax.servlet.AsyncListener
1
2
3
4
5
6
7
8
9
public interface AsyncListener extends EventListener {
void onComplete(AsyncEvent var1) throws IOException;
void onTimeout(AsyncEvent var1) throws IOException;
void onError(AsyncEvent var1) throws IOException;
void onStartAsync(AsyncEvent var1) throws IOException;
}
Servlet Listener 和 Reactive 类似。
WebSocket 和 ServletListener、Reactive 都是类似的。
- Servlet <3.0 同步
- Servlet > 3.1 支持异步了
为了兼容,默认关闭。
注意:
asyncContext.complete();
response.setCharacterEncoding("UTF-8");
response.setContentType("text/html;charset=UTF-8");
需要 主动的触发事件,
还需要设置编码和 ContentType
1
2
[http-nio-8081-exec-3]:当前的线程开始执行....
[http-nio-8081-exec-3]:请求完成了!
事件过来了,但是是一个同步的事件。
要做异步事件的话呢?
java 代码
1
2
3
4
5
6
// 异步调用
asyncContext.start(() -> {
writer.println("[" + Thread.currentThread().getName() + "]:执行中!....\n");
// 异步调用完成仍然需要通知
asyncContext.complete();
});
1
2
3
[http-nio-8081-exec-5]:当前的线程开始执行....
[http-nio-8081-exec-6]:执行中!....
[http-nio-8081-exec-5]:请求完成了!
刷新网页,数字一直在变,
初始化 10 个线程。
java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(........)
1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
thre adFactory, defaultHandler);
}
异步 Web 编程 是有场景的。
- 不再需要自己写线程池。不断地
ExecutorService
.#execute()
- 不需要马上返回值
- 数据的 dump ,
你直接丢过来就好了,它异步的慢慢的去执行就好了。 dump 1G 数据,100s 返回,然后,1s 直接响应你,比消息靠谱的是,它告诉你是不是执行。异步就是
Future
java.util.concurrent.ExecutorService#submit(java.util.concurrent.Callable<T>)
1
<T> Future<T> submit(Callable<T> task);
#submit
的时候,会返回一个 Future<T>
文件上传也可以。
运行结果需要二次校验。
消息,是自定义协议的。
异步Web,是走 HTTP 的。可以直接告诉你是否响应
还是一个 Response
, Request
只是告诉你我接受了,但是,执行完成的结果 需要二次校验。
comet 技术,实现聊天室
comet
- EventType.BEGIN
- EventType.END
- EventType.ETTOR
- EventSubType.TIMEOUT
和上边的非常像
Tomcat 的
1
<connector connectionTimeout="2000" port="8080" protocol="HTTP/1.1" redirectPort="8443"/>
更改为 :
1
<connector connectionTimeout="2000" port="8080" protocol="org.apache.coyote.http11.http1NioProtocol" redirectPort="8443"/>
Tomcat 的 XML 是 visitor 模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@WebServlet("/chat/ajax")
public class ChatAjaxServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
/***
* 1 个服务器,对应多个客户端
*/
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
String userName = request.getParameter("userName");
String message = request.getParameter("message");
String content = "User : [" + userName + "]::--" + message;
// 通过上下文去传递
ServletContext servletContext = request.getServletContext();
BlockingQueue<String> messages = (BlockingQueue<String>) servletContext.getAttribute("messages");
messages.offer(content);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@WebServlet("/chat/comet")
public class ChatCometServlet extends HttpServlet
implements CometProcessor {
private List<Writer> streams;
private BlockingQueue<String> messages;
@Override
public void init(ServletConfig servletConfig) throws ServletException {
// Servlet 是线程不安全的,我要并发,
// 所有的用户的连接
streams = new CopyOnWriteArrayList<Writer>();
messages = new LinkedBlockingQueue<String>();
ServletContext servletContext = servletConfig.getServletContext();
servletContext.setAttribute("messages", messages);
// 我需要异步的通知所有的用户
Thread thread = new Thread(()->{
try {
// 获取最新的消息
final String message = messages.take();
// 广播所有的客户端
streams.forEach(writer -> {
try {
writer.write(message);
writer.flush();
} catch (IOException e) {
e.printStackTrace();
}
});
} catch (InterruptedException e) {
// 标记个状态,让线程停止。
Thread.currentThread().interrupt();
e.printStackTrace();
}
},"聊天广播线程");
thread.start();
}
public void event(CometEvent cometEvent) throws IOException, ServletException {
EventType eventType = cometEvent.getEventType();
switch (eventType) {
// Reactor 模式
case BEGIN:
onBegin(cometEvent);
break;
case READ:
onRead(cometEvent);
break;
case END:
onEnd(cometEvent);
break;
case ERROR:
onError(cometEvent);
break;
default:
break;
}
}
private void onBegin(CometEvent cometEvent)
throws IOException, ServletException {
HttpServletResponse response = cometEvent.getHttpServletResponse();
PrintWriter writer = response.getWriter();
// 增加一个用户
streams.add(writer);
System.out.println("当前在线的用户的数量:" + streams.size());
}
private void onRead(CometEvent cometEvent)
throws IOException, ServletException {
}
private void onEnd(CometEvent cometEvent)
throws IOException, ServletException {
HttpServletResponse response = cometEvent.getHttpServletResponse();
PrintWriter writer = response.getWriter();
// 完成的时候移除
streams.remove(writer);
System.out.println("当前在线的用户的数量:" + streams.size());
}
private void onError(CometEvent cometEvent)
throws IOException, ServletException {
// 错误的时候也要移除
onEnd(cometEvent);
}
}
聊天代码没有跑起来