18. 多线程

空~2022年8月18日
  • java
大约 66 分钟

18. 多线程

线程概述

几乎所有的操作系统都支持同时运行多个任务,一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个进程运行时,内部可能包含了多个顺序执行流,每个顺序执行流就是一个线程。

线程和进程

线程

现代操作系统(Windows,macOS,Linux)都可以执行多任务。多任务就是同时运行多个任务,例如:浏览器、QQ、音乐。

CPU 执行代码都是一条一条顺序执行的,但是,即使是单核 CPU,也可以同时运行多个任务。因为操作系统执行多任务实际上就是让 CPU 对多个任务轮流交替执行。

例如,假设我们有语文、数学、英语 3 门作业要做,每个作业需要 30 分钟。我们把这 3 门作业看成是 3 个任务,可以做 1 分钟语文作业,再做 1 分钟数学作业,再做 1 分钟英语作业:这样轮流做下去,在某些人眼里看来,做作业的速度就非常快,看上去就像同时在做 3 门作业一样。

类似的,操作系统轮流让多个任务交替执行,例如,让浏览器执行 0.001 秒,让 QQ 执行 0.001 秒,再让音乐播放器执行 0.001 秒,在人看来,CPU 就是在同时执行多个任务。

即使是多核 CPU,因为通常任务的数量远远多于 CPU 的核数,所以任务也是交替执行的。

进程

计算机中,我们把一个任务称为一个进程,浏览器就是一个进程,视频播放器是另一个进程,类似的,音乐播放器和 Word 都是进程。

某些进程内部还需要同时执行多个子任务。例如,我们在使用 Word 时,Word 可以让我们一边打字,一边进行拼写检查,同时还可以在后台进行打印,我们把子任务称为线程。

进程和线程的关系就是:一个进程可以包含一个或多个线程,但至少会有一个线程。

                        ┌──────────┐
                        │Process   │
                        │┌────────┐│
            ┌──────────┐││ Thread ││┌──────────┐
            │Process   ││└────────┘││Process   │
            │┌────────┐││┌────────┐││┌────────┐│
┌──────────┐││ Thread ││││ Thread ││││ Thread ││
│Process   ││└────────┘││└────────┘││└────────┘│
│┌────────┐││┌────────┐││┌────────┐││┌────────┐│
││ Thread ││││ Thread ││││ Thread ││││ Thread ││
│└────────┘││└────────┘││└────────┘││└────────┘│
└──────────┘└──────────┘└──────────┘└──────────┘
┌──────────────────────────────────────────────┐
│               Operating System               │
└──────────────────────────────────────────────┘

操作系统调度的最小任务单位其实不是进程,而是线程。常用的 Windows、Linux 等操作系统都采用抢占式多任务,如何调度线程完全由操作系统决定,程序自己不能决定什么时候执行,以及执行多长时间。

因为同一个应用程序,既可以有多个进程,也可以有多个线程,因此,实现多任务的方法,有以下几种:

多进程模式(每个进程只有一个线程):

┌──────────┐ ┌──────────┐ ┌──────────┐
│Process   │ │Process   │ │Process   │
│┌────────┐│ │┌────────┐│ │┌────────┐│
││ Thread ││ ││ Thread ││ ││ Thread ││
│└────────┘│ │└────────┘│ │└────────┘│
└──────────┘ └──────────┘ └──────────┘

多线程模式(一个进程有多个线程):

┌────────────────────┐
│Process             │
│┌────────┐┌────────┐│
││ Thread ││ Thread ││
│└────────┘└────────┘│
│┌────────┐┌────────┐│
││ Thread ││ Thread ││
│└────────┘└────────┘│
└────────────────────┘

多进程+多线程模式(复杂度最高):

┌──────────┐┌──────────┐┌──────────┐
│Process   ││Process   ││Process   │
│┌────────┐││┌────────┐││┌────────┐│
││ Thread ││││ Thread ││││ Thread ││
│└────────┘││└────────┘││└────────┘│
│┌────────┐││┌────────┐││┌────────┐│
││ Thread ││││ Thread ││││ Thread ││
│└────────┘││└────────┘││└────────┘│
└──────────┘└──────────┘└──────────┘

进程 vs 线程

进程和线程是包含关系,但是多任务既可以由多进程实现,也可以由单进程内的多线程实现,还可以混合多进程+多线程。

具体采用哪种方式,要考虑到进程和线程的特点。

和多线程相比,多进程的缺点在于:

  • 创建进程比创建线程开销大,尤其是在 Windows 系统上;
  • 进程间通信比线程间通信要慢,因为线程间通信就是读写同一个变量,速度很快。

而多进程的优点在于:

多进程稳定性比多线程高,因为在多进程的情况下,一个进程崩溃不会影响其他进程,而在多线程的情况下,任何一个线程崩溃会直接导致整个进程崩溃。

多线程

Java 语言内置了多线程支持:一个 Java 程序实际上是一个 JVM 进程,JVM 进程用一个主线程来执行main()方法,在main()方法内部,我们又可以启动多个线程。此外,JVM 还有负责垃圾回收的其他工作线程等。

因此,对于大多数 Java 程序来说,我们说多任务,实际上是说如何使用多线程实现多任务。

和单线程相比,多线程编程的特点在于:多线程经常需要读写共享数据,并且需要同步。例如,播放电影时,就必须由一个线程播放视频,另一个线程播放音频,两个线程需要协调运行,否则画面和声音就不同步。因此,多线程编程的复杂度高,调试更困难。

Java 多线程编程的特点又在于:

  • 多线程模型是 Java 程序最基本的并发模型;
  • 后续读写网络、数据库、Web 开发等都依赖 Java 多线程模型。

创建多线程

Java 语言内置了多线程支持。当 Java 程序启动的时候,实际上是启动了一个 JVM 进程,然后,JVM 启动主线程来执行main()方法。在main()方法中,我们又可以启动其他线程。

Thread 类创建线程类

通过继承Thread类来创建并启动多线程的步骤如下。

  1. 定义Thread类的子类,并重写该类的run()方法,该run()方法的方法体就代表了线程需要完成的任务。因此把run()方法称为线程执行体。
  2. 创建Thread子类的实例,即创建了线程对象。
  3. 调用线程对象的start()方法来启动该线程。
public class FirstThread extends Thread {

  private int i;

  public static void main(String[] args) {
    for (int i = 0; i < 100; i++) {
      // 调用Thread的currentThread()方法获取当前线程
      System.out.println(Thread.currentThread().getName() + " " + i);
      if (i == 20) {
        // 创建并启动第一个线程
        new FirstThread().start();
        // 创建并启动第二个线程
        new FirstThread().start();
      }
    }
  }

  // 重写run()方法,run()方法的方法体就是线程执行体
  public void run() {
    for (; i < 100; i++) {
      // 当线程类继承Thread类时,直接使用this即可获取当前线程
      // Thread对象的getName()返回当前线程的名字
      // 因此可以直接调用getName()方法返回当前线程的名字
      System.out.println(getName() + " " + i);
    }
  }
}

程序显式创建了 2 个子线程和主线程,main()方法的方法体代表主线程的线程执行体。

image-20220820104824187

程序用到的线程两个方法。

Thread.currentThread()currentThread()是 Thread 类的静态方法,该方法总是返回当前正在执行的线程对象。

getName():该方法是 Thread 类的实例方法,该方法返回调用该方法的线程名字。

提示

程序可以通过setName(String name)方法为线程设置名字,也可以通过getName()方法返回指定线程的名字。

在默认情况下,主线程的名字为main,用户启动的多个线程的名字依次为Thread-0Thread-1Thread-2、…、Thread-n等。

通过实例化一个Thread实例,然后调用它的start()方法启动多线程:

public class Main {

  public static void main(String[] args) {
    Thread t = new Thread();
    t.start(); // 启动新线程
  }
}

但是这个线程启动后实际上什么也不做就立刻结束了。我们希望新线程能执行指定的代码,有以下几种方法:

方法一:从Thread派生一个自定义类,然后覆写run()方法:

public class Main {

  public static void main(String[] args) {
    Thread t = new MyThread();
    t.start(); // 启动新线程
  }
}

class MyThread extends Thread {

  @Override
  public void run() {
    System.out.println("start new thread!");
  }
}

执行上述代码,注意到start()方法会在内部自动调用实例的run()方法。

方法二:创建Thread实例时,传入一个Runnable实例:

public class Main {

  public static void main(String[] args) {
    Thread t = new Thread(new MyRunnable());
    t.start(); // 启动新线程
  }
}

class MyRunnable implements Runnable {

  @Override
  public void run() {
    System.out.println("start new thread!");
  }
}

或者用 Java8 引入的 lambda 语法进一步简写为:

public class Main {
  public static void main(String[] args) {
    Thread t = new Thread(() -> {
      System.out.println("start new thread!");
    });
    t.start(); // 启动新线程
  }
}

使用线程执行的打印语句,和直接在main()方法执行的区别。

public class Main {

  public static void main(String[] args) {
    System.out.println("main start...");
    Thread t = new Thread(() -> {
      System.out.println("thread run...");
      System.out.println("thread end.");
    });
    t.start();
    System.out.println("main end...");
  }
}

main线程执行的代码有 4 行,首先打印main start,然后创建Thread对象,紧接着调用start()启动新线程。当start()方法被调用时,JVM 就创建了一个新线程,通过实例变量t来表示这个新线程对象,并开始执行。

接着,main线程继续执行打印main end语句,而t线程在main线程执行的同时会并发执行,打印thread runthread end语句。

run()方法结束时,新线程就结束了。而main()方法结束时,主线程也结束了。

线程的执行顺序:

  1. main线程肯定是先打印main start,再打印main end
  2. t线程肯定是先打印thread run,再打印thread end

但是,除了可以肯定,main start会先打印外,main end打印在thread run之前、thread end之后或者之间,都无法确定。因为从t线程开始运行以后,两个线程就开始同时运行了,并且由操作系统调度,程序本身无法确定线程的调度顺序。

可以在线程中调用Thread.sleep(),强迫当前线程暂停一段时间:

