[Netty Source Code] NioEventLoop Source Analysis, Part 2

1. NioEventLoop inheritance hierarchy

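NioEventLoop extends SingleThreadEventLoop. It is backed by a single thread and runs one task at a time, so submitted tasks have to be queued: the ordinary task queue, taskQueue, lives in SingleThreadEventExecutor, and SingleThreadEventLoop additionally maintains a Queue<Runnable> tailTasks for tasks that run at the end of each event-loop iteration.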

NioEventLoop also indirectly extends AbstractScheduledEventExecutor, so it can accept scheduled (delayed and periodic) tasks as well.

2. When the Selector is created

1. The Selector is assigned in the constructor

NioEventLoop has exactly one constructor, which takes parameters; when it runs, it assigns the Selector member fields:

io.netty.channel.nio.NioEventLoop#NioEventLoop

2. Two Selector-typed member fields

Every NioEventLoop holds two Selectors: selector and unwrappedSelector.

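The constructor calls openSelector(), which returns a SelectorTuple whose selector and unwrappedSelector are stored in these two fields. Here is an excerpt of openSelector():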

final Selector unwrappedSelector;
try {
    // Create a Selector through the JDK's native API
    unwrappedSelector = provider.openSelector();
} catch (IOException e) {
    throw new ChannelException("failed to open a new selector", e);
}

// If the selected-key-set optimization is disabled, return the plain JDK Selector.
// The JDK Selector keeps selected keys in a HashSet, which is slower to iterate.
// The flag defaults to false, i.e. the optimization is enabled.
if (DISABLE_KEY_SET_OPTIMIZATION) {
    return new SelectorTuple(unwrappedSelector);
}
.......

By default, DISABLE_KEY_SET_OPTIMIZATION is false:

    private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

so by default, execution continues past that check:

......

Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
    // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
    // This allows us to also do this in Java9+ without any extra flags.
    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
    long publicSelectedKeysFieldOffset =
            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
        PlatformDependent.putObject(
                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
        PlatformDependent.putObject(
                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
        return null;
    }
    // We could not retrieve the offset, lets try reflection as last-resort.
}
// Force the private fields to be accessible via reflection
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
    return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
    return cause;
}
// Replace the JDK's two selected-key sets with Netty's array-backed set
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);

.......

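In this code, Netty replaces two fields of the native Selector, selectedKeys and publicSelectedKeys, with its own SelectedSelectionKeySet: through sun.misc.Unsafe on Java 9+ when Unsafe is available, otherwise through reflection. The replacement object stores the keys in an array: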

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
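The field swap itself is ordinary reflection. Below is a minimal, self-contained sketch of the same technique on a toy class, so it runs without touching JDK internals (which on Java 9+ may need Unsafe or --add-opens). ToySelector, RecordingSet and FieldSwap are hypothetical names standing in for SelectorImpl, SelectedSelectionKeySet and the Netty code above.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for sun.nio.ch.SelectorImpl: the set lives in a private field
class ToySelector {
    private Set<String> selectedKeys = new HashSet<>();

    // Plays the role of the JDK adding a ready key during select()
    void markReady(String key) {
        selectedKeys.add(key);
    }
}

// Hypothetical replacement set that keeps keys in insertion order in a list
final class RecordingSet extends AbstractSet<String> {
    final List<String> added = new ArrayList<>();

    @Override public boolean add(String key) { return added.add(key); }
    @Override public Iterator<String> iterator() { return added.iterator(); }
    @Override public int size() { return added.size(); }
}

final class FieldSwap {
    // Swaps ToySelector.selectedKeys for the replacement; returns the failure cause or null,
    // in the same style as the Netty excerpt above
    static Throwable swap(ToySelector target, Set<String> replacement) {
        try {
            Field field = ToySelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);           // the "force accessible" step
            field.set(target, replacement);      // the actual replacement
            return null;
        } catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) {
            return e;
        }
    }
}
```

After the swap, every key the toy selector adds lands in the replacement set, which is what lets Netty read ready keys from its own structure instead of the JDK's HashSet.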

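whereas the native fields are Set collections (in the JDK's SelectorImpl, selectedKeys is a HashSet and publicSelectedKeys is a view over it).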

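This is how Netty optimizes the native Selector. The first Selector object, selector (a SelectedSelectionKeySetSelector wrapping the native one), iterates the selected keys more efficiently. The second, unwrappedSelector, is the native JDK Selector itself and complements the first, because many operations still have to go through it; for example, channels are registered with unwrappedSelector.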
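To make the array-versus-HashSet difference concrete, here is a minimal sketch of an append-only, array-backed set in the spirit of SelectedSelectionKeySet. It is a simplification, not Netty's class: ArrayBackedKeySet is a hypothetical name, it is generic so it can be exercised without a real Selector, and it does not reject duplicates, assuming each key is added at most once per select round.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical, simplified analogue of Netty's SelectedSelectionKeySet
final class ArrayBackedKeySet<E> extends AbstractSet<E> {
    E[] keys;   // package-private so a hot loop could index it directly
    int size;

    @SuppressWarnings("unchecked")
    ArrayBackedKeySet() {
        keys = (E[]) new Object[1024];
    }

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1);   // double the capacity when full
        }
        keys[size++] = e;                            // append: no hashing, no buckets
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return keys[idx++];
            }
        };
    }

    // Clear the used slots so processed keys can be garbage-collected, then reuse the array
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }
}
```

An event loop can walk keys[0..size) with a plain index loop and call reset() after each round; each add is a bounds check plus an array store, while a HashSet has to hash the key and allocate an entry node.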

3. When does NioEventLoop start its NIO thread?

Let's trace the call stack with a breakpoint in IntelliJ IDEA:

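import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;

public class EventLoopStartDemo { // class name chosen for illustration

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop next = group.next();

        next.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("task running");
            }
        });

        // The event loop thread is non-daemon; shut the group down so the JVM can exit
        group.shutdownGracefully();
    }
}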

The call stack shows that the main thread ends up in io.netty.util.concurrent.SingleThreadEventExecutor#addTask, and the task's run method is then executed by the nioEventLoopGroup-2-1 thread.

When a task is submitted to NioEventLoop, the inherited SingleThreadEventExecutor.execute method runs; it calls addTask to put the task into taskQueue:

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
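    // Only enqueue the task here; it is taken out and run later by the event loop thread
    addTask(task);
    // Called from main here, so inEventLoop is false and !inEventLoop is true
    if (!inEventLoop) {
        // Start the event loop thread (a no-op if it is already running)
        startThread();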
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

private void startThread() {
    // Change the state with a CAS so that only one caller ever starts the thread
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // Follow this call next
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread

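It submits a Runnable to executor (by default a ThreadPerTaskExecutor, which creates a new thread for each submitted Runnable), and that Runnable calls io.netty.channel.nio.NioEventLoop#run: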

 private void doStartThread() {
        assert thread == null;
        // Submit a Runnable to the executor
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Call NioEventLoop.run(): the NIO thread selects I/O events, takes tasks from the queue and runs them
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                      
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            // Close the selector
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            FastThreadLocal.removeAll();

                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.countDown();
                            if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + taskQueue.size() + ')');
                            }
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

NioEventLoop#run: the infinite loop that waits for and processes events

protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // Process the selected keys
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
             
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
               
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

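So the NIO thread is started by the first call to the EventLoop's execute method. Later calls do not start it again, because the state field, updated with a CAS in startThread, guards against it.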
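The start-once behaviour can be reproduced outside Netty. Below is a minimal sketch, assuming a simplified model of SingleThreadEventExecutor: LazyStartExecutor and its threadsStarted counter are hypothetical, the queue is a LinkedBlockingQueue rather than Netty's task queue, and the thread is a daemon so the demo does not keep the JVM alive. The state constants borrow Netty's names; only the CAS from ST_NOT_STARTED to ST_STARTED matters.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of how SingleThreadEventExecutor starts its thread lazily
final class LazyStartExecutor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    final AtomicInteger threadsStarted = new AtomicInteger(); // demo-only counter
    private volatile Thread thread;

    void execute(Runnable task) {
        taskQueue.add(task);                          // always enqueue first
        if (Thread.currentThread() != thread) {       // i.e. !inEventLoop()
            startThread();
        }
    }

    private void startThread() {
        // Cheap read first, then a CAS: only the caller that wins the CAS starts the thread
        if (state.get() == ST_NOT_STARTED
                && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            threadsStarted.incrementAndGet();
            Thread t = new Thread(this::runLoop, "lazy-loop");
            t.setDaemon(true);                        // let the JVM exit in this demo
            t.start();
        }
    }

    private void runLoop() {
        thread = Thread.currentThread();
        try {
            for (;;) {
                taskQueue.take().run();               // block until a task is available
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();       // exit the loop on interrupt
        }
    }
}
```

Submitting 100 tasks from the main thread starts exactly one thread, and all tasks run on it.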

4. Does the NIO thread block ordinary tasks?

NioEventLoop is single-threaded: the same thread that runs the NIO event loop also has to run ordinary tasks. Can waiting on NIO events hold up those tasks?

Selector.select() blocks, so first let's see how Netty deals with that:

io.netty.channel.nio.NioEventLoop#select

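/**
 * Core idea: when there is no task to run, select blocks for up to 1 s (or until the next
 * scheduled task is due); when a task arrives, the selector is woken up so the task can run.
 * @param oldWakenUp
 * @throws IOException
 */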
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // Compute the select timeout from the deadline of the next scheduled task
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
        if (nextWakeupTime != normalizedDeadlineNanos) {
            nextWakeupTime = normalizedDeadlineNanos;
        }

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) { // A scheduled task is due, or the maximum wait has elapsed
                if (selectCnt == 0) {
                    // Non-blocking; returns 0 if nothing is ready
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
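            // Bounded by timeoutMillis, so this never blocks indefinitely; while it blocks,
            // another thread can also wake it up early with selector.wakeup()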
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The code exists in an extra method to ensure the method is not too big to inline as this
                // branch is not very likely to get hit very frequently.
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
        // Harmless exception - log anyway
    }
}
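The timeout line at the top of the loop is worth a worked example. With no scheduled task, delayNanos() returns a default of one second, so selectDeadLineNanos - currentTimeNanos is about 1,000,000,000 ns and timeoutMillis is 1000. Adding 500,000 ns before dividing by 1,000,000 rounds a non-negative remainder to the nearest millisecond, so a deadline 0.4 ms away gives 0 and takes the selectNow() path. A small sketch of that arithmetic (SelectTimeout is a hypothetical helper; the formula is copied from the code above):

```java
// Hypothetical helper reproducing the timeout arithmetic of NioEventLoop#select
final class SelectTimeout {
    // Adding 500_000 ns (0.5 ms) before the integer division rounds to the nearest millisecond
    static long timeoutMillis(long selectDeadlineNanos, long currentTimeNanos) {
        return (selectDeadlineNanos - currentTimeNanos + 500_000L) / 1_000_000L;
    }

    // timeoutMillis <= 0 means a scheduled task is (almost) due: use selectNow() instead of blocking
    static boolean shouldSelectNow(long selectDeadlineNanos, long currentTimeNanos) {
        return timeoutMillis(selectDeadlineNanos, currentTimeNanos) <= 0;
    }
}
```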

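As shown, selector.select(timeoutMillis) is bounded by a timeout. Second, the NIO thread is started only by the first call to NioEventLoop's execute method; later calls do not start it again, but they do wake up a blocked Selector.select: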

io.netty.util.concurrent.SingleThreadEventExecutor#execute

public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
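        // Only enqueue the task here; it is taken out and run later by the event loop thread
        addTask(task);
        // Called from main here, so inEventLoop is false and !inEventLoop is true
        if (!inEventLoop) {
            // Start the event loop thread;
            // a second call does not start it again
            startThread();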
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }
        // Wake up the Selector
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

Note that we are reading the parent class SingleThreadEventExecutor here; the subclass NioEventLoop overrides its wakeup method:

io.netty.channel.nio.NioEventLoop#wakeup

@Override
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        // Wake up the selector so the loop can get to the ordinary tasks
        selector.wakeup();
    }
}
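Whether a blocked select(timeout) really returns early on wakeup() is easy to check with the plain JDK API. The sketch below is not Netty code; WakeupDemo, submit and runOnce are hypothetical names. It mimics the pattern above: tasks go into a queue, and a compareAndSet on a wakenUp flag ensures at most one costly selector.wakeup() per loop iteration. Per the Selector.wakeup() contract, a wakeup issued before the loop thread reaches select() makes the next select() return immediately, so a task submitted in between is not stranded.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch (not Netty code) of "block in select(timeout), wake up when a task arrives"
final class WakeupDemo {
    final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakenUp = new AtomicBoolean();

    WakeupDemo() throws IOException {
        selector = Selector.open();
    }

    // Called from threads other than the loop thread
    void submit(Runnable task) {
        tasks.add(task);
        // Only the first submitter since the last reset pays for the costly wakeup()
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

    // One loop iteration; returns how long it took in nanoseconds
    long runOnce(long timeoutMillis) throws IOException {
        long start = System.nanoTime();
        wakenUp.set(false);
        if (tasks.isEmpty()) {
            selector.select(timeoutMillis);   // returns early if wakeup() is called
        } else {
            selector.selectNow();             // tasks already waiting: do not block
        }
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
        return System.nanoTime() - start;
    }
}
```

A producer thread that submits after 100 ms cuts a 10-second select short almost immediately, instead of leaving the task waiting for the full timeout.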

5. When are events on the Selector fetched?

Back in the run method, this logic lives in the switch block:

io.netty.channel.nio.NioEventLoop#run

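selectNowSupplier is an implementation of the IntSupplier interface whose get() method calls Selector.selectNow(), which returns the number of I/O events ready on the Selector right now, without blocking.

hasTasks() checks whether the task queue currently holds any tasks.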

Now look at the strategy method:

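If there are tasks, it calls selectNow() and returns immediately with the number of ready I/O events; that number is never negative, so it falls through to the default branch and the loop goes on to process keys and tasks. If there are no tasks, it returns SelectStrategy.SELECT, and the matching case calls select(wakenUp.getAndSet(false)), which blocks with a timeout while waiting for I/O events on the Selector.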

 public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
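The decision can be modelled in a few lines. This sketch uses java.util.function.IntSupplier in place of Netty's own io.netty.util.IntSupplier, and StrategySketch with its SELECT and CONTINUE constants is hypothetical; the only property it relies on is that the strategy constants are negative, so they can never be confused with a ready-key count from selectNow(). Note that when there are no tasks the supplier is never invoked, so the selector is not polled at all.

```java
import java.util.function.IntSupplier;

// Hypothetical model of DefaultSelectStrategy and the switch in NioEventLoop#run.
// java.util.function.IntSupplier stands in for Netty's io.netty.util.IntSupplier.
final class StrategySketch {
    // Negative, so they can never collide with a ready-key count from selectNow()
    static final int SELECT = -1;
    static final int CONTINUE = -2;

    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        // Only poll the selector (non-blocking) when tasks are waiting
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    static String describe(int strategy) {
        switch (strategy) {
            case CONTINUE:
                return "skip this iteration";
            case SELECT:
                return "block in select(timeout)";
            default:
                return "process " + strategy + " ready key(s), then run tasks";
        }
    }
}
```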

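When the queue has tasks, the loop both processes the selected keys and runs all of the tasks; the finally block guarantees the tasks run even if key processing throws: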

try {
    // Process the selected keys
    processSelectedKeys();
} finally {
    // Ensure we always run tasks.
    runAllTasks();
}

6. How does Netty handle the NIO empty-polling bug?

1. What is the NIO empty-polling bug?

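Even with no client connected, Selector.select(), which should block, keeps returning immediately with nothing selected. The event loop therefore spins without ever blocking, driving CPU usage to 100%.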

2. How Netty handles it

io.netty.channel.nio.NioEventLoop#select

for (;;) {
    // ......
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;
    // ......
    long time = System.nanoTime();
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
        // timeoutMillis elapsed without anything selected.
        selectCnt = 1;
    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // The code exists in an extra method to ensure the method is not too big to inline as this
        // branch is not very likely to get hit very frequently.
        selector = selectRebuildSelector(selectCnt);
        selectCnt = 1;
        break;
    }
    // ......
}

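This excerpt shows that Netty keeps a counter, selectCnt, which is incremented on every select(timeoutMillis) call inside the loop. If a select() blocked for its full timeout, it was a normal wait and selectCnt is reset to 1. If it returned early with nothing selected (the loop already breaks out on selected keys, wake-ups and pending tasks), the counter keeps growing, and once selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD, Netty calls selectRebuildSelector(selectCnt) to create a new Selector and move the registered channels over to it.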

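SELECTOR_AUTO_REBUILD_THRESHOLD defaults to 512. Users can change it through the io.netty.selectorAutoRebuildThreshold system property; a value of 0 disables the rebuild, per the > 0 check above: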

int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
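The bookkeeping can be isolated into a small, testable class. This is a minimal sketch, assuming each call represents one select(timeoutMillis) inside the loop that returned with nothing selected and no tasks pending (the cases where Netty breaks out earlier are left to the caller). EmptyPollDetector and onEmptySelect are hypothetical names, and rebuilding the selector itself is not modelled.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the selectCnt bookkeeping in NioEventLoop#select
final class EmptyPollDetector {
    private final int threshold;   // plays the role of SELECTOR_AUTO_REBUILD_THRESHOLD
    private int selectCnt;

    EmptyPollDetector(int threshold) {
        this.threshold = threshold;
    }

    /**
     * Records one select(timeoutMillis) that returned with nothing selected.
     *
     * @param elapsedNanos  how long that select() actually blocked
     * @param timeoutMillis the timeout it was called with
     * @return true if the selector should be rebuilt
     */
    boolean onEmptySelect(long elapsedNanos, long timeoutMillis) {
        selectCnt++;
        if (elapsedNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)) {
            // Blocked for the whole timeout: a normal wait, not a spin
            selectCnt = 1;
            return false;
        }
        if (threshold > 0 && selectCnt >= threshold) {
            // Returned early too many times in a row: likely the empty-polling bug
            selectCnt = 1;
            return true;
        }
        return false;
    }
}
```

With the default threshold of 512, the 512th consecutive premature return triggers a rebuild, while a single full-length wait in between resets the count.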

7. How does Netty divide time between NIO event processing and ordinary tasks?

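The for loop runs on a single thread that must both call processSelectedKeys to handle the I/O events on the Selector and call runAllTasks to run ordinary tasks.

So Netty has to apportion execution time between the two.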

io.netty.channel.nio.NioEventLoop#ioRatio is an int field with a default value of 50:

private volatile int ioRatio = 50;

io.netty.channel.nio.NioEventLoop#run

for (;;) {
    // ioRatio defaults to 50, so the else branch runs
    if (ioRatio == 100) {
        try {
            processSelectedKeys();
        } finally {
            // With ioRatio == 100, tasks run without a time limit
            runAllTasks();
        }
    } else {
        // Record when I/O event processing starts
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
        } finally {
            // I/O time = now - I/O start time
            final long ioTime = System.nanoTime() - ioStartTime;
            // Task time budget = ioTime * (100 - ioRatio) / ioRatio; with the default 50 it equals ioTime
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
}
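A worked example of the formula: with the default ioRatio of 50, 10 ms of I/O gives tasks a 10 ms budget; with ioRatio = 80 the budget shrinks to 10 × 20 / 80 = 2.5 ms, and with ioRatio = 20 it grows to 10 × 80 / 20 = 40 ms. The sketch below (IoRatioBudget is a hypothetical helper) reproduces only this arithmetic; ioRatio == 100 is rejected because, as the code above shows, that case skips the formula and calls runAllTasks() with no time limit.

```java
// Hypothetical helper reproducing the task-time budget from NioEventLoop#run
final class IoRatioBudget {
    /**
     * @param ioTimeNanos time spent in processSelectedKeys()
     * @param ioRatio     percentage of loop time intended for I/O, 1..99
     * @return how long runAllTasks(timeout) may run, in nanoseconds
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // ioRatio == 100 skips this formula entirely (runAllTasks() has no limit)
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        // taskTime / ioTime = (100 - ioRatio) / ioRatio
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```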
