Java多线程
Java并发
线程池

Java多线程 48—— Executors详解

简介:Executors是java.util.concurrent包提供一个用于创建各类线程池的工厂类,其中有大量的静态方法可以方便我们创建ThreadPoolExecutor和ScheduledThreadPoolExecutor线程池,本文将一一进行介绍。

Executors是java.util.concurrent包提供一个用于创建各类线程池的工厂类,其中有大量的静态方法可以方便我们创建ThreadPoolExecutor和ScheduledThreadPoolExecutor线程池,本文将一一进行介绍。Executors的构造方法被私有化了,因此只可以调用它的静态方法:

  • /** Cannot instantiate. */
  • private Executors() {}

1. 创建ThreadPoolExecutor线程池

Executors中用于创建ThreadPoolExecutor的方法有以下几个:

  • /**
  • * Creates a thread pool that reuses a fixed number of threads
  • * operating off a shared unbounded queue. At any point, at most
  • * <tt>nThreads</tt> threads will be active processing tasks.
  • * If additional tasks are submitted when all threads are active,
  • * they will wait in the queue until a thread is available.
  • * If any thread terminates due to a failure during execution
  • * prior to shutdown, a new one will take its place if needed to
  • * execute subsequent tasks. The threads in the pool will exist
  • * until it is explicitly {@link ExecutorService#shutdown shutdown}.
  • *
  • * 创建一个线程数固定的线程池,
  • * 使用默认的线程工厂,LinkedBlockingQueue作为阻塞队列
  • *
  • * @param nThreads the number of threads in the pool
  • * @return the newly created thread pool
  • * @throws IllegalArgumentException if {@code nThreads <= 0}
  • */
  • public static ExecutorService newFixedThreadPool(int nThreads) {
  • // 创建ThreadPoolExecutor线程池,corePoolSize和maximumPoolSize都是nThreads
  • return new ThreadPoolExecutor(nThreads, nThreads,
  • 0L, TimeUnit.MILLISECONDS,
  • new LinkedBlockingQueue<Runnable>());
  • }
  • /**
  • * Creates a thread pool that reuses a fixed number of threads
  • * operating off a shared unbounded queue, using the provided
  • * ThreadFactory to create new threads when needed. At any point,
  • * at most <tt>nThreads</tt> threads will be active processing
  • * tasks. If additional tasks are submitted when all threads are
  • * active, they will wait in the queue until a thread is
  • * available. If any thread terminates due to a failure during
  • * execution prior to shutdown, a new one will take its place if
  • * needed to execute subsequent tasks. The threads in the pool will
  • * exist until it is explicitly {@link ExecutorService#shutdown
  • * shutdown}.
  • *
  • * 创建一个线程数固定的线程池,可指定线程工厂
  • * 默认使用LinkedBlockingQueue作为阻塞队列
  • *
  • * @param nThreads the number of threads in the pool
  • * @param threadFactory the factory to use when creating new threads
  • * @return the newly created thread pool
  • * @throws NullPointerException if threadFactory is null
  • * @throws IllegalArgumentException if {@code nThreads <= 0}
  • */
  • public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  • return new ThreadPoolExecutor(nThreads, nThreads,
  • 0L, TimeUnit.MILLISECONDS,
  • new LinkedBlockingQueue<Runnable>(),
  • threadFactory);
  • }
  • /**
  • * Creates an Executor that uses a single worker thread operating
  • * off an unbounded queue. (Note however that if this single
  • * thread terminates due to a failure during execution prior to
  • * shutdown, a new one will take its place if needed to execute
  • * subsequent tasks.) Tasks are guaranteed to execute
  • * sequentially, and no more than one task will be active at any
  • * given time. Unlike the otherwise equivalent
  • * <tt>newFixedThreadPool(1)</tt> the returned executor is
  • * guaranteed not to be reconfigurable to use additional threads.
  • *
  • * 创建线程池大小为1,最大线程数为1的线程池
  • * 使用默认的线程工厂,LinkedBlockingQueue作为阻塞队列
  • *
  • * @return the newly created single-threaded Executor
  • */
  • public static ExecutorService newSingleThreadExecutor() {
  • /**
  • * 创建一个corePoolSize和maimumPoolSize都为1的ThreadPoolExecutor,
  • * 并使用FinalizableDelegatedExecutorService进行包装
  • */
  • return new FinalizableDelegatedExecutorService
  • (new ThreadPoolExecutor(1, 1,
  • 0L, TimeUnit.MILLISECONDS,
  • new LinkedBlockingQueue<Runnable>()));
  • }
  • /**
  • * Creates an Executor that uses a single worker thread operating
  • * off an unbounded queue, and uses the provided ThreadFactory to
  • * create a new thread when needed. Unlike the otherwise
  • * equivalent <tt>newFixedThreadPool(1, threadFactory)</tt> the
  • * returned executor is guaranteed not to be reconfigurable to use
  • * additional threads.
  • *
  • *
  • * 创建线程池大小为1,最大线程数为1的线程池,可执行线程工厂
  • * 使用LinkedBlockingQueue作为阻塞队列
  • *
  • * @param threadFactory the factory to use when creating new threads
  • *
  • * @return the newly created single-threaded Executor
  • * @throws NullPointerException if threadFactory is null
  • */
  • public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  • /**
  • * 创建一个corePoolSize和maimumPoolSize都为1,且指定ThreadFactory的ThreadPoolExecutor,
  • * 并使用FinalizableDelegatedExecutorService进行包装
  • */
  • return new FinalizableDelegatedExecutorService
  • (new ThreadPoolExecutor(1, 1,
  • 0L, TimeUnit.MILLISECONDS,
  • new LinkedBlockingQueue<Runnable>(), threadFactory));
  • }
  • /**
  • * Creates a thread pool that creates new threads as needed, but
  • * will reuse previously constructed threads when they are
  • * available. These pools will typically improve the performance
  • * of programs that execute many short-lived asynchronous tasks.
  • * Calls to <tt>execute</tt> will reuse previously constructed
  • * threads if available. If no existing thread is available, a new
  • * thread will be created and added to the pool. Threads that have
  • * not been used for sixty seconds are terminated and removed from
  • * the cache. Thus, a pool that remains idle for long enough will
  • * not consume any resources. Note that pools with similar
  • * properties but different details (for example, timeout parameters)
  • * may be created using {@link ThreadPoolExecutor} constructors.
  • *
  • * 此方法创建的线程池只会在需要的时候创建新线程,同时会重复使用之前创建的可用的线程
  • * 使用默认的线程工厂,SynchronousQueue作为阻塞队列
  • * 该线程池适合执行数量多耗时短的异步任务
  • *
  • * @return the newly created thread pool
  • */
  • public static ExecutorService newCachedThreadPool() {
  • return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  • 60L, TimeUnit.SECONDS,
  • new SynchronousQueue<Runnable>());
  • }
  • /**
  • * Creates a thread pool that creates new threads as needed, but
  • * will reuse previously constructed threads when they are
  • * available, and uses the provided
  • * ThreadFactory to create new threads when needed.
  • *
  • * 此方法创建的线程池只会在需要的时候创建新线程,同时会重复使用之前创建的可用的线程
  • * 可自定义线程工厂
  • * 使用SynchronousQueue作为阻塞队列
  • * 该线程池适合执行数量多耗时短的异步任务
  • *
  • * @param threadFactory the factory to use when creating new threads
  • * @return the newly created thread pool
  • * @throws NullPointerException if threadFactory is null
  • */
  • public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  • return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  • 60L, TimeUnit.SECONDS,
  • new SynchronousQueue<Runnable>(),
  • threadFactory);
  • }

