多线程简介
多任务
现代操作系统(Windows、Linux、MacOS)都可以执行多任务,多任务就是同时运行多个任务。例如在我们的计算机上,一般都同时跑着多个程序,例如浏览器,视频播放器,音乐播放器,Word办公软件等等,由于CPU执行代码都是一条一条顺序执行的,即时是单核CPU也可以同时执行多个任务,操作系统执行多个任务实际上就是轮流让多个任务交替执行。即使是多核CPU,因为通常任务的数量是远多于CPU的核数,所以任务也是交替执行的。
进程(Process)
在计算机中,我们把一个任务称为一个进程。浏览器就是一个进程,视频播放器是另外一个进程,音乐播放器也是一个进程,在某些进程内部还需要同时执行多个子任务。例如我们在使用Word时,在打字的同时需要进行拼写检查,还可以在后台进行打印,我们把子任务称为线程。
进程和线程的关系:
- 一个进程可以包含一个或多个线程(至少包含一个线程)
- 线程是操作系统调度的最小任务单位
- 如何调度线程完全由操作系统决定(程序自己不能决定线程何时执行,执行多长时间)
实现多任务的三种方法:
- 多进程模式(每个进程只有一个线程)
- 多线程模式(一个进程有多个线程)
- 多进程+多线程(复杂度最高)
多进程和多线程的对比:
- 创建进程比创建线程开销大(尤其是在Windows系统上)
- 进程间通信比线程间通信慢
- 多进程稳定性比多线程高(因为在多进程的情况下,一个进程的崩溃不会影响其他进程,而在多线程的情况下,任何一个线程的崩溃,会直接导致整个进程崩溃)
Java语言内置多线程支持,一个Java程序实际上是一个JVM进程,JVM进程用一个主线程来执行main()方法,在main()方法中又可以启动多个线程,此外,JVM还有负责垃圾回收的其他工作线程等。和单线程相比,多线程编程的特点在于:多线程经常需要读写共享数据,并且需要同步。例如,播放电影时,就必须由一个线程播放视频,另一个线程播放音频,两个线程需要协调运行,否则画面和声音就不同步。因此,多线程编程的复杂度高,调试更困难。
Java多线程基础
创建一个线程对象,并启动一个新的线程。创建线程对象的方法有两种:第一种就是创建MyThread类,去继承Thread类,覆写run()方法,创建MyThread实例,调用start()启动线程。第二种:如果一个类已经从某个类派生,无法从Thread继承,就可以通过实现Runnable接口,重写run()方法,在main()方法中创建Runnable实例,创建Thread实例并传入Runnable,调用start()启动线程。注意:必须调用Thread实例的start()方法才能启动新线程,如果我们查看Thread类的源代码,会看到start()方法内部调用了一个private native void start0()方法,native修饰符表示这个方法是由JVM虚拟机内部的C代码实现的,不是由Java代码实现的。
总结:Java用Thread对象表示一个线程,通过调用start()启动一个新线程,一个线程对象只能调用一次start()方法;线程的执行代码写在run()方法中,一旦run()方法执行完毕,线程就结束了;线程调度由操作系统决定,程序本身无法决定调度顺序。
线程的状态:
- New:新创建的线程,尚未执行;
- Runnable:运行中的线程,正在执行run()方法的Java代码;
- Blocked:运行中的线程,因为某些操作被阻塞而挂起;
- Waiting:运行中的线程,因为某些操作在等待中;
- Timed Waiting:运行中的线程,因为执行sleep()方法正在计时等待;
- Terminated:线程已终止,因为run()方法执行完毕。
线程终止的原因:
- run()方法执行到return语句返回(线程正常终止)
- run()方法因为未捕获的异常导致线程终止(线程意外终止)
- 对某个线程的Thread实例调用stop()方法强制终止(强烈不推荐使用)
通过对另一个线程对象调用join()方法可以等待其执行结束,可以指定等待时间,超过等待时间线程仍然没有结束就不再等待;对已经运行结束的线程调用join()方法会立刻返回。
中断线程:如果线程需要执行一个长时间任务,就可能需要能中断线程。中断线程就是其他线程给该线程发送一个信息,该线程收到信号后,结束执行run()方法。例如我们从网络下载一个100M的文件,如果网速很慢,我们等得不耐烦,就可能在下载过程中点“取消”,这时,程序就需要中断下载线程的执行。中断线程需要通过检测isInterrupted标志,其他线程需要通过调用interrupt()方法中断该线程。如果线程处于等待状态,该线程会捕获InterruptedException。捕获到InterruptedException说明有其他对其线程调用了interrupt()方法,通常情况下该线程应该立刻结束运行。
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread t = new MyThread();
t.start();
Thread.sleep(1000);
t.interrupt(); // 中断t线程
t.join(); // 等待t线程结束
System.out.println("end");
}
}
class MyThread extends Thread {
public void run() {
Thread hello = new HelloThread();
hello.start(); // 启动hello线程
try {
hello.join(); // 等待hello线程结束
} catch (InterruptedException e) {
System.out.println("interrupted!");
}
hello.interrupt();
}
}
class HelloThread extends Thread {
public void run() {
int n = 0;
while (!isInterrupted()) {
n++;
System.out.println(n + " hello!");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
}
}
还可以通过设置running标志位,线程间共享变量需要使用volatile关键字标记,确保线程能读取到更新后的变量值。为什么要对线程间共享的变量用关键字volatile声明?这涉及到Java的内存模型。在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。
volatile关键字解决的是可见性问题:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。因此,volatile关键字的目的是告诉虚拟机:每次访问变量时,总是获取主内存的最新值,每次修改变量后,立刻回写到主内存。
守护线程:为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。因此,JVM退出时,不必关心守护线程是否已结束。如何创建守护线程呢?方法和普通线程一样,只是在调用start()方法前,调用setDaemon(true)把该线程标记为守护线程。
Thread t = new MyThread();
t.setDaemon(true);
t.start();
需要注意的是:守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。
多线程同步
当多个线程同时运行时,线程的调度由操作系统决定,程序本身无法决定。因此,任何一个线程都有可能在任何指令处被操作系统暂停,然后在某个时间段后继续执行,如果多个线程同时读写共享变量,会出现数据不一致的问题。对共享变量进行写入时,必须保证是原子操作,原子操作是指不能被中断的一个或一系列操作。例如,对于语句:n = n + 1;看上去是一行语句,实际上对应了3条指令,因此为了保证一系列操作为原子操作,必须保证一系列操作在执行过程中不被其他线程执行。ava程序使用synchronized关键字对一个对象进行加锁,synchronized保证了代码块在任意时刻最多只有一个线程能执行。由于synchronized代码块无法并发执行,所以使用synchronized会导致性能下降。如何使用synchronized?首先找出修改共享变量的线程代码块,选择一个实例作为锁,使用synchronized(lockObject) { ... }。在使用synchronized的时候,不必担心抛出异常。因为无论是否有异常,都会在synchronized结束处正确释放锁。
多线程同时修改变量,会造成逻辑错误,需要通过synchronized同步,同步的本质就是给指定对象加锁,注意加锁对象必须是同一个实例,对于JVM定义的单个原子操作不需要同步。JVM规范定义了几种原子操作:基本类型(long和double除外)赋值,例如:int n = m;引用类型赋值,例如:List
同步方法:当程序执行synchronized代码块时,首先要获得synchronized指定的锁。在我们添加synchronized块时,我们需要先知道锁住的哪个对象。让线程自己选择锁对象往往会使得代码逻辑混乱,也不利于封装。更好的方法是把synchronized逻辑封装起来。数据封装:把同步逻辑封装到持有数据的实例中。当我们对this进行加锁时,我们可以使用synchronized来修饰方法,这样我们就可以把同步代码块自动变成方法识别。下面的两种写法是等价的。
public synchronized void add(int n) {
n += 1;
}
public void add(int n) {
synchronized(this){
n += 1;
}
}
而静态方法锁住的是Class实例:如下
public class A {
private static count;
public static synchronized void add(int n){
count += n;
}
}
等价于下面这种写法:
public class A {
private static count;
public static void add(int n){
synchronized(A.class) {
count += n;
}
}
}
Java中线程安全的类:
- 不变类:例如String, Integer, LocalDate,因为这些类被final修饰,一但创建,实例成员变量就不能改变,不能写只能读
- 没有成员变量的类:例如Math,这些工具只提供了工具方法,自身没有成员变量
- 正确使用synchronized得类,例如StringBuffer
其它的类都是非线程安全的类,不能在多线程中共享实例并修改,例如ArrayList,但是可以在多线程以只读的方式共享
死锁
要执行 synchronized代码块,必须首先获得指定对象的锁,Java中的线程锁是可重入锁。什么叫可重入锁?JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁。Java的线程可以获取多个不同对象的锁。不同的线程在获取多个不同对象的锁的时候,可能会导致死锁。例如两个线程根被执行下面两个方法,就会导致死锁。
死锁形成的条件:两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,导致死锁。
死锁发生后,没有任何机制能解除死锁,只能强制结束JVM进程。如何避免死锁?线程获取锁的顺序要一致。
wait/notify
synchronized解决了多线程竞争的问题,但是synchronized并没有解决多线程协调的问题。
wait和notify用于多线程协调运行:在synchronized内部可以调用wait()使线程进入等待状态,必须在已获得的锁对象上调用wait()方法,在synchronized内部可以调用notify()或notifyAll()唤醒其他等待线程,必须在已获得的锁对象上调用notify()或notifyAll()方法,已唤醒的线程还需要重新获得锁后才能继续执行。
JUC包
从Java 5开始,引入了一个高级的处理并发的java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。线程同步是因为多线程读写竞争资源需要同步,Java语言提供了synchronized/wait/notify来实现多线程的同步,但是编写多线程同步仍然很困难。jdk1.5提供的高级的java.util.concurrent包,提供了更高级的同步功能,可以简化多线程的编写,java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁。因为synchronized是Java语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLock是Java代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。
ReentrantLock也是可重入锁,一个线程可多次获取同一个锁,lock()方法获取锁,和synchronized不同的是,ReentrantLock可以通过tryLock()方法尝试获取锁并指定超时:
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}
但是有些时候,这种保护有点过头。因为我们发现,任何时刻,只允许一个线程修改,但是,对于某些方法只读取数据,不修改数据,它实际上允许多个线程同时调用,实际上我们想要的是:允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待。使用ReadWriteLock可以解决这个问题,它保证只允许一个线程写入(其他线程既不能写入也不能读取),没有写入时,多个线程允许同时读(提高性能)。
public class Counter {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
private int[] counts = new int[10];
public void inc(int index) {
wlock.lock(); // 加写锁
try {
counts[index] += 1;
} finally {
wlock.unlock(); // 释放写锁
}
}
public int[] get() {
rlock.lock(); // 加读锁
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock(); // 释放读锁
}
}
}
使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改。
使用ReentrantLock比直接使用synchronized更安全,可以替代synchronized进行线程同步。但是,synchronized可以配合wait和notify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写wait和notify的功能呢?答案是使用Condition对象来实现wait和notify的功能。
class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();
public void addTask(String s) {
lock.lock();
try {
queue.add(s);
condition.signalAll();
} finally {
lock.unlock();
}
}
public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。Condition提供的await()、signal()、signalAll()原理和synchronized锁对象的wait()、notify()、notifyAll()是一致的,并且其行为也是一样的:await()会释放当前锁,进入等待状态;signal()会唤醒某个等待线程;signalAll()会唤醒所有等待线程;唤醒线程从await()返回后需要重新获得锁。此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来。
Concurrent集合:java.util.concurrent提供了线程安全的Blocking集合:ArrayBlockingQueue。BlockingQueue的意思就是说,当一个线程调用这个TaskQueue的getTask()方法时,该方法内部可能会让线程变成等待状态,直到队列条件满足不为空,线程被唤醒后,getTask()方法才会返回。 接口 |
线程不安全的实现类 | 线程安全的实现类 |
---|---|---|
List | ArrayList | CopyOnWriteArrayList |
Map | HashMap | ConcurrentHashMap |
Set | HashSet / TreeSet | CopyOnWriteArraySet |
Queue | ArrayDeque / LinkedList | ArrayBlockingQueue / LinkedBlockingQueue |
Deque | ArrayDeque / LinkedList | LinkedBlockingDeque |
使用java.util.concurrent包提供的线程安全的并发集合可以大大简化多线程编程,多线程同时读写并发集合是安全的,尽量使用Java标准库提供的并发集合,避免自己编写同步代码。
Atomic:java.util.concurrent.atomic提供了一组原子类型操作,Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。如果我们自己通过CAS编写incrementAndGet(),大概如下:
public int incrementAndGet(AtomicInteger var) {
int prev, next;
do {
prev = var.get();
next = prev + 1;
} while ( ! var.compareAndSet(prev, next));
return next;
}
CAS是指,在这个操作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。通过CAS操作并配合do ... while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的。用java.util.concurrent.atomic提供的原子操作可以简化多线程编程,原子操作实现了无锁的线程安全,适用于计数器,累加器等。
ExecutorService
由于创建线程需要操作系统资源(线程资源、栈空间),频繁创建和销毁线程需要消耗大量时间。如果可以复用一组线程,那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。所以线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。Java标准库提供了ExecutorService接口表示线程池,常用用法如下:
// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(4);
// 提交任务:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:
- FixedThreadPool:线程数固定的线程池;
- CachedThreadPool:线程数根据任务动态调整的线程池;
- SingleThreadExecutor:仅单线程执行的线程池(只包含一个线程,所有的任务只能以单线程的形式执行)
创建这些线程池的方法都被封装到Executors这个类中。我们以FixedThreadPool为例,看看线程池的执行逻辑:
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
// 创建一个固定大小的线程池:
ExecutorService es = Executors.newFixedThreadPool(4);
for (int i = 0; i < 6; i++) {
es.submit(new Task("" + i));
}
// 关闭线程池:
es.shutdown();
}
}
class Task implements Runnable {
private final String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("start task " + name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("end task " + name);
}
}
还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer。
总结:
- JDK提供了ExecutorService实现了线程池功能;
- 线程池内部维护一组线程,可以高效执行大量小任务;
- Executors提供了静态方法创建不同类型的ExecutorService;
- 必须调用shutdown()关闭ExecutorService;
- ScheduledThreadPool可以定期调度多个任务。
Runnable接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值.
class Task implements Callable<String> {
public String call() throws Exception {
return longTimeCalculation();
}
}
并且Callable接口是一个泛型接口,可以返回指定类型的结果。现在的问题是,如何获得异步执行的结果?如果仔细看ExecutorService.submit()方法,可以看到,它返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象.
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。
一个Future
- get():获取结果(可能会等待)
- get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
- cancel(boolean mayInterruptIfRunning):取消当前任务;
- isDone():判断任务是否已完成。
CompletableFuture:从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务结束时,会自动回调某个对象的方法,异步任务出错时,也会自动回调某个对象的方法,所以当主线程设置好回调后,不再关心异步任务的执行。
CompletableFuture的基本用法:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(异步执行实例);
cf.thenAccept("获得结果后的操作");
cf.exceptionnally("发生异常时的操作")
创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象。可见CompletableFuture的优点是:异步任务结束时,会自动回调某个对象的方法;异步任务出错时,会自动回调某个对象的方法;主线程设置好回调后,不再关心异步任务的执行。如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:
public class Main {
public static void main(String[] args) throws Exception {
// 第一个任务:
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油");
});
// cfQuery成功后继续执行下一个任务:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
// cfFetch成功后打印结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(2000);
}
static String queryCode(String name) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
总结:CompletableFuture可以指定异步处理流程:
- thenAccept()处理正常结果;
- exceptional()处理异常结果;
- thenApplyAsync()用于串行化另一个CompletableFuture;
- anyOf()和allOf()用于并行化多个CompletableFuture。
Fork Join
Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTask或RecursiveAction。使用Fork/Join模式可以进行并行计算以提高效率。
ThreadLocal
多线程是Java实现多任务的基础,Thread对象代表一个线程,我们可以在代码中调用Thread.currentThread()获取当前线程。Java标准库提供了一个特殊的ThreadLocal,它可以在一个线程中传递同一个对象,注意到普通的方法调用一定是同一个线程执行的。ThreadLocal实例通常总是以静态字段初始化如:static ThreadLocal
【说明】:本文参考了廖雪峰官方网站的Java教程,廖雪峰官方网站的多线程教程链接:https://www.liaoxuefeng.com/wiki/1252599548343744/1255943750561472