Once upon a time I was making a Chat Server. Clients needed to wait on three things at once:
- An incoming message from the server
- A message from the current user’s keyboard
- A signal to shut the program down
I ran into deadlock when trying to accomplish the 2nd task, reading from stdin.
The deadlock occurred because the program couldn’t shut down until all task handles were dropped (a feature of shutdown-async). The task handle that read from stdin was never dropped because the program was waiting for the user to type something!
Digging
Interestingly, tokio::io::Stdin was not the solution to this.
When select!
ing on a read from stdin, the program still deadlocked!
Looking closer at the docs, we see this:
For technical reasons, stdin is implemented by using an ordinary blocking read on a separate thread, and it is impossible to cancel that read. This can make shutdown of the runtime hang until the user presses enter.
For interactive uses, it is recommended to spawn a thread dedicated to user input and use blocking IO directly in that thread.
As of writing, the Tokio docs state to “roll your own” for interactive use cases when reading from stdin.
So I did just that.
The async-stdin crate
Using a tokio::sync::mpsc channel and a std::thread, it is possible to pipe stdin into a channel that can be read from asynchronously.
The async-stdin crate provides a simple interface for this:
use async_stdin::recv_from_stdin;
#[tokio::main]
async fn main() {
let mut rx = recv_from_stdin(10);
while let Some(s) = rx.recv().await {
println!("Received: {}", s);
}
}
Implementation
The code is simple, a thread blocks on reading from stdin and sends the result to the channel in a loop.
Clients use the receiving end of the channel to read from stdin asynchronously.
use std::io::{stdin, BufRead, BufReader};
use tokio::sync::mpsc;
pub fn recv_from_stdin(buffer_size: usize) -> mpsc::Receiver<String> {
let (tx, rx) = mpsc::channel::<String>(buffer_size);
let stdin = BufReader::new(stdin());
std::thread::spawn(move || read_loop(stdin, tx));
rx
}
fn read_loop<R>(reader: R, tx: mpsc::Sender<String>)
where
R: BufRead,
{
let mut lines = reader.lines();
loop {
if let Some(Ok(line)) = lines.next() {
let _ = tx.blocking_send(line);
}
}
}