其中涉及到的FinalizableDelegatedExecutorService类源码如下:

  • // 该类继承自DelegatedExecutorService,功能一样,只是增加了finalize()方法
  • static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
  • FinalizableDelegatedExecutorService(ExecutorService executor) {
  • super(executor);
  • }
  • protected void finalize() {
  • super.shutdown();
  • }
  • }

虽然这些静态方法提供了一种便捷的方式创建线程池,但在实际开发中并建议这样使用;可以看出默认的创建方法中使用的等待队列是LinkedBlockingQueue或SynchronousQueue类型,其中LinkedBlockingQueue是无界队列,当添加的任务过多线程池处理速度太慢时会导致队列元素堆积进而撑满内存。

2. 创建ScheduledThreadPoolExecutor线程池

  • /**
  • * Creates a thread pool that can schedule commands to run after a
  • * given delay, or to execute periodically.
  • *
  • * 创建定时调度线程池,可执行核心线程数
  • *
  • * @param corePoolSize the number of threads to keep in the pool,
  • * even if they are idle.
  • * @return a newly created scheduled thread pool
  • * @throws IllegalArgumentException if {@code corePoolSize < 0}
  • */
  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  • return new ScheduledThreadPoolExecutor(corePoolSize);
  • }
  • /**
  • * Creates a thread pool that can schedule commands to run after a
  • * given delay, or to execute periodically.
  • *
  • * 创建定时调度线程池,可执行核心线程数和线程工厂
  • *
  • * @param corePoolSize the number of threads to keep in the pool,
  • * even if they are idle.
  • * @param threadFactory the factory to use when the executor
  • * creates a new thread.
  • * @return a newly created scheduled thread pool
  • * @throws IllegalArgumentException if {@code corePoolSize < 0}
  • * @throws NullPointerException if threadFactory is null
  • */
  • public static ScheduledExecutorService newScheduledThreadPool(
  • int corePoolSize, ThreadFactory threadFactory) {
  • return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
  • }
  • /**
  • * Creates a single-threaded executor that can schedule commands
  • * to run after a given delay, or to execute periodically.
  • * (Note however that if this single
  • * thread terminates due to a failure during execution prior to
  • * shutdown, a new one will take its place if needed to execute
  • * subsequent tasks.) Tasks are guaranteed to execute
  • * sequentially, and no more than one task will be active at any
  • * given time. Unlike the otherwise equivalent
  • * <tt>newScheduledThreadPool(1)</tt> the returned executor is
  • * guaranteed not to be reconfigurable to use additional threads.
  • *
  • * 创建单线程定时调度线程池,使用默认的线程工厂
  • *
  • * @return the newly created scheduled executor
  • */
  • public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  • return new DelegatedScheduledExecutorService
  • (new ScheduledThreadPoolExecutor(1));
  • }
  • /**
  • * Creates a single-threaded executor that can schedule commands
  • * to run after a given delay, or to execute periodically. (Note
  • * however that if this single thread terminates due to a failure
  • * during execution prior to shutdown, a new one will take its
  • * place if needed to execute subsequent tasks.) Tasks are
  • * guaranteed to execute sequentially, and no more than one task
  • * will be active at any given time. Unlike the otherwise
  • * equivalent <tt>newScheduledThreadPool(1, threadFactory)</tt>
  • * the returned executor is guaranteed not to be reconfigurable to
  • * use additional threads.
  • *
  • * 创建单线程定时调度线程池,可自定义线程工厂
  • *
  • * @param threadFactory the factory to use when creating new
  • * threads
  • * @return a newly created scheduled executor
  • * @throws NullPointerException if threadFactory is null
  • */
  • public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
  • return new DelegatedScheduledExecutorService
  • (new ScheduledThreadPoolExecutor(1, threadFactory));
  • }

