Skip to content

Java 并发编程

编程语言⭐⭐⭐ 高级🔥🔥🔥 高频

💡 核心要点

本页聚焦 synchronized/volatile 之外的并发进阶:线程池的七个参数与执行流程、AQS 的核心机制、CAS 与原子类、ThreadLocal 的内存泄漏风险,以及并发容器与 CompletableFuture 的使用场景。

概念

并发(Concurrency)vs 并行(Parallelism): 并发是多个任务在一段时间内交替执行(单核可实现),并行是多个任务在同一时刻同时执行(需要多核)。

线程安全的三要素:

  • 原子性: 操作不可被中断,要么全做,要么不做(synchronizedAtomicInteger
  • 可见性: 一个线程的修改对其他线程立即可见(volatilesynchronized
  • 有序性: 程序执行顺序与代码顺序一致(volatile 禁止指令重排)

JMM(Java 内存模型): 规定所有变量存储在主内存,每个线程有自己的工作内存(缓存),线程只操作工作内存,写回主内存时才对其他线程可见。

happens-before 原则: JMM 保证以下操作具有可见性保证:程序顺序规则、监视器锁规则(解锁 happens-before 后续加锁)、volatile 变量规则、线程启动/终止规则。

为什么 volatile 不能保证原子性?(资深面试 Top 5

💡 高频追问:"volatile int i; i++ 多线程安全吗?" —— 不安全

volatile 只解决两件事:① 可见性(修改立即对其他线程可见,强制走主存);② 有序性(禁止 JIT / CPU 指令重排)。它根本不管原子性

i++ 的真相 = 3 条字节码

i++ 编译后:
  ① getfield  i   ← 从主内存读 i
  ② iconst_1      ← 压栈常数 1
  ③ iadd          ← 栈顶相加
  ④ putfield  i   ← 写回主内存

两个线程交错执行(即便 i 是 volatile):

线程 A 读 i = 5         ← getfield
                       线程 B 读 i = 5    ← getfield(同样看到 5)
线程 A 算 6, 写回 i = 6  ← putfield
                       线程 B 算 6, 写回 i = 6  ← putfield 覆盖
最终 i = 6,丢了一次 +1

volatile 保证的是"单次读 / 单次写"原子复合操作 读-改-写 不原子

三种正确做法

方案适合
synchronized通用,但重
AtomicInteger.incrementAndGet()首选——底层 CAS
LongAdder.increment()高并发计数(分段累加,比 AtomicLong 快 5-10×)

volatile 的真实价值(不要妖魔化)

虽然不能原子,但解决两个关键问题:

java
// 场景 1:状态标志(写一次,读多次)
class Worker implements Runnable {
    private volatile boolean running = true;        // ★ 不加 volatile 主线程改了 worker 可能永远看不到
    public void stop() { running = false; }
    public void run() {
        while (running) doWork();
    }
}

// 场景 2:禁止指令重排(双重检查锁单例 DCL 必须)
class Singleton {
    private static volatile Singleton instance;     // ★ 这里必须 volatile
    public static Singleton get() {
        if (instance == null) {
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton();     // new 不是原子:① 分配内存 ② 初始化 ③ 引用赋值
                }                                   // 没有 volatile → CPU 可能重排成 ①③② → 别的线程拿到未初始化对象
            }
        }
        return instance;
    }
}

标准答题模板

"volatile 只保证可见性和有序性,不保证原子性。i++ 是'读-改-写'三步操作,volatile 只能让这三步单独对其他线程可见,不能让三步合在一起原子。要原子用 AtomicInteger.incrementAndGet()(CAS 实现)或 LongAdder(高并发更快)。

volatile 的真实场景是:① 状态标志(boolean flag)—— 写一次读多次;② DCL 双重检查锁的 instance 字段 —— 防止 new 操作重排。"


核心原理

线程池(ThreadPoolExecutor)

7 个核心参数

java
new ThreadPoolExecutor(
    int corePoolSize,         // 1. 核心线程数,即使空闲也不销毁
    int maximumPoolSize,      // 2. 最大线程数(核心线程 + 非核心线程上限)
    long keepAliveTime,       // 3. 非核心线程空闲存活时间
    TimeUnit unit,            // 4. keepAliveTime 的时间单位
    BlockingQueue<Runnable> workQueue,    // 5. 任务等待队列
    ThreadFactory threadFactory,         // 6. 创建线程的工厂(可自定义线程名)
    RejectedExecutionHandler handler     // 7. 拒绝策略(队列满且线程数已达最大时触发)
);

执行流程

提交任务


当前线程数 < corePoolSize?
    │ YES → 创建核心线程直接执行
    │ NO

workQueue 未满?
    │ YES → 任务入队等待
    │ NO

当前线程数 < maximumPoolSize?
    │ YES → 创建非核心线程执行
    │ NO

执行拒绝策略(RejectedExecutionHandler)

4 种拒绝策略

策略行为适用场景
AbortPolicy(默认)抛出 RejectedExecutionException需要感知拒绝的场景
CallerRunsPolicy由提交任务的线程自己执行减缓提交速度,不丢任务
DiscardPolicy静默丢弃新任务允许丢任务、不关心结果
DiscardOldestPolicy丢弃队列中最老的任务,再尝试提交希望优先保留新任务

为什么不推荐 Executors 工厂方法?

java
// 危险!workQueue 是无界的 LinkedBlockingQueue(容量 Integer.MAX_VALUE)
// 大量任务堆积 → OOM
ExecutorService pool1 = Executors.newFixedThreadPool(10);

// 危险!maximumPoolSize 是 Integer.MAX_VALUE
// 瞬间创建大量线程 → OOM
ExecutorService pool2 = Executors.newCachedThreadPool();

// 推荐:手动创建,明确每个参数
ExecutorService pool = new ThreadPoolExecutor(
    4, 8,
    60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(200),  // 有界队列
    new ThreadFactoryBuilder().setNameFormat("biz-pool-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

AQS(AbstractQueuedSynchronizer)

AQS 是 JDK 并发框架的底层基石,ReentrantLockSemaphoreCountDownLatchReadWriteLock 均基于它实现。

核心思想

AQS 内部维护两个核心结构:

1. int state(volatile)
   - ReentrantLock:0=未锁,>0=重入次数
   - Semaphore:剩余许可数
   - CountDownLatch:剩余计数

2. CLH 等待队列(双向链表)
   HEAD → Node(Thread-A) ↔ Node(Thread-B) ↔ Node(Thread-C) → TAIL
          (持有锁/已出队)     (等待)          (等待)

独占模式 vs 共享模式

模式代表实现描述
独占模式ReentrantLock同一时刻只允许一个线程持有
共享模式SemaphoreCountDownLatch允许多个线程同时持有

ReentrantLock 加锁流程(独占模式)

java
// 简化示意:非公平锁 lock() 的核心逻辑
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // CAS 尝试将 state 从 0 改为 1
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        // 重入:state 累加
        setState(c + acquires);
        return true;
    }
    // 失败 → 进入 CLH 队列挂起等待
    return false;
}

CAS(Compare-And-Swap)

原理

CAS 是硬件级别的原子指令(x86 上是 CMPXCHG),JDK 通过 Unsafe 类暴露给 Java 层。

java
// 语义等价于(但实际是一条不可分割的 CPU 指令):
boolean compareAndSwap(Object obj, long offset, int expected, int update) {
    if (obj.field == expected) {  // 读当前值并比较
        obj.field = update;       // 相等则更新
        return true;
    }
    return false;                 // 不等则失败,调用方重试
}

ABA 问题

线程 1 读到 A,线程 2 将 A → B → A,线程 1 的 CAS 仍然成功,但中间发生了变化。

java
// 解决方案:AtomicStampedReference(带版本号的引用)
AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 0);

