Java 异步编程初级教程
很多人第一次在 Java 里做异步,都是从 new Thread() 或者“把活丢进线程池”开始的。然后很快就会遇到这些问题:
为什么偶发卡死、吞吐忽高忽低?
为什么异步越写越乱,异常也不知道去哪了?
为什么你以为“异步了”,其实只是“换个线程同步执行”?
这篇文章不讲玄学,也不堆概念。我按真实项目里最常见的路径,带你把 Java 的异步从“能跑”写到“能长期维护”。
1. 先把“异步”说清楚:你到底想解决什么?
异步一般就两类诉求:
1.1 提升响应速度(把耗时操作挪走)
比如 Web 请求里要查库、调外部接口、读文件,你不想让主线程等着。 做法:把耗时部分丢到别的线程,主线程继续干自己的(或尽快返回)。
1.2 提升吞吐(并行做多件事)
比如一次要调 3 个接口、查 2 个库,能并行就别串行。 做法:拆成多个任务并行跑,最后合并结果。
注意:异步不等于更快。CPU 本来就满了,你再异步只会更慢(线程切换、队列堆积)。
2. 最基础的异步:ExecutorService(够用、可靠、别花里胡哨)
2.1 永远别直接 new Thread()(除非你写 demo)
原因很简单:你控制不了线程数量、生命周期、排队策略、拒绝策略。线上一旦流量上来就是事故。
用线程池:
import java.util.concurrent.*;
public class AsyncBasic {
// 生产建议:线程池应该是单例、可监控、可配置。这里写简单点。
private static final ExecutorService IO_POOL =
Executors.newFixedThreadPool(16);
public static void main(String[] args) {
IO_POOL.submit(() -> {
// 模拟耗时 IO
sleep(200);
System.out.println("done");
});
System.out.println("main thread free");
IO_POOL.shutdown();
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
}
}经验:
submit()会吞异常(异常会进Future),你不get()就看不到。线程池要
shutdown()(服务型应用通常在容器生命周期里处理)。
2.2 用 Future 拿结果(但别写成“get 阻塞地狱”)
Future<Integer> f = IO_POOL.submit(() -> {
sleep(100);
return 42;
});
// ...你可以先干别的
try {
Integer v = f.get(300, TimeUnit.MILLISECONDS);
System.out.println(v);
} catch (TimeoutException e) {
System.out.println("timeout");
} catch (Exception e) {
e.printStackTrace();
}经验:
get()是阻塞的,一旦你到处get(),异步就变成了“分段同步”。get(timeout)很重要:避免无限等待。
3. Java 真正常用的异步:CompletableFuture(别怕,它是为了工程化)
如果你只会线程池 + Future,你会发现“组合多个异步任务”非常痛苦。 CompletableFuture 的优势在于:可组合、可链式、异常可控。
3.1 最常见:supplyAsync / runAsync
import java.util.concurrent.*;
public class CFIntro {
private static final ExecutorService IO_POOL =
Executors.newFixedThreadPool(16);
public static void main(String[] args) {
CompletableFuture<Integer> cf =
CompletableFuture.supplyAsync(() -> {
sleep(120);
return 10;
}, IO_POOL);
cf.thenApply(x -> x * 2)
.thenAccept(System.out::println)
.exceptionally(ex -> {
ex.printStackTrace();
return null;
});
sleep(300);
IO_POOL.shutdown();
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
}
}经验:
默认不传线程池会用
ForkJoinPool.commonPool(),线上服务我一般不建议“随便用默认”。业务服务里最好显式指定线程池:你要可控、可隔离、可压测。
3.2 并行跑多个任务,然后合并结果(典型提升吞吐)
CompletableFuture<String> a =
CompletableFuture.supplyAsync(() -> fetchA(), IO_POOL);
CompletableFuture<String> b =
CompletableFuture.supplyAsync(() -> fetchB(), IO_POOL);
CompletableFuture<String> combined =
a.thenCombine(b, (ra, rb) -> ra + "-" + rb);
String result = combined.join(); // join 也是阻塞,但异常是 unchecked经验:
join()比get()少写一堆 checked exception,但异常会包一层CompletionException。合并时要想清楚:其中一个慢/挂了怎么办?有没有降级策略?
3.3 等全部完成:allOf(批量并发最常用)
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 1, IO_POOL);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 2, IO_POOL);
CompletableFuture<Integer> f3 = CompletableFuture.supplyAsync(() -> 3, IO_POOL);
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.join(); // 等全部完成
int sum = f1.join() + f2.join() + f3.join();
System.out.println(sum);经验:
allOf本身不返回结果,需要你自己从各个 future 里取。一旦其中一个异常,
allOf会异常结束;你要决定是“失败就失败”还是“局部失败可接受”。
4. 异常处理:异步里最容易翻车的地方
异步代码“看起来没报错”,其实异常早就丢了。 你得明确你想怎么处理异常:
记录日志并返回默认值(降级)
直接失败(让上游感知)
重试(谨慎:重试会放大流量)
4.1 exceptionally:兜底
CompletableFuture<Integer> cf =
CompletableFuture.supplyAsync(() -> 10 / 0, IO_POOL)
.exceptionally(ex -> {
// 这里做兜底返回
System.err.println("failed: " + ex);
return -1;
});4.2 handle:无论成功失败都走一遍(适合统计、打点)
CompletableFuture<Integer> cf =
CompletableFuture.supplyAsync(() -> 42, IO_POOL)
.handle((val, ex) -> {
if (ex != null) return -1;
return val;
});经验:
exceptionally只处理异常分支。handle成功失败都会进,适合做统一收口。
5. 线程池不是随便开的:初学者最该懂的 3 个点
5.1 CPU 密集 vs IO 密集,线程数不是一个套路
CPU 密集:线程数接近 CPU 核数(或略多)。
IO 密集:线程数可以更大,因为线程大部分时间在等 IO。
但别迷信公式。最靠谱的是:压测 + 监控(队列长度、响应时间、拒绝次数)。
5.2 一定要有队列与拒绝策略意识
很多“雪崩”不是线程跑满了,而是队列堆爆,延迟越来越长。 生产线程池建议用 ThreadPoolExecutor 显式配置:
ThreadPoolExecutor pool = new ThreadPoolExecutor(
8, // core
16, // max
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000), // 有界队列
new ThreadFactory() {
private final ThreadFactory df = Executors.defaultThreadFactory();
public Thread newThread(Runnable r) {
Thread t = df.newThread(r);
t.setName("io-pool-" + t.getId());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略示例
);经验:
有界队列是关键:无限队列在高峰期会把你拖死。
拒绝策略你要想清楚:是丢弃、报错、还是让调用方“自己跑”(CallerRuns 会反压)。
5.3 线程命名与监控不是“锦上添花”,是救命
线程命名后,线上排查堆栈、死锁、阻塞会省一大半时间。
6. 超时与取消:不做这两件,异步迟早把你拖进坑里
6.1 超时(JDK 9+ 有 orTimeout/completeOnTimeout)
CompletableFuture<String> cf =
CompletableFuture.supplyAsync(() -> slowCall(), IO_POOL)
.orTimeout(200, TimeUnit.MILLISECONDS)
.exceptionally(ex -> "fallback");如果你是 JDK 8,可以用 ScheduledExecutorService 自己补超时(不展开了,初级先不绕)。
6.2 取消(注意:取消不一定能中断底层 IO)
CompletableFuture<String> cf =
CompletableFuture.supplyAsync(() -> slowCall(), IO_POOL);
cf.cancel(true);经验:
cancel(true)会尝试中断线程,但底层如果是不可中断 IO(或代码不响应中断),取消只是“标记取消”。你写耗时循环时,记得检查
Thread.currentThread().isInterrupted()。
7. 一个“像样的”实战例子:并行调 3 个服务,带超时与降级
import java.util.concurrent.*;
public class AsyncExample {
private static final ExecutorService IO_POOL =
new ThreadPoolExecutor(
8, 16,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500),
r -> {
Thread t = new Thread(r);
t.setName("io-" + t.getId());
return t;
},
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) {
CompletableFuture<String> user =
CompletableFuture.supplyAsync(AsyncExample::queryUser, IO_POOL)
.orTimeout(150, TimeUnit.MILLISECONDS)
.exceptionally(ex -> "user=unknown");
CompletableFuture<String> score =
CompletableFuture.supplyAsync(AsyncExample::queryScore, IO_POOL)
.orTimeout(150, TimeUnit.MILLISECONDS)
.exceptionally(ex -> "score=0");
CompletableFuture<String> coupon =
CompletableFuture.supplyAsync(AsyncExample::queryCoupon, IO_POOL)
.orTimeout(150, TimeUnit.MILLISECONDS)
.exceptionally(ex -> "coupon=none");
String page = CompletableFuture.allOf(user, score, coupon)
.thenApply(v -> String.join(", ",
user.join(), score.join(), coupon.join()))
.join();
System.out.println(page);
IO_POOL.shutdown();
}
static String queryUser() { sleep(80); return "user=alice"; }
static String queryScore() { sleep(220); return "score=99"; } // 故意超时
static String queryCoupon() { sleep(40); return "coupon=5off"; }
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
}
}你会看到:score 超时了,但页面仍然能返回,只是走了降级值。这种模式在服务端很常见:宁可不完整,也别卡死。
8. 常见坑(我见过太多次了)
默认 commonPool 到处用:性能隔离没了,别人一把慢任务能把你拖死。
无限队列:高峰期延迟从 50ms 变 5s,最终 OOM 或者超时雪崩。
异步里不打日志、不传 traceId:出了问题你完全不知道哪条链路坏的。
thenApply vs thenCompose 搞混:一个返回值,一个“扁平化 Future”。(下面给你一句话记住)
在异步回调里做阻塞 IO:你以为在“回调”,其实你又把线程卡住了。
8.1 thenCompose 一句话记法
thenApply:把 T -> UthenCompose:把 T -> CompletableFuture<U> 扁平化成一个 future(避免Future<Future<U>>)
CompletableFuture<String> cf =
CompletableFuture.supplyAsync(() -> "id=1", IO_POOL)
.thenCompose(id -> CompletableFuture.supplyAsync(() -> "detail(" + id + ")", IO_POOL));9. 你该怎么选:ExecutorService 还是 CompletableFuture?
你只是“扔出去跑一下”,不需要链式组合:ExecutorService 就够了。
你要并行、合并、降级、链式处理:CompletableFuture 更顺手。
但不管选哪个,底层都是线程池。线程池设计永远比“语法写法”更重要。
10. 给初学者的最后几条建议(不花哨,但管用)
先把线程池搞清楚:有界队列、拒绝策略、线程命名、监控。
异步一定要带:超时、异常处理、必要的降级。
不要到处
join/get,那是把异步写回同步。用压测说话:线程数不是拍脑袋出来的。
线上排障要靠可观测性:日志、traceId、指标(队列长度、活跃线程、拒绝次数)。