其中涉及到的DelegatedExecutorService和DelegatedScheduledExecutorService两个类的代码如下:

  • /**
  • * A wrapper class that exposes only the ScheduledExecutorService
  • * methods of a ScheduledExecutorService implementation.
  • *
  • * 该类对ScheduledExecutorService类进行了包装,并且只提供了ScheduledExecutorService的方法的实现
  • * 执行的方法都是ScheduledExecutorService的方法
  • */
  • static class DelegatedScheduledExecutorService
  • extends DelegatedExecutorService
  • implements ScheduledExecutorService {
  • private final ScheduledExecutorService e;
  • // 记录传入的ScheduledExecutorService对象
  • DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
  • super(executor);
  • e = executor;
  • }
  • public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  • return e.schedule(command, delay, unit);
  • }
  • public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
  • return e.schedule(callable, delay, unit);
  • }
  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
  • return e.scheduleAtFixedRate(command, initialDelay, period, unit);
  • }
  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
  • return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
  • }
  • }

创建ScheduledThreadPoolExecutor的方法则中规中矩,只提供了核心线程数和线程工厂的配置。

3. 隔离配置的线程池

Executors还提供了两个方法用于包装ThreadPoolExecutor和ScheduledThreadPoolExecutor,隔离对线程池的配置,以防外界修改:

  • /**
  • * Returns an object that delegates all defined {@link
  • * ExecutorService} methods to the given executor, but not any
  • * other methods that might otherwise be accessible using
  • * casts. This provides a way to safely "freeze" configuration and
  • * disallow tuning of a given concrete implementation.
  • *
  • * 根据ExecutorService对象创建一个代理的DelegatedExecutorService对象
  • * 这种方式可以屏蔽ExecutorService内的配置,防止外界修改
  • *
  • * @param executor the underlying implementation
  • * @return an <tt>ExecutorService</tt> instance
  • * @throws NullPointerException if executor null
  • */
  • public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
  • if (executor == null)
  • throw new NullPointerException();
  • return new DelegatedExecutorService(executor);
  • }
  • /**
  • * Returns an object that delegates all defined {@link
  • * ScheduledExecutorService} methods to the given executor, but
  • * not any other methods that might otherwise be accessible using
  • * casts. This provides a way to safely "freeze" configuration and
  • * disallow tuning of a given concrete implementation.
  • *
  • * 根据ScheduledExecutorService创建一个代理的DelegatedScheduledExecutorService对象
  • * 这种方式可以屏蔽ScheduledExecutorService内的配置,防止外界修改
  • *
  • * @param executor the underlying implementation
  • * @return a <tt>ScheduledExecutorService</tt> instance
  • * @throws NullPointerException if executor null
  • */
  • public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
  • if (executor == null)
  • throw new NullPointerException();
  • return new DelegatedScheduledExecutorService(executor);
  • }

