Java 异步编程初级教程

很多人第一次在 Java 里做异步,都是从 new Thread() 或者“把活丢进线程池”开始的。然后很快就会遇到这些问题:

  • 为什么偶发卡死、吞吐忽高忽低?

  • 为什么异步越写越乱,异常也不知道去哪了?

  • 为什么你以为“异步了”,其实只是“换个线程同步执行”?

这篇文章不讲玄学,也不堆概念。我按真实项目里最常见的路径,带你把 Java 的异步从“能跑”写到“能长期维护”。


1. 先把“异步”说清楚:你到底想解决什么?

异步一般就两类诉求:

1.1 提升响应速度(把耗时操作挪走)

比如 Web 请求里要查库、调外部接口、读文件,你不想让主线程等着。 做法:把耗时部分丢到别的线程,主线程继续干自己的(或尽快返回)。

1.2 提升吞吐(并行做多件事)

比如一次要调 3 个接口、查 2 个库,能并行就别串行。 做法:拆成多个任务并行跑,最后合并结果。

注意:异步不等于更快。CPU 本来就满了,你再异步只会更慢(线程切换、队列堆积)。


2. 最基础的异步:ExecutorService(够用、可靠、别花里胡哨)

2.1 永远别直接 new Thread()(除非你写 demo)

原因很简单:你控制不了线程数量、生命周期、排队策略、拒绝策略。线上一旦流量上来就是事故。

用线程池:

import java.util.concurrent.*;
​
public class AsyncBasic {
    // 生产建议:线程池应该是单例、可监控、可配置。这里写简单点。
    private static final ExecutorService IO_POOL =
            Executors.newFixedThreadPool(16);
​
    public static void main(String[] args) {
        IO_POOL.submit(() -> {
            // 模拟耗时 IO
            sleep(200);
            System.out.println("done");
        });
​
        System.out.println("main thread free");
​
        IO_POOL.shutdown();
    }
​
    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }
}

经验

  • submit() 会吞异常(异常会进 Future),你不 get() 就看不到。

  • 线程池要 shutdown()(服务型应用通常在容器生命周期里处理)。

2.2 用 Future 拿结果(但别写成“get 阻塞地狱”)

Future<Integer> f = IO_POOL.submit(() -> {
    sleep(100);
    return 42;
});
​
// ...你可以先干别的
​
try {
    Integer v = f.get(300, TimeUnit.MILLISECONDS);
    System.out.println(v);
} catch (TimeoutException e) {
    System.out.println("timeout");
} catch (Exception e) {
    e.printStackTrace();
}

经验

  • get() 是阻塞的,一旦你到处 get(),异步就变成了“分段同步”。

  • get(timeout) 很重要:避免无限等待。


3. Java 真正常用的异步:CompletableFuture(别怕,它是为了工程化)

如果你只会线程池 + Future,你会发现“组合多个异步任务”非常痛苦。 CompletableFuture 的优势在于:可组合、可链式、异常可控

3.1 最常见:supplyAsync / runAsync

import java.util.concurrent.*;
​
public class CFIntro {
    private static final ExecutorService IO_POOL =
            Executors.newFixedThreadPool(16);
​
    public static void main(String[] args) {
        CompletableFuture<Integer> cf =
                CompletableFuture.supplyAsync(() -> {
                    sleep(120);
                    return 10;
                }, IO_POOL);
​
        cf.thenApply(x -> x * 2)
          .thenAccept(System.out::println)
          .exceptionally(ex -> {
              ex.printStackTrace();
              return null;
          });
​
        sleep(300);
        IO_POOL.shutdown();
    }
​
    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }
}

经验

  • 默认不传线程池会用 ForkJoinPool.commonPool(),线上服务我一般不建议“随便用默认”。

  • 业务服务里最好显式指定线程池:你要可控、可隔离、可压测。

3.2 并行跑多个任务,然后合并结果(典型提升吞吐)

CompletableFuture<String> a =
    CompletableFuture.supplyAsync(() -> fetchA(), IO_POOL);
