Netty NioEventLoop 创建过程源码分析

前面 ,我们分析了Netty中的Channel组件,本篇我们来介绍一下与Channel关联的另一个核心的组件 —— EventLoop

Netty版本:4.1.30

概述

EventLoop定义了Netty的核心抽象,用于处理网络连接生命周期中所有发生的事件。

我们先来从一个比较高的视角来了解一下Channels、Thread、EventLoops、EventLoopGroups之间的关系。

Netty-EventLoop-Channel

上图是表示了拥有4个EventLoop的EventLoopGroup处理IO的流程图。它们之间的关系如下:

  • 一个 EventLoopGroup包含一个或多个EventLoop
  • 一个 EventLoop在它的生命周期内只和一个Thread绑定
  • 所有由EventLoop处理的I/O事件都将在它专有的Thread上被处理
  • 一个Channel在它的生命周期内只注册于一个EventLoop
  • 一个EventLoop可能会被分配给一个或多个Channel

EventLoop 原理

下图是Netty EventLoop相关类的UML图。从中我们可以看到EventLoop相关的类都是实现了 java.util.concurrent 包中的 ExecutorService 接口。我们可以直接将任务(Runable 或 Callable) 提交给EventLoop去立即执行或定时执行。

Netty EventLoop UML

例如,使用EventLoop去执行定时任务,样例代码:

1
2
3
4
5
public static void scheduleViaEventLoop() {
Channel ch = new NioSocketChannel();
ScheduledFuture<?> future = ch.eventLoop().schedule(
() -> System.out.println("60 seconds later"), 60, TimeUnit.SECONDS);
}

Thread 管理

Netty线程模型的高性能主要取决于当前所执行线程的身份的确定。一个线程提交到EventLoop执行的流程如下:

  • 将Task任务提交给EventLoop执行
  • 在Task传递到execute方法之后,检查当前要执行的Task的线程是否是分配给EventLoop的那个线程
  • 如果是,则该线程会立即执行
  • 如果不是,则将线程放入任务队列中,等待下一次执行

其中,Netty中的每一个EventLoop都有它自己的任务队列,并且和其他的EventLoop的任务队列独立开来。

Nettu EventLoop Thread management

Thread 分配

服务于Channel的I/O和事件的EventLoop包含在EventLoopGroup中。根据不同的传输实现,EventLoop的创建和分配方式也不同。

NIO传输

Netty EventLoop Thread allocation NIO

在NIO传输方式中,使用尽可能少的EventLoop就可以服务多个Channel。如图所示,EventLoopGroup采用顺序循环的方式负责为每一个新创建的Channel分配EventLoop,每一个EventLoop会被分配给多个Channels。

一旦一个Channel被分配给了一个EventLoop,则这个Channel的生命周期内,只会绑定这个EventLoop。这就让我们在ChannelHandler的实现省去了对线程安全和同步问题的担心。

OIO传输

Netty EventLoop Thread allocation OIO

与NIO方式的不同在于,一个EventLoop只会服务于一个Channel。

NioEventLoop & NioEventLoopGroup 创建

初步了解了 EventLoop 以及 EventLoopGroup 的工作机制,接下来我们以 NioEventLoopGroup 为例,来深入分析 NioEventLoopGroup 是如何创建的,又是如何启动的,它的内部执行逻辑又是怎样的等等问题。

MultithreadEventExecutorGroup 构造器

我们从 NioEventLoopGroup 的构造函数开始分析:

1
EventLoopGroup acceptorEventLoopGroup = new NioEventLoopGroup(1);

NioEventLoopGroup构造函数会调用到父类 MultithreadEventLoopGroup 的构造函数,默认情况下,EventLoop的数量 = 处理器数量 x 2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);

private static final int DEFAULT_EVENT_LOOP_THREADS;

// 默认情况下,EventLoop的数量 = 处理器数量 x 2
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

...
}

继续调用父类,会调用到 MultithreadEventExecutorGroup 的构造器,主要做三件事情:

  • 创建线程任务执行器 ThreadPerTaskExecutor
  • 通过for循环创建数量为 nThreads 个的 EventLoop
  • 创建 EventLoop 选择器 EventExecutorChooser
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

// 创建任务执行器 ThreadPerTaskExecutor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

// 创建 EventExecutor 数组
children = new EventExecutor[nThreads];

// 通过for循环创建数量为 nThreads 个的 EventLoop
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 调用 newChild 接口
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}

// 创建选择器
chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

创建线程任务执行器 ThreadPerTaskExecutor

1
2
3
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

线程任务执行器 ThreadPerTaskExecutor 源码如下,具体的任务都由 ThreadFactory 去执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}

// 使用 threadFactory 执行任务
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}

来看看 newDefaultThreadFactory 方法:

1
2
3
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}

DefaultThreadFactory

接下来看看 DefaultThreadFactory 这个类,实现了 ThreadFactory 接口,我们可以了解到:

  • EventLoopGroup的命名规则
  • 具体的线程为 FastThreadLocalThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class DefaultThreadFactory implements ThreadFactory {

// 线程池ID编号自增器
private static final AtomicInteger poolId = new AtomicInteger();
// 线程ID自增器
private final AtomicInteger nextId = new AtomicInteger();
// 线程名称前缀
private final String prefix;
// 是否为守护进程
private final boolean daemon;
// 线程优先级
private final int priority;
// 线程组
protected final ThreadGroup threadGroup;

public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}

...

// 获取线程名,返回结果:nioEventLoopGroup
public static String toPoolName(Class<?> poolType) {
if (poolType == null) {
throw new NullPointerException("poolType");
}

String poolName = StringUtil.simpleClassName(poolType);
switch (poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
} else {
return poolName;
}
}
}

