前面 ,我们分析了 Netty 中的 Channel 组件,本篇我们来介绍一下与 Channel 关联的另一个核心的组件 —— EventLoop 。
Netty 版本:4.1.30
概述 EventLoop 定义了 Netty 的核心抽象,用于处理网络连接生命周期中所有发生的事件。
我们先来从一个比较高的视角来了解一下 Channels、Thread、EventLoops、EventLoopGroups 之间的关系。
上图是表示了拥有 4 个 EventLoop 的 EventLoopGroup 处理 IO 的流程图。它们之间的关系如下:
一个 EventLoopGroup 包含一个或多个 EventLoop
一个 EventLoop 在它的生命周期内只和一个 Thread 绑定
所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理
一个 Channel 在它的生命周期内只注册于一个 EventLoop
一个 EventLoop 可能会被分配给一个或多个 Channel
EventLoop 原理 下图是 Netty EventLoop 相关类的 UML 图。从中我们可以看到 EventLoop 相关的类都是实现了 java.util.concurrent
包中的 ExecutorService 接口。我们可以直接将任务 (Runable 或 Callable) 提交给 EventLoop 去立即执行或定时执行。
例如,使用 EventLoop 去执行定时任务,样例代码:
1 2 3 4 5 public static void scheduleViaEventLoop () { Channel ch = new NioSocketChannel (); ScheduledFuture<?> future = ch.eventLoop().schedule( () -> System.out.println("60 seconds later" ), 60 , TimeUnit.SECONDS); }
Thread 管理 Netty 线程模型的高性能主要取决于当前所执行线程的身份的确定。一个线程提交到 EventLoop 执行的流程如下:
将 Task 任务提交给 EventLoop 执行
在 Task 传递到 execute 方法之后,检查当前要执行的 Task 的线程是否是分配给 EventLoop 的那个线程
如果是,则该线程会立即执行
如果不是,则将线程放入任务队列中,等待下一次执行
其中,Netty 中的每一个 EventLoop 都有它自己的任务队列,并且和其他的 EventLoop 的任务队列独立开来。
Thread 分配 服务于 Channel 的 I/O 和事件的 EventLoop 包含在 EventLoopGroup 中。根据不同的传输实现,EventLoop 的创建和分配方式也不同。
NIO 传输
在 NIO 传输方式中,使用尽可能少的 EventLoop 就可以服务多个 Channel。如图所示,EventLoopGroup 采用顺序循环的方式负责为每一个新创建的 Channel 分配 EventLoop,每一个 EventLoop 会被分配给多个 Channels。
一旦一个 Channel 被分配给了一个 EventLoop,则这个 Channel 的生命周期内,只会绑定这个 EventLoop。这就让我们在 ChannelHandler 的实现省去了对线程安全和同步问题的担心。
OIO 传输
与 NIO 方式的不同在于,一个 EventLoop 只会服务于一个 Channel。
NioEventLoop & NioEventLoopGroup 创建 初步了解了 EventLoop 以及 EventLoopGroup 的工作机制,接下来我们以 NioEventLoopGroup 为例,来深入分析 NioEventLoopGroup 是如何创建的,又是如何启动的,它的内部执行逻辑又是怎样的等等问题。
MultithreadEventExecutorGroup 构造器 我们从 NioEventLoopGroup 的构造函数开始分析:
1 EventLoopGroup acceptorEventLoopGroup = new NioEventLoopGroup (1 );
NioEventLoopGroup 构造函数会调用到父类 MultithreadEventLoopGroup 的构造函数,默认情况下,EventLoop 的数量 = 处理器数量 x 2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class); private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1 , SystemPropertyUtil.getInt( "io.netty.eventLoopThreads" , NettyRuntime.availableProcessors() * 2 )); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}" , DEFAULT_EVENT_LOOP_THREADS); } } protected MultithreadEventLoopGroup (int nThreads, Executor executor, Object... args) { super (nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } ... }
继续调用父类,会调用到 MultithreadEventExecutorGroup 的构造器,主要做三件事情:
创建线程任务执行器 ThreadPerTaskExecutor
通过 for 循环创建数量为 nThreads 个的 EventLoop
创建 EventLoop 选择器 EventExecutorChooser
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 protected MultithreadEventExecutorGroup (int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0 ) { throw new IllegalArgumentException (String.format("nThreads: %d (expected: > 0)" , nThreads)); } if (executor == null ) { executor = new ThreadPerTaskExecutor (newDefaultThreadFactory()); } children = new EventExecutor [nThreads]; for (int i = 0 ; i < nThreads; i ++) { boolean success = false ; try { children[i] = newChild(executor, args); success = true ; } catch (Exception e) { throw new IllegalStateException ("failed to create a child event loop" , e); } finally { if (!success) { for (int j = 0 ; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0 ; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break ; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener <Object>() { @Override public void operationComplete (Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null ); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet <EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
创建线程任务执行器 ThreadPerTaskExecutor 1 2 3 if (executor == null ) { executor = new ThreadPerTaskExecutor (newDefaultThreadFactory()); }
线程任务执行器 ThreadPerTaskExecutor 源码如下,具体的任务都由 ThreadFactory 去执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor (ThreadFactory threadFactory) { if (threadFactory == null ) { throw new NullPointerException ("threadFactory" ); } this .threadFactory = threadFactory; } @Override public void execute (Runnable command) { threadFactory.newThread(command).start(); } }
来看看 newDefaultThreadFactory 方法:
1 2 3 protected ThreadFactory newDefaultThreadFactory () { return new DefaultThreadFactory (getClass()); }
DefaultThreadFactory 接下来看看 DefaultThreadFactory 这个类,实现了 ThreadFactory 接口,我们可以了解到:
EventLoopGroup 的命名规则
具体的线程为 FastThreadLocalThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 public class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolId = new AtomicInteger (); private final AtomicInteger nextId = new AtomicInteger (); private final String prefix; private final boolean daemon; private final int priority; protected final ThreadGroup threadGroup; public DefaultThreadFactory (Class<?> poolType) { this (poolType, false , Thread.NORM_PRIORITY); } ... public static String toPoolName (Class<?> poolType) { if (poolType == null ) { throw new NullPointerException ("poolType" ); } String poolName = StringUtil.simpleClassName(poolType); switch (poolName.length()) { case 0 : return "unknown" ; case 1 : return poolName.toLowerCase(Locale.US); default : if (Character.isUpperCase(poolName.charAt(0 )) && Character.isLowerCase(poolName.charAt(1 ))) { return Character.toLowerCase(poolName.charAt(0 )) + poolName.substring(1 ); } else { return poolName; } } } public DefaultThreadFactory (String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { if (poolName == null ) { throw new NullPointerException ("poolName" ); } if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException ( "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)" ); } prefix = poolName + '-' + poolId.incrementAndGet() + '-' ; this .daemon = daemon; this .priority = priority; this .threadGroup = threadGroup; } public DefaultThreadFactory (String poolName, boolean daemon, int priority) { this (poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()); } @Override public Thread newThread (Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { } return t; } protected Thread newThread (Runnable r, String name) { return new FastThreadLocalThread (threadGroup, r, name); } }
创建 NioEventLoop 继续从 MultithreadEventExecutorGroup 构造器开始,创建完任务执行器 ThreadPerTaskExecutor 之后,进入 for 循环,开始创建 NioEventLoop:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 for (int i = 0 ; i < nThreads; i ++) { boolean success = false ; try { children[i] = newChild(executor, args); success = true ; } catch (Exception e) { throw new IllegalStateException ("failed to create a child event loop" , e); } ... }
NioEventLoopGroup 类中的 newChild()
方法:
1 2 3 4 5 @Override protected EventLoop newChild (Executor executor, Object... args) throws Exception { return new NioEventLoop (this , executor, (SelectorProvider) args[0 ], ((SelectStrategyFactory) args[1 ]).newSelectStrategy(), (RejectedExecutionHandler) args[2 ]); }
NioEventLoop 构造器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public final class NioEventLoop extends SingleThreadEventLoop { ... NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super (parent, executor, false , DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null ) { throw new NullPointerException ("selectorProvider" ); } if (strategy == null ) { throw new NullPointerException ("selectStrategy" ); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } ... }
接下来我们看看 获取多路复用选择器 方法 —— openSelector () ,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization" , false ); private SelectorTuple openSelector () { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException ("failed to open a new selector" , e); } if (DISABLE_KEYSET_OPTIMIZATION) { return new SelectorTuple (unwrappedSelector); } Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction <Object>() { @Override public Object run () { try { return Class.forName( "sun.nio.ch.SelectorImpl" , false , PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); if (!(maybeSelectorImplClass instanceof Class) || !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}" , unwrappedSelector, t); } return new SelectorTuple (unwrappedSelector); } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet (); Object maybeException = AccessController.doPrivileged(new PrivilegedAction <Object>() { @Override public Object run () { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys" ); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys" ); if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1 ) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null ; } } Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true ); if (cause != null ) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true ); if (cause != null ) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null ; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null ; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}" , unwrappedSelector, e); return new SelectorTuple (unwrappedSelector); } selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}" , unwrappedSelector); return new SelectorTuple (unwrappedSelector, new SelectedSelectionKeySetSelector (unwrappedSelector, selectedKeySet)); }
优化后的 SelectedSelectionKeySet 对象,内部采用 SelectionKey 数组的形式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 final class SelectedSelectionKeySet extends AbstractSet <SelectionKey> { SelectionKey[] keys; int size; SelectedSelectionKeySet() { keys = new SelectionKey [1024 ]; } @Override public boolean add (SelectionKey o) { if (o == null ) { return false ; } keys[size++] = o; if (size == keys.length) { increaseCapacity(); } return true ; } @Override public boolean remove (Object o) { return false ; } @Override public boolean contains (Object o) { return false ; } @Override public int size () { return size; } @Override public Iterator<SelectionKey> iterator () { return new Iterator <SelectionKey>() { private int idx; @Override public boolean hasNext () { return idx < size; } @Override public SelectionKey next () { if (!hasNext()) { throw new NoSuchElementException (); } return keys[idx++]; } @Override public void remove () { throw new UnsupportedOperationException (); } }; } void reset () { reset(0 ); } void reset (int start) { Arrays.fill(keys, start, size, null ); size = 0 ; } private void increaseCapacity () { SelectionKey[] newKeys = new SelectionKey [keys.length << 1 ]; System.arraycopy(keys, 0 , newKeys, 0 , size); keys = newKeys; } }
SingleThreadEventLoop 构造器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { ... protected SingleThreadEventLoop (EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super (parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); tailTasks = newTaskQueue(maxPendingTasks); } ... }
SingleThreadEventExecutor 构造器,主要做两件事情:
设置线程任务执行器。
设置任务队列。前面讲到 EventLoop 对于不能立即执行的 Task 会放入一个队列中,就是这里设置的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { ... protected SingleThreadEventExecutor (EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super (parent); this .addTaskWakesUp = addTaskWakesUp; this .maxPendingTasks = Math.max(16 , maxPendingTasks); this .executor = ObjectUtil.checkNotNull(executor, "executor" ); taskQueue = newTaskQueue(this .maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler" ); } ... }
NioEventLoop 中对 newTaskQueue 接口的实现,返回的是 JCTools 工具包 Mpsc 队列。后面我们写文章单独介绍 JCTools 中的相关队列。
Mpsc:Multi Producer Single Consumer (Lock less, bounded and unbounded)
多个生产者对单个消费者(无锁、有界和无界都有实现)
1 2 3 4 5 6 7 8 9 10 11 12 13 public final class NioEventLoop extends SingleThreadEventLoop { ... @Override protected Queue<Runnable> newTaskQueue (int maxPendingTasks) { return maxPendingTasks = = Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); } ... }
创建线程执行选择器 chooser 接下来,我们看看 MultithreadEventExecutorGroup 构造器的最后一个部分内容,创建线程执行选择器 chooser,它的主要作用就是 EventLoopGroup 用于从 EventLoop 数组中选择一个 EventLoop 去执行任务。
1 2 chooser = chooserFactory.newChooser(children);
EventLoopGroup 中定义的 next()
接口:
1 2 3 4 5 6 7 8 9 10 11 public interface EventLoopGroup extends EventExecutorGroup { ... @Override EventLoop next () ; ... }
MultithreadEventExecutorGroup 中对 next () 的实现:
1 2 3 4 5 @Override public EventExecutor next () { return chooser.next(); }
DefaultEventExecutorChooserFactory 对于如何从数组中选择任务执行器,也做了巧妙的优化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory (); private DefaultEventExecutorChooserFactory () { } @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser (EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser (executors); } else { return new GenericEventExecutorChooser (executors); } } private static boolean isPowerOfTwo (int val) { return (val & -val) == val; } private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger (); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this .executors = executors; } @Override public EventExecutor next () { return executors[idx.getAndIncrement() & executors.length - 1 ]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger (); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this .executors = executors; } @Override public EventExecutor next () { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } }
小结 通过本节内容,我们了解到了 EventLoop 与 EventLoopGroup 的基本原理,EventLoopGroup 与 EventLoop 的创建过程:
创建线程任务执行器 ThreadPerTaskExecutor
创建 EventLoop
创建任务选择器 EventExecutorChooser
参考资料