​
CompletableFuture<String> b =
    CompletableFuture.supplyAsync(() -> fetchB(), IO_POOL);
​
CompletableFuture<String> combined =
    a.thenCombine(b, (ra, rb) -> ra + "-" + rb);
​
String result = combined.join(); // join 也是阻塞,但异常是 unchecked

经验

  • join()get() 少写一堆 checked exception,但异常会包一层 CompletionException

  • 合并时要想清楚:其中一个慢/挂了怎么办?有没有降级策略?

3.3 等全部完成:allOf(批量并发最常用)

CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 1, IO_POOL);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 2, IO_POOL);
CompletableFuture<Integer> f3 = CompletableFuture.supplyAsync(() -> 3, IO_POOL);
​
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
​
all.join(); // 等全部完成
​
int sum = f1.join() + f2.join() + f3.join();
System.out.println(sum);

经验

  • allOf 本身不返回结果,需要你自己从各个 future 里取。

  • 一旦其中一个异常,allOf 会异常结束;你要决定是“失败就失败”还是“局部失败可接受”。


4. 异常处理:异步里最容易翻车的地方

异步代码“看起来没报错”,其实异常早就丢了。 你得明确你想怎么处理异常:

  • 记录日志并返回默认值(降级)

  • 直接失败(让上游感知)

  • 重试(谨慎:重试会放大流量)

4.1 exceptionally:兜底

CompletableFuture<Integer> cf =
    CompletableFuture.supplyAsync(() -> 10 / 0, IO_POOL)
        .exceptionally(ex -> {
            // 这里做兜底返回
            System.err.println("failed: " + ex);
            return -1;
        });

4.2 handle:无论成功失败都走一遍(适合统计、打点)

CompletableFuture<Integer> cf =
    CompletableFuture.supplyAsync(() -> 42, IO_POOL)
        .handle((val, ex) -> {
            if (ex != null) return -1;
            return val;
        });

经验

  • exceptionally 只处理异常分支。

  • handle 成功失败都会进,适合做统一收口。


5. 线程池不是随便开的:初学者最该懂的 3 个点

5.1 CPU 密集 vs IO 密集,线程数不是一个套路

  • CPU 密集:线程数接近 CPU 核数(或略多)。

  • IO 密集:线程数可以更大,因为线程大部分时间在等 IO。

但别迷信公式。最靠谱的是:压测 + 监控(队列长度、响应时间、拒绝次数)

5.2 一定要有队列与拒绝策略意识

很多“雪崩”不是线程跑满了,而是队列堆爆,延迟越来越长。 生产线程池建议用 ThreadPoolExecutor 显式配置:

