并发基础 - 1

参考文献

并行与并发的区别在于,并行是同时执行多个任务的能力,并发是程序交替执行多个任务的能力。

在单核CPU上的多线程程序与js在浏览器中的异步操作为并发但不并行的操作,多核CPU上的Web服务器与分布式系统上的微服务即为既并发又并行的场景。

创建

线程的创建有三种方式。

第一种是重写父类Thread的run()方法,并且调用start()方法启动线程。

1
2
3
4
5
6
7
8
9
10
class ThreadTask extends Thread {
public void run() {
// Task...
}

public static void main(String[] args) {
ThreadTask task = new ThreadTask();
task.start();
}
}

该方法的缺点是如果ThreadTask已经继承了另外一个类,就不能再继承Thread类了。

第二种是实现Runnable接口的run()方法。并将实现类的对象作为参数传递给Thread对象的构造方法,最后调用start()方法启动线程。

1
2
3
4
5
6
7
8
9
10
11
class RunnableTask implements Runnable {
public void run() {
// Task...
}

public static void main(String[] args) {
RunnableTask task = new RunnableTask();
Thread thread = new Thread(task);
thread.start();
}
}

第三种需要重写Callable接口的call()方法,然后创建FutureTask对象,参数为Callable实现类的对象,紧接着创建Thread对象,参数为FutureTask对象,然后调用start()方法启动线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
class CallableTask implements Callable<String> {
public String call() {
// Task...
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableTask task = new CallableTask();
FutureTask<String> futureTask = new FutureTask<>(task);
Thread thread = new Thread(futureTask);
thread.start();
System.out.println(futureTask.get());
}
}

该方法能获取线程的执行结果。

创建线程的时候,至少需要分配一个虚拟机线,在64位操作系统中,默认大小为1M,因此一个线程大约需要1M内存。当然,由于JVM、操作系统本身运行也需要内存,因此只给这么大的内存卡跑到理论值肯定是不够的。

调用start()的时候会执行run()方法,那为什么不直接调用run()方法?因为start()方法是启动一个新的线程,然后让这个线程去执行run()方法。而run()方法本身只是在当前线程中调用的一个普通方法。
调用start()后,线程进入就绪状态,等待操作系统调度;一旦调度执行,线程会执行其run()方法中的代码。

调度