其中涉及到的DelegatedExecutorService类的代码如下:

  • /**
  • * A wrapper class that exposes only the ExecutorService methods
  • * of an ExecutorService implementation.
  • *
  • * 该类对线程池ExecutorService类进行了一层包装,执行的方法都是线程池ExecutorService的方法
  • */
  • static class DelegatedExecutorService extends AbstractExecutorService {
  • private final ExecutorService e;
  • // 记录传入的ExecutorService对象
  • DelegatedExecutorService(ExecutorService executor) { e = executor; }
  • public void execute(Runnable command) { e.execute(command); }
  • public void shutdown() { e.shutdown(); }
  • public List<Runnable> shutdownNow() { return e.shutdownNow(); }
  • public boolean isShutdown() { return e.isShutdown(); }
  • public boolean isTerminated() { return e.isTerminated(); }
  • public boolean awaitTermination(long timeout, TimeUnit unit)
  • throws InterruptedException {
  • return e.awaitTermination(timeout, unit);
  • }
  • public Future<?> submit(Runnable task) {
  • return e.submit(task);
  • }
  • public <T> Future<T> submit(Callable<T> task) {
  • return e.submit(task);
  • }
  • public <T> Future<T> submit(Runnable task, T result) {
  • return e.submit(task, result);
  • }
  • public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  • throws InterruptedException {
  • return e.invokeAll(tasks);
  • }
  • public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  • long timeout, TimeUnit unit)
  • throws InterruptedException {
  • return e.invokeAll(tasks, timeout, unit);
  • }
  • public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  • throws InterruptedException, ExecutionException {
  • return e.invokeAny(tasks);
  • }
  • public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  • long timeout, TimeUnit unit)
  • throws InterruptedException, ExecutionException, TimeoutException {
  • return e.invokeAny(tasks, timeout, unit);
  • }
  • }

4. 包装任务为Callable对象

