Async Rust & Tokio Internals
Build async Rust from the ground up — from raw futures and wakers to a multi-threaded runtime, then into real tokio internals and production patterns.
Courses
| Course | Lessons | Project |
|---|---|---|
| 1: Async Fundamentals | 0-8 | TCP Echo Server on your executor |
| 2: Build a Mini Tokio | 9-15 | Multi-threaded Chat Server |
| 3: Tokio Deep Dive | 16-22 | HTTP Load Tester |
| 4: Advanced Patterns | 23-28 | Async Job Queue |
Prerequisites
- Rust fundamentals (ownership, traits, generics)
- TCP networking basics
Source code
git clone https://github.com/its-saeed/learn-by-building.git
cd learn-by-building
cargo run -p async-lessons --bin 1-futures -- all
Lesson 0: Why Async?
Real-life analogy: the restaurant
Imagine a restaurant with 100 tables.
Thread-per-connection model = one waiter per table:
Table 1 → Waiter 1 (stands at table, waits for customer to decide)
Table 2 → Waiter 2 (stands at table, waits for food from kitchen)
Table 3 → Waiter 3 (stands at table, waits for customer to finish)
...
Table 100 → Waiter 100
Each waiter does nothing most of the time — they’re blocked waiting. You need 100 waiters (expensive), and they’re mostly idle. If 200 guests show up, you can’t serve them.
Event-driven model = one waiter, many tables:
Waiter checks Table 1 → "still reading menu, skip"
Waiter checks Table 2 → "food is ready!" → serves it
Waiter checks Table 3 → "wants to order!" → takes order, sends to kitchen
Waiter checks Table 4 → "still eating, skip"
...
One waiter handles all tables by only doing work when a table needs something. The waiter never stands idle — they keep circling. This is event-driven I/O.
The buzzer system at a fast-food restaurant is even more accurate:
- You order (register interest)
- You sit down and do other things (non-blocking)
- Buzzer vibrates (event notification — kqueue/epoll)
- You pick up your food (handle the ready event)
The problem: one thread per connection
The traditional server model:
#![allow(unused)]
fn main() {
loop {
    let (stream, _) = listener.accept().unwrap();
    std::thread::spawn(move || handle(stream)); // one thread per client
}
}
Each thread costs real resources:
┌────────────────────────────────────────────────────┐
│ Thread Memory Layout │
│ │
│ ┌──────────────┐ Each thread gets its own stack │
│ │ Stack: 8 MB │ allocated by the OS. │
│ │ │ Most of it is never used — │
│ │ (mostly │ the thread is just waiting │
│ │ empty, │ on read() or write(). │
│ │ waiting) │ │
│ │ │ │
│ └──────────────┘ │
│ │
│ 10,000 threads × 8 MB = 80 GB virtual memory │
│ (actual RSS is lower, but overhead is still real) │
└────────────────────────────────────────────────────┘
See it yourself
Check your system’s default thread stack size:
# macOS
ulimit -s # prints stack size in KB (usually 8192 = 8 MB)
# Linux
ulimit -s # usually 8192 KB
cat /proc/sys/kernel/threads-max # max threads the kernel allows
Check how many threads a process is using:
# macOS: count threads of a process
ps -M <pid> | wc -l
# Linux: count threads of a process
ls /proc/<pid>/task | wc -l
# Or for any process by name
ps -eLf | grep <process-name> | wc -l
The C10K problem
In 1999, Dan Kegel asked: “how do you handle 10,000 concurrent connections?” With one thread per connection, you can’t — you hit OS limits on memory, thread count, and context switching overhead.
Connections Threads Memory (8MB stack) Context switches/sec
──────────────────────────────────────────────────────────────────────
100 100 800 MB ~10,000
1,000 1,000 8 GB ~100,000
10,000 10,000 80 GB ~1,000,000 (OS melts)
100,000 ??? impossible impossible
Modern servers need to handle 100K-1M+ connections. Threads don’t scale.
The solution: event-driven I/O
Instead of blocking a thread per connection, use one thread that watches all connections:
┌──────────────────────────────────────────────────────────────┐
│ Event-Driven Server │
│ │
│ ┌─────────┐ │
│ │ kqueue │ ← register: "notify me when socket 5 is ready" │
│ │ /epoll │ ← register: "notify me when socket 8 is ready" │
│ │ │ ← register: "notify me when socket 12 is ready" │
│ └────┬────┘ │
│ │ │
│ ▼ wait() — blocks until ANY socket is ready │
│ │
│ "Socket 8 is readable!" │
│ │ │
│ ▼ read from socket 8, process, respond │
│ │ │
│ ▼ back to wait() │
│ │
│ One thread. 100,000 connections. ~10 MB memory. │
└──────────────────────────────────────────────────────────────┘
See blocking in action
Run this in Terminal 1 — a blocking TCP server:
python3 -c "
import socket, os
print('=== Blocking Server Demo ===')
print(f'PID: {os.getpid()}')
print()
s = socket.socket()
s.bind(('127.0.0.1', 9999))
s.listen()
# BLOCKING CALL #1: accept()
# The thread is FROZEN here. It can't do anything else.
# Open another terminal and run: echo hello | nc 127.0.0.1 9999
print('[1] Calling accept()... thread is BLOCKED, waiting for a connection')
print(' → Nothing else can happen on this thread until someone connects')
print(' → In another terminal run: echo hello | nc 127.0.0.1 9999')
print()
conn, addr = s.accept()
print(f'[2] accept() returned! Someone connected from {addr}')
print()
# BLOCKING CALL #2: recv()
# The thread is FROZEN again, waiting for data.
print('[3] Calling recv()... thread is BLOCKED again, waiting for data')
data = conn.recv(1024)
print(f'[4] recv() returned! Got: {data}')
print()
print('=== Takeaway ===')
print('Between [1] and [2], the thread did NOTHING. Completely idle.')
print('Between [3] and [4], same thing. Idle.')
print('With 10,000 clients, you need 10,000 threads all sitting idle like this.')
print('That is the problem async solves.')
"
In Terminal 2:
echo hello | nc 127.0.0.1 9999
Watch the output: the server prints [1], then freezes. Nothing happens until you connect from Terminal 2. Then it prints [2], [3], and freezes again until data arrives. Each freeze = a wasted thread.
See non-blocking behavior
python3 -c "
import socket
print('=== Non-Blocking Demo ===')
print()
s = socket.socket()
s.setblocking(False) # <-- key difference: non-blocking mode
# Non-blocking connect returns IMMEDIATELY, even though the connection
# isn't established yet. Instead of freezing the thread, it raises an error.
print('[1] Calling connect() on a NON-BLOCKING socket...')
try:
    s.connect(('example.com', 80))
except BlockingIOError as e:
    print(f'[2] connect() returned INSTANTLY with: {e}')
    print('    → The thread was NOT frozen. It got WouldBlock immediately.')
    print('    → The connection is still in progress in the background.')
print()
# Non-blocking recv also returns immediately if no data is ready.
print('[3] Calling recv() on a NON-BLOCKING socket (no data yet)...')
try:
    s.recv(1024)
except BlockingIOError as e:
    print(f'[4] recv() returned INSTANTLY with: {e}')
    print('    → No data yet, but the thread is FREE to do other work.')
print()
print('=== Takeaway ===')
print('Non-blocking I/O never freezes the thread.')
print('Instead of waiting, you get WouldBlock and can go handle other connections.')
print('This is what async runtimes (tokio) do under the hood:')
print(' 1. Try non-blocking read → WouldBlock')
print(' 2. Register with kqueue/epoll: \"tell me when this socket is ready\"')
print(' 3. Go handle other tasks')
print(' 4. kqueue/epoll wakes you up → try read again → data is there')
"
Blocking vs non-blocking: what happens at the OS level
When you call read() on a blocking socket:
Your code OS Kernel
│ │
├── read(fd) ──────────────────► │
│ (your thread is FROZEN) │ waiting for data...
│ (can't do anything else) │ still waiting...
│ │ data arrives!
│ ◄── returns data ──────────────┤
│ │
When you call read() on a non-blocking socket:
Your code OS Kernel
│ │
├── read(fd) ──────────────────► │
│ ◄── WouldBlock (instantly) ────┤ no data yet
│ │
│ (go do other work!) │
│ │
├── read(fd) ──────────────────► │
│ ◄── WouldBlock (instantly) ────┤ still no data
│ │
│ ... later, after kqueue says │
│ "fd is ready" ... │
│ │
├── read(fd) ──────────────────► │
│ ◄── returns data ──────────────┤ data was ready!
See kqueue/epoll (the OS event system)
On Linux, you can trace the actual syscalls:
strace -e epoll_create1,epoll_ctl,epoll_wait python3 -c "
import selectors, socket
s = socket.socket()
s.bind(('127.0.0.1', 9999))
s.listen()
s.setblocking(False)
sel = selectors.DefaultSelector()  # epoll-backed on Linux
sel.register(s, selectors.EVENT_READ)
print('waiting for connection...')
sel.select()
"
# You'll see: epoll_create1, epoll_ctl (register fd), epoll_wait (block until event).
# (A plain blocking accept() calls accept4 directly — epoll only appears once you
# register interest, which is what the selectors module does.)
On macOS, SIP restricts dtruss/dtrace. Instead, use sample to see where a process is blocked:
# While the blocking server above is waiting on accept():
sudo sample <pid> 1 2>&1 | grep -i 'accept\|kevent\|select'
# You'll see it stuck in accept() or kevent() — the thread is parked in the kernel
To check if syscall tracing is available on your Mac:
csrutil status
# If "System Integrity Protection status: enabled", dtruss is blocked.
# The Python demos above show the same concepts without needing dtruss.
Where async fits
Writing event-driven code by hand is painful — you end up with callback spaghetti:
#![allow(unused)]
fn main() {
// Callback hell (event-driven without async)
socket.on_readable(|data| {
    process(data, |result| {
        socket.on_writable(|_| {
            socket.write(result, |_| {
                // deeply nested, hard to follow
            });
        });
    });
});
}
Rust’s async/.await gives you event-driven performance with sequential-looking code:
#![allow(unused)]
fn main() {
// Same logic, but readable
async fn handle(stream: TcpStream) {
    let data = stream.read().await; // yields to runtime, doesn't block thread
    let result = process(data);
    stream.write(result).await; // yields to runtime, doesn't block thread
}
}
The compiler transforms this into a state machine (Lesson 2). The runtime (tokio) manages the event loop (Lesson 8). You write simple code, get scalable performance.
The mental model
┌─────────────────────────────────────────────────────┐
│ What you write │
│ │
│ async fn handle(stream: TcpStream) { │
│ let data = stream.read().await; │
│ stream.write(data).await; │
│ } │
└───────────────────┬─────────────────────────────────┘
│ compiler transforms
▼
┌───────────────────────────────────────────────────────┐
│ What the compiler generates │
│ │
│ A state machine enum: │
│ State::Reading → poll read, if not ready: Pending │
│ State::Writing → poll write, if not ready: Pending│
│ State::Done → return Ready(()) │
└───────────────────┬───────────────────────────────────┘
│ runtime drives
▼
┌─────────────────────────────────────────────────────┐
│ What the runtime does │
│ │
│ loop { │
│ events = kqueue.wait(); │
│ for event in events { │
│ task = lookup(event.fd); │
│ task.poll(); // advance the state machine │
│ } │
│ } │
└─────────────────────────────────────────────────────┘
The cost of async
Async isn’t free. The trade-off:
Threads Async
─────────────────────────────────────────────────────
Memory per task 2-8 MB (stack) ~100 bytes (future struct)
Max connections ~10K ~1M
Context switch OS (expensive) Userspace (cheap)
Code complexity Simple Pin, lifetimes, cancellation
Debugging Good stack traces Confusing stack traces
Ecosystem Everything works Need async versions of libs
CPU-bound work Natural Must use spawn_blocking
Use async when: many concurrent I/O operations (web servers, proxies, chat, databases)
Don’t use async when: CPU-bound work, simple scripts, low concurrency, prototyping
Exercises
Exercise 1: Thread overhead benchmark
Spawn 10,000 threads that each call `std::thread::sleep(Duration::from_secs(1))`. Measure:
- Wall time: `std::time::Instant::now()` before and after
- Peak memory: check with `ps` or Activity Monitor while running
Then do the same with 10,000 tokio::spawn tasks using tokio::time::sleep. Compare both.
Useful commands while the benchmark runs:
# macOS: check memory of your process
ps -o pid,rss,vsz,comm -p <pid>
# rss = actual memory used (KB), vsz = virtual memory (KB)
# Linux
cat /proc/<pid>/status | grep -E 'VmRSS|VmSize|Threads'
Exercise 2: Max threads
Keep spawning std::thread::spawn in a loop until it fails. Print the count and the error.
# Check your system limits
ulimit -u # max user processes
sysctl kern.num_taskthreads # macOS max threads per process
Exercise 3: Blocking vs non-blocking syscalls
Write two programs that connect to a TCP server and read data:
- Using `std::net::TcpStream` (blocking)
- Using `std::net::TcpStream` with `set_nonblocking(true)`
Trace the syscalls:
# macOS
sudo dtruss -f ./target/debug/my-binary 2>&1 | grep -E 'read|recvfrom|kevent'
# Linux
strace -e read,recvfrom,epoll_wait ./target/debug/my-binary
In the blocking version, you’ll see read() that takes seconds to return.
In the non-blocking version, you’ll see read() returning immediately with EAGAIN/EWOULDBLOCK.
Lesson 1: Futures by Hand
Real-life analogy: the buzzer
You’re at a burger joint. You order at the counter and they hand you a buzzer.
You: "Can I have a burger?"
Counter: "Not ready yet. Here's a buzzer." (Poll::Pending + Waker)
You: Go sit down, check your phone, chat with friends.
...
Buzzer: *BZZZ* (waker.wake())
You: Walk to counter. "Is my burger ready?"
Counter: "Yes, here it is!" (Poll::Ready(burger))
Without the buzzer (no waker), you’d have to keep walking to the counter every 10 seconds asking “is it ready yet?” — wasteful. Without async (blocking), you’d stand frozen at the counter unable to do anything until the burger is done.
The Future trait is the burger order. The Waker is the buzzer. The executor (runtime) is you, managing multiple buzzer orders at once.
What is a Future?
A future is a value that might not be ready yet. It’s Rust’s core async abstraction — the single most important trait in async Rust:
#![allow(unused)]
fn main() {
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T), // "Here's your result"
    Pending,  // "Not ready yet, I'll buzz you"
}
}
That’s the entire trait. One method: poll. When called:
- Return `Poll::Ready(value)` → the result is available, we’re done
- Return `Poll::Pending` → not ready yet, will notify via waker when ready
The three parts of poll()
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
───────────────── ────────────────────── ────────────────────
│ │ │
│ │ │
Pin<&mut Self> Context Poll<T>
"I promise not "Here's a waker "Ready or
to move this (buzzer) so you Pending?"
future in memory" can notify me"
- `Pin<&mut Self>` — we’ll cover this in Lesson 5. For now: it prevents the future from being moved in memory. Ignore it for simple futures.
- `Context` — contains the `Waker`. The future uses `cx.waker()` to notify the runtime when it should be polled again.
- `Poll<T>` — the result: either done (`Ready`) or not yet (`Pending`).
What .await does
When you write:
#![allow(unused)]
fn main() {
let value = some_future.await;
}
The compiler transforms it into roughly:
#![allow(unused)]
fn main() {
loop {
    match some_future.poll(cx) {
        Poll::Ready(value) => break value,   // done! use the value
        Poll::Pending => yield_to_runtime(), // give control back, wait for wake
    }
}
}
.await = “keep polling until ready, yielding control between attempts.”
Visualizing the poll cycle
Executor Future (CountdownFuture { count: 3 })
│ │
├── poll() ──────────────────► │ count=3 → decrement → count=2
│ ◄── Pending ──────────────────┤ wake_by_ref() → "poll me again"
│ │
├── poll() ──────────────────► │ count=2 → decrement → count=1
│ ◄── Pending ──────────────────┤ wake_by_ref() → "poll me again"
│ │
├── poll() ──────────────────► │ count=1 → decrement → count=0
│ ◄── Pending ──────────────────┤ wake_by_ref() → "poll me again"
│ │
├── poll() ──────────────────► │ count=0 → done!
│ ◄── Ready(()) ────────────────┤ future is complete
│ │
│ (never poll again) │
The contract
Three rules that futures MUST follow:
1. Don’t poll after Ready — once a future returns `Ready`, it’s done and the result has been consumed. Polling it again violates the contract: the future may panic or misbehave (the behavior is unspecified, though it is not memory-unsafe).
2. Pending MUST wake — if you return `Pending`, you MUST arrange for `cx.waker().wake()` to be called eventually. Otherwise the executor will never poll you again and the task hangs forever. This is the most common async bug.
3. Poll should be cheap — do a small amount of work, then return. Don’t block the thread (no `std::thread::sleep`, no blocking I/O). If you block inside `poll`, you freeze the entire executor.
Rule 2 visualized — what happens if you forget to wake:
Executor Future
│ │
├── poll() ────────────────► │
│ ◄── Pending ──────────────────┤ forgot to call wake()!
│ │
│ ... executor waits ... │ ... future waits ...
│ ... nobody wakes anybody ... │ ... nobody wakes anybody ...
│ │
│ 💀 DEADLOCK — task hangs forever
What a Waker actually is (preview)
You’ll build one from scratch in Lesson 3. For now, the key idea:
┌──────────────────────────────────────────────────────────┐
│ Waker = a callback handle │
│ │
│ waker.wake() → tells the executor: "re-poll me!" │
│ waker.wake_by_ref() → same, without consuming the waker │
│ waker.clone() → copy it, store it for later │
│ │
│ Internally: a function pointer + data pointer │
│ The executor provides the implementation │
│ The future just calls .wake() — doesn't know the details│
└──────────────────────────────────────────────────────────┘
For this lesson’s exercises, we’ll use a noop waker — a waker that does nothing when called. This is enough for manual polling in a loop.
Noop waker (use this for exercises)
Since Rust 1.85+, you can use:
#![allow(unused)]
fn main() {
use std::task::Waker;
let waker = Waker::noop(); // note: returns a &'static Waker
}
If your Rust version is older:
#![allow(unused)]
fn main() {
use std::task::{RawWaker, RawWakerVTable, Waker};

fn noop_waker() -> Waker {
    fn no_op(_: *const ()) {}
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
}
Exercises
Exercise 1: CountdownFuture
Implement a future that counts down from N to 0. Each poll decrements the counter and returns Pending. When it hits 0, return Ready(()).
#![allow(unused)]
fn main() {
struct CountdownFuture {
    count: u32,
}

impl Future for CountdownFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // TODO:
        // if count > 0: decrement, call cx.waker().wake_by_ref(), return Pending
        // if count == 0: return Ready(())
    }
}
}
Why wake_by_ref() when we return Pending? Because without it, the executor doesn’t know to poll us again. In a real future, you’d only wake when something actually happens (data arrives, timer fires). Here, we always want to be re-polled immediately — so we wake every time.
Exercise 2: ReadyFuture
Implement a future that immediately returns a value on first poll:
#![allow(unused)]
fn main() {
struct ReadyFuture<T>(Option<T>);
}
- First poll: take the value out of the `Option`, return `Ready(value)`
- The `Option` ensures the value is only returned once
This is what std::future::ready(42) does internally.
Exercise 3: Poll manually
Don’t use any executor. Use the noop waker to manually poll futures in a loop:
#![allow(unused)]
fn main() {
let waker = Waker::noop(); // &'static Waker, so no extra & below
let mut cx = Context::from_waker(waker);
let mut pinned = std::pin::pin!(CountdownFuture { count: 5 });
loop {
    match pinned.as_mut().poll(&mut cx) {
        Poll::Ready(()) => { println!("Done!"); break; }
        Poll::Pending => { println!("Not ready yet..."); }
    }
}
}
See the state change with each poll. This is literally what an executor does — just a poll loop.
Lesson 2: State Machines
Real-life analogy: the vending machine
A vending machine is a state machine:
┌─────────┐ insert coin ┌──────────┐ press button ┌────────────┐
│ Idle │ ──────────────► │ HasCoin │ ──────────────► │ Dispensing │
│ │ │ │ │ │
└─────────┘ └──────────┘ └─────┬──────┘
▲ │
│ item drops out │
└─────────────────────────────────────────────────────────┘
At any moment, the machine is in one state. An event causes a transition to the next state. It never skips states or goes backwards unexpectedly.
async fn works the same way. Each .await is a state transition. The compiler turns your sequential code into an enum where each variant is a state.
What async fn compiles to
When you write:
#![allow(unused)]
fn main() {
async fn fetch_data() -> String {
    let url = build_url().await;        // await #1
    let response = http_get(url).await; // await #2
    response.body
}
}
The compiler generates something like:
#![allow(unused)]
fn main() {
enum FetchData {
    // State 0: haven't started yet
    Start,
    // State 1: waiting for build_url() to complete.
    // Holds the sub-future for build_url.
    WaitingForUrl {
        build_url_future: BuildUrlFuture,
    },
    // State 2: url has been handed to http_get(); now waiting for the
    // response. Holds only the sub-future for http_get.
    WaitingForResponse {
        http_get_future: HttpGetFuture,
    },
    // State 3: done
    Done,
}
}
And implements Future for it:
#![allow(unused)]
fn main() {
impl Future for FetchData {
    type Output = String;
    // Simplified sketch: assumes the sub-futures are Unpin and skips the
    // pin projection the real compiler-generated code performs.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        let this = self.as_mut().get_mut();
        loop {
            match this {
                FetchData::Start => {
                    // Create the sub-future for build_url
                    *this = FetchData::WaitingForUrl {
                        build_url_future: build_url(),
                    };
                }
                FetchData::WaitingForUrl { build_url_future } => {
                    match Pin::new(build_url_future).poll(cx) {
                        Poll::Pending => return Poll::Pending, // not ready, yield
                        Poll::Ready(url) => {
                            // Transition to the next state
                            *this = FetchData::WaitingForResponse {
                                http_get_future: http_get(url),
                            };
                        }
                    }
                }
                FetchData::WaitingForResponse { http_get_future } => {
                    match Pin::new(http_get_future).poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(response) => {
                            *this = FetchData::Done;
                            return Poll::Ready(response.body);
                        }
                    }
                }
                FetchData::Done => panic!("polled after completion"),
            }
        }
    }
}
}
Visualizing the state transitions
poll #1: Start → WaitingForUrl → poll build_url → Pending
↑ return to executor
poll #2: WaitingForUrl → poll build_url → Ready(url)
→ transition to WaitingForResponse
→ poll http_get → Pending
↑ return to executor
poll #3: WaitingForResponse → poll http_get → Ready(response)
→ transition to Done
→ return Ready(response.body)
Each poll() call resumes exactly where the last one left off. No stack needed — the enum variant holds all the state.
Why this matters
Memory: enum vs thread stack
Thread:
┌────────────────────┐
│ Stack: 8 MB │ Fixed allocation, mostly empty.
│ (99% wasted) │ Every thread gets this whether it
│ │ needs it or not.
└────────────────────┘
Async state machine:
┌────────────────────┐
│ Enum: ~100 bytes │ Size = largest variant.
│ (nothing wasted) │ Only stores what the current
│ │ state actually needs.
└────────────────────┘
10,000 tasks:
Threads: 10,000 × 8 MB = 80 GB
Async: 10,000 × 100 B = 1 MB
The compiler does the hard work
Writing state machines by hand is tedious and error-prone. async/.await gives you:
- Readability of sequential code
- Performance of hand-written state machines
- Safety guaranteed by the compiler
A simpler example: add_slowly
Let’s desugar a simple async function step by step:
#![allow(unused)]
fn main() {
// The async version (what you write):
async fn add_slowly(a: u32, b: u32) -> u32 {
    let x = yield_once(a).await; // yields once, returns a
    let y = yield_once(b).await; // yields once, returns b
    x + y
}
}
Where yield_once is a future that returns Pending once, then Ready(value):
#![allow(unused)]
fn main() {
struct YieldOnce<T> {
    value: Option<T>,
    yielded: bool,
}
}
The state machine for add_slowly:
┌──────────────────┐
│ State: Start │ holds: a, b
│ │
└────────┬─────────┘
│ create YieldOnce(a), poll it → Pending
▼
┌──────────────────┐
│ State: YieldingA │ holds: b, yield_future_a
│ │
└────────┬─────────┘
│ poll yield_future_a → Ready(x)
│ create YieldOnce(b), poll it → Pending
▼
┌──────────────────┐
│ State: YieldingB │ holds: x, yield_future_b
│ │
└────────┬─────────┘
│ poll yield_future_b → Ready(y)
│ compute x + y
▼
┌──────────────────┐
│ State: Done │ return Ready(x + y)
└──────────────────┘
Notice: each state only holds what’s needed going forward. State YieldingB holds x (needed for the final addition) but NOT a (already consumed).
See what the compiler generates
# Install cargo-expand
cargo install cargo-expand
# Write a simple async fn and expand it
cargo expand --bin 2-state-machines 2>/dev/null | head -100
The output is verbose but you’ll see an enum with variants matching the await points.
Exercises
Exercise 1: YieldOnce future
Implement YieldOnce<T> — a future that returns Pending on the first poll (and wakes), then Ready(value) on the second poll. This simulates one async operation completing.
Exercise 2: Manual AddSlowly state machine
Implement AddSlowly as an enum with the states shown above. Implement Future for it by hand — match on the current state, poll sub-futures, transition states.
Run it with poll_to_completion from Lesson 1 and verify it returns the correct sum.
Exercise 3: Async version comparison
Write the same logic as async fn add_slowly using actual async/.await. Run both (your hand-written state machine and the async version). Verify they produce the same result.
Exercise 4: Future sizes
Print the size of various futures:
#![allow(unused)]
fn main() {
async fn no_awaits() -> u32 { 42 }

async fn one_await() -> u32 { yield_once(42).await }

async fn holds_big_data() -> u32 {
    let buf = [0u8; 1024];
    yield_once(0).await;
    buf[0] as u32
}
}
Use std::mem::size_of_val(&future). Compare the sizes — the future that holds [u8; 1024] across an await will be ~1KB larger.
Lesson 3: Wakers & Waking
Real-life analogy: notification systems
Think about all the ways you get notified in daily life:
Scenario Blocking (bad) Waker (good)
─────────────────────────────────────────────────────────────────────
Pizza delivery Stand at the door Doorbell rings
staring at the street
Laundry Sit in front of the Washer beeps
machine watching it spin when done
Doctor's office Stand at the counter They call
asking "is it my turn?" your name
Package delivery Refresh tracking page Push notification
every 5 seconds "delivered!"
In all cases, the waker pattern lets you do other things while waiting. The notification system (doorbell, beep, name call, push notification) is the Waker.
In async Rust:
- Your future is you (waiting for pizza)
- The executor is your brain (managing all your tasks)
- The waker is the doorbell
wake()= doorbell rings → your brain says “go check the door”
The problem
When a future returns Pending, the executor needs to know when to poll it again.
Option A: Busy-polling (wasteful)
Executor: "Are you ready?" → Future: "No"
Executor: "Are you ready?" → Future: "No"
Executor: "Are you ready?" → Future: "No"
Executor: "Are you ready?" → Future: "No"
Executor: "Are you ready?" → Future: "YES!"
(CPU at 100% doing nothing useful)
Option B: Wakers (efficient)
Executor: "Here's my number (waker). Call me when ready."
Future: (stores the waker, waits for I/O)
... executor goes to sleep or handles other tasks ...
Future: (I/O ready!) waker.wake() → "Hey executor, poll me!"
Executor: "Oh! Let me check." → Future: "Ready!"
(CPU at 0% while waiting)
How Waker works internally
A Waker is built from a RawWaker — basically two pointers:
┌─────────────────────────────────────────────────────────┐
│ RawWaker │
│ │
│ ┌──────────────────┐ ┌───────────────────────────┐ │
│ │ data: *const () │ │ vtable: &RawWakerVTable │ │
│ │ │ │ │ │
│ │ Points to the │ │ Function pointers: │ │
│ │ task/executor │ │ clone() → copy waker │ │
│ │ state. Could be │ │ wake() → notify exec │ │
│ │ an Arc<Task>, │ │ wake_by_ref() → same │ │
│ │ a task ID, etc. │ │ drop() → cleanup │ │
│ └──────────────────┘ └───────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
The vtable is like a trait object — function pointers that define behavior. Different executors provide different vtable implementations:
Noop executor: wake() = do nothing
Thread executor: wake() = thread::unpark(thread_handle)
Tokio: wake() = push task to scheduler queue
The future doesn’t know or care how the waker works internally. It just calls waker.wake() and trusts that the executor will re-poll.
Building a Waker from scratch
Step 1: Define the vtable functions
#![allow(unused)]
fn main() {
// Each function receives the `data` pointer from RawWaker
unsafe fn clone(data: *const ()) -> RawWaker {
    // Create a copy of the waker (increment refcount, clone Arc, etc.)
    // and return a new RawWaker with the same data + vtable.
}

unsafe fn wake(data: *const ()) {
    // Notify the executor: "re-poll the task associated with this data"
}

unsafe fn wake_by_ref(data: *const ()) {
    // Same as wake, but doesn't consume the waker
}

unsafe fn drop(data: *const ()) {
    // Clean up (decrement refcount, free memory, etc.)
}
}
Step 2: Create the vtable
#![allow(unused)]
fn main() {
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
}
Step 3: Create the waker
#![allow(unused)]
fn main() {
let raw_waker = RawWaker::new(data_ptr, &VTABLE);
let waker = unsafe { Waker::from_raw(raw_waker) };
}
The three wakers you’ll build
1. Noop waker
All vtable functions do nothing. Useful for testing — you manually poll in a loop.
wake() → (does nothing)
Used in: Lesson 1 exercises, simple tests
2. Counting waker
wake() increments an atomic counter. Lets you verify how many times a future called wake.
wake() → counter.fetch_add(1)
Used in: testing that futures wake correctly
3. Thread-parking waker
wake() calls thread::unpark() on a specific thread. The executor thread parks itself after Pending, then the waker unparks it.
Executor thread Future Waker
│ │ │
├── poll() ────────► │ │
│ ◄── Pending ───────┤ stores waker │
│ │ │
├── thread::park() │ │
│ (sleeping) │ │
│ │ (I/O ready) │
│ ├── waker.wake() ────────►│
│ │ ├── thread::unpark()
│ ◄── (wakes up!) ──────────────────────────── │
│ │ │
├── poll() ────────► │ │
│ ◄── Ready(val) ────┤ │
This is how real single-threaded executors work. You’ll use this pattern in Lesson 4.
Exercises
Exercise 1: Noop waker
Build a noop waker from RawWaker + vtable where all functions do nothing. Use it to manually poll the CountdownFuture from Lesson 1. Print each poll result.
Exercise 2: Counting waker
Build a waker that stores an Arc<AtomicU32> as the data pointer. Each wake() call increments the counter. Poll a CountdownFuture(5) and verify the waker was called 5 times.
Hints:
- `Arc::into_raw(arc)` converts `Arc<T>` to `*const T` (which you can cast to `*const ()`)
- `Arc::from_raw(ptr)` converts back (for clone and drop)
- Be careful with reference counting — `clone` should increment, `drop` should decrement
Exercise 3: Thread-parking waker
Build a waker that calls thread.unpark() when wake() is called.
- Get the current thread handle: `std::thread::current()`
- Store it in the waker’s data pointer
- `wake()` calls `thread_handle.unpark()`
- In your poll loop: after `Pending`, call `std::thread::park()`
- The waker’s `wake()` will unpark you, and you’ll poll again
This is a real executor pattern. Test it with CountdownFuture — but note: the future calls wake_by_ref() synchronously during poll(), so park() will immediately return (the thread was already unparked before it parks). This is fine — park returns immediately if there’s a pending unpark.
Exercise 4: Waker contract verification
Create a future that intentionally doesn’t call wake() when returning Pending. Use your thread-parking waker. Show that the executor hangs — thread::park() blocks forever because nobody calls unpark(). This demonstrates why Rule 2 (“Pending must wake”) exists.
Interrupt with Ctrl+C after a few seconds.
Lesson 4: Tasks
Real-life analogy: the post office
A letter (future) is just content — it can’t deliver itself. To get it somewhere, you put it in an envelope (task) with:
- The letter inside (the future)
- A return address (the waker — how to notify when done)
- A tracking number (so the system can find it)
- A destination queue (which mailbag it goes in)
The postal worker (executor) doesn’t handle raw letters. They handle envelopes — because envelopes have all the metadata needed for delivery.
Letter (Future): Envelope (Task):
┌──────────────┐ ┌───────────────────────────┐
│ Dear Bob, │ │ To: task queue │
│ ...content...│ │ Return addr: waker │
│ │ │ Tracking: Arc<Task> │
└──────────────┘ │ ┌──────────────┐ │
│ │ Dear Bob, │ │
Can't deliver itself. │ │ ...content...│ │
│ └──────────────┘ │
└───────────────────────────┘
The system can route this.
Future vs Task
A future is a struct implementing Future. It’s passive — it just sits there until someone calls poll().
A task is a future wrapped with executor metadata: a waker, a queue reference, and shared ownership via Arc. It’s the unit of work the executor manages.
spawn(future) → Task → pushed to queue → executor polls it
You write futures. The executor manages tasks.
What a Task looks like in Rust
#![allow(unused)]
fn main() {
struct Task {
/// The future, pinned and boxed.
/// - Box: because different tasks hold different future types (type erasure)
/// - Pin: because futures may be self-referential (Lesson 6)
/// - Mutex: because the executor thread and waker may access concurrently
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// Reference to the executor's task queue.
/// The waker uses this to push the task back when wake() is called.
queue: Arc<Mutex<VecDeque<Arc<Task>>>>,
}
}
Why each part:
- `dyn Future<Output = ()>` — type erasure. The executor holds many different future types in one queue.
- `Box` — puts the future on the heap (required for `dyn`).
- `Pin` — prevents moving the future after the first poll (required by the `Future` trait).
- `Mutex` — interior mutability. We need `&mut` access to poll, but the task is shared via `Arc`.
- `Send` — the task might move between threads (multi-threaded executor).
- `Arc<Task>` — shared ownership. Both the executor queue and the waker hold references.
The lifecycle of a task
spawn(my_future)
│
▼
┌──────────────────────────┐
│ Task created │
│ future = Box::pin(f) │
│ queue = executor.queue │
└────────────┬─────────────┘
│
▼
┌──────────────────────────┐
│ Pushed to executor queue │
│ queue: [... , task] │
└────────────┬─────────────┘
│
▼
┌──────────────────────────┐
│ Executor pops task │◄──────────────────┐
│ Builds waker for it │ │
│ Calls task.future.poll() │ │
└────────────┬─────────────┘ │
│ │
┌────────┴────────┐ │
│ │ │
▼ ▼ │
Poll::Ready Poll::Pending │
│ │ │
▼ ▼ │
Task done. Task NOT in queue. │
Drop it. Waiting for event. │
│ │
▼ │
Event fires │
waker.wake() ───────────────────┘
pushes Arc<Task> back to queue
The waker-task connection
This is the critical piece. Each task gets a waker whose wake() pushes the task back into the queue:
#![allow(unused)]
fn main() {
fn create_waker_for_task(task: Arc<Task>) -> Waker {
    // The waker's data pointer is the Arc<Task>.
    // wake() does: task.queue.lock().unwrap().push_back(task.clone())
    todo!("built from a RawWaker vtable; see Exercise 2")
}
}
When a future inside a task calls cx.waker().wake_by_ref():
- The waker grabs its `Arc<Task>`
- Pushes the `Arc<Task>` into the executor’s queue
- The executor wakes up (if parked) and polls the task again
Future calls: cx.waker().wake_by_ref()
│
Waker does: queue.lock().push_back(arc_task.clone())
│
Executor sees: queue is non-empty → pop task → poll it
The 'static requirement
When you call tokio::spawn(future), the future must be 'static. Why?
#![allow(unused)]
fn main() {
fn bad_example() {
let data = vec![1, 2, 3];
tokio::spawn(async {
println!("{:?}", data); // ERROR: `data` doesn't live long enough
});
// `data` is dropped here, but the task might still be running!
}
}
The task lives independently — it might outlive the function that spawned it. So it can’t borrow local variables. It must own everything it needs.
Fix: move ownership into the task:
#![allow(unused)]
fn main() {
fn good_example() {
let data = vec![1, 2, 3];
tokio::spawn(async move { // `move` transfers ownership
println!("{:?}", data); // task owns `data`
});
// `data` has been moved, can't use it here
}
}
The Send requirement
For multi-threaded executors, tasks must be Send — they might be polled on different threads.
#![allow(unused)]
fn main() {
// This WON'T compile with tokio::spawn:
let rc = Rc::new(42); // Rc is !Send
tokio::spawn(async move {
println!("{}", rc); // ERROR: Rc cannot be sent between threads
});
// Fix: use Arc instead of Rc
let arc = Arc::new(42); // Arc is Send
tokio::spawn(async move {
println!("{}", arc); // OK
});
}
A future is Send if all values it holds across .await points are Send. If you hold a MutexGuard (which is !Send) across an .await, the future becomes !Send and can’t be spawned.
.await is NOT a new task
A common confusion:
#![allow(unused)]
fn main() {
tokio::spawn(async { // ← ONE task
let a = foo().await; // ← state transition within the task
let b = bar().await; // ← another state transition, same task
a + b // all inside one task
});
tokio::spawn(async { // ← SECOND task (independent)
baz().await;
});
}
spawn() creates a task. .await is a yield point within a task. Two awaits in one async block = one task with two state transitions. Two spawn() calls = two tasks that run concurrently.
JoinHandle: getting a result from a task
spawn() returns a JoinHandle — a future that resolves when the task completes:
#![allow(unused)]
fn main() {
let handle = tokio::spawn(async { 42 });
let result = handle.await.unwrap(); // 42
}
Internally, JoinHandle is:
#![allow(unused)]
fn main() {
struct JoinHandle<T> {
result: Arc<Mutex<Option<T>>>, // shared with the task
waker: Arc<Mutex<Option<Waker>>>, // notified when task completes
}
}
When the task finishes, it stores the result and wakes the JoinHandle’s waker. When you .await the handle, it checks if the result is ready.
Exercises
Exercise 1: Build a Task struct
Define a Task struct with:
- A pinned, boxed, type-erased future
- A reference to a shared task queue
Create a task from a CountdownFuture. Don’t poll it yet — just verify you can construct it.
Exercise 2: Create a waker for a task
Build a Waker whose wake() pushes Arc<Task> back into the queue.
- Store `Arc<Task>` as the waker’s data pointer (`Arc::into_raw`)
- `wake()` recovers the `Arc`, locks the queue, pushes the task
- Poll a task using this waker. Verify the task re-appears in the queue after returning `Pending`.
Exercise 3: Task lifecycle
Implement the full lifecycle:
- Create a task queue (`Arc<Mutex<VecDeque<Arc<Task>>>>`)
- Spawn a `CountdownFuture(3)` into it
- Loop: pop task, create waker, poll, check queue
- Print the queue state after each poll
- Verify: task appears in queue after `Pending`, disappears after `Ready`
Exercise 4: JoinHandle
Implement a simple JoinHandle<T>:
- `spawn()` returns a `JoinHandle` alongside the task
- Both share an `Arc<Mutex<Option<T>>>`
- When the task’s future completes, store the result
- `JoinHandle` implements `Future` — its poll checks if the result is available
- Test: spawn a future that returns 42, await the handle, assert 42
Exercise 5: ’static and Send verification
Write futures that fail to compile and understand why:
- A future that borrows a local variable → fails `'static`
- A future that holds `Rc<T>` across an await → fails `Send`
- Fix both — use `move` and `Arc` respectively
Lesson 5: A Minimal Executor
Prerequisites: Lesson 4 (Tasks) — you should understand what a Task is before building the executor that drives them.
What is a Task? (quick recap)
We’ve been using the word “task” loosely. Let’s define it precisely.
A future is a state machine that can be polled. It’s just a struct that implements Future. It doesn’t know about executors, queues, or scheduling. It’s passive — it sits there until someone calls poll().
A task is a future that has been spawned onto an executor. It’s the executor’s unit of work — a future wrapped with everything the executor needs to manage it.
Future alone: Task (future + executor metadata):
┌──────────────────┐ ┌──────────────────────────────┐
│ impl Future │ │ Task │
│ │ │ │
│ poll() → Ready │ │ future: Pin<Box<dyn Future>>│
│ → Pending │ │ waker: Waker │
│ │ │ state: Running | Completed │
└──────────────────┘ │ queue: Arc<Mutex<VecDeque>> │
│ │
Just a struct. └──────────────────────────────┘
Can't run itself.
Knows how to re-schedule itself.
The executor polls this.
Analogy
- Future = a recipe (instructions for making a dish)
- Task = a kitchen ticket (recipe + order number + “notify table 5 when done” + position in the queue)
The chef (executor) works with tickets, not raw recipes. The ticket tracks everything needed to manage the order.
At the Rust code level
Here’s what a Task looks like in a real executor:
#![allow(unused)]
fn main() {
struct Task {
/// The future this task is driving to completion.
/// Pinned because futures may be self-referential (Lesson 6).
/// Boxed because different tasks hold different future types.
/// Mutex because the executor and waker may access it from different contexts.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// A reference to the executor's task queue.
/// When waker.wake() is called, the task pushes itself back into this queue.
queue: Arc<Mutex<VecDeque<Arc<Task>>>>,
}
}
And spawn creates a task from a future:
#![allow(unused)]
fn main() {
fn spawn(future: impl Future<Output = ()> + Send + 'static) -> Arc<Task> {
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
queue: queue.clone(), // the executor's shared queue, assumed in scope here
});
queue.lock().unwrap().push_back(task.clone());
task
}
}
The lifecycle of a task
1. spawn(my_future)
│
▼
2. Task created: wraps future + gets a queue reference
│
▼
3. Task pushed to executor's queue
│
▼
4. Executor pops task, builds a Waker for it, calls task.future.poll(cx)
│
├── Poll::Ready → task is done, drop it
│
└── Poll::Pending → task is NOT in the queue
│
▼
5. ... time passes, I/O event or timer fires ...
│
▼
6. waker.wake() → pushes Arc<Task> back into the queue
│
▼
7. Executor pops it again → back to step 4
The key insight: the waker closes over the task. When you call waker.wake(), it pushes Arc<Task> back into the queue. This is the connection between the future world (poll/wake) and the executor world (queue/schedule).
Task vs Future: when people say “task”
In async Rust conversations:
- “Spawn a task” = call `tokio::spawn(future)` — wraps the future in a task and schedules it
- “The task is blocked” = the task returned `Pending` and is waiting for a wake
- “Task-local storage” = per-task data (like thread-local, but per task)
- “Task dump” = list all tasks and what state they’re in (debugging)
Every tokio::spawn() creates one task. Every .await inside that task is a state transition within the same task — NOT a new task.
#![allow(unused)]
fn main() {
tokio::spawn(async { // ← this is ONE task
let a = foo().await; // ← state transition within the task
let b = bar().await; // ← another state transition, same task
a + b
});
tokio::spawn(async { // ← this is a SECOND task
baz().await;
});
}
Real-life analogy: the project manager
A project manager doesn’t do the work — they coordinate. They have a list of tasks and a team:
Project Manager (executor):
┌─────────────────────────────────────┐
│ Task list: │
│ [ ] Design mockups (waiting) │
│ [ ] Write API (waiting) │
│ [→] Review PR (in progress) │
│ [✓] Deploy staging (done) │
└─────────────────────────────────────┘
Loop:
1. Pick the next task that needs attention
2. Ask: "are you done yet?" (poll)
3. If done → mark complete, move on
4. If not → task says "I'll ping you when ready" (waker)
5. If nothing needs attention → take a nap (park)
6. Get pinged (waker.wake()) → wake up, go to step 1
That’s an executor. It’s a loop that polls futures and sleeps when there’s nothing to do.
Two levels of executor
Level 1: block_on (runs one future)
The simplest possible executor. Runs a single future on the current thread:
block_on(future):
┌──────────────────────────────────────┐
│ loop { │
│ poll(future) │
│ if Ready → return result │
│ if Pending → park thread │
│ ... waker fires → unpark ... │
│ } │
└──────────────────────────────────────┘
This is what tokio::runtime::Runtime::block_on() does at its core.
Level 2: Multi-task executor (runs many futures)
Adds a task queue. spawn() adds futures to the queue. The executor polls them round-robin:
Executor:
┌──────────────────────────────────────────────┐
│ │
│ Task Queue: [task_1, task_2, task_3] │
│ │
│ loop { │
│ task = queue.pop() │
│ poll(task) │
│ if Ready → done, don't re-queue │
│ if Pending → waker will re-queue it │
│ if queue empty → park thread │
│ } │
└──────────────────────────────────────────────┘
The key insight: the Waker for each task pushes it back into the queue when wake() is called. The executor only polls tasks that are ready to make progress.
Task 1 returns Pending
→ future stores waker
→ task is NOT in the queue (nothing to do)
→ ... time passes ...
→ I/O event fires → waker.wake()
→ task is pushed back into queue
→ executor pops it, polls it → Ready!
The waker-queue connection
This is the part that makes executors work:
spawn(future):
1. Wrap future in a Task (Arc<Task>)
2. Create a Waker whose wake() pushes Arc<Task> to the queue
3. Push task to queue
poll(task):
1. Pop task from queue
2. Build Context with the task's waker
3. Call future.poll(cx)
4. If Pending → nothing (waker will re-queue when ready)
5. If Ready → done
┌─────────────┐ ┌──────────────────┐
│ Executor │ │ Task │
│ │ │ │
│ queue: ────┤ │ future: ... │
│ [t1,t2,t3] │ │ waker: ────────┐│
│ │ │ ││
└─────────────┘ └─────────────────┘│
▲ │
│ wake() pushes │
└────────── task back to queue ──┘
The DelayFuture: a real timer
Now that you have an executor with a real waker (not noop), you can build a future that actually waits for real time:
#![allow(unused)]
fn main() {
struct DelayFuture {
message: String,
deadline: Instant,
waker_set: bool,
}
}
How it works:
- First poll: spawn a background thread that sleeps until the deadline, then calls `waker.wake()`
- Return `Pending`
- Background thread wakes up → calls `waker.wake()` → executor re-polls
- Second poll: deadline has passed → return `Ready(message)`
Executor DelayFuture Background Thread
│ │ │
├── poll() ───────────► │ │
│ │── spawn thread ──────────►│
│ │ (sleeps 2 seconds) │
│ ◄── Pending ─────────────┤ │
│ │ │
├── park() (sleeping) │ (sleeping)
│ │ │
│ │ ... 2 sec ...│
│ │ │
│ │ ◄── waker.wake() ────────┤
│ ◄── unpark! ─────────────────────────────────────────┤
│ │ │
├── poll() ───────────► │ │
│ ◄── Ready("done!") ──────┤ │
This is how tokio::time::sleep works — except tokio uses a timer wheel instead of spawning a thread per timer.
Exercises
Exercise 1: block_on
Implement block_on<F: Future>(future: F) -> F::Output:
- Build a thread-parking waker (from Lesson 3)
- Loop: poll the future
- If `Ready` → return the value
- If `Pending` → `thread::park()` (the waker will unpark)
Test with CountdownFuture.
Exercise 2: Multi-task executor
Implement an Executor with:
- `spawn(future)` — wraps the future in an `Arc<Task>`, adds it to the queue
- `run()` — pops tasks, polls them, sleeps when empty
The tricky part: building a waker whose wake() pushes the Arc<Task> back into a shared queue (Arc<Mutex<VecDeque<Arc<Task>>>>).
Test: spawn 3 CountdownFutures with different counts. Print when each completes. They should interleave.
Exercise 3: DelayFuture (real timer)
Implement DelayFuture:
- First poll: clone the waker, spawn a thread that sleeps then calls `waker.wake()`
- Return `Pending`
- Second poll: check if the deadline has passed → `Ready(message)`
Test with block_on:
#![allow(unused)]
fn main() {
block_on(DelayFuture::new(Duration::from_secs(2), "hello from the future!"))
}
This should print after about 2 seconds — proving that the executor slept efficiently (not busy-polling).
Exercise 4: Spawn DelayFutures concurrently
Using your multi-task executor, spawn three delays:
#![allow(unused)]
fn main() {
executor.spawn(DelayFuture::new(Duration::from_secs(3), "slow"));
executor.spawn(DelayFuture::new(Duration::from_secs(1), "fast"));
executor.spawn(DelayFuture::new(Duration::from_secs(2), "medium"));
}
They should complete in order: fast (1s), medium (2s), slow (3s) — all finishing within ~3 seconds total (concurrent), not 6 seconds (sequential).
Exercise 5: JoinHandle
Make spawn() return a JoinHandle<T> — a future that resolves to the spawned task’s output.
#![allow(unused)]
fn main() {
let handle = executor.spawn(async { 42 });
let result = block_on(handle);
assert_eq!(result, 42);
}
Implement with Arc<Mutex<Option<T>>> shared between the task and the handle, plus a waker for the handle to be notified when the task completes.
Lesson 6: Pinning
Prerequisites: Lesson 5 (Executor) – you should understand how an executor polls futures before learning why those futures must be pinned.
Real-life analogy: the house with an address
Imagine you buy a house at 123 Elm Street. Your friends, your bank, the post office – everyone has your address written down. Now imagine a magical crane lifts your entire house and drops it on a different lot. Your friends show up at 123 Elm Street and find an empty plot. The mail goes nowhere. Every reference to your location is now wrong.
Pin = a city ordinance that says “this house will not move.” Once pinned, everyone who has your address can trust it forever.
Before Pin: After Pin:
┌──────────────────────┐ ┌──────────────────────┐
│ House @ 123 Elm St │ │ House @ 123 Elm St │
│ (can be moved) │ │ PINNED -- no moving │
│ │ crane │ │
│ Friends: "123 Elm" │──────X │ Friends: "123 Elm" │ -- always valid
│ Bank: "123 Elm" │ │ Bank: "123 Elm" │
└──────────────────────┘ └──────────────────────┘
In Rust terms:
- House = a value in memory
- Address = a pointer/reference to that value
- Moving the house = `mem::swap`, `mem::replace`, or just assignment to a different variable
- Pin = a wrapper that prevents you from getting `&mut T` (which would let you move the value)
The problem: self-referential structs
A self-referential struct has a field that points to another field inside the same struct. This is the core problem Pin solves.
#![allow(unused)]
fn main() {
struct SelfRef {
data: String,
ptr: *const String, // points to `data` above
}
}
After initialization, ptr points to data’s memory location:
SelfRef (at address 0x1000)
┌─────────────────────────────────┐
│ data: String { "hello" } │ <-- lives at 0x1000
│ ptr: 0x1000 ──────────────┐ │
│ │ │
│ ▼ │
│ (points to data) ───┘ │
└─────────────────────────────────┘
ptr == &data -- correct!
Now move the struct (e.g., by returning it from a function, pushing to a Vec, or mem::swap):
BEFORE MOVE AFTER MOVE
0x1000 (old location) 0x2000 (new location)
┌───────────────────────┐ ┌───────────────────────┐
│ data: "hello" │ │ data: "hello" │ <-- now at 0x2000
│ ptr: 0x1000 ────┐ │ ──move──► │ ptr: 0x1000 ────┐ │
│ │ │ │ │ │
│ ▼ │ │ │ │
│ (self) ────┘ │ │ DANGLING! │ │
└───────────────────────┘ └───────────────────┘ │
│
0x1000 (old location) │
┌─────────────────┐ │
│ (freed/garbage) │◄──┘
└─────────────────┘
ptr still points here!
The data moved to 0x2000, but ptr still says 0x1000. Dangling pointer. Undefined behavior.
Concrete code example
#![allow(unused)]
fn main() {
use std::ptr;
struct SelfRef {
data: String,
ptr: *const String,
}
impl SelfRef {
fn new(text: &str) -> Self {
let mut s = SelfRef {
data: text.to_string(),
ptr: ptr::null(),
};
s.ptr = &s.data as *const String;
s // NOTE: returning moves `s`, so `ptr` is already stale; main re-fixes it below
}
fn data_addr(&self) -> *const String {
&self.data as *const String
}
fn ptr_value(&self) -> *const String {
self.ptr
}
}
let mut a = SelfRef::new("hello");
a.ptr = &a.data; // fix pointer after construction
println!("a.data addr: {:p}", &a.data); // e.g., 0x1000
println!("a.ptr value: {:p}", a.ptr); // 0x1000 -- matches!
let mut b = a; // MOVE a into b
println!("b.data addr: {:p}", &b.data); // e.g., 0x2000 (new location)
println!("b.ptr value: {:p}", b.ptr); // still 0x1000 -- DANGLING!
}
What Pin<&mut T> does
Pin<&mut T> wraps a mutable reference and removes your ability to get &mut T back (for non-Unpin types). Without &mut T, you cannot:
- `mem::swap` the value with another
- `mem::replace` the value
- Move it out by assignment
Normal &mut T: Pin<&mut T>:
┌──────────────────┐ ┌──────────────────┐
│ &mut T │ │ Pin<&mut T> │
│ │ │ │
│ Can do: │ │ Can do: │
│ - read │ │ - read │
│ - write fields │ │ - write fields │
│ - mem::swap !! │ │ │
│ - move out !! │ │ CANNOT: │
│ │ │ - get &mut T │
└──────────────────┘ │ - mem::swap │
│ - move out │
└──────────────────┘
The signature of Future::poll requires Pin<&mut Self>:
#![allow(unused)]
fn main() {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
This is not an accident. The executor must guarantee that once it starts polling a future, it will never move that future again.
The Unpin trait
Unpin is an auto-trait that says “this type is safe to move even when pinned.” It is a promise that the type has no self-references.
Most types: async fn futures:
┌─────────────────┐ ┌─────────────────────────┐
│ i32 │ │ async fn foo() { │
│ String │ │ let x = 1; │
│ Vec<T> │ │ let r = &x; │
│ HashMap<K,V> │ │ bar().await; │
│ ... │ │ use(r); │
│ │ │ } │
│ All Unpin │ │ │
│ Pin::new works │ │ !Unpin (not Unpin) │
└─────────────────┘ │ needs Box::pin or pin! │
└─────────────────────────┘
- `Unpin` types: `Pin<&mut T>` freely gives you `&mut T` via `Pin::get_mut()`. Pin has no effect.
- `!Unpin` types: `Pin<&mut T>` does NOT give you `&mut T`. The value is truly pinned.
You can opt out of Unpin manually with PhantomPinned:
#![allow(unused)]
fn main() {
use std::marker::PhantomPinned;
struct MyStruct {
data: String,
_pin: PhantomPinned, // makes MyStruct: !Unpin
}
}
Pin in practice
Pin::new – only for Unpin types
#![allow(unused)]
fn main() {
let mut x = 42_i32; // i32: Unpin
let pinned = Pin::new(&mut x); // works fine
}
This does NOT work for !Unpin types – the compiler will reject it.
Box::pin – heap-pin any value
#![allow(unused)]
fn main() {
let future = async { 42 };
let pinned: Pin<Box<dyn Future<Output = i32>>> = Box::pin(future);
// future is on the heap and cannot be moved
}
This is what executors use: Pin<Box<dyn Future>>.
pin! macro – stack-pin a value
#![allow(unused)]
fn main() {
use std::pin::pin;
let future = async { 42 };
let mut pinned = pin!(future); // pins to the stack
// pinned: Pin<&mut impl Future<Output = i32>>
}
The macro shadows the binding so you cannot access the original (un-pinned) value.
Why futures need Pin
An async fn compiles to a state machine struct. If the function holds a reference to a local variable across an .await, the struct becomes self-referential:
#![allow(unused)]
fn main() {
async fn process() {
let buffer = vec![1, 2, 3];
let slice = &buffer; // reference to local
network_send(slice).await; // <-- .await here
println!("{:?}", slice); // slice must still be valid
}
}
The compiler generates something like:
#![allow(unused)]
fn main() {
enum ProcessFuture {
    // State 0: suspended at the await
    State0 {
        buffer: Vec<i32>,
        slice: *const Vec<i32>, // <-- points to `buffer` above! (a self-reference,
                                //     shown as a raw pointer because safe Rust has
                                //     no syntax for borrowing your own field)
    },
    // State 1: resumed after the await
    State1 {
        buffer: Vec<i32>,
        slice: *const Vec<i32>,
    },
    Done,
}
}
State0 is self-referential: slice points into buffer. If you move ProcessFuture after the first poll(), the slice pointer dangles.
ProcessFuture after first poll (State0):
┌───────────────────────────────────────┐
│ buffer: Vec [1, 2, 3] │ <-- at address 0xA000
│ slice: &buffer ───────────────┐ │
│ │ │
│ ▼ │
│ (points to 0xA000) ─┘ │
└───────────────────────────────────────┘
If moved to new address 0xB000:
┌───────────────────────────────────────┐
│ buffer: Vec [1, 2, 3] │ <-- now at 0xB000
│ slice: &buffer ───────────────┐ │
│ │ │
│ X │ slice still says 0xA000
│ DANGLING! │ │ but buffer is at 0xB000
└───────────────────────────────────────┘
This is why Future::poll takes Pin<&mut Self>: the executor pins the future before the first poll and never moves it again.
The lifecycle
1. Create future: let f = process(); // future exists, not yet polled
2. Pin it: let f = Box::pin(f); // pinned on heap, cannot move
3. First poll: f.as_mut().poll(cx); // enters State0, creates self-ref
4. Returns Pending: slice points to buffer // self-ref is valid
5. ... (future stays at same address)
6. Second poll: f.as_mut().poll(cx); // slice still valid!
7. Returns Ready: done
Without Pin, step 5 could be “move the future into a different Vec slot” and step 6 would be undefined behavior.
Summary
| Concept | What it means |
|---|---|
| Self-referential struct | A struct with a pointer to its own field |
| Move | Copies bytes to a new location; the old location is invalid |
| Dangling pointer | Pointer to the old location after a move |
| `Pin<&mut T>` | Wrapper that prevents getting `&mut T` (for `!Unpin`) |
| `Unpin` | Auto-trait: “safe to move when pinned” (most types) |
| `!Unpin` | Not safe to move when pinned (`async fn` futures) |
| `PhantomPinned` | Opt out of `Unpin` manually |
| `Pin::new` | Pin an `Unpin` type (no-op, just wraps) |
| `Box::pin` | Pin any type on the heap |
| `pin!` | Pin any type on the stack |
Exercises
Exercise 1: Demonstrate the dangling pointer
Create a SelfRef struct with a String and a *const String raw pointer. Initialize the pointer to point at the data field. Move the struct. Print the addresses before and after – show that the pointer no longer matches the data’s actual address.
Exercise 2: Pin prevents the move
Use Pin and PhantomPinned to create a !Unpin struct. Try to move it after pinning. Observe the compiler error. Then show that you can safely read through the pointer because the value hasn’t moved.
Exercise 3: Unpin types are unaffected
Show that Pin::new works on Unpin types (like String, i32). Demonstrate that you can still get &mut T from Pin<&mut T> when T: Unpin, so Pin is effectively a no-op for these types.
Exercise 4: Why async futures need Pin
Write an async function that holds a reference across an .await. Box::pin it, poll it manually with a waker, and show it works. Then explain (in a comment) why moving the future between polls would break the internal reference.
Lesson 7: Combinators — join and select
Real-life analogy: cooking dinner
Join = make salad AND soup, serve when BOTH are done
You’re cooking dinner. You put soup on the stove and start chopping salad. You check the soup, chop some more, check the soup again. You keep switching between the two tasks. When both are done, you serve dinner.
You (the executor):
"Is the soup done?" → No, keep simmering (Poll::Pending)
"Is the salad done?" → No, still chopping (Poll::Pending)
"Is the soup done?" → No, keep simmering (Poll::Pending)
"Is the salad done?" → Yes! Set it aside. (Poll::Ready)
"Is the soup done?" → No, keep simmering (Poll::Pending)
"Is the soup done?" → Yes! Take it off heat. (Poll::Ready)
Both done → serve dinner! → return (soup, salad)
Join waits for all futures to complete. It returns a tuple of all results.
Select = order a taxi AND a bus ticket, take whichever arrives first
You need to get across town. You order a taxi on your phone and walk to the bus stop. Whichever arrives first, you take it — and cancel the other.
You (the executor):
"Has the taxi arrived?" → No (Poll::Pending)
"Has the bus arrived?" → No (Poll::Pending)
"Has the taxi arrived?" → No (Poll::Pending)
"Has the bus arrived?" → Yes! Get on the bus. (Poll::Ready)
Bus won → cancel the taxi! → DROP the taxi future
return bus
Select waits for the first future to complete. It returns that result and drops the loser.
How join works internally
join!(a, b) creates a single future that, each time it is polled, polls both a and b. It tracks which ones are done with flags. When all are Ready, it returns the collected results.
join!(soup, salad) — one combined future
─────────────────────────────────────────────────────────────────
poll #1:
┌─────────────────────────────────────────────────────┐
│ poll soup → Pending (soup_done = false) │
│ poll salad → Pending (salad_done = false) │
│ → return Pending │
└─────────────────────────────────────────────────────┘
poll #2:
┌─────────────────────────────────────────────────────┐
│ poll soup → Pending (soup_done = false) │
│ poll salad → Ready(S) (salad_done = true) │
│ → return Pending (soup not done yet) │
└─────────────────────────────────────────────────────┘
poll #3:
┌─────────────────────────────────────────────────────┐
│ poll soup → Ready(S) (soup_done = true) │
│ skip salad (already done) │
│ → return Ready((soup, salad)) ALL DONE! │
└─────────────────────────────────────────────────────┘
Key insight: join is one future that polls multiple sub-futures. There is no parallelism — it is concurrent on a single thread. Each call to poll on the join future polls each child that is not yet done.
Pseudocode
#![allow(unused)]
fn main() {
struct MyJoin<A, B> {
a: A,
b: B,
a_result: Option<A::Output>,
b_result: Option<B::Output>,
}
fn poll(&mut self, cx) -> Poll<(A::Output, B::Output)> { // pseudocode: the real poll takes Pin<&mut Self>
if self.a_result.is_none() {
if let Ready(val) = self.a.poll(cx) {
self.a_result = Some(val);
}
}
if self.b_result.is_none() {
if let Ready(val) = self.b.poll(cx) {
self.b_result = Some(val);
}
}
if self.a_result.is_some() && self.b_result.is_some() {
Ready((self.a_result.take().unwrap(), self.b_result.take().unwrap()))
} else {
Pending
}
}
}
How select works internally
select!(a, b) creates a single future that polls both a and b each time. As soon as one returns Ready, it returns that value and drops the other future.
select!(taxi, bus) — one combined future
─────────────────────────────────────────────────────────────────
poll #1:
┌─────────────────────────────────────────────────────┐
│ poll taxi → Pending │
│ poll bus → Pending │
│ → return Pending │
└─────────────────────────────────────────────────────┘
poll #2:
┌─────────────────────────────────────────────────────┐
│ poll taxi → Pending │
│ poll bus → Ready(bus_result) │
│ → return Ready(Right(bus_result)) │
└─────────────────────────────────────────────────────┘
After returning:
┌─────────────────────────────────────────────────────┐
│ MySelect is dropped │
│ → taxi future is DROPPED ← CANCELLATION! │
│ → taxi's Drop impl runs │
│ → any resources taxi held are freed │
└─────────────────────────────────────────────────────┘
The drop / cancellation problem (preview of Lesson 24)
When select drops the losing future, that future is cancelled mid-execution. Whatever state it was in — gone. This has real consequences:
- If the future had written half a message to a buffer — that buffer is now incomplete
- If the future had acquired a lock — the lock guard is dropped (unlocked), but any partial work under the lock is lost
- If the future was a network request — the request is abandoned; the server may still process it
Cancelled future's lifecycle:
poll #1: started work, allocated buffer ┐
poll #2: filled half the buffer │ all of this
poll #3: about to finish... │ state is LOST
↓ │
DROP: future is destroyed, buffer freed ┘
This is cancellation safety — a topic we will cover in depth in Lesson 24. For now, remember: select drops the loser, and the loser may have done partial work.
Rule of thumb: a future is cancellation-safe if dropping it at any await point does not lose data or leave things in an inconsistent state.
FuturesUnordered (brief mention)
What if you have not 2 but 100 futures, and you want to process results as they complete? FuturesUnordered from the futures crate is a collection that polls all contained futures and yields results in completion order.
#![allow(unused)]
fn main() {
use futures::stream::FuturesUnordered;
use futures::StreamExt;
let mut futs = FuturesUnordered::new();
futs.push(fetch("url1"));
futs.push(fetch("url2"));
futs.push(fetch("url3"));
while let Some(result) = futs.next().await {
println!("Got: {result:?}");
}
}
Think of it as a dynamic select over many futures. We will use it in later lessons.
Exercises
Exercise 1: MyJoin
Implement MyJoin<A, B> — a future that polls two sub-futures and returns (A::Output, B::Output) when both are done. Implement the Future trait by hand.
Hints:
- Store each sub-future and an `Option` for each result
- On each poll, poll any sub-future whose result is still `None`
- Return `Ready` only when both `Option`s are `Some`
- Remember to call `cx.waker().wake_by_ref()` when returning `Pending`
Exercise 2: MySelect
Implement MySelect<A, B> — a future that polls two sub-futures and returns whichever completes first. Use an enum Either<L, R> for the return type.
Make both sub-futures NamedFuture structs that print a message when dropped. Run the demo and observe the loser printing its drop message.
Exercise 3: join_all for a Vec
Implement a JoinAll<F> future that takes a Vec<F> and returns Vec<F::Output>. This generalizes MyJoin from 2 futures to N futures.
Hints:
- Store `Vec<Option<F>>` for the futures and `Vec<Option<F::Output>>` for results
- On each poll, iterate and poll any future that is still `Some`
- When a future completes, store its result and replace the future with `None`
- Return `Ready` when all results are `Some`
Exercise 4: map combinator
Implement a Map<F, Func> future that wraps a future F and applies a function Func to its output when ready. This lets you write:
#![allow(unused)]
fn main() {
let doubled = Map::new(CountdownFuture { count: 3 }, |()| 42);
// polls the countdown, then applies the function → returns 42
}
Hints:
- Store the inner future and an `Option<Func>` (take the function out on `Ready`)
- On poll: poll the inner future. If `Ready`, apply the function and return the mapped result. If `Pending`, return `Pending`.
Lesson 8: Async I/O Foundations
Real-life analogy: fishing with multiple rods
Imagine you have 10 fishing rods and a lake full of fish.
Blocking I/O = hold one rod at a time, stare at it:
You: [hold rod 1........waiting........waiting........FISH!]
[hold rod 2........waiting........waiting........FISH!]
[hold rod 3........waiting............................]
Total: you can only fish one rod at a time.
9 rods sit idle while you stare at one.
Non-blocking I/O = plant all 10 rods, walk between them checking each:
You: [check rod 1: nothing]
[check rod 2: nothing]
[check rod 3: FISH! → reel it in]
[check rod 4: nothing]
[check rod 5: nothing]
[check rod 1: nothing] ← loop back
[check rod 2: FISH! → reel it in]
...
Total: you catch fish faster, but you burn energy walking
back and forth even when nothing is biting (busy-wait).
Event-driven I/O = attach a bell to each rod, sit and wait for a bell:
You: [sitting..............................RING! rod 3]
[reel in rod 3]
[sitting..........RING! rod 7, rod 1]
[reel in rod 7, reel in rod 1]
...
Total: zero wasted effort. You only act when something
is actually ready. This is kqueue / epoll.
The bell system is exactly how modern async I/O works:
- Plant rods (open sockets, register with kqueue/epoll)
- Sit and wait (call `kevent` / `epoll_wait`)
- Bell rings (OS says “fd 7 is readable”)
- Reel in (read the data)
How the OS tells you a socket is ready
When your program calls read() on a TCP socket, the kernel checks if any
data has arrived in the socket’s receive buffer. If not, blocking read()
puts your thread to sleep. In async, you cannot afford to sleep — you need
the OS to notify you instead.
kqueue (macOS) / epoll (Linux)
These are kernel APIs for event notification:
1. Create an event queue: `kqueue()` or `epoll_create()`
2. Register interest: “tell me when fd 5 is readable”
3. Wait: block until ANY registered fd has an event
4. Process: handle the ready fds
5. Loop back to step 3
// Pseudocode (kqueue)
int kq = kqueue();
register(kq, socket_fd, EVFILT_READ); // "notify me when readable"
loop {
int n = kevent(kq, NULL, 0, events, MAX, NULL); // wait
for (int i = 0; i < n; i++) {
int ready_fd = events[i].ident;
// ready_fd has data — read without blocking
}
}
One kevent() call can watch thousands of file descriptors simultaneously.
This is why nginx and tokio can handle 100K connections on a single thread.
The three syscall patterns
Pattern 1: Blocking read
Thread Kernel
│ │
│── read(fd) ───>│
│ (blocked) │ ...waiting for data...
│ (blocked) │ ...still waiting...
│<── data ───────│
│ │
Thread is frozen. Cannot do anything else. One thread per connection.
Pattern 2: Non-blocking read (poll loop)
Thread Kernel
│ │
│── read(fd) ───────>│
│<── WouldBlock ─────│ (no data yet)
│ │
│── read(fd) ───────>│
│<── WouldBlock ─────│ (still no data)
│ │
│── read(fd) ───────>│
│<── 42 bytes ───────│ (data arrived!)
│ │
Thread is not frozen, but it wastes CPU spinning in a loop. This is the “walking between fishing rods” approach.
Pattern 3: Event notification (kqueue/epoll)
Thread Kernel (kqueue)
│ │
│── register(fd) ───>│ "watch fd for readability"
│<── ok ─────────────│
│ │
│── wait() ─────────>│
│ (sleeping) │ ...kernel watches all fds...
│ (sleeping) │ ...data arrives on fd...
│<── [fd ready] ─────│ "fd has data"
│ │
│── read(fd) ───────>│
│<── 42 bytes ───────│ (guaranteed not to block)
│ │
Thread sleeps efficiently (no CPU usage). Kernel wakes it only when something is ready. One thread watches thousands of fds.
Timeline comparison
Time ──────────────────────────────────────────────────>
Blocking (3 sockets, 3 threads):
Thread 1: ████████████████░░░░░░░░░░░ (blocked on fd 1)
Thread 2: ░░░░░░░░████████████████░░░ (blocked on fd 2)
Thread 3: ░░░░░░░░░░░░░░░░████████░░░ (blocked on fd 3)
Cost: 3 threads, 24 MB stack memory
Non-blocking (3 sockets, 1 thread):
Thread 1: ○○○○○●○○○○○●○○●○○○○○○○○○○○ (● = data, ○ = WouldBlock)
Cost: 1 thread, but 100% CPU usage spinning
Event-driven (3 sockets, 1 thread):
Thread 1: ___________●____●__●________ (● = event, _ = sleeping)
Cost: 1 thread, near-zero CPU when idle
Non-blocking sockets in Rust
Standard library sockets are blocking by default. You flip them to non-blocking mode with one call:
#![allow(unused)]
fn main() {
use std::net::TcpStream;
use std::io::{self, Read};
let stream = TcpStream::connect("127.0.0.1:8080")?;
stream.set_nonblocking(true)?; // ← the magic switch
let mut buf = [0u8; 1024];
match stream.read(&mut buf) {
Ok(n) => println!("got {n} bytes"),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
println!("no data yet — try again later");
}
Err(e) => eprintln!("real error: {e}"),
}
}
WouldBlock is not a failure — it means “nothing to read right now.”
The key insight: in non-blocking mode, you get to choose when to retry
instead of having the OS freeze your thread.
kqueue/epoll explained
The event loop pattern is the same on every OS, just different syscalls:
┌─────────────────────────────────────────────────────┐
│ Event Loop │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Register fd │────>│ kqueue / │ │
│ │ + interest │ │ epoll │ │
│ └──────────────┘ │ (kernel) │ │
│ └──────┬───────┘ │
│ │ │
│ ┌──────▼───────┐ │
│ │ wait() │ │
│ │ (sleeps) │ │
│ └──────┬───────┘ │
│ │ wakes up │
│ ┌──────▼───────┐ │
│ │ ready fds: │ │
│ │ [3, 7, 12] │ │
│ └──────┬───────┘ │
│ │ │
│ ┌──────▼───────┐ │
│ │ process each │─── loop ──┐ │
│ │ ready fd │ │ │
│ └──────────────┘ │ │
│ ▲ │ │
│ └────────────────────┘ │
└─────────────────────────────────────────────────────┘
Steps:
1. Create: `kqueue()` returns a file descriptor for the event queue itself
2. Register: `kevent(kq, &changes, ...)` — add fds you care about
3. Wait: `kevent(kq, NULL, 0, &events, max, timeout)` — blocks until ready
4. Process: iterate over returned events, read/write the ready fds
5. Repeat: go back to step 3
The mio crate
mio (Metal I/O) is a thin, cross-platform wrapper around kqueue/epoll/IOCP.
Tokio is built on top of mio. The core types:
| Type | Purpose |
|---|---|
| `Poll` | Owns the kqueue/epoll fd. You call `poll.poll()` to wait. |
| `Events` | Buffer that `poll()` fills with ready events. |
| `Token(usize)` | Your label for each fd. When an event fires, you get the token back. |
| `Interest` | What you care about: `READABLE`, `WRITABLE`, or both. |
| `Registry` | Obtained from `poll.registry()`. Used to register/deregister fds. |
#![allow(unused)]
fn main() {
use mio::{Poll, Events, Token, Interest};
use mio::net::TcpListener;
let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);
let addr = "127.0.0.1:9000".parse()?;
let mut listener = TcpListener::bind(addr)?;
// Register: "tell me when listener has a new connection"
poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;
loop {
// Wait: sleep until something is ready
poll.poll(&mut events, None)?;
for event in events.iter() {
match event.token() {
Token(0) => {
// Listener is readable → accept new connection
let (mut conn, addr) = listener.accept()?;
println!("new connection from {addr}");
// Register the new connection too (a real server would
// assign a fresh token per connection — see Exercise 3)
poll.registry().register(
&mut conn,
Token(1),
Interest::READABLE,
)?;
}
Token(1) => {
// Connection is readable → read data
}
_ => unreachable!(),
}
}
}
}
How this connects to the async executor
The bridge from OS events to futures looks like this:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Kernel │ │ Reactor │ │ Executor │
│ (kqueue/ │ │ (mio Poll │ │ (task queue │
│ epoll) │ │ loop) │ │ + polling) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
│ fd 7 readable │ │
│────────────────────>│ │
│ │ waker.wake() │
│ │ for task on fd 7 │
│ │────────────────────>│
│ │ │ poll task's
│ │ │ future
│ │ │──┐
│ │ │ │ Future::poll()
│ │ │<─┘ → Ready(data)
- Future calls `read()` on a non-blocking socket → gets `WouldBlock`
- Future registers the fd with the reactor and stores the `Waker`
- Future returns `Poll::Pending`
- Reactor’s mio poll loop eventually gets an event for that fd
- Reactor calls `waker.wake()` for the associated task
- Executor re-polls the future
- This time, `read()` succeeds → future returns `Poll::Ready(data)`
This is the complete chain. Lesson 9 builds the reactor. This lesson gives you the foundation: raw I/O primitives that the reactor wraps.
Exercises
Exercise 1: Raw non-blocking socket
Create a TCP listener. Accept a connection and set it to non-blocking with `set_nonblocking(true)`.
Try to read in a loop — print each WouldBlock and sleep 100ms between
retries. When data arrives, print it and exit. Use only std::net, no mio.
Exercise 2: kqueue/mio event loop
Replace the busy-wait loop from Exercise 1 with mio::Poll. Register the
accepted connection for READABLE interest. Call poll.poll() to sleep
until data arrives. Read and print. Compare CPU usage with Exercise 1.
Exercise 3: mio TCP echo server
Build a multi-client echo server using mio. The listener gets Token(0).
Each accepted connection gets Token(next_id). Store connections in a
HashMap<Token, TcpStream>. On a readable event, read data and write it
back. Handle client disconnections by deregistering and removing from the map.
Exercise 4: Connect reactor to waker
Extend Exercise 2: instead of reading directly in the event handler, store
a Waker when you register the fd. When the event fires, call
waker.wake() instead of reading. In a separate “executor” loop, poll
the future which then does the actual read. This is the reactor pattern
that Lesson 9 will build in full.
Project 1: Mini Executor + TCP Echo Server
Prerequisites: Lessons 1-8 (futures, state machines, wakers, tasks, executor, pinning, combinators, async I/O, reactor). This project ties them all together into a working system.
Overview
Build a single-threaded async runtime from scratch that runs a real TCP echo server. No tokio. No external async runtime. You write every layer: executor, reactor, async TCP types. Then you run real network I/O on it.
This is the moment everything clicks. You’ve built futures, wakers, an executor, and a reactor in isolation. Now you combine them into one program that handles multiple concurrent TCP clients on a single thread.
Architecture
Your Runtime
┌──────────────────────────────────────────────────────┐
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Executor │ │
│ │ │ │
│ │ task queue: [task_1, task_2, task_3, ...] │ │
│ │ │ │
│ │ loop { │ │
│ │ drain queue → poll each task │ │
│ │ if queue empty → ask reactor to wait │ │
│ │ } │ │
│ └──────────────────────┬─────────────────────────┘ │
│ │ │
│ wake(task) │
│ │ │
│ ┌──────────────────────┴─────────────────────────┐ │
│ │ Reactor │ │
│ │ │ │
│ │ mio::Poll ←── kqueue / epoll │ │
│ │ wakers: HashMap<Token, Waker> │ │
│ │ │ │
│ │ poll() → event on Token(3) │ │
│ │ → look up waker for Token(3) │ │
│ │ → waker.wake() → task back in queue │ │
│ └────────┬──────────┬───────────────────────────┘ │
│ │ │ │
│ ┌─────┴───┐ ┌────┴────┐ │
│ │Listener │ │ Stream │ ← async wrappers │
│ │Token(0) │ │Token(1) │ around std sockets │
│ └────┬────┘ └────┬────┘ set to non-blocking │
│ │ │ │
└──────────┼───────────┼───────────────────────────────┘
│ │
┌─────┴─────┐ ┌───┴───┐
│ Client A │ │Client B│ multiple concurrent
│ nc ..8080 │ │nc .. │ connections on one
└───────────┘ └────────┘ thread
Data flow for one echo
1. Client sends "hello\n" over TCP
2. kqueue/epoll fires → reactor sees Token(3) is READABLE
3. Reactor looks up waker for Token(3) → waker.wake()
4. Executor re-polls the task that owns Token(3)
5. Task calls stream.read() → gets "hello" (non-blocking, data is ready)
6. Task calls stream.write("hello") → registers WRITABLE if needed
7. Data goes back to the client
What you implement
| Component | Description |
|---|---|
| `block_on` | Runs the top-level future to completion on the current thread |
| `spawn` | Wraps a future in a `Task`, pushes it to the executor queue |
| `Reactor` | Owns `mio::Poll`, maps `Token`s to `Waker`s, dispatches events |
| `TcpListener` | Async wrapper around `mio::net::TcpListener`, returns `TcpStream` on accept |
| `TcpStream` | Async read/write around `mio::net::TcpStream` with reactor registration |
| Event loop | The main loop: drain tasks, poll reactor, repeat |
The goal
When you’re done, this code runs:
#![allow(unused)]
fn main() {
my_runtime::block_on(async {
let listener = my_runtime::TcpListener::bind("127.0.0.1:8080").await;
loop {
let stream = listener.accept().await;
my_runtime::spawn(handle_client(stream));
}
});
async fn handle_client(mut stream: my_runtime::TcpStream) {
let mut buf = [0u8; 1024];
loop {
let n = stream.read(&mut buf).await;
if n == 0 { return; } // client disconnected
stream.write_all(&buf[..n]).await;
}
}
}
Multiple clients connect at the same time. Each gets its own task. All tasks run concurrently on a single thread.
Step-by-step implementation guide
Step 1: Start with block_on (from Lesson 5)
Copy your block_on from Lesson 5. It should:
- Pin the future
- Create a waker that calls `thread::unpark()`
- Loop: poll → Ready? return. Pending? `thread::park()`
At this point you can run a single future to completion.
Step 2: Add the Reactor (from Lesson 8)
Create a global Reactor (use thread_local! or a static with OnceLock):
#![allow(unused)]
fn main() {
struct Reactor {
poll: mio::Poll,
wakers: HashMap<Token, Waker>,
next_token: usize,
}
}
It needs three methods:
- `register(source, interest) -> Token` – register a socket, return a token
- `set_waker(token, waker)` – store the waker for a token
- `wait()` – call `poll.poll()`, then for each event, call the stored waker
Step 3: Implement TcpListener
Wrap mio::net::TcpListener:
#![allow(unused)]
fn main() {
struct TcpListener {
inner: mio::net::TcpListener,
token: Token,
}
}
bind() creates the listener, registers it with the reactor for READABLE.
accept() returns a future that:
- Tries `inner.accept()` – if it returns a connection, wrap it in `TcpStream` and return `Ready`
- If `WouldBlock` – store the waker with the reactor, return `Pending`
Step 4: Implement TcpStream
Wrap mio::net::TcpStream:
#![allow(unused)]
fn main() {
struct TcpStream {
inner: mio::net::TcpStream,
token: Token,
}
}
read() and write_all() are futures:
- Try the operation – if it succeeds, return `Ready`
- If `WouldBlock` – register the waker, return `Pending`
Step 5: Add spawn + task queue
Add a shared task queue (just like Lesson 5’s multi-task executor):
#![allow(unused)]
fn main() {
static TASK_QUEUE: Mutex<VecDeque<Arc<Task>>> = ...;
}
spawn(future) wraps the future in a Task, pushes it to the queue.
Update block_on to also drain the task queue on each iteration:
- Poll the main future
- Drain and poll all queued tasks
- If nothing is ready, call `reactor.wait()` (which blocks until an event)
- Repeat
Step 6: Wire reactor events to wakers
This is where it all connects. When reactor.wait() fires:
- It gets events from `mio::Poll`
- For each event, it finds the stored waker and calls `waker.wake()`
- `wake()` pushes the task back into the queue
- The executor loop picks it up and polls it
Test the full loop:
- Start the server
- Connect with `nc`
- Type a message
- See it echoed back
- Connect a second client – both work concurrently
Testing
Manual testing with netcat
Terminal 1 – start the server:
cargo run -p async-lessons --bin p1-echo-server -- run
Terminal 2 – connect a client:
nc 127.0.0.1 8080
hello # type this
hello # server echoes it back
Terminal 3 – connect another client simultaneously:
echo "world" | nc 127.0.0.1 8080
world # echoed back
Automated self-test
The test subcommand spawns the server, connects a client, sends data, and
checks the echo:
cargo run -p async-lessons --bin p1-echo-server -- test
Exercises
Exercise 1: Basic echo server
Implement all the components and get the goal code running. One client at a
time is fine for this exercise. Confirm with nc.
Success criteria: type a line into nc, see it echoed back.
Exercise 2: Multiple concurrent clients
Make spawn work so multiple clients are served concurrently. Open 3-4
nc sessions and verify they all echo independently without blocking each
other.
Success criteria: send messages from multiple nc sessions interleaved
– each gets its own echo back immediately.
Exercise 3: Add a timeout
Add an AsyncTimer future (backed by a background thread or the reactor’s
poll timeout). Disconnect clients that send nothing for 10 seconds:
#![allow(unused)]
fn main() {
async fn handle_client(mut stream: my_runtime::TcpStream) {
let mut buf = [0u8; 1024];
loop {
match my_runtime::timeout(Duration::from_secs(10), stream.read(&mut buf)).await {
Ok(0) => return,
Ok(n) => stream.write_all(&buf[..n]).await,
Err(_timeout) => {
eprintln!("client timed out");
return;
}
}
}
}
}
Success criteria: connect with nc, wait 10 seconds without typing,
connection drops.
Lesson 9: Event Loop + Reactor
Prerequisites: Lesson 5 (Executor), Lesson 8 (Async I/O). You need to understand both the executor’s poll loop and how kqueue/epoll works before combining them.
Real-life analogy: the hotel front desk
A hotel front desk manages hundreds of rooms:
┌─────────────────────────────────────────────────────┐
│ Front Desk (Reactor) │
│ │
│ Room Registry: │
│ Room 101 → Guest A's wake-up call at 7am │
│ Room 205 → Guest B wants fresh towels │
│ Room 312 → Guest C ordered room service │
│ │
│ Loop: │
│ 1. Wait for any event (phone rings, bell rings) │
│ 2. Look up which room it's for │
│ 3. Notify the right guest │
│ 4. Back to waiting │
│ │
│ The desk doesn't DO the work (cooking, cleaning). │
│ It just routes notifications to the right person. │
└─────────────────────────────────────────────────────┘
The reactor works the same way:
- Rooms = file descriptors (sockets)
- Guests = tasks (futures waiting for I/O)
- Registry = HashMap<Token, Waker>
- Phone ringing = kqueue/epoll event
- Notifying the guest = `waker.wake()`
The reactor doesn’t read or write data. It just tells futures “your socket is ready — try now.”
What is a Reactor?
The reactor is the component that connects the OS event system to your async runtime:
┌───────────────────────────────────────────────────────────┐
│ Reactor │
│ │
│ ┌──────────┐ ┌─────────────────────────┐ │
│ │ mio::Poll│ │ wakers: HashMap │ │
│ │ │ │ Token(0) → Waker_A │ │
│ │ wraps │ │ Token(1) → Waker_B │ │
│ │ kqueue / │ │ Token(2) → Waker_C │ │
│ │ epoll │ │ │ │
│ └─────┬────┘ └──────────┬──────────────┘ │
│ │ │ │
│ │ poll() returns │ look up waker │
│ │ [Token(1), Token(2)] │ for each token │
│ │ │ │
│ └────────────┬────────────┘ │
│ │ │
│ ▼ │
│ waker_b.wake() │
│ waker_c.wake() │
│ → tasks re-queued in executor │
│ │
└───────────────────────────────────────────────────────────┘
The Reactor struct
#![allow(unused)]
fn main() {
struct Reactor {
/// The OS event system (wraps kqueue on macOS, epoll on Linux)
poll: mio::Poll,
/// Maps tokens to wakers. When an event fires for Token(N),
/// we look up the waker here and call wake().
wakers: HashMap<mio::Token, Waker>,
/// Counter for assigning unique tokens to new sockets
next_token: usize,
}
}
Three core methods:
register — “watch this socket”
#![allow(unused)]
fn main() {
fn register(&mut self, source: &mut impl Source, interest: Interest) -> mio::Token {
let token = mio::Token(self.next_token);
self.next_token += 1;
self.poll.registry().register(source, token, interest).unwrap();
token
}
}
Called when a future first needs to wait for I/O. The future says “tell me when socket X is readable.”
set_waker — “here’s how to notify me”
#![allow(unused)]
fn main() {
fn set_waker(&mut self, token: mio::Token, waker: Waker) {
self.wakers.insert(token, waker);
}
}
Called during poll() when a future returns Pending. The future stores its waker so the reactor can notify it later.
wait — “block until something happens”
#![allow(unused)]
fn main() {
fn wait(&mut self) {
let mut events = mio::Events::with_capacity(64);
self.poll.poll(&mut events, None).unwrap(); // blocks!
for event in events.iter() {
if let Some(waker) = self.wakers.get(&event.token()) {
waker.wake_by_ref(); // re-queue the task
}
}
}
}
This is where the thread sleeps. poll() blocks until the OS says a socket is ready. Then we wake the corresponding tasks.
How Reactor + Executor work together
Executor Reactor OS
│ │ │
├── poll task_A ──► │ │
│ task_A tries read() │ │
│ → WouldBlock │ │
│ task_A calls: │ │
│ reactor.set_waker(tok, wk) │ │
│ ◄── Pending ────────────────│ │
│ │ │
├── queue empty │ │
├── reactor.wait() ────────────►│ │
│ ├── poll.poll() ────────────►│
│ │ (thread sleeps) │
│ │ data arrives!
│ │ ◄── Token(0) ready ─────┤
│ │ │
│ ├── waker.wake() ──► │
│ ◄── task_A re-queued ────────┤ │
│ │ │
├── poll task_A ──► │ │
│ task_A tries read() │ │
│ → got data! │ │
│ ◄── Ready(data) ───────────│ │
The key insight: the executor never busy-polls. When there’s nothing to do, it calls reactor.wait() which blocks until the OS has an event. Zero CPU usage while waiting.
Where does the Reactor live?
The reactor needs to be accessible from:
- Futures — to call `register()` and `set_waker()` during `poll()`
- The executor — to call `wait()` when the queue is empty
Common patterns:
Option A: Thread-local (single-threaded runtime)
thread_local! { static REACTOR: RefCell<Reactor> = ... }
Option B: Arc<Mutex<Reactor>> (shared, but lock contention)
Option C: Global static with OnceLock (initialized once)
static REACTOR: OnceLock<Mutex<Reactor>> = OnceLock::new();
Tokio uses a more sophisticated approach — the reactor runs on a dedicated thread and communicates via channels. For our mini-runtime, thread-local is simplest.
Readiness vs Completion
The reactor uses the readiness model:
- “Socket is readable” = you CAN try `read()` and it probably won’t block
- It does NOT mean “data has been read into your buffer”
- You still need to call `read()` yourself — it might still return `WouldBlock`
This is different from the completion model (used by io_uring on Linux):
- “Read is complete” = data is already in your buffer
- No need to call `read()` again
Readiness (kqueue/epoll/mio):
1. Register: "tell me when fd is readable"
2. Event fires: "fd is readable"
3. You call read(fd) → get data (usually)
Completion (io_uring):
1. Submit: "read fd into this buffer"
2. Event fires: "read is done, data is in your buffer"
3. Just use the buffer
Tokio uses readiness (mio). This matters because your futures must handle WouldBlock even after being woken — the event was a hint, not a guarantee.
Exercises
Exercise 1: Single-socket reactor
Build a minimal Reactor that watches one TcpListener:
- Create `mio::Poll` and `mio::Events`
- Create a `mio::net::TcpListener`, register it for `READABLE`
- Loop: `poll.poll()`, accept connections, print their address
- No executor, no wakers — just the raw event loop
Test: run the program, connect with nc 127.0.0.1 8080 from another terminal.
Exercise 2: Multi-socket reactor
Extend Exercise 1 to handle multiple connections:
- Accept connections, register each for `READABLE`
- Map each `Token` to a `TcpStream` (use a `HashMap<Token, TcpStream>`)
- When a stream is readable, read data and echo it back
- Handle disconnection (read returns 0)
Test: connect 3 clients, type in each — all should echo independently.
Exercise 3: Waker integration
Wire the reactor into a waker:
- Create a reactor with a `HashMap<Token, Waker>`
- Create a `ReadableFuture` that:
  - First poll: registers the socket with the reactor, stores the waker, returns `Pending`
  - Later polls: tries `read()` — if data, returns `Ready(data)`; if `WouldBlock`, returns `Pending`
- Use `block_on` from Lesson 5 to run the future
- The reactor’s `wait()` should be called when the executor has nothing to do
This is the bridge between Course 1 (individual components) and Course 2 (integrated runtime).
Exercise 4: Deregistration and cleanup
Add proper cleanup:
- When a connection closes, deregister its token from `mio::Poll`
- Remove the waker from the `HashMap`
- Verify no leaked entries — print the waker map size periodically
- Handle the case where a waker fires for a token that was already deregistered (it should be a no-op)
Lesson 10: Task Scheduling
Prerequisites: Lesson 4 (Tasks), Lesson 5 (Executor), Lesson 9 (Reactor). You need to understand what a task is and how the reactor wakes them.
Real-life analogy: the hospital ER
An emergency room triages patients:
┌──────────────────────────────────────────────────────┐
│ ER Waiting Room (Run Queue) │
│ │
│ [Patient A: broken arm] ← arrived first │
│ [Patient B: headache] ← arrived second │
│ [Patient C: chest pain] ← arrived third │
│ │
│ Triage Nurse (Scheduler): │
│ FIFO: treat A first (arrived first) │
│ Priority: treat C first (most urgent) │
│ Fair: give each patient 5 minutes, rotate │
│ │
│ Doctor (Executor): │
│ Takes next patient from queue │
│ Examines them (poll) │
│ If done → discharge (Ready) │
│ If not → send back to waiting room (Pending) │
│ If needs X-ray → nurse will call them (waker) │
└──────────────────────────────────────────────────────┘
Different scheduling strategies:
- FIFO: first in, first out. Simple, fair-ish, what we build first.
- Priority: urgent tasks first. Used for I/O vs timers.
- Round-robin: each task gets a time slice. Prevents starvation.
- Work-stealing: multiple doctors, idle ones steal from busy queues. Lesson 14.
What is a scheduler?
The scheduler decides which task to poll next. In Lesson 5, we had a simple loop:
#![allow(unused)]
fn main() {
// Lesson 5: naive approach
while let Some(task) = queue.pop_front() {
poll(task); // poll whatever's next in the queue
}
}
This works but has problems:
- A task that always returns `Pending` and immediately wakes itself monopolizes the CPU
- No way to prioritize I/O-ready tasks over timer tasks
- No fairness guarantee
The run queue
The run queue is where tasks wait to be polled. When waker.wake() is called, the task is pushed to the back of the queue:
Run Queue (VecDeque<Arc<Task>>)
┌───────────────────────────────┐
push back → │ task_C │ task_A │ task_B │ │ ← pop front
└───────────────────────────────┘
Executor loop:
1. Pop task_B from front
2. Poll it
3. If Pending → waker will push it back later
4. If Ready → done, drop the task
5. Pop next (task_A)
6. ...
Fairness
A task that wakes itself immediately goes to the back of the queue. This gives other tasks a chance to run:
Queue: [A, B, C]
Poll A → Pending, wakes self → queue: [B, C, A]
Poll B → Pending, wakes self → queue: [C, A, B]
Poll C → Ready → queue: [A, B]
Poll A → Ready → queue: [B]
Poll B → Ready → queue: []
Each task gets one turn per cycle. This is cooperative multitasking — tasks must yield (return Pending) to let others run. A task that never yields starves everyone.
The ArcWake pattern
In Lesson 3, we built wakers manually with RawWaker. There’s a cleaner way — implement the Wake trait:
#![allow(unused)]
fn main() {
use std::task::Wake;
impl Wake for Task {
fn wake(self: Arc<Self>) {
// Push ourselves back into the queue
self.queue.lock().unwrap().push_back(self.clone());
}
}
}
Then create a waker from an `Arc<Task>`:
#![allow(unused)]
fn main() {
let waker = Waker::from(task.clone()); // calls task.wake() when woken
}
No unsafe code. The Wake trait handles the vtable for you.
Lesson 3 (manual): Lesson 10 (Wake trait):
RawWaker { data, vtable } impl Wake for Task {
unsafe fn clone/wake/drop fn wake(self: Arc<Self>) { ... }
→ error-prone, lots of unsafe }
Waker::from(arc_task)
→ safe, clean, idiomatic
JoinHandle: getting results from tasks
When you spawn(), you want to get the result back:
#![allow(unused)]
fn main() {
let handle = spawn(async { 42 });
let result = handle.await; // 42
}
JoinHandle<T> is a future that resolves when the task completes:
┌─────────────────────────────────────────────────────┐
│ Shared state (Arc<Mutex<JoinState>>) │
│ │
│ result: Option<T> ← None until task finishes │
│ waker: Option<Waker> ← set by JoinHandle::poll │
│ │
│ Task writes result, wakes JoinHandle │
│ JoinHandle checks result │
└──────────────────┬──────────────────────────────────┘
│
shared by both sides
│
┌──────────────┴──────────────┐
│ │
▼ ▼
┌─────────┐ ┌──────────────┐
│ Task │ │ JoinHandle │
│ │ │ (a Future) │
│ runs │ │ │
│ future │ completes │ poll(): │
│ stores │ ──────────────►│ result set? │
│ result │ │ yes → Ready │
│ wakes │ │ no → Pending│
│ handle │ │ │
└─────────┘ └──────────────┘
Implementation:
#![allow(unused)]
fn main() {
struct JoinState<T> {
result: Option<T>,
waker: Option<Waker>,
}
struct JoinHandle<T> {
state: Arc<Mutex<JoinState<T>>>,
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<T> {
let mut state = self.state.lock().unwrap();
if let Some(result) = state.result.take() {
Poll::Ready(result)
} else {
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
}
Putting it together: the full executor
#![allow(unused)]
fn main() {
struct Executor {
queue: Arc<Mutex<VecDeque<Arc<Task>>>>,
reactor: Reactor,
}
impl Executor {
fn spawn<T>(&self, future: impl Future<Output=T> + Send + 'static) -> JoinHandle<T> {
// 1. Create shared JoinState
// 2. Wrap future: when it completes, store result and wake handle
// 3. Create Task with the wrapped future
// 4. Push to queue
// 5. Return JoinHandle
}
fn run(&mut self) {
loop {
// 1. Drain queue, poll each task
let tasks: Vec<_> = self.queue.lock().unwrap().drain(..).collect();
if tasks.is_empty() {
// 2. Nothing to do → block on reactor
self.reactor.wait();
continue;
}
for task in tasks {
let waker = Waker::from(task.clone());
let mut cx = Context::from_waker(&waker);
let mut future = task.future.lock().unwrap();
let _ = future.as_mut().poll(&mut cx);
// If Ready → task dropped (not re-queued)
// If Pending → waker will re-queue later
}
}
}
}
}
Exercises
Exercise 1: Spawn and join
Implement spawn() returning a JoinHandle<T>. Spawn three tasks that each return a number. Await all three handles, assert the sum.
#![allow(unused)]
fn main() {
let a = executor.spawn(async { 10 });
let b = executor.spawn(async { 20 });
let c = executor.spawn(async { 30 });
// a.await + b.await + c.await == 60
}
Exercise 2: Task fairness
Spawn a “greedy” task that yields 1000 times (returns Pending + wakes each time) and a “quick” task that completes in one poll. Track the order of completions. The quick task should complete within a few polls, not after 1000 — proving FIFO fairness.
Exercise 3: Starvation detection
Spawn a task that never yields (infinite loop inside poll). Show that other tasks never run. Then add a yield point (YieldOnce from Lesson 2). Show that fairness is restored.
This demonstrates why async is cooperative — tasks must yield voluntarily.
Exercise 4: JoinHandle cancellation
Drop a JoinHandle before the task completes. Add a cancelled flag to JoinState — when the handle is dropped, set the flag. The task checks the flag on each poll and aborts early if cancelled.
Test: spawn a task that increments a counter each poll. Drop the handle after 3 polls. Assert the counter is 3, not more.
Lesson 11: AsyncRead / AsyncWrite
Prerequisites: Lesson 9 (Reactor), Lesson 10 (Task Scheduling). You need a working reactor that maps tokens to wakers and a scheduler that polls tasks.
Real-life analogy: the drive-through window
At a drive-through:
Blocking:
You: "One burger please"
Window: (silence for 5 minutes while they cook)
You: (frozen, can't do anything, car is blocked)
Window: "Here's your burger"
Non-blocking:
You: "One burger please"
Window: "Not ready, come back later" (WouldBlock)
You: Drive away, do other errands
You: Come back: "Ready yet?"
Window: "Not ready" (WouldBlock again)
You: Come back: "Ready yet?"
Window: "Here's your burger!"
Async (what we're building):
You: "One burger please. Call me when it's ready."
Window: (stores your number) → reactor.set_waker(token, waker)
You: Go do other errands → return Poll::Pending
...
Window: (rings your phone) → waker.wake()
You: Drive back, pick it up → read() → Ready(burger)
AsyncRead/AsyncWrite wraps the “non-blocking + callback” pattern into a clean .await-able API.
The pattern: try → WouldBlock → register → Pending
Every async I/O operation follows the same pattern:
fn poll_read(cx, buf) → Poll<usize>:
┌──────────────────────────────┐
│ Try read(buf) │
│ │
│ Got data (n bytes)? │
│ → return Poll::Ready(n) │
│ │
│ Got WouldBlock? │
│ → reactor.set_waker(token, cx.waker())
│ → return Poll::Pending │
│ │
│ Got error? │
│ → return Poll::Ready(Err) │
└──────────────────────────────┘
This is the readiness pattern:
- Try the operation (non-blocking socket, so it returns immediately)
- If success → return the data
- If WouldBlock → register with the reactor, yield to the executor
- When the reactor fires → executor re-polls → back to step 1
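Stripped of the actual I/O, the triage in steps 2-4 is just a three-way match on the result of the non-blocking call. A sketch (the `Outcome` enum and `triage` name are illustrative, not from the lesson's code):

```rust
use std::io;

/// What a poll_* method does with the result of a non-blocking call.
#[derive(Debug)]
enum Outcome {
    Ready(usize),        // → Poll::Ready(Ok(n))
    Park,                // → register waker with reactor, Poll::Pending
    Fail(io::ErrorKind), // → Poll::Ready(Err(e))
}

fn triage(res: io::Result<usize>) -> Outcome {
    match res {
        Ok(n) => Outcome::Ready(n),
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Outcome::Park,
        Err(e) => Outcome::Fail(e.kind()),
    }
}

fn main() {
    assert!(matches!(triage(Ok(5)), Outcome::Ready(5)));
    let wb = io::Error::from(io::ErrorKind::WouldBlock);
    assert!(matches!(triage(Err(wb)), Outcome::Park));
    let broken = io::Error::from(io::ErrorKind::BrokenPipe);
    assert!(matches!(triage(Err(broken)), Outcome::Fail(io::ErrorKind::BrokenPipe)));
    println!("triage ok");
}
```

Only the `Park` arm involves the reactor; the other two arms resolve the poll immediately.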
The AsyncTcpStream
#![allow(unused)]
fn main() {
struct AsyncTcpStream {
/// The underlying non-blocking socket
inner: mio::net::TcpStream,
/// Token registered with the reactor (for event matching)
token: Token,
}
}
poll_read
#![allow(unused)]
fn main() {
impl AsyncTcpStream {
fn poll_read(
&mut self,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match self.inner.read(buf) {
Ok(n) => Poll::Ready(Ok(n)), // got data
Err(e) if e.kind() == WouldBlock => {
reactor().set_waker(self.token, cx.waker().clone());
Poll::Pending // wait for reactor
}
Err(e) => Poll::Ready(Err(e)), // real error
}
}
}
}
The read future
To make this .await-able, wrap it in a future:
#![allow(unused)]
fn main() {
struct ReadFuture<'a> {
stream: &'a mut AsyncTcpStream,
buf: &'a mut [u8],
}
impl<'a> Future for ReadFuture<'a> {
type Output = io::Result<usize>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Split the borrow with get_mut (ReadFuture is Unpin: it holds only references);
        // calling `self.stream.poll_read(cx, self.buf)` directly would borrow `self` twice.
        let this = self.get_mut();
        this.stream.poll_read(cx, this.buf)
    }
}
// Usage:
let n = stream.read(&mut buf).await?;
}
poll_write
Same pattern, but for writing:
#![allow(unused)]
fn main() {
fn poll_write(&mut self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
match self.inner.write(buf) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e.kind() == WouldBlock => {
reactor().set_waker(self.token, cx.waker().clone());
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
Visualizing the data flow
Application code:
let n = stream.read(&mut buf).await;
Calls:
ReadFuture::poll()
│
├── stream.inner.read(buf)
│ │
│ ├── Ok(n) ──────────────────► Poll::Ready(Ok(n))
│ │ executor receives data
│ │
│ └── Err(WouldBlock) ────────► reactor.set_waker(token, waker)
│ Poll::Pending
│ │
│ ... executor does other │
│ tasks ... │
│ │
│ OS: data arrives on socket │
│ reactor: event for Token(N)│
│ reactor: waker.wake() │
│ │
└── (re-polled by executor) ────────┘
stream.inner.read(buf) → Ok(n)
Poll::Ready(Ok(n))
write_all: a higher-level helper
A single write() may not write all bytes (partial write). write_all loops until everything is written:
#![allow(unused)]
fn main() {
async fn write_all(stream: &mut AsyncTcpStream, mut buf: &[u8]) -> io::Result<()> {
while !buf.is_empty() {
let n = stream.write(buf).await?;
buf = &buf[n..];
}
Ok(())
}
}
Each .await might yield if the socket’s write buffer is full. The reactor wakes us when there’s space.
read_exact: fill the buffer completely
#![allow(unused)]
fn main() {
async fn read_exact(stream: &mut AsyncTcpStream, buf: &mut [u8]) -> io::Result<()> {
let mut filled = 0;
while filled < buf.len() {
let n = stream.read(&mut buf[filled..]).await?;
if n == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "connection closed"));
}
filled += n;
}
Ok(())
}
}
The AsyncTcpListener
#![allow(unused)]
fn main() {
struct AsyncTcpListener {
inner: mio::net::TcpListener,
token: Token,
}
impl AsyncTcpListener {
fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<(AsyncTcpStream, SocketAddr)>> {
match self.inner.accept() {
Ok((mut stream, addr)) => {
let token = reactor().register(&mut stream, Interest::READABLE);
Poll::Ready(Ok((AsyncTcpStream { inner: stream, token }, addr)))
}
Err(e) if e.kind() == WouldBlock => {
reactor().set_waker(self.token, cx.waker().clone());
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
}
Same pattern: try accept → got connection? Ready. WouldBlock? Register and Pending.
Edge-triggered vs level-triggered
mio uses edge-triggered events by default: you get notified once when the state changes (not readable → readable). After handling the event, you must drain all available data, otherwise you might miss events.
Level-triggered: Edge-triggered:
"socket IS readable" "socket BECAME readable"
(keeps firing until you read) (fires once on transition)
Safe: you can read one byte Must read ALL available data
and you'll be notified again. or you might not be notified again.
In practice: after waker.wake(), your poll_read should loop reading until WouldBlock, then return Pending. This ensures you don’t miss data.
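That drain loop can be sketched against any `Read` source; here a scripted reader stands in for an edge-triggered socket (both `Scripted` and `drain` are illustrative helpers, not part of the lesson's runtime; chunks are assumed smaller than the buffer for brevity):

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

/// A reader scripted to hand out chunks, then report WouldBlock.
struct Scripted {
    chunks: VecDeque<Vec<u8>>,
}

impl Read for Scripted {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.chunks.pop_front() {
            Some(chunk) => {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                Ok(n)
            }
            None => Err(io::ErrorKind::WouldBlock.into()),
        }
    }
}

/// Read until WouldBlock — required under edge-triggered notification.
fn drain(src: &mut impl Read) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    let mut buf = [0u8; 8];
    loop {
        match src.read(&mut buf) {
            Ok(0) => break, // EOF
            Ok(n) => out.extend_from_slice(&buf[..n]),
            // Drained everything available: now it is safe to return Pending.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
            Err(e) => return Err(e),
        }
    }
    Ok(out)
}

fn main() {
    let mut src = Scripted {
        chunks: VecDeque::from([b"hel".to_vec(), b"lo".to_vec()]),
    };
    assert_eq!(drain(&mut src).unwrap(), b"hello");
    println!("drained");
}
```

If `drain` returned after the first chunk, an edge-triggered reactor would never fire again for the bytes still sitting in the socket buffer.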
Cancellation safety
What happens if a ReadFuture is dropped mid-await?
#![allow(unused)]
fn main() {
let read_future = stream.read(&mut buf);
// Drop it before completion (e.g., select! picked the other branch)
drop(read_future);
}
With our design, this is safe: no data is lost because we haven’t actually read anything yet. The read only happens inside poll(). If we never poll, no bytes are consumed.
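A tiny demonstration of that claim, using a hypothetical stand-in future (`ReadOnPoll` is not a type from the lesson) whose side effect lives entirely inside `poll`:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::task::{Context, Poll};

static READS: AtomicU32 = AtomicU32::new(0);

struct ReadOnPoll;

impl Future for ReadOnPoll {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        // The "read" (the side effect) happens only here, inside poll.
        READS.fetch_add(1, Ordering::SeqCst);
        Poll::Ready(())
    }
}

fn main() {
    let fut = ReadOnPoll;
    drop(fut); // dropped before any poll: nothing happened
    assert_eq!(READS.load(Ordering::SeqCst), 0);
    println!("drop-before-poll is side-effect free");
}
```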
But read_exact is NOT cancellation-safe: if you’ve read 50 of 100 bytes and the future is dropped, those 50 bytes are gone. This is Lesson 24’s topic.
Exercises
Exercise 1: AsyncTcpStream
Implement AsyncTcpStream with poll_read and poll_write. Create read() and write() methods that return futures. Test with a simple echo server on your runtime from Lesson 5.
Exercise 2: read_exact and write_all
Build read_exact and write_all as async functions. Test: send a 10KB message in small chunks, verify read_exact receives all of it.
Exercise 3: Async echo server
Combine AsyncTcpListener + AsyncTcpStream + your executor. Accept connections, spawn a task per connection, echo data back. Test with multiple nc clients.
Exercise 4: Throughput test
Implement async fn copy(src, dst) that pipes bytes from one stream to another. Measure throughput: connect two streams, send 1MB, time it. Compare with blocking std::io::copy.
Lesson 12: Timers
Prerequisites: Lesson 9 (Reactor), Lesson 10 (Task Scheduling). Timers integrate with the reactor’s poll timeout and the executor’s task queue.
Real-life analogy: the kitchen timer rack
A chef has a rack of kitchen timers:
┌──────────────────────────────────────────────────────┐
│ Timer Rack (TimerHeap) │
│ │
│ ⏰ Pasta: 8 min (soonest → at the top) │
│ ⏰ Sauce: 15 min │
│ ⏰ Bread: 25 min │
│ │
│ Chef's loop: │
│ 1. Check: "which timer is closest?" → Pasta (8m) │
│ 2. Set a kitchen alarm for 8 minutes │
│ 3. Do other work until alarm rings │
│ 4. Alarm! → drain pasta │
│ 5. Next closest: Sauce (15m) → set alarm for 7m │
│ 6. Continue... │
└──────────────────────────────────────────────────────┘
The chef doesn’t check every timer every second. They set ONE alarm for the nearest timer, then work on other things. When it rings, they handle it and set the next alarm.
This is exactly how async timers work:
- Timer rack = `BinaryHeap` of `(Instant, Waker)` entries
- Nearest timer = the heap’s minimum (the `peek()`)
- Kitchen alarm = `mio::Poll::poll(timeout)` — the reactor blocks until this timeout
- Alarm rings = poll returns, we check for expired timers and wake them
How timers integrate with the reactor
The reactor already has a wait() method that calls poll.poll(). We add a timeout:
Executor loop
│
▼
┌───────────────────────────────┐
│ Drain task queue, poll tasks │
│ │
│ Queue empty? │
│ │ │
│ ▼ │
│ Check timer heap: │
│ Nearest deadline: 200ms │
│ │
│ reactor.wait(timeout: 200ms) │──► mio::poll(200ms)
│ │ blocks at most 200ms
│ ◄─────────────────────────────│
│ │
│ Check expired timers: │
│ pasta timer expired → wake │
│ │
│ Back to draining queue │
└───────────────────────────────┘
Without timers, the reactor blocks forever (no timeout). With timers, the reactor blocks until either:
- An I/O event fires, OR
- The nearest timer expires
Whichever comes first.
The TimerHeap
#![allow(unused)]
fn main() {
use std::collections::BinaryHeap;
use std::cmp::Reverse;
use std::time::Instant;
struct TimerEntry {
deadline: Instant,
waker: Waker,
}
struct TimerHeap {
heap: BinaryHeap<Reverse<TimerEntry>>,
}
}
We use Reverse so the heap is a min-heap — the soonest deadline is at the top.
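One wrinkle the struct glosses over: `BinaryHeap` requires `Ord`, and `Waker` doesn't implement it, so `#[derive]` won't work. The usual fix is hand-written ordering traits that compare deadlines only. A sketch (the `Noop` waker is a test helper for poking at the heap by hand, not part of the lesson's code):

```rust
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::task::{Wake, Waker};
use std::time::{Duration, Instant};

struct TimerEntry {
    deadline: Instant,
    waker: Waker,
}

// Order entries by deadline only; the Waker is ignored.
impl PartialEq for TimerEntry {
    fn eq(&self, other: &Self) -> bool { self.deadline == other.deadline }
}
impl Eq for TimerEntry {}
impl PartialOrd for TimerEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
}
impl Ord for TimerEntry {
    fn cmp(&self, other: &Self) -> Ordering { self.deadline.cmp(&other.deadline) }
}

// A do-nothing waker so the example runs without an executor.
struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(Noop));
    let now = Instant::now();
    let mut heap: BinaryHeap<Reverse<TimerEntry>> = BinaryHeap::new();
    heap.push(Reverse(TimerEntry { deadline: now + Duration::from_secs(3), waker: waker.clone() }));
    heap.push(Reverse(TimerEntry { deadline: now + Duration::from_secs(1), waker: waker.clone() }));
    heap.push(Reverse(TimerEntry { deadline: now + Duration::from_secs(2), waker }));
    // Reverse flips the max-heap: the soonest deadline sits on top.
    assert_eq!(heap.peek().unwrap().0.deadline, now + Duration::from_secs(1));
    println!("soonest deadline on top");
}
```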
Three operations:
#![allow(unused)]
fn main() {
impl TimerHeap {
/// Add a timer. When deadline passes, the waker will be called.
fn push(&mut self, deadline: Instant, waker: Waker) {
self.heap.push(Reverse(TimerEntry { deadline, waker }));
}
/// How long until the next timer fires? Used as poll timeout.
fn next_timeout(&self) -> Option<Duration> {
self.heap.peek().map(|Reverse(entry)| {
entry.deadline.saturating_duration_since(Instant::now())
})
}
/// Wake all timers that have expired.
fn fire_expired(&mut self) {
let now = Instant::now();
while let Some(Reverse(entry)) = self.heap.peek() {
if entry.deadline <= now {
let Reverse(entry) = self.heap.pop().unwrap();
entry.waker.wake();
} else {
break; // remaining timers are in the future
}
}
}
}
}
The Sleep future
#![allow(unused)]
fn main() {
struct Sleep {
deadline: Instant,
registered: bool,
}
impl Sleep {
fn new(duration: Duration) -> Self {
Self {
deadline: Instant::now() + duration,
registered: false,
}
}
}
impl Future for Sleep {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
if Instant::now() >= self.deadline {
Poll::Ready(())
} else {
if !self.registered {
timer_heap().push(self.deadline, cx.waker().clone());
self.registered = true;
}
Poll::Pending
}
}
}
}
Usage: sleep(Duration::from_secs(2)).await;
How it works end-to-end
1. Task calls sleep(2s).await
2. Sleep::poll() → deadline is in the future
→ push(deadline, waker) to timer heap
→ return Pending
3. Executor: queue empty
→ timer_heap.next_timeout() = 2s
→ reactor.wait(timeout: 2s)
→ mio::poll blocks for 2 seconds
4. poll returns (timeout expired)
→ timer_heap.fire_expired()
→ waker.wake() → task re-queued
5. Executor polls task again
→ Sleep::poll() → Instant::now() >= deadline
→ return Ready(())
The timeout combinator
Wrap any future with a deadline:
#![allow(unused)]
fn main() {
async fn timeout<F: Future>(duration: Duration, future: F) -> Result<F::Output, TimedOut> {
select! {
result = future => Ok(result),
_ = sleep(duration) => Err(TimedOut),
}
}
}
Or without select, as a manual future:
#![allow(unused)]
fn main() {
struct Timeout<F> {
future: F,
sleep: Sleep,
}
// F: Unpin keeps the pin projection trivial; real implementations use pin-project
impl<F: Future + Unpin> Future for Timeout<F> {
    type Output = Result<F::Output, TimedOut>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();
        // Poll the inner future first
        if let Poll::Ready(val) = Pin::new(&mut this.future).poll(cx) {
            return Poll::Ready(Ok(val));
        }
        // Then check the timer (Sleep is Unpin, so Pin::new is fine)
        if let Poll::Ready(()) = Pin::new(&mut this.sleep).poll(cx) {
            return Poll::Ready(Err(TimedOut));
        }
        Poll::Pending
    }
}
}
Exercises
Exercise 1: Sleep and ordering
Implement Sleep and the TimerHeap. Spawn three tasks:
#![allow(unused)]
fn main() {
spawn(async { sleep(Duration::from_secs(3)).await; println!("C: 3s"); });
spawn(async { sleep(Duration::from_secs(1)).await; println!("A: 1s"); });
spawn(async { sleep(Duration::from_secs(2)).await; println!("B: 2s"); });
}
They should print in order: A, B, C. Total time should be ~3 seconds (concurrent), not 6 (sequential).
Exercise 2: Timer integration with reactor
Modify your executor’s main loop:
- After draining the task queue, call `timer_heap.fire_expired()`
- Compute `timer_heap.next_timeout()`
- Pass it to `reactor.wait(timeout)`
Verify: spawn a sleep(1s) task alongside an I/O task. Both should work correctly — the reactor wakes up for either I/O events or timer expiry.
Exercise 3: Timeout combinator
Implement timeout(duration, future). Test:
#![allow(unused)]
fn main() {
let result = timeout(Duration::from_millis(100), sleep(Duration::from_secs(10))).await;
assert!(result.is_err()); // timed out!
let result = timeout(Duration::from_secs(10), sleep(Duration::from_millis(100))).await;
assert!(result.is_ok()); // completed in time
}
Exercise 4: Interval
Implement interval(duration) that yields () at regular intervals. Use it to print a heartbeat every 500ms while another task does a 3-second sleep.
#![allow(unused)]
fn main() {
spawn(async {
let mut i = interval(Duration::from_millis(500));
loop {
i.tick().await;
println!("heartbeat");
}
});
spawn(async {
sleep(Duration::from_secs(3)).await;
println!("done, shutting down");
// TODO: cancel the heartbeat task
});
}
Lesson 13: Channels
Prerequisites: Lesson 4 (Tasks), Lesson 10 (Task Scheduling). Channels are how tasks communicate — they need wakers to notify receivers.
Real-life analogy: the mailbox
Oneshot channel = a one-time letter:
┌────────────────┐ ┌────────────────┐
│ Sender │ sends ONE letter │ Receiver │
│ │ ────────────────────► │ │
│ "Here's your │ │ Waits at │
│ blood test │ │ mailbox │
│ result" │ │ until letter │
│ │ │ arrives │
└────────────────┘ └────────────────┘
Used once, then both sides are done.
MPSC channel = a mailbox with multiple senders:
┌────────────────┐
│ Sender A │──┐
└────────────────┘ │
┌────────────────┐ │ ┌────────────────┐
│ Sender B │──┼───►│ Receiver │
└────────────────┘ │ │ (one mailbox) │
┌────────────────┐ │ └────────────────┘
│ Sender C │──┘
└────────────────┘
Multiple producers, single consumer.
Messages queue up if receiver is busy.
The key difference from std::sync::mpsc: async channels wake the receiver when a message arrives instead of blocking a thread.
How async channels work
The core pattern: shared state + waker.
┌────────────────────────────────────────────────┐
│ Shared State (Arc<Mutex<Inner>>) │
│ │
│ queue: VecDeque<T> ← messages waiting │
│ rx_waker: Option<Waker> ← receiver's waker │
│ closed: bool ← sender dropped? │
│ │
│ Sender writes: │
│ 1. Lock inner │
│ 2. Push message to queue │
│ 3. If rx_waker is Some → wake it │
│ │
│ Receiver reads: │
│ 1. Lock inner │
│ 2. Pop from queue → got message? Ready │
│ 3. Queue empty? Store waker, return Pending │
│ │
└────────────────────────────────────────────────┘
The sequence
Sender Shared State Receiver
│ │ │
│ │ poll() ◄───────┤
│ │ queue empty │
│ │ store waker ◄───────────┤
│ │ Pending ──►
│ │ │
│ send("hello") ──────►│ │
│ push to queue │ │
│ wake receiver ────────┼──► waker.wake() │
│ │ │
│ │ poll() ◄───────┤
│ │ pop "hello" │
│ │ Ready("hello") ──►
Oneshot channel
The simplest async channel: one message, one sender, one receiver.
#![allow(unused)]
fn main() {
struct Inner<T> {
value: Option<T>,
rx_waker: Option<Waker>,
closed: bool,
}
struct Sender<T> {
inner: Arc<Mutex<Inner<T>>>,
}
struct Receiver<T> {
inner: Arc<Mutex<Inner<T>>>,
}
}
Sender::send
#![allow(unused)]
fn main() {
impl<T> Sender<T> {
fn send(self, value: T) -> Result<(), T> {
let mut inner = self.inner.lock().unwrap();
if inner.closed {
return Err(value); // receiver dropped
}
inner.value = Some(value);
if let Some(waker) = inner.rx_waker.take() {
waker.wake(); // notify receiver
}
Ok(())
}
}
}
Note: send consumes self — you can only send once.
Receiver as a Future
#![allow(unused)]
fn main() {
impl<T> Future for Receiver<T> {
type Output = Result<T, RecvError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut inner = self.inner.lock().unwrap();
if let Some(value) = inner.value.take() {
Poll::Ready(Ok(value))
} else if inner.closed {
Poll::Ready(Err(RecvError)) // sender dropped without sending
} else {
inner.rx_waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
}
Usage
#![allow(unused)]
fn main() {
let (tx, rx) = oneshot::channel();
spawn(async move { tx.send(42).unwrap(); });
let value = rx.await.unwrap(); // 42
}
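For reference, the whole oneshot fits in one self-contained file when paired with a thread-parking `block_on` (the `ThreadWaker` and `block_on` helpers are assumptions here, not the lesson's executor; the `Drop`-based close detection shown below is omitted for brevity):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

#[derive(Debug)]
struct RecvError;

struct Inner<T> {
    value: Option<T>,
    rx_waker: Option<Waker>,
    closed: bool,
}

struct Sender<T> { inner: Arc<Mutex<Inner<T>>> }
struct Receiver<T> { inner: Arc<Mutex<Inner<T>>> }

fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let inner = Arc::new(Mutex::new(Inner { value: None, rx_waker: None, closed: false }));
    (Sender { inner: inner.clone() }, Receiver { inner })
}

impl<T> Sender<T> {
    fn send(self, value: T) -> Result<(), T> {
        let mut inner = self.inner.lock().unwrap();
        if inner.closed { return Err(value); }
        inner.value = Some(value);
        if let Some(w) = inner.rx_waker.take() { w.wake(); } // notify receiver
        Ok(())
    }
}

impl<T> Future for Receiver<T> {
    type Output = Result<T, RecvError>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut inner = self.inner.lock().unwrap();
        if let Some(v) = inner.value.take() {
            Poll::Ready(Ok(v))
        } else if inner.closed {
            Poll::Ready(Err(RecvError))
        } else {
            inner.rx_waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// A minimal block_on: wake() unparks the blocked thread.
struct ThreadWaker(thread::Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) { self.0.unpark(); }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    thread::spawn(move || {
        thread::sleep(std::time::Duration::from_millis(10));
        tx.send(42).unwrap();
    });
    assert_eq!(block_on(rx).unwrap(), 42);
    println!("oneshot ok");
}
```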
MPSC channel
Multiple senders, one receiver. Messages buffer up in a queue.
Key differences from oneshot:
- Queue instead of single value
- Sender is Clone — track how many senders exist
- Bounded vs unbounded — bounded adds backpressure (Lesson 23)
- All senders drop → channel closed
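A minimal unbounded sketch of the first two differences plus sender-count tracking, with a synchronous `try_recv` standing in for the async `recv` so the example runs without an executor (all names here are illustrative):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::Waker;

struct Shared<T> {
    queue: VecDeque<T>,
    rx_waker: Option<Waker>,
    senders: usize, // live Sender handles; 0 means closed
}

pub struct Sender<T> { shared: Arc<Mutex<Shared<T>>> }
pub struct Receiver<T> { shared: Arc<Mutex<Shared<T>>> }

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(Shared { queue: VecDeque::new(), rx_waker: None, senders: 1 }));
    (Sender { shared: shared.clone() }, Receiver { shared })
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        self.shared.lock().unwrap().senders += 1; // track how many senders exist
        Sender { shared: self.shared.clone() }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let mut s = self.shared.lock().unwrap();
        s.senders -= 1;
        if s.senders == 0 {
            // last sender gone: wake the receiver so it can observe closure
            if let Some(w) = s.rx_waker.take() { w.wake(); }
        }
    }
}

impl<T> Sender<T> {
    pub fn send(&self, value: T) {
        let mut s = self.shared.lock().unwrap();
        s.queue.push_back(value);
        if let Some(w) = s.rx_waker.take() { w.wake(); }
    }
}

impl<T> Receiver<T> {
    /// Synchronous stand-in for the async recv: Some(msg), or None if empty.
    pub fn try_recv(&self) -> Option<T> {
        self.shared.lock().unwrap().queue.pop_front()
    }
    pub fn is_closed(&self) -> bool {
        let s = self.shared.lock().unwrap();
        s.senders == 0 && s.queue.is_empty()
    }
}

fn main() {
    let (tx, rx) = channel();
    let tx2 = tx.clone();
    let tx3 = tx.clone();
    for t in [tx, tx2, tx3] {
        for i in 0..10 { t.send(i); }
    } // all three senders dropped at the end of the loop
    let mut got = 0;
    while rx.try_recv().is_some() { got += 1; }
    assert_eq!(got, 30);
    assert!(rx.is_closed());
    println!("mpsc sketch ok");
}
```

The async `recv` is the same logic as the oneshot receiver's `poll`: pop or store the waker and return `Pending`.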
Closed channel detection
When the sender drops without sending, the receiver should be woken with an error:
#![allow(unused)]
fn main() {
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let mut inner = self.inner.lock().unwrap();
inner.closed = true;
if let Some(waker) = inner.rx_waker.take() {
waker.wake(); // wake receiver so it sees the closure
}
}
}
}
Exercises
Exercise 1: Oneshot channel
Implement oneshot::channel() → (Sender<T>, Receiver<T>).
- Sender has `send(value)` that consumes self
- Receiver implements `Future`
- Test: send 42 from one task, receive in another
Exercise 2: Oneshot drop detection
Test that dropping the sender without sending wakes the receiver with an error:
#![allow(unused)]
fn main() {
let (tx, rx) = oneshot::channel::<i32>();
drop(tx);
assert!(rx.await.is_err());
}
Exercise 3: MPSC channel
Implement mpsc::channel() → (Sender<T>, Receiver<T>).
- Sender is Clone, has `send(value)`
- Receiver has `async fn recv() → Option<T>` (None when all senders dropped)
- Test: 3 senders each send 10 messages, receiver collects all 30
Exercise 4: Bounded MPSC
Add capacity to your MPSC: mpsc::channel(cap).
- `send()` returns Pending when the queue is full
- `recv()` wakes one blocked sender when it pops a message
- Test: channel(2), send 3 messages — third blocks until receiver pops one
Lesson 14: Work-Stealing Scheduler
Prerequisites: Lesson 10 (Task Scheduling), Lesson 13 (Channels). You need a working single-threaded executor before going multi-threaded.
Real-life analogy: supermarket checkout lanes
Single-threaded executor = one checkout lane:
┌──────────────────────────────────────────┐
│ Lane 1: [customer] [customer] [customer]│ One cashier.
│ │ Long line.
└──────────────────────────────────────────┘
Multi-threaded (no stealing) = N lanes, unbalanced:
┌──────────────────────────────────────────┐
│ Lane 1: [customer] [customer] [customer]│ Busy!
│ Lane 2: [customer] │ Almost idle.
│ Lane 3: │ Empty!
│ Lane 4: [customer] [customer] │
└──────────────────────────────────────────┘
Some lanes are jammed while others are empty.
Work-stealing = lanes help each other:
┌──────────────────────────────────────────┐
│ Lane 1: [customer] [customer] │
│ Lane 2: [customer] ←── stole from lane 1│
│ Lane 3: [customer] ←── stole from lane 1│
│ Lane 4: [customer] │
└──────────────────────────────────────────┘
Idle cashiers walk to busy lanes and take customers.
Result: balanced load, shorter wait times.
Why work-stealing?
With N worker threads, you need a strategy for distributing tasks:
Strategy 1: Shared queue (simple, bad)
All workers pop from ONE queue.
Problem: lock contention. N threads fighting over one Mutex.
┌────────┐ ┌────────┐ ┌────────┐
│Worker 0│ │Worker 1│ │Worker 2│
└───┬────┘ └───┬────┘ └───┬────┘
│ │ │
└─────┬─────┴─────┬────┘
│ │
┌────▼────────────▼────┐
│ Shared Queue (Mutex)│ ← contention!
└──────────────────────┘
Strategy 2: Local queues + work stealing (complex, fast)
Each worker has its own queue. No contention for local work.
When idle, steal from a random busy worker.
┌────────┐ ┌────────┐ ┌────────┐
│Worker 0│ │Worker 1│ │Worker 2│
│[t1,t2] │ │[t3] │ │[] │ ← empty, will steal
└────────┘ └────────┘ └───┬────┘
│ steal half from Worker 0
▼
┌────────┐ ┌────────┐ ┌────────┐
│Worker 0│ │Worker 1│ │Worker 2│
│[t1] │ │[t3] │ │[t2] │ ← balanced!
└────────┘ └────────┘ └────────┘
Architecture
┌────────────────────────────────────────────────────────┐
│ Work-Stealing Runtime │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Global Queue (for overflow + cross-thread spawn)│ │
│ │ Arc<Mutex<VecDeque<Arc<Task>>>> │ │
│ └──────────┬──────────────┬──────────┬─────────────┘ │
│ │ │ │ │
│ ┌──────────▼───┐ ┌──────▼──────┐ ┌▼────────────┐ │
│ │ Worker 0 │ │ Worker 1 │ │ Worker 2 │ │
│ │ │ │ │ │ │ │
│ │ local queue │ │ local queue │ │ local queue │ │
│ │ [t1, t2] │ │ [t3] │ │ [] │ │
│ │ │ │ │ │ ↑ steal │ │
│ │ thread 0 │ │ thread 1 │ │ thread 2 │ │
│ └──────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Each worker: │
│ 1. Pop from local queue │
│ 2. If empty → check global queue │
│ 3. If empty → steal from random worker │
│ 4. If nothing → park (sleep) │
└────────────────────────────────────────────────────────┘
The worker loop
#![allow(unused)]
fn main() {
fn worker_loop(worker_id: usize, workers: &[Worker], global: &GlobalQueue) {
    let me = &workers[worker_id];
    loop {
        // 1. Try local queue first (no contention)
        if let Some(task) = me.local_queue.pop() {
poll(task);
continue;
}
// 2. Try global queue (shared, needs lock)
if let Some(task) = global.pop() {
poll(task);
continue;
}
// 3. Try stealing from a random peer
let victim = random_worker(workers, worker_id);
if let Some(task) = victim.local_queue.steal() {
poll(task);
continue;
}
// 4. Nothing to do — park
thread::park();
}
}
}
The Chase-Lev deque
The data structure that makes work-stealing efficient:
Owner (the worker thread):
push to back: O(1), no synchronization
pop from back: O(1), no synchronization (LIFO — most recent first)
Stealers (other workers):
steal from front: O(1), uses atomic CAS (compare-and-swap)
┌──────────────────────────────────┐
│ front (stealers) │
│ ↓ │
│ [task_A] [task_B] [task_C] │
│ ↑ │
│ back (owner) │
└──────────────────────────────────┘
Owner pushes/pops from back (fast, no lock).
Stealers steal from front (atomic, minimal contention).
In Rust, use the crossbeam-deque crate:
#![allow(unused)]
fn main() {
use crossbeam_deque::{Worker, Stealer};
let worker = Worker::new_fifo();
let stealer = worker.stealer(); // can be cloned to other threads
worker.push(task); // owner pushes
worker.pop(); // owner pops
stealer.steal(); // other thread steals
stealer.steal_batch(&other_worker); // steal half
}
Spawn: where does the task go?
spawn(future):
Am I on a worker thread?
Yes → push to my local queue (fast path)
No → push to global queue (cross-thread spawn)
Tokio uses thread-local storage to detect if spawn is called from a worker thread. If so, the task goes directly into the local queue (no lock, no contention).
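That detection can be sketched with a `thread_local!` slot that worker threads initialize at startup (the names and the `u32` standing in for a task are assumptions for illustration, not tokio's actual internals):

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::Mutex;

// Worker threads set this at startup; it stays None on every other thread.
thread_local! {
    static LOCAL: RefCell<Option<VecDeque<u32>>> = RefCell::new(None);
}

// Fallback queue for spawns that originate outside the runtime.
static GLOBAL: Mutex<Vec<u32>> = Mutex::new(Vec::new());

fn spawn_task(task: u32) {
    LOCAL.with(|q| match q.borrow_mut().as_mut() {
        Some(local) => local.push_back(task),      // fast path: on a worker thread
        None => GLOBAL.lock().unwrap().push(task), // slow path: cross-thread spawn
    });
}

fn main() {
    spawn_task(1); // main thread has no local queue, so this goes global
    assert_eq!(*GLOBAL.lock().unwrap(), vec![1]);

    // Pretend this thread just became a worker:
    LOCAL.with(|q| *q.borrow_mut() = Some(VecDeque::new()));
    spawn_task(2); // now lands in the local queue, no lock touched
    assert_eq!(GLOBAL.lock().unwrap().len(), 1);
    LOCAL.with(|q| assert_eq!(q.borrow().as_ref().unwrap().front(), Some(&2)));
    println!("spawn routing ok");
}
```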
Parking and unparking
When a worker has nothing to do, it should sleep (not busy-poll):
#![allow(unused)]
fn main() {
// Worker with nothing to do:
thread::park(); // sleep, 0% CPU
// When a new task is spawned or a waker fires:
worker_thread.unpark(); // wake up!
}
The tricky part: deciding WHICH worker to unpark. Options:
- Unpark one random idle worker (tokio’s approach)
- Unpark all idle workers (simple but thundering herd)
Exercises
Exercise 1: Two-worker runtime
Build a work-stealing runtime with 2 worker threads:
- Each worker has a `VecDeque` local queue
- A shared global queue for overflow
- Workers pop local → pop global → steal from peer → park
- Spawn 100 tasks that record `std::thread::current().id()`
- Verify both threads ran tasks
Exercise 2: Steal half
Implement “steal half” — when stealing, take half the victim’s local queue:
#![allow(unused)]
fn main() {
fn steal_batch(victim: &LocalQueue, thief: &mut LocalQueue) {
let count = victim.len() / 2;
for _ in 0..count {
if let Some(task) = victim.steal_front() {
thief.push_back(task);
}
}
}
}
Test: push 50 tasks to Worker 0, none to Worker 1. Run. Assert Worker 1 processed some.
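With plain `VecDeque`s standing in for the lock-free deque, the batch steal from the exercise can be made runnable like this:

```rust
use std::collections::VecDeque;

/// Move half of the victim's tasks (oldest first) into the thief's queue.
fn steal_batch(victim: &mut VecDeque<u32>, thief: &mut VecDeque<u32>) {
    let count = victim.len() / 2;
    for _ in 0..count {
        if let Some(task) = victim.pop_front() { // steal from the front (oldest)
            thief.push_back(task);
        }
    }
}

fn main() {
    let mut worker0: VecDeque<u32> = (0..50).collect();
    let mut worker1: VecDeque<u32> = VecDeque::new();
    steal_batch(&mut worker0, &mut worker1);
    assert_eq!(worker0.len(), 25);
    assert_eq!(worker1.len(), 25);
    assert_eq!(worker1.front(), Some(&0)); // oldest tasks moved first
    println!("balanced: {} / {}", worker0.len(), worker1.len());
}
```

Stealing from the front mirrors the Chase-Lev layout: the owner works from the back, so thieves take the coldest tasks.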
Exercise 3: Benchmark
Compare throughput:
- Single-threaded executor (Lesson 10): spawn 10K tasks, measure time
- Work-stealing with 4 workers: same 10K tasks
Tasks should be trivial (increment atomic counter). Print tasks/second for each.
Exercise 4: crossbeam-deque
Replace your VecDeque local queue with crossbeam_deque::Worker. Compare performance. The crossbeam deque uses lock-free atomics instead of a Mutex — it should be faster under contention.
Add to Cargo.toml: crossbeam-deque = "0.8"
Lesson 15: Select Internals
Prerequisites: Lesson 7 (Combinators), Lesson 13 (Channels). You built a basic `MySelect` in Lesson 7. Now we go deeper into cancellation semantics.
Real-life analogy: race to the airport
You need to get to the airport. You have two options running in parallel:
┌────────────────┐ ┌────────────────┐
│ Option A: Taxi │ │ Option B: Bus │
│ ETA: unknown │ │ ETA: 20 min │
└───────┬────────┘ └───────┬────────┘
│ │
│ ... waiting ... │
│ │
│ ▼
│ Bus arrives!
│ You get on.
│
▼
Cancel taxi! ← this is the DROP
(taxi driver goes home)
Select = race multiple futures, take the winner, cancel the losers.
The “cancel” part is where it gets tricky. What if the taxi was already en route? What if it had already picked you up but you hadn’t noticed? Cancellation has consequences.
How select works
#![allow(unused)]
fn main() {
async fn select<A, B>(fut_a: A, fut_b: B) -> Either<A::Output, B::Output>
where A: Future, B: Future
{
// Internally, each poll():
// 1. Poll fut_a → Ready? return Left(value), DROP fut_b
// 2. Poll fut_b → Ready? return Right(value), DROP fut_a
// 3. Both Pending → Pending
}
}
poll #1:
poll A → Pending
poll B → Pending
→ return Pending
poll #2:
poll A → Pending
poll B → Ready(value)
→ drop A (cancelled!)
→ return Right(value)
The cancellation problem
When a future is dropped mid-execution, any work it did between its last Pending and the drop is lost:
Cancellation-SAFE (no data loss):
recv() → Pending (no message consumed)
→ dropped (nothing lost)
Cancellation-UNSAFE (data loss):
read_exact(buf, 100 bytes)
→ read 50 bytes, Pending (50 bytes consumed from socket)
→ dropped (those 50 bytes are GONE, next read misses them)
┌──────────────────────────────────────────────────────┐
│ Cancellation Safety Rules │
│ │
│ SAFE to use in select: │
│ ✓ channel.recv() — no data consumed until Ready│
│ ✓ sleep() — no side effects │
│ ✓ listener.accept() — no connection consumed │
│ │
│ UNSAFE in select: │
│ ✗ read_exact() — partial reads lost │
│ ✗ read_line() — partial line lost │
│ ✗ collect from stream — partial results lost │
│ │
│ Rule: if dropping after Pending loses data │
│ that can't be recovered, it's UNSAFE. │
└──────────────────────────────────────────────────────┘
This is covered in depth in Lesson 24 (Cancellation Safety).
Polling bias
Naive select always polls A first:
#![allow(unused)]
fn main() {
// BIASED: A always gets polled first
if let Ready(v) = fut_a.poll(cx) { return Left(v); }
if let Ready(v) = fut_b.poll(cx) { return Right(v); }
}
If A is always ready, B is starved — never gets a chance. Solutions:
- Random order: pick which to poll first randomly each time
- Round-robin: alternate A-first and B-first
- `tokio::select!` has a `biased;` option to explicitly choose
Fuse: poll-after-completion guard
The Future contract says: don’t poll after Ready. But in a select loop, you might accidentally poll a completed future:
#![allow(unused)]
fn main() {
loop {
select! {
msg = channel.recv() => handle(msg),
_ = timer.tick() => println!("tick"),
}
// After timer fires, the next iteration polls timer again
// But it already returned Ready — UB!
}
}
Fuse wraps a future to return Pending forever after completing:
#![allow(unused)]
fn main() {
struct Fuse<F> {
future: Option<F>, // None after completion
}
impl<F: Future> Future for Fuse<F> {
fn poll(...) -> Poll<...> {
match &mut self.future {
Some(f) => match f.poll(cx) {
Ready(v) => { self.future = None; Ready(v) }
Pending => Pending
}
None => Pending, // already completed, safe to poll
}
}
}
}
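A compilable version of the same idea, requiring `F: Unpin` for simplicity, that can be poked at by hand with a no-op waker built from `std::task::Wake` (the `Noop` helper is an assumption for the demo):

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct Fuse<F> {
    future: Option<F>, // None after completion
}

impl<F> Fuse<F> {
    fn new(f: F) -> Self { Fuse { future: Some(f) } }
}

impl<F: Future + Unpin> Future for Fuse<F> {
    type Output = F::Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        let this = self.get_mut();
        match this.future.as_mut() {
            Some(f) => match Pin::new(f).poll(cx) {
                Poll::Ready(v) => {
                    this.future = None; // never poll the inner future again
                    Poll::Ready(v)
                }
                Poll::Pending => Poll::Pending,
            },
            None => Poll::Pending, // already completed: stay Pending instead of UB
        }
    }
}

struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);
    let mut fused = Fuse::new(future::ready(42));
    assert_eq!(Pin::new(&mut fused).poll(&mut cx), Poll::Ready(42));
    // Polling again is now harmless:
    assert_eq!(Pin::new(&mut fused).poll(&mut cx), Poll::Pending);
    println!("fuse ok");
}
```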
Exercises
Exercise 1: Binary select
Implement select(fut_a, fut_b) -> Either<A, B>:
- Poll both futures
- Return whichever is Ready first
- Drop the loser
Test: select(sleep(100ms), sleep(500ms)) should return Left after ~100ms.
Exercise 2: Cancellation demo
Create a CountingFuture that increments an Arc<AtomicU32> each time it’s polled. Select it against an immediately-ready future. Assert the counter shows it was polled exactly once before being dropped.
Add a Drop impl that prints “cancelled!” to make the drop visible.
Exercise 3: Select loop with channel
#![allow(unused)]
fn main() {
let (tx, mut rx) = mpsc::channel();
let mut interval = Interval::new(Duration::from_millis(200));
loop {
select! {
msg = rx.recv() => match msg {
Some(m) => println!("got: {m}"),
None => break, // channel closed
},
_ = interval.tick() => println!("tick"),
}
}
}
Implement this pattern. Spawn a task that sends 5 messages with delays, then drops the sender. The select loop should print ticks between messages and exit when the channel closes.
Exercise 4: Fuse
Implement the Fuse wrapper. Test: fuse a future that returns Ready(42). Poll it three times. First poll returns Ready(42), second and third return Pending (not UB).
Project 2: Multi-threaded Chat Server
Prerequisites: Lessons 9-15 (reactor, scheduling, async I/O, timers, channels, work-stealing, select). This project combines them all.
Overview
Build a fully working chat server on top of the async runtime you built in Lessons 9-15. No tokio, no async-std — just your reactor, executor, channels, and timers. This project proves your runtime can handle real concurrent I/O.
Architecture
┌─────────────────────────────────────────────────────────┐
│ Chat Server (your runtime) │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Accept Loop (one task) │ │
│ │ TcpListener.accept() → spawn client task │ │
│ └──────┬─────────────────────────────────────────────┘ │
│ │ spawn │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Client A │ │ Client B │ │ Client C │ │
│ │ │ │ │ │ │ │
│ │ read loop: │ │ read loop: │ │ read loop: │ │
│ │ select! { │ │ select! { │ │ select! { │ │
│ │ stream.read│ │ stream.read│ │ stream.read│ │
│ │ inbox.recv │ │ inbox.recv │ │ inbox.recv │ │
│ │ } │ │ } │ │ } │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └────────┬────────┴────────┬─────────┘ │
│ │ │ │
│ ┌──────▼─────────────────▼──────┐ │
│ │ Broker Task │ │
│ │ (owns the client map) │ │
│ │ │ │
│ │ events channel: │ │
│ │ Join(id, nick, inbox_tx) │ │
│ │ Leave(id) │ │
│ │ Message(id, text) │ │
│ │ │ │
│ │ On Message: fan out to all │ │
│ │ client inboxes except sender │ │
│ └───────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘
Why a broker?
Instead of sharing a HashMap<ClientId, Sender> behind a Mutex (which every client would lock), we use a broker task that owns the state exclusively. Clients communicate with the broker through a channel.
Without broker: With broker:
Client A locks HashMap Client A sends Event to broker
Client B waits for lock Client B sends Event to broker
Client C waits for lock Broker processes events sequentially
→ lock contention → no contention, no Mutex
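The broker itself is a plain event loop over a channel; here std's synchronous `mpsc` stands in for your async channel so the sketch runs anywhere (the `Event` enum mirrors the diagram above; names are illustrative):

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

enum Event {
    Join(u32, mpsc::Sender<String>),
    Leave(u32),
    Message(u32, String),
}

/// The broker exclusively owns the client map — no Mutex anywhere.
fn broker(events: mpsc::Receiver<Event>) {
    let mut clients: HashMap<u32, mpsc::Sender<String>> = HashMap::new();
    for ev in events { // ends when every event sender is dropped
        match ev {
            Event::Join(id, inbox) => { clients.insert(id, inbox); }
            Event::Leave(id) => { clients.remove(&id); }
            Event::Message(from, text) => {
                for (&id, inbox) in &clients {
                    if id != from {
                        let _ = inbox.send(format!("{from}: {text}")); // fan out
                    }
                }
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || broker(rx));

    let (a_tx, a_rx) = mpsc::channel();
    let (b_tx, b_rx) = mpsc::channel();
    tx.send(Event::Join(1, a_tx)).unwrap();
    tx.send(Event::Join(2, b_tx)).unwrap();
    tx.send(Event::Message(1, "hi".into())).unwrap();
    tx.send(Event::Leave(1)).unwrap();
    drop(tx); // closing the event channel stops the broker loop
    handle.join().unwrap();

    assert_eq!(b_rx.recv().unwrap(), "1: hi"); // client 2 got the message
    assert!(a_rx.try_recv().is_err());         // sender didn't echo to itself
    println!("broker ok");
}
```

In the real project the loop body is identical; only the channel and spawn come from your runtime.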
What you’ll build
A TCP chat server where:
- Multiple clients connect via `telnet` or `nc`
- Messages from any client are broadcast to all other connected clients
- Each client has a nickname (default: `user-N`), changeable with `/nick name`
- The server detects disconnects (EOF or broken pipe) and announces departures
- The server runs on your work-stealing runtime across multiple threads
Feature list
- Accept loop – `AsyncTcpListener` accepts connections and spawns a task per client
- Broadcast – an mpsc channel per client; incoming messages fan out to every other client’s channel
- Commands – `/nick <name>` changes display name, `/who` lists connected users, `/quit` disconnects
- Disconnect detection – read returning 0 bytes or an error triggers cleanup and a “user left” broadcast
- Graceful shutdown – Ctrl-C sets a flag; the accept loop exits and all client tasks drain
Key concepts
- Shared state – a
HashMap<ClientId, ClientHandle>behind an async-aware mutex or accessed from a dedicated broker task - Broker pattern – one task owns the client map and receives events (join / leave / message) over a channel, avoiding shared mutable state
- Backpressure – bounded per-client channels prevent a slow reader from exhausting memory
- Cancellation – when a client disconnects, its task is dropped; select ensures no leaked futures
- Testing – spawn the server in a background task, connect with multiple `AsyncTcpStream` clients from test tasks, and assert message delivery
Exercises
- Basic chat – implement the accept loop, per-client read loop, and broadcast. Connect two `nc` sessions and verify messages flow both ways.
- Commands and nicks – add `/nick`, `/who`, and `/quit`. Verify that broadcast messages show the updated nickname after a `/nick` change.
- Load test – spawn 100 client tasks that each send 10 messages. Assert every client receives all 990 messages from others (99 other clients x 10 messages each). Measure total time on your work-stealing runtime.
Lesson 16: Tokio Architecture
Prerequisites: Courses 1-2. You’ve built your own runtime — now see how the production one is designed.
Real-life analogy: a factory
┌──────────────────────────────────────────────────────────┐
│ Factory (Tokio Runtime) │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Factory Floor (Scheduler) │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Worker 1 │ │ Worker 2 │ │ Worker 3 │ │ │
│ │ │ (thread) │ │ (thread) │ │ (thread) │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │
│ │ Work-stealing: idle workers take from busy ones │ │
│ └────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌────────────────────────▼───────────────────────────┐ │
│ │ Utility Room (Drivers) │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ I/O │ │ Timer │ │ Signal │ │ │
│ │ │ Driver │ │ Driver │ │ Driver │ │ │
│ │ │ (mio) │ │ (wheel) │ │ (unix) │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │
│ │ Handles external events → wakes tasks │ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ Front Office: Runtime::block_on(), spawn(), Handle │
└──────────────────────────────────────────────────────────┘
The factory has:
- Workers (threads) on the factory floor processing tasks
- A utility room (drivers) that monitors external events (power, deliveries, alarms)
- A front office (API) where you submit work orders
Tokio’s internal structure
tokio::runtime::Runtime
│
├── Scheduler
│ ├── current_thread::Scheduler (single-threaded)
│ └── multi_thread::Scheduler (work-stealing)
│ ├── Worker 0 (thread + local queue)
│ ├── Worker 1
│ └── Worker N
│
├── Driver (layered)
│ ├── Signal Driver (outermost)
│ │ └── Time Driver
│ │ └── I/O Driver (innermost, wraps mio::Poll)
│ │
│ │ Each park() call propagates down:
│ │ signal.park() → time.park() → io.park() → mio.poll()
│ │
│ │ On return, each layer checks its events:
│ │ mio events → wake I/O tasks
│ │ expired timers → wake timer tasks
│ │ signals → wake signal listeners
│
└── Handle (cloneable, Send + Sync)
├── spawn() — submit a task from anywhere
├── block_on() — enter the runtime on current thread
└── spawn_blocking() — run sync code on a dedicated thread pool
current_thread vs multi_thread
current_thread: multi_thread:
┌──────────────────┐ ┌──────────────────┐
│ One thread │ │ N threads │
│ │ │ │
│ ┌────────────┐ │ │ ┌────┐ ┌────┐ │
│ │ Scheduler │ │ │ │ W0 │ │ W1 │ │
│ │ + Driver │ │ │ └────┘ └────┘ │
│ │ (same │ │ │ ┌────┐ ┌────┐ │
│ │ thread) │ │ │ │ W2 │ │ W3 │ │
│ └────────────┘ │ │ └────┘ └────┘ │
│ │ │ │
│ Pros: │ │ Pros: │
│ - No Send req │ │ - Uses all CPUs │
│ - No sync cost │ │ - Work stealing │
│ - Deterministic │ │ - Production │
│ │ │ │
│ Cons: │ │ Cons: │
│ - One CPU core │ │ - Send required │
│  - !Send tasks   │  │  - More complex  │
│    need LocalSet │  │  - Non-determ.   │
│    + spawn_local │  │                  │
└──────────────────┘ └──────────────────┘
When to use each
- `current_thread`: tests, WASM, simple CLI tools, apps with `!Send` types
- `multi_thread`: web servers, database proxies, anything with high concurrency
The Runtime Builder
#![allow(unused)]
fn main() {
// Multi-threaded (default for #[tokio::main])
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4) // default: num_cpus
.max_blocking_threads(512) // for spawn_blocking
.enable_io() // I/O driver (mio)
.enable_time() // timer driver
.thread_name("my-worker")
.on_thread_start(|| println!("worker started"))
.build()?;
// Single-threaded
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
}
What enable_io() and enable_time() do
enable_io(): creates mio::Poll, starts I/O driver
without it: TcpStream, UdpSocket, etc. panic
enable_time(): creates timer wheel, starts time driver
without it: tokio::time::sleep panics
enable_all(): enables both I/O and time
The Driver stack
The driver is layered — each layer wraps the one below:
park() call flow:
SignalDriver::park()
│
├── check for pending signals
│
└── TimeDriver::park()
│
├── compute timeout from next timer deadline
│
└── IoDriver::park(timeout)
│
└── mio::Poll::poll(timeout)
│
└── OS: kqueue / epoll (blocks)
return flow:
mio returns events
│
└── IoDriver: wake I/O tasks
│
└── TimeDriver: fire expired timers, wake timer tasks
│
└── SignalDriver: dispatch signals
This is why enable_io() and enable_time() matter — without them, those driver layers don’t exist.
Handle: spawning from anywhere
#![allow(unused)]
fn main() {
let rt = Runtime::new()?;
let handle = rt.handle().clone(); // Handle is Send + Sync + Clone
// From another thread:
handle.spawn(async { /* runs on the runtime */ });
// From inside async code:
let handle = tokio::runtime::Handle::current();
handle.spawn(async { /* also works */ });
}
Tracing through tokio source
A fun exercise: trace tokio::spawn(my_future) through the source code.
tokio::spawn(future)
→ context::spawn(future) // get the current runtime context
→ scheduler.spawn(task) // submit to the scheduler
→ worker.schedule(task) // push to local queue (or global)
→ if worker idle: worker.unpark() // wake a sleeping worker
Exercises
Exercise 1: current_thread echo server
Build a TCP echo server on current_thread. Use spawn_local for tasks. Verify it works but only uses one CPU core.
Exercise 2: multi_thread thread distribution
Spawn 100 tasks on a multi_thread runtime with 4 workers. Each task records std::thread::current().id(). Print how many tasks ran on each thread — should be roughly balanced.
Exercise 3: Missing driver
Create a runtime with only enable_io() (no time). Try tokio::time::sleep(1s).await. What error do you get?
Then create with only enable_time() (no I/O). Try TcpListener::bind. What happens?
Exercise 4: Handle from another thread
#![allow(unused)]
fn main() {
let rt = Runtime::new()?;
let handle = rt.handle().clone();
std::thread::spawn(move || {
handle.spawn(async {
println!("running on tokio from a std thread!");
});
});
}
Exercise 5: Throughput comparison
Build a simple echo server. Benchmark with nc or a load tester:
- `current_thread` with 1000 concurrent connections
- `multi_thread` (4 workers) with 1000 concurrent connections
Measure requests/second. How much does multi_thread help?
Lesson 17: Tokio’s I/O Driver
Prerequisites: Lesson 9 (Reactor), Lesson 11 (AsyncRead/AsyncWrite), Lesson 16 (Tokio Architecture).
Real-life analogy: the switchboard operator
Old telephone system:
┌──────────────────────────────────────────────────┐
│ Switchboard (I/O Driver) │
│ │
│ Plug board: │
│ Jack 1 → Room 101 (Alice's phone) │
│ Jack 2 → Room 205 (Bob's phone) │
│ Jack 3 → (empty) │
│ │
│ Operator loop: │
│ 1. Watch all jacks for incoming signals │
│ 2. Jack 2 lights up → "Bob has a call!" │
│ 3. Ring Bob's room (wake his task) │
│ │
│ Registration: │
│ New guest checks in → operator plugs a jack │
│ Guest checks out → operator unplugs │
└──────────────────────────────────────────────────┘
Tokio’s I/O driver is the switchboard:
- Jacks = file descriptors registered with mio
- Plugging in = `Registration::new()` → `mio::Poll::register()`
- Light up = readiness event from kqueue/epoll
- Ring the room = `waker.wake()`
How tokio wraps mio
Your Lesson 9 reactor and tokio’s I/O driver do the same thing — but tokio adds a layer of abstraction:
Your reactor (Lesson 9): Tokio's I/O driver:
mio::Poll mio::Poll
HashMap<Token, Waker> Slab<ScheduledIo>
register/deregister manually Registration handles lifecycle
you call wait() driver calls park()
The Registration type
In tokio, every I/O resource (TcpStream, UdpSocket, etc.) holds a Registration:
#![allow(unused)]
fn main() {
// Simplified from tokio source
struct Registration {
handle: Handle, // reference to the I/O driver
token: usize, // slab index for event dispatch
}
}
When you create a tokio::net::TcpStream, it calls Registration::new():
- Registers the fd with `mio::Poll`
- Allocates a slot in the driver’s slab
- Returns a `Registration` that derefs to wake/interest methods
The readiness flow
Application: stream.read(&mut buf).await
Tokio TcpStream::read():
│
├── poll_read_ready() // check if driver says readable
│ │
│ ├── already ready? → try read()
│ │
│ └── not ready? → register waker with driver
│ return Pending
│
├── (later) I/O driver: mio::Poll returns event
│ │
│ └── driver looks up ScheduledIo by token
│ calls waker.wake()
│
└── re-polled: poll_read_ready() → ready!
try read() → Ok(n) → Ready(n)
Interest and Ready
#![allow(unused)]
fn main() {
// Interest: what events you want
Interest::READABLE // want to know when data is available
Interest::WRITABLE // want to know when write buffer has space
Interest::READABLE | Interest::WRITABLE // both
// Ready: what actually happened
if ready.is_readable() { /* data available */ }
if ready.is_writable() { /* can write */ }
if ready.is_read_closed() { /* peer closed their write half */ }
if ready.is_write_closed() { /* peer closed their read half */ }
}
Spurious wakeups
The driver might wake you when there’s nothing to do:
Driver says: "fd 5 is readable!"
You call read(buf): → WouldBlock (nothing actually available)
This happens because:
- Edge-triggered events can be delivered before data fully arrives
- Multiple events can coalesce
- The OS might report readiness optimistically
Your I/O code MUST handle WouldBlock by returning Pending and re-registering — never assume the operation will succeed just because you were woken.
Exercises
Exercise 1: mio echo server
Build a raw mio echo server (no tokio). Register a listener, accept connections, register each for READABLE, echo data. This is what tokio does internally.
Exercise 2: Tokio echo server
Convert the mio echo server to tokio. Observe how much code disappears — tokio handles registration, wakers, and the event loop for you.
Exercise 3: Readiness exploration
#![allow(unused)]
fn main() {
use tokio::net::TcpStream;
use tokio::io::Interest;
let stream = TcpStream::connect("127.0.0.1:8080").await?;
let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
println!("readable: {}, writable: {}", ready.is_readable(), ready.is_writable());
}
Connect to a server, check readiness. Write data, check again. Close the peer, check for is_read_closed().
Exercise 4: Spurious wakeup handling
Write a stream wrapper that logs every WouldBlock. Connect to a server, start reading. How many WouldBlocks do you see? This shows why the retry loop is essential.
Lesson 18: tokio::sync Internals — Mutex, RwLock, Semaphore, Notify
Prerequisites: Lessons 13-17. You know how channels work — now see the lower-level primitives.
Real-life analogy: bathroom keys at a gas station
┌─────────────────────────────────────────────────────────────────┐
│ Gas Station Bathroom Access │
│ │
│ Mutex (one key): │
│ 🔑 One key on the counter. Take it, use bathroom, return. │
│ If key is gone → WAIT (don't block the road, sit in car). │
│ │
│ Semaphore (multiple keys): │
│ 🔑🔑🔑 Three keys → three stalls. Fourth person waits. │
│ Each key = one permit. │
│ │
│ RwLock (museum analogy): │
│ Many visitors can READ the exhibit simultaneously. │
│ But the curator needs EXCLUSIVE access to rearrange. │
│ Readers share, writers exclude everyone. │
│ │
│ Notify (doorbell): │
│ Ring the bell → someone waiting inside wakes up. │
│ No data transferred, just "hey, something happened." │
└─────────────────────────────────────────────────────────────────┘
Why not std::sync in async code?
std::sync::Mutex::lock() tokio::sync::Mutex::lock()
┌──────────────────────┐ ┌──────────────────────┐
│ Thread blocks (OS) │ │ Task yields (Pending) │
│ Worker thread frozen │ │ Worker thread FREE │
│ Other tasks starve │ │ Runs other tasks │
│ Deadlock risk! │ │ Wakes when lock free │
└──────────────────────┘ └──────────────────────┘
Rule of thumb:
- Lock NOT held across .await → std::sync::Mutex is fine (faster)
- Lock held across .await → MUST use tokio::sync::Mutex
Mutex vs RwLock
| Aspect | Mutex<T> | RwLock<T> |
|---|---|---|
| Readers | One at a time | Many concurrent |
| Writers | Exclusive | Exclusive |
| Overhead | Lower | Higher (tracking readers) |
| Use when | Writes are frequent | Reads dominate |
#![allow(unused)]
fn main() {
use tokio::sync::RwLock;
let lock = RwLock::new(HashMap::new());
// Many tasks can read concurrently
let guard = lock.read().await;
// Only one task can write
let mut guard = lock.write().await;
guard.insert("key", "value");
}
Semaphore — the universal primitive
Semaphore(3): ┌───┬───┬───┐
Available permits │ ● │ ● │ ● │
└───┴───┴───┘
Task A acquires 1: ┌───┬───┬───┐
│ │ ● │ ● │ (2 left)
└───┴───┴───┘
Task B acquires 2: ┌───┬───┬───┐
│ │ │ │ (0 left)
└───┴───┴───┘
Task C acquires 1: WAITS... (no permits)
Task A drops: Task C wakes up, gets permit
Semaphore is the building block for:
- Rate limiting — permits = max concurrent operations
- Connection pooling — permits = max connections
- Bounded channels — tokio uses it internally
#![allow(unused)]
fn main() {
let sem = Arc::new(Semaphore::new(10));
let permit = sem.acquire().await?; // yields (Pending) if all 10 permits are held
do_work().await;
drop(permit); // returns permit, wakes a waiter
}
Notify — async signaling
Notify is the simplest primitive: no data, just “wake up.”
Notifier Waiter
│ │
│ ├── notified().await (Pending)
│ │ ...sleeping...
├── notify_one() ──────────┤
│ ├── wakes up!
Building a simple async mutex with Notify
#![allow(unused)]
fn main() {
use tokio::sync::Notify;
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};
struct SimpleMutex<T> {
locked: AtomicBool,
notify: Notify,
data: UnsafeCell<T>,
}
// The pattern: spin on atomic + sleep on Notify
// Real tokio::sync::Mutex uses a wait queue, not spinning
}
OwnedPermit and OwnedMutexGuard
When you need to move a guard into a spawned task:
#![allow(unused)]
fn main() {
let mutex = Arc::new(tokio::sync::Mutex::new(0));
let owned_guard = mutex.clone().lock_owned().await; // 'static lifetime
tokio::spawn(async move {
// owned_guard is Send + 'static — works in spawned tasks
drop(owned_guard);
});
}
Exercises
Exercise 1: Shared counter with tokio::sync::Mutex
Spawn 100 tasks that each increment a tokio::sync::Mutex<u64> counter. Verify the final count is 100.
Exercise 2: Rate limiter with Semaphore
Create a Semaphore::new(5). Spawn 20 tasks that each acquire a permit, do tokio::time::sleep(100ms), then release. Verify at most 5 run concurrently at any time.
Exercise 3: Producer-consumer with Notify
Build a queue using std::sync::Mutex<VecDeque<T>> + Notify. The producer pushes items and calls notify_one(). The consumer loops on notified().await + try-pop.
Exercise 4: Build a simple async Mutex using Notify
Implement a SimpleMutex<T> that uses AtomicBool for state and Notify for waking. It won’t be production-quality but demonstrates the concept.
Lesson 19: tokio::net — TcpListener, TcpStream, and the Reactor
Prerequisites: Lessons 8-9 (async I/O, reactor), Lesson 17 (I/O driver).
Real-life analogy: phone vs walkie-talkie
TCP = Phone call UDP = Walkie-talkie
┌─────────────────────┐ ┌─────────────────────┐
│ 1. Dial (connect) │ │ 1. Press button │
│ 2. Ring (SYN/ACK) │ │ 2. Talk (send_to) │
│ 3. "Hello?" (est.) │ │ 3. Release button │
│ 4. Conversation │ │ │
│ 5. "Bye" (FIN) │ │ No connection. │
│ │ │ No guarantee. │
│ Reliable, ordered, │ │ Fast, unordered, │
│ bidirectional stream │ │ fire-and-forget │
└─────────────────────┘ └─────────────────────┘
How tokio::net wraps mio
Your code Tokio OS
─────────────────────────────────────────────────────────────────
TcpStream::connect() → mio::net::TcpStream::connect() → connect(2)
stream.read(&buf) → poll_read() → read(2)
│ │
│ if WouldBlock: │
│ ┌─────────────────┐ │
│ │ Register with │ │
│ │ I/O driver │ │
│ │ (mio::Registry) │ │
│ │ Return Pending │ │
│ └─────────────────┘ │
│ │
│ Later, epoll/kqueue │
│ says "fd ready" ──────┤
│ → wake task │
│ → poll_read() again │
│ → data available! │
TcpListener accept loop
The fundamental server pattern:
#![allow(unused)]
fn main() {
use tokio::net::TcpListener;
let listener = TcpListener::bind("0.0.0.0:8080").await?;
loop {
let (stream, addr) = listener.accept().await?;
tokio::spawn(async move {
handle_connection(stream, addr).await;
});
}
}
Internally, accept() calls poll_accept() which:
- Tries `mio::TcpListener::accept()` (non-blocking)
- On `WouldBlock` → registers with reactor, returns `Pending`
- Reactor wakes the task when a new connection arrives
- `accept()` retries → succeeds this time
Splitting a TcpStream
TcpStream
┌─────────┐
│ read │
│ write │
│ (one │
│ fd) │
└────┬────┘
│
┌─────────────┴─────────────┐
│ split() (borrowed) │ into_split() (owned)
▼ ▼
┌──────────────┐ ┌──────────────┐
│ ReadHalf<'_>│ │OwnedReadHalf │ ← Send + 'static
│ WriteHalf<'_>│ │OwnedWriteHalf│ ← Send + 'static
└──────────────┘ └──────────────┘
Same task only Different tasks OK
- `split()` — borrows the stream, returns `ReadHalf<'_>` + `WriteHalf<'_>`. The halves are tied to the borrow (not `'static`), so they cannot be moved into spawned tasks; use within one task.
- `into_split()` — consumes the stream, returns owned halves that are `Send + 'static`. Use when reader and writer live in different tasks.
#![allow(unused)]
fn main() {
// Two tasks: one reads, one writes
let (reader, writer) = stream.into_split();
tokio::spawn(read_loop(reader));
tokio::spawn(write_loop(writer));
}
UDP sockets
#![allow(unused)]
fn main() {
let socket = tokio::net::UdpSocket::bind("0.0.0.0:3000").await?;
// No connection — just send/receive datagrams
socket.send_to(b"ping", "127.0.0.1:4000").await?;
let mut buf = [0u8; 1024];
let (len, addr) = socket.recv_from(&mut buf).await?;
}
Exercises
Exercise 1: TCP echo with split
Write a TCP echo server that uses into_split() to put reading and writing in separate tasks. The reader reads lines and sends them through a tokio::sync::mpsc channel to the writer.
Exercise 2: UDP ping-pong
Create two UDP sockets. Socket A sends “ping” to Socket B. Socket B replies “pong”. Print the round-trip time.
Exercise 3: Connection counter
Build a TCP server that tracks total connections with an Arc<AtomicUsize>. Each new connection prints “Connection #N from {addr}”. Echo data back, then increment the counter on disconnect.
Exercise 4: Multi-client chat (simple)
TCP server where each client’s message is broadcast to all others. Use into_split() + a shared Vec<OwnedWriteHalf> behind a tokio::sync::Mutex.
Lesson 20: Task-Local Storage — tokio::task::LocalKey
Prerequisites: Lesson 4 (tasks), Lesson 14 (work-stealing).
Real-life analogy: name tags at a conference
┌───────────────────────────────────────────────────────────────┐
│ Conference Venue │
│ │
│ Thread-locals = room assignments: │
│ Room A has whiteboard "Project X" │
│ Room B has whiteboard "Project Y" │
│ If attendee MOVES rooms → sees wrong whiteboard! │
│ │
│ Task-locals = name tags on each person: │
│ Alice wears "Request #42" │
│ Bob wears "Request #99" │
│ No matter which room they walk into, │
│ their name tag follows them. │
│ │
│ In async: tasks migrate between OS threads (rooms). │
│ Thread-locals follow the room. Task-locals follow the task. │
└───────────────────────────────────────────────────────────────┘
The problem with thread-locals in async
Thread 1 Thread 2
┌────────────┐ ┌────────────┐
│ TLS: "abc" │ │ TLS: "xyz" │
│ │ │ │
│ Task A │ │ │
│ reads TLS │ │ │
│ → "abc" │ │ │
│ .await │───────►│ Task A │ (work-stealing moved it!)
│ │ │ reads TLS │
│ │ │ → "xyz" │ WRONG! Expected "abc"
└────────────┘ └────────────┘
In a multi-thread tokio runtime, a task can resume on any worker thread after .await. Thread-local values belong to the thread, not the task.
task_local! to the rescue
#![allow(unused)]
fn main() {
tokio::task_local! {
static REQUEST_ID: String;
}
async fn handle_request(id: String) {
REQUEST_ID.scope(id, async {
// Value is set for the duration of this future
do_work().await; // survives .await
do_more_work().await; // still correct
}).await;
}
async fn do_work() {
REQUEST_ID.with(|id| {
println!("Processing request: {id}");
});
}
}
Scoping rules
REQUEST_ID.scope("req-42", async {
│
│ REQUEST_ID.with(|id| ...) → Ok("req-42")
│
│ REQUEST_ID.scope("req-99", async { // nested: shadows outer
│ │
│ │ REQUEST_ID.with(|id| ...) → Ok("req-99")
│ │
│ }).await;
│
│ REQUEST_ID.with(|id| ...) → Ok("req-42") (restored)
│
}).await;
// Outside any scope:
REQUEST_ID.with(|id| ...) → PANIC!
REQUEST_ID.try_with(|id| ...) → Err(AccessError)
Key limitations
| Limitation | Why |
|---|---|
| No `set()` method | Values are immutable within a scope |
| Not inherited by child tasks | `tokio::spawn` creates a fresh context |
| Must use `.scope()` | Cannot set from outside an async context |
| One value per scope | Nesting shadows, does not merge |
Child tasks do NOT inherit
#![allow(unused)]
fn main() {
REQUEST_ID.scope("req-42".into(), async {
tokio::spawn(async {
// PANIC! REQUEST_ID is not set here.
REQUEST_ID.with(|id| println!("{id}"));
}).await;
}).await;
}
You must explicitly pass values to child tasks via .scope() or function arguments.
Common patterns
Request ID propagation
#![allow(unused)]
fn main() {
task_local! { static REQ_ID: u64; }
async fn middleware(req_id: u64, handler: impl Future<Output = ()>) {
REQ_ID.scope(req_id, handler).await;
}
async fn log_something() {
REQ_ID.with(|id| println!("[req={id}] doing something"));
}
}
Exercises
Exercise 1: Task-local survives .await
Define a REQUEST_ID task-local. Set it with .scope(), call an async function that reads it after an .await point. Verify the value is correct.
Exercise 2: Isolation between tasks
Spawn 10 tasks, each with a unique task-local value. Each task prints its value after a yield_now(). Verify no task sees another’s value.
Exercise 3: Child task does NOT inherit
Demonstrate that tokio::spawn inside a .scope() does NOT have access to the parent’s task-local. Use try_with to show it returns Err.
Lesson 21: Graceful Shutdown — CancellationToken, Signal Handling, Drain Pattern
Prerequisites: Lessons 15 (select!), 18 (tokio::sync).
Real-life analogy: closing a restaurant
┌────────────────────────────────────────────────────────────────┐
│ Closing Time Protocol │
│ │
│ Phase 1: STOP ACCEPTING │
│ Manager locks the front door. │
│ No new customers allowed. │
│ → Stop calling listener.accept() │
│ │
│ Phase 2: SIGNAL WORKERS │
│ Manager tells all waiters: "We're closing." │
│ → CancellationToken::cancel() │
│ │
│ Phase 3: DRAIN │
│ Let current diners finish their meals. │
│ Waiters complete current tables, don't start new ones. │
│ → Wait for in-flight tasks to complete │
│ │
│ Phase 4: HARD DEADLINE │
│ "Kitchen closes in 5 minutes, eat or leave." │
│ → tokio::time::timeout on the drain │
│ │
│ Phase 5: CLEANUP │
│ Turn off lights, lock up, save state. │
│ → Flush logs, close DB connections │
└────────────────────────────────────────────────────────────────┘
Architecture
┌──────────────┐
Ctrl+C ──────────► Signal │
SIGTERM ─────────► Handler │
└──────┬───────┘
│ cancel()
┌──────▼───────┐
│ Cancellation │
│ Token (root) │
└──────┬───────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
child_token child_token child_token
│ │ │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│Task A │ │Task B │ │Task C │
│select!│ │select!│ │select!│
│cancel │ │cancel │ │cancel │
│or work│ │or work│ │or work│
└───────┘ └───────┘ └───────┘
Signal handling
#![allow(unused)]
fn main() {
use tokio::signal;
// Simple: wait for Ctrl+C
signal::ctrl_c().await?;
// In a server: use select! to race shutdown against work
tokio::select! {
_ = signal::ctrl_c() => {
println!("shutdown signal received");
}
_ = server.run() => {
println!("server exited on its own");
}
}
}
CancellationToken
From tokio_util::sync (but we can build the pattern with tokio::sync::Notify):
#![allow(unused)]
fn main() {
// Using Notify as a poor-man's CancellationToken
let shutdown = Arc::new(Notify::new());
// In signal handler:
shutdown.notify_waiters(); // wake ALL waiters
// In each worker task:
tokio::select! {
_ = shutdown.notified() => {
println!("shutting down");
return;
}
result = do_work() => {
// process result
}
}
}
The drain pattern
#![allow(unused)]
fn main() {
let shutdown = Arc::new(Notify::new());
let in_flight = Arc::new(AtomicUsize::new(0));
let all_done = Arc::new(Notify::new());
// Worker tasks:
in_flight.fetch_add(1, Ordering::SeqCst);
// ... do work ...
if in_flight.fetch_sub(1, Ordering::SeqCst) == 1 {
all_done.notify_one(); // last task done
}
// Shutdown sequence:
shutdown.notify_waiters(); // Phase 2: signal
tokio::time::timeout( // Phase 4: hard deadline
Duration::from_secs(30),
all_done.notified() // Phase 3: wait for drain
).await;
}
DropGuard pattern
Ensure cancellation even if the shutdown logic panics:
#![allow(unused)]
fn main() {
// Wrapping in a struct whose Drop triggers cleanup
struct ShutdownGuard {
notify: Arc<Notify>,
}
impl Drop for ShutdownGuard {
fn drop(&mut self) {
self.notify.notify_waiters();
}
}
}
Exercises
Exercise 1: Ctrl+C echo server
Build a TCP echo server that stops accepting on Ctrl+C, finishes active connections, then exits.
Exercise 2: Notify-based shutdown hierarchy
Create a parent Notify and 5 worker tasks. Each worker does a loop of work with select! checking for shutdown. Cancel all workers, verify they all exit.
Exercise 3: Drain with in-flight counter
Track in-flight requests with AtomicUsize. After signaling shutdown, wait until the counter reaches zero (use Notify). Add a 5-second hard timeout.
Exercise 4: Graceful shutdown with state persistence
On shutdown, serialize current application state (e.g., a counter value) to a file. On restart, load it back. This simulates real-world graceful shutdown where you save progress.
Lesson 22: Tracing & Debugging Async Code
Prerequisites: Lesson 16 (Tokio architecture), Lesson 20 (task-locals).
Real-life analogy: security cameras in a building
┌────────────────────────────────────────────────────────────────┐
│ Building Security System │
│ │
│ println!/log = writing on paper: │
│ "Something happened at some time." │
│ No context, no trail, hard to correlate. │
│ │
│ tracing = security cameras with zones: │
│ Camera in Lobby (span: "lobby") │
│ └── sees Person A enter at 9:01 (event) │
│ Camera in Office (span: "office_204") │
│ └── sees Person A sit down at 9:03 (event) │
│ └── sees Person A make a call at 9:05 (event) │
│ │
│ You can follow Person A across cameras (spans). │
│ Each camera records structured data (fields). │
│ Zoom in on one zone or see the whole building. │
│ │
│ Spans follow a task across .await points, │
│ just like cameras track a person across rooms. │
└────────────────────────────────────────────────────────────────┘
tracing vs log
log crate: tracing crate:
┌───────────────────────┐ ┌───────────────────────────┐
│ info!("got request") │ │ info!(id=42, "got req") │
│ │ │ │
│ Plain text messages. │ │ Structured fields. │
│ No context tracking. │ │ Span context across .await│
│ Thread-local at best. │ │ Composable layers. │
│ │ │ Works with tokio-console. │
└───────────────────────┘ └───────────────────────────┘
Spans and events
Span: "handle_request" (id=42)
│
├── Event: INFO "received request" { method: "GET", path: "/api" }
│
├── Span: "db_query" (table="users")
│ └── Event: DEBUG "query executed" { rows: 5, ms: 12 }
│
└── Event: INFO "response sent" { status: 200, ms: 15 }
Spans are like folders. Events are like log lines inside folders.
Spans carry context that events inherit.
The #[instrument] macro
#![allow(unused)]
fn main() {
use tracing::{info, instrument};
#[instrument(skip(stream), fields(addr = %addr))]
async fn handle_connection(stream: TcpStream, addr: SocketAddr) {
info!("new connection"); // automatically tagged with addr
let data = read_data(&stream).await;
info!(bytes = data.len(), "read complete");
}
}
#[instrument] automatically:
- Creates a span named after the function
- Records function arguments as span fields
- Enters/exits the span around `.await` points
Subscribers and layers
┌──────────────────────────────────────────────────────────────┐
│ tracing-subscriber stack │
│ │
│ ┌─────────────────────┐ │
│ │ EnvFilter layer │ ← RUST_LOG=info,my_crate=debug │
│ └──────────┬──────────┘ │
│ ┌──────────▼──────────┐ │
│ │ fmt::Layer │ ← pretty-prints to stderr │
│ └──────────┬──────────┘ │
│ ┌──────────▼──────────┐ │
│ │ (optional: JSON) │ ← machine-readable output │
│ └──────────┬──────────┘ │
│ ┌──────────▼──────────┐ │
│ │ Registry │ ← collects spans + events │
│ └─────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
#![allow(unused)]
fn main() {
use tracing_subscriber::{fmt, EnvFilter, prelude::*};
tracing_subscriber::registry()
.with(EnvFilter::from_default_env())
.with(fmt::layer())
.init();
}
tokio-console
A live diagnostic tool for async applications:
1. Add console-subscriber to your app
2. Build with: RUSTFLAGS="--cfg tokio_unstable" cargo build
3. Run your app
4. In another terminal: tokio-console
You see:
- All tasks: name, state (idle/running/waiting), polls, durations
- Waker counts: how often each task was woken
- Task details: where it was spawned, what it's waiting on
Common debugging patterns
| Problem | Tool |
|---|---|
| “Where is time spent?” | #[instrument] + span timing |
| “Why is this task stuck?” | tokio-console task list |
| “What’s the call chain?” | Nested spans in log output |
| “Which request caused this?” | Span fields (request_id) |
| “What are all tasks doing?” | Handle::dump() (tokio_unstable) |
Exercises
Exercise 1: Add structured tracing to a server
Add tracing and tracing-subscriber to a TCP echo server. Log connection events with structured fields (addr, bytes_read, duration). Use #[instrument].
Exercise 2: Custom event counter layer
Build a simple tracing layer (implement the Layer trait) that counts events per level (INFO, WARN, ERROR). Print the counts on shutdown.
Exercise 3: Request ID propagation with spans
Create a span with a request_id field for each incoming request. Log from nested functions — verify the request_id appears in all log lines automatically.
Project 3: HTTP Load Tester (mini wrk/hey)
Combines: Lessons 18-22 (tokio::sync, tokio::net, task-locals, graceful shutdown, tracing).
What you’ll build
A CLI tool that hammers an HTTP endpoint, controls concurrency with a Semaphore, handles Ctrl+C gracefully, propagates request IDs via task-locals, and reports latency percentiles.
Architecture
┌───────────────────────────────────────────────────────────────┐
│ load-tester --url http://example.com --requests 100 -c 10 │
└──────────────────────────┬────────────────────────────────────┘
│
┌──────▼──────┐
│ CLI Parser │ (clap)
│ --url │
│ --requests │
│ --concurrency│
└──────┬──────┘
│
┌────────────┼────────────┐
│ │ │
┌────────▼───┐ ┌────▼─────┐ ┌──▼──────────┐
│ Semaphore │ │ Shutdown │ │ Result │
│ (c permits)│ │ (Notify) │ │ Collector │
│ │ │ Ctrl+C │ │ (Mutex<Vec>) │
└────────┬───┘ └────┬─────┘ └──┬──────────┘
│ │ │
└────────────┼───────────┘
│
┌────────────▼────────────────┐
│ for each request 1..N: │
│ tokio::spawn { │
│ acquire semaphore │
│ set task-local req_id │
│ select! { │
│ shutdown => return │
│ send_request => { │
│ record latency │
│ record status │
│ } │
│ } │
│ } │
└────────────┬────────────────┘
│
┌──────▼──────┐
│ Report │
│ p50, p90 │
│ p99, max │
│ throughput │
│ status map │
└─────────────┘
CLI interface
load-tester --url https://example.com/api --requests 1000 --concurrency 50
| Flag | Short | Default | Description |
|---|---|---|---|
| --url | -u | required | Target URL |
| --requests | -n | 100 | Total requests to send |
| --concurrency | -c | 10 | Max concurrent requests |
Sample output
Target: https://example.com/api
Requests: 1000, Concurrency: 50
Running... [1000/1000] done
Results:
Total: 1000 requests
Succeeded: 985
Failed: 15
Duration: 2.34s
Throughput: 427.35 req/s
Latency:
p50: 4.2ms
p90: 12.1ms
p99: 45.3ms
max: 102.7ms
Status codes:
200: 985
503: 15
Key implementation details
Concurrency with Semaphore
#![allow(unused)]
fn main() {
let sem = Arc::new(Semaphore::new(concurrency));
for i in 0..total_requests {
let permit = sem.clone().acquire_owned().await?;
tokio::spawn(async move {
let result = send_request(&url).await;
drop(permit); // release slot
result
});
}
}
Latency percentiles
#![allow(unused)]
fn main() {
fn percentile(sorted: &[Duration], p: f64) -> Duration {
let idx = ((sorted.len() as f64) * p / 100.0) as usize;
sorted[idx.min(sorted.len() - 1)]
}
}
Graceful Ctrl+C
#![allow(unused)]
fn main() {
let shutdown = Arc::new(Notify::new());
tokio::spawn({
let s = shutdown.clone();
async move {
tokio::signal::ctrl_c().await.ok();
s.notify_waiters();
}
});
}
Making HTTP requests with raw TCP
Since we don’t have reqwest, we use raw TcpStream with minimal HTTP/1.1:
#![allow(unused)]
fn main() {
async fn http_get(url: &str) -> Result<(u16, Duration)> {
    let (host, port, path) = parse_url(url)?; // URL parsing elided; parse_url is a placeholder
    let start = Instant::now();
    let mut stream = TcpStream::connect((host.as_str(), port)).await?;
    // Connection: close makes the server end the response at EOF,
    // so we can read to the end without parsing Content-Length
    stream.write_all(format!("GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n").as_bytes()).await?;
    // Read response, parse status code
    Ok((status_code, start.elapsed()))
}
}
Exercises
Exercise 1: Basic load tester
Implement the full load tester with Semaphore-based concurrency, latency collection, and percentile reporting. Use raw TCP for HTTP requests.
Exercise 2: Ctrl+C graceful shutdown
Add tokio::signal::ctrl_c() handling. On Ctrl+C, stop spawning new requests, let in-flight ones finish, then print partial results.
Exercise 3: Live progress reporting
Print a progress line that updates every 100ms showing completed/total requests and current throughput. Use a separate task with tokio::time::interval.
Lesson 23: Backpressure – Bounded Channels, Semaphore, poll_ready
Prerequisites: Lessons 13 (channels), 18 (tokio::sync), 21 (graceful shutdown).
Real-life analogy: water pressure in pipes
UNBOUNDED (no backpressure):
Fire hydrant Tiny garden hose
=============>>>>>>>>>=============> BURST!
(fast producer) (slow consumer)
The producer pushes water faster than the pipe can carry.
Pressure builds. The pipe bursts. Your system OOMs.
BOUNDED (with backpressure):
Fire hydrant Pressure valve Garden hose
=============>>>> | BLOCKS | >>>>=============>
(fast producer) (capacity=100) (slow consumer)
When the pipe is full, the valve closes.
The producer STOPS until the consumer drains some water.
Nobody bursts. The system stays alive.
Architecture: bounded channel flow
Producer Consumer
| |
v |
tx.send(item).await |
| |
+--------v--------+ |
| Bounded Buffer | capacity=4 |
| [X] [X] [X] [X] | FULL! |
+---------+--------+ |
| |
Producer SUSPENDS here rx.recv().await
until consumer takes one |
| v
+--------v--------+ processes item
| [X] [X] [X] [_] | one slot free
+---------+--------+
|
Producer WAKES UP, sends next item
Bounded channels
#![allow(unused)]
fn main() {
let (tx, rx) = tokio::sync::mpsc::channel(100); // buffer of 100
// Producer blocks (awaits) when buffer is full
tx.send(item).await?;
// try_send for non-blocking "drop if full" strategy
match tx.try_send(item) {
Ok(()) => { /* sent */ }
Err(TrySendError::Full(item)) => { /* channel full, drop or retry */ }
Err(TrySendError::Closed(item)) => { /* receiver gone */ }
}
}
The send().await suspends when the buffer is full, naturally slowing the producer.
Semaphore-based flow control
When you do not use channels (e.g., spawned tasks hitting an API):
#![allow(unused)]
fn main() {
let sem = Arc::new(Semaphore::new(100));
loop {
    // acquire_owned ties the permit to the Arc so it can move into the task
    let permit = sem.clone().acquire_owned().await?; // waits if 100 tasks in-flight
    tokio::spawn(async move {
        do_work().await;
        drop(permit); // releases the slot
    });
}
}
Strategies for overload
| Strategy | Mechanism | When to use |
|---|---|---|
| Block (await) | send().await | Default – propagate pressure |
| Drop newest | try_send + discard | Metrics, telemetry |
| Drop oldest | VecDeque pop front | Real-time video frames |
| Return error | HTTP 503 | Load shedding at the edge |
TCP analogy
TCP’s receive window is backpressure at the OS level. When the receiver’s buffer fills, the sender’s write() blocks. Tokio’s bounded channels are the application-level equivalent.
Exercises
Exercise 1: Bounded vs unbounded memory
Create a producer that generates items 10x faster than the consumer. Compare memory usage between a bounded channel (capacity 10) and an unbounded channel after 100,000 items.
Exercise 2: Semaphore rate limiter
Build a rate limiter using Semaphore that allows at most 5 concurrent HTTP-style requests. Log when a request waits for a permit vs gets one immediately.
Exercise 3: Drop-oldest policy
Implement a “drop oldest” buffer using VecDeque behind a Mutex. When the buffer is full, pop the front before pushing to the back.
Exercise 4: Chained backpressure
Chain two bounded channels: producer -> transformer -> consumer. Slow down the consumer and observe backpressure propagating all the way to the producer.
Lesson 24: Cancellation Safety — Dropped Futures, Data Loss, Safe Patterns
What you’ll learn
- What “cancellation safe” means for async functions
- How select! can cause data loss with unsafe futures
- Identifying cancellation-unsafe operations
- Patterns to make code cancellation-safe
Key concepts
The danger
When tokio::select! picks one branch, the other futures are dropped. If a dropped future had partially completed work (e.g., read some bytes into a buffer), that work is lost.
#![allow(unused)]
fn main() {
// UNSAFE: if the sleep wins, read_exact's partially read bytes are lost
tokio::select! {
    result = reader.read_exact(&mut buf) => { /* ... */ }
    _ = tokio::time::sleep(timeout) => { /* timed out */ }
}
}
Cancellation-safe vs unsafe
| Safe | Unsafe |
|---|---|
| tokio::sync::mpsc::Receiver::recv() | tokio::io::AsyncReadExt::read_exact() |
| TcpListener::accept() | tokio::io::AsyncReadExt::read_to_end() |
| tokio::time::sleep() | Futures that buffer partial work across .await points |
A future is cancellation-safe if dropping it at any .await point loses no data and leaves the system in a consistent state.
Safe patterns
Pattern 1: Use cancellation-safe alternatives
#![allow(unused)]
fn main() {
// Use recv() in select — it's cancellation-safe
tokio::select! {
msg = rx.recv() => { /* no data loss */ }
_ = token.cancelled() => { return; }
}
}
Pattern 2: Move work into a spawned task
#![allow(unused)]
fn main() {
// Own the buffer inside the task so the read can run to completion
let handle = tokio::spawn(async move {
    let mut buf = vec![0u8; 1024];
    reader.read_exact(&mut buf).await.map(|_| buf)
});
tokio::select! {
    result = handle => { /* ... */ }
    _ = shutdown => { /* the spawned task still runs to completion */ }
}
}
Pattern 3: Pin and reuse the future
#![allow(unused)]
fn main() {
// std::pin::pin! pins the future to the stack so it can be polled repeatedly
let mut read_fut = pin!(reader.read(&mut buf));
// Poll it via `&mut read_fut` across select! iterations instead of recreating it
}
Exercises
- Demonstrate data loss: use select! with read_exact and a timer; show bytes vanish
- Fix the above using a spawned task
- Write a cancellation-safe message reader that accumulates bytes across select! iterations
- Audit the tokio::sync docs — list which methods are cancellation-safe and which are not
- Implement a CancellationSafeReader wrapper that buffers partial reads
Lesson 25: Bridging Sync and Async — block_on, spawn_blocking, Handle
What you’ll learn
- Calling async code from sync code (block_on, Handle::block_on)
- Calling sync/blocking code from async code (spawn_blocking)
- The blocking thread pool and its configuration
- Common pitfalls and anti-patterns
Key concepts
Async from sync: block_on
#![allow(unused)]
fn main() {
let rt = tokio::runtime::Runtime::new()?;
let result = rt.block_on(async {
fetch_data().await
});
}
block_on parks the current thread until the future completes. Never call it from inside an async context — it panics or deadlocks the runtime.
Sync from async: spawn_blocking
#![allow(unused)]
fn main() {
let hash = tokio::task::spawn_blocking(move || {
// CPU-heavy or blocking I/O — runs on dedicated thread pool
compute_bcrypt_hash(&password)
}).await?;
}
The blocking pool has up to 512 threads by default. Tasks here do not block the async worker threads.
Handle for deferred async access
#![allow(unused)]
fn main() {
let handle = tokio::runtime::Handle::current();
std::thread::spawn(move || {
// From a plain OS thread, run async code:
handle.block_on(async {
client.get(url).send().await
});
});
}
Anti-patterns
| Anti-pattern | Problem | Fix |
|---|---|---|
| block_on inside async | Deadlock | Use .await |
| Blocking in async task | Starves workers | spawn_blocking |
| spawn_blocking for I/O | Wastes pool threads | Use async I/O |
| Nested runtimes | Panic | Use Handle::current() |
Exercises
- Call an async HTTP client from a synchronous main using block_on
- Use spawn_blocking to offload a CPU-heavy Fibonacci computation
- Pass a Handle to a std thread and use it to run async DNS resolution
- Demonstrate the deadlock when calling block_on inside an async task
- Configure the blocking thread pool size with max_blocking_threads and observe behavior under load
Lesson 26: Streams — Async Iteration, StreamExt, Backpressure
What you’ll learn
- The Stream trait as the async equivalent of Iterator
- Useful combinators from StreamExt and TryStreamExt
- Creating streams from channels, iterators, and async generators
- Backpressure considerations with streams
Key concepts
The Stream trait
#![allow(unused)]
fn main() {
pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
}
Like Iterator, but poll_next can return Pending.
StreamExt combinators
#![allow(unused)]
fn main() {
use tokio_stream::StreamExt;
let mut stream = tokio_stream::iter(vec![1, 2, 3])
.map(|x| x * 2)
.filter(|x| *x > 2)
.take(5);
while let Some(item) = stream.next().await {
println!("{item}");
}
}
Key combinators: map, filter, take, merge, chain, throttle, chunks, timeout.
Creating streams
- tokio_stream::iter() — from an iterator
- ReceiverStream::new(rx) — from an mpsc::Receiver
- async_stream::stream! — from an async block with yield
- BroadcastStream — from a broadcast::Receiver
Backpressure in streams
Streams are pull-based: the consumer calls next().await, so the producer only runs when demanded. This provides natural backpressure. Combine with buffer_unordered(n) (from the futures crate's StreamExt; tokio_stream does not provide it) to control concurrency:
#![allow(unused)]
fn main() {
use futures::stream::StreamExt; // buffer_unordered comes from the futures crate
stream
    .map(|url| async move { fetch(url).await })
    .buffer_unordered(10) // at most 10 fetches in flight at once
}
Exercises
- Convert an mpsc::Receiver into a stream and process items with StreamExt::map
- Use buffer_unordered to fetch 100 URLs with max 10 concurrent requests
- Implement a custom Stream that yields Fibonacci numbers with a delay
- Use stream.chunks(10) to batch database inserts
- Merge two streams and process items in arrival order
Lesson 27: Connection Pooling — Reuse, Health Checks, Idle Timeout
What you’ll learn
- Why connection pooling matters (TCP handshake, TLS, auth overhead)
- Building a pool with Semaphore + VecDeque
- Health checking idle connections
- Idle timeout and eviction strategies
Key concepts
Why pool?
Each new TCP connection costs: DNS lookup, TCP handshake (1 RTT), TLS handshake (1-2 RTT), and often authentication. Reusing connections amortizes this cost.
Pool architecture
checkout() -> acquire Semaphore permit
-> pop from idle queue (or create new)
-> health check
-> return PooledConnection
drop(PooledConnection) -> health check
-> push to idle queue
-> release Semaphore permit
Core components
#![allow(unused)]
fn main() {
struct Pool {
idle: Mutex<VecDeque<Connection>>,
semaphore: Semaphore, // limits total connections
max_idle: usize,
idle_timeout: Duration,
}
}
Health checks
- On checkout — ping before returning to caller
- On return — verify connection is still usable
- Background — periodic sweep of idle connections
Idle timeout
Connections sitting idle too long may be closed by the server or a firewall. Evict them:
#![allow(unused)]
fn main() {
// Background task
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
pool.evict_idle_older_than(idle_timeout);
}
}
Real-world pools
- deadpool — generic async pool
- bb8 — based on r2d2’s design
- sqlx — built-in pool for database connections
Exercises
- Build a simple TCP connection pool using Semaphore and VecDeque
- Add a health check that sends a ping before returning a connection
- Implement idle timeout eviction with a background reaper task
- Add metrics: total connections, idle connections, wait time
- Stress test: 100 concurrent tasks sharing a pool of 10 connections
Lesson 28: Testing Async Code — tokio::test, Time Mocking, Deterministic Testing
What you’ll learn
- Using #[tokio::test] for async unit tests
- Mocking time with tokio::time::pause() and advance()
- Deterministic testing with the current_thread runtime
- Testing patterns for channels, tasks, and I/O
Key concepts
#[tokio::test]
#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_something() {
let result = my_async_fn().await;
assert_eq!(result, 42);
}
}
By default, #[tokio::test] uses the current_thread runtime. For multi-thread: #[tokio::test(flavor = "multi_thread")].
Time mocking
pause() freezes time; advance() moves it forward instantly:
#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_timeout() {
tokio::time::pause();
let start = tokio::time::Instant::now(); // the mocked clock, not std::time::Instant
tokio::time::sleep(Duration::from_secs(3600)).await;
// Completes instantly — time is mocked
assert!(start.elapsed() >= Duration::from_secs(3600));
}
}
Auto-advance: when all tasks are waiting on time, the runtime jumps to the next timer automatically.
Deterministic testing
The current_thread runtime runs all tasks on one thread in a predictable order, which makes interleavings far easier to reproduce than under the multi-threaded scheduler. Useful for reproducing race conditions.
Testing patterns
| Pattern | Approach |
|---|---|
| Test a spawned task | Use JoinHandle to await result |
| Test channels | Create channel, send, recv, assert |
| Test shutdown | Create CancellationToken, cancel, verify cleanup |
| Test I/O | Use tokio::io::duplex() for in-memory streams |
| Test timeouts | Pause time, advance past deadline |
tokio::io::duplex
#![allow(unused)]
fn main() {
let (client, server) = tokio::io::duplex(1024);
// Use client and server as AsyncRead + AsyncWrite
// No real TCP needed
}
Exercises
- Write a #[tokio::test] that verifies an async function returns the correct value
- Use time::pause() and advance() to test a retry function with exponential backoff (no real waiting)
- Test a producer-consumer pipeline using mpsc channels
- Use tokio::io::duplex() to test a protocol parser without real sockets
- Test graceful shutdown: cancel a token, verify all tasks exit cleanly
Project 4: Async Job Queue with Workers, Backpressure, Graceful Shutdown
What you’ll learn
- Designing a multi-stage async pipeline
- Backpressure with bounded channels
- spawn_blocking for CPU-bound work
- Cancellation safety across the pipeline
- Testing async systems with time mocking
Specification
Architecture
Producer(s)
|
v
bounded mpsc channel (backpressure)
|
v
Dispatcher
|
+---> Worker 1 (spawn_blocking for CPU work)
+---> Worker 2
+---> Worker N
|
v
Results channel --> Result collector
Job definition
#![allow(unused)]
fn main() {
struct Job {
id: u64,
payload: Vec<u8>,
job_type: JobType,
}
enum JobType {
CpuBound { iterations: u64 }, // use spawn_blocking
AsyncIo { url: String }, // use async I/O
Delayed { delay: Duration }, // sleep then execute
}
struct JobResult {
job_id: u64,
duration: Duration,
status: ResultStatus,
}
}
Components
| Component | Responsibility |
|---|---|
| Producer | Generates jobs, sends to bounded channel; blocks on backpressure |
| Dispatcher | Receives jobs, assigns to workers via Semaphore(N) |
| Worker | Executes job; uses spawn_blocking for CPU-bound work |
| Collector | Receives results, tracks stats, reports progress |
| Shutdown coordinator | CancellationToken hierarchy; drain pattern |
Cancellation safety
- Dispatcher uses cancellation-safe recv() in select!
- Workers check cancellation between stages
- On shutdown: stop accepting new jobs, drain in-flight, collect final results
Testing requirements
#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_backpressure() {
// Slow workers, fast producer -> producer blocks
}
#[tokio::test]
async fn test_graceful_shutdown() {
tokio::time::pause();
// Submit jobs, cancel token, advance time, verify all complete
}
#[tokio::test]
async fn test_worker_pool_concurrency() {
// Submit N jobs, verify at most W run concurrently
}
}
Key concepts
- Bounded channel for Producer -> Dispatcher backpressure
- Semaphore in Dispatcher to limit concurrent workers
- spawn_blocking for JobType::CpuBound to avoid starving async workers
- CancellationToken hierarchy: root -> dispatcher -> workers
- time::pause + advance for deterministic testing of delays and timeouts
Exercises
- Implement the basic pipeline: Producer -> Channel -> Dispatcher -> Workers -> Collector
- Add backpressure: bounded channel with capacity 10, verify producer blocks when queue is full
- Implement spawn_blocking for CPU-bound jobs; verify async workers are not starved
- Add graceful shutdown with CancellationToken and the drain pattern
- Write tests using time::pause() for the Delayed job type
- Add a dead-letter queue for failed jobs with retry logic (max 3 retries, exponential backoff)
- Add metrics: jobs/sec, average latency, queue depth, worker utilization
Pattern 1: Task-per-Connection
Real-life analogy: the hotel concierge desk
Guest arrives → front desk assigns a personal concierge
Concierge 1: handles Guest A (room service, wake-up call, taxi)
Concierge 2: handles Guest B (restaurant booking, laundry)
Concierge 3: handles Guest C (tour arrangements)
Each concierge handles ONE guest's entire stay.
When the guest checks out, the concierge is free.
This is task-per-connection:
connection arrives → spawn a task → task handles everything → task ends
The pattern
The simplest and most common async architecture. For every incoming connection, spawn a dedicated task:
#![allow(unused)]
fn main() {
loop {
let (stream, addr) = listener.accept().await?;
tokio::spawn(async move {
handle_client(stream).await;
});
}
}
┌──────────────────────────────────────────────────┐
│ Task-per-Connection │
│ │
│ Listener │
│ │ │
│ ├── accept() → spawn(handle(client_1)) │
│ ├── accept() → spawn(handle(client_2)) │
│ ├── accept() → spawn(handle(client_3)) │
│ └── ... │
│ │
│ Each task: │
│ read request → process → write response │
│ → loop or disconnect │
│ │
│ Tasks are independent. One slow client │
│ doesn't affect others. │
└──────────────────────────────────────────────────┘
When to use
- Web servers — each HTTP request gets a task
- Database servers — each client connection gets a task
- Chat servers — each user gets a task
- Proxies — each proxied connection gets a task
- Game servers — each player gets a task
Basically: anything that accepts connections and handles them independently.
When NOT to use
- When connections need to share heavy state (use actors instead)
- When you need to limit concurrency precisely (add a semaphore)
- When tasks need to coordinate tightly (use channels between tasks)
The concurrency limit problem
Spawning unlimited tasks can exhaust memory:
10,000 connections → 10,000 tasks → fine
100,000 connections → 100,000 tasks → maybe fine
1,000,000 connections → 1,000,000 tasks → might OOM
Solution: limit concurrent connections with a semaphore:
#![allow(unused)]
fn main() {
let semaphore = Arc::new(Semaphore::new(10_000)); // max 10K concurrent
loop {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let (stream, _) = listener.accept().await?;
tokio::spawn(async move {
handle_client(stream).await;
drop(permit); // release the slot
});
}
}
Shared state between tasks
Tasks often need shared state (user list, config, counters). Three approaches:
Option A: Arc<Mutex<T>>
Simple. Lock contention if many tasks write.
Good for: counters, small config objects.
Option B: Arc<RwLock<T>>
Many readers, few writers.
Good for: shared config that rarely changes.
Option C: Dedicated state task (Actor pattern → next chapter)
One task owns the state, others send messages.
Good for: complex state, no lock contention.
Code exercise: TCP Chat Server
Build a chat server where each client gets a task:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Client A │ │ Client B │ │ Client C │
│ (task) │ │ (task) │ │ (task) │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└───────┬────────┴────────┬───────┘
│ │
┌───────▼─────────────────▼───────┐
│ Shared state: │
│ HashMap<ClientId, Sender> │
│ (behind Arc<Mutex>) │
└─────────────────────────────────┘
Requirements:
- Accept TCP connections, spawn a task per client
- Each task reads lines from its client
- Broadcast messages to all other clients
- Handle disconnect (remove from shared state)
- Limit to 100 concurrent connections with a semaphore
Starter code:
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
let semaphore = Arc::new(Semaphore::new(100));
// TODO: shared state for connected clients
loop {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let (stream, addr) = listener.accept().await.unwrap();
// TODO: clone shared state
tokio::spawn(async move {
println!("{addr} connected");
// TODO: handle client (read lines, broadcast, disconnect)
drop(permit);
});
}
}
Test with: nc 127.0.0.1 8080 in multiple terminals.
Pattern 2: Actor Model
Real-life analogy: office departments
┌──────────────┐ memo ┌──────────────┐ memo ┌──────────────┐
│ Sales Dept │ ──────►│ Accounting │ ──────►│ Shipping │
│ │ │ │ │ │
│ Inbox: 📬 │ │ Inbox: 📬 │ │ Inbox: 📬 │
│ State: leads│ │ State: books│ │ State: orders│
│ Staff: 1 │ │ Staff: 1 │ │ Staff: 1 │
└──────────────┘ └──────────────┘ └──────────────┘
Each department:
- Has its own inbox (channel)
- Processes memos one at a time (no multitasking within a dept)
- Has private state (no other dept can touch it)
- Communicates only via memos (messages)
Nobody walks into Accounting and grabs the books.
They send a memo and wait for a reply.
The pattern
An actor is a task that:
- Owns its state exclusively (no shared memory)
- Receives messages through a channel (its “inbox”)
- Processes messages one at a time (sequential, no locks needed)
- Can send messages to other actors
#![allow(unused)]
fn main() {
struct BankAccount {
balance: u64,
inbox: mpsc::Receiver<AccountMessage>,
}
enum AccountMessage {
Deposit { amount: u64 },
Withdraw { amount: u64, reply: oneshot::Sender<Result<(), String>> },
GetBalance { reply: oneshot::Sender<u64> },
}
impl BankAccount {
async fn run(mut self) {
while let Some(msg) = self.inbox.recv().await {
match msg {
AccountMessage::Deposit { amount } => {
self.balance += amount;
}
AccountMessage::Withdraw { amount, reply } => {
if self.balance >= amount {
self.balance -= amount;
let _ = reply.send(Ok(()));
} else {
let _ = reply.send(Err("insufficient funds".into()));
}
}
AccountMessage::GetBalance { reply } => {
let _ = reply.send(self.balance);
}
}
}
}
}
}
┌────────────────────────────────────────────────────────┐
│ Actor Model │
│ │
│ ┌─────────────┐ msg ┌─────────────┐ │
│ │ Client │ ──────►│ Actor │ │
│ │ (any task) │ │ │ │
│ │ │◄───────│ - inbox │ │
│ │ sends msg │ reply │ - state │ │
│ │ + oneshot │ │ - run loop │ │
│ └─────────────┘ └─────────────┘ │
│ │
│ No locks. No shared state. No data races. │
│ The actor processes one message at a time. │
│ State is private — only the actor touches it. │
└────────────────────────────────────────────────────────┘
Actor vs Shared State
Shared state (Arc<Mutex<T>>): Actor:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Task A │ │ Task B │ │ Task A │──msg──┐
│ lock() │ │ lock() │ │ │ │
│ modify │ │ BLOCKED │ └──────────┘ ▼
│ unlock() │ │ ... │ ┌──────────┐ ┌─────────┐
└──────────┘ └──────────┘ │ Task B │──► Actor │
│ │ │ (no lock│
Lock contention. └──────────┘ │ needed)│
Deadlock risk. └─────────┘
Complex error handling. No contention.
Sequential processing.
Simple reasoning.
When to use
- Stateful services — user sessions, game entities, connection managers
- When state is complex — a mutex would be held too long or across .await points
- Isolation — each actor can fail independently without corrupting shared state
- Erlang/Elixir-style systems — the actor model is their core abstraction
When NOT to use
- Simple shared counters — AtomicU64 or Arc<Mutex<u64>> is simpler
- Read-heavy workloads — actors serialize all access; RwLock allows concurrent reads
- Fire-and-forget operations — if you don’t need a reply, a plain spawn is simpler
The request-reply pattern
To get data back from an actor, send a oneshot::Sender with the message:
#![allow(unused)]
fn main() {
// Client side:
let (reply_tx, reply_rx) = oneshot::channel();
actor_tx.send(AccountMessage::GetBalance { reply: reply_tx }).await?;
let balance = reply_rx.await?; // waits for the actor to respond
}
Code exercise: Bank System
Build a bank with account actors:
┌──────────┐ ┌────────────────┐
│ Client │────►│ Account "alice"│ (actor)
│ (task) │ │ balance: 1000 │
└──────────┘ └────────────────┘
│
│ ┌────────────────┐
└──────────►│ Account "bob" │ (actor)
│ balance: 500 │
└────────────────┘
Requirements:
- Each bank account is an actor (a task with a channel inbox)
- Support: Deposit, Withdraw, GetBalance, Transfer(to_account, amount)
- Transfer is atomic: withdraw from A, deposit to B. If the withdraw fails, B is unchanged.
- Multiple clients can interact with accounts concurrently — no locks.
Starter code:
#![allow(unused)]
fn main() {
use tokio::sync::{mpsc, oneshot};
enum AccountMsg {
Deposit { amount: u64 },
Withdraw { amount: u64, reply: oneshot::Sender<Result<(), String>> },
Balance { reply: oneshot::Sender<u64> },
}
#[derive(Clone)]
struct AccountHandle {
tx: mpsc::Sender<AccountMsg>,
}
impl AccountHandle {
async fn deposit(&self, amount: u64) {
self.tx.send(AccountMsg::Deposit { amount }).await.unwrap();
}
async fn balance(&self) -> u64 {
let (tx, rx) = oneshot::channel();
self.tx.send(AccountMsg::Balance { reply: tx }).await.unwrap();
rx.await.unwrap()
}
// TODO: withdraw, transfer
}
fn spawn_account(name: &str, initial_balance: u64) -> AccountHandle {
let (tx, mut rx) = mpsc::channel(32);
let name = name.to_string();
tokio::spawn(async move {
let mut balance = initial_balance;
while let Some(msg) = rx.recv().await {
match msg {
AccountMsg::Deposit { amount } => balance += amount,
// TODO: handle other messages
_ => todo!(),
}
}
println!("{name} actor shutting down");
});
AccountHandle { tx }
}
}
Test: create Alice (1000) and Bob (500). Transfer 200 from Alice to Bob. Check balances: Alice=800, Bob=700.
Pattern 3: Pipeline / Stream Processing
Real-life analogy: the car factory assembly line
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Station 1│───►│ Station 2│───►│ Station 3│───►│ Station 4│
│ Weld │ │ Paint │ │ Install │ │ Quality │
│ frame │ │ body │ │ engine │ │ check │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
Car 1: [quality check]
Car 2: [install engine]
Car 3: [paint]
Car 4: [weld frame]
All stations work simultaneously on different cars.
If painting is slow, welding pauses (backpressure).
The pattern
A chain of tasks connected by channels. Each stage processes items and passes them to the next:
producer → channel → transform → channel → filter → channel → sink
┌──────────────────────────────────────────────────────────┐
│ Pipeline │
│ │
│ ┌────────┐ ch ┌──────────┐ ch ┌────────┐ ch ┌──┐│
│ │Producer│ ───► │Transform │ ───► │ Filter │ ───► │Out││
│ │ │ │ │ │ │ │ ││
│ │read │ │parse │ │errors │ │ ││
│ │lines │ │JSON │ │only │ │ ││
│ └────────┘ └──────────┘ └────────┘ └──┘│
│ │
│ Each stage is a separate task. │
│ Bounded channels provide backpressure. │
│ If the filter is slow, transform pauses. │
└──────────────────────────────────────────────────────────┘
#![allow(unused)]
fn main() {
let (tx1, rx1) = mpsc::channel(100); // producer → parser
let (tx2, rx2) = mpsc::channel(100); // parser → filter
let (tx3, rx3) = mpsc::channel(100); // filter → output
// Stage 1: produce
tokio::spawn(async move {
for line in read_lines("access.log").await {
tx1.send(line).await.unwrap();
}
});
// Stage 2: parse
tokio::spawn(async move {
while let Some(line) = rx1.recv().await {
if let Ok(record) = parse_json(&line) {
tx2.send(record).await.unwrap();
}
}
});
// Stage 3: filter
tokio::spawn(async move {
while let Some(record) = rx2.recv().await {
if record.status >= 500 {
tx3.send(record).await.unwrap();
}
}
});
// Stage 4: output
tokio::spawn(async move {
while let Some(error) = rx3.recv().await {
println!("ERROR: {} {}", error.path, error.status);
}
});
}
Why bounded channels matter
Unbounded channels (BAD):
Producer: 1,000,000 items/sec
Consumer: 100 items/sec
→ channel grows to 999,900 items → OOM
Bounded channels (GOOD):
channel(100)
Producer: 1,000,000 items/sec
Consumer: 100 items/sec
→ channel fills to 100 → producer.send().await BLOCKS
→ producer slows down to match consumer
→ memory stays constant
→ this is BACKPRESSURE
When to use
- Log/event processing — parse, filter, aggregate, alert
- ETL pipelines — extract, transform, load
- Video/audio processing — decode → transform → encode
- Network packet processing — capture → parse → analyze → store
When NOT to use
- Request/response — a pipeline flows one direction; use task-per-connection for req/res
- Simple transformations — if it’s one step, just do it inline (no pipeline needed)
- When order doesn’t matter — use fan-out/fan-in instead (next pattern)
Code exercise: Log Analyzer
Build a pipeline that processes web server access logs:
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Read │───►│ Parse │───►│ Filter │───►│ Aggregate│
│ lines │ │ fields │ │ errors │ │ count by │
│ from file│ │ (split) │ │ (5xx) │ │ endpoint │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
Input (access.log):
GET /api/users 200 12ms
POST /api/login 401 5ms
GET /api/data 500 1502ms
GET /api/users 200 8ms
GET /api/data 503 30000ms
POST /api/upload 500 250ms
Output:
Error summary:
/api/data: 2 errors (500, 503)
/api/upload: 1 error (500)
Total: 3 errors out of 6 requests
Requirements:
- Four pipeline stages, each a separate task
- Bounded channels (capacity 100) between stages
- Producer reads lines from a file (or generates them)
- Parser extracts: method, path, status, latency
- Filter passes only 5xx status codes
- Aggregator counts errors by endpoint, prints summary when done
Pattern 4: Fan-out / Fan-in
Real-life analogy: the research team
Professor assigns papers to read:
┌──────────┐
│Professor │
│(dispatch)│
└────┬─────┘
│ assigns
┌────┼────────────┐
│ │ │
▼ ▼ ▼
┌────┐ ┌────┐ ┌────┐
│TA 1│ │TA 2│ │TA 3│ ← fan-out: work distributed
│read│ │read│ │read│
│ 10 │ │ 10 │ │ 10 │
└──┬─┘ └──┬─┘ └──┬─┘
│ │ │
└──────┼──────┘
│ summaries
▼
┌──────────┐
│Professor │ ← fan-in: results collected
│(collect) │
│write │
│report │
└──────────┘
30 papers read in parallel, not sequentially.
Total time: time of slowest TA, not sum of all.
The pattern
One task distributes work to N workers, another collects results:
┌──────────────────────────────────────────────────────────┐
│ Fan-out / Fan-in │
│ │
│ ┌──────────┐ │
│ │Dispatcher│ │
│ └────┬─────┘ │
│ │ fan-out │
│ ┌────┼────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──┐ ┌──┐ ┌──┐ ┌──┐ │
│ │W1│ │W2│ │W3│ │W4│ ← N workers (concurrent) │
│ └┬─┘ └┬─┘ └┬─┘ └┬─┘ │
│ │ │ │ │ │
│ └────┼────┼────┘ │
│ │ │ fan-in │
│ ┌────▼────▼───┐ │
│ │ Collector │ │
│ └─────────────┘ │
└──────────────────────────────────────────────────────────┘
In Rust: JoinSet
#![allow(unused)]
fn main() {
use tokio::task::JoinSet;
let urls = vec!["https://a.com", "https://b.com", "https://c.com"];
let mut set = JoinSet::new();
// Fan-out: spawn one task per URL
for url in urls {
set.spawn(async move {
reqwest::get(url).await
});
}
// Fan-in: collect results as they complete
while let Some(result) = set.join_next().await {
match result {
Ok(Ok(response)) => println!("Got: {}", response.status()),
Ok(Err(e)) => println!("Request failed: {e}"),
Err(e) => println!("Task panicked: {e}"),
}
}
}
Concurrency limiting
Without a limit, fan-out can overwhelm the target:
Fan-out 10,000 HTTP requests simultaneously:
→ target server returns 429 Too Many Requests
→ or your machine runs out of file descriptors
Solution: Semaphore limits concurrent workers
#![allow(unused)]
fn main() {
let semaphore = Arc::new(Semaphore::new(50)); // max 50 concurrent
for url in urls {
// Acquire BEFORE spawning: once 50 permits are out, the loop itself waits,
// so at most 50 requests are ever in flight.
let permit = semaphore.clone().acquire_owned().await.unwrap();
set.spawn(async move {
let result = fetch(url).await;
drop(permit); // release the slot
result
});
}
}
When to use
- Parallel HTTP requests — fetch 100 URLs, collect results
- Batch processing — process 10,000 records, N at a time
- Map-reduce — transform items in parallel, aggregate results
- Health checks — ping N services, report which are up
When NOT to use
- Sequential dependencies — if step 2 depends on step 1’s output, use a pipeline instead
- Single resource — if all workers hit the same bottleneck (one database), parallelism doesn’t help
- Ordering matters — fan-in collects in completion order, not submission order
Code exercise: Web Crawler
Build a concurrent web crawler:
┌──────────┐
│ Seed URL │
│ list │
└────┬─────┘
│ fan-out (max 10 concurrent)
┌────┼──────────────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌──┐ ┌──┐ ┌──┐ ┌──┐
│F1│ │F2│ │F3│ │F4│ fetch pages
└┬─┘ └┬─┘ └┬─┘ └┬─┘
│ │ │ │ fan-in
└────┼────┼────┘
▼
┌──────────┐
│ Results │
│ - URL │
│ - status │
│ - size │
│ - time │
└──────────┘
Requirements:
- Read a list of URLs (from a file or hardcoded)
- Fetch each URL concurrently (fan-out)
- Limit concurrency to 10 with a semaphore
- Collect results (fan-in): URL, HTTP status, response size, latency
- Print a summary table when all are done
Starter code:
use tokio::task::JoinSet;
use tokio::sync::Semaphore;
use std::sync::Arc;
use std::time::{Duration, Instant};
struct CrawlResult {
url: String,
status: u16,
size: usize,
latency: Duration,
}
async fn fetch(url: &str) -> CrawlResult {
let start = Instant::now();
// TODO: make HTTP request (use tokio::net::TcpStream + raw HTTP, or reqwest)
// Return CrawlResult
todo!()
}
#[tokio::main]
async fn main() {
let urls = vec![
"http://example.com",
"http://httpbin.org/get",
"http://httpbin.org/delay/2",
// add more
];
let semaphore = Arc::new(Semaphore::new(10));
let mut set = JoinSet::new();
for url in urls {
let sem = semaphore.clone();
set.spawn(async move {
let _permit = sem.acquire().await.unwrap();
fetch(url).await
});
}
// TODO: collect results, print summary table
}
Expected output:
URL Status Size Latency
─────────────────────────────────────────────────────
http://example.com 200 1256B 120ms
http://httpbin.org/get 200 432B 89ms
http://httpbin.org/delay/2 200 312B 2045ms
Total: 3 URLs, 3 success, 0 failed
Pattern 5: Supervisor Tree
Real-life analogy: the corporate hierarchy
CEO (root supervisor)
│
├── VP Engineering (supervisor)
│ ├── Team Lead Backend (supervisor)
│ │ ├── Developer 1 (worker) ← quits
│ │ ├── Developer 2 (worker) Team Lead hires
│ │ └── Developer 3 (worker) a replacement
│ │
│ └── Team Lead Frontend (supervisor)
│ ├── Developer 4 (worker)
│ └── Developer 5 (worker)
│
└── VP Operations (supervisor)
└── SRE Team Lead (supervisor)
├── Oncall 1 (worker)
└── Oncall 2 (worker) ← quits → Team Lead hires replacement
When a developer quits (task crashes):
- Their team lead (supervisor) hires a replacement (restart)
- The VP doesn't even know it happened
- If the team lead quits → VP restarts the team lead + all their reports
The pattern
A supervisor monitors child tasks and restarts them when they fail:
┌──────────────────────────────────────────────────────────┐
│ Supervisor Tree │
│ │
│ ┌───────────┐ │
│ │ Supervisor│ │
│ │ │ │
│ │ children: │ │
│ │ [W1, W2] │ │
│ └─────┬─────┘ │
│ │ │
│ ┌───┴───┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌───┐ ┌───┐ │
│ │W1 │ │W2 │ ← W2 panics! │
│ └───┘ └───┘ │
│ │ │
│ Supervisor detects W2 exit │
│ Logs the failure │
│ Spawns a NEW W2 │
│ │ │
│ ┌──▼──┐ │
│ │W2' │ ← replacement, same job │
│ └─────┘ │
│ │
│ Restart strategies: │
│ one-for-one: restart only the crashed child │
│ one-for-all: restart ALL children (for dependent tasks)│
│ rest-for-one: restart crashed + all after it │
└──────────────────────────────────────────────────────────┘
#![allow(unused)]
fn main() {
async fn supervisor(num_workers: usize) {
let mut set = JoinSet::new();
// Spawn initial workers
for id in 0..num_workers {
set.spawn(worker(id));
}
// Monitor and restart
loop {
match set.join_next().await {
Some(Ok(())) => {
println!("Worker finished normally");
}
Some(Err(e)) => {
println!("Worker crashed: {e}. Restarting...");
set.spawn(worker(next_id())); // next_id(): some fresh-id helper
}
None => break, // all workers done, supervisor exits
}
}
}
async fn worker(id: usize) {
loop {
// do work...
// might panic!
}
}
}
Erlang’s “let it crash” philosophy
Traditional approach:
fn process() {
    if error {
        handle_error();
        recover();
        retry();
        log();
        // 50 lines of error handling
    }
}
Complex, error-prone. Every function handles its own errors.
Erlang/supervisor approach:
fn process() {
    // just do the work
    // if something goes wrong, let it crash
    // the supervisor will restart us
}
Simple, robust. Errors bubble up to the supervisor,
which has ONE job: restart.
When to use
- Long-running services — web servers, message brokers, game servers
- Worker pools — N workers processing a queue; crashed ones are replaced
- Unreliable external dependencies — task talks to flaky API; crashes get restarted
- Fault isolation — one bad request crashes one task, not the whole server
When NOT to use
- Short-lived programs — CLI tools, scripts (just exit on error)
- Errors that should propagate — if the whole program should stop on error
- Debugging — supervisors can mask bugs by restarting endlessly (add restart limits)
Restart limits
Prevent infinite restart loops:
#![allow(unused)]
fn main() {
let mut restart_count = 0;
let max_restarts = 5;
let reset_interval = Duration::from_secs(60);
let mut window_start = Instant::now();
// In the supervisor loop:
if window_start.elapsed() > reset_interval {
restart_count = 0;
window_start = Instant::now();
}
restart_count += 1;
if restart_count > max_restarts {
eprintln!("Too many restarts in {}s. Giving up.", reset_interval.as_secs());
break;
}
}
Code exercise: Resilient Worker Pool
Build a supervisor that manages a pool of workers:
┌──────────────┐
│ Supervisor │
│ │
│ max_restart:│
│ 5 per 60s │
└──────┬───────┘
│
┌────┼────┐
│ │ │
▼ ▼ ▼
┌──┐ ┌──┐ ┌──┐
│W1│ │W2│ │W3│ ← workers process jobs from a shared queue
└──┘ └──┘ └──┘
Requirements:
- Supervisor spawns 3 workers
- Each worker pulls jobs from a shared mpsc channel
- Workers randomly “crash” (panic) on some jobs
- Supervisor detects the crash and spawns a replacement
- Replacement worker connects to the same job channel
- After 5 restarts in 60 seconds, supervisor gives up and exits
- Remaining jobs are processed by surviving workers
Starter code:
use tokio::task::JoinSet;
use tokio::sync::{mpsc, Mutex};
use std::sync::Arc;
use std::time::{Duration, Instant};
// A tokio mpsc::Receiver has a single owner. Share it as Arc<Mutex<Receiver>>
// so replacement workers can pull from the same queue after a crash.
async fn worker(id: usize, jobs: Arc<Mutex<mpsc::Receiver<String>>>) {
while let Some(job) = { jobs.lock().await.recv().await } {
// Simulate random crashes
if rand::random::<f32>() < 0.1 {
panic!("Worker {id} crashed on job: {job}");
}
println!("[Worker {id}] processed: {job}");
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
#[tokio::main]
async fn main() {
// TODO: create job channel, spawn workers, supervise
}
Expected output:
[Worker 0] processed: job-1
[Worker 1] processed: job-2
[Worker 2] processed: job-3
Worker 1 crashed: panicked. Restarting... (restart 1/5)
[Worker 3] processed: job-4
[Worker 0] processed: job-5
...
All jobs processed. Total restarts: 3
Pattern 6: Event Bus / Pub-Sub
Real-life analogy: the radio station
┌──────────────┐
│ Radio Station│ broadcasts on 101.5 FM
│ (publisher) │
└──────┬───────┘
│ broadcast signal
┌────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌────┐ ┌────┐ ┌────┐
│Car │ │Home│ │Gym │ ← anyone tuned to 101.5 hears it
│ 🚗 │ │ 🏠 │ │ 🏋️ │
└────┘ └────┘ └────┘
Publisher sends once.
All subscribers receive.
New subscribers can join anytime.
Publisher doesn't know (or care) who's listening.
The pattern
A publisher emits events. Multiple subscribers receive them. Publisher and subscribers are decoupled — they don’t know about each other.
┌──────────────────────────────────────────────────────────┐
│ Event Bus │
│ │
│ ┌───────────┐ ┌─────────────┐ │
│ │ Publisher │────►│ broadcast │ │
│ │ (metrics) │ │ channel │ │
│ └───────────┘ └──────┬──────┘ │
│ │ │
│ ┌───────────┼───────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Dashboard │ │Logger │ │Alerter │ │
│ │subscriber│ │subscriber│ │subscriber│ │
│ │shows live│ │writes to │ │sends │ │
│ │graphs │ │disk │ │Slack msg │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ Publisher doesn't know who listens. │
│ Subscribers don't know who publishes. │
│ New subscribers can join anytime. │
└──────────────────────────────────────────────────────────┘
In Rust: tokio::sync::broadcast
#![allow(unused)]
fn main() {
use tokio::sync::broadcast;
let (tx, _) = broadcast::channel(100); // buffer 100 events
// Subscriber 1
let mut rx1 = tx.subscribe();
tokio::spawn(async move {
while let Ok(event) = rx1.recv().await {
println!("[dashboard] {event}");
}
});
// Subscriber 2
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
while let Ok(event) = rx2.recv().await {
println!("[logger] {event}");
}
});
// Publisher
tx.send("user_login".to_string()).unwrap();
tx.send("page_view".to_string()).unwrap();
// Both subscribers receive both events
}
broadcast vs mpsc
mpsc (multi-producer, single consumer):
10 senders → 1 receiver
Each message consumed by ONE receiver
Use for: work queues, actor inboxes
broadcast (single producer, multi consumer):
1 sender → N receivers
Each message received by ALL receivers
Use for: events, notifications, pub-sub
watch (single value, multi reader):
1 writer → N readers
Readers always see the LATEST value (not a queue)
Use for: config changes, "current state" sharing
When to use
- Event-driven systems — user actions, system events, state changes
- Real-time updates — live dashboards, notification feeds
- Microservice communication — services emit events, others react
- Logging/monitoring — multiple consumers of the same event stream
- UI frameworks — component A changes → components B, C, D update
When NOT to use
- Reliable delivery — broadcast drops messages if a subscriber is slow (use mpsc for guaranteed delivery)
- Point-to-point — if only one consumer should handle each message, use mpsc
- Large payloads — broadcast clones the message for each subscriber (expensive for big data)
Filtering events
Subscribers often only care about certain event types:
#![allow(unused)]
fn main() {
#[derive(Clone, Debug)]
enum Event {
UserLogin { user: String },
PageView { path: String },
Error { message: String },
}
// Subscriber that only cares about errors:
// (rx: a broadcast::Receiver<Event> obtained from tx.subscribe())
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
if let Event::Error { message } = event {
send_slack_alert(&message).await;
}
}
});
}
Code exercise: Real-time Dashboard
Build a system where services emit metrics and a dashboard displays them live:
┌───────────┐ ┌───────────┐
│ Web Server│ │ DB Service│
│ emit: │ │ emit: │
│ req_count │ │ query_ms │
│ latency │ │ conn_count│
└─────┬─────┘ └─────┬─────┘
│ │
└────────┬────────┘
▼
┌───────────┐
│ broadcast │
│ channel │
└─────┬─────┘
│
┌─────────┼─────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌────────┐ ┌────────┐
│Dashboard│ │Logger │ │Alerter │
│(print │ │(write │ │(if │
│ every │ │ to file│ │ error │
│ 1 sec) │ │ all) │ │ rate > │
│ │ │ │ │ 10/min)│
└─────────┘ └────────┘ └────────┘
Requirements:
- Define a Metric enum covering requests (with latency), DB queries (with duration), and errors — see the starter code
- Two publisher tasks emit random metrics every 100ms
- Dashboard subscriber: prints a summary every second
- Logger subscriber: writes every metric to stdout
- Alerter subscriber: prints ALERT if error count exceeds threshold
Starter code:
use tokio::sync::broadcast;
use std::time::Duration;
#[derive(Clone, Debug)]
enum Metric {
Request { path: String, latency_ms: u64 },
Error { message: String },
DbQuery { query: String, duration_ms: u64 },
}
#[tokio::main]
async fn main() {
let (tx, _) = broadcast::channel::<Metric>(256);
// Publisher: web server metrics
let tx1 = tx.clone();
tokio::spawn(async move {
loop {
tx1.send(Metric::Request {
path: "/api/data".into(),
latency_ms: rand::random::<u64>() % 200,
}).ok();
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
// TODO: more publishers, dashboard subscriber, logger, alerter
}
Expected output:
[logger] Request { path: "/api/data", latency_ms: 42 }
[logger] DbQuery { query: "SELECT *", duration_ms: 12 }
[logger] Error { message: "timeout" }
[ALERT] Error rate: 3/min — threshold exceeded!
[dashboard] === 1s summary ===
Requests: 10, avg latency: 87ms
DB queries: 5, avg: 15ms
Errors: 1
Choosing a Pattern
Decision table
I need to... → Use
─────────────────────────────────────────────────────────────────
Handle many independent clients → Task-per-Connection
Manage complex state without locks → Actor Model
Process data through multiple stages → Pipeline
Do the same thing to many items in parallel → Fan-out / Fan-in
Keep services running despite crashes → Supervisor Tree
Broadcast events to multiple consumers → Event Bus / Pub-Sub
Choosing by symptom
Problem Solution
─────────────────────────────────────────────────────────────────
"I have lock contention" → Actor (eliminate shared state)
"My pipeline stage is a bottleneck" → Fan-out (parallelize that stage)
"Tasks crash and the system dies" → Supervisor (auto-restart)
"I need to notify many components" → Event Bus (decouple with broadcast)
"Each client needs isolated state" → Task-per-Connection + Actor
"I process a stream of data" → Pipeline with bounded channels
Combining patterns
Real systems use multiple patterns together. Here’s a typical web application:
┌──────────────────────────────────────────────────────────────┐
│ Web Application │
│ │
│ TCP Listener (task-per-connection) │
│ └── spawn(handle_request) for each HTTP request │
│ │
│ Request Handler │
│ ├── Reads from Database Actor (actor model) │
│ ├── Writes to Cache Actor (actor model) │
│ └── Emits metrics to Event Bus (pub-sub) │
│ │
│ Background Jobs │
│ ├── Job Queue → Workers (fan-out/fan-in) │
│ └── Supervised by a restart manager (supervisor tree) │
│ │
│ Log Pipeline (pipeline) │
│ └── access log → parse → filter → ship to logging service │
│ │
│ Metrics Dashboard (event bus subscriber) │
│ └── Receives metrics, displays live graphs │
└──────────────────────────────────────────────────────────────┘
Pattern comparison
Pattern Concurrency State Communication Failure
─────────────────────────────────────────────────────────────────────────
Task-per-Connection per client per task shared/channels dies alone
Actor per entity per actor messages isolated
Pipeline per stage per stage channels stage stops
Fan-out/Fan-in per item none shared JoinSet retry item
Supervisor per worker per worker restart signal auto-restart
Event Bus per subscriber none shared broadcast drops msgs
Anti-patterns
Don’t spawn without joining
#![allow(unused)]
fn main() {
// BAD: fire and forget — leaked task, no error handling
tokio::spawn(async { do_work().await });
// GOOD: track the handle
let handle = tokio::spawn(async { do_work().await });
handle.await??; // outer ? = task panic, inner ? = do_work's error (caller must return Result)
}
Don’t use actors for everything
If you have a simple counter, Arc<AtomicU64> is better than an actor. Actors add overhead (channel, task, serialization). Use them when state is complex or when you’d hold a Mutex across .await.
Don’t use unbounded channels in production
#![allow(unused)]
fn main() {
// BAD: unbounded — OOM if consumer is slow
let (tx, rx) = mpsc::unbounded_channel();
// GOOD: bounded — backpressure if consumer is slow
let (tx, rx) = mpsc::channel(100);
}
Don’t block the executor
#![allow(unused)]
fn main() {
// BAD: blocks the worker thread
tokio::spawn(async {
std::thread::sleep(Duration::from_secs(5)); // BLOCKS!
});
// GOOD: use async sleep or spawn_blocking
tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(5)).await; // yields
});
// GOOD: for CPU-heavy or blocking I/O
tokio::task::spawn_blocking(|| {
std::fs::read("big-file.dat") // OK to block here
});
}
Capstone: Distributed Task Queue
Prerequisites: All patterns from Course 5. This project combines every pattern into one system.
What you’re building
A job processing system — like a mini Celery, Sidekiq, or Bull. Clients submit jobs via HTTP, workers process them concurrently, a supervisor keeps workers alive, and a dashboard shows live status.
# Start the system:
cargo run -p async-lessons --bin p5-task-queue
# Submit jobs:
curl -X POST http://127.0.0.1:8080/jobs -d '{"type":"resize","file":"img.png"}'
curl -X POST http://127.0.0.1:8080/jobs -d '{"type":"email","to":"bob@example.com"}'
# Check status:
curl http://127.0.0.1:8080/status
# {"pending": 5, "running": 3, "completed": 42, "failed": 1, "workers": 4}
# Dashboard (in terminal):
# Workers: 4/4 alive
# Queue: 5 pending, 3 running, 42 completed, 1 failed
# Throughput: 14 jobs/sec
# Last failure: "email to bob@example.com — timeout" (2s ago, auto-retried)
Architecture
Every pattern has a role:
┌──────────────────────────────────────────────────────────────┐
│ Distributed Task Queue │
│ │
│ ┌───────────────────────────────────────────────┐ │
│ │ API Server (task-per-connection) │ │
│ │ Accept HTTP requests → enqueue jobs │ │
│ │ GET /status → query state actor │ │
│ └─────────────────┬─────────────────────────────┘ │
│ │ submit job │
│ ▼ │
│ ┌───────────────────────────────────────────────┐ │
│ │ State Manager (actor model) │ │
│ │ Owns: job queue, status map, counters │ │
│ │ No locks — processes messages sequentially │ │
│ │ Messages: Enqueue, Dequeue, UpdateStatus, │ │
│ │ GetStats │ │
│ └─────────────────┬─────────────────────────────┘ │
│ │ job ready │
│ ▼ │
│ ┌───────────────────────────────────────────────┐ │
│ │ Dispatcher (pipeline) │ │
│ │ Reads jobs from state manager │ │
│ │ Routes to appropriate worker │ │
│ │ Handles retries for failed jobs │ │
│ └─────────────────┬─────────────────────────────┘ │
│ │ distribute │
│ ┌─────┼─────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Workers (fan-out/fan-in) │ │
│ │ N worker tasks process jobs concurrently │ │
│ │ CPU-heavy jobs use spawn_blocking │ │
│ │ Report results back to state manager │ │
│ └──────────────────────────────────────────────┘ │
│ │ │ │ │
│ │ supervised by │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Supervisor (supervisor tree) │ │
│ │ Monitors workers via JoinSet │ │
│ │ Restarts crashed workers │ │
│ │ Max 5 restarts per minute │ │
│ └──────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────┐ │
│ │ Event Bus (pub-sub) │ │
│ │ Events: JobEnqueued, JobStarted, │ │
│ │ JobCompleted, JobFailed, │ │
│ │ WorkerCrashed, WorkerRestarted │ │
│ │ │ │
│ │ Subscribers: │ │
│ │ Dashboard → live terminal output │ │
│ │ Logger → write events to file │ │
│ │ Alerter → Slack notification on failures │ │
│ └───────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
Pattern map
Component Pattern Why
─────────────────────────────────────────────────────────
API Server Task-per-Connection Each HTTP request = one task
State Manager Actor Owns all mutable state, no locks
Dispatcher Pipeline Route jobs through stages
Workers Fan-out/Fan-in N workers process concurrently
Supervisor Supervisor Tree Restart crashed workers
Events Event Bus Dashboard, logger, alerter subscribe
Job lifecycle
Client POSTs job
│
▼
API task sends Enqueue message to State Actor
│
▼
State Actor: job status = Pending, pushes to queue
│
▼
Dispatcher: pulls from queue, assigns to available worker
│
▼
State Actor: job status = Running
│
▼
Worker processes the job
│
┌────┴────┐
│ │
▼ ▼
Success Failure
│ │
▼ ▼
State: State: Failed
Completed Dispatcher: retry? (max 3 attempts)
│
└── if retries exhausted → Dead Letter Queue
Implementation guide
Step 1: Define the job types
#![allow(unused)]
fn main() {
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct Job {
id: String,
job_type: String,
payload: serde_json::Value,
attempts: u32,
max_retries: u32,
}
#[derive(Clone, Debug)]
enum JobStatus {
Pending,
Running { worker_id: usize },
Completed { result: String },
Failed { error: String, retries_left: u32 },
}
}
Step 2: State Manager Actor
#![allow(unused)]
fn main() {
enum StateMsg {
Enqueue { job: Job, reply: oneshot::Sender<String> },
Dequeue { reply: oneshot::Sender<Option<Job>> },
UpdateStatus { job_id: String, status: JobStatus },
GetStats { reply: oneshot::Sender<Stats> },
}
async fn state_actor(mut rx: mpsc::Receiver<StateMsg>, event_tx: broadcast::Sender<Event>) {
let mut queue: VecDeque<Job> = VecDeque::new();
let mut statuses: HashMap<String, JobStatus> = HashMap::new();
while let Some(msg) = rx.recv().await {
match msg {
StateMsg::Enqueue { job, reply } => {
let id = job.id.clone();
statuses.insert(id.clone(), JobStatus::Pending);
queue.push_back(job);
event_tx.send(Event::JobEnqueued { id: id.clone() }).ok();
reply.send(id).ok();
}
// TODO: handle other messages
_ => todo!(),
}
}
}
}
Step 3: Worker + Supervisor
#![allow(unused)]
fn main() {
async fn worker(id: usize, state: StateHandle, event_tx: broadcast::Sender<Event>) {
loop {
let job = state.dequeue().await;
match job {
Some(job) => {
event_tx.send(Event::JobStarted { id: job.id.clone(), worker: id }).ok();
match process_job(&job).await {
Ok(result) => {
state.update_status(job.id, JobStatus::Completed { result }).await;
}
Err(e) => {
state.update_status(job.id, JobStatus::Failed {
error: e.to_string(),
retries_left: job.max_retries.saturating_sub(job.attempts), // avoid underflow
}).await;
}
}
}
None => tokio::time::sleep(Duration::from_millis(100)).await,
}
}
}
}
Step 4: API Server
#![allow(unused)]
fn main() {
// Minimal HTTP server (raw TCP, parse HTTP manually)
async fn handle_http(stream: TcpStream, state: StateHandle) {
// Parse: POST /jobs → enqueue
// Parse: GET /status → get stats
// Return JSON responses
}
}
Step 5: Event Bus + Dashboard
#![allow(unused)]
fn main() {
tokio::spawn(async move {
let mut rx = event_tx.subscribe();
let mut stats = Stats::default();
// Use an interval, not a fresh sleep() per iteration: a sleep created inside
// select! is reset every time an event wins the race, so under steady event
// traffic the dashboard branch would never run.
let mut ticker = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
event = rx.recv() => {
match event {
Ok(Event::JobCompleted { .. }) => stats.completed += 1,
Ok(Event::JobFailed { .. }) => stats.failed += 1,
// ... other events
_ => {}
}
}
_ = ticker.tick() => {
print_dashboard(&stats);
}
}
}
});
}
Exercises
Exercise 1: Basic queue
Implement State Actor + Workers + Dispatcher. Submit 100 jobs, process them with 4 workers. Print completion count.
Exercise 2: Add supervisor
Workers randomly crash (1% chance). Supervisor restarts them. All 100 jobs should still complete.
Exercise 3: Add HTTP API
Accept jobs via POST /jobs, return status via GET /status. Test with curl.
Exercise 4: Add event bus + dashboard
Broadcast events, subscribe with a dashboard that prints live stats every second.
Exercise 5: Retry with backoff
Failed jobs are retried up to 3 times with exponential backoff (1s, 2s, 4s). After 3 failures → dead letter queue.