if (executor == null) { //1.1 线程创建器 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
children = new EventExecutor[nThreads];//1.2 构造NioEventLoop
for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args);//1.2 构造NioEventLoop success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type thrownew IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); }
for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } }
if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; }
long time = System.nanoTime(); // 说明执行完一次阻塞式select操作,如果小于发生了空轮询 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { //空轮询次数 selectCnt = 1; } elseif (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector);
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } }
privatevoidrebuildSelector0(){ final Selector oldSelector = selector; final SelectorTuple newSelectorTuple;
if (oldSelector == null) { return; }
try { newSelectorTuple = openSelector();//1 } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; }
// Register all channels to the new Selector. int nChannels = 0; for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); //NioSocketChannel try { //jdk底层channel可能会多次向selector注册,保存到keys[]里 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; }
int interestOps = key.interestOps(); // 获取兴趣集 key.cancel(); //取消 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); // 将老的兴趣集重新注册到前面新创建的selector上 if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } }
try { // time to close the old selector as everything else is registered to the new one oldSelector.close(); // 关闭老的seleclor } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } }
if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }
private SelectorTuple openSelector(){ final Selector unwrappedSelector; try { //3.2.1.1 jdk底层获取原生的selector unwrappedSelector = provider.openSelector(); } catch (IOException e) { thrownew ChannelException("failed to open a new selector", e); }
//默认是false,需要优化 if (DISABLE_KEYSET_OPTIMIZATION) {
returnnew SelectorTuple(unwrappedSelector); }
//... //3.2.1.1 需要优化的,进行包装 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); // SelectedSelectionKeySet是我们优化的类型,底层是数组,add能达到O(1)
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run(){ try { //3.2.1.2 反射获取selectedKeys、publicSelectedKeys的field Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); returnnull; } // We could not retrieve the offset, lets try reflection as last-resort. }
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil. (publicSelectedKeysField, true); if (cause != null) { return cause; }
if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1);
if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) {
return; }
if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore //3.2.2.2 如果key是不合法的,则调用关闭 unsafe.close(unsafe.voidPromise()); return; }
try { int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 先处理连接事件 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
unsafe.finishConnect(); }
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
//3.3.3 任务的执行 finallong deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } }
while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { // 如果向taskQueue添加失败 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); // 因为之前把定时队列取出的时候会remove,所以需要重新添加回去 returnfalse; } scheduledTask = pollScheduledTask(nanoTime); // 继续取出定时队列的一个任务,scheduledTaskQueue是一个优先级队列 } returntrue; }
// Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } }