Hike News
Hike News

Java多线程(五)-线程池

线程池-前言

系统启动一个线程的成本较高,而使用线程池可以提高性能,尤其在菜鸡大量短期线程时。与数据库连接池类似,线程池在系统启动时创建大量空闲线程,程序将一个Runnable对象或Callable对象传给线程池,线程池就会执行他们的run()call()方法,当执行结束后线程不会死亡,而是回到线程池变成空闲状态,等待执行runcall方法。

此外,使用线程池可以有效控制系统并发的数量,当系统包含大量的并发线程时,将导致系统性能降低,而线程池的最大线程数可以有效控制并发数量。

线程池

Executors工厂类创建线程池,该工厂类包含如下几个静态工厂方法来创建线程池:

  • newCachedThreadPool():创建一个具有缓存功能的线程池。
  • newFixedThreadPool(int n):创建一个可重用的、具有固定线程数的线程池。
  • newSingleThreadExecutor():创建只有一个线程的线程池,相当于上一个方法参数n=1。
  • newScheduledThreadPool(int Size):创建具有指定线程数size的线程池,它可以在指定延迟后执行线程任务。
  • newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。

前三个方法返回一个ExecutorService对象,代表一个线程池,后两个方法返回一个ScheduledExecutorService线程池,它是ExecutorService的子类,可以延迟后执行线程。以下是Java8新增的方法:

  • ExecutorService newWorkStealingPool(int parallelism):创建持有足够线程的线程池来支持给定的并行级别(数目)。
  • ExecutorService newWorkStealingPool():上一个简化版的方法,cpu有多少,相当于上一个方法的参数传入多少。

随着硬件的发展,多核cpu的出现,这两个方法可以利用cpu的并行能力,生成的WorkStealing池相当于后台线程池。

ExecutorService代表亟待执行线程的线程池,只要有空闲的线程就会立即执行线程任务。程序只要将一个RunnableCallable对象交给线程池,该线程池会尽快执行线程。而ExecutorService提供了如下几个方法:

  • Future<?> submit(Runnable t):将一个Runnable对象交给线程池,线程池将在有空闲线程时执行Runnable对象。Future对象将在线程run方法执行结束后返回null,可以调用FutureisDone()isCancelled()方法获取Runnable的执行状态。
  • <T> Future<T> submit(Runnable t,T r):将一个Runnable对象交给线程池,线程池将在有空闲线程时执行Runnable对象。其中r显式指定执行结束后端返回值,所以Future对象将在run方法执行结束后返回r
  • <T> Future<T> submit(Callable<T> t):将一个Callable对象交给线程池,线程池将在有空闲线程时执行Callable对象。其中Future代表Callable对象的call方法的返回值。

ScheduledExecutorService是可在延迟后或可周期性执行线程的线程池,提供了如下四个方法:

  • ScheduledFuture<V> schedule(Callable<V> c,long delay,TimeUnit u):指定c任务将在delay延迟后执行。
  • ScheduledFuture<?> schedule(Runnable c,long delay ,TimeUnit u):指定c任务将在delay延迟后执行。
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable c,long initialDelay,long period,TimeUnit u):指定c任务将在initialDelay延迟后执行,依次在initialDelay+periodinitialDelay+2*period···处重复执行。
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable c,long initialDelay,long delay,TimeUnit u):创建并执行一个在给定初始延迟后的定期操作,随后在每次执行中止和下一次执行开始之间都存在给定的延迟,具有周期性。如果任务遇到异常,就会取消后续执行。

在使用完一个线程池后应该调用其shutdown()方法,该方法启动线程池关闭序列,不会接收新的线程任务,将已提交的线程任务完成后结束所有线程;类似的另外一个方法shutdownNow(),该方法将停止所有正在执行和等待的任务,并返回等待执行的任务列表。

综上,线程池执行任务的过程如下:

  • 调用Executor类的静态工厂方法创建一个ExecutorService对象,即线程池。
  • 创建RunnableCallable实现类的实例,作为线程执行任务。
  • 调用线程池大小的submit方法来提交RunnableCallable实例。
  • 想要关闭线程池时,调用其shutdown()方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadPoolTest{
public static void main(String[] args) throws Exception{
ExecutorService pool = ExecutorS.newFixedThreadPool(5);
Runnable t = ->{
for(int i=0;i<100;i++){
System.out.println(Thread.currentThread().getName()+": "+i);
}
};//这里没有直接创建启动线程来执行Runnable对象,是通过线程池执行
pool.submit(t);
pool.submit(t);
pool.shutdown();
}
}//两个线程将交替执行
ForkJoinPool拆分任务

Java提供的ForkJoinPoolExecutorService的实现类,能将一个任务拆分成多个小任务并行计算,再把结果合并,是比较特殊的线程池。ForkJoinPool提供了如下两个构造器:

  • ForkJoinPool(int p):创建一个包含p个并行线程的ForkJoinPool
  • ForkJoinPool():以Runtime.availableProcessors()方法的返回值作为p参数创建线程池。

Java8又增加了通用池功能,ForkJoinPool提供了两个静态方法

  • ForkJoinPool commonPool():该方法返回一个通用池,通用池的运行不会受shutdown()shutdownNow()的影响,除非退出JVM,中止虚拟机工作。
  • int getCommonPoolParallelism():该方法返回通用池并行级别(数量)。