// 获取当前值和版本号
int[] stampHolder = new int[1];
Integer value = ref.get(stampHolder);
int stamp = stampHolder[0];

// CAS 时同时比较值和版本号
ref.compareAndSet(value, 200, stamp, stamp + 1);

Atomic 类家族

说明
AtomicInteger / AtomicLong单个整数的原子操作
AtomicReference<V>任意对象的原子引用
AtomicStampedReference<V>带版本号,解决 ABA
LongAdder高并发计数器,分段累加,最终汇总(比 AtomicLong 减少 CAS 竞争)
java
// LongAdder 在高并发下比 AtomicLong 性能更好
LongAdder counter = new LongAdder();
counter.increment();
long total = counter.sum(); // 汇总各分段

ThreadLocal

原理

每个 Thread 对象内部持有一个 ThreadLocalMap,以 ThreadLocal 实例为 key,存储线程私有的值。

Thread-A
  └── ThreadLocalMap
        ├── WeakRef(threadLocal1) → value1
        └── WeakRef(threadLocal2) → value2

Thread-B
  └── ThreadLocalMap
        └── WeakRef(threadLocal1) → value1(独立的副本)
java
// 基本用法
ThreadLocal<String> userCtx = new ThreadLocal<>();

userCtx.set("user-123");        // 设置当前线程的值
String user = userCtx.get();    // 获取当前线程的值
userCtx.remove();               // 用完必须 remove!

内存泄漏风险

ThreadLocalMap 的 key 是 ThreadLocal弱引用,value 是强引用

ThreadLocalMap Entry:
  key   → WeakReference(ThreadLocal 实例)   ← 若外部无强引用,GC 会回收 key(变成 null)
  value → 强引用(用户数据)                 ← key 为 null 后,value 永远无法被访问,但不会被 GC

泄漏场景: 使用线程池时,线程复用不销毁,ThreadLocalMap 长期存在。若忘记调用 remove(),key 被 GC 后 value 会一直留在内存中。

最佳实践:

java
// 每次使用后在 finally 中 remove
try {
    userCtx.set(currentUser);
    doWork();
} finally {
    userCtx.remove(); // 防止内存泄漏和线程复用时数据串用
}

使用场景: 数据库连接(每个线程持有独立 Connection)、用户登录上下文(Spring Security 的 SecurityContextHolder)、MDC 日志追踪 ID。

