服务停止
应用程序中通常会创建拥有多个线程的服务,比如线程池。
一般来说,服务的生命周期基本都比线程的生命周期要长。
当要停止服务时,服务拥有的这些线程也需要同时结束,所以服务应该要提供相应的生命周期方法,比如 shutdown()
、shutdownNow()
这样的。
一、结束正在运行的线程
服务如何结束正在运行的线程?理论上可以通过中断来关闭。
中断线程只能由其所有者才能做,而服务是线程的所有者,刚好可以对线程执行中断。
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class LogWriter {
private final BlockingQueue<String> queue; private final LoggerThread logger;
public LogWriter(Writer writer) { this.queue = new LinkedBlockingQueue<>(); logger = new LoggerThread(writer); }
public void start() { logger.start(); }
public void log(String msg) throws InterruptedException { queue.put(msg); }
private class LoggerThread extends Thread { @Override public void run() { try { while (true) { writer.println(queue.take()); } } catch (InterruptedException e) { } finally { writer.close(); } } } }
|
仅仅只是中断了线程,就可以关闭线程了吗?
还不行,因为中断还有可能来着其他未知的地方,比如非所有者发起的非法中断。
而且直接退出也有可能会丢失部分信息,比如上面的例子就会导致后面的日志信息丢失。
一个好的服务,应该具备更好的关闭线程机制。
1.1 关闭标志
可以在服务中设置一个标志,表明服务要停止了,线程应该要结束。
此时禁止向线程提交任何数据,等线程清理完当前剩余的数据后,就可以退出了。
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| public class LogService { private final BlockingQueue<String> queue; private final LoggerThread loggerThread; private final PrintWriter writer; private boolean isShutdown; private int reservations; public void start() { loggerThread.start(); } public void stop() { synchronized (this) { isShutdown = true; } loggerThread.interrupt(); } public void log(String msg) throws InterruptedException { synchronized (this) { if (isShutdown) { throw new IllegalStateException("Shutdown"); } ++reservations; } queue.put(msg); }
private class LoggerThread extends Thread {
@Override public void run() { try { while (true) { synchronized(LogService.this) { if (isShutdown && reservations == 0) { break; } } String msg = queue.take(); synchronized (LogService.this) { reservations--; } writer.println(msg); } } catch (InterruptedException e) { } finally { writer.close(); } } } }
|
这里通过设置“请求关闭”标志,避免了日志的提交,并且服务线程是在处理完日志后才退出的。
1.2 “毒丸”对象
当服务是一种类似生产者-消费者模式的运行方式时,还有一种“毒丸”方式可以用。
“毒丸”,是将一个结束对象放到处理队列中,当处理线程收到这个对象时,就立即结束退出。
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public class LogPoisonService {
private final BlockingQueue<String> queue; private final LoggerThread loggerThread; private final PrintWriter writer; private boolean isShutdown; private final String poison = new String("");
public void stop() { synchronized (this) { isShutdown = true; } while (true) { try { queue.put(poison); break; } catch (InterruptedException e) { } } loggerThread.interrupt(); }
public void log(String msg) throws InterruptedException { synchronized (this) { if (isShutdown) { throw new IllegalStateException("Shutdown"); } } queue.put(msg); }
private class LoggerThread extends Thread {
@Override public void run() { try { while (true) { String msg = queue.take(); if (msg == poison) { break; } writer.println(msg); } } catch (InterruptedException e) { } finally { writer.close(); } } }
}
|
不过“毒丸”对象的使用,并不是那么简单,需要满足一些条件:
- 只有在生产者和消费者数量已知的情况下,“毒丸”对象才能使用
- “毒丸”对象只有在无界队列中才能可靠工作
一颗“毒丸”,只能停止一条线程,所以如果有多条生产者-消费者线程,那就需要提供多个“毒丸”。
提交“毒丸”的时候,不能被阻塞,否则会存在线程阻塞,关闭操作不应该被阻塞,所以只能用无界队列。
二、处理非正常的线程终止
线程并非一定会正常终止,而是可能会提前死亡。
导致线程提前死亡的主要原因就是 RuntimeException
异常,而任何代码都可能抛出 RuntimeException
异常。
2.1 主动捕获异常
处理线程异常死亡的最简单方式,就是捕获异常:
1 2 3 4 5 6 7 8 9 10 11 12
| public void run() { Throwable thrown = null; try { while (!isInterrupted()) { runTask(getTaskFromWorkQueue()); } } catch (Throwable e) { thrown = e; } finally { threadExited(this, thrown); } }
|
捕获异常后,再通过正常方式退出线程,这样可以降低线程异常死亡对程序的影响。
2.2 异常处理回调
除了主动捕获异常的方式,在 Thread API 中还提供了异常回调接口 UncaughtExceptionHandler
。
UncaughtExceptionHandler
可以用于处理线程由于未捕获异常而终止的情况:
1 2 3
| public interface UncaughtExceptionHandler { void uncaughtException(Thread t, Throwable e); }
|
只要实现这个接口,并将其注册到线程对象中即可:
1 2 3 4 5
| thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { } });
|
另外,Thread 还提供了全局的异常回调接口 Thread.setDefaultUncaughtExceptionHandler
。
三、关闭 ExecutorService
ExecutorService
提供了2种方式停止服务:
shutdownNow()
:强制停止服务,关闭速度快,但是存在很大的风险,任务可能在执行过程中被迫结束
shutdown()
:正常停止服务,会等到队列中的所有任务都执行完毕后才停止服务
这2种方式的差别就在于各自的安全性和响应性。
3.1 强制关闭 shutdownNow
shutdownNow()
会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务。
但是没有常规办法识别出哪些任务已经开始但尚未结束,即不知道哪些任务处于可执行状态。
可以用以下的方法来跟踪关闭后被取消的任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec; private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<>()); @Override public void execute(Runnable runnable) { exec.execute(() -> { try { runnable.run(); } finally { if (isShutdown() && Thread.currentThread().isInterrupted()) { tasksCancelledAtShutdown.add(runnable); } } }); } }
|
不过这个方法依赖于任务不会屏蔽中断请求,否则没有办法跟踪。
3.2 正常关闭 shutdown
shutdown()
只会尝试取消正在执行的任务,然后就直接返回了。
等待队列中所有任务执行完成后,ExecutorService
才会真正停止。
不过,为了避免任务长时间不结束,一般都会加上超时等待:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final AtomicBoolean hasMail = new AtomicBoolean(false); try { for (final String host : hosts) { exec.execute(() -> { if (checkMail(host)) { hasMail.set(true); } }); } } finally { exec.shutdown(); exec.awaitTermination(timeout, unit); } return hasMail.get(); }
|
shutdown()
虽然好,但就是存在永远停止不了的风险。
所以正常停止一般都是等待超时后,再循环等待,或者直接强制停止。