// 如果当前任务队列为空,并且超时时间未到,则进行一个阻塞式的selector操作。timeoutMillis 为最大的select时间 intselectedKeys= selector.select(timeoutMillis); // 操作计数 +1 selectCnt ++; // 存在以下情况,本次selector则终止 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - 轮训到了事件(Selected something,) // - 被用户唤醒(waken up by user,) // - 已有任务队列(the task queue has a pending task.) // - 已有定时任务(a scheduled task is ready for processing) break; } if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; }
// 记录当前时间 longtime= System.nanoTime(); // 如果time > currentTimeNanos + timeoutMillis(超时时间),则表明已经执行过一次select操作 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } // 如果 time <= currentTimeNanos + timeoutMillis,表示触发了空轮训 // 如果空轮训的次数超过 SELECTOR_AUTO_REBUILD_THRESHOLD (512),则重建一个新的selctor,避免空轮训 elseif (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector);
try { // 创建一个新的 SelectorTuple // openSelector()在之前分析过了 newSelectorTuple = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; }
// Register all channels to the new Selector. // 记录select上注册的channel数量 intnChannels=0; // 遍历老的 selector 上的 SelectionKey for (SelectionKey key: oldSelector.keys()) { // 获取 attachment,这里的attachment就是我们前面在讲 Netty Channel注册时,select会将channel赋值到 attachment 变量上。 // 获取老的selector上注册的channel Objecta= key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; } // 获取兴趣集 intinterestOps= key.interestOps(); // 取消 SelectionKey key.cancel(); // 将老的兴趣集重新注册到前面新创建的selector上 SelectionKeynewKey= key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } // nChannels计数 + 1 nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannelch= (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } // 设置新的 selector selector = newSelectorTuple.selector; // 设置新的 unwrappedSelector unwrappedSelector = newSelectorTuple.unwrappedSelector;
try { // time to close the old selector as everything else is registered to the new one // 关闭老的seleclor oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } }
if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }
privatevoidprocessSelectedKeysOptimized() { for (inti=0; i < selectedKeys.size; ++i) { finalSelectionKeyk= selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 // 设置为null,有利于GC回收 selectedKeys.keys[i] = null; // 获取 SelectionKey 中的 attachment, 我们这里就是 NioChannel finalObjecta= k.attachment();
if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1);
selectAgain(); i = -1; } } }
// 处理 SelectedKey privatevoidprocessSelectedKey(SelectionKey k, AbstractNioChannel ch) { // 获取Netty Channel中的 NioUnsafe 对象,用于后面的IO操作 final AbstractNioChannel.NioUnsafeunsafe= ch.unsafe(); // 判断 SelectedKey 的有效性,如果无效,则直接返回并关闭channel if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore // 关闭channel unsafe.close(unsafe.voidPromise()); return; } try { // 获取 SelectionKey 中所有准备就绪的操作集 intreadyOps= k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. // 在调用处理READ与WRITE事件之间,先调用finishConnect()接口,避免异常 NotYetConnectedException 发生。 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 intops= k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. // 处理 WRITE 事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // 处理 ACCEPT 与 READ 事件 // 如果当前的EventLoop是WorkGroup,则表示有 READ 事件 // 如果当前的EventLoop是BossGroup,则表示有 ACCEPT 事件,有新连接进来了 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 读取数据 unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
// 将定时任务从定时队列中取出,聚合到普通队列中: privatebooleanfetchFromScheduledTaskQueue() { // 得到nanoTime = 当前时间 - ScheduledFutureTask的START_TIME(开始时间) longnanoTime= AbstractScheduledEventExecutor.nanoTime(); // 获得截止时间小于nanoTime的定时任务 RunnablescheduledTask= pollScheduledTask(nanoTime); while (scheduledTask != null) { // 将定时任务放入普通队列中,以备运行 if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. // 如果 taskQueue 没有足够的空间,导致添加失败,则将其返回定时任务队列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); returnfalse; } scheduledTask = pollScheduledTask(nanoTime); } returntrue; }