In this post we explore one way to implement rate limiting at the application level using Rust and Tokio.
Sending a large number of requests in a short period of time is not polite and may be flagged as malicious behavior. At best, you would waste bandwidth and energy. At worst, your ISP will threaten to shut off your internet connection!
The Recipe
A rate limiter can be built using two primitives from the Tokio library:
- tokio::sync::Mutex to control concurrency
- tokio::time::interval to control the rate at which requests are sent
We wrap the Interval in a Mutex to ensure that only one task can acquire a mutable reference to it at a time. After a mutable reference to the interval has been acquired, the task can wait for the interval to tick.
With this design, any client who wishes to be rate limited must first acquire the lock & then wait for the interval. This allows for concurrency control among many clients who attempt to throttle themselves using the same rate limiter.
Why does this work?
Tokio’s primitives help us solve many problems.
The asynchronous Mutex allows its tokio::sync::MutexGuard to be held across .await points. This is key because the Mutex provides us with mutable access to the interval; any client who would like to send a request must first acquire the lock on the mutex, and then wait for the interval to tick.
A std::sync::Mutex will not suffice here! Since the interval is asynchronous, we must .await it, which means we yield execution to the runtime. This is problematic with a synchronous mutex because the std::sync::MutexGuard would still be owned by the task that just yielded! Using the synchronous Mutex in an asynchronous context makes us prone to deadlock, because the MutexGuard is never dropped.
The article Shared mutable state in Rust covers this in more detail. If you don't read it, at least take away one of its most important points:
> You cannot .await anything while a (synchronous) mutex is locked.
It is possible to deadlock if you .await something while holding a synchronous mutex.
Deadlock is a silent but deadly problem that you must be diligent to avoid.
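To make the hazard concrete, here's a minimal sketch of the pattern that quote warns about; `fake_io` is a hypothetical stand-in for any asynchronous operation:

```rust
use std::sync::Mutex;

// A hypothetical stand-in for any asynchronous operation.
async fn fake_io() {
    tokio::task::yield_now().await;
}

// The problematic pattern: the synchronous guard is still alive when the
// task yields at the .await point. Another task scheduled on the same
// thread that tries to lock the mutex will block forever.
async fn broken(shared: &Mutex<u32>) {
    let mut guard = shared.lock().unwrap(); // synchronous lock
    fake_io().await; // yields to the runtime while the lock is held
    *guard += 1;
}
```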
The implementation
The async_throttle::RateLimiter looks like this:
```rust
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::{interval, Interval};

pub struct RateLimiter {
    interval: Mutex<Interval>,
}

impl RateLimiter {
    pub fn new(period: Duration) -> Self {
        Self {
            interval: Mutex::new(interval(period)),
        }
    }

    /// Rate limit the execution of `f`.
    pub async fn throttle<Fut, F, T>(&self, f: F) -> T
    where
        Fut: std::future::Future<Output = T>,
        F: FnOnce() -> Fut,
    {
        self.wait().await;
        f().await
    }

    /// Acquire the lock, then wait for the interval to tick.
    async fn wait(&self) {
        let mut interval = self.interval.lock().await;
        interval.tick().await;
    }
}
```
The throttle function is exposed for clients to use when performing an action that should be rate limited. The wait function is used internally to cooperatively yield execution when other clients wish to share the same rate limiter concurrently.
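To make this concrete, here's a minimal usage sketch: a few tasks share one RateLimiter through an Arc, so their requests go out at most once per period.

```rust
use std::sync::Arc;
use std::time::Duration;
use async_throttle::RateLimiter;

#[tokio::main]
async fn main() {
    // One permit per second, shared by every task that clones the Arc.
    let limiter = Arc::new(RateLimiter::new(Duration::from_secs(1)));

    let handles: Vec<_> = (0..3)
        .map(|i| {
            let limiter = Arc::clone(&limiter);
            tokio::spawn(async move {
                limiter
                    .throttle(|| async move { println!("request {i}") })
                    .await;
            })
        })
        .collect();

    for handle in handles {
        handle.await.unwrap();
    }
}
```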
Optimizing
So far so good!
But how do we stop independently executing clients from stepping on each other’s toes? It would be nice if we could rate limit different parts of our program separately.
We will have to implement some kind of sharding strategy! The dashmap::DashMap crate takes care of most of the heavy lifting.
The optimized variant is a little bit spicier than the previous implementation. Now that we’re dealing with concurrent access to multiple rate limiters we have to be careful with our locking strategy.
What’s next
We started with this:
```rust
pub struct RateLimiter {
    interval: Mutex<Interval>,
}
```
And we’re working towards something like this:
```rust
pub struct MultiRateLimiter<K> {
    rate_limiters: dashmap::DashMap<K, RateLimiter>,
}
```
By associating particular keys with their own rate limiter, we’re able to make gains in performance for tasks that should be rate limited separately.
Why?
Wait, hold up. Why do we care about any of this?
I’m going through all of this trouble because I’m trying to build a web crawler.
I want this web crawler to be a little smart so that it's able to make progress while being respectful to the servers it is crawling. For example, we should be able to send requests to google.com and yahoo.com at the same time, without those requests sharing the same rate limiter.
We can squeeze more performance out of the web crawler by using separate rate limiters for each domain.
Fighting the Borrow Checker
Asynchronous Rust can throw a wrench into the works sometimes…
In this case, we need to be able to call interval.tick().await while holding a mutable reference to an interval (which comes from the mutex guard) for any key that is requested. Our optimization strategy revolves around creating a map that holds RateLimiter values associated with specific keys. This way we can have multiple rate limiters, each with its own interval.
The difficult part of this is dealing with the borrowing & ownership rules. To be able to share the top-level rate limiter (either a single or multi rate limiter), we have to wrap it in a std::sync::Arc. This is a reference counted pointer that allows us to share ownership of the data across multiple threads.
The problem? Arcs don’t give us mutable access to the data they point to.
We get around this by using dashmap::mapref::entry::Entry! This entry will tell us if the key we are looking for is already in the map, and if not, it can insert a value for us. This is precisely the interior mutability we're looking for!
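As a quick illustration (independent of our rate limiter), here's the entry API doing an insert-or-update through a shared reference:

```rust
use dashmap::DashMap;

fn main() {
    let map: DashMap<&str, u32> = DashMap::new();

    // `entry` hands back write access through a shared `&self`, which is
    // exactly what an Arc-wrapped map needs.
    *map.entry("hits").or_insert(0) += 1;
    *map.entry("hits").or_insert(0) += 1;

    assert_eq!(*map.get("hits").unwrap(), 2);
}
```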
Beware
We may encounter trouble if we aren’t careful.
From the dashmap::DashMap::entry docs:
> entry API that tries to mimic std::collections::HashMap
>
> Locking behaviour: May deadlock if called when holding any sort of reference into the map.
That sounds a lot like what we spoke about earlier with synchronous mutex guards and .await points.
Luckily dashmap::DashMap::try_entry is here to save us. Here is what the docs say:
> entry API that tries to mimic std::collections::HashMap
>
> Returns None if the shard is currently locked.
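A tiny sketch of what that fallibility looks like in practice:

```rust
use dashmap::DashMap;

fn main() {
    let map: DashMap<u32, u32> = DashMap::new();
    map.insert(1, 10);

    // Holding any reference into the map keeps its shard locked.
    let _guard = map.get(&1).unwrap();

    // `entry(1)` would deadlock here; `try_entry(1)` fails fast instead.
    assert!(map.try_entry(1).is_none());
}
```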
Let’s try it out!
Multi-Key Throttle
It turns out that dashmap::DashMap will indeed help us, but we need to employ a strategy to avoid deadlocks (and livelocks too! progress is what counts!).
It would be really nice if we could simply use dashmap::DashMap::entry and rely on its internal locking and unlocking strategy. Unfortunately, I ran into deadlock when I tried this & couldn’t find an appropriate solution.
dashmap::DashMap::try_entry is still on the table & we should consider if it’s appropriate. It’s a fallible operation, so naturally we will want to retry if it fails. This retry behavior is commonly implemented with a loop.
DashMap shards its keyspace: each key maps to one of a fixed number of shards, so multiple keys may land in the same shard. This causes contention between clients that attempt to lock the same shard.
To reduce contention between multiple tasks that want the same shard, we use an exponential backoff to help give other tasks a chance to acquire the lock for a shard.
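As a rough illustration of how the backoff crate behaves (the builder settings mirror the ones used later; the 1.5x growth factor is the crate's default):

```rust
use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use std::time::Duration;

fn main() {
    let mut backoff = ExponentialBackoffBuilder::default()
        .with_initial_interval(Duration::from_millis(50))
        // Never give up, so `next_backoff` always returns Some(..)
        .with_max_elapsed_time(None)
        .build();

    // Each delay is roughly 1.5x the previous one (with random jitter).
    for _ in 0..5 {
        println!("{:?}", backoff.next_backoff());
    }
}
```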
I don’t know if it’s ideal to use this spin-loop & backoff strategy, but it seems to work so I’ll run with it.
```rust
pub async fn throttle<Fut, F, T>(&self, key: K, f: F) -> T
where
    Fut: std::future::Future<Output = T>,
    F: FnOnce() -> Fut,
{
    // Create the backoff outside the loop so the delay actually grows
    // across retries. (`get_backoff` is defined in the full
    // implementation below.)
    let mut backoff = get_backoff();
    loop {
        match self.rate_limiters.try_entry(key.clone()) {
            None => {
                // `with_max_elapsed_time(None)` guarantees that
                // `next_backoff` always returns Some(Duration)
                tokio::time::sleep(backoff.next_backoff().unwrap()).await
            }
            Some(entry) => {
                let rate_limiter = entry.or_insert_with(|| RateLimiter::new(self.period));
                return rate_limiter.value().throttle(f).await;
            }
        }
    }
}
```
The throttle function now accepts a key argument which it uses to select which rate limiter to throttle with.
We first try to grab the entry from the map.
If this operation fails, we back off & try again.
If the operation succeeds, we have a reference to an entry in the map. If a rate limiter isn’t associated with the key, we create a new rate limiter and insert it into the entry.
Finally, we pull the rate limiter out & throttle the client.
The implementation
Here is the (boiled down) code for async_throttle::MultiRateLimiter:
```rust
use crate::sync::RateLimiter;
use backoff::backoff::Backoff;
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
use std::hash::Hash;
use std::time::Duration;

pub struct MultiRateLimiter<K> {
    period: Duration,
    rate_limiters: dashmap::DashMap<K, RateLimiter>,
}

impl<K: Eq + Hash + Clone> MultiRateLimiter<K> {
    pub fn new(period: Duration) -> Self {
        Self {
            period,
            rate_limiters: dashmap::DashMap::new(),
        }
    }

    /// Rate limit the execution of `f`, keyed by `key`.
    pub async fn throttle<Fut, F, T>(&self, key: K, f: F) -> T
    where
        Fut: std::future::Future<Output = T>,
        F: FnOnce() -> Fut,
    {
        // Create the backoff outside the loop so the delay grows across retries.
        let mut backoff = get_backoff();
        loop {
            match self.rate_limiters.try_entry(key.clone()) {
                None => {
                    // `with_max_elapsed_time(None)` guarantees that
                    // `next_backoff` always returns Some(Duration)
                    tokio::time::sleep(backoff.next_backoff().unwrap()).await
                }
                Some(entry) => {
                    let rate_limiter = entry.or_insert_with(|| RateLimiter::new(self.period));
                    return rate_limiter.value().throttle(f).await;
                }
            }
        }
    }
}

fn get_backoff() -> ExponentialBackoff {
    ExponentialBackoffBuilder::default()
        .with_initial_interval(Duration::from_millis(50))
        .with_max_elapsed_time(None)
        .build()
}
```
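Here's a small usage sketch of the finished MultiRateLimiter; the `fetch` function is a hypothetical stand-in for a real HTTP request:

```rust
use std::sync::Arc;
use std::time::Duration;
use async_throttle::MultiRateLimiter;

// A hypothetical stand-in for an HTTP request.
async fn fetch(domain: &str) {
    println!("fetching {domain}");
}

#[tokio::main]
async fn main() {
    let limiter = Arc::new(MultiRateLimiter::new(Duration::from_secs(1)));

    // Different keys proceed independently; calls that share a key are
    // spaced one period apart.
    let google = {
        let limiter = Arc::clone(&limiter);
        tokio::spawn(async move {
            limiter.throttle("google.com", || fetch("google.com")).await
        })
    };
    let yahoo = {
        let limiter = Arc::clone(&limiter);
        tokio::spawn(async move {
            limiter.throttle("yahoo.com", || fetch("yahoo.com")).await
        })
    };

    let _ = tokio::join!(google, yahoo);
}
```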
Testing
I wrote a few unit tests for each of the components.
Testing things that are time sensitive is hard; you rely on the system clock to provide you with a consistent time source. Using std::time::Duration and std::time::Instant::now isn't perfect, but it is definitely pragmatic. I used these to run pseudo-benchmarks in the test code.
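Here's a sketch of what such a pseudo-benchmark might look like; it mirrors the idea, not the exact tests in the repository:

```rust
use std::time::{Duration, Instant};
use async_throttle::RateLimiter;

#[tokio::test]
async fn throttle_spaces_out_calls() {
    let period = Duration::from_millis(50);
    let limiter = RateLimiter::new(period);

    let start = Instant::now();
    for _ in 0..4 {
        limiter.throttle(|| async {}).await;
    }

    // The first tick fires immediately, so 4 calls span at least 3 periods.
    assert!(start.elapsed() >= period * 3);
}
```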
The curious reader can visit lib-wc/experiments/tokio-rate-limit to see the rate limiter in action. Try to run it yourself!
Conclusion
We did it!
async_throttle::MultiRateLimiter provides support for multiple, independent keys while async_throttle::RateLimiter provides the rate limiting logic.
By using a few building blocks from the Tokio and DashMap libraries we’ve built a high-level construct which helps control the rate of computation in a program.
You can find documentation for both of these tools on docs.rs/async-throttle.
Addendum
I’ve shamelessly stolen the phrase “the recipe” from Alice Ryhl.
Additionally, I've started to use ChatGPT and GitHub Copilot when working with Rust. It's an interesting strategy; Copilot can massively reduce the amount of typing I have to do, but it's not perfect. I use ChatGPT as a tool which gives me pretty good answers most of the time™.
Copilot and ChatGPT can be especially helpful for discovery. I’ve learned countless things about Rust from these tools including fancy syntax, commands, idioms, and even new crates!