类别方法说明
线程状态控制方法start()启动线程,使其进入就绪状态,等待CPU调度
run()线程执行体,定义线程要执行的任务逻辑
线程休眠sleep()线程休眠,参数填毫秒级时间
线程等待wait()必须在synchronized块中使用,
让当前线程进入WAITING或TIMED_WAITING状态,
需要被notify()或notifyAll()唤醒,
参数可指定毫秒级最长等待时间
join()让当前线程等待目标线程执行完毕,
参数可指定毫秒级等待时间
线程唤醒notify()唤醒单个等待线程
notifyAll()唤醒所有等待线程
线程让步与中断yield()线程让步,提示可让出CPU(但不保证立刻让出,
线程从RUNNING回到RUNNABLE状态
interrupt()中断线程
isInterrupted()检查中断状态
interrupted()检查并清除中断状态
线程优先级setPriority()设置线程优先级,参数从1到10,
优先级只是建议,不保证执行顺序
getPriority()获取线程优先级
守护线程setDaemon()设置守护线程,
当所有非守护线程结束时,JVM会自动退出,不管守护线程是否执行完毕,
参数为布尔值
isDaemon()检查是否为守护线程

当线程A调用共享对象的wait()方法时,线程A会被挂起,直到

  • 线程B调用了共享对象的notify()方法或者notifyAll()方法;
  • 其他线程调用线程A的 interrupt() 方法,导致线程A抛出 InterruptedException 异常。

notify()唤醒哪个正在wait()的线程是随机的。

interrupt()方法能中断线程,但它只是改变中断状态,不会中断一个正在运行的线程,需要线程自行处理中断标志。若中断一个已经中断或正在等待的线程,就会抛异常。

守护线程的作用大多是为其他线程提供服务。

OS课上的不大一样,Java中管理线程有6种状态

6种状态变化

线程上下文切换是指CPU从一个线程切换到另一个线程执行的过程。而多核处理器的每个核心都可以独立执行一个或多个线程,可以让一些并发任务变成并行任务。

通信

上文中提到的wait()notify()之间的互动就是一种线程通信方式。除此之外,还有许多种通信方式。

  • 阻塞队列
    这是最常用的通信方式(毕竟用上消息队列就得上redis或者rabbit MQ了)。可以简单实现生产者消费者模式,也不用死记那两个锁的作用了。
    常用的示例有ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue等。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    new Thread(() -> {
    try {
    String data = "message";
    queue.put(data);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    }
    }).start();

    new Thread(() -> {
    try {
    String data = queue.take();
    System.out.println("收到: " + data);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    }
    }).start();
  • 信号量
    依旧是经典的信号量和PV操作。控制并发访问资源的线程数量。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    Semaphore semaphore = new Semaphore(5); // 最大5个许可

    for (int i = 0; i < 10; i++) {
    new Thread(() -> {
    try {
    semaphore.acquire(); // 获取许可
    // 使用数据库连接
    System.out.println(Thread.currentThread().getName() + " 使用连接");
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    } finally {
    semaphore.release(); // 释放许可
    }
    }).start();
    }

    Semaphore 可以用于流量控制,比如数据库连接池、网络连接池等。假如有这样一个需求,要读取几万个文件的数据,因为都是 IO 密集型任务,我们可以启动几十个线程并发地读取。但是在读到内存后,需要存储到数据库,而数据库连接数是有限的,比如说只有 10 个,那我们就必须控制线程的数量,保证同时只有 10 个线程在使用数据库连接。这个时候,就可以使用 Semaphore 来做流量控制。

  • Exchanger 交换器
    两个线程在同步点交换数据。一个线程调用exchange()方法时会阻塞,直到另一个线程也调用exchange()方法,然后两个线程交换数据后继续执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    Exchanger<String> exchanger = new Exchanger<>();

    new Thread(() -> {
    try {
    String data = "来自线程1的数据";
    String response = exchanger.exchange(data);
    System.out.println("线程1收到: " + response);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    }
    }).start();

    new Thread(() -> {
    try {
    String data = "来自线程2的数据";
    String response = exchanger.exchange(data);
    System.out.println("线程2收到: " + response);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    }
    }).start();

    Exchanger 可以用于遗传算法,也可以用于校对工作,比如我们将纸制银行流水通过人工的方式录入到电子银行时,为了避免错误,可以录入两遍,然后通过 Exchanger 来校对两次录入的结果。

多个线程可以通过volatile关键字和synchronized关键字来实现通信。volatile关键字确保变量的可见性,synchronized关键字确保线程之间的互斥访问。

volatile可以用于修饰成员变量,告知程序任何对该变量的访问均需要从共享内存中获取最新值,而不是从线程的本地缓存中获取。

synchronized关键字可以用于修饰方法或代码块,确保同一时刻只有一个线程可以访问被修饰的代码,从而避免数据竞争和不一致的问题。

CompletableFuture类提供了一种更简洁和强大的方式来处理异步编程和线程间通信。它允许线程在完成计算后将结果传递给其他线程,并支持链式调用和组合多个异步任务。

1
2
3
4
5
6
7
8
9
10
11
12
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
}).thenApply(result -> {
return result + " World";
}).thenAccept(finalResult -> {
System.out.println(finalResult);
});

保证线程安全

Java中提供了许多方法来保证线程安全。

  • 可以在代码块或方法上使用synchronized关键字。
  • 使用ReentrantLock类来显式地加锁和解锁,这种锁还支持并发重入。
  • 保证变量的可见性,可以使用volatile关键字修饰变量。
  • 使用Atomic类(如AtomicIntegerAtomicLong等)
  • 对于线程独立的数据,可以使用ThreadLocal类来存储和访问这些数据。
  • 对于需要并发容器的地方,可以使用ConcurrentHashMapCopyOnWriteArrayList等线程安全的集合类。

一个int的变量为0,10个线程轮流对它进行++操作,循环一万次,结果会小于十万。原因是多线程环境下,++操作不是原子操作,可能会出现多个线程同时读取和写入变量的情况,导致数据丢失。
一般的++操作可以分解为三个步骤:

  1. 读取变量的当前值
  2. 对值进行加1操作
  3. 将新的值写回变量
    在多个线程并发进行++操作时,可能会出现以下情况:
  • 线程A读取变量的值为0
  • 线程B读取变量的值也为0
  • 线程A对值进行加1操作,得到1
  • 线程B对值进行加1操作,得到1
  • 线程A将1写回变量
  • 线程B将1写回变量
    最终变量的值为1,而不是预期的2。因此最后需要通过某种方式保证++操作的原子性,比如使用synchronized关键字、ReentrantLock类或者AtomicInteger类。