Executors提供了多个方法用于将Runnable或Callable等对象包装为特定的Callable对象:

  • /**
  • * Returns a {@link Callable} object that, when
  • * called, runs the given task and returns the given result. This
  • * can be useful when applying methods requiring a
  • * <tt>Callable</tt> to an otherwise resultless action.
  • *
  • * 根据Runnable任务和给定的默认结果创建一个Callable对象
  • * 表现为RunnableAdapter,该类继承自Callable
  • *
  • * @param task the task to run
  • * @param result the result to return
  • * @return a callable object
  • * @throws NullPointerException if task null
  • */
  • public static <T> Callable<T> callable(Runnable task, T result) {
  • if (task == null)
  • throw new NullPointerException();
  • return new RunnableAdapter<T>(task, result);
  • }
  • /**
  • * Returns a {@link Callable} object that, when
  • * called, runs the given task and returns <tt>null</tt>.
  • *
  • * 根据Runnable任务创建一个Callable对象,返回值为null
  • * 表现为RunnableAdapter,该类继承自Callable
  • *
  • * @param task the task to run
  • * @return a callable object
  • * @throws NullPointerException if task null
  • */
  • public static Callable<Object> callable(Runnable task) {
  • if (task == null)
  • throw new NullPointerException();
  • return new RunnableAdapter<Object>(task, null);
  • }
  • /**
  • * Returns a {@link Callable} object that, when
  • * called, runs the given privileged action and returns its result.
  • *
  • * 返回 Callable 对象,调用它时可运行给定特权的操作并返回其结果
  • *
  • * @param action the privileged action to run
  • * @return a callable object
  • * @throws NullPointerException if action null
  • */
  • public static Callable<Object> callable(final PrivilegedAction<?> action) {
  • // 检查参数
  • if (action == null)
  • throw new NullPointerException();
  • // 返回一个Callable对象,内部call()方法主体是调用action的run()方法并返回结果
  • return new Callable<Object>() {
  • public Object call() {
  • return action.run(); }
  • };
  • }
  • /**
  • * Returns a {@link Callable} object that, when
  • * called, runs the given privileged exception action and returns
  • * its result.
  • *
  • * 返回 Callable 对象,调用它时可运行给定特权的操作并返回其结果
  • *
  • * @param action the privileged exception action to run
  • * @return a callable object
  • * @throws NullPointerException if action null
  • */
  • public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
  • // 检查参数
  • if (action == null)
  • throw new NullPointerException();
  • // 返回一个Callable对象,内部call()方法主体是调用action的run()方法并返回结果
  • return new Callable<Object>() {
  • public Object call() throws Exception {
  • return action.run();
  • }
  • };
  • }
  • /**
  • * Returns a {@link Callable} object that will, when
  • * called, execute the given <tt>callable</tt> under the current
  • * access control context. This method should normally be
  • * invoked within an {@link AccessController#doPrivileged} action
  • * to create callables that will, if possible, execute under the
  • * selected permission settings holding within that action; or if
  • * not possible, throw an associated {@link
  • * AccessControlException}.
  • * 返回 Callable 对象,调用它时可在当前的访问控制上下文中执行给定的 callable 对象。
  • * 通常应该在 AccessController.doPrivileged(java.security.PrivilegedAction) 操作中调用此方法,以便创建 callable 对象,
  • * 并且如有可能,则在该操作中保持的所选权限设置下执行此对象;
  • * 如果无法调用,则抛出相关的AccessControlException。
  • *
  • * @param callable the underlying task
  • * @return a callable object
  • * @throws NullPointerException if callable null
  • *
  • */
  • public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
  • if (callable == null)
  • throw new NullPointerException();
  • return new PrivilegedCallable<T>(callable);
  • }
  • /**
  • * Returns a {@link Callable} object that will, when
  • * called, execute the given <tt>callable</tt> under the current
  • * access control context, with the current context class loader
  • * as the context class loader. This method should normally be
  • * invoked within an {@link AccessController#doPrivileged} action
  • * to create callables that will, if possible, execute under the
  • * selected permission settings holding within that action; or if
  • * not possible, throw an associated {@link
  • * AccessControlException}.
  • *
  • * 返回 Callable 对象,调用它时可在当前的访问控制上下文中,使用当前上下文类加载器作为上下文类加载器来执行给定的 callable 对象。
  • * 通常应该在AccessController.doPrivileged(java.security.PrivilegedAction) 操作中调用此方法,以创建 callable 对象,
  • * 并且如有可能,则在该操作中保持的所选权限设置下执行此对象;如果无法调用,则抛出相关的AccessControlException。
  • *
  • * @param callable the underlying task
  • *
  • * @return a callable object
  • * @throws NullPointerException if callable null
  • * @throws AccessControlException if the current access control
  • * context does not have permission to both set and get context
  • * class loader.
  • */
  • public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
  • if (callable == null)
  • throw new NullPointerException();
  • return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
  • }