public class Main {

  public static void main(String[] args) {
    System.out.println("main start...");
    Thread t = new Thread() {
      public void run() {
        System.out.println("thread run...");
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
        System.out.println("thread end.");
      }
    };
    t.start();
    try {
      Thread.sleep(20);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    System.out.println("main end...");
  }
}

sleep()传入的参数是毫秒。调整暂停时间的大小,我们可以看到main线程和t线程执行的先后顺序。

要特别注意:直接调用Thread实例的run()方法是无效的:

public class Main {

  public static void main(String[] args) {
    Thread t = new MyThread();
    t.run();
  }
}

class MyThread extends Thread {

  public void run() {
    System.out.println("hello");
  }
}

直接调用run()方法,相当于调用了一个普通的 Java 方法,当前线程并没有任何改变,也不会启动新线程。上述代码实际上是在main()方法内部又调用了run()方法,打印hello语句是在main线程中执行的,没有任何新线程被创建。

必须调用Thread实例的start()方法才能启动新线程,查看Thread类的源代码,会看到start()方法内部调用了一个private native void start0()方法,native修饰符表示这个方法是由 JVM 虚拟机内部的 C 代码实现的,不是由 Java 代码实现的。

实现 Runnable 接口创建线程类

实现Runnable接口来创建并启动多线程的步骤如下。

  1. 定义Runnable接口的实现类,并重写该接口的run()方法,该run()方法的方法体同样是该线程的线程执行体。
  2. 创建Runnable实现类的实例,并以此实例作为Threadtarget来创建Thread对象,该Thread对象才是真正的线程对象。
  3. 调用线程对象的start()方法来启动该线程。

代码如下所示:

public class SecondThread implements Runnable {

  private int i;

  public static void main(String[] args) {
    for (int i = 0; i < 100; i++) {
      System.out.println(Thread.currentThread().getName() + "  " + i);
      if (i == 20) {
         // 创建Runnable实现类对象
        SecondThread st = new SecondThread();    // ①
        // 通过new Thread(target , name)方法创建新线程
        new Thread(st).start();
        // 可以为该Thread对象指定名字
        new Thread(st, "新线程1").start();
        new Thread(st, "新线程2").start();
      }
    }
  }

  // run()方法同样是线程执行体
  public void run() {
    for (; i < 100; i++) {
      // 当线程类实现Runnable接口时
      // 如果想获取当前线程,只能用Thread.currentThread()方法
      System.out.println(Thread.currentThread().getName() + "  " + i);
    }
  }
}

提示

Runnable对象仅仅作为Thread对象的targetRunnable实现类里包含的run()方法仅作为线程执行体。

而实际的线程对象依然是Thread实例,只是该Thread线程负责执行其targetrun()方法。

通过实现Runnable接口来获得当前线程对象,则必须使用Thread.currentThread()方法。

程序打印出的结果中,三个子线程的i变量的值是连续的(但有重复),也就是采用Runnable接口的方式创建的多个线程可以共享线程类的实例属性。这是因为在这种方式下,程序所创建的Runnable对象只是线程的target,而多个线程可以共享同一个target,所以多个线程可以共享同一个线程类(实际上应该是线程的target类)的实例属性。

使用 Callable 和 Future 创建线程

从 Java 5 开始,Java 提供了Callable接口,该接口提供了一个call()方法可以作为线程执行体,但call()方法比run()方法功能更强大。

  1. call()方法可以有返回值。
  2. call()方法可以声明抛出异常。

Callable接口是 Java5 新增的接口,而且它不是Runnable接口的子接口,所以Callable对象不能直接作为Threadtarget。Java 5 提供了Future接口来代表Callable接口里call()方法的返回值,并为Future接口提供了一个FutureTask实现类,该实现类实现了Future接口,并实现了Runnable接口——可以作为Thread类的target

Future接口里定义了如下几个公共方法来控制它关联的Callable任务。

  1. boolean cancel(boolean mayInterruptIfRunning):试图取消该Future里关联的Callable任务。
  2. V get():返回Callable任务里call()方法的返回值。调用该方法将导致程序阻塞,必须等到子线程结束后才会得到返回值。
  3. V get(long timeout, TimeUnit unit):返回Callable任务里call()方法的返回值。该方法让程序最多阻塞timeoutunit指定的时间,如果经过指定时间后Callable任务依然没有返回值,将会抛出TimeoutException异常。
  4. boolean isCancelled():如果在Callable任务正常完成前被取消,则返回true
  5. boolean isDone():如果Callable任务已完成,则返回true

提示

Callable接口有泛型限制,Callable接口里的泛型形参类型与call()方法返回值类型相同。

创建并启动有返回值的线程的步骤如下。

  1. 创建Callable接口的实现类,并实现call()方法,该call()方法将作为线程执行体,且该call()方法有返回值。
  2. 创建Callable实现类的实例,使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法的返回值。
  3. 使用FutureTask对象作为Thread对象的target创建并启动新线程。
  4. 调用FutureTask对象的get()方法来获得子线程执行结束后的返回值。
public class ThirdThread implements Callable {

  public static void main(String[] args) {
    // 创建Callable对象
    ThirdThread rt = new ThirdThread();
    // 使用FutureTask来包装Callable对象
    FutureTask task = new FutureTask(rt);
    for (int i = 0; i < 100; i++) {
      System.out.println(Thread.currentThread().getName() + " 的循环变量i的值:" + i);
      if (i == 20) {
        // 实质还是以Callable对象来创建并启动线程
        new Thread(task, "有返回值的线程").start();
      }
    }
    try {
      // 获取线程返回值
      System.out.println("子线程的返回值:" + task.get());
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  // 实现call()方法,作为线程执行体
  public Integer call() {
    int i = 0;
    for (; i < 100; i++) {
      System.out.println(Thread.currentThread().getName() + " 的循环变量i的值:" + i);
    }
    // call()方法可以有返回值
    return i;
  }
}

创建线程的三种方式对比

实现Runnable接口与实现Callable接口的方式基本相同,因此可以将实现Runnable接口和实现Callable接口归为一种方式。这种方式与继承Thread方式之间的主要差别如下。

采用实现RunnableCallable接口的方式创建多线程:

  • 线程类只是实现了Runnable接口或Callable接口,还可以继承其他类。
  • 在这种方式下,多个线程可以共享同一个target对象,所以非常适合多个相同线程来处理同一份资源的情况,从而可以将 CPU、代码和数据分开,形成清晰的模型,较好地体现了面向对象的思想。
  • 劣势是:编程稍稍复杂,如果需要访问当前线程,则必须使用Thread.currentThread()方法。

采用继承Thread类的方式创建多线程:

  • 劣势是:因为线程类已经继承了Thread类,所以不能再继承其他父类。
  • 优势是:编写简单,如果需要访问当前线程,则无须使用Thread.currentThread()方法,直接使用this即可获得当前线程。

一般推荐采用实现Runnable接口、Callable接口的方式来创建多线程。

线程的生命周期

在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead) 5 种状态。

用一个状态转移图表示如下:

         ┌─────────────┐
         │     New     │
         └─────────────┘
                │
                ▼
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
 ┌─────────────┐ ┌─────────────┐
││  Running    │ │   Blocked   ││
 └─────────────┘ └─────────────┘
│┌─────────────┐ ┌─────────────┐│
 │   Waiting   │ │Timed Waiting│
│└─────────────┘ └─────────────┘│
 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
                │
                ▼
         ┌─────────────┐
         │ Terminated  │
         └─────────────┘

当线程启动后,它可以在 RunningBlockedWaitingTimed Waiting 这几个状态之间切换,直到最后变成 Terminated 状态,线程终止。

新建和就绪状态

当程序使用new关键字创建了一个线程之后,该线程就处于新建状态,此时它和其他的 Java 对象一样,仅仅由 Java 虚拟机为其分配内存,并初始化其成员变量的值。

当线程对象调用了start()方法之后,该线程处于就绪状态,Java 虚拟机会为其创建方法调用栈和程序计数器,处于这个状态中的线程并没有开始运行,只是表示该线程可以运行了。可以通过再次运行FirstThreadSecondThread来证明。

image-20220820171342935

注意

只能对处于新建状态的线程调用start()方法,否则将引发IllegalThreadStateException异常。

运行和阻塞状态

当处于就绪状态的线程获得了 CPU 开始执行run()方法的线程执行体时,该线程处于运行状态,如果计算机只有一个 CPU,那么在任何时刻只有一个线程处于运行状态。当然,在一个多处理器的机器上,将会有多个线程并行(注意是并行:parallel)执行;当线程数大于处理器数时,依然会存在多个线程在同一个 CPU 上轮换的现象。

当一个线程开始运行后,它不可能一直处于运行状态(除非它的线程执行体足够短,瞬间就执行结束了),线程在运行过程中需要被中断,目的是使其他线程获得执行的机会,线程调度的细节取决于底层平台所采用的策略。

对于采用抢占式策略的系统而言,系统会给每个可执行的线程一个小时间段来处理任务;当该时间段用完后,系统就会剥夺该线程所占用的资源,让其他线程获得执行的机会。在选择下一个线程时,系统会考虑线程的优先级。

当发生如下情况时,线程将会进入阻塞状态。

  • 线程调用sleep()方法主动放弃所占用的处理器资源。
  • 线程调用了一个阻塞式IO方法,在该方法返回之前,该线程被阻塞。
  • 线程试图获得一个同步监视器,但该同步监视器正被其他线程所持有。关于同步监视器的知识、后面将有更深入的介绍。
  • 线程在等待某个通知(notify)。
  • 程序调用了线程的suspend()方法将该线程挂起。但这个方法容易导致死锁,所以应该尽量避免使用该方法。

当发生如下特定的情况时可以解除上面的阻塞,让该线程重新进入就绪状态。

