Tomcat 线程池的设计与实现:StandardThreadExecutor

发布时间:2026/7/2 3:06:03
Tomcat 线程池的设计与实现:StandardThreadExecutor

上文中我们了解到Executor是包含在Service中的,Service中关于Executor的配置和相关代码如下。

server.xml中service里包含Executor的配置:

```xml
<Service name="Catalina">
    <!-- 1. 属性说明 name:Service的名称 -->

    <!--2. 一个或多个executors -->  // 看这里
    <!--
    <Executor name="tomcatThreadPool" namePrefix="catalina-exec-"
        maxThreads="150" minSpareThreads="4"/>
    -->
</Service>
```

Service中executors相关方法:

```java
/**
 * Adds a named executor to the service
 * @param ex Executor
 */
@Override
public void addExecutor(Executor ex) {
    synchronized (executors) {
        if (!executors.contains(ex)) {
            executors.add(ex);
            if (getState().isAvailable()) {
                try {
                    ex.start(); // 启动
                } catch (LifecycleException x) {
                    log.error(sm.getString("standardService.executor.start"), x);
                }
            }
        }
    }
}

/**
 * Retrieves all executors
 * @return Executor[]
 */
@Override
public Executor[] findExecutors() {
    synchronized (executors) {
        Executor[] arr = new Executor[executors.size()];
        executors.toArray(arr);
        return arr;
    }
}

/**
 * Retrieves executor by name, null if not found
 * @param executorName String
 * @return Executor
 */
@Override
public Executor getExecutor(String executorName) {
    synchronized (executors) {
        for (Executor executor: executors) {
            if (executorName.equals(executor.getName()))
                return executor;
        }
    }
    return null;
}

/**
 * Removes an executor from the service
 * @param ex Executor
 */
@Override
public void removeExecutor(Executor ex) {
    synchronized (executors) {
        if ( executors.remove(ex) && getState().isAvailable() ) {
            try {
                ex.stop(); // 停止
            } catch (LifecycleException e) {
                log.error(sm.getString("standardService.executor.stop"), e);
            }
        }
    }
}
```

和Server、Service实现一样,StandardThreadExecutor也是继承LifecycleMBeanBase,然后实现Executor的接口。

Tomcat关于Executor相关的配置文档:http://tomcat.apache.org/tomcat-9.0-doc/config/executor.html

## Executor接口设计

Executor的设计很简单,在理解的时候需要理解两点:

- Tomcat希望将Executor也纳入Lifecycle生命周期管理,所以让它实现了Lifecycle接口;
- 引入超时机制:也就是说当work queue满时,会等待指定的时间,如果超时将抛出RejectedExecutionException,所以这里增加了一个`void execute(Runnable command, long timeout, TimeUnit unit)`方法; 其实本质上它构造了JUC中ThreadPoolExecutor(准确地说,是Tomcat在其基础上扩展的org.apache.tomcat.util.threads.ThreadPoolExecutor,JUC原生类并没有带超时参数的execute方法),通过它调用ThreadPoolExecutor的void execute(Runnable 
command, long timeout, TimeUnit unit)方法。

```java
public interface Executor extends java.util.concurrent.Executor, Lifecycle {

    public String getName();

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the <code>Executor</code> implementation.
     * If no threads are available, it will be added to the work queue.
     * If the work queue is full, the system will wait for the specified
     * time until it throws a RejectedExecutionException
     *
     * @param command the runnable task
     * @param timeout the length of time to wait for the task to complete
     * @param unit    the units in which timeout is expressed
     *
     * @throws java.util.concurrent.RejectedExecutionException if this task
     * cannot be accepted for execution - the queue is full
     * @throws NullPointerException if command or unit is null
     */
    void execute(Runnable command, long timeout, TimeUnit unit);
}
```

找到Executor的实现类

## StandardThreadExecutor的实现

接下来我们看下具体的实现类StandardThreadExecutor。

### 理解相关配置参数

Executor官方配置说明文档

#### 公共属性

Executor的所有实现都支持以下属性:

| 属性 | 描述 |
| --- | --- |
| className | 实现的类。实现必须实现 org.apache.catalina.Executor接口。此接口确保可以通过其name属性引用对象并实现Lifecycle,以便可以使用容器启动和停止对象。className的默认值是org.apache.catalina.core.StandardThreadExecutor |
| name | 用于在server.xml中的其他位置引用此池的名称。该名称是必需的,必须是唯一的。 |

#### StandardThreadExecutor属性

默认实现支持以下属性:

| 属性 | 描述 |
| --- | --- |
| threadPriority | (int)执行程序中线程的线程优先级,默认为 5(Thread.NORM_PRIORITY常量的值) |
| daemon | (boolean)线程是否应该是守护程序线程,默认为 true |
| namePrefix | (字符串)执行程序创建的每个线程的名称前缀。单个线程的线程名称将是namePrefix+threadNumber |
| maxThreads | (int)此池中活动线程的最大数量,默认为 200 |
| minSpareThreads | (int)最小线程数(空闲和活动)始终保持活动状态,默认为 25 |
| maxIdleTime | (int)空闲线程关闭之前的毫秒数,除非活动线程数小于或等于minSpareThreads。默认值为60000(1分钟) |
| maxQueueSize | (int)在我们拒绝之前可以排队等待执行的可运行任务的最大数量。默认值是Integer.MAX_VALUE |
| prestartminSpareThreads | (boolean)是否应该在启动Executor时启动minSpareThreads,默认值为 false |
| threadRenewalDelay | (long)如果配置了ThreadLocalLeakPreventionListener,它将通知此执行程序有关已停止的上下文。上下文停止后,池中的线程将被更新。为避免同时更新所有线程,此选项在任意2个线程的续订之间设置延迟。该值以ms为单位,默认值为1000ms。如果值为负,则不会续订线程。 |