其中涉及的RunnableAdapter、PrivilegedCallable和PrivilegedCallableUsingCurrentClassLoader的源码如下:

  • /**
  • * A callable that runs given task and returns given result
  • */
  • static final class RunnableAdapter<T> implements Callable<T> {
  • final Runnable task;
  • final T result;
  • // 记录任务和默认结果
  • RunnableAdapter(Runnable task, T result) {
  • this.task = task;
  • this.result = result;
  • }
  • // 运行
  • public T call() {
  • // 直接运行任务
  • task.run();
  • // 返回默认结果
  • return result;
  • }
  • }
  • /**
  • * A callable that runs under established access control settings
  • */
  • static final class PrivilegedCallable<T> implements Callable<T> {
  • // 任务
  • private final Callable<T> task;
  • // 访问控制上下文
  • private final AccessControlContext acc;
  • PrivilegedCallable(Callable<T> task) {
  • // 记录任务
  • this.task = task;
  • // 获取访问控制上下文
  • this.acc = AccessController.getContext();
  • }
  • // 执行任务
  • public T call() throws Exception {
  • try {
  • // 授权并执行任务
  • return AccessController.doPrivileged(
  • new PrivilegedExceptionAction<T>() {
  • public T run() throws Exception {
  • return task.call();
  • }
  • }, acc);
  • } catch (PrivilegedActionException e) {
  • throw e.getException();
  • }
  • }
  • }
  • /**
  • * A callable that runs under established access control settings and
  • * current ClassLoader
  • */
  • static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
  • // 任务
  • private final Callable<T> task;
  • // 访问控制上下文
  • private final AccessControlContext acc;
  • // 类加载器
  • private final ClassLoader ccl;
  • PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
  • // 获取系统安全管理器
  • SecurityManager sm = System.getSecurityManager();
  • if (sm != null) {
  • // Calls to getContextClassLoader from this class
  • // never trigger a security check, but we check
  • // whether our callers have this permission anyways.
  • // 检查权限
  • sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
  • // Whether setContextClassLoader turns out to be necessary
  • // or not, we fail fast if permission is not available.
  • // 检查权限
  • sm.checkPermission(new RuntimePermission("setContextClassLoader"));
  • }
  • // 记录任务
  • this.task = task;
  • // 获取访问控制上下文
  • this.acc = AccessController.getContext();
  • // 类加载器
  • this.ccl = Thread.currentThread().getContextClassLoader();
  • }
  • // 执行任务
  • public T call() throws Exception {
  • try {
  • // 授权并执行任务
  • return AccessController.doPrivileged(
  • new PrivilegedExceptionAction<T>() {
  • public T run() throws Exception {
  • ClassLoader savedcl = null;
  • Thread t = Thread.currentThread();
  • try {
  • ClassLoader cl = t.getContextClassLoader();
  • if (ccl != cl) {
  • t.setContextClassLoader(ccl);
  • savedcl = cl;
  • }
  • return task.call();
  • } finally {
  • if (savedcl != null)
  • t.setContextClassLoader(savedcl);
  • }
  • }
  • }, acc);
  • } catch (PrivilegedActionException e) {
  • throw e.getException();
  • }
  • }
  • }

5. 内置线程工厂

Executors提供了两个方法可以创建内置的线程工厂,源码如下:

  • /**
  • * Returns a default thread factory used to create new threads.
  • * This factory creates all new threads used by an Executor in the
  • * same {@link ThreadGroup}. If there is a {@link
  • * java.lang.SecurityManager}, it uses the group of {@link
  • * System#getSecurityManager}, else the group of the thread
  • * invoking this <tt>defaultThreadFactory</tt> method. Each new
  • * thread is created as a non-daemon thread with priority set to
  • * the smaller of <tt>Thread.NORM_PRIORITY</tt> and the maximum
  • * priority permitted in the thread group. New threads have names
  • * accessible via {@link Thread#getName} of
  • * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
  • * number of this factory, and <em>M</em> is the sequence number
  • * of the thread created by this factory.
  • *
  • * 创建默认的线程池
  • *
  • * @return a thread factory
  • */
  • public static ThreadFactory defaultThreadFactory() {
  • return new DefaultThreadFactory();
  • }
  • /**
  • * Returns a thread factory used to create new threads that
  • * have the same permissions as the current thread.
  • * This factory creates threads with the same settings as {@link
  • * Executors#defaultThreadFactory}, additionally setting the
  • * AccessControlContext and contextClassLoader of new threads to
  • * be the same as the thread invoking this
  • * <tt>privilegedThreadFactory</tt> method. A new
  • * <tt>privilegedThreadFactory</tt> can be created within an
  • * {@link AccessController#doPrivileged} action setting the
  • * current thread's access control context to create threads with
  • * the selected permission settings holding within that action.
  • *
  • * <p> Note that while tasks running within such threads will have
  • * the same access control and class loader settings as the
  • * current thread, they need not have the same {@link
  • * java.lang.ThreadLocal} or {@link
  • * java.lang.InheritableThreadLocal} values. If necessary,
  • * particular values of thread locals can be set or reset before
  • * any task runs in {@link ThreadPoolExecutor} subclasses using
  • * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is
  • * necessary to initialize worker threads to have the same
  • * InheritableThreadLocal settings as some other designated
  • * thread, you can create a custom ThreadFactory in which that
  • * thread waits for and services requests to create others that
  • * will inherit its values.
  • *
  • * 创建PrivilegedThreadFactory线程池,该线程池会检查访问上下文和ClassLoader
  • *
  • * @return a thread factory
  • * @throws AccessControlException if the current access control
  • * context does not have permission to both get and set context
  • * class loader.
  • */
  • public static ThreadFactory privilegedThreadFactory() {
  • return new PrivilegedThreadFactory();
  • }

