###XNIO
undertow 是以XNIO为实现核心。XNIO有两个核心概念:
####Channel
Channel,是传输管道的抽象概念,在NIO的Channel上进行的扩展加强,使用ChannelListener API进行事件通知。在创建Channel时,就赋予IO线程,用于执行所有的ChannelListener回调方法。
####IOWorker
区分IO线程和工作线程,创建一个工作线程池可以用来执行阻塞任务。一般情况下,非阻塞的Handler由IO线程执行,而阻塞任务比如Servlet则被调度到工作线程池执行。这样就很好的区分了阻塞和非阻塞的两种情形。
- WORKER_IO_THREADS, IO thread处理非阻塞任务,要保证不做阻塞操作,因为很多连接同时用到这类线程,类似于nodejs中的loop,这个线程只要有任务就去执行,实际配置时每个CPU一个线程比较好。
- WORKER_TASK_CORE_THREADS,用于执行阻塞任务,从线程池中获得,任务完成后返回到线程池中。因为不同应用对应的服务器负载不同,所以不易给出具体数值,一般建议每个CPU core设置10个。
我们知道NIO的基本要求是不阻塞当前线程的执行,对于非阻塞请求的结果,可以用两种方式获得:一种是对于请求很快返回一个引用(如JDK中Future,XNIO中称为IoFuture,其中很多方法是类似的),过一段时间再查询结果;还有一种是当结果就绪时,调用事先注册的回调方法来通知(如NIO2的CompletionHandler,XNIO的ChannelListener)。显而易见后者效率更高一些,避免了数据未就绪情景下的无用处理过程。但JDK7之前无法将函数作为方法参数,所以只能用Java的匿名内部类来模拟函数式方法,造成代码嵌套层次过多,难以理解和维护,所以Netty和XNIO这样的框架通过调度方法调用过程,简化了编程工作。
XNIO和Netty都对ByteBuffer进行池化管理,简单来说就是开发者在程序开始时就计划好读写缓存区大小,统一分配好放到池中,Xnio中有Pool和Pooled接口用来管理池化缓存区。开发过高并发应用就知道,JVM GC经常出现并难以控制是很头疼的问题。我们通常在接收网络数据时,往往简单的new出一块数据区,填充,解析,使用,最后丢弃,这种方法随着大量的数据读入,必然造成GC反复出现。重用缓存区就可以在这个方面解决一部分问题。
和Netty的ChannelHandler不同,XNIO对应的ChannelListener只有一个方法handleEvent(),也就意味着所有的事件都要经由这个方法。在实际实行过程中,会进行若干状态机的转变,比如在服务器端,开始时accept状态就绪,当连接建立后转变为可读或者可写状态。请参见下面的例子。
除了Channel 和 IOWorker 两个重要的基础,XNIO还提供了SSL支持
undertow core
####Listeners
目前undertow中支持的Listner类型主要有
- HTTP
- HTTPS
- AJP
- HTTP2
####Handler
undertow原生提供了io.undertow.server.HttpHandler,接口定义比较简单:1
2
3public interface HttpHandler {
void handleRequest(HttpServerExchange exchange) throws Exception;
}
undertow中并没有pipeline的概念,但是可以在构建hanlder时指定next,如下代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class SetHeaderHandler implements HttpHandler {
private final HttpString header;
private final String value;
private final HttpHandler next;
public SetHeaderHandler(final HttpHandler next, final String header, final String value) {
this.next = next;
this.value = value;
this.header = new HttpString(header);
}
public void handleRequest(final HttpServerExchange exchange) throws Exception {
exchange.getResponseHeaders().put(header, value);
next.handleRequest(exchange);
}
}
或者类似下面的方式,只做自身逻辑的处理,不做传递:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public class FilterHandler implements HttpHandler {
private final Map<DispatcherType, List<ManagedFilter>> filters;
private final Map<DispatcherType, Boolean> asyncSupported;
private final boolean allowNonStandardWrappers;
private final HttpHandler next;
public FilterHandler(final Map<DispatcherType, List<ManagedFilter>> filters, final boolean allowNonStandardWrappers, final HttpHandler next) {
this.allowNonStandardWrappers = allowNonStandardWrappers;
this.next = next;
this.filters = new EnumMap<>(filters);
Map<DispatcherType, Boolean> asyncSupported = new EnumMap<>(DispatcherType.class);
for(Map.Entry<DispatcherType, List<ManagedFilter>> entry : filters.entrySet()) {
boolean supported = true;
for(ManagedFilter i : entry.getValue()) {
if(!i.getFilterInfo().isAsyncSupported()) {
supported = false;
break;
}
}
asyncSupported.put(entry.getKey(), supported);
}
this.asyncSupported = asyncSupported;
}
public void handleRequest(final HttpServerExchange exchange) throws Exception {
final ServletRequestContext servletRequestContext = exchange.getAttachment(ServletRequestContext.ATTACHMENT_KEY);
ServletRequest request = servletRequestContext.getServletRequest();
ServletResponse response = servletRequestContext.getServletResponse();
DispatcherType dispatcher = servletRequestContext.getDispatcherType();
Boolean supported = asyncSupported.get(dispatcher);
if(supported != null && ! supported) {
exchange.putAttachment(AsyncContextImpl.ASYNC_SUPPORTED, false );
}
final List<ManagedFilter> filters = this.filters.get(dispatcher);
if(filters == null) {
next.handleRequest(exchange);
} else {
final FilterChainImpl filterChain = new FilterChainImpl(exchange, filters, next, allowNonStandardWrappers);
filterChain.doFilter(request, response);
}
}
####组装服务器
1.创建XNIO Workder
2.创建 XNIO SSL实例
3.Create an instance of the relevant Undertow listener class
4.Open a server socket using XNIO and set its accept listener1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38Xnio xnio = Xnio.getInstance();
XnioWorker worker = xnio.createWorker(OptionMap.builder()
.set(Options.WORKER_IO_THREADS, ioThreads)
.set(Options.WORKER_TASK_CORE_THREADS, workerThreads)
.set(Options.WORKER_TASK_MAX_THREADS, workerThreads)
.set(Options.TCP_NODELAY, true)
.getMap());
OptionMap socketOptions = OptionMap.builder()
.set(Options.WORKER_IO_THREADS, ioThreads)
.set(Options.TCP_NODELAY, true)
.set(Options.REUSE_ADDRESSES, true)
.getMap();
Pool<ByteBuffer> buffers = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR,bufferSize, bufferSize * buffersPerRegion);
if (listener.type == ListenerType.AJP) {
AjpOpenListener openListener = new AjpOpenListener(buffers, serverOptions, bufferSize);
openListener.setRootHandler(rootHandler);
ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(openListener);
AcceptingChannel<? extends StreamConnection> server = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(listener.host), listener.port), acceptListener, socketOptions);
server.resumeAccepts();
} else if (listener.type == ListenerType.HTTP) {
HttpOpenListener openListener = new HttpOpenListener(buffers, OptionMap.builder().set(UndertowOptions.BUFFER_PIPELINED_DATA, true).addAll(serverOptions).getMap(), bufferSize);
openListener.setRootHandler(rootHandler);
ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(openListener);
AcceptingChannel<? extends StreamConnection> server = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(listener.host), listener.port), acceptListener, socketOptions);
server.resumeAccepts();
} else if (listener.type == ListenerType.HTTPS){
HttpOpenListener openListener = new HttpOpenListener(buffers, OptionMap.builder().set(UndertowOptions.BUFFER_PIPELINED_DATA, true).addAll(serverOptions).getMap(), bufferSize);
openListener.setRootHandler(rootHandler);
ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(openListener);
XnioSsl xnioSsl;
if(listener.sslContext != null) {
xnioSsl = new JsseXnioSsl(xnio, OptionMap.create(Options.USE_DIRECT_BUFFERS, true), listener.sslContext);
} else {
xnioSsl = xnio.getSslProvider(listener.keyManagers, listener.trustManagers, OptionMap.create(Options.USE_DIRECT_BUFFERS, true));
}
AcceptingChannel <SslConnection> sslServer = xnioSsl.createSslConnectionServer(worker, new InetSocketAddress(Inet4Address.getByName(listener.host), listener.port), (ChannelListener) acceptListener, socketOptions);
sslServer.resumeAccepts();
}
###undertow servlet
类似与jetty,undertow 也可以部署采用部署war包的方式启动项目:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22DeploymentInfo servletBuilder = Servlets.deployment()
.setClassLoader(ServletServer.class.getClassLoader())
.setContextPath("/myapp")
.setDeploymentName("test.war")
.addServlets(
Servlets.servlet("MessageServlet", MessageServlet.class)
.addInitParam("message", "Hello World")
.addMapping("/*"),
Servlets.servlet("MyServlet", MessageServlet.class)
.addInitParam("message", "MyServlet")
.addMapping("/myservlet"));
DeploymentManager manager = Servlets.defaultContainer().addDeployment(servletBuilder);
manager.deploy();
PathHandler path = Handlers.path(Handlers.redirect("/myapp"))
.addPrefixPath("/myapp", manager.start());
Undertow server = Undertow.builder()
.addHttpListener(8080, "localhost")
.setHandler(path)
.build();
server.start();
###Refernces
Undertow服务器基础分析 - 概述
Undertow服务器基础分析 - XNIO
Undertow服务器基础分析
undertow cookdoc