tomcat创建线程池 public void createExecutor() {   internalExecutor = true;   // 任务队列和普通的队列有所区别,后续分析    TaskQueue taskqueue = new TaskQueue();   // 线程工厂用于创建线程  本地项目name=http-nio-port-exec-序号   TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());   // 创建线程池,注意这个ThreadPoolExecutor和java.util.concurrent包下的ThreadPoolExecutor有所区别   executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);   // 给任务队列设置线程池,用于后续任务来了判断是创建线程执行还是将线程添加到任务队列   taskqueue.setParent( (ThreadPoolExecutor) executor); }











public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {     /**      * The string manager for this package.      */     protected static final StringManager sm = StringManager             .getManager("org.apache.tomcat.util.threads.res");      /**      * The number of tasks submitted but not yet finished. This includes tasks      * in the queue and tasks that have been handed to a worker thread but the      * latter did not start executing the task yet.      * This number is always greater or equal to {@link #getActiveCount()}.      */     private final AtomicInteger submittedCount = new AtomicInteger(0);     private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);      /**      * Most recent time in ms when a thread decided to kill itself to avoid      * potential memory leaks. Useful to throttle the rate of renewals of      * threads.      */     private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);      /**      * Delay in ms between 2 threads being renewed. If negative, do not renew threads.      */     private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;      public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);         prestartAllCoreThreads();     }      public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,             RejectedExecutionHandler handler) {         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);         prestartAllCoreThreads();     }      public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());         prestartAllCoreThreads();     }      public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());         prestartAllCoreThreads();     }      public long getThreadRenewalDelay() {         return threadRenewalDelay;     }      public void setThreadRenewalDelay(long threadRenewalDelay) {         this.threadRenewalDelay = threadRenewalDelay;     }      @Override     protected void afterExecute(Runnable r, Throwable t) {         submittedCount.decrementAndGet();          if (t == null) {             stopCurrentThreadIfNeeded();         }     }      /**      * If the current thread was started before the last time when a context was      * stopped, an exception is thrown so that the current thread is stopped.      */     protected void stopCurrentThreadIfNeeded() {         if (currentThreadShouldBeStopped()) {             long lastTime = lastTimeThreadKilledItself.longValue();             if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {                 if (lastTimeThreadKilledItself.compareAndSet(lastTime,                         System.currentTimeMillis() + 1)) {                     // OK, it's really time to dispose of this thread                      final String msg = sm.getString(                                     "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",                                     Thread.currentThread().getName());                      throw new StopPooledThreadException(msg);                 }             }         }     }      protected boolean currentThreadShouldBeStopped() {         if (threadRenewalDelay >= 0             && Thread.currentThread() instanceof TaskThread) {             TaskThread currentTaskThread = (TaskThread) Thread.currentThread();             if (currentTaskThread.getCreationTime() <                     this.lastContextStoppedTime.longValue()) {                 return true;             }         }         return false;     }      public int getSubmittedCount() {         return submittedCount.get();     }      /**      * {@inheritDoc}      */     @Override     public void execute(Runnable command) {         execute(command,0,TimeUnit.MILLISECONDS);     }      /**      * Executes the given command at some time in the future.  The command      * may execute in a new thread, in a pooled thread, or in the calling      * thread, at the discretion of the <code>Executor</code> implementation.      * If no threads are available, it will be added to the work queue.      * If the work queue is full, the system will wait for the specified      * time and it throw a RejectedExecutionException if the queue is still      * full after that.      *      * @param command the runnable task      * @param timeout A timeout for the completion of the task      * @param unit The timeout time unit      * @throws RejectedExecutionException if this task cannot be      * accepted for execution - the queue is full      * @throws NullPointerException if command or unit is null      */     public void execute(Runnable command, long timeout, TimeUnit unit) {         submittedCount.incrementAndGet();         try {             super.execute(command);         } catch (RejectedExecutionException rx) {             if (super.getQueue() instanceof TaskQueue) {                 final TaskQueue queue = (TaskQueue)super.getQueue();                 try {                     if (!queue.force(command, timeout, unit)) {                         submittedCount.decrementAndGet();                         throw new RejectedExecutionException("Queue capacity is full.");                     }                 } catch (InterruptedException x) {                     submittedCount.decrementAndGet();                     throw new RejectedExecutionException(x);                 }             } else {                 submittedCount.decrementAndGet();                 throw rx;             }          }     }      public void contextStopping() {         this.lastContextStoppedTime.set(System.currentTimeMillis());          // save the current pool parameters to restore them later         int savedCorePoolSize = this.getCorePoolSize();         TaskQueue taskQueue =                 getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;         if (taskQueue != null) {             // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize             // checks that queue.remainingCapacity()==0. I did not understand             // why, but to get the intended effect of waking up idle threads, I             // temporarily fake this condition.             taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));         }          // setCorePoolSize(0) wakes idle threads         this.setCorePoolSize(0);          // TaskQueue.take() takes care of timing out, so that we are sure that         // all threads of the pool are renewed in a limited time, something like         // (threadKeepAlive + longest request time)          if (taskQueue != null) {             // ok, restore the state of the queue and pool             taskQueue.setForcedRemainingCapacity(null);         }         this.setCorePoolSize(savedCorePoolSize);     }      private static class RejectHandler implements RejectedExecutionHandler {         @Override         public void rejectedExecution(Runnable r,                 java.util.concurrent.ThreadPoolExecutor executor) {             throw new RejectedExecutionException();         }      }   }













/**  * As task queue specifically designed to run with a thread pool executor. The  * task queue is optimised to properly utilize threads within a thread pool  * executor. If you use a normal queue, the executor will spawn threads when  * there are idle threads and you wont be able to force items onto the queue  * itself.  */ public class TaskQueue extends LinkedBlockingQueue<Runnable> {      private static final long serialVersionUID = 1L;      private transient volatile ThreadPoolExecutor parent = null;      // No need to be volatile. This is written and read in a single thread     // (when stopping a context and firing the  listeners)     private Integer forcedRemainingCapacity = null;      public TaskQueue() {         super();     }      public TaskQueue(int capacity) {         super(capacity);     }      public TaskQueue(Collection<? extends Runnable> c) {         super(c);     }      public void setParent(ThreadPoolExecutor tp) {         parent = tp;     }      public boolean force(Runnable o) {         if (parent == null || parent.isShutdown()) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");         return super.offer(o); //forces the item onto the queue, to be used if the task is rejected     }      public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {         if (parent == null || parent.isShutdown()) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");         return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected     }      @Override     public boolean offer(Runnable o) {       //we can't do any checks         if (parent==null) return super.offer(o);         //we are maxed out on threads, simply queue the object         if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);         //we have idle threads, just add it to the queue         if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);         //if we have less threads than maximum force creation of a new thread         if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;         //if we reached here, we need to add it to the queue         return super.offer(o);     }       @Override     public Runnable poll(long timeout, TimeUnit unit)             throws InterruptedException {         Runnable runnable = super.poll(timeout, unit);         if (runnable == null && parent != null) {             // the poll timed out, it gives an opportunity to stop the current             // thread if needed to avoid memory leaks.             parent.stopCurrentThreadIfNeeded();         }         return runnable;     }      @Override     public Runnable take() throws InterruptedException {         if (parent != null && parent.currentThreadShouldBeStopped()) {             return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),                     TimeUnit.MILLISECONDS);             // yes, this may return null (in case of timeout) which normally             // does not occur with take()             // but the ThreadPoolExecutor implementation allows this         }         return super.take();     }      @Override     public int remainingCapacity() {         if (forcedRemainingCapacity != null) {             // ThreadPoolExecutor.setCorePoolSize checks that             // remainingCapacity==0 to allow to interrupt idle threads             // I don't see why, but this hack allows to conform to this             // "requirement"             return forcedRemainingCapacity.intValue();         }         return super.remainingCapacity();     }      public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {         this.forcedRemainingCapacity = forcedRemainingCapacity;     }  }



    /**      * Process the given SocketWrapper with the given status. Used to trigger      * processing as if the Poller (for those endpoints that have one)      * selected the socket.      *      * @param socketWrapper The socket wrapper to process      * @param event         The socket event to be processed      * @param dispatch      Should the processing be performed on a new      *                          container thread      *      * @return if processing was triggered successfully      */     public boolean processSocket(SocketWrapperBase<S> socketWrapper,             SocketEvent event, boolean dispatch) {         try {             if (socketWrapper == null) {                 return false;             }             SocketProcessorBase<S> sc = processorCache.pop();             if (sc == null) {                 sc = createSocketProcessor(socketWrapper, event);             } else {                 sc.reset(socketWrapper, event);             }             Executor executor = getExecutor();             if (dispatch && executor != null) {                 executor.execute(sc);             } else {       ;             }         } catch (RejectedExecutionException ree) {             getLog().warn(sm.getString("", socketWrapper) , ree);             return false;         } catch (Throwable t) {             ExceptionUtils.handleThrowable(t);             // This means we got an OOM or similar creating a thread, or that             // the pool and its queue are full             getLog().error(sm.getString(""), t);             return false;         }         return true;     }


  • 合计 (0)