  • 调用sleep()方法的线程经过了指定时间。
  • 线程调用的阻塞式IO方法已经返回。
  • 线程成功地获得了试图取得的同步监视器。
  • 线程正在等待某个通知时,其他线程发出了一个通知。
  • 处于挂起状态的线程被调用了resume()恢复方法。

线程状态转换图。

epub_681336_2144

线程从阻塞状态只能进入就绪状态,无法直接进入运行状态。而就绪和运行状态之间的转换通常不受程序控制,而是由系统线程调度所决定,当处于就绪状态的线程获得处理器资源时,该线程进入运行状态;当处于运行状态的线程失去处理器资源时,该线程进入就绪状态。

直接调用yield()方法也可以让运行状态的线程转入就绪状态。

线程死亡

线程会以如下 3 种方式结束,结束后就处于死亡状态。

  • run()call()方法执行完成,线程正常结束。
  • 线程抛出一个未捕获的ExceptionError
  • 直接调用该线程的stop()方法来结束该线程——该方法容易导致死锁,通常不推荐使用。

提示

当主线程结束时,其他线程不受任何影响,并不会随之结束。一旦子线程启动起来后,它就拥有和主线程相同的地位,它不会受主线程的影响。

当线程处于就绪、运行、阻塞 3 种状态时,可以调用线程对象的isAlive()方法,该方法将返回 true;当线程处于新建、死亡 2 种状态时,该方法将返回 false。

尝试对处于死亡状态的线程再次调用start()方法。

public class StartDead extends Thread {

  private int i;

  public static void main(String[] args) {
    // 创建线程对象
    StartDead sd = new StartDead();
    for (int i = 0; i < 300; i++) {
      // 调用Thread的currentThread()方法获取当前线程
      System.out.println(Thread.currentThread().getName() + " " + i);
      if (i == 20) {
        // 启动线程
        sd.start();
        // 判断启动后线程的isAlive()值,输出true
        System.out.println(sd.isAlive());
      }
      // 当线程处于新建、死亡两种状态时,isAlive()方法返回false
      // 当i > 20时,该线程肯定已经启动过了,如果sd.isAlive()为假时,
      // 那就是死亡状态了
      if (i > 20 && !sd.isAlive()) {
        // 试图再次启动该线程
        sd.start();
      }
    }
  }

  // 重写run()方法,run()方法的方法体就是线程执行体
  public void run() {
    for (; i < 10; i++) {
      System.out.println(getName() + " " + i);
    }
  }
}

运行上面程序,将引发IllegalThreadStateException异常。

提示

如果子线程在主线程之后结束则程序会正常执行,可以将子线程的循环打印次数增加至和主线程一样增加这种概率。

控制线程

Java 的线程支持提供了一些便捷的工具方法,通过这些便捷的工具方法可以很好地控制线程的执行。

join 线程

当在某个程序执行流中调用其他线程的join()方法时,调用线程将被阻塞,直到被join()方法加入的join线程执行完为止。

public class JoinThread extends Thread {

  // 提供一个有参数的构造器,用于设置该线程的名字
  public JoinThread(String name) {
    super(name);
  }

  public static void main(String[] args) throws Exception {
    // 启动子线程
    new JoinThread("新线程").start();
    for (int i = 0; i < 100; i++) {
      if (i == 20) {
        JoinThread jt = new JoinThread("被Join的线程");
        jt.start();
        // main线程调用了jt线程的join()方法,main线程
        // 必须等jt执行结束才会向下执行
        jt.join();
      }
      System.out.println(Thread.currentThread().getName()
          + "  " + i);
    }
  }

  // 重写run()方法,定义线程执行体
  public void run() {
    for (int i = 0; i < 100; i++) {
      System.out.println(getName() + "  " + i);
    }
  }
}

“被 Join 的线程”的线程,该线程不会和main线程并发执行,main线程必须等该线程执行结束后才可以向下执行。“被 Join 的线程”的线程执行时,实际上只有 2 个子线程并发执行,主线程处于等待状态。

join()方法有如下 3 种重载形式。

  • join():等待被join的线程执行完成。
  • join(long millis):等待被join的线程的时间最长为millis毫秒。如果在millis毫秒内被join的线程还没有执行结束,则不再等待。
  • join(long millis, int nanos):等待被join的线程的时间最长为millis毫秒 + nanos毫微秒。

后台线程

有一种线程,它是在后台运行的,它的任务是为其他的线程提供服务,这种线程被称为“后台线程(Daemon Thread)”,又称为“守护线程”或“精灵线程”。

后台线程有个特征:如果所有的前台线程都死亡,后台线程会自动死亡。

调用Thread对象的setDaemon(true)方法可将指定线程设置成后台线程。

public class DaemonThread extends Thread {

  public static void main(String[] args) {
    DaemonThread t = new DaemonThread();
    // 将此线程设置成后台线程
    t.setDaemon(true);
    // 启动后台线程
    t.start();
    for (int i = 0; i < 10; i++) {
      System.out.println(Thread.currentThread().getName()
          + "  " + i);
    }
    // -----程序执行到此处,前台线程(main线程)结束------
    // 后台线程也应该随之结束
  }

  // 定义后台线程的线程执行体与普通线程没有任何区别
  public void run() {
    for (int i = 0; i < 1000; i++) {
      System.out.println(getName() + "  " + i);
    }
  }
}

Thread 类还提供了一个isDaemon()方法,用于判断指定线程是否为后台线程。

要将某个线程设置为后台线程,必须在该线程启动之前设置,也就是说, setDaemon(true)必须在start()方法之前调用,否则会引发IllegalThreadStateException异常。

注意

在守护线程中,编写代码要注意:守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。

线程睡眠:sleep

如果需要让当前正在执行的线程暂停一段时间,并进入阻塞状态,则可以通过调用Thread类的静态sleep()方法来实现。sleep()方法有两种重载形式。

  • static void sleep(long millis):让当前正在执行的线程暂停millis毫秒,并进入阻塞状态,该方法受到系统计时器和线程调度器的精度与准确度的影响。
  • static void sleep(long millis, int nanos):让当前正在执行的线程暂停millis毫秒 + nanos毫微秒,并进入阻塞状态,该方法受到系统计时器和线程调度器的精度与准确度的影响。
public class SleepTest {

  public static void main(String[] args)
      throws Exception {
    for (int i = 0; i < 10; i++) {
      System.out.println("当前时间: " + new Date());
      // 调用sleep()方法让当前线程暂停1s
      Thread.sleep(1000);
    }
  }
}

线程让步:yield

yield()方法是一个和sleep()方法有点相似的方法,它也是Thread类提供的一个静态方法,它也可以让当前正在执行的线程暂停,但它不会阻塞该线程,它只是将该线程转入就绪状态。

public class YieldTest extends Thread {

  public YieldTest(String name) {
    super(name);
  }

  public static void main(String[] args) throws Exception {
    // 启动两个并发线程
    YieldTest yt1 = new YieldTest("高级");
    // 将yt1线程设置成最高优先级
    // yt1.setPriority(Thread.MAX_PRIORITY);
    yt1.start();
    YieldTest yt2 = new YieldTest("低级");
    // 将yt2线程设置成最低优先级
    // yt2.setPriority(Thread.MIN_PRIORITY);
    yt2.start();
  }

  // 定义run()方法作为线程执行体
  public void run() {
    for (int i = 0; i < 50; i++) {
      System.out.println(getName() + "  " + i);
      // 当i等于20时,使用yield()方法让当前线程让步
      if (i == 20) {
        Thread.yield();
      }
    }
  }
}

yield()只是让当前线程暂停一下,让系统的线程调度器重新调度一次,完全可能的情况是:当某个线程调用了yield()方法暂停之后,线程调度器又将其调度出来重新执行。

当某个线程调用了yield()方法暂停之后,只有优先级与当前线程相同,或者优先级比当前线程更高的处于就绪状态的线程才会获得执行的机会。

sleep()方法和yield()方法的区别如下。

  • sleep()方法暂停当前线程后,会给其他线程执行机会,不会理会其他线程的优先级;但yield()方法只会给优先级相同,或优先级更高的线程执行机会。
  • sleep()方法会将线程转入阻塞状态,直到经过阻塞时间才会转入就绪状态;而yield()不会将线程转入阻塞状态,它只是强制当前线程进入就绪状态。因此完全有可能某个线程调用yield()方法暂停之后,立即再次获得处理器资源被执行。
  • sleep()方法声明抛出了InterruptedException异常,所以调用sleep()方法时要么捕捉该异常,要么显式声明抛出该异常;而yield()方法则没有声明抛出任何异常。
  • sleep()方法比yield()方法有更好的可移植性,通常不建议使用yield()方法来控制并发线程的执行。

改变线程优先级

每个线程执行时都具有一定的优先级,优先级高的线程获得较多的执行机会,而优先级低的线程则获得较少的执行机会。

每个线程默认的优先级都与创建它的父线程的优先级相同,在默认情况下,main线程具有普通优先级,由main线程创建的子线程也具有普通优先级。

Thread类提供了setPriority(int newPriority)getPriority()方法来设置和返回指定线程的优先级,其中setPriority()方法的参数可以是一个整数,范围是 1 ~ 10 之间,也可以使用Thread类的如下 3 个静态常量。

  • MAX_PRIORITY:其值是 10。
  • MIN_PRIORITY:其值是 1。
  • NORM_PRIORITY:其值是 5。
public class PriorityTest extends Thread {

  // 定义一个有参数的构造器,用于创建线程时指定name
  public PriorityTest(String name) {
    super(name);
  }

  public static void main(String[] args) {
    // 改变主线程的优先级
    Thread.currentThread().setPriority(6);
    for (int i = 0; i < 30; i++) {
      if (i == 10) {
        PriorityTest low = new PriorityTest("低级");
        low.start();
        System.out.println("创建之初的优先级:" + low.getPriority());
        // 设置该线程为最低优先级
        low.setPriority(Thread.MIN_PRIORITY);
      }
      if (i == 20) {
        PriorityTest high = new PriorityTest("高级");
        high.start();
        System.out.println("创建之初的优先级:"
            + high.getPriority());
        // 设置该线程为最高优先级
        high.setPriority(Thread.MAX_PRIORITY);
      }
    }
  }