如果多个线程同时尝试创建实例,单例类必须确保只创建一个,并提供一个全局访问点。在多种实现单例类的方式中,饿汉式是一种比较直接的实现方式。

1
2
3
4
5
6
7
8
9
public class Singleton {
private static final Singleton instance = new Singleton();

private Singleton() {}

public static Singleton getInstance() {
return instance;
}
}

饿汉式单例则在第一次使用时初始化单例对象,这种方式需要使用双重检查锁定来确保线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Singleton {
private static volatile Singleton instance;

private Singleton() {}

public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}

ThreadLocal

ThreadLocal类提供了一种线程本地存储机制,使得每个线程都可以拥有自己的独立变量副本,避免了多线程环境下的共享变量带来的数据竞争和不一致问题。

ThreadLocal的操作有4种方法:

  1. set(T value):设置当前线程的局部变量值。
  2. get():获取当前线程的局部变量值。
  3. remove():移除当前线程的局部变量值。
  4. initialValue():提供初始值的方式,子类可以重写该方法。

在Web应用中,可以使用ThreadLocal存储用户对话信息,这样每个线程在处理请求时都可以访问自己的用户信息,而不会影响其他线程。很多场景的cookie、session等信息都可以通过ThreadLocal来存储。

在数据库操作中,可以使用ThreadLocal存储数据库连接对象,确保每个线程使用自己的连接,避免连接冲突,以及避免多线程竞争下同一数据库链接的部分问题。

ThreadLocal中,每个线程的访问的变量的副本都是独立的,避免了共享变量引起的线程安全问题。ThreadLocal可用于跨方法、跨类时间传递数据,避免了通过方法参数传递数据的繁琐。

当我们创建一个ThreadLocal变量时,实际上是为每个线程创建了一个独立的副本,这些副本存储在每个线程的ThreadLocalMap中。

ThreadLocalMapThreadLocal类的一个内部类,它是一个哈希表,用于存储每个线程的ThreadLocal变量及其对应的值。ThreadLocalMap的键是ThreadLocal对象,值是对应的变量值。键值对继承了弱引用的特性,当ThreadLocal对象不再被引用时,垃圾回收器可以回收它,从而避免内存泄漏。

强引用与弱引用的区别详见jvm篇。

ThreadLocalMap的key是弱引用,但value是强引用,因此如果不手动调用remove()方法,value会一直存在,导致内存泄漏。因此需要在不需要使用ThreadLocal变量时,调用remove()方法来清除当前线程的局部变量值。remove()方法会调用ThreadLocalMapremove()方法,删除当前线程的ThreadLocal变量及其对应的值。

将key设计成弱引用的好处是,jvm能够及时回收掉弱引用的对象。一旦key被回收,ThreadLocalMap在进行set()get()操作时会发现key为null,然后会将该entry删除,从而避免内存泄漏。

ThreadLocal也有许多改进方案:

  • 在Netty中的FastThreadLocal,内部维护了一个索引常量index,每次创建FastThreadLocal中都会自动+1,用来取代hash冲突带来的损耗,用空间换时间。

  • 而阿里的TransmittableThreadLocal,不仅实现了子线程可以继承父线程 ThreadLocal的功能,并且还可以跨线程池传递值。

ThreadLocalMap底层的数据结构是一个数组,也是一个简单的线性探测表。毕竟ThreadLocalMap设计的目的是存储线程私有数据,不会有大量的key,设计得太复杂没必要。
ThreadLocalMap并不会直接在元素数量达到阈值时立即扩容,而是首先清理掉那些key为null的entry,然后在填充率达到四分之三时扩容。扩容时,会将数组长度翻倍,并重新计算每个entry的位置,重新放入。

注意,父线程不能用ThreadLocal给子线程传值。子线程不会继承父线程的ThreadLocalMap,可以使用InheritableThreadLocal类来实现父线程向子线程传递ThreadLocal变量。因为子线程在创建的时候会拷贝父线程的InheritableThreadLocalMap,从而实现继承。

Fork/Join

该框架主要用于分治算法的并行执行,可以将一个大任务拆分成多个小任务并行处理。其底层结构是个特殊的线程池——ForkJoinPool,且使用了工作窃取算法(一个线程执行完自己的任务后,可以窃取其他线程的任务,避免线程闲置)。


并发基础 - 1
https://ivanclf.github.io/2025/09/29/concurrent/
作者
Ivan Chan
发布于
2025年9月29日
许可协议