Content Table

Spring 与多线程

直接创建线程

1
2
3
4
5
6
7
8
9
10
11
12
public void thread1() {
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
System.out.println("thread-1: " + i);
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

使用 @Async

使用 @Async 异步执行方法,只需要 2 步:

  1. 使用 @EnableAsync 开启
  2. 然后自己在方法上使用注解 @Async ,方法被调用时就能异步执行了
1
2
3
4
5
6
7
@SpringBootApplication
@EnableAsync
public class DemoThreadApplication {
public static void main(String[] args) {
SpringApplication.run(DemoThreadApplication.class, args);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 使用 @Async 创建线程
*/
@Async
public void thread2() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("thread-2: " + i);
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@ASync 的执行器

以前的版本 @Async 默认使用 SimpleAsyncTaskExecutor 作为执行器,每次提交都直接创建一个新线程, 新版 Spring Boot 中默认使用 ThreadPoolTaskExecutor。

怎么判断使用的是哪一个执行器呢?

  • 看文档是一种方式

  • 在代码中验证: 在代码中注入 AsyncListenableTaskExecutor 对象,在 @Async 的方法中打印它的信息

    1
    2
    3
    4
    5
    6
    7
    @Autowired
    private AsyncListenableTaskExecutor executor;

    @Async
    public void thread2() {
    System.out.println(Thread.currentThread());
    }

@Async 自定义执行器

只创建一个 Executor 对象注入到 Spring 容器中,所有 @Async 注解的方法默认都会使用这个执行器:

1
2
3
4
@Bean
public Executor simpleAsyncTaskExecutor() {
return new SimpleAsyncTaskExecutor();
}

如果创建了多个 Executor 对象,可以实现接口 AsyncConfigurer 指定默认的执行器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
@EnableAsync
public class DemoThreadApplication implements AsyncConfigurer {
// 创建默认的执行器
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize(); // 提示: 必须执行这个语句,否则报错
return executor;
}

public static void main(String[] args) {
SpringApplication.run(DemoThreadApplication.class, args);
}
}

@Async 指定执行器

当创建了多个执行器,给 @Async 传入需要使用的执行器的名字:

1
2
3
4
@Async("simpleAsyncTaskExecutor")
public void thread2() {
System.out.println(Thread.currentThread());
}

调用类自己的 @Async 的方法

需要使用 @Lazy @Autowired 延迟创建的方式注入类自己的对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
public class ZooService {
@Lazy
@Autowired
private ZooService self;

public void thread2() {
System.out.println(Thread.currentThread());
self.doAsync();
}

@Async
public void doAsync() {
System.out.println("doAsync: " + Thread.currentThread());
}
}

提示: 直接调用 this.doAsync() 的话 @Async 不生效,涉及到 Spring AOP 的实现原理,调用的是类原来的方法,而不是经过 AOP 生成的方法。

使用线程池

可以使用 Executors 创建线程池,然后提交 Runnable 或者 Callable:

1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class ZooService {
ExecutorService executeService = Executors.newSingleThreadExecutor();

public void thread2() {
System.out.println("Outer: " + Thread.currentThread());

executeService.submit(() -> {
System.out.println("Inner: " + Thread.currentThread());
});
}
}

Executors 提供了多种创建 ExecutorService 的方法,根据不同的用途进行使用:

线程间传递 ThreadLocal 变量

线程间需要传递的 ThreadLocal 可以使用 java.lang.InheritableThreadLocal,手动创建线程、线程池、@Async 等场景下都生效:

This class extends ThreadLocal to provide inheritance of values from parent thread to child thread: when a child thread is created, the child receives initial values for all inheritable thread-local variables for which the parent has values. Normally the child’s values will be identical to the parent’s; however, the child’s value can be made an arbitrary function of the parent’s by overriding the childValue method in this class.
Inheritable thread-local variables are used in preference to ordinary thread-local variables when the per-thread-attribute being maintained in the variable (e.g., User ID, Transaction ID) must be automatically transmitted to any child threads that are created.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Service
public class ZooService {
public static final ThreadLocal<String> user = new InheritableThreadLocal<>();
// public static final ThreadLocal<String> user = new ThreadLocal<>();

ExecutorService executeService = Executors.newSingleThreadExecutor();

public void thread2() {
System.out.println("Outer: " + Thread.currentThread());
ZooService.user.set("Alice");

executeService.submit(() -> {
System.out.println("Inner: " + Thread.currentThread());
System.out.println(ZooService.user.get());
});
}
}

等待多个线程结束后继续执行

常规有两种实现方式,CountDownLatch 和 CompletionService。

CountDownLatch 的使用步骤:

  1. 创建一个 CountDownLatch: c = new CountDownLatch(5)
  2. 线程 A 中创建并启动 5 个线程
  3. 线程 A 中调用 c.await(),等待
  4. 当 5 个线程都调用了 c.countDown() 后,线程 A 继续往下执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Service
public class ZooService {
private final Random random = new Random();

public void thread2() throws InterruptedException {
System.out.println("thread2 start");
CountDownLatch latch = new CountDownLatch(5);

// 启动 5 个线程异步执行
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
String name = Thread.currentThread().getName();
System.out.println("Start: " + name);

try {
int time = random.nextInt(10);
System.out.printf("Thread %s waits %d seconds\n", name, time);
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}

System.out.println("End: " + name);
}, "Thread-" + i).start();
}

// 等待直到 latch.getCount() 为 0
latch.await();
System.out.println("thread2 end");
}
}