  public void run() {
    for (int i = 0; i < 50; i++) {
      System.out.println(getName() + ",其优先级是:" + getPriority() + ",循环变量的值为:" + i);
    }
  }
}

相关信息

虽然 Java 提供了 10 个优先级级别,但这些优先级级别需要操作系统的支持。遗憾的是,不同操作系统上的优先级并不相同,而且也不能很好地和 Java 的 10 个优先级对应,例如 Windows 2000 仅提供了 7 个优先级。

在这种情况下,我们应该尽量避免直接为线程指定优先级,而应该使用 MAX_PRIORITY、MIN_PRIORITY 和 NORM_PRIORITY 三个静态常量来设置优先级,这样才可以保证程序具有最好的可移植性。

线程同步

当多个线程同时运行时,线程的调度由操作系统决定,程序本身无法决定。因此,任何一个线程都有可能在任何指令处被操作系统暂停,然后在某个时间段后继续执行。

这个时候,有个单线程模型下不存在的问题就来了:如果多个线程同时读写共享变量,会出现数据不一致的问题。

线程安全问题

关于线程安全的一个经典问题——银行取钱。银行取钱的基本流程基本上可以分为如下几个步骤。

  1. 用户输入账户、密码,系统判断用户的账户、密码是否匹配。
  2. 用户输入取款金额。
  3. 系统判断账户余额是否大于取款金额。
  4. 如果余额大于取款金额,则取款成功;如果余额小于取款金额,则取款失败。

使用两个线程来模拟取钱操作,模拟两个人使用同一个账户并发取钱的问题。

@Data
public class Account {

  // 封装账户编号、账户余额两个Field
  private String accountNo;
  private double balance;

  public Account() {
  }

  // 构造器
  public Account(String accountNo, double balance) {
    this.accountNo = accountNo;
    this.balance = balance;
  }

  // 下面两个方法根据accountNo来重写hashCode()和equals()方法
  public int hashCode() {
    return accountNo.hashCode();
  }

  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj != null
        && obj.getClass() == Account.class) {
      Account target = (Account) obj;
      return target.getAccountNo().equals(accountNo);
    }
    return false;
  }
}

取钱的线程类,该线程类根据执行账户、取钱数量进行取钱操作,取钱的逻辑是当其余额不足时无法提取现金,当余额足够时系统吐出钞票,余额减少。

public class DrawThread extends Thread {

  // 模拟用户账户
  private Account account;
  // 当前取钱线程所希望取的钱数
  private double drawAmount;

  public DrawThread(String name, Account account
      , double drawAmount) {
    super(name);
    this.account = account;
    this.drawAmount = drawAmount;
  }

  // 当多个线程修改同一个共享数据时,将涉及数据安全问题
  public void run() {
    // 账户余额大于取钱数目
    if (account.getBalance() >= drawAmount) {
      // 吐出钞票
      System.out.println(getName() + "取钱成功!吐出钞票:" + drawAmount);
      // 修改余额
      account.setBalance(account.getBalance() - drawAmount);
      System.out.println("\t余额为: " + account.getBalance());
    } else {
      System.out.println(getName() + "取钱失败!余额不足!");
    }
  }
}

创建一个账户,并启动两个线程从该账户中取钱。

public class DrawTest {

  public static void main(String[] args) {
    // 创建一个账户
    Account acct = new Account("1234567", 1000);
    // 模拟两个线程对同一个账户取钱
    new DrawThread("甲", acct, 800).start();
    new DrawThread("乙", acct, 800).start();
  }
}

多次运行上面程序,很有可能都会看到如图所示的错误结果。

image-20220820191008047

同步代码块

为了解决同步问题,Java 的多线程支持引入了同步监视器来解决这个问题,使用同步监视器的通用方法就是同步代码块。

synchronized(obj) {
    ...
    //此处的代码就是同步代码块
}

synchronized后括号里的obj就是同步监视器,上面代码的含义是:线程开始执行同步代码块之前,必须先获得对同步监视器的锁定。

任何时刻只能有一个线程可以获得对同步监视器的锁定,当同步代码块执行完成后,该线程会释放对该同步监视器的锁定。

通常推荐使用可能被并发访问的共享资源充当同步监视器。

对于上面的取钱模拟程序,使用账户(account)作为同步监视器。

public class DrawThread extends Thread {

  // 模拟用户账户
  private final Account account;
  // 当前取钱线程所希望取的钱数
  private final double drawAmount;

  public DrawThread(String name, Account account
      , double drawAmount) {
    super(name);
    this.account = account;
    this.drawAmount = drawAmount;
  }

  // 当多个线程修改同一个共享数据时,将涉及数据安全问题
  public void run() {
    // 使用account作为同步监视器,任何线程进入下面同步代码块之前
    // 必须先获得对account账户的锁定——其他线程无法获得锁,也就无法修改它
    // 这种做法符合:“加锁 → 修改 → 释放锁”的逻辑
    synchronized (account) {
      // 账户余额大于取钱数目
      if (account.getBalance() >= drawAmount) {
        // 吐出钞票
        System.out.println(getName()
            + "取钱成功!吐出钞票:" + drawAmount);
        try {
          Thread.sleep(1);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
        // 修改余额
        account.setBalance(account.getBalance() - drawAmount);
        System.out.println("\t余额为: " + account.getBalance());
      } else {
        System.out.println(getName() + "取钱失败!余额不足!");
      }
    }
    // 同步代码块结束,该线程释放同步锁
  }
}

同步方法

同步方法就是使用synchronized关键字来修饰某个方法,则该方法称为同步方法。对于同步方法而言,无须显式指定同步监视器,同步方法的同步监视器是this,也就是该对象本身。

通过使用同步方法可以非常方便地实现线程安全的类,线程安全的类具有如下特征。

  • 该类的对象可以被多个线程安全地访问。
  • 每个线程调用该对象的任意方法之后都将得到正确结果。
  • 每个线程调用该对象的任意方法之后,该对象状态依然保持合理状态。

Account类对balance的访问设置成线程安全的,把balance的方法修改成同步方法。

@Data
public class Account {

  // 封装账户编号、账户余额两个Field
  private String accountNo;
  private double balance;

  public Account() {
  }

  // 构造器
  public Account(String accountNo, double balance) {
    this.accountNo = accountNo;
    this.balance = balance;
  }

  // 因为账户余额不允许随便修改,所以只为balance提供getter方法
  public double getBalance() {
    return this.balance;
  }

  // 提供一个线程安全的draw()方法来完成取钱操作
  public synchronized void draw(double drawAmount) {
    // 账户余额大于取钱数目
    if (balance >= drawAmount) {
      // 吐出钞票
      System.out.println(Thread.currentThread().getName()
          + "取钱成功!吐出钞票:" + drawAmount);
      try {
        Thread.sleep(1);
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
      // 修改余额
      balance -= drawAmount;
      System.out.println("\t余额为: " + balance);
    } else {
      System.out.println(Thread.currentThread().getName()
          + "取钱失败!余额不足!");
    }
  }

  // 下面两个方法根据accountNo来重写hashCode()和equals()方法
  public int hashCode() {
    return accountNo.hashCode();
  }

  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj != null
        && obj.getClass() == Account.class) {
      Account target = (Account) obj;
      return target.getAccountNo().equals(accountNo);
    }
    return false;
  }
}

修改DrawThread线程类,该线程类的run()方法只要调用Account对象的draw()方法即可执行取钱操作。

public void run() {
  // 直接调用account对象的draw()方法来执行取钱操作
  // 同步方法的同步监视器是this,this代表调用draw()方法的对象
  // 也就是说,线程进入draw()方法之前,必须先对account对象加锁
  account.draw(drawAmount);
}

可变类的线程安全是以降低程序的运行效率作为代价的,为了减少线程安全所带来的负面影响,程序可以采用如下策略。

  • 不要对线程安全类的所有方法都进行同步,只对那些会是共享资源的方法进行同步。例如上面Account类中的accountNo属性就无须同步,所以程序只对draw()方法进行了同步控制。
  • 如果可变类有两种运行环境:单线程环境和多线程环境,则应该为该可变类提供两种版本,即线程不安全版本和线程安全版本。在单线程环境中使用线程不安全版本以保证性能,在多线程环境中使用线程安全版本。

释放同步监视器的锁定

程序无法显式释放对同步监视器的锁定,线程会在如下几种情况下释放对同步监视器的锁定。

  1. 当前线程的同步方法、同步代码块执行结束,当前线程即释放同步监视器。
  2. 当前线程在同步代码块、同步方法中遇到breakreturn终止了该代码块、该方法的继续执行,当前线程将会释放同步监视器。
  3. 当前线程在同步代码块、同步方法中出现了未处理的ErrorException,导致了该代码块、该方法异常结束时,当前线程将会释放同步监视器。
  4. 当前线程执行同步代码块或同步方法时,程序执行了同步监视器对象的wait()方法,则当前线程暂停,并释放同步监视器。

在如下所示的情况下,线程不会释放同步监视器。

  • 线程执行同步代码块或同步方法时,程序调用Thread.sleep()Thread.yield()方法来暂停当前线程的执行,当前线程不会释放同步监视器。
  • 线程执行同步代码块时,其他线程调用了该线程的suspend()方法将该线程挂起,该线程不会释放同步监视器。尽量避免使用suspend()resume()方法来控制线程。

同步锁(Lock)

从 Java 5 开始,Java 提供了一种功能更强大的线程同步机制——通过显式定义同步锁对象来实现同步,在这种机制下,同步锁使用Lock对象充当。

Lock提供了比synchronized方法和synchronized代码块更广泛的锁定操作,Lock实现允许更灵活的结构,可以具有差别很大的属性,并且支持多个相关的Condition对象。Lock是控制多个线程对共享资源进行访问的工具。

通常,锁提供了对共享资源的独占访问,每次只能有一个线程对Lock对象加锁,线程开始访问共享资源之前应先获得Lock对象。

某些锁可能允许对共享资源并发访问,如ReadWriteLock(读写锁),LockReadWriteLock是 Java5 新提供的两个根接口,并为Lock提供了ReentrantLock(可重入锁)实现类;为ReadWriteLock提供了ReentrantReadWriteLock实现类。