为什么 key 用 WeakReference,value 用 StrongReference?(资深面试 Top 1

💡 这是 Java 高级面试 被问最多 的设计题,必须能 5 分钟答完整

先理解引用关系全景图
                      Thread 对象(栈帧的根可达)
                          │ 强引用

                    ThreadLocalMap

                  ┌───────┴───────┐
                  │  Entry[i]      │
                  │ ┌──────────┐   │  ★ WeakRef
                  │ │  key  ◯───┼───────────────→ ThreadLocal 实例
                  │ ├──────────┤   │                    ↑ 强引用
                  │ │ value ●───┼─→ User Data            │
                  │ └──────────┘   │            ┌────────┴────────┐
                  └────────────────┘            │  业务代码        │
                                               │ static ThreadLocal│
                                               └─────────────────┘
设计 1:为什么 key 用 WeakRef? —— 防止"忘记 remove 时"的 ThreadLocal 实例泄漏

假设 key 也用强引用

Thread → ThreadLocalMap → Entry → strong key → ThreadLocal 实例
  • 一旦业务代码不再持有 ThreadLocal(如方法局部变量),但 Thread 还活着(线程池复用)
  • 链路 Thread → ThreadLocalMap → Entry → strong key 仍然强引用 ThreadLocal
  • ThreadLocal 永远不能被 GC —— 即使你已经不用它了
  • 结果:ThreadLocal 实例本身泄漏

改用 WeakRef 后

  • 业务代码不再引用 ThreadLocal → 唯一的引用只剩下 ThreadLocalMap Entry 的 WeakRef
  • 下次 GC 时 WeakRef 被自动清理,key 变成 null
  • ThreadLocal 实例被回收,避免类级别的泄漏
设计 2:为什么 value 用 StrongRef? —— 保证业务能正常用到 value

假设 value 也用 WeakRef

java
ThreadLocal<User> ctx = new ThreadLocal<>();
ctx.set(new User("alice"));    // 这里 new User 没有其他强引用
// ... GC 发生 ...
ctx.get();                      // 返回 null!!!业务逻辑彻底坏掉
  • 大多数 ThreadLocal.set(new X()) 的 value 是临时创建的对象,没有外部强引用
  • 用 WeakRef 会让 GC 之后 value 莫名其妙变成 null
  • value 必须是强引用,否则 ThreadLocal 无法可靠工作
设计的真实代价:value 泄漏

WeakRef key 解决了"ThreadLocal 实例泄漏",但留下了"value 泄漏"

GC 后:Entry[i] = (null, value)   ← key 已被回收
       但 ThreadLocalMap → Entry → strong value 链仍在
       value 永远无法访问也永远不会被回收,直到线程死亡
JDK 的"挽救机制":每次 set / get / remove 顺手清理

Java 团队意识到这个问题,所以在 ThreadLocalMap 中实现了启发式清理

触发时机做什么
get(key)找到 hash 槽时,若发现 key 为 null,顺手把这个 Entry 的 value 置 null
set(key, value)探测过程中遇到 key=null 的"过期 Entry",用新 Entry 替换
remove(key)直接清理当前槽 + 触发清理后续 null key 槽
rehash()容量调整时全表扫一遍清空 null key 槽

结论:JDK 已经尽力了,但清理不彻底(依赖运气:取决于你后续是否还访问 ThreadLocal)。业务代码必须主动 remove(),这是契约不是建议

完整设计权衡总结
引用类型实际做法不这样做的代价
key = WeakRef✅ Java 选择用 StrongRef → ThreadLocal 实例本身永远不能 GC
key = SoftRef❌ 不用太晚回收(内存不足才回收);解决不了泄漏
value = StrongRef✅ Java 选择用 WeakRef → GC 后 value 莫名变 null,业务崩溃
value = WeakRef❌ 不用同上
标准答题模板(背下来)

"ThreadLocalMap 的 key 用 WeakReference,是为了防止 ThreadLocal 实例自身泄漏 —— 业务不再持有时 GC 能回收。value 用 StrongReference,是因为 value 通常是用户创建的临时对象,如果用 WeakRef 会导致 get() 莫名返回 null。

但这个设计留下一个尾巴:key 被回收后,value 因为强引用还在 ThreadLocalMap 里。JDK 在 set/get/remove 时会顺手清理这些 key=null 的 Entry,但不彻底——所以业务代码必须主动调用 remove(),尤其是在线程池场景下。

JDK 21 之后这个问题在虚拟线程 + ScopedValue 范式下从根本上消失了:ScopedValue 的作用域是 try-with-resources 语义,离开 Lambda 自动清理。"

使用场景

数据库连接(每个线程持有独立 Connection)、用户登录上下文(Spring Security 的 SecurityContextHolder)、MDC 日志追踪 ID、DateTimeFormatter 缓存。

⚠️ 虚拟线程时代的 ThreadLocal 新考点(JDK 21+)

虚拟线程够轻量,一个进程可能同时存在数百万个,每个线程都背负一个 ThreadLocalMap 会造成: ① 内存爆炸:100 万虚拟线程 × 每线程 1KB ThreadLocal 数据 = 1GB 额外开销; ② 语义变化:传统线程池时代 "一线程贯穿多请求" 的模型在虚拟线程下变成 "一虚拟线程一请求",ThreadLocal 的 "跨调用联动" 用法意义不大。

JDK 21 引入的替代品:ScopedValue(JEP 446 Preview / JEP 481 Second Preview)——不可变、生命周期严格限定在一个 Lambda 范围内,定位虚拟线程的轻量最佳选择:

java
private static final ScopedValue<User> USER = ScopedValue.newInstance();

ScopedValue.where(USER, currentUser).run(() -> {
    // 范围内的代码可用 USER.get() 读取;退出后自动清理。
    doWork();
});
// 优点:① 不可变→线程安全 ② 无需手动 remove(隐含清理) ③ 嵌套作用域天然支持

面试高频追问:虚拟线程 + ThreadLocal 有什么问题?→ 答 内存、语义、生命周期三点,并提 ScopedValue 作为 JDK 21+ 推荐替代。


并发容器

ConcurrentHashMap(JDK 8)

JDK 7 用 Segment 分段锁(16 个段),JDK 8 改为 CAS + synchronized,锁粒度降到单个数组桶:

java
// put 核心逻辑(简化)
if (桶为空) {
    CAS 方式插入节点;           // 无锁竞争时零开销
} else {
    synchronized (桶头节点) {    // 只锁单个桶,其他桶并发不受影响
        插入链表或红黑树;
    }
}

与 HashMap 的区别: 线程安全;不允许 null key/value;size() 返回近似值(并发下精确计数成本高)。

CopyOnWriteArrayList

写时复制:每次修改(add/remove/set)都复制整个底层数组,写完后替换引用。

java
// 读操作无锁,并发性能极高
// 写操作加锁 + 数组复制,适合读多写少
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

缺点: 大数组写操作内存开销大;读操作不保证实时一致性(读的是旧快照)。

BlockingQueue 家族

实现特点典型场景
ArrayBlockingQueue有界、基于数组、公平可选线程池 workQueue,限流
LinkedBlockingQueue可选有界(默认 Integer.MAX_VALUE)、基于链表FixedThreadPool 的默认队列(注意 OOM!)
SynchronousQueue容量为 0,每次 put 必须等 takeCachedThreadPool 的队列,任务直接移交
PriorityBlockingQueue按优先级排序的无界队列带优先级的任务调度
DelayQueue元素到期才可取出延迟任务、缓存过期

CompletableFuture

CompletableFuture 是 JDK 8 引入的异步编排工具,解决了 Future.get() 阻塞和多任务协同的问题。

异步编排

java
// thenApply:串行转换(有返回值)
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> fetchUserId())           // 异步获取用户 ID
    .thenApply(id -> fetchUserName(id))         // 用 ID 查名字(串行)
    .thenApply(name -> "Hello, " + name);       // 拼接字符串

// thenCompose:串行,下一步也是 CompletableFuture(防止嵌套)
CompletableFuture<String> future2 = CompletableFuture
    .supplyAsync(() -> fetchUserId())
    .thenCompose(id -> CompletableFuture.supplyAsync(() -> fetchUserName(id)));

// thenCombine:并行合并两个独立任务的结果
CompletableFuture<Integer> priceF = CompletableFuture.supplyAsync(() -> getPrice());
CompletableFuture<Integer> stockF = CompletableFuture.supplyAsync(() -> getStock());
CompletableFuture<String> result = priceF.thenCombine(stockF,
    (price, stock) -> "价格: " + price + ", 库存: " + stock);

// allOf:等待所有任务完成
CompletableFuture.allOf(priceF, stockF).join();

异常处理

java
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() > 0.5) throw new RuntimeException("失败");
        return "成功";
    })
    // exceptionally:发生异常时返回默认值(只处理异常)
    .exceptionally(ex -> "降级结果: " + ex.getMessage())
    // handle:无论成功失败都执行(类似 finally,可访问结果和异常)
    .handle((result, ex) -> {
        if (ex != null) return "处理异常: " + ex.getMessage();
        return result;
    });

注意: 默认使用 ForkJoinPool.commonPool(),生产环境建议传入自定义线程池,避免影响其他任务。


现代并发工具与高性能模式

JDK 8 之后引入了一系列针对高并发场景的专用工具,能在特定场景下把性能再压榨一个数量级。2025-2026 年大厂 Java 面试中**"AtomicLong 不够用怎么办"、"读多写少用什么锁"、"为什么 Disruptor 比 BlockingQueue 快"** 是高频追问。

LongAdder 原理:分段累加的设计

LongAdder(JDK 8)是 AtomicLong 的高并发替代品。在 64 核机器上计数性能可比 AtomicLong 高 5-10 倍

为什么 AtomicLong 在高并发下会慢

所有线程都 CAS 同一个 long 变量

高并发下大量 CAS 失败 → 重试自旋 → CPU 空转

吞吐量 / 核数曲线变成"先上升后下降"

LongAdder 的解法:分段+按需扩容

AtomicLong(单点竞争):
  Thread1 ↘
  Thread2 → [base]      ← 所有线程争一个变量
  Thread3 ↗

LongAdder(分段写入 + 求和读取):
  Thread1 → [Cell 0]
  Thread2 → [Cell 1]    ← 不同线程落到不同 Cell(按哈希)
  Thread3 → [Cell 2]    ← 完全无竞争
  ...
  sum() = base + Σ Cell  ← 读取时再求和

关键源码思想