CompletionService 的使用步骤:

  1. 使用线程池创建 CompletionService
  2. 异步执行任务
  3. 获取结果
  4. 关闭线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Service
public class ZooService {
private final Random random = new Random();

public void thread2() throws InterruptedException, ExecutionException {
System.out.println("thread2 start");

// 1. 使用线程池创建 CompletionService
ExecutorService exec = Executors.newFixedThreadPool(5);
CompletionService<Integer> service = new ExecutorCompletionService<>(exec);

// 2. 异步执行任务
for (int i = 1; i <= 5; i++) {
service.submit(() -> {
String name = Thread.currentThread().getName();
System.out.println("Start: " + name);
int time = random.nextInt(10);

try {
System.out.printf("Thread %s waits %d seconds\n", name, time);
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("End: " + name);
return time;
});
}

// 3. 获取结果
for (int i = 1; i <= 5; i++) {
Integer result = service.take().get(); // 获取已经执行完成的任务的结果
System.out.println("Result: " + result);
}

System.out.println("thread2 end");

// 4. 关闭线程池
exec.shutdown();
}
}

多个线程等待达到某个条件后一起继续执行

可以使用 CyclicBarrier 实现这个功能,使用步骤:

  1. 创建一个 CyclicBarrier: b = new CyclicBarrier(5)
  2. 创建并启动 5 个线程
  3. 这 5 个线程中调用 b.await(),等待
  4. 当 5 个线程中都调用了 b.await() 时这 5 个线程一起往下执行
  5. 如果前 4 个线程都执行到了 b.await(),但第 5 个线程却没有调用 b.await(),而是遇到问题,转而去调用了 b.reset(), 那么,前面 4 个阻塞在 await() 方法上的线程将抛出 BrokenBarrierException 异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Service
public class ZooService {
private final Random random = new Random();

public void thread2() {
System.out.println("thread2 start");

CyclicBarrier b = new CyclicBarrier(5);

for (int i = 1; i <= 5; i++) {
new Thread(() -> {
String name = Thread.currentThread().getName();
System.out.println("Start: " + name);
int time = random.nextInt(10);

try {
System.out.printf("Thread %s waits %d seconds\n", name, time);
TimeUnit.SECONDS.sleep(time);

// 多个线程等待达到某个条件后一起继续执行
b.await();
System.out.printf("继续执行: %s, At %d\n", name, System.nanoTime());
} catch (Exception e) {
e.printStackTrace();
}

System.out.println("End: " + name);
}, "Thread-" + i).start();
}

System.out.println("thread2 end");
}
}

异步任务的应用场景

异步任务一般用于耗时场景:

  • 文档转换: word, excel 等转为 pdf,方便 HTML5 或者移动端在线预览,例如微信里打开 word 文档
  • 视频转换: 把 mov, avi, wmv, rmvb 等格式转为 mp4 在网页和手机里播放
  • 报表导出: 有些报表计算需要花很多时间
  • 微服务调用: 一个方法中需要使用几个其他服务的数据,使用 CompletionService 能够提高效率

案例:

  1. 前端提交了一个导出报表的请求
  2. 服务器异步执行报表导出 (需要 1 分钟),并且马上响应给前端请求已经执行
  3. 前端显示请求已经执行

问题: 前端怎么才能知道报表导出的进度?

作业

  1. 同一个方法上同时使用 @Async + @Transactional 会有问题吗?如果有,怎么解决?

  2. java.util.concurrent.Pharser 和 CyclicBarrier 与 CountDownLatch 很相似,具体有什么一同呢?

    A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage.

参考资料