使用该Lock对象可以显式地加锁、释放锁,通常使用ReentrantLock的代码格式如下:

class X {

  // 定义锁对象
  private final ReentrantLock lock = new ReentrantLock();

  // ...
  // 定义需要保证线程安全的方法
  public void m() {
    // 加锁
    lock.lock();
    try {
      // 需要保证线程安全的代码
      // ... method body
    }
    // 使用finally块来保证释放锁
    finally {
      lock.unlock();
    }
  }
}

使用ReentrantLock对象来进行同步,加锁和释放锁出现在不同的作用范围内时,通常建议使用finally块来确保在必要时释放锁。

通过使用ReentrantLock对象修改Account

@Data
public class Account {

  // 定义锁对象
  private final ReentrantLock lock = new ReentrantLock();
  // 封装账户编号、账户余额两个Field
  private String accountNo;
  private double balance;

  public Account() {
  }

  // 构造器
  public Account(String accountNo, double balance) {
    this.accountNo = accountNo;
    this.balance = balance;
  }

  // 因为账户余额不允许随便修改,所以只为balance提供getter方法
  public double getBalance() {
    return this.balance;
  }

  // 提供一个线程安全的draw()方法来完成取钱操作
  public void draw(double drawAmount) {
    // 加锁
    lock.lock();
    try {
      // 账户余额大于取钱数目
      if (balance >= drawAmount) {
        // 吐出钞票
        System.out.println(Thread.currentThread().getName()
            + "取钱成功!吐出钞票:" + drawAmount);
        try {
          Thread.sleep(1);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
        // 修改余额
        balance -= drawAmount;
        System.out.println("\t余额为: " + balance);
      } else {
        System.out.println(Thread.currentThread().getName()
            + "取钱失败!余额不足!");
      }
    } finally {
      // 修改完成,释放锁
      lock.unlock();
    }
  }

  // 下面两个方法根据accountNo来重写hashCode()和equals()方法
  public int hashCode() {
    return accountNo.hashCode();
  }

  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj != null
        && obj.getClass() == Account.class) {
      Account target = (Account) obj;
      return target.getAccountNo().equals(accountNo);
    }
    return false;
  }
}

Lock提供了同步方法和同步代码块所没有的其他功能,包括用于非块结构的tryLock()方法,以及试图获取可中断锁的lockInterruptibly()方法,还有获取超时失效锁的tryLock(long, TimeUnit)方法。

ReentrantLock锁具有可重入性,一个线程可以对已被加锁的ReentrantLock锁再次加锁, ReentrantLock对象会维持一个计数器来追踪lock()方法的嵌套调用,线程在每次调用lock()加锁后,必须显式调用unlock()来释放锁,所以一段被锁保护的代码可以调用另一个被相同锁保护的方法。

死锁

当两个线程相互等待对方释放同步监视器时就会发生死锁,一旦出现死锁,整个程序既不会发生任何异常,也不会给出任何提示,只是所有线程处于阻塞状态,无法继续。

class A {

  public synchronized void foo(B b) {
    System.out.println("当前线程名: " + Thread.currentThread().getName()
        + " 进入了A实例的foo方法");    //①
    try {
      Thread.sleep(200);
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    }
    System.out.println("当前线程名: " + Thread.currentThread().getName()
        + " 企图调用B实例的last方法");   //③
    b.last();
  }

  public synchronized void last() {
    System.out.println("进入了A类的last方法内部");
  }
}

class B {

  public synchronized void bar(A a) {
    System.out.println("当前线程名: " + Thread.currentThread().getName()
        + " 进入了B实例的bar方法");   //②
    try {
      Thread.sleep(200);
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    }
    System.out.println("当前线程名: " + Thread.currentThread().getName()
        + " 企图调用A实例的last方法");  //④
    a.last();
  }

  public synchronized void last() {
    System.out.println("进入了B类的last方法内部");
  }
}

public class DeadLock implements Runnable {

  A a = new A();
  B b = new B();

  public static void main(String[] args) {
    DeadLock dl = new DeadLock();
    // 以dl为target启动新线程
    new Thread(dl).start();
    // 调用init()方法
    dl.init();
  }

  public void init() {
    Thread.currentThread().setName("主线程");
    // 调用a对象的foo()方法
    a.foo(b);
    System.out.println("进入了主线程之后");
  }

  public void run() {
    Thread.currentThread().setName("副线程");
    // 调用b对象的bar()方法
    b.bar(a);
    System.out.println("进入了副线程之后");
  }
}

程序既无法向下执行,也不会抛出任何异常。

程序中A对象和B对象都是同步锁。程序中两个线程执行,一个线程的线程执行体是DeadLock类的run()方法,另一个线程的线程执行体是DeadLockinit()方法(主线程调用了init()方法)。

其中run()方法中让B对象调用bar()方法,而init()方法让A对象调用foo()方法。

  • init()方法先执行,调用了A对象的foo()方法,进入foo()方法之前,该线程对A对象加锁——当程序执行到 ① 号代码时,主线程暂停 200ms;
  • CPU 切换到执行另一个线程,让B对象执行bar()方法,进入bar()方法之前,该线程对B对象加锁——当程序执行到 ② 号代码时,副线程也暂停 200ms;
  • 接下来主线程会先醒过来,继续向下执行,直到 ③ 号代码处希望调用B对象的last()方法——执行该方法之前必须先对B对象加锁,但此时副线程正保持着B对象的锁,所以主线程阻塞;
  • 接下来副线程应该也醒过来了,继续向下执行,直到 ④ 号代码处希望调用A对象的last()方法——执行该方法之前必须先对A对象加锁,但此时主线程没有释放对A对象的锁——至此,就出现了主线程保持着A对象的锁,等待对B对象加锁,而副线程保持着B对象的锁,等待对A对象加锁,两个线程互相等待对方先释放,所以就出现了死锁。

线程通信

当线程在系统内运行时,线程的调度具有一定的透明性,程序通常无法准确控制线程的轮换执行,但我们可以通过一些机制来保证线程协调运行。

传统的线程通信

假设现在系统中有两个线程,这两个线程分别代表存款者和取钱者——现在假设系统有一种特殊的要求,系统要求存款者和取钱者不断地重复存款、取钱的动作,而且要求每当存款者将钱存入指定账户后,取钱者就立即取出该笔钱。不允许存款者连续两次存钱,也不允许取钱者连续两次取钱。

Object类的 3 个方法:

wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程。该wait()方法有 3 种形式——无时间参数的wait(一直等待,直到其他线程通知),带毫秒参数的wait和带毫秒、毫微秒参数的wait(这两种方法都是等待指定时间后自动苏醒)。调用wait()方法的当前线程会释放对该同步监视器的锁定。

notify():唤醒在此同步监视器上等待的单个线程。如果所有线程都在此同步监视器上等待,则会选择唤醒其中一个线程。选择是任意性的。只有当前线程放弃对该同步监视器的锁定后(使用wait()方法),才可以执行被唤醒的线程。

notifyAll():唤醒在此同步监视器上等待的所有线程。只有当前线程放弃对该同步监视器的锁定后,才可以执行被唤醒的线程。

但这 3 个方法必须由同步监视器对象来调用,这可分成以下两种情况。

  • 对于使用synchronized修饰的同步方法,因为该类的默认实例(this)就是同步监视器,所以可以在同步方法中直接调用这 3 个方法。
  • 对于使用synchronized修饰的同步代码块,同步监视器是synchronized后括号里的对象,所以必须使用该对象调用这 3 个方法。
@Data
public class Account {

  // 封装账户编号、账户余额两个Field
  private String accountNo;
  private double balance;
  // 标识账户中是否已有存款的旗标
  private boolean flag = false;

  public Account() {
  }

  // 构造器
  public Account(String accountNo, double balance) {
    this.accountNo = accountNo;
    this.balance = balance;
  }

  // 因为账户余额不允许随便修改,所以只为balance提供getter方法
  public double getBalance() {
    return this.balance;
  }

  public synchronized void draw(double drawAmount) {
    try {
      // 如果flag为假,表明账户中还没有人存钱进去,取钱方法阻塞
      if (!flag) {
        wait();
      } else {
        // 执行取钱操作
        System.out.println(Thread.currentThread().getName() + " 取钱:" + drawAmount);
        balance -= drawAmount;
        System.out.println("账户余额为:" + balance);
        // 将标识账户是否已有存款的旗标设为false
        flag = false;
        // 唤醒其他线程
        notifyAll();
      }
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    }
  }

  public synchronized void deposit(double depositAmount) {
    try {
      // 如果flag为真,表明账户中已有人存钱进去,存钱方法阻塞
      if (flag)           //①
      {
        wait();
      } else {
        // 执行存款操作
        System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount);
        balance += depositAmount;
        System.out.println("账户余额为:" + balance);
        // 将表示账户是否已有存款的旗标设为true
        flag = true;
        // 唤醒其他线程
        notifyAll();
      }
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    }
  }

  // 下面两个方法根据accountNo来重写hashCode()和equals()方法
  public int hashCode() {
    return accountNo.hashCode();
  }

  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj != null && obj.getClass() == Account.class) {
      Account target = (Account) obj;
      return target.getAccountNo().equals(accountNo);
    }
    return false;
  }
}

程序中的存款者线程循环 100 次重复存款,而取钱者线程则循环 100 次重复取钱,存款者线程和取钱者线程分别调用Account对象的deposit()draw()方法来实现。

取钱账户。

public class DrawThread extends Thread {

  // 模拟用户账户
  private final Account account;
  // 当前取钱线程所希望取的钱数
  private final double drawAmount;

  public DrawThread(String name, Account account, double drawAmount) {
    super(name);
    this.account = account;
    this.drawAmount = drawAmount;
  }

  // 重复100次执行取钱操作
  public void run() {
    for (int i = 0; i < 100; i++) {
      account.draw(drawAmount);
    }
  }
}

存钱账户。

public class DepositThread extends Thread {

  // 模拟用户账户
  private final Account account;
  // 当前存款线程所希望存的钱数
  private final double depositAmount;