java
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // base 上 CAS 失败 → 走 Cell 数组
        boolean uncontended = true;
        if (as == null
            || (m = as.length - 1) < 0
            || (a = as[getProbe() & m]) == null   // 按线程哈希定位 Cell
            || !(uncontended = a.cas(v = a.value, v + x))) {
            longAccumulate(x, null, uncontended); // 扩容 Cell 数组
        }
    }
}

何时用 LongAdder

场景选什么
低并发计数AtomicLong 内存占用更小(1 个 long)
需要"读后立即更新"原子操作AtomicLongincrementAndGet() 返回新值)
高并发只写、读少LongAdder 首选

💡 一句话辨析

"AtomicLong 是单点 CAS,所有线程争同一个变量;LongAdder 是分段写、汇总读,把竞争分散到多个 Cell。但 LongAdder 的 sum() 不是强一致的——读取瞬间可能不等于真实值,所以不能用于"读后立即决策"的场景。"

StampedLock:读多写少的乐观读神器

StampedLock(JDK 8)是为读极多、写极少场景优化的锁。相比 ReentrantReadWriteLock 提供乐观读模式——读时不上锁,写完后再验证。

三种模式

java
StampedLock lock = new StampedLock();

// 1. 写锁(独占)
long stamp = lock.writeLock();
try { /* 写操作 */ }
finally { lock.unlockWrite(stamp); }

// 2. 悲观读锁(共享)
long stamp = lock.readLock();
try { /* 读操作 */ }
finally { lock.unlockRead(stamp); }

// 3. 乐观读(无锁!只标记一个 stamp)
long stamp = lock.tryOptimisticRead();
int currentX = x, currentY = y;          // 不上锁直接读
if (!lock.validate(stamp)) {              // 验证读期间是否有写
    stamp = lock.readLock();              // 失败回退到悲观读
    try { currentX = x; currentY = y; }
    finally { lock.unlockRead(stamp); }
}

三种锁对比

维度synchronizedReentrantReadWriteLockStampedLock
读读并发
乐观读(无锁)
可重入❌(陷阱!)
可中断
条件变量
适用简单同步读多写少读极多 + 短写(如配置/路由表)

⚠️ StampedLock 三大坑

  1. 不可重入——同一线程二次获取会死锁
  2. 不支持 Condition —— 不能做经典的"等待-通知"
  3. 乐观读期间读到的字段可能被部分修改 —— 必须用 final / volatile 保护

生产者-消费者模式(Producer-Consumer)

最经典的并发解耦模式:生产者只造数据、消费者只处理数据,中间通过有界队列隔开,两边速度不一致由队列吸收。Java 的线程池、Reactor 事件循环、Kafka 消费者,本质都是这个模式的不同形态。

模型与三要素

┌──────────┐   put()    ┌─────────────┐   take()   ┌──────────┐
│ Producer │ ────────▶ │  Buffer     │ ────────▶ │ Consumer │
│  (N 个)  │  满则阻塞  │ (有界队列)  │  空则阻塞  │  (M 个)  │
└──────────┘            └─────────────┘            └──────────┘
角色职责阻塞条件
生产者生成任务放进队列队列 → 阻塞 / 丢弃 / 降级
缓冲区线程安全的有界容器
消费者从队列取任务处理队列 → 阻塞

为什么必须有界? 无界队列 = 没有背压,生产者一旦快于消费者就会堆积到 OOM(Executors.newFixedThreadPool 的著名陷阱)。

Java 三种实现

① BlockingQueue(首选)

java
BlockingQueue<Task> queue = new ArrayBlockingQueue<>(1000);

// 生产者
Runnable producer = () -> {
    while (running) {
        Task t = produce();
        queue.put(t);                                // 满则阻塞
        // 或 queue.offer(t, 100, MILLISECONDS);     // 带超时
    }
};

// 消费者
Runnable consumer = () -> {
    while (running) {
        try {
            Task t = queue.take();                   // 空则阻塞
            consume(t);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
};

② BlockingQueue 选型

实现特点适用
ArrayBlockingQueue数组、有界、单锁、可选公平生产首选
LinkedBlockingQueue链表、双锁(put/take 分离)、默认无界吞吐高,但容量必须显式传
SynchronousQueue容量 0,直接交接newCachedThreadPool 底层
LinkedTransferQueuetransfer() 等消费者拿走才返回极致低延迟
PriorityBlockingQueue堆排序、无界任务有优先级
DelayQueue元素到期才能取出定时任务、缓存过期

③ Lock + Condition(手写,面试题原型)

java
public class BoundedBuffer<T> {
    private final Object[] items;
    private int putIdx, takeIdx, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int cap) { items = new Object[cap]; }

    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) notFull.await();   // while 防虚假唤醒
            items[putIdx] = x;
            putIdx = (putIdx + 1) % items.length;
            count++;
            notEmpty.signal();                                // 只唤醒消费者
        } finally { lock.unlock(); }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) notEmpty.await();
            T x = (T) items[takeIdx];
            items[takeIdx] = null;
            takeIdx = (takeIdx + 1) % items.length;
            count--;
            notFull.signal();
            return x;
        } finally { lock.unlock(); }
    }
}

手写题 4 大易错点

  1. await() 必须用 while 而不是 if —— 防虚假唤醒(spurious wakeup)
  2. 两个独立 Condition(notFull / notEmpty)—— 一个 Condition + signalAll() 会有惊群
  3. 业务处理放在锁外 —— 临界区只保留入队/出队,缩短持锁时间
  4. 释放锁必须放 finally —— 否则异常时永久占锁

工程化关键点

维度方案
优雅关闭毒丸(Poison Pill)/ interrupt() + 检查标志 / ExecutorService.shutdown()
背压策略4 种线程池拒绝策略:Abort / CallerRuns / Discard / DiscardOldest
批量消费queue.drainTo(buffer, 100) 一次取多条,减少锁竞争 10×+
监控指标队列长度、入队/出队 TPS、阻塞次数、平均等待时间
持久化版本Kafka / RocketMQ —— 跨进程、跨机器的"分布式生产者-消费者"

一句话本质

Java 线程池就是生产者-消费者submit() 是生产,Worker 线程是消费者,workQueue 是缓冲区,拒绝策略就是队列满的降级方案。

端到端可运行示例(main + 监控 + 优雅关闭 + 毒丸)

下面是一个完整可运行的版本,包含面试官常追问的所有细节:多生产者多消费者、毒丸停机、监控线程、批量消费、性能统计。

java
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;

public class ProducerConsumerDemo {

    /** 任务对象 */
    record Task(long id, String payload) {}

    /** 毒丸:消费者收到它就退出,避免硬中断丢任务 */
    private static final Task POISON_PILL = new Task(-1L, "POISON");