ThreadPoolExecutor pool = new ThreadPoolExecutor(
    8,                 // core
    16,                // max
    60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000), // 有界队列
    new ThreadFactory() {
        private final ThreadFactory df = Executors.defaultThreadFactory();
        public Thread newThread(Runnable r) {
            Thread t = df.newThread(r);
            t.setName("io-pool-" + t.getId());
            t.setDaemon(false);
            return t;
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略示例
);

经验

  • 有界队列是关键:无限队列在高峰期会把你拖死。

  • 拒绝策略你要想清楚:是丢弃、报错、还是让调用方“自己跑”(CallerRuns 会反压)。

5.3 线程命名与监控不是“锦上添花”,是救命

线程命名后,线上排查堆栈、死锁、阻塞会省一大半时间。


6. 超时与取消:不做这两件,异步迟早把你拖进坑里

6.1 超时(JDK 9+ 有 orTimeout/completeOnTimeout)

CompletableFuture<String> cf =
    CompletableFuture.supplyAsync(() -> slowCall(), IO_POOL)
        .orTimeout(200, TimeUnit.MILLISECONDS)
        .exceptionally(ex -> "fallback");

如果你是 JDK 8,可以用 ScheduledExecutorService 自己补超时(不展开了,初级先不绕)。

6.2 取消(注意:取消不一定能中断底层 IO)

CompletableFuture<String> cf =
    CompletableFuture.supplyAsync(() -> slowCall(), IO_POOL);

cf.cancel(true);

经验

  • cancel(true) 会尝试中断线程,但底层如果是不可中断 IO(或代码不响应中断),取消只是“标记取消”。

  • 你写耗时循环时,记得检查 Thread.currentThread().isInterrupted()


7. 一个“像样的”实战例子:并行调 3 个服务,带超时与降级

import java.util.concurrent.*;

public class AsyncExample {
    private static final ExecutorService IO_POOL =
            new ThreadPoolExecutor(
                    8, 16,
                    60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(500),
                    r -> {
                        Thread t = new Thread(r);
                        t.setName("io-" + t.getId());
                        return t;
                    },
                    new ThreadPoolExecutor.AbortPolicy()
            );

    public static void main(String[] args) {
        CompletableFuture<String> user =
                CompletableFuture.supplyAsync(AsyncExample::queryUser, IO_POOL)
                        .orTimeout(150, TimeUnit.MILLISECONDS)
                        .exceptionally(ex -> "user=unknown");

        CompletableFuture<String> score =
                CompletableFuture.supplyAsync(AsyncExample::queryScore, IO_POOL)
                        .orTimeout(150, TimeUnit.MILLISECONDS)
                        .exceptionally(ex -> "score=0");

        CompletableFuture<String> coupon =
                CompletableFuture.supplyAsync(AsyncExample::queryCoupon, IO_POOL)
                        .orTimeout(150, TimeUnit.MILLISECONDS)
                        .exceptionally(ex -> "coupon=none");

        String page = CompletableFuture.allOf(user, score, coupon)
                .thenApply(v -> String.join(", ",
                        user.join(), score.join(), coupon.join()))
                .join();

        System.out.println(page);
        IO_POOL.shutdown();
    }

    static String queryUser() { sleep(80); return "user=alice"; }
    static String queryScore() { sleep(220); return "score=99"; } // 故意超时
    static String queryCoupon() { sleep(40); return "coupon=5off"; }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }
}

你会看到:score 超时了,但页面仍然能返回,只是走了降级值。这种模式在服务端很常见:宁可不完整,也别卡死


8. 常见坑(我见过太多次了)

  1. 默认 commonPool 到处用:性能隔离没了,别人一把慢任务能把你拖死。

  2. 无限队列:高峰期延迟从 50ms 变 5s,最终 OOM 或者超时雪崩。

  3. 异步里不打日志、不传 traceId:出了问题你完全不知道哪条链路坏的。

  4. thenApply vs thenCompose 搞混:一个返回值,一个“扁平化 Future”。(下面给你一句话记住)

  5. 在异步回调里做阻塞 IO:你以为在“回调”,其实你又把线程卡住了。

8.1 thenCompose 一句话记法

  • thenApply:把 T -> U

  • thenCompose:把 T -> CompletableFuture<U> 扁平化成一个 future(避免 Future<Future<U>>

CompletableFuture<String> cf =
    CompletableFuture.supplyAsync(() -> "id=1", IO_POOL)
        .thenCompose(id -> CompletableFuture.supplyAsync(() -> "detail(" + id + ")", IO_POOL));

9. 你该怎么选:ExecutorService 还是 CompletableFuture?

  • 你只是“扔出去跑一下”,不需要链式组合:ExecutorService 就够了。

  • 你要并行、合并、降级、链式处理:CompletableFuture 更顺手。

但不管选哪个,底层都是线程池。线程池设计永远比“语法写法”更重要。


10. 给初学者的最后几条建议(不花哨,但管用)

  • 先把线程池搞清楚:有界队列、拒绝策略、线程命名、监控。

  • 异步一定要带:超时、异常处理、必要的降级。

  • 不要到处 join/get,那是把异步写回同步。

  • 用压测说话:线程数不是拍脑袋出来的。

  • 线上排障要靠可观测性:日志、traceId、指标(队列长度、活跃线程、拒绝次数)。


Java 异步编程初级教程
https://www.the2333.com//archives/wei-ming-ming-wen-zhang
作者
Administrator
发布于
2026年01月28日
许可协议