  public DepositThread(String name, Account account, double depositAmount) {
    super(name);
    this.account = account;
    this.depositAmount = depositAmount;
  }

  // 重复100次执行存款操作
  public void run() {
    for (int i = 0; i < 100; i++) {
      account.deposit(depositAmount);
    }
  }
}

主程序。

public class DrawTest {

  public static void main(String[] args) {
    // 创建一个账户
    Account acct = new Account("1234567", 0);
    new DrawThread("取钱者", acct, 800).start();
    new DepositThread("存款者甲", acct, 800).start();
    new DepositThread("存款者乙", acct, 800).start();
    new DepositThread("存款者丙", acct, 800).start();
  }
}

运行该程序,会看到如图所示的结果。

image-20220820204844705

注意

阻塞并不是死锁,对于这种情况,取钱者线程已经执行结束,而存款者线程只是在等待其他线程来取钱而已,并不是等待其他线程释放同步监视器。

使用 Condition 控制线程通信

当使用Lock对象来保证同步时,Java 提供了一个Condition类来保持协调,使用Condition可以让那些已经得到 Lock 对象却无法继续执行的线程释放Lock对象,Condition对象也可以唤醒其他处于等待的线程。

Condition实例被绑定在一个Lock对象上。要获得特定Lock实例的Condition实例,调用Lock对象的newCondition()方法即可。Condition类提供了如下 3 个方法。

  • await():类似于隐式同步监视器上的wait()方法,导致当前线程等待,直到其他线程调用该Condition的signal()方法或signalAll()方法来唤醒该线程。该await()方法有更多变体,如long awaitNanos(long nanosTimeout)void awaitUninterruptibly()awaitUntil(Date deadline)等,可以完成更丰富的等待操作。
  • signal():唤醒在此 Lock 对象上等待的单个线程。如果所有线程都在该Lock对象上等待,则会选择唤醒其中一个线程。选择是任意性的。只有当前线程放弃对该Lock对象的锁定后(使用await()方法),才可以执行被唤醒的线程。
  • signalAll():唤醒在此 Lock 对象上等待的所有线程。只有当前线程放弃对该Lock对象的锁定后,才可以执行被唤醒的线程。

修改Account类:

@Data
public class Account {

  // 显式定义Lock对象
  private final Lock lock = new ReentrantLock();
  // 获得指定Lock对象对应的Condition
  private final Condition cond = lock.newCondition();
  // 封装账户编号、账户余额两个Field
  private String accountNo;
  private double balance;
  // 标识账户中是否已有存款的旗标
  private boolean flag = false;

  public Account() {
  }

  // 构造器
  public Account(String accountNo, double balance) {
    this.accountNo = accountNo;
    this.balance = balance;
  }

  // 因为账户余额不允许随便修改,所以只为balance提供getter方法
  public double getBalance() {
    return this.balance;
  }

  public void draw(double drawAmount) {
    // 加锁
    lock.lock();
    try {
      // 如果flag为假,表明账户中还没有人存钱进去,取钱方法阻塞
      if (!flag) {
        cond.await();
      } else {
        // 执行取钱操作
        System.out.println(Thread.currentThread().getName() + " 取钱:" + drawAmount);
        balance -= drawAmount;
        System.out.println("账户余额为:" + balance);
        // 将标识账户是否已有存款的旗标设为false
        flag = false;
        // 唤醒其他线程
        cond.signalAll();
      }
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    }
    // 使用finally块来释放锁
    finally {
      lock.unlock();
    }
  }

  public void deposit(double depositAmount) {
    lock.lock();
    try {
      // 如果flag为真,表明账户中已有人存钱进去,存钱方法阻塞
      if (flag)           //①
      {
        cond.await();
      } else {
        // 执行存款操作
        System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount);
        balance += depositAmount;
        System.out.println("账户余额为:" + balance);
        // 将表示账户是否已有存款的旗标设为true
        flag = true;
        // 唤醒其他线程
        cond.signalAll();
      }
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    }
    // 使用finally块来释放锁
    finally {
      lock.unlock();
    }
  }


  // 下面两个方法根据accountNo来重写hashCode()和equals()方法
  public int hashCode() {
    return accountNo.hashCode();
  }

  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj != null && obj.getClass() == Account.class) {
      Account target = (Account) obj;
      return target.getAccountNo().equals(accountNo);
    }
    return false;
  }
}

运行该程序的效果与前一个示例程序的运行效果完全一样。

Condition将同步监视器方法(wait()notify()notifyAll())分解成截然不同的对象,以便通过将这些对象与Lock对象组合使用,为每个对象提供多个等待集(wait-set)。在这种情况下,Lock替代了同步方法或同步代码块,Condition替代了同步监视器的功能。

使用阻塞队列(BlockingQueue)控制线程通信

Java 5 提供了一个BlockingQueue接口,虽然BlockingQueue也是Queue的子接口,但它的主要用途并不是作为容器,而是作为线程同步的工具。

BlockingQueue具有一个特征:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。

BlockingQueue提供如下两个支持阻塞的方法。

  • put(E e):尝试把E元素放入BlockingQueue中,如果该队列的元素已满,则阻塞该线程。
  • take():尝试从BlockingQueue的头部取出元素,如果该队列的元素已空,则阻塞该线程。

BlockingQueue继承了Queue接口,可使用Queue接口中的方法。这些方法归纳起来可分为如下 3 组。

  • 在队列尾部插入元素。包括add(E e)offer(E e)put(E e)方法,当该队列已满时,这 3 个方法分别会抛出异常、返回false、阻塞队列。
  • 在队列头部删除并返回删除的元素。包括remove()poll()take()方法。当该队列已空时,这 3 个方法分别会抛出异常、返回false、阻塞队列。
  • 在队列头部取出但不删除元素。包括element()peek()方法,当队列已空时,这两个方法分别抛出异常、返回false

epub_681336_2209

BlockingQueue与其实现类:

epub_681336_2207

以黑色方框框出的都是 Java 7 新增的阻塞队列。

BlockingQueue包含的 5 个实现类。

  • ArrayBlockingQueue:基于数组实现的BlockingQueue队列。

  • LinkedBlockingQueue:基于链表实现的BlockingQueue队列。

  • PriorityBlockingQueue:它并不是标准的阻塞队列。

    PriorityQueue类似,该队列调用remove()poll()take()等方法取出元素时,并不是取出队列中存在时间最长的元素,而是队列中最小的元素。

    PriorityBlockingQueue判断元素的大小即可根据元素(实现Comparable接口)的本身大小来自然排序,也可使用Comparator进行定制排序。

  • SynchronousQueue:同步队列。对该队列的存、取操作必须交替进行。

  • DelayQueue:它是一个特殊的BlockingQueue,底层基于PriorityBlockingQueue实现。

    不过, DelayQueue要求集合元素都实现Delay接口(该接口里只有一个long getDelay()方法), DelayQueue根据集合元素的getDalay()方法的返回值进行排序。

public class BlockingQueueTest {

  public static void main(String[] args)
      throws Exception {
    // 定义一个长度为2的阻塞队列
    BlockingQueue bq = new ArrayBlockingQueue<>(2);
    bq.put("Java");// 与bq.add("Java"、bq.offer("Java")相同
    bq.put("Java");// 与bq.add("Java"、bq.offer("Java")相同
    bq.put("Java");//① 阻塞线程
  }
}

利用BlockingQueue来实现线程通信。

class Producer extends Thread {

  private final BlockingQueue<String> bq;

  public Producer(BlockingQueue<String> bq) {
    this.bq = bq;
  }

  public void run() {
    String[] strArr = new String[]{"Java", "Struts", "Spring"};
    for (int i = 0; i < 999999999; i++) {
      System.out.println(getName() + "生产者准备生产集合元素!");
      try {
        Thread.sleep(200);
        // 尝试放入元素,如果队列已满,则线程被阻塞
        bq.put(strArr[i % 3]);
      } catch (Exception ex) {
        ex.printStackTrace();
      }
      System.out.println(getName() + "生产完成:" + bq);
    }
  }
}

class Consumer extends Thread {

  private final BlockingQueue<String> bq;

  public Consumer(BlockingQueue<String> bq) {
    this.bq = bq;
  }

  public void run() {
    while (true) {
      System.out.println(getName() + "消费者准备消费集合元素!");
      try {
        Thread.sleep(200);
        // 尝试取出元素,如果队列已空,则线程被阻塞
        bq.take();
      } catch (Exception ex) {
        ex.printStackTrace();
      }
      System.out.println(getName() + "消费完成:" + bq);
    }
  }
}

public class BlockingQueueTest2 {

  public static void main(String[] args) {
    // 创建一个容量为1的BlockingQueue
    BlockingQueue<String> bq = new ArrayBlockingQueue<>(1);
    // 启动3个生产者线程
    new Producer(bq).start();
    new Producer(bq).start();
    new Producer(bq).start();
    // 启动一个消费者线程
    new Consumer(bq).start();
  }
}

线程组和未处理的异常

Java 使用ThreadGroup来表示线程组,它可以对一批线程进行分类管理,Java 允许程序直接对线程组进行控制。

对线程组的控制相当于同时控制这批线程。用户创建的所有线程都属于指定线程组,如果程序没有显式指定线程属于哪个线程组,则该线程属于默认线程组。

在默认情况下,子线程和创建它的父线程处于同一个线程组内,例如 A 线程创建了 B 线程,并且没有指定 B 线程的线程组,则 B 线程属于 A 线程所在的线程组。

一旦某个线程加入了指定线程组之后,该线程将一直属于该线程组,直到该线程死亡,线程运行中途不能改变它所属的线程组。

Thread类提供了如下几个构造器来设置新创建的线程属于哪个线程组。

  • Thread(ThreadGroup group, Runnable target):以targetrun()方法作为线程执行体创建新线程,属于group线程组。
  • Thread(ThreadGroup group, Runnable target, String name):以targetrun()方法作为线程执行体创建新线程,该线程属于group线程组,且线程名为name
  • Thread(ThreadGroup group, String name):创建新线程,新线程名为name,属于group线程组。