    public static void main(String[] args) throws InterruptedException {
        final int PRODUCERS = 3;
        final int CONSUMERS = 4;
        final int QUEUE_CAP = 1000;
        final int RUN_SECONDS = 10;

        BlockingQueue<Task> queue = new ArrayBlockingQueue<>(QUEUE_CAP);

        // ====== 指标 ======
        LongAdder produced = new LongAdder();
        LongAdder consumed = new LongAdder();
        LongAdder rejected = new LongAdder();
        volatile boolean running = true;

        // ====== 监控线程 ======
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor(
            r -> { Thread t = new Thread(r, "monitor"); t.setDaemon(true); return t; });
        monitor.scheduleAtFixedRate(() -> System.out.printf(
            "[mon] qsize=%4d  produced=%d  consumed=%d  rejected=%d%n",
            queue.size(), produced.sum(), consumed.sum(), rejected.sum()),
            1, 1, TimeUnit.SECONDS);

        // ====== 启动生产者 ======
        ExecutorService producerPool = Executors.newFixedThreadPool(PRODUCERS,
            namedThread("producer"));
        for (int i = 0; i < PRODUCERS; i++) {
            final int pid = i;
            producerPool.submit(() -> {
                long seq = 0;
                while (running) {
                    Task t = new Task(seq++, "p" + pid + "-" + seq);
                    try {
                        // 满则等待最多 100ms,超时则计入拒绝
                        if (queue.offer(t, 100, TimeUnit.MILLISECONDS)) {
                            produced.increment();
                        } else {
                            rejected.increment();    // 背压:可改成丢弃 / CallerRuns / 入 DB
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
        }

        // ====== 启动消费者(批量消费 + 毒丸优雅退出)======
        ExecutorService consumerPool = Executors.newFixedThreadPool(CONSUMERS,
            namedThread("consumer"));
        for (int i = 0; i < CONSUMERS; i++) {
            consumerPool.submit(() -> {
                List<Task> batch = new ArrayList<>(64);
                try {
                    while (true) {
                        // 阻塞拿第一条
                        Task first = queue.take();
                        if (first == POISON_PILL) return;       // ★ 收到毒丸退出
                        batch.add(first);

                        // 顺手批量取(减少锁竞争)
                        queue.drainTo(batch, 63);

                        for (Task t : batch) {
                            if (t == POISON_PILL) return;
                            handle(t);                          // 真正业务
                            consumed.increment();
                        }
                        batch.clear();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // ====== 运行 N 秒后优雅关闭 ======
        Thread.sleep(Duration.ofSeconds(RUN_SECONDS).toMillis());
        System.out.println("[main] stop signal sent");

        running = false;                                        // 1) 生产者退出循环
        producerPool.shutdown();
        producerPool.awaitTermination(5, TimeUnit.SECONDS);

        // 2) 给每个消费者投一颗毒丸
        for (int i = 0; i < CONSUMERS; i++) queue.put(POISON_PILL);

        consumerPool.shutdown();
        consumerPool.awaitTermination(10, TimeUnit.SECONDS);
        monitor.shutdownNow();

        System.out.printf("[main] DONE  produced=%d  consumed=%d  rejected=%d  remain=%d%n",
            produced.sum(), consumed.sum(), rejected.sum(), queue.size());
    }

    private static void handle(Task t) {
        // 模拟业务耗时
        try { Thread.sleep(1); } catch (InterruptedException ignored) {}
    }

    private static ThreadFactory namedThread(String prefix) {
        var counter = new java.util.concurrent.atomic.AtomicInteger();
        return r -> new Thread(r, prefix + "-" + counter.incrementAndGet());
    }
}

重点解读

设计点为什么
offer(t, 100ms) 而非 put()队列满时有限等待 + 计数,不会让生产者无限阻塞 → 实现背压可观测
批量 drainTo(batch, 63)第一条阻塞拿到后顺手取一批,锁竞争减少 64×
毒丸(Poison Pill)退出消费者自然处理完队列剩余消息再退出,不丢任务(vs interrupt() 会丢未处理的)
LongAdder 计数高并发计数比 AtomicLong 性能高 5-10×(分段累加)
shutdown() + awaitTermination()两步走优雅关闭,给业务时间完成;超时再 shutdownNow() 兜底
守护线程做监控主流程退出时监控自动跟着退出,不卡进程

4 种线程池拒绝策略对比(追问必备)

当线程池满 + 队列满,新任务怎么办?RejectedExecutionHandler 决定:

java
new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    new ThreadPoolExecutor.CallerRunsPolicy());   // ★ 选一种
策略行为适合
AbortPolicy(默认)RejectedExecutionException失败必须感知
CallerRunsPolicy让提交者自己执行(变相限流)不能丢任务(如日志写入)
DiscardPolicy静默丢弃可丢的监控数据
DiscardOldestPolicy丢队列最老任务,腾位置给新任务只关心最新数据(如行情)

⚠️ 生产推荐 CallerRunsPolicy

  1. 失败有反压(提交者被卡住,自然慢下来)
  2. 不会悄无声息丢任务
  3. 但要确保提交者线程能接受偶尔被业务卡住——一般业务线程都可以;如果提交者是 EventLoop / Netty IO 线程绝对不能用 CallerRuns

现代版生产者-消费者:CompletableFuture / 虚拟线程

JDK 21+ 时代,传统"线程池 + BlockingQueue"在很多场景被两个新方案替代。面试 2026 高频追问:「有了虚拟线程,还需要 BlockingQueue 吗?

版本 A:CompletableFuture 异步编排

适合已知数量的并行任务 fan-out / fan-in(如批量调用 N 个下游接口聚合结果):

java
public class CompletableFutureDemo {

    public static void main(String[] args) {
        // 业务线程池(IO 密集型用大池,CPU 密集型用 ForkJoinPool.commonPool 即可)
        ExecutorService pool = Executors.newFixedThreadPool(16);

        List<Long> userIds = List.of(1L, 2L, 3L, 4L, 5L);

        // fan-out: 并行查 5 个用户
        List<CompletableFuture<User>> futures = userIds.stream()
            .map(id -> CompletableFuture.supplyAsync(() -> fetchUser(id), pool)
                .orTimeout(3, TimeUnit.SECONDS)                  // 单个任务超时
                .exceptionally(ex -> User.empty(id)))            // 兜底
            .toList();

        // fan-in: 等全部完成后聚合
        List<User> users = futures.stream()
            .map(CompletableFuture::join)
            .filter(u -> !u.isEmpty())
            .toList();

        System.out.println("got " + users.size() + " users");
        pool.shutdown();
    }

    record User(Long id, String name, boolean isEmpty) {
        static User empty(Long id) { return new User(id, null, true); }
    }

    static User fetchUser(Long id) {
        try { Thread.sleep(200); } catch (InterruptedException ignored) {}
        return new User(id, "user-" + id, false);
    }
}

与传统生产者-消费者的区别

维度传统 BlockingQueueCompletableFuture
任务数流式无限已知有限(数组 / 列表)
生产-消费解耦生产/消费独立线程任务在 supplyAsync 时就关联好线程池
结果获取消费者无返回值天然有 Future<T> 返回值
错误处理各自 try-catchexceptionally / handle 链式
适合持续投递的事件流批量并行 + 聚合

版本 B:虚拟线程 + 直接 Thread.startVirtualThread(JDK 21+)

虚拟线程的革命:每个任务一个线程变得免费——抛弃线程池 + 队列,直接 spawn:

java
public class VirtualThreadDemo {

    public static void main(String[] args) throws InterruptedException {
        // ① 持续投递的事件流:用虚拟线程替代生产者-消费者
        BlockingQueue<Task> queue = new LinkedBlockingQueue<>();

        // 生产者:1 个虚拟线程持续生产
        Thread producer = Thread.ofVirtual().name("vt-producer").start(() -> {
            for (long i = 0; i < 10_000; i++) {
                queue.offer(new Task(i, "p-" + i));
            }
            queue.offer(POISON_PILL);
        });

        // 消费者:每条消息起一个虚拟线程(不再需要"消费者池"!)
        Thread.ofVirtual().name("vt-dispatcher").start(() -> {
            try {
                while (true) {
                    Task t = queue.take();
                    if (t == POISON_PILL) return;
                    // 每条消息一个虚拟线程处理 —— JDK 21 之前绝对不敢这么写
                    Thread.startVirtualThread(() -> handle(t));
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }).join();
    }

    /** ② 批量任务并行:直接用 newVirtualThreadPerTaskExecutor */
    public static List<User> batchFetch(List<Long> ids) {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            return ids.stream()
                .map(id -> executor.submit(() -> fetchUser(id)))
                .map(f -> { try { return f.get(); } catch (Exception e) { return User.empty(); } })
                .toList();
        }   // try-with-resources 自动 shutdown + awaitTermination
    }

    record Task(long id, String payload) {}
    record User(Long id, String name) { static User empty() { return new User(-1L, null); } }
    private static final Task POISON_PILL = new Task(-1L, "POISON");

    static void handle(Task t) { try { Thread.sleep(10); } catch (InterruptedException ignored) {} }
    static User fetchUser(Long id) { try { Thread.sleep(50); } catch (InterruptedException ignored) {} return new User(id, "u-" + id); }
}

三方案决策表(2026 必备)

场景推荐方案
持续无界事件流(监控、日志、Kafka 消费)BlockingQueue + 线程池(资源可控)或 虚拟线程 + 直接 spawn(JDK 21+ 简单场景)
已知数量批量任务 fan-out(聚合 N 个 RPC)CompletableFuture + 线程池newVirtualThreadPerTaskExecutor()
高并发 I/O 业务(API 处理)虚拟线程(一行 spring.threads.virtual.enabled=true
CPU 密集型并行ForkJoinPool / parallelStream
百万级 TPS 单机内存队列Disruptor(详见下节)
跨进程跨机器Kafka / RocketMQ / RabbitMQ

💡 「虚拟线程让 BlockingQueue 过时了吗?」标准回答

没有,定位变了:

  • 队列依然是"背压 + 削峰"工具——虚拟线程能轻松创建 100 万个,但不代表下游能处理 100 万 QPS;队列是显式的"系统消化能力"声明
  • 虚拟线程让"消费者池"几乎消失——每条消息直接 Thread.startVirtualThread(),不再为线程数量设计
  • CompletableFuture 仍然有用——它是"异步编排表达式",虚拟线程是"廉价线程",两者正交

Disruptor:单机百万 TPS 的无锁队列

LMAX Disruptor 是 2010 年开源的高性能内存队列,性能比 BlockingQueue 高 10-100 倍。Log4j 2、Apache Storm、Spring Cloud Stream 都在底层使用它。

为什么比 BlockingQueue 快

痛点(BlockingQueue)Disruptor 怎么解
ReentrantLock 锁竞争CAS + 内存屏障,完全无锁
链表节点频繁分配/GC预分配环形数组(Ring Buffer)
多线程访问共享变量导致 伪共享(False Sharing)Cache Line Padding 强制对齐 64 字节
生产者/消费者通过 head/tail 双指针 → 缓存乒乓Sequence 单调递增,独立缓存行

Ring Buffer 核心结构

       producer

   ┌───┬───┬───┬───┬───┬───┬───┬───┐
   │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │  ← 预分配的固定大小数组(2^n)
   └───┴───┴───┴───┴───┴───┴───┴───┘
                ↑              ↑
            consumer 1     consumer 2
  • 数组替代链表:CPU 缓存友好(顺序访问)+ 零 GC 压力
  • 2 的幂次取模index = sequence & (size - 1),比 % 快 10×
  • 多消费者依赖图:Consumer B 可以等 Consumer A 处理完同一事件再处理

最小示例

java
// 1. 定义事件
public static class LongEvent { private long value; /* ... */ }

// 2. 创建 Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(
    LongEvent::new, 1024,           // 工厂 + 环形大小
    DaemonThreadFactory.INSTANCE,
    ProducerType.SINGLE,            // 单/多生产者
    new BusySpinWaitStrategy()      // 等待策略
);

// 3. 注册消费者
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
    System.out.println("Received: " + event.getValue());
});

// 4. 启动 + 生产
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long seq = ringBuffer.next();
try {
    ringBuffer.get(seq).setValue(123L);
} finally {
    ringBuffer.publish(seq);
}

何时选 Disruptor

场景推荐
单 JVM 内、极致低延迟Disruptor(金融交易、日志、游戏循环)
跨进程 / 跨机器Kafka / RocketMQ(持久化 + 网络)
简单线程间通信BlockingQueue(实现简单、足够好)
响应式编程Reactor / RxJava

Caffeine:现代 JVM 缓存事实标准

Caffeine(JDK 8+,Spring Boot 2 起默认本地缓存)已经全面替代 Guava Cache。核心改进是 W-TinyLFU 淘汰算法,命中率比 LRU 高 10-30%。

W-TinyLFU 一句话原理

传统 LRU: 只看"最近用过"
  → 偶发的批量扫描会冲掉热点
  
W-TinyLFU: 看"最近 + 频率"
  ├── Window LRU(1%):保护新进来的"未来可能成为热点"
  └── Main Cache(99%):基于近似计数(Count-Min Sketch)的 LFU
  → 抗扫描污染、命中率高

关键能力对比

维度Guava CacheCaffeine
淘汰算法LRUW-TinyLFU(命中率高 10-30%)
异步加载✅(asyncLoadAll
刷新策略refreshAfterWriterefreshAfterWrite + 后台 refresh
统计简单完整 + 低开销
基准 QPS3-5×
生产推荐已不推荐首选

Spring Boot 一键接入

java
@Configuration
@EnableCaching
public class CacheConfig {
    @Bean
    public CacheManager cacheManager() {
        CaffeineCacheManager mgr = new CaffeineCacheManager("users");
        mgr.setCaffeine(Caffeine.newBuilder()
            .maximumSize(10_000)
            .expireAfterWrite(Duration.ofMinutes(10))
            .refreshAfterWrite(Duration.ofMinutes(5))   // 5 分钟后后台刷新
            .recordStats());
        return mgr;
    }
}

@Service
public class UserService {
    @Cacheable(value = "users", key = "#id")
    public User findUser(Long id) {
        return userRepository.findById(id);
    }
}

Caffeine + Redis 二级缓存(生产黄金组合)

请求 → Caffeine(本地,纳秒级,10K QPS/节点)
        ↓ miss
       Redis(中心,毫秒级,跨节点共享)
        ↓ miss
       DB

详见 缓存策略 — L1/L2 分层缓存

ForkJoinPool:CPU 密集型并行计算

ForkJoinPool 是 JDK 7 引入的"分治并行"线程池,parallelStream()CompletableFuture 默认线程池都是它(commonPool)。

工作窃取(Work-Stealing)核心思想

线程 1 的任务队列: [T1, T2, T3, T4]    (从队尾 push/pop,LIFO)
线程 2 的任务队列: [T5, T6]             (处理完了)

线程 2 从线程 1 的队头偷一个: 偷走 T1

减少了线程闲置,比传统 ThreadPool 利用率高

vs ThreadPoolExecutor

维度ThreadPoolExecutorForkJoinPool
任务队列一个全局队列每个线程独立队列 + 工作窃取
适用任务I/O 密集、独立任务CPU 密集、可分治的任务
任务粒度中等越细越好(递归分治)
典型场景Web 请求处理大数组计算、归并排序、AI 推理

parallelStream 的陷阱

java
// ❌ 错误:用 commonPool 跑慢的 I/O,会拖累整个 JVM
list.parallelStream().forEach(this::callRemoteApi);

// ✅ 正确:传入独立 ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(20);
customPool.submit(() -> list.parallelStream().forEach(this::callRemoteApi)).get();

commonPool全 JVM 共享的,被你的慢任务占满后,其他依赖它的代码(包括 CompletableFutureparallelStream)全部受影响。

VarHandle / Atomic 家族对比(JDK 9+)

VarHandle 是 JDK 9 的"sun.misc.Unsafe 的官方替代",提供字段级别的原子操作 + 内存屏障控制

java
// 替代 AtomicReferenceFieldUpdater + Unsafe 的现代方案
private static final VarHandle VALUE_HANDLE;
private volatile long value;
static {
    try {
        VALUE_HANDLE = MethodHandles.lookup()
            .findVarHandle(MyClass.class, "value", long.class);
    } catch (ReflectiveOperationException e) { throw new Error(e); }
}

// 多种内存语义
VALUE_HANDLE.compareAndSet(this, 0L, 1L);          // CAS
VALUE_HANDLE.getAndAdd(this, 5L);                  // 原子加
VALUE_HANDLE.setRelease(this, 10L);                // Release 语义
long v = (long) VALUE_HANDLE.getAcquire(this);     // Acquire 语义

何时用 VarHandle:库作者、需要细粒度内存语义控制。应用层用 AtomicXxx / LongAdder 即可

限流器:从 Guava RateLimiter 到分布式令牌桶

限流是高并发系统的标配防线。Java 单机方案首选 Guava RateLimiter(令牌桶),多实例部署必须升级到 Redis + Lua 的分布式令牌桶才能跨节点共享状态。

四大算法心智模型

算法一句话突发流量平滑度实现复杂度
计数器(固定窗口)每秒清零计数容许❌ 边界双倍流量
滑动窗口用环形数组细分小窗口部分⭐⭐
漏桶(Leaky Bucket)出口恒定速率,水太多就溢出❌ 严格匀速✅✅⭐⭐
令牌桶(Token Bucket)桶里攒令牌,请求扣令牌✅ 允许突发⭐⭐

业务首选令牌桶 —— 既能限平均速率,又允许短时突发(攒着的令牌可一次性消费)。

单机:Guava RateLimiter(令牌桶)

java
// 每秒 100 个令牌,允许预热 1 秒(SmoothWarmingUp 慢启动防冷启动雪崩)
RateLimiter limiter = RateLimiter.create(100, 1, TimeUnit.SECONDS);

if (limiter.tryAcquire(1, 50, TimeUnit.MILLISECONDS)) {
    handleRequest();              // 50ms 内拿到令牌 → 放行
} else {
    rejectFastFail();             // 拿不到 → 立即拒绝
}

两种模式

  • SmoothBursty(默认)—— 允许突发,桶最多攒 permitsPerSecond 个令牌
  • SmoothWarmingUp —— 慢启动,冷却后速率从低到高线性增长,防数据库刚启动被打挂

Guava RateLimiter 的硬伤

  1. 仅单机:8 个实例 × 100 QPS = 实际 800 QPS,与目标偏差 8 倍
  2. 不能动态调整阈值:改值要重启
  3. 不支持分布式协同:跨节点不感知

分布式:Redis + Lua 手写令牌桶(面试高频手写题)

核心思路:把"桶状态(剩余令牌数 + 上次补令牌时间)"放 Redis Hash,每次请求用 Lua 脚本 原子地完成"惰性补令牌 + 扣减 + 写回"。

lua
-- KEYS[1] = bucket:userId
-- ARGV   = capacity, rate(每秒令牌), now_ms, requested
local capacity  = tonumber(ARGV[1])
local rate      = tonumber(ARGV[2])
local now       = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local data = redis.call('HMGET', KEYS[1], 'tokens', 'ts')
local tokens = tonumber(data[1]) or capacity
local lastTs = tonumber(data[2]) or now

-- 惰性补令牌(不开后台线程)
local delta = math.max(0, now - lastTs) * rate / 1000
tokens = math.min(capacity, tokens + delta)

local allowed = 0
if tokens >= requested then
    tokens = tokens - requested
    allowed = 1
end

redis.call('HMSET', KEYS[1], 'tokens', tokens, 'ts', now)
redis.call('PEXPIRE', KEYS[1], 60000)
return allowed

Java 调用端

java
@Component
public class DistributedRateLimiter {
    private final StringRedisTemplate redis;
    private final DefaultRedisScript<Long> script;          // 加载上面的 Lua

    public boolean tryAcquire(String key, int capacity, int rate, int permits) {
        Long ok = redis.execute(script,
            List.of("bucket:" + key),
            String.valueOf(capacity),
            String.valueOf(rate),
            String.valueOf(System.currentTimeMillis()),
            String.valueOf(permits));
        return ok != null && ok == 1L;
    }
}

关键设计点(面试加分)

维度要点
原子性必须 Lua —— GET → 计算 → SET 三步走会产生并发覆盖
时间源用客户端 now 可能时钟漂移,用 Redis TIME 命令更稳但有 RTT
惰性补令牌不开后台定时任务 —— 用 now - lastTs 反推应补多少,省 CPU
降级Redis 挂了 → 回退本地 Guava RateLimiter(双层防线)
热 Key同一 userId 打到同一槽 → 本地预扣 N 个 + 批量同步(类 Sentinel cluster-flow)
Cluster 模式Lua 涉及多 key 时必须 hash tag:bucket:{userId}
生产替代Redisson RRateLimiter / Sentinel 集群流控 / Resilience4j RateLimiter

生产框架对比

框架算法分布式适用
Guava RateLimiter令牌桶单机限流、防自家爬虫
Bucket4j令牌桶✅(Hazelcast/Ignite/Redis)优雅 API、生产推荐
Redisson RRateLimiter令牌桶已用 Redis 的项目
Sentinel滑动窗口 + 令牌桶 + 漏桶✅ 集群流控阿里系、Spring Cloud Alibaba
Resilience4j RateLimiter信号量 + 时间窗口❌(自己接 Redis)函数式风格、与 CircuitBreaker 同体系

一句话答题

"单机用 Guava,多实例必须 Redis+Lua 令牌桶;生产直接上 Sentinel / Bucket4j 别造轮子;时钟漂移和 Redis 宕机要有双层降级。"

选型速查(生产黄金一张表)

场景选什么
高并发计数LongAdder(高并发) / AtomicLong(低并发)
读极多 + 短写(配置/路由表)StampedLock + 乐观读
读多写少(缓存等)ReentrantReadWriteLock
简单线程间通信 / 生产者-消费者ArrayBlockingQueue(有界、单锁、生产首选)
极致单机消息队列Disruptor
本地缓存Caffeine(替代 Guava)
CPU 密集分治计算ForkJoinPool + 自定义实例
库级原子字段 + 内存屏障VarHandle(替代 Unsafe)
单机限流Guava RateLimiter / Bucket4j
分布式限流Redis + Lua 令牌桶 / Sentinel / Redisson RRateLimiter
异步编排CompletableFuture / StructuredTaskScope(JDK 21+)

面试常问 & 怎么答

📝 面试真题4 道高频
1. 线程池的核心参数有哪些?执行流程是什么?困难
2. AQS 是什么?ReentrantLock 怎么基于 AQS 实现的?困难
3. CAS 是什么?有什么问题?中等
4. ThreadLocal 原理?为什么会内存泄漏?中等

Q1: 线程池的核心参数有哪些?执行流程是什么?

7 个参数: corePoolSize(核心线程数)、maximumPoolSize(最大线程数)、keepAliveTime + unit(非核心线程空闲存活时间)、workQueue(任务队列)、threadFactory(线程工厂)、handler(拒绝策略)。

执行流程:

  1. 线程数 < corePoolSize → 创建核心线程直接执行
  2. 队列未满 → 任务入队等待
  3. 队列已满且线程数 < maximumPoolSize → 创建非核心线程执行
  4. 全满 → 触发拒绝策略

为什么不用 Executors? newFixedThreadPool 用无界队列可能堆积任务导致 OOM;newCachedThreadPool 最大线程数无限制可能创建大量线程导致 OOM。

Q2: AQS 是什么?ReentrantLock 怎么基于 AQS 实现的?

AQSAbstractQueuedSynchronizer,并发工具类的底层框架,核心是:

  • volatile int state:表示锁状态
  • CLH 双向等待队列:存放等待线程

ReentrantLock 的实现:

  • lock():调用 tryAcquire(),用 CAS 将 state 从 0 改为 1;若 state > 0 且当前线程是持有者则重入(state 累加);失败则进入 CLH 队列挂起
  • unlock():调用 tryRelease(),state 递减,减到 0 时唤醒队列头部线程

公平锁 vs 非公平锁: 公平锁 tryAcquire() 会先检查 CLH 队列是否有等待线程;非公平锁直接 CAS 抢锁,减少线程切换开销。

Q3: CAS 是什么?有什么问题?

CAS 是 Compare-And-Swap,硬件级别的原子指令。读取当前值与期望值比较,相等才更新,失败则自旋重试。AtomicInteger.incrementAndGet() 底层就是 CAS 循环。

三个问题:

  1. ABA 问题: 值从 A 改成 B 再改回 A,CAS 无法感知。解决:AtomicStampedReference(带版本号)
  2. 自旋开销: 竞争激烈时大量自旋 CPU 空转。解决:改用 LongAdder(分段累加,减少竞争)
  3. 只能保证单个变量的原子性: 多变量需要 synchronizedAtomicReference 封装对象

Q4: ThreadLocal 原理?为什么会内存泄漏?

原理: 每个 Thread 对象持有一个 ThreadLocalMap,以 ThreadLocal 实例的弱引用为 key,以线程私有数据为 value。调用 get()/set() 都是在操作当前线程自己的 Map,线程间完全隔离。

内存泄漏原因:

  • key 是弱引用,外部无强引用时 GC 会回收 ThreadLocal 对象,key 变为 null
  • value 是强引用,key 为 null 后 value 永远无法被访问,也无法被 GC
  • 线程池中线程长期存活,ThreadLocalMap 不销毁,泄漏持续积累

解决方案: 使用完毕后在 finally 块中调用 threadLocal.remove(),彻底清除 entry。


深度图解

AQS 核心流程

CLH 队列节点 waitStatus 含义:

状态值名称含义
0初始节点刚入队
1CANCELLED因超时或中断取消,需从队列移除
-1SIGNAL后继节点需要被唤醒
-2CONDITION节点在条件队列中等待
-3PROPAGATE共享模式下需传播唤醒

线程池任务执行完整流程


CAS 与 ABA 问题

解决方案:AtomicStampedReference(版本戳)

java
AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

// 线程T1读取值和版本
String oldVal = ref.getReference();  // "A"
int oldStamp = ref.getStamp();       // 0

// 线程T2执行 A→B→A,版本变为 2
// T1 再次 CAS,值相同但版本已变,失败
boolean ok = ref.compareAndSet("A", "C", 0, 1); // false,stamp 已是 2

ThreadLocal 内存泄漏原理

规避规则: 使用完 ThreadLocal 后必须调用 remove(),尤其在线程池场景(线程复用,ThreadLocalMap 长期存活)。


看到什么就先想到这类

见到这个关键词先想到
线程池、任务队列、拒绝、OOMThreadPoolExecutor 7 参数 + 有界队列
公平锁、可重入、条件变量ReentrantLock → AQS state + CLH 队列
并发计数、许可、倒计时Semaphore/CountDownLatch → AQS 共享模式
无锁、原子操作、自旋CAS → AtomicXxx / LongAdder
ABA、版本号AtomicStampedReference
线程私有数据、用户上下文ThreadLocal → 记得 remove()
高并发 MapConcurrentHashMap(CAS + 桶级 synchronized)
读多写少、快照CopyOnWriteArrayList
生产者-消费者BlockingQueueArrayBlockingQueue 有界)
异步任务、回调、串并行编排CompletableFuture