### Lifecycle模板方法

先看核心变量:

```java
// 任务队列
private TaskQueue taskqueue = null;

// 包装了一个ThreadPoolExecutor
protected ThreadPoolExecutor executor = null;
```

#### initInternal和destroyInternal

默认父类实现:

```java
@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();
}

@Override
protected void destroyInternal() throws LifecycleException {
    super.destroyInternal();
}
```

#### startInternal方法

这个方法中我们不难看出,就是初始化taskqueue,同时构造ThreadPoolExecutor的实例,后面Tomcat的StandardThreadExecutor的实现本质上是通过ThreadPoolExecutor实现的。

```java
/**
 * Start the component and implement the requirements
 * of {@link org.apache.catalina.util.LifecycleBase#startInternal()}.
 *
 * @exception LifecycleException if this component detects a fatal error
 *  that prevents this component from being used
 */
@Override
protected void startInternal() throws LifecycleException {

    taskqueue = new TaskQueue(maxQueueSize);
    TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
    executor.setThreadRenewalDelay(threadRenewalDelay);
    if (prestartminSpareThreads) {
        executor.prestartAllCoreThreads();
    }
    taskqueue.setParent(executor);

    setState(LifecycleState.STARTING);
}
```

#### stopInternal方法

代码很简单,关闭线程池后置null,方便GC回收。

```java
/**
 * Stop the component and implement the requirements
 * of {@link org.apache.catalina.util.LifecycleBase#stopInternal()}.
 *
 * @exception LifecycleException if this component detects a fatal error
 *  that needs to be reported
 */
@Override
protected void stopInternal() throws LifecycleException {

    setState(LifecycleState.STOPPING);
    if (executor != null) {
        executor.shutdownNow();
    }
    executor = null;
    taskqueue = null;
}
```

### 核心executor方法

本质上就是调用ThreadPoolExecutor的实例的相关方法。

```java
@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {
    if (executor != null) {
        executor.execute(command,timeout,unit);
    } else {
        throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
    }
}

@Override
public void execute(Runnable command) {
    if (executor != null) {
        try {
            executor.execute(command);
        } catch (RejectedExecutionException rx) {
            //there could have been contention around the queue
            if (!((TaskQueue) executor.getQueue()).force(command)) {
                throw new RejectedExecutionException(sm.getString("standardThreadExecutor.queueFull"));
            }
        }
    } else {
        throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
    }
}
```

### 动态调整线程池

我们还注意到StandardThreadExecutor还实现了ResizableExecutor,从名称上我们就可知道它是希望实现对线程池的动态调整,所以呢,它封装了一个ResizableExecutor的接口,看下接口。

```java
public interface ResizableExecutor extends Executor {

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize();

    public int getMaxThreads();

    /**
     * Returns the approximate number of threads that are actively executing
     * tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount();

    public boolean resizePool(int corePoolSize, int maximumPoolSize);

    public boolean resizeQueue(int capacity);

}
```

前三个方法比较简单,我们看下后两个方法是如何实现的, 其实也很简单。

```java
@Override
public boolean resizePool(int corePoolSize, int maximumPoolSize) {
    if (executor == null)
        return false;

    executor.setCorePoolSize(corePoolSize);
    executor.setMaximumPoolSize(maximumPoolSize);
    return true;
}

// 默认没有实现
@Override
public boolean resizeQueue(int capacity) {
    return false;
}
```

## 补充TaskQueue

我们知道工作队列是由TaskQueue保障的,它继承自LinkedBlockingQueue(一个阻塞的链表队列),来看下源代码吧。

```java
/**
 * As task queue specifically designed to run with a thread pool executor. The
 * task queue is optimised to properly utilize threads within a thread pool
 * executor. If you use a normal queue, the executor will spawn threads when
 * there are idle threads and you wont be able to force items onto the queue
 * itself.
 */
public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");
    private static final int DEFAULT_FORCED_REMAINING_CAPACITY = -1;

    private transient volatile ThreadPoolExecutor parent = null;

    // No need to be volatile. This is written and read in a single thread
    // (when stopping a context and firing the listeners)
    private int forcedRemainingCapacity = -1;

    public TaskQueue() {
        super();
    }

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }

    public void setParent(ThreadPoolExecutor tp) {
        parent = tp;
    }

    public boolean force(Runnable o) {
        if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
        return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
    }

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

    @Override
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            // the poll timed out, it gives an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }

    @Override
    public Runnable take() throws InterruptedException {
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
            // yes, this may return null (in case of timeout) which normally
            // does not occur with take()
            // but the ThreadPoolExecutor implementation allows this
        }
        return super.take();
    }

    @Override
    public int remainingCapacity() {
        if (forcedRemainingCapacity > DEFAULT_FORCED_REMAINING_CAPACITY) {
            // ThreadPoolExecutor.setCorePoolSize checks that
            // remainingCapacity==0 to allow to interrupt idle threads
            // I don't see why, but this hack allows to conform to this
            // requirement
            return forcedRemainingCapacity;
        }
        return super.remainingCapacity();
    }

    public void setForcedRemainingCapacity(int forcedRemainingCapacity) {
        this.forcedRemainingCapacity = forcedRemainingCapacity;
    }

    void resetForcedRemainingCapacity() {
        this.forcedRemainingCapacity = DEFAULT_FORCED_REMAINING_CAPACITY;
    }
```