|
| 1 | +## NioServerSocketChannel读取数据原理分析 |
| 2 | + |
| 3 | +NioServerSocketChannel是AbstractNioMessageChannel的子类,而NioSocketChannel是AbstractNioByteChannel的子类,并且他们都有一个公共的父类:AbstractChannel。 |
| 4 | + |
| 5 | +在Netty中Channel是用来定义对网络IO的读写操作的相关接口,与NIO的Channel接口类似。Channel的功能主要有网络IO的读写、客户端发起的连接、主动关闭连接、关闭链路、获取通信双方的网络地址等。一些公共的基础方法都在这个AbstractChannel抽象类中实现,但对于一些特定的功能则需要不同的实现类去实现,这样最大限度地实现了功能和接口的重用。 |
| 6 | + |
| 7 | +另外,AbstractChannel的构造方法中对Unsafe类和ChannelPipeline类进行了初始化。 |
| 8 | + |
| 9 | +## 1. NioServerSocketChannel源码分析 |
| 10 | + |
| 11 | +NioServerSocketChannel是AbstractNioMessageChannel的子类,由于它由服务端使用,并且只负责监听Socket的接入,不关心IO的读写,所以与NioSocketChannel相比要简单得多。 |
| 12 | + |
| 13 | +NioServerSocketChannel封装了NIO中的ServerSocketChannel,并通过newSocket()方法打开了ServerSocketChannel |
| 14 | + |
| 15 | +NioServerSocketChannel.class |
| 16 | + |
| 17 | +```java |
| 18 | + private static ServerSocketChannel newSocket(SelectorProvider provider) { |
| 19 | + try { |
| 20 | + return provider.openServerSocketChannel(); |
| 21 | + } catch (IOException e) { |
| 22 | + throw new ChannelException( |
| 23 | + "Failed to open a server socket.", e); |
| 24 | + } |
| 25 | + } |
| 26 | +``` |
| 27 | + |
| 28 | +对于NioServerSocketChannel注册至selector上的操作,是在AbstractNioChannel中实现的,源码如下: |
| 29 | + |
| 30 | +```java |
| 31 | + @Override |
| 32 | + protected void doRegister() throws Exception { |
| 33 | + boolean selected = false; |
| 34 | + for (;;) { |
| 35 | + try { |
| 36 | + selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); |
| 37 | + return; |
| 38 | + } catch (CancelledKeyException e) { |
| 39 | + if (!selected) { |
| 40 | + eventLoop().selectNow(); |
| 41 | + selected = true; |
| 42 | + } else { |
| 43 | + throw e; |
| 44 | + } |
| 45 | + } |
| 46 | + } |
| 47 | + } |
| 48 | +``` |
| 49 | + |
| 50 | +在ServerChannel的开启,selector上的注册等前期工作完成后,NioServerSocketChannel的开始监听新连接的加入,源码如下: |
| 51 | + |
| 52 | +```java |
| 53 | + @Override |
| 54 | + protected int doReadMessages(List<Object> buf) throws Exception { |
| 55 | + // 拿到jdk底层channel |
| 56 | + SocketChannel ch = SocketUtils.accept(javaChannel()); |
| 57 | + |
| 58 | + try { |
| 59 | + if (ch != null) { |
| 60 | + // new出一个NioSocketChannel,将jdk SocketChannel封装成NioSocketChannel,并且这里给NioSocketChannel注册了一个SelectionKey.OP_READ事件 |
| 61 | + buf.add(new NioSocketChannel(this, ch)); // 往buf里写入NioSocketChannel |
| 62 | + return 1; |
| 63 | + } |
| 64 | + } catch (Throwable t) { |
| 65 | + logger.warn("Failed to create a new channel from an accepted socket.", t); |
| 66 | + |
| 67 | + try { |
| 68 | + ch.close(); |
| 69 | + } catch (Throwable t2) { |
| 70 | + logger.warn("Failed to close a socket.", t2); |
| 71 | + } |
| 72 | + } |
| 73 | + |
| 74 | + return 0; |
| 75 | + } |
| 76 | +``` |
| 77 | + |
| 78 | +上面的源码展示了Netty最终拿到新连接请求后,将jdk底层的SocketChannel封装NioSocketChannel的过程,那么selector是如何获取到accept事件后,调用到这个doReadMessages方法的呢? |
| 79 | + |
| 80 | +为了分析原理的延续,故事还要回到bossGroup的NioEventLoop里,当bossGroup启动,NioServerSocketChannel实例新建并注册到selector之后,Netty的bossGroup就会运行一个NioEventLoop,它的核心工作就是作为一个selector一直去监听客户端发出的accept、connect、read、write等事件。具体逻辑查看NioEventLoop#run()方法,详细的原理请回看之前的NioEventLoop的原理分析,此处只分析NioEventLoop#run()获取到链接事件到调用NioServerSocketChannel#doReadMessages()的链路。 |
| 81 | + |
| 82 | +1. NioEventLoop#run()一直轮训,监听这客户端发出的事件,在轮训过程中如果有任务产生,则会优先执行这些任务,调用非阻塞的selectNow(),否则调用select(deadlineNanos)阻塞指定时间去监听客户端事件。 |
| 83 | +2. 调用NioEventLoop#processSelectedKeys(),Netty默认用的是优化过后的selectedKey,所以调用的是NioEventLoop#processSelectedKeysOptimized()方法。 |
| 84 | +3. 在processSelectedKeysOptimized方法里会遍历selectedKeys,去拿selectedKeys中的SelectionKey,这个key就是从网络中获取到的感兴趣事件。 |
| 85 | +4. 先通过SelectionKey获取attachment,及对应的事件channel。由于这里是获取的是accept事件,所以SelectionKey#attachment()获取到的是NioServerSocketChannel对象。 |
| 86 | +5. 在NioEventLoop#processSelectedKey()方法中,首先拿到NioServerSocketChannel父类AbstractNioMessageChannel中的NioMessageUnsafe对象,接着根据readyOps进行判断,这里当然就是SelectionKey.OP_ACCEPT事件。 |
| 87 | +6. 调用NioMessageUnsafe#read()方法,最终该方法调用了NioServerSocketChannel#doReadMessages(),完了之后会新建一个对SelectionKey.OP_READ事件感兴趣的NioSocketChannel对象,并存放在readBuf的一个集合中。 |
| 88 | +7. 接着调用ChannelPipeline#fireChannelRead()方法,目的在于最终调用ServerBootstrapAcceptor#channelRead()方法,调用childGroup#register(child),把新建的NioSocketChannel对象注册到selector上。 |
| 89 | + |
| 90 | +这样,NioServerSocketChannel监听accept事件,接收到客户端连接后,封装客户端的“连接”到NioSocketChannel对象,并注册到selector上,后面的网络IO的读写操作都由这个NioSocketChannel对象来负责处理。 |
| 91 | + |
| 92 | +上述核心的6步源码如下: |
| 93 | + |
| 94 | +NioEventLoop.class |
| 95 | +```java |
| 96 | + @Override |
| 97 | + protected void run() { |
| 98 | + for (;;) { |
| 99 | + try { |
| 100 | + try { |
| 101 | + switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { |
| 102 | + // ... 省略 |
| 103 | + case SelectStrategy.SELECT: |
| 104 | + select(wakenUp.getAndSet(false)); |
| 105 | + // ... 省略 |
| 106 | + if (wakenUp.get()) { |
| 107 | + selector.wakeup(); |
| 108 | + } |
| 109 | + // fall through |
| 110 | + default: |
| 111 | + } |
| 112 | + } catch (IOException e) { |
| 113 | + rebuildSelector0(); |
| 114 | + handleLoopException(e); |
| 115 | + continue; |
| 116 | + } |
| 117 | + // ... 省略 |
| 118 | + |
| 119 | + // 步骤1 |
| 120 | + processSelectedKeys(); |
| 121 | + runAllTasks(); |
| 122 | + |
| 123 | + // ... 省略 |
| 124 | + } catch (Throwable t) { |
| 125 | + handleLoopException(t); |
| 126 | + // ... 省略 |
| 127 | + } |
| 128 | + } |
| 129 | + } |
| 130 | +``` |
| 131 | + |
| 132 | +NioEventLoop.class |
| 133 | +```java |
| 134 | + // 步骤2 |
| 135 | + private void processSelectedKeysOptimized() { |
| 136 | + for (int i = 0; i < selectedKeys.size; ++i) { |
| 137 | + // 步骤3 |
| 138 | + final SelectionKey k = selectedKeys.keys[i]; |
| 139 | + selectedKeys.keys[i] = null; |
| 140 | + |
| 141 | + // 步骤4 |
| 142 | + final Object a = k.attachment(); |
| 143 | + |
| 144 | + if (a instanceof AbstractNioChannel) { |
| 145 | + // 步骤5 |
| 146 | + processSelectedKey(k, (AbstractNioChannel) a); |
| 147 | + } else { |
| 148 | + @SuppressWarnings("unchecked") |
| 149 | + NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; |
| 150 | + processSelectedKey(k, task); |
| 151 | + } |
| 152 | + |
| 153 | + if (needsToSelectAgain) { |
| 154 | + selectedKeys.reset(i + 1); |
| 155 | + |
| 156 | + selectAgain(); |
| 157 | + i = -1; |
| 158 | + } |
| 159 | + } |
| 160 | + } |
| 161 | +``` |
| 162 | + |
| 163 | +NioEventLoop.class |
| 164 | +```java |
| 165 | + private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { |
| 166 | + final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); |
| 167 | + if (!k.isValid()) { |
| 168 | + final EventLoop eventLoop; |
| 169 | + try { |
| 170 | + eventLoop = ch.eventLoop(); |
| 171 | + } catch (Throwable ignored) { |
| 172 | + return; |
| 173 | + } |
| 174 | + if (eventLoop != this || eventLoop == null) { |
| 175 | + return; |
| 176 | + } |
| 177 | + unsafe.close(unsafe.voidPromise()); |
| 178 | + return; |
| 179 | + } |
| 180 | + |
| 181 | + try { |
| 182 | + int readyOps = k.readyOps(); |
| 183 | + if ((readyOps & SelectionKey.OP_CONNECT) != 0) { |
| 184 | + int ops = k.interestOps(); |
| 185 | + ops &= ~SelectionKey.OP_CONNECT; |
| 186 | + k.interestOps(ops); |
| 187 | + |
| 188 | + unsafe.finishConnect(); |
| 189 | + } |
| 190 | + |
| 191 | + if ((readyOps & SelectionKey.OP_WRITE) != 0) { |
| 192 | + ch.unsafe().forceFlush(); |
| 193 | + } |
| 194 | + |
| 195 | + // 步骤5 |
| 196 | + if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { |
| 197 | + unsafe.read(); |
| 198 | + } |
| 199 | + } catch (CancelledKeyException ignored) { |
| 200 | + unsafe.close(unsafe.voidPromise()); |
| 201 | + } |
| 202 | + } |
| 203 | +``` |
| 204 | + |
| 205 | +NioServerSocketChannel.class |
| 206 | + |
| 207 | +```java |
| 208 | + @Override |
| 209 | + protected int doReadMessages(List<Object> buf) throws Exception { |
| 210 | + // 拿到jdk 的SocketChannel,代表着和客户端的一个连接socket |
| 211 | + SocketChannel ch = SocketUtils.accept(javaChannel()); |
| 212 | + |
| 213 | + try { |
| 214 | + if (ch != null) { |
| 215 | + // 步骤6 |
| 216 | + // 封装一个NioSocketChannel对象,并且设置感兴趣事件为:SelectionKey.OP_READ |
| 217 | + buf.add(new NioSocketChannel(this, ch)); |
| 218 | + return 1; |
| 219 | + } |
| 220 | + } catch (Throwable t) { |
| 221 | + logger.warn("Failed to create a new channel from an accepted socket.", t); |
| 222 | + |
| 223 | + try { |
| 224 | + ch.close(); |
| 225 | + } catch (Throwable t2) { |
| 226 | + logger.warn("Failed to close a socket.", t2); |
| 227 | + } |
| 228 | + } |
| 229 | + |
| 230 | + return 0; |
| 231 | + } |
| 232 | +``` |
| 233 | + |
| 234 | +ServerBootstrapAcceptor.class |
| 235 | + |
| 236 | +```java |
| 237 | + public void channelRead(ChannelHandlerContext ctx, Object msg) { |
| 238 | + final Channel child = (Channel) msg; |
| 239 | + |
| 240 | + child.pipeline().addLast(childHandler); |
| 241 | + |
| 242 | + setChannelOptions(child, childOptions, logger); |
| 243 | + setAttributes(child, childAttrs); |
| 244 | + |
| 245 | + try { |
| 246 | + // 步骤7 |
| 247 | + // 在workerGroup的NioEventLoop上的selector注册了NioSocketChannel |
| 248 | + childGroup.register(child).addListener(new ChannelFutureListener() { |
| 249 | + @Override |
| 250 | + public void operationComplete(ChannelFuture future) throws Exception { |
| 251 | + if (!future.isSuccess()) { |
| 252 | + forceClose(child, future.cause()); |
| 253 | + } |
| 254 | + } |
| 255 | + }); |
| 256 | + } catch (Throwable t) { |
| 257 | + forceClose(child, t); |
| 258 | + } |
| 259 | + } |
| 260 | +``` |
| 261 | + |
| 262 | +以上就是Netty中有关NioServerSocketChannel读取数据的底层原理分析。 |
| 263 | + |
| 264 | +下一篇分析NioSocketChannel的发送、读取数据底层原理。 |
0 commit comments