Thread类没有提供了一个getThreadGroup()方法来返回该线程所属的线程组,getThreadGroup()方法的返回值是ThreadGroup对象,表示一个线程组。ThreadGroup类提供了如下两个简单的构造器来创建实例。

  • ThreadGroup(String name):以指定的线程组名字来创建新的线程组。
  • ThreadGroup(ThreadGroup parent, String name):以指定的名字、指定的父线程组创建一个新线程组。

线程组通过调用ThreadGroupgetName()方法来获取名字,但不允许改变线程组的名字。

ThreadGroup类提供了如下几个常用的方法来操作整个线程组里的所有线程。

  • int activeCount():返回此线程组中活动线程的数目。

  • interrupt():中断此线程组中的所有线程。

  • isDaemon():判断该线程组是否是后台线程组。

  • setDaemon(boolean daemon):把该线程组设置成后台线程组。

    后台线程组具有一个特征——当后台线程组的最后一个线程执行结束或最后一个线程被销毁后,后台线程组将自动销毁。

  • setMaxPriority(int pri):设置线程组的最高优先级。

class MyThread extends Thread {

  // 提供指定线程名的构造器
  public MyThread(String name) {
    super(name);
  }

  // 提供指定线程名、线程组的构造器
  public MyThread(ThreadGroup group, String name) {
    super(group, name);
  }

  public void run() {
    for (int i = 0; i < 20; i++) {
      System.out.println(getName() + " 线程的i变量" + i);
    }
  }
}

public class ThreadGroupTest {

  public static void main(String[] args) {
    // 获取主线程所在的线程组,这是所有线程默认的线程组
    ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();
    System.out.println("主线程组的名字:"
        + mainGroup.getName());
    System.out.println("主线程组是否是后台线程组:"
        + mainGroup.isDaemon());
    new MyThread("主线程组的线程").start();
    ThreadGroup tg = new ThreadGroup("新线程组");
    tg.setDaemon(true);
    System.out.println("tg线程组是否是后台线程组:"
        + tg.isDaemon());
    MyThread tt = new MyThread(tg, "tg组的线程甲");
    tt.start();
    new MyThread(tg, "tg组的线程乙").start();
  }
}

从 Java 5 开始,Java 加强了线程的异常处理,如果线程执行过程中抛出了一个未处理异常,JVM 在结束该线程之前会自动查找是否有对应的Thread.UncaughtExceptionHandler对象,如果找到该处理器对象,则会调用该对象的uncaughtException(Thread t, Throwable e)方法来处理该异常。

Thread.UncaughtExceptionHandlerThread类的一个静态内部接口,该接口内只有一个方法:void uncaughtException(Thread t, Throwable e),该方法中的t代表出现异常的线程,而e代表该线程抛出的异常。

Thread类提供了如下两个方法来设置异常处理器。

  • static setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):为该线程类的所有线程实例设置默认的异常处理器。
  • setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):为指定的线程实例设置异常处理器。

ThreadGroup类实现了Thread.UncaughtExceptionHandler接口,所以每个线程所属的线程组将会作为默认的异常处理器。

当一个线程抛出未处理异常时,JVM 会首先查找该异常对应的异常处理器(setUncaughtExceptionHandler()方法设置的异常处理器),如果找到该异常处理器,则将调用该异常处理器处理该异常;否则,JVM 将会调用该线程所属的线程组对象的uncaughtException()方法来处理该异常。

线程组处理异常的默认流程如下。

  1. 如果该线程组有父线程组,则调用父线程组的uncaughtException()方法来处理该异常。
  2. 如果该线程实例所属的线程类有默认的异常处理器(由setDefaultUncaughtExceptionHandler()方法设置的异常处理器),那么就调用该异常处理器来处理该异常。
  3. 如果该异常对象是ThreadDeath的对象,则不做任何处理;否则,将异常跟踪栈的信息打印到System.err错误输出流,并结束该线程。
class MyExHandler implements Thread.UncaughtExceptionHandler {

  // 实现uncaughtException()方法,该方法将处理线程的未处理异常
  public void uncaughtException(Thread t, Throwable e) {
    System.out.println(t + " 线程出现了异常:" + e);
  }
}

public class ExHandler {

  public static void main(String[] args) {
    // 设置主线程的异常处理器
    Thread.currentThread().setUncaughtExceptionHandler(new MyExHandler());
    int a = 5 / 0;       //①
    System.out.println("程序正常结束!");
  }
}

运行该程序,会看到如下输出:

image-20220822132326025

虽然程序中指定了异常处理器对未捕获的异常进行处理,而且该异常处理器也确实起作用了,但程序依然不会正常结束。这说明异常处理器与通过 catch 捕获异常是不同的——当使用 catch 捕获异常时,异常不会向上传播给上一级调用者;但使用异常处理器对异常进行处理之后,异常依然会传播给上一级调用者。

线程池

Java 语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。

如果可以复用一组线程:

┌─────┐ execute  ┌──────────────────┐
│Task1│─────────>│ThreadPool        │
├─────┤          │┌───────┐┌───────┐│
│Task2│          ││Thread1││Thread2││
├─────┤          │└───────┘└───────┘│
│Task3│          │┌───────┐┌───────┐│
├─────┤          ││Thread3││Thread4││
│Task4│          │└───────┘└───────┘│
├─────┤          └──────────────────┘
│Task5│
├─────┤
│Task6│
└─────┘
  ...

那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。

简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。

实现的线程池

Java 5 新增了一个Executors工厂类来产生线程池,该工厂类包含如下几个静态工厂方法来创建线程池。

  • newCachedThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中。

  • newFixedThreadPool(int nThreads):创建一个可重用的、具有固定线程数的线程池。

  • newSingleThreadExecutor():创建一个只有单线程的线程池,它相当于调用newFixedThread Pool()方法时传入参数为 1。

  • newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在指定延迟后执行线程任务。

    corePoolSize指池中所保存的线程数,即使线程是空闲的也被保存在线程池内。

  • newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。

前 3 个方法返回一个ExecutorService对象,该对象代表一个线程池,它可以执行Runnable对象或Callable对象所代表的线程;

而后 2 个方法返回一个ScheduledExecutorService线程池,它是ExecutorService的子类,它可以在指定延迟后执行线程任务。

ExecutorService代表尽快执行线程的线程池(只要线程池中有空闲线程,就立即执行线程任务),程序只要将一个Runnable对象或Callable对象(代表线程任务)提交给该线程池,该线程池就会尽快执行该任务。ExecutorService里提供了如下 3 个方法。

  • Future<?> submit(Runnable task):将一个Runnable对象提交给指定的线程池,线程池将在有空闲线程时执行Runnable对象代表的任务。

    其中Future对象代表Runnable任务的返回值——但run()方法没有返回值,所以Future对象将在run()方法执行结束后返回null

    但可以调用FutureisDone()isCancelled()方法来获得Runnable对象的执行状态。

  • <T> Future<T> submit(Runnable task, T result):将一个Runnable对象提交给指定的线程池,线程池将在有空闲线程时执行Runnable对象代表的任务。

    其中result显式指定线程执行结束后的返回值,所以Future对象将在run()方法执行结束后返回result

  • <T> Future<T> submit(Callable<T> task):将一个Callable对象提交给指定的线程池,线程池将在有空闲线程时执行Callable对象代表的任务。

    其中Future代表Callable对象里call()方法的返回值。

ScheduledExecutorService代表可在指定延迟后或周期性地执行线程任务的线程池,它提供了如下 4 个方法。

  • ScheduledFuture<?> schedule(Runnable command, long delay,TimeUnit unit):指定command任务将在delay延迟后执行。

  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):指定command任务将在delay延迟后执行,而且以设定频率重复执行。

    也就是说,在initialDelay后开始执行,依次在initialDelay+periodinitialDelay+2 * period…处重复执行,依此类推。

  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay, long delay,TimeUnit unit):创建并执行一个在给定初始延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都存在给定的延迟。

    如果任务在任一次执行时遇到异常,就会取消后续执行;否则,只能通过程序来显式取消或终止该任务。

当用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列,调用shutdown()方法后的线程池不再接收新任务,但会将以前所有已提交任务执行完成。

当线程池中的所有任务都执行完成后,池中的所有线程都会死亡;另外也可以调用线程池的shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

使用线程池来执行线程任务的步骤如下。

  1. 调用Executors类的静态工厂方法创建一个ExecutorService对象,该对象代表一个线程池。
  2. 创建Runnable实现类或Callable实现类的实例,作为线程执行任务。
  3. 调用ExecutorService对象的submit()方法来提交Runnable实例或Callable实例。
  4. 当不想提交任何任务时,调用ExecutorService对象的shutdown()方法来关闭线程池。
public class Main {

  public static void main(String[] args) {
    // 创建一个固定大小的线程池:
    ExecutorService es = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 6; i++) {
      es.submit(new Task("" + i));
    }
    // 关闭线程池:
    es.shutdown();
  }
}

class Task implements Runnable {

  private final String name;

  public Task(String name) {
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("start task " + name);
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
    }
    System.out.println("end task " + name);
  }
}

观察执行结果,一次性放入 6 个任务,由于线程池只有固定的 4 个线程,因此,前 4 个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。

线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭。

如果把线程池改为CachedThreadPool,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以 6 个任务可一次性全部同时执行。

如果想把线程池的大小限制在 4 ~ 10 个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()方法的源码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}

因此,想创建指定动态范围的线程池,可以这么写:

int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max,
        60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

ForkJoinPool

Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。ForkJoinPoolExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器。

  • ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool
  • ForkJoinPool():以Runtime.availableProcessors()方法的返回值作为parallelism参数来创建ForkJoinPool

创建了ForkJoinPool实例之后,就可调用ForkJoinPoolsubmit(ForkJoinTask task)invoke (ForkJoinTask task)方法来执行指定任务了。

其中ForkJoinTask代表一个可以并行、合并的任务。

ForkJoinTask是一个抽象类,它还有两个抽象子类:RecursiveActionRecursiveTask

其中RecursiveTask代表有返回值的任务,而RecursiveAction代表没有返回值的任务。

