In this post I will describe how to signal your application to stop accepting new work and then wait for any ongoing work to complete.
Graceful shutdown is useful for applications that, when possible, should not stop abruptly. For example, a web server should attempt to finish serving any ongoing requests before shutting down.
Tokio has a great article on Graceful Shutdown that describes a low-level design. I assume you're familiar with it and show how to build a high-level construct on top of the building blocks from that article.
The Recipe
Graceful shutdown can be implemented using two of Tokio’s synchronization primitives:
- tokio::sync::broadcast to signal shutdown
- tokio::sync::mpsc to wait for all tasks to complete
As we will see, the Drop mechanics for these types will be useful for coordinating shutdown within a program.
The gist is that each task will know that shutdown has started when its broadcast receiver reports that the channel has closed, and the program will know that all tasks have completed when every tokio::sync::mpsc::Sender has been dropped.
Drop
When objects go out of scope in Rust, they are dropped automatically.
It is possible to implement the Drop trait for a type to customize the behavior when it is dropped.
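As a small std-only illustration (the `Guard` type and `drops_around_scope` helper are hypothetical), a custom `Drop` impl runs exactly when its value leaves scope:

```rust
use std::cell::Cell;
use std::rc::Rc;

/// A hypothetical guard that counts how many times a value has been dropped.
struct Guard {
    drops: Rc<Cell<usize>>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Runs automatically when a `Guard` goes out of scope.
        self.drops.set(self.drops.get() + 1);
    }
}

/// Returns the drop count observed inside the scope and after it ends.
fn drops_around_scope() -> (usize, usize) {
    let drops = Rc::new(Cell::new(0));
    let inside;
    {
        // `_guard` (unlike a bare `_`) keeps the value alive until the block ends.
        let _guard = Guard { drops: Rc::clone(&drops) };
        inside = drops.get();
    } // `_guard` is dropped here, so `Drop::drop` runs.
    (inside, drops.get())
}

fn main() {
    let (inside, after) = drops_around_scope();
    println!("drops inside scope: {inside}, after scope: {after}");
}
```

The named binding matters: `let _ = Guard { .. };` would drop the value at the end of that statement. The same trap applies to channel handles that are meant to live as long as a task.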
Tokio's channels rely on this: once every Sender for a channel has been dropped, the channel is closed, and a receiver that has drained any buffered values is told so instead of waiting forever. Dropping the receiving side likewise makes further sends fail.
Using Drop for Coordination
Now that we know that a Tokio channel closes when its senders are dropped, we can leverage that to coordinate shutdown.
Let’s look at both of the channels we will be using to see how they behave.
tokio::sync::broadcast
The broadcast channel is used to tell all ongoing tasks that shutdown has started.
From the tokio::sync::broadcast docs:
When all Sender handles have been dropped, no new values may be sent. At this point, the channel is “closed”. Once a receiver has received all values retained by the channel, the next call to recv will return with RecvError::Closed.
Every task will be provided with a receiver that it can use to check if shutdown has started.
When shutdown has started, the send half of the channel will be dropped, causing the tokio::sync::broadcast::Receiver to return Err(RecvError::Closed) when it is polled.
Clients will use this to determine if they should continue processing work or if they should finish.
tokio::sync::mpsc
The mpsc channel is used to wait for all ongoing tasks to complete.
From the tokio::sync::mpsc::Receiver::recv docs:
This method returns None if the channel has been closed and there are no remaining messages in the channel’s buffer. The channel is closed when all senders have been dropped, or when close is called.
Every task will be provided with a tokio::sync::mpsc::Sender that it can use to implicitly signal completion.
When a task finishes, it drops its tokio::sync::mpsc::Sender; once every Sender has been dropped, the tokio::sync::mpsc::Receiver returns None when it is polled.
High Level Design
We will hide the complexity of the low-level Graceful Shutdown implementation behind a high-level API.
This makes the functionality easier to integrate into a program: the method signatures are simpler, and clients have less to keep track of.
To accomplish this, we wrap the low-level logic inside two types:
```rust
pub struct ShutdownController { ... }
pub struct ShutdownMonitor { ... }
```
The program will create a ShutdownController instance when it starts up. It will be used to create monitors (via subscribe), signal shutdown, and wait for completion.
Clients will accept a ShutdownMonitor instance when they are created and use it to determine if they should continue processing work.
Example Usage
```rust
use shutdown_async::ShutdownController;
use tokio::task::spawn;

#[tokio::main]
async fn main() {
    let shutdown = ShutdownController::new();

    let t = spawn({
        let mut monitor = shutdown.subscribe();
        assert!(!monitor.is_shutdown());

        async move {
            monitor.recv().await;
            assert!(monitor.is_shutdown());
        }
    });

    shutdown.shutdown().await;

    // The task has finished by now; awaiting its handle surfaces any panic.
    t.await.unwrap();
}
```
Signaling Shutdown
The implementation follows directly from the Graceful Shutdown article.
```rust
use tokio::sync::{broadcast, mpsc};

pub struct ShutdownController {
    /// Used to tell all [`ShutdownMonitor`] instances that shutdown has started.
    notify_shutdown: broadcast::Sender<()>,

    /// Implicitly used to determine when all [`ShutdownMonitor`] instances have been dropped.
    task_tracker: mpsc::Sender<()>,

    /// Used to determine when all tasks have finished. Calling `recv()` on this channel
    /// will return when all of the send halves of the `task_tracker` channel have been dropped.
    task_waiter: mpsc::Receiver<()>,
}

impl ShutdownController {
    pub fn new() -> Self {
        let (notify_shutdown, _) = broadcast::channel::<()>(1);
        let (task_tracker, task_waiter) = mpsc::channel::<()>(1);
        Self {
            notify_shutdown,
            task_tracker,
            task_waiter,
        }
    }

    pub async fn shutdown(mut self) {
        // Notify all tasks that shutdown has started
        drop(self.notify_shutdown);

        // Drop our own mpsc::Sender so that mpsc::Receiver::recv() returns None
        // as soon as all tasks have completed (i.e. dropped their mpsc::Sender)
        drop(self.task_tracker);

        // Wait for all tasks to finish
        let _ = self.task_waiter.recv().await;
    }

    pub fn subscribe(&self) -> ShutdownMonitor {
        ShutdownMonitor::new(self.notify_shutdown.subscribe(), self.task_tracker.clone())
    }
}

impl Default for ShutdownController {
    fn default() -> Self {
        Self::new()
    }
}
```
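`shutdown` takes `self` by value and drops `notify_shutdown` and `task_tracker` individually before using `task_waiter`. That compiles because Rust allows partial moves out of a struct that does not implement `Drop`. Here is a std-only sketch of the same shape; `Noisy`, `Controller`, and `drop_order` are hypothetical types and helpers that record drop order:

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared log of drop events.
type Log = Rc<RefCell<Vec<&'static str>>>;

/// A hypothetical field type that records its name when dropped.
struct Noisy(&'static str, Log);

impl Drop for Noisy {
    fn drop(&mut self) {
        self.1.borrow_mut().push(self.0);
    }
}

/// Same shape as `ShutdownController`: no `Drop` impl of its own, so a method
/// taking `self` by value may move fields out one at a time. If `Controller`
/// implemented `Drop`, `drop(self.notify)` would be rejected (error E0509).
struct Controller {
    notify: Noisy,
    tracker: Noisy,
    waiter: Vec<u32>, // stands in for `task_waiter`, which `recv()` needs as `&mut`
}

impl Controller {
    fn shutdown(mut self) -> Vec<&'static str> {
        let log = Rc::clone(&self.notify.1);
        drop(self.notify); // partial move: only this field is dropped
        drop(self.tracker); // then this one, in the order we chose
        self.waiter.push(1); // the remaining field is still usable mutably
        log.take() // drop events recorded so far
    }
}

/// Builds a `Controller`, shuts it down, and returns the observed drop order.
fn drop_order() -> Vec<&'static str> {
    let log: Log = Rc::default();
    let controller = Controller {
        notify: Noisy("notify", Rc::clone(&log)),
        tracker: Noisy("tracker", Rc::clone(&log)),
        waiter: Vec::new(),
    };
    controller.shutdown()
}

fn main() {
    println!("{:?}", drop_order());
}
```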
Waiting for Completion
Again, the implementation follows directly from the Graceful Shutdown article.
```rust
use tokio::sync::{broadcast, mpsc};

pub struct ShutdownMonitor {
    /// `true` if the shutdown signal has been received
    shutdown_received: bool,

    /// The receive half of the channel used to listen for shutdown.
    shutdown_notifier: broadcast::Receiver<()>,

    /// Implicitly used to help [`ShutdownController`] understand when the program
    /// has completed shutdown.
    _task_tracker: mpsc::Sender<()>,
}

impl ShutdownMonitor {
    fn new(
        shutdown_notifier: broadcast::Receiver<()>,
        _task_tracker: mpsc::Sender<()>,
    ) -> ShutdownMonitor {
        ShutdownMonitor {
            shutdown_received: false,
            shutdown_notifier,
            _task_tracker,
        }
    }

    pub fn is_shutdown(&self) -> bool {
        self.shutdown_received
    }

    pub async fn recv(&mut self) {
        // If the shutdown signal has already been received, then return
        // immediately.
        if self.shutdown_received {
            return;
        }

        // No value is ever sent on this channel: recv() returns
        // Err(RecvError::Closed) once the controller drops its Sender,
        // so a "lag" error cannot occur.
        let _ = self.shutdown_notifier.recv().await;

        // Remember that the signal has been received.
        self.shutdown_received = true;
    }
}
```
Conclusion
We did not introduce any new concepts in this article. Instead, we simply wrapped the low-level Graceful Shutdown implementation in a high-level API.
See shutdown_async::ShutdownController on docs.rs to view the documentation for the API.
I personally found the high-level approach much easier to grok than the low-level approach. I hope you do too!