第一章Netty,Worker代码优化后分析

发布时间:2026/7/3 4:25:41
第一章Netty,Worker代码优化后分析 先看优化后代码packagecom.example.demo;importlombok.extern.slf4j.Slf4j;importjava.io.IOException;importjava.net.InetSocketAddress;importjava.nio.ByteBuffer;importjava.nio.channels.*;importjava.util.Iterator;importjava.util.Set;importjava.util.concurrent.ConcurrentLinkedDeque;importjava.util.concurrent.ConcurrentLinkedQueue;Slf4jpublicclassMultiThreadServerTest{publicstaticvoidmain(String[]args)throwsIOException{Thread.currentThread().setName(boss);ServerSocketChannelsscServerSocketChannel.open();ssc.configureBlocking(false);SelectorbossSelector.open();SelectionKeybosskeyssc.register(boss,0,null);bosskey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(newInetSocketAddress(8081));// 1创建固定数据的worker并初始化WorkerworkernewWorker(worker-0);// worker.register();while(true){boss.select();IteratorSelectionKeyiteratorboss.selectedKeys().iterator();while(iterator.hasNext()){SelectionKeykeyiterator.next();iterator.remove();if(key.isAcceptable()){SocketChannelscssc.accept();sc.configureBlocking(false);log.info(connected-------{},sc.getRemoteAddress());// 2,关联selectorlog.info(before register-------{},sc.getRemoteAddress());worker.register(sc);log.info(after register-------{},sc.getRemoteAddress());// 【核心修复】唤醒 Worker 线程使其从 select() 中返回// worker.workerSelector.wakeup();}}}}staticclassWorkerimplementsRunnable{privateThreadthread;publicSelectorworkerSelector;privateStringname;privatevolatilebooleanstartfalse;privateConcurrentLinkedQueueRunnablequeuenewConcurrentLinkedQueue();publicWorker(Stringname){this.namename;}// 初始化线程和selectorpublicvoidregister(SocketChannelsc)throwsIOException{if(!start){workerSelectorSelector.open();threadnewThread(this,name);thread.start();starttrue;}// 给队列加了任务但是任务并没有立刻执行queue.add(()-{try{sc.register(workerSelector,SelectionKey.OP_READ,null);}catch(ClosedChannelExceptione){thrownewRuntimeException(e);}});workerSelector.wakeup();//唤醒 workerSelector.select();}Overridepublicvoidrun(){while(true){try{workerSelector.select();Runnablepollqueue.poll();if(poll!null){poll.run();// 执行 sc.register(workerSelector,SelectionKey.OP_READ,null);}SetSelectionKeyselectionKeysworkerSelector.selectedKeys();IteratorSelectionKeyiteratorselectionKeys.iterator();while(iterator.hasNext()){SelectionKeykeyiterator.next();iterator.remove();if(key.isReadable()){ByteBufferbufferByteBuffer.allocate(16);SocketChannelchannel(SocketChannel)key.channel();log.info(read-------{},channel.getRemoteAddress());channel.read(buffer);buffer.flip();while(buffer.hasRemaining()){System.out.print((char)buffer.get());}System.out.println();}}}catch(IOExceptione){thrownewRuntimeException(e);}}}}}优化后的代码运行似乎看起来没有问题正确的输出了发送的消息sc.register(workerSelector,SelectionKey.OP_READ,null);workerSelector.wakeup();workerSelector.select();这3行代码的执行顺序问题这段代码存在一个‌严重的逻辑缺陷‌会导致新连接的注册事件OP_READ无法被及时处理甚至完全丢失。核心问题select() 与 queue.poll() 的执行顺序错误在 Worker 线程的 run() 方法中目前的执行顺序是workerSelector.select(); ‌阻塞等待‌ I/O 事件。queue.poll(); 处理注册任务。处理 I/O 事件。‌问题分析‌当 Boss 线程调用 worker.register(sc) 时它将注册任务放入 queue。它调用 workerSelector.wakeup()。此时如果 Worker 线程正阻塞在 select() 上wakeup() 会使 select() 立即返回。‌但是‌select() 返回后代码紧接着执行 queue.poll()。如果 queue 中有任务任务被执行即 sc.register(…)。‌关键点‌这次注册操作发生在当前这一轮 select() 循环的‌后半段‌。这意味着刚刚注册的 Channel 的 OP_READ 事件‌不会‌在当前这次 select() 返回的 selectedKeys 中被检测到因为注册发生在 select 之后。程序进入下一轮循环再次调用 select()。如果此时没有新的 I/O 事件也没有新的 wakeupselect() 将再次阻塞。‌更严重的问题‌如果 wakeup() 调用时Worker 线程‌不在‌ select() 阻塞状态例如正在处理上一个请求或正在执行 queue.poll()wakeup() 会设置一个“唤醒标志”。当下一次调用 select() 时它会立即返回。但如果此时 queue 为空任务已被前一次 poll 取走并执行或者任务执行失败逻辑依然混乱。‌最大的隐患在于register 操作本身不是线程安全的且不应该在 select() 返回后的业务逻辑中随意穿插除非你非常清楚 SelectionKey 的状态。‌虽然代码中使用了 wakeup()但由于 register 是通过队列异步执行的而 select() 是在队列处理‌之前‌调用的这导致了一个时间窗口问题Boss 线程: queue.add(task), wakeup().Worker 线程: select() 被唤醒返回。Worker 线程: poll() 取出任务并执行 register。Worker 线程: 遍历 selectedKeys。‌注意此时刚注册的 key 不在 selectedKeys 中因为 register 发生在 select 之后。‌Worker 线程: 下一轮 select()。如果客户端此时发送数据select() 能检测到吗能。但如果客户端在 register 完成前就发送了数据或者由于时序问题可能导致事件延迟处理。‌然而上述分析还不是最致命的。最致命的错误是‌在 Worker.run() 中workerSelector.select() 是阻塞的。当 boss 线程调用 workerSelector.wakeup() 时select() 返回。然后 queue.poll() 执行注册。‌但是如果 wakeup() 调用时Worker 线程并没有在 select() 中阻塞呢‌比如 Worker 线程正在处理上一个 read 事件。此时 wakeup() 设置标志位。Worker 处理完 read进入下一轮循环调用 select()。select() 发现标志位立即返回不阻塞。然后 queue.poll() 执行注册。这看起来没问题。‌真正的 Bug 在于register 操作必须在 select() 调用之前完成或者在 select() 返回后立即处理但必须确保下一次 select() 能感知到新的 Channel。‌当前代码的逻辑是select() (可能阻塞可能被 wakeup 唤醒)poll() 并执行 register处理 selectedKeys这种模式下‌新注册的 Channel 的事件永远会在下一轮 select() 中才被检测到‌。这在功能上是可行的但效率低且容易出错。‌更严重的潜在 BugClosedChannelException 和并发修改‌‌queue 的使用方式不当‌ConcurrentLinkedQueue 是线程安全的但 register 操作本身涉及到 Selector 的内部数据结构。虽然 NIO 的 register 是线程安全的它会同步但在 select() 循环中动态注册需要格外小心。‌wakeup() 的幂等性与丢失‌如果 Boss 线程连续两次调用 register 和 wakeup而 Worker 线程还在处理第一个任务第二个 wakeup 可能不会生效如果第一次 wakeup 的标志位还没被 select() 消费。不过 wakeup 是幂等的通常只会保证至少一次唤醒。‌最核心的逻辑错误select() 返回后selectedKeys 中不包含本次循环中 newly registered 的 channel。‌这是 NIO 的标准行为。你的代码依赖下一轮 select() 来发现新连接的数据。这在低负载下没问题但在高负载或特定时序下可能导致响应延迟。修正方案标准的 Reactor 模式中Worker 线程处理注册任务的正确姿势通常是将注册任务放入队列。唤醒 Selector。在 select() ‌之前‌ 或 ‌之后立即‌ 处理队列中的注册任务但要确保这些新注册的 Channel 不会被当前轮的 selectedKeys 遗漏实际上它们本来就不会在当前轮出现所以重点是确保它们被正确注册到 Selector 中以便下一轮监听。‌你的代码主要问题在于wakeup() 调用位置和执行顺序的配合不够严谨且没有处理 register 可能抛出的异常对后续逻辑的影响。‌此外还有一个常见的 ‌NIO 空轮询 Bug‌JDK Bug虽然你的代码没有体现处理逻辑但在生产环境中需要考虑。代码优化建议以下是修复后的 Worker 类主要改进点‌确保注册任务在 select() 之前被处理‌或者接受“下一轮生效”的事实并确保逻辑正确。‌增加异常处理‌防止单个 Channel 注册失败影响整个 Worker 线程。‌优化日志和流程‌。staticclassWorkerimplementsRunnable{privateThreadthread;publicSelectorworkerSelector;privateStringname;privatevolatilebooleanstartfalse;// 使用队列来串行化注册操作避免多线程直接操作 Selector 导致的复杂同步问题privateConcurrentLinkedQueueRunnablequeuenewConcurrentLinkedQueue();publicWorker(Stringname){this.namename;}publicvoidregister(SocketChannelsc)throwsIOException{if(!start){synchronized(this){if(!start){workerSelectorSelector.open();threadnewThread(this,name);thread.start();starttrue;}}}// 将注册操作封装为任务放入队列queue.add(()-{try{// 注册为读就绪并附加一个处理器对象可选sc.register(workerSelector,SelectionKey.OP_READ,null);log.info(Channel registered to worker: {},sc.getRemoteAddress());}catch(ClosedChannelExceptione){log.error(Channel closed before registration,e);}catch(Exceptione){log.error(Registration failed,e);}});// 唤醒 selector使其从 select() 阻塞中返回从而尽快处理队列中的注册任务workerSelector.wakeup();}Overridepublicvoidrun(){while(true){try{// 1. 先处理队列中的注册任务非阻塞// 注意也可以在 select() 之后处理但放在这里可以确保新注册的 channel// 不会干扰当前 selectedKeys 的迭代且逻辑更清晰Runnabletask;while((taskqueue.poll())!null){task.run();}// 2. 阻塞等待事件// 如果队列为空且无事件select() 会阻塞// 如果有 wakeup() 调用select() 会立即返回intselectedCountworkerSelector.select();// 如果 select() 返回 0可能是被 wakeup() 唤醒但无实际 I/O 事件// 此时继续循环再次检查队列防止 wakeup 后又有新任务加入if(selectedCount0){continue;}// 3. 处理 I/O 事件SetSelectionKeyselectionKeysworkerSelector.selectedKeys();IteratorSelectionKeyiteratorselectionKeys.iterator();while(iterator.hasNext()){SelectionKeykeyiterator.next();// 必须移除防止重复处理iterator.remove();if(!key.isValid()){continue;}if(key.isReadable()){SocketChannelchannel(SocketChannel)key.channel();ByteBufferbufferByteBuffer.allocate(1024);// 适当增大缓冲区try{intbytesReadchannel.read(buffer);if(bytesRead0){buffer.flip();log.info(Read from {}: {},channel.getRemoteAddress(),newString(buffer.array(),0,buffer.limit()));// 这里可以添加业务逻辑处理}elseif(bytesRead-1){// 对端关闭连接log.info(Client disconnected: {},channel.getRemoteAddress());key.cancel();channel.close();}}catch(IOExceptione){log.error(IO error on channel: {},channel.getRemoteAddress(),e);key.cancel();try{channel.close();}catch(IOExceptionex){// ignore}}}}}catch(IOExceptione){log.error(Selector error in worker: {},name,e);// 在实际生产中可能需要重建 Selector 或退出线程break;}}}}主要修改说明调整 select() 和 queue.poll() 的顺序‌原代码select() - poll() - 处理事件。新代码poll() - select() - 处理事件。理由‌先处理注册任务再调用 select()。这样如果 wakeup() 被调用select() 返回后新注册的 Channel 已经存在于 Selector 中虽然它们的事件不会在当前轮次被触发因为注册发生在 select 之前但事件可能还没到达但逻辑上更顺畅。更重要的是如果在 select() 阻塞期间有 wakeup()select() 返回后我们先处理可能新增的注册任务然后再去检查是否有 I/O 事件。这避免了原代码中“先 select 后注册”导致的时序混淆。注其实两种顺序在 NIO 中都是常见的关键是‌注册操作必须原子性地完成‌且‌不能在当前轮的 selectedKeys 中期望看到刚注册的 Channel 的事件‌。新代码将 poll 放在 select 前可以确保每次 select 开始前Selector 的注册表是最新的。增加 selectedCount 0 的判断‌wakeup() 会导致 select() 返回 0。此时应该继续循环而不是直接去遍历空的 selectedKeys。原代码虽然没有显式判断但 iterator 为空时也不会出错。显式判断可以提高效率避免不必要的迭代器创建。增强异常处理和资源清理‌在读取数据时处理 bytesRead -1客户端关闭的情况并正确取消 Key 和关闭 Channel。捕获 IOException防止单个连接异常导致 Worker 线程崩溃。start 标志的双重检查锁定‌在 register 方法中初始化 Worker 线程的部分增加了 synchronized 块防止多线程同时启动多个 Worker 线程。总结你原来的代码‌能跑通‌但在高并发或特定时序下可能存在‌事件处理延迟‌或‌逻辑不清晰‌的问题。主要风险在于 select() 和 register() 的时序配合。按照上述优化后的代码逻辑更加健壮符合标准的 NIO Reactor 模式实践。