ForkJoinPoolForkJoinTask等类的类图。

epub_681336_2244

下面以执行没有返回值的“大任务”(简单地打印 0~300 的数值)为例,程序将一个“大任务”拆分成多个“小任务”,并将任务交给ForkJoinPool来执行。

class PrintTask extends RecursiveAction {

  // 每个“小任务”最多只打印50个数
  private static final int THRESHOLD = 50;
  private final int start;
  private final int end;

  // 打印从start到end的任务
  public PrintTask(int start, int end) {
    this.start = start;
    this.end = end;
  }

  @Override
  protected void compute() {
    // 当end与start之间的差小于THRESHOLD时,开始打印
    if (end - start < THRESHOLD) {
      for (int i = start; i < end; i++) {
        System.out.println(Thread.currentThread().getName()
            + "的i值:" + i);
      }
    } else {
      // 当end与start之间的差大于THRESHOLD,即要打印的数超过50个时
      // 将大任务分解成两个“小任务”
      int middle = (start + end) / 2;
      PrintTask left = new PrintTask(start, middle);
      PrintTask right = new PrintTask(middle, end);
      // 并行执行两个“小任务”
      left.fork();
      right.fork();
    }
  }
}

public class ForkJoinPoolTest {

  public static void main(String[] args)
      throws Exception {
    ForkJoinPool pool = new ForkJoinPool();
    // 提交可分解的PrintTask任务
    pool.submit(new PrintTask(0, 300));
    pool.awaitTermination(2, TimeUnit.SECONDS);
    // 关闭线程池
    pool.shutdown();
  }
}

程序虽然打印了 0 ~ 299 这 300 个数字,但并不是连续打印的,这是因为程序将这个打印任务进行了分解,分解后的任务会并行执行,所以不会按顺序从 0 打印到 299。

上面定义的任务是一个没有返回值的打印任务,如果任务是有返回值的任务,则可以让任务继承RecursiveTask<T>,其中泛型参数T就代表了该任务的返回值类型。

class CalTask extends RecursiveTask {

  // 每个“小任务”最多只累加20个数
  private static final int THRESHOLD = 20;
  private final int[] arr;
  private final int start;
  private final int end;

  // 累加从start到end的数组元素
  public CalTask(int[] arr, int start, int end) {
    this.arr = arr;
    this.start = start;
    this.end = end;
  }

  @Override
  protected Integer compute() {
    int sum = 0;
    // 当end与start之间的差小于THRESHOLD时,开始进行实际累加
    if (end - start < THRESHOLD) {
      for (int i = start; i < end; i++) {
        sum += arr[i];
      }
      return sum;
    } else {
      // 当end与start之间的差大于THRESHOLD,即要打印的数超过20个时
      // 将大任务分解成两个“小任务”
      int middle = (start + end) / 2;
      CalTask left = new CalTask(arr, start, middle);
      CalTask right = new CalTask(arr, middle, end);
      // 并行执行两个“小任务”
      left.fork();
      right.fork();
      // 把两个“小任务”累加的结果合并起来
      return Integer.parseInt(left.join().toString()) + Integer.parseInt(
          right.join().toString());       //①
    }
  }
}

public class Sum {

  public static void main(String[] args)
      throws Exception {
    int[] arr = new int[100];
    Random rand = new Random();
    int total = 0;
    // 初始化100个数字元素
    for (int i = 0, len = arr.length; i < len; i++) {
      int tmp = rand.nextInt(20);
      // 对数组元素赋值,并将数组元素的值添加到total总和中
      total += (arr[i] = tmp);
    }
    System.out.println(total);
    ForkJoinPool pool = new ForkJoinPool();
    // 提交可分解的CalTask任务
    Future future = pool.submit(new CalTask(arr, 0, arr.length));
    System.out.println(future.get());
    // 关闭线程池
    pool.shutdown();
  }
}

线程相关类

Java 还为线程安全提供了一些工具类,如ThreadLocal类,它代表一个线程局部变量,通过把数据放在ThreadLocal中就可以让每个线程创建一个该变量的副本,从而避免并发访问的线程安全问题。除此之外,Java 5 还新增了大量的线程安全类。

ThreadLocal

ThreadLocal,是 Thread Local Variable(线程局部变量)的意思。线程局部变量(ThreadLocal)的功用其实非常简单,就是为每一个使用该变量的线程都提供一个变量值的副本,使每一个线程都可以独立地改变自己的副本,而不会和其他线程的副本冲突。

从线程的角度看,就好像每一个线程都完全拥有该变量一样。

ThreadLocal类的用法非常简单,它只提供了如下 3 个public方法。

  • T get():返回此线程局部变量中当前线程副本中的值。
  • void remove():删除此线程局部变量中当前线程的值。
  • void set(T value):设置此线程局部变量中当前线程副本中的值。
class Account2 {

  /* 定义一个ThreadLocal类型的变量,该变量将是一个线程局部变量
  每个线程都会保留该变量的一个副本 */
  private final ThreadLocal<String> name = new ThreadLocal<>();

  // 定义一个初始化name属性的构造器
  public Account2(String str) {
    this.name.set(str);
    // 下面代码用于访问当前线程的name副本的值
    System.out.println("---" + this.name.get());
  }

  // name的setter和getter方法
  public String getName() {
    return name.get();
  }

  public void setName(String str) {
    this.name.set(str);
  }
}

class MyTest extends Thread {

  // 定义一个Account属性
  private final Account2 account2;

  public MyTest(Account2 account2, String name) {
    super(name);
    this.account2 = account2;
  }

  public void run() {
    // 循环10次
    for (int i = 0; i < 10; i++) {
      // 当i==6时输出将账户名替换成当前线程名
      if (i == 6) {
        account2.setName(getName());
      }
      // 输出同一个账户的账户名和循环变量
      System.out.println(account2.getName()
          + " 账户的i值:" + i);
    }
  }
}

public class ThreadLocalTest {

  public static void main(String[] args) {
    // 启动两个线程,两个线程共享同一个Account
    Account2 at = new Account2("初始名");
                  /*
                  虽然两个线程共享同一个账户,即只有一个账户名
                  但由于账户名是ThreadLocal类型的,所以每个线程
                  都完全拥有各自的账户名副本,因此在i==6之后,将看到两个
                  线程访问同一个账户时出现不同的账户名
                  */
    new MyTest(at, "线程甲").start();
    new MyTest(at, "线程乙").start();
  }
}

上面Account类中的代码分别完成了创建ThreadLocal对象、从ThreadLocal中取出线程局部变量、修改线程局部变量的操作。

由于程序中的账户名是一个ThreadLocal变量,所以虽然程序中只有一个Account对象,但两个子线程将会产生两个账户名(主线程也持有一个账户名的副本)。

两个线程进行循环时都会在i == 6时将账户名改为与线程名相同,这样就可以看到两个线程拥有两个账户名的情形,如图所示。

image-20220822151334474

如果多个线程之间需要共享资源,以达到线程之间的通信功能,就使用同步机制;如果仅仅需要隔离多个线程之间的共享冲突,则可以使用ThreadLocal

包装线程不安全的集合

ArrayListLinkedListHashSetTreeSetHashMapTreeMap等都是线程不安全的,当多个并发线程向这些集合中存、取元素时,就可能会破坏这些集合的数据完整性。

使用Collections提供的静态方法把这些集合包装成线程安全的集合。Collections提供了如下几个静态方法。

  • <T> Collection<T> synchronizedCollection(Collection<T> c):返回指定collection对应的线程安全的collection
  • static <T> List<T> synchronizedList(List<T> list):返回指定List对象对应的线程安全的List对象。
  • static <K,V> Map<K,V> synchronizedMap(Map<K,V> m):返回指定Map对象对应的线程安全的Map对象。
  • static <T> Set<T> synchronizedSet(Set<T> s):返回指定Set对象对应的线程安全的 Set对象
  • static <K,V> SortedMap<K,V>synchronizedSortedMap(SortedMap<K,V> m):返回指定SortedMap对象对应的线程安全的SortedMap对象。
  • static <T> SortedSet<T> synchronizedSortedSet(SortedSet<T> s):返回指定SortedSet对象对应的线程安全的SortedSet对象。
// 使用Collections的synchronizedMap方法将一个普通的HashMap包装成线程安全的类
HashMap m = (HashMap) Collections.synchronizedMap(new HashMap());

线程安全的集合类

Java 5 开始,在java.util.concurrent包下提供了大量支持高效并发访问的集合接口和实现类。

epub_681336_2259

这些线程安全的集合类可分为如下两类。

  • Concurrent开头的集合类,如ConcurrentHashMapConcurrentSkipListMapConcurrentSkipListSetConcurrentLinkedQueueConcurrentLinkedDeque
  • CopyOnWrite开头的集合类,如CopyOnWriteArrayListCopyOnWriteArraySet

Concurrent开头的集合类代表了支持并发访问的集合,它们可以支持多个线程并发写入访问,这些写入线程的所有操作都是线程安全的,但读取操作不必锁定。

Concurrent开头的集合类采用了更复杂的算法来保证永远不会锁住整个集合,因此在并发写入时有较好的性能。

当多个线程共享访问一个公共集合时,ConcurrentLinkedQueue是一个恰当的选择。ConcurrentLinkedQueue不允许使用 null 元素。

ConcurrentLinkedQueue实现了多线程的高效访问,多个线程访问ConcurrentLinkedQueue集合时无须等待。

在默认情况下,ConcurrentHashMap支持 16 个线程并发写入,当有超过 16 个线程并发向该Map中写入数据时,可能有一些线程需要等待。

实际上,程序通过设置concurrencyLevel构造参数(默认值为 16)来支持更多的并发写入线程。

HashMap和普通集合不同的是,因为ConcurrentLinkedQueueConcurrentHashMap支持多线程并发访问,所以当使用迭代器来遍历集合元素时,该迭代器可能不能反映出创建迭代器之后所做的修改,但程序不会抛出任何异常。

提示

使用java.util包下的Collection作为集合对象时,如果该集合对象创建迭代器后集合元素发生改变,则会引发ConcurrentModificationException异常。