创建了ForkJoinPool实例后,就可调用submit(ForkJoinTask t)invoke(ForkJoinTask t)方法执行任务,其中ForkJoinTask代表一个可并行、合并的任务。ForkJoinTask是一个抽象类,RecursiveActionRecursiveTask是它的两个抽象子类,RecursiveAction代表没有返回值的任务,RecursiveTask代表有返回值的任务。

线程相关类

Java为线程安全提供了一些工具类,如ThreadLocal类,它代表一个线程局部变量,通过把数据放在ThreadLocal就可以让每个线程创建一个该变量的副本,从而避免并发访问的线程安全问题。

ThreadLocal

工具类ThreadLocal支持泛型,通过该类可以简化多线程编程的并发访问。它提供如下三个public方法:

  • T get():返回此线程局部变量中当前线程副本中的值。
  • void remove():删除此线程局部变量中的值。
  • void set(T value):设置此线程局部变量中线程副本中的值。

ThreadLocal和同步机制一样,都是为了解决多线程中对变量访问的冲突问题。在同步机制中,通过对对象加锁来实现安全访问,该变量为多个线程所共享;而ThreadLocal从另一个角度解决并发访问,它将所访问的资源复制多份,每个线程拥有一份,从而避免了线程之间的访问冲突。

ThreadLocal不能代替同步机制,同步机制是同步多个线程对同一资源的访问,是多个线程之间进行通信的方法;ThreadLocal是为了隔离多个线程对资源的共享,根本上避免线程为竞争资源的冲突。

包装线程不安全的集合

之前讲过的Java集合ArrayListHashSetHashMapTreeMap等线程不安全的集合,当线程并发访问时可能会集合数据的完整。所以需要包装成安全的集合,Collections通过了如下静态方法

  • static <T> Collection<T> synchronizedCollection(Collection<T> c):返回指定Collection对应的线程安全的集合Collection
  • static <T> List<T> synchronizedList(List<T> list):返回指定List对象对应的线程安全的List对象。
  • static <K,V> Map<K,V> synchronizedMap(Map<K,V> m):返回指定Map对象对应的线程安全的Map对象。
  • static <T> Set<T> synchronizedSet(Set<T> s):返回指定Set对象对应的线程安全的Set对象。
  • static <K,V> sortedMap<K,V> synchronizedSortedMap(SortedMap<K,V> m):返回指定SortedMap对象对应的线程安全的SortedMap对象。
  • static <T> SortedSet<T> synchronizedSortedSet(SortedSet<T> s):返回指定SortedSet对象对应的线程安全的SortedSet对象。

需要包某个集合包装成线程安全的集合,应该在创建后立即包装。

1
HashMap m = Collections.synchronizedMap(new HashMap());//包装成线程安全的HashMap
线程安全的集合

java.util.concurrent包下提供了大量支持高效并发访问的集合接口和实现类,这些线程安全的集合类可分为:

  • Concurrent开头的集合类,如ConcurrentHashMapConcurrentSkipListMapConcurrentLinkedQueue
  • CopyOnWrite开头的集合,如CopyOnWriteArrayListCopyOnWriteArraySet

Concurrent集合

其中以Concurrent开头的集合类代表了支持并发访问的集合,它们可以支持多个线程并发写入访问,而且都是安全的,读取操作不必锁定。以CopyOnWrite开头的集合采用了复杂算法保证不会锁住整个集合,因此在并发写入时有很好的性能。

当多个线程共享一个公共集合时,ConcurrentLinkedQueue是一个恰当的选择,它不允许使用null元素,实现了多线程无需等待访问。

默认情况下,ConcurrentHashMap支持16给线程并发写入,当超过16个线程时,可能有一些线程需要等待,程序可通过设置concurrencyLevel构造参数(默认为16)来支持更多并发写入。Java8又新增了多个方法增加ConcurrentHashMap功能,大致分为三类:

  • forEach(forEach,forEachKey,forEachEntry)
  • search(search,searchKeys,searchValues)
  • reduce(reduce,reduceToDouble,reduceKeys,reduceValues)

此外还添加了mappingCount()newKeySet()等方法,增强后的ConcurrentHashMap更合适作为缓存实现类。

CopyOnWriteArrayList集合

由于CopyOnWriteArraySet底层封装了CopyOnWriteArrayList,因此它的实现机制类似于CopyOnWriteArrayList集合。

当线程对CopyOnWriteArrayList集合进行读取操作时,线程会直接读取集合本身,无需加锁和阻塞;当线程对CopyOnWriteArrayList集合进行写入操作时(包括add(),remove(),set()),该集合会在底层复制一份新的数组,接下来对新数组执行写入操作。由于写入操作是对数组副本进行的,所以保证了线程安全。

然而线程安全并不是没有代价,CopyOnWriteArrayList执行写入操作时要多次复制数组,所以性能较差,但是读取和写入操作的数组不是同一个,因此读取操作就很快又安全,比较适合用来读取缓存。

这是多线程的最后一篇,估计也是Java的最后一篇,以后可能就不会给大家整理Java基础的文章了,但会在Java道路上一直陪伴大家,谢谢大家的关注!