创建的两个线程工厂DefaultThreadFactory和PrivilegedThreadFactory源码如下:

  • /**
  • * The default thread factory
  • * 默认线程工厂
  • */
  • static class DefaultThreadFactory implements ThreadFactory {
  • // 线程池序号
  • private static final AtomicInteger poolNumber = new AtomicInteger(1);
  • private final ThreadGroup group;
  • private final AtomicInteger threadNumber = new AtomicInteger(1);
  • private final String namePrefix;
  • DefaultThreadFactory() {
  • // 系统安全管理器
  • SecurityManager s = System.getSecurityManager();
  • // 获取线程池组
  • group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
  • /**
  • * 获取线程名的前缀
  • * 其中线程池序号在线程工厂每次创建时逐渐递增
  • */
  • namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
  • }
  • public Thread newThread(Runnable r) {
  • // 根据任务创建线程
  • Thread t = new Thread(group, r,
  • namePrefix + threadNumber.getAndIncrement(),
  • 0);
  • // 如果线程置为非守护线程
  • if (t.isDaemon())
  • t.setDaemon(false);
  • // 将线程优先级置为NORM_PRIORITY
  • if (t.getPriority() != Thread.NORM_PRIORITY)
  • t.setPriority(Thread.NORM_PRIORITY);
  • return t;
  • }
  • }
  • /**
  • * Thread factory capturing access control context and class loader
  • *
  • * 保存有访问控制上下文和CLassLoader的线程工厂
  • */
  • static class PrivilegedThreadFactory extends DefaultThreadFactory {
  • // 访问控制上下文
  • private final AccessControlContext acc;
  • // 类加载器
  • private final ClassLoader ccl;
  • PrivilegedThreadFactory() {
  • super();
  • // 获取系统安全管理器
  • SecurityManager sm = System.getSecurityManager();
  • if (sm != null) {
  • // Calls to getContextClassLoader from this class
  • // never trigger a security check, but we check
  • // whether our callers have this permission anyways.
  • // 检查权限
  • sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
  • // Fail fast
  • // 检查权限
  • sm.checkPermission(new RuntimePermission("setContextClassLoader"));
  • }
  • // 记录访问控制上下文
  • this.acc = AccessController.getContext();
  • // 记录线程的类加载器
  • this.ccl = Thread.currentThread().getContextClassLoader();
  • }
  • // 创建新线程的工厂方法
  • public Thread newThread(final Runnable r) {
  • return super.newThread(new Runnable() {
  • public void run() {
  • // 检查线程并执行任务
  • AccessController.doPrivileged(new PrivilegedAction<Void>() {
  • public Void run() {
  • // 设置类加载器
  • Thread.currentThread().setContextClassLoader(ccl);
  • // 任务运行
  • r.run();
  • return null;
  • }
  • }, acc);
  • }
  • });
  • }
  • }

推荐阅读

Java虚拟机06——垃圾收集器之CMS

Java
Java虚拟机

2015-01-29 0 328

Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同的数据区域。这些区域都有各自的用途,以及创建和...

目录