This article walks through how a Netty server starts up.
Server startup example
```java
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .childHandler(channelInitializer)
     .option(ChannelOption.SO_BACKLOG, 128)
     .childOption(ChannelOption.SO_KEEPALIVE, true);

    // Bind the port and wait until the server socket is closed
    ChannelFuture f = b.bind(port).sync();
    f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}
```
Questions:
1. Where is the server-side socket initialized?
2. Where are connections accepted?
Server startup goes through four steps:
1. Create the server Channel
2. Initialize the server Channel
3. Register with the selector
4. Bind the port
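Server startup process
Creating the server Channel
Following the bind() call down step by step leads to initAndRegister(), the method that creates the server Channel object and initializes it.
AbstractBootstrap#initAndRegister()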
```java
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Step 1: create the channel
        channel = channelFactory.newChannel();
        // Step 2: initialize the channel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Step 3: register the channel with an event loop
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
```
1 Create the channel
ReflectiveChannelFactory#newChannel()
```java
public T newChannel() {
    try {
        // clazz is a Class<? extends T>, so no cast is needed here
        return this.clazz.getConstructor().newInstance();
    } catch (Throwable var2) {
        throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
    }
}
```
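The code above lives in ReflectiveChannelFactory, one implementation of ChannelFactory. It produces the channel by reflectively instantiating the class held in its clazz field. So where does clazz come from? That traces back to the .channel(NioServerSocketChannel.class) call in the startup example: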
AbstractBootstrap#channel()
```java
public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    } else {
        return this.channelFactory((io.netty.channel.ChannelFactory) (new ReflectiveChannelFactory(channelClass)));
    }
}
```
This sets the channelFactory field by passing the class into the ReflectiveChannelFactory constructor. The channel that newChannel() instantiated by reflection earlier is therefore a NioServerSocketChannel.
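The factory idea can be sketched without Netty on the classpath. The example below is a minimal illustration, not Netty's implementation: the `Factory` interface and `ReflectiveFactory` class are hypothetical stand-ins for `ChannelFactory` and `ReflectiveChannelFactory`, and `StringBuilder` plays the role of the channel class only because it has a public no-argument constructor.

```java
import java.lang.reflect.Constructor;

public class ReflectiveFactorySketch {
    /** Hypothetical stand-in for Netty's ChannelFactory; not the real interface. */
    public interface Factory<T> {
        T newInstance();
    }

    /** Remembers a class and instantiates it through its public no-arg constructor. */
    public static final class ReflectiveFactory<T> implements Factory<T> {
        private final Class<? extends T> clazz;

        public ReflectiveFactory(Class<? extends T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("clazz");
            }
            this.clazz = clazz;
        }

        @Override
        public T newInstance() {
            try {
                Constructor<? extends T> ctor = clazz.getConstructor();
                return ctor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create instance of " + clazz, e);
            }
        }
    }

    public static void main(String[] args) {
        // The class is chosen at configuration time; the object is created later, on demand.
        Factory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        CharSequence created = factory.newInstance();
        System.out.println(created.getClass().getName()); // prints java.lang.StringBuilder
    }
}
```

Separating "which class" (set during configuration) from "when to instantiate" (during bind) is what lets the bootstrap stay independent of the concrete channel type.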
Creating the server Channel by reflection
1.1 Call the underlying JDK method to create a ServerSocketChannel
NioServerSocketChannel#NioServerSocketChannel()
```java
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
```
```java
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException var2) {
        throw new ChannelException("Failed to open a server socket.", var2);
    }
}
```
1.2 NioServerSocketChannelConfig: the TCP parameter configuration class
```java
public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
    // 16 is the value of SelectionKey.OP_ACCEPT (the constant was inlined by the decompiler)
    super((Channel) null, channel, 16);
    this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());
}
```
1.3 Set non-blocking mode
The parent class constructor calls ch.configureBlocking(false); to switch the channel to non-blocking mode.
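Steps 1.1 and 1.3 map directly onto two JDK calls. The sketch below uses only `java.nio` and assumes the default `SelectorProvider.provider()`; the class and method names (`OpenServerChannelSketch`, `openNonBlocking`) are made up for illustration.

```java
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

public class OpenServerChannelSketch {
    /** Opens a JDK server channel and switches it to non-blocking mode. */
    public static ServerSocketChannel openNonBlocking() throws IOException {
        // Same JDK call that newSocket(...) makes in the listing above
        ServerSocketChannel ch = SelectorProvider.provider().openServerSocketChannel();
        // A channel must be non-blocking before it can be registered with a Selector
        ch.configureBlocking(false);
        return ch;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel ch = openNonBlocking()) {
            // prints blocking=false, open=true
            System.out.println("blocking=" + ch.isBlocking() + ", open=" + ch.isOpen());
        }
    }
}
```

Note that the channel is open but not yet bound to any port at this point; binding happens much later in the startup sequence.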
1.4 AbstractChannel
```java
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    this.id = this.newId();
    this.unsafe = this.newUnsafe();
    this.pipeline = this.newChannelPipeline();
}
```
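2 Initialize the channel
1. Set the TCP parameters (options) and any user-defined attributes (AttributeKey).
2. Add the configured server-side handler to the pipeline.
3. Create an Acceptor (the connection handler), which is covered later.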
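Registering with the selector
As in plain NIO, the ServerSocketChannel has to be registered with a Selector so that incoming client requests can be monitored.
Register the channel with the selector
ChannelFuture regFuture = config().group().register(channel);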
AbstractChannel$AbstractUnsafe#register()
```java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    // Bind this channel to the chosen event loop
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // Hand the actual registration over to the event loop thread
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
```
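config: ServerBootstrapConfig
group: EventLoopGroup
The register() called here invokes the parent class's next(), whose chooser strategy picks one element from the EventExecutor[] array. The call eventually reaches the register method of the Unsafe interface, which is implemented by AbstractUnsafe, an inner class of AbstractChannel.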
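To make the chooser step concrete, here is a small sketch of picking one element from a fixed array on each call. It assumes a plain round-robin policy, which is an assumption about the chooser rather than something stated above, and `RoundRobinChooser` is a hypothetical name; strings stand in for the event executors.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinChooserSketch {
    /** Picks elements of a fixed array in rotation; safe to call from several threads. */
    public static final class RoundRobinChooser<E> {
        private final E[] executors;
        private final AtomicInteger idx = new AtomicInteger();

        public RoundRobinChooser(E[] executors) {
            if (executors.length == 0) {
                throw new IllegalArgumentException("executors must not be empty");
            }
            this.executors = executors.clone();
        }

        public E next() {
            // floorMod keeps the index non-negative even after the counter overflows
            return executors[Math.floorMod(idx.getAndIncrement(), executors.length)];
        }
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next()); // loop-0, loop-1, loop-2, loop-0
        }
    }
}
```

Whatever the policy, the point is that each channel is handed to exactly one event loop, and all of its later I/O runs on that loop's thread.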
3.2 The actual registration method
AbstractChannel$AbstractUnsafe#register0()
```java
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
            return;
        }

        boolean firstRegistration = this.neverRegistered;
        // JDK-level registration
        AbstractChannel.this.doRegister();
        this.neverRegistered = false;
        AbstractChannel.this.registered = true;
        // Fires handlerAdded for handlers that were added before registration
        AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
        this.safeSetSuccess(promise);
        AbstractChannel.this.pipeline.fireChannelRegistered();
        if (AbstractChannel.this.isActive()) {
            if (firstRegistration) {
                AbstractChannel.this.pipeline.fireChannelActive();
            } else if (AbstractChannel.this.config().isAutoRead()) {
                this.beginRead();
            }
        }
    } catch (Throwable var3) {
        this.closeForcibly();
        AbstractChannel.this.closeFuture.setClosed();
        this.safeSetFailure(promise, var3);
    }
}
```
3.2.1 JDK-level registration
AbstractNioChannel#doRegister()
```java
protected void doRegister() throws Exception {
    boolean selected = false;

    while (true) {
        try {
            // Interest ops are 0 for now; the Netty channel itself is passed as the attachment
            this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException var3) {
            if (selected) {
                throw var3;
            }

            // Flush the cancelled key out of the selector, then retry once
            this.eventLoop().selectNow();
            selected = true;
        }
    }
}
```
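Summary:
1. Bind the chosen NIO thread (the event loop) to the current channel.
2. Perform the actual registration.
3. Call the JDK-level registration, which binds the underlying channel to the selector.
4. Fire the handlerAdded event on the user's handlers.
5. Propagate the channelRegistered event to the user's handlers.
At this point initAndRegister() is complete: it has covered creating, initializing, and registering the channel.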
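The JDK-level registration step can be reproduced with plain `java.nio`. In the sketch below, `WrapperChannel` is a hypothetical stand-in for Netty's channel object; it registers its JDK channel with interest ops `0` and passes itself as the attachment, mirroring the `register(selector, 0, this)` call in `doRegister()`.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class RegisterWithAttachmentSketch {
    /** Hypothetical wrapper standing in for Netty's channel abstraction. */
    public static final class WrapperChannel {
        public final ServerSocketChannel jdkChannel;
        public SelectionKey selectionKey;

        public WrapperChannel(ServerSocketChannel jdkChannel) {
            this.jdkChannel = jdkChannel;
        }

        public void register(Selector selector) throws IOException {
            // ops = 0: the channel is known to the selector but not yet interested in any event.
            // The wrapper is attached so it can be recovered from the key when an event fires.
            selectionKey = jdkChannel.register(selector, 0, this);
        }
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel jdk = ServerSocketChannel.open()) {
            jdk.configureBlocking(false);
            WrapperChannel wrapper = new WrapperChannel(jdk);
            wrapper.register(selector);
            System.out.println("interestOps=" + wrapper.selectionKey.interestOps()); // interestOps=0
            System.out.println("attachment is wrapper: "
                    + (wrapper.selectionKey.attachment() == wrapper)); // true
        }
    }
}
```

Registering with no interest ops separates "attach to an event loop" from "start listening", which is why accepting connections only begins after the bind step.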
Port binding
doBind()
-> doBind0()
-> bind() on unsafe, the inner class of AbstractChannel
AbstractChannel$AbstractUnsafe#bind()
```java
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        // JDK-level bind
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    // The channel became active because of this bind: fire channelActive
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
```
4.1 Call the underlying JDK bind
NioServerSocketChannel#doBind()
```java
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
```
4.2 After binding, the active state changes and an event is fired
```java
if (!wasActive && isActive()) {
    invokeLater(new Runnable() {
        @Override
        public void run() {
            pipeline.fireChannelActive();
        }
    });
}
```
DefaultChannelPipeline$HeadContext#channelActive();
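```java
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Pass channelActive on to the rest of the pipeline
    ctx.fireChannelActive();

    // With auto-read enabled, trigger a read, which ends up in beginRead()
    readIfIsAutoRead();
}
```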
4.2.1.2 Propagate the read event; this is the logic that finally runs
AbstractChannel$AbstractUnsafe#beginRead()
```java
@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}
```
4.2.1.2.1 Register interest in the accept event with the selector
```java
@Override
protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // For the server channel, readInterestOp is OP_ACCEPT; add it if it is not set yet
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
```
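The whole register, bind, then enable-accept sequence can be tried with plain `java.nio`. This is a sketch under assumptions, not Netty's code: the class and method names are invented, the server binds to the loopback address on port 0 so the OS picks a free port, the backlog of 128 simply echoes the startup example, and a single blocking client connection is used to show that the key becomes acceptable only after `OP_ACCEPT` has been added.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class BindThenAcceptSketch {
    /** Adds OP_ACCEPT to the key's interest set if it is not already there. */
    public static void enableAccept(SelectionKey key) {
        int ops = key.interestOps();
        if ((ops & SelectionKey.OP_ACCEPT) == 0) {
            key.interestOps(ops | SelectionKey.OP_ACCEPT);
        }
    }

    /** Runs register -> bind -> enable accept, then accepts one loopback connection. */
    public static boolean acceptOneLoopbackConnection() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Registered with no interest ops, as in the registration step
            SelectionKey key = server.register(selector, 0);
            // Port 0 asks the OS for any free port; 128 is the accept backlog
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);
            enableAccept(key);

            // A blocking client connect completes via the backlog before accept() is called
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                if (selector.select(5000) == 0 || !key.isAcceptable()) {
                    return false;
                }
                try (SocketChannel accepted = server.accept()) {
                    return accepted != null;
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accepted=" + acceptOneLoopbackConnection());
    }
}
```

This also answers the second opening question in JDK terms: connections can only be accepted once the selection key carries `OP_ACCEPT`, and that bit is set after the bind, not at registration time.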
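Summary
1. Create the channel by reflection, wrapping the JDK channel in Netty's own channel.
2. Initialize it by adding components such as the pipeline, and add a handler to that pipeline.
3. Register the JDK channel with the selector (the event poller), passing Netty's channel abstraction as the attachment bound to the underlying JDK channel.
4. Call the underlying bind to listen on the port, then register OP_ACCEPT interest with the selector so that Netty can accept new connections.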