public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
if (poolName == null) {
throw new NullPointerException("poolName");
}
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}

// nioEventLoopGroup-2-
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}

public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, System.getSecurityManager() == null ?
Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}

@Override
public Thread newThread(Runnable r) {
// 创建新线程 nioEventLoopGroup-2-1
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}

// 创建新线程 FastThreadLocalThread
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}

}

创建NioEventLoop

继续从 MultithreadEventExecutorGroup 构造器开始,创建完任务执行器 ThreadPerTaskExecutor 之后,进入for循环,开始创建 NioEventLoop:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建 nioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
}

...

}

NioEventLoopGroup类中的 newChild() 方法:

1
2
3
4
5
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

NioEventLoop 构造器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final class NioEventLoop extends SingleThreadEventLoop{

...

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 调用父类 SingleThreadEventLoop 构造器
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
// 设置 selectorProvider
provider = selectorProvider;
// 获取 SelectorTuple 对象,里面封装了原生的selector和优化过的selector
final SelectorTuple selectorTuple = openSelector();
// 设置优化过的selector
selector = selectorTuple.selector;
// 设置原生的selector
unwrappedSelector = selectorTuple.unwrappedSelector;
// 设置select策略
selectStrategy = strategy;
}

...

}

接下来我们看看 获取多路复用选择器 方法—— openSelector() ,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// selectKey 优化选项flag
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

private SelectorTuple openSelector() {
// JDK原生的selector
final Selector unwrappedSelector;
try {
// 通过 SelectorProvider 创建获得selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

// 如果不优化,则直接返回
if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}

// 通过反射创建 sun.nio.ch.SelectorImpl 对象
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});

// 如果 maybeSelectorImplClass 不是 selector 的一个实现,则直接返回原生的Selector
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
// 确保当前的选择器实现是我们可以检测的
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
// maybeSelectorImplClass 是selector的实现,则转化为 selector 实现类
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
// 创建新的 SelectionKey 集合 SelectedSelectionKeySet,内部采用的是 SelectionKey 数组的形
// 式,而非 set 集合
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 通过反射的方式获取 sun.nio.ch.SelectorImpl 的成员变量 selectedKeys
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
// 通过反射的方式获取 sun.nio.ch.SelectorImpl 的成员变量 publicSelectedKeys
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);

if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}

// 设置字段 selectedKeys Accessible 为true
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
// 设置字段 publicSelectedKeys Accessible 为true
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}

selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});

if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}

// 设置 SelectedSelectionKeySet
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// 返回包含了原生selector和优化过的selector的SelectorTuple
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

优化后的 SelectedSelectionKeySet 对象,内部采用 SelectionKey 数组的形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;

SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}

// 使用数组,来替代HashSet,可以降低时间复杂度为O(1)
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}

keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}

return true;
}

@Override
public boolean remove(Object o) {
return false;
}

@Override
public boolean contains(Object o) {
return false;
}

@Override
public int size() {
return size;
}

@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;

@Override
public boolean hasNext() {
return idx < size;
}

@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}

void reset() {
reset(0);
}

void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}

// 扩容
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}

SingleThreadEventLoop 构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

...

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
// 调用 SingleThreadEventExecutor 构造器
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}

...
}

SingleThreadEventExecutor 构造器,主要做两件事情:

  • 设置线程任务执行器。
  • 设置任务队列。前面讲到EventLoop对于不能立即执行的Task会放入一个队列中,就是这里设置的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

...

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 设置线程任务执行器
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// 设置任务队列
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");

}

...

}

NioEventLoop 中对 newTaskQueue 接口的实现,返回的是 JCTools 工具包 Mpsc 队列。后面我们写文章单独介绍 JCTools 中的相关队列。

Mpsc:Multi Producer Single Consumer (Lock less, bounded and unbounded)

多个生产者对单个消费者(无锁、有界和无界都有实现)

1
2
3
4
5
6
7
8
9
10
11
12
13
public final class NioEventLoop extends SingleThreadEventLoop {

...

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

...
}

创建线程执行选择器chooser

接下来,我们看看 MultithreadEventExecutorGroup 构造器的最后一个部分内容,创建线程执行选择器chooser,它的主要作用就是 EventLoopGroup 用于从 EventLoop 数组中选择一个 EventLoop 去执行任务。

1
2
// 创建选择器
chooser = chooserFactory.newChooser(children);

EventLoopGroup 中定义的 next() 接口:

1
2
3
4
5
6
7
8
9
10
11
public interface EventLoopGroup extends EventExecutorGroup {

...

// 选择下一个 EventLoop 用于执行任务
@Override
EventLoop next();

...

}

MultithreadEventExecutorGroup 中对 next() 的实现:

1
2
3
4
5
@Override
public EventExecutor next() {
// 调用 DefaultEventExecutorChooserFactory 中的next()
return chooser.next();
}

DefaultEventExecutorChooserFactory 对于如何从数组中选择任务执行器,也做了巧妙的优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

private DefaultEventExecutorChooserFactory() { }

@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

// 判断线程任务执行的个数是否为 2 的幂次方。e.g: 2、4、8、16
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}

// 幂次方选择器
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
// 通过二级制进行 & 运算,效率更高
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

// 普通选择器
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
// 按照最普通的取模的方式从index=0开始向后开始选择
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}

小结

通过本节内容,我们了解到了EventLoop与EventLoopGroup的基本原理,EventLoopGroup与EventLoop的创建过程:

  • 创建线程任务执行器 ThreadPerTaskExecutor
  • 创建EventLoop
  • 创建任务选择器 EventExecutorChooser

参考资料

请我喝杯咖啡吧~