
Async Rust & Tokio Internals

Build async Rust from the ground up — from raw futures and wakers to a multi-threaded runtime, then into real tokio internals and production patterns.

Courses

Course                  Lessons   Project
──────────────────────────────────────────────────────────────
1: Async Fundamentals   0-8       TCP Echo Server on your executor
2: Build a Mini Tokio   9-15      Multi-threaded Chat Server
3: Tokio Deep Dive      16-22     HTTP Load Tester
4: Advanced Patterns    23-28     Async Job Queue

Prerequisites

  • Rust fundamentals (ownership, traits, generics)
  • TCP networking basics

Source code

git clone https://github.com/its-saeed/learn-by-building.git
cd learn-by-building
cargo run -p async-lessons --bin 1-futures -- all

Lesson 0: Why Async?

Real-life analogy: the restaurant

Imagine a restaurant with 100 tables.

Thread-per-connection model = one waiter per table:

Table 1 → Waiter 1 (stands at table, waits for customer to decide)
Table 2 → Waiter 2 (stands at table, waits for food from kitchen)
Table 3 → Waiter 3 (stands at table, waits for customer to finish)
...
Table 100 → Waiter 100

Each waiter does nothing most of the time — they’re blocked waiting. You need 100 waiters (expensive), and they’re mostly idle. If 200 guests show up, you can’t serve them.

Event-driven model = one waiter, many tables:

Waiter checks Table 1 → "still reading menu, skip"
Waiter checks Table 2 → "food is ready!" → serves it
Waiter checks Table 3 → "wants to order!" → takes order, sends to kitchen
Waiter checks Table 4 → "still eating, skip"
...

One waiter handles all tables by only doing work when a table needs something. The waiter never stands idle — they keep circling. This is event-driven I/O.

The buzzer system at a fast-food restaurant is even more accurate:

  1. You order (register interest)
  2. You sit down and do other things (non-blocking)
  3. Buzzer vibrates (event notification — kqueue/epoll)
  4. You pick up your food (handle the ready event)

The problem: one thread per connection

The traditional server model:

fn main() -> std::io::Result<()> {
    let listener = std::net::TcpListener::bind("0.0.0.0:8080")?;
    loop {
        let (stream, _addr) = listener.accept()?;
        std::thread::spawn(|| handle(stream));  // one thread per client
    }
}

Each thread costs real resources:

┌────────────────────────────────────────────────────┐
│              Thread Memory Layout                  │
│                                                    │
│  ┌──────────────┐  Each thread gets its own stack  │
│  │ Stack: 8 MB  │  allocated by the OS.            │
│  │              │  Most of it is never used —      │
│  │  (mostly     │  the thread is just waiting      │
│  │   empty,     │  on read() or write().           │
│  │   waiting)   │                                  │
│  │              │                                  │
│  └──────────────┘                                  │
│                                                    │
│  10,000 threads × 8 MB = 80 GB virtual memory      │
│  (actual RSS is lower, but overhead is still real) │
└────────────────────────────────────────────────────┘

See it yourself

Check your system’s default thread stack size:

# macOS
ulimit -s          # prints stack size in KB (usually 8192 = 8 MB)

# Linux
ulimit -s          # usually 8192 KB
cat /proc/sys/kernel/threads-max   # max threads the kernel allows

Check how many threads a process is using:

# macOS: count threads of a process (subtract 1 for the header line)
ps -M <pid> | wc -l

# Linux: count threads of a process
ls /proc/<pid>/task | wc -l

# Or for any process by name
ps -eLf | grep <process-name> | wc -l

The C10K problem

In 1999, Dan Kegel asked: “how do you handle 10,000 concurrent connections?” With one thread per connection, you can’t — you hit OS limits on memory, thread count, and context switching overhead.

Connections     Threads     Memory (8MB stack)    Context switches/sec
──────────────────────────────────────────────────────────────────────
100             100         800 MB                 ~10,000
1,000           1,000       8 GB                   ~100,000
10,000          10,000      80 GB                  ~1,000,000 (OS melts)
100,000         ???         impossible             impossible

Modern servers need to handle 100K-1M+ connections. Threads don’t scale.

The solution: event-driven I/O

Instead of blocking a thread per connection, use one thread that watches all connections:

┌──────────────────────────────────────────────────────────────┐
│                    Event-Driven Server                       │
│                                                              │
│  ┌─────────┐                                                 │
│  │ kqueue  │ ← register: "notify me when socket 5 is ready"  │
│  │ /epoll  │ ← register: "notify me when socket 8 is ready"  │
│  │         │ ← register: "notify me when socket 12 is ready" │
│  └────┬────┘                                                 │
│       │                                                      │
│       ▼  wait() — blocks until ANY socket is ready           │
│                                                              │
│  "Socket 8 is readable!"                                     │
│       │                                                      │
│       ▼  read from socket 8, process, respond                │
│       │                                                      │
│       ▼  back to wait()                                      │
│                                                              │
│  One thread. 100,000 connections. ~10 MB memory.             │
└──────────────────────────────────────────────────────────────┘

See blocking in action

Run this in Terminal 1 — a blocking TCP server:

python3 -c "
import socket, os

print('=== Blocking Server Demo ===')
print(f'PID: {os.getpid()}')
print()

s = socket.socket()
s.bind(('127.0.0.1', 9999))
s.listen()

# BLOCKING CALL #1: accept()
# The thread is FROZEN here. It can't do anything else.
# Open another terminal and run: echo hello | nc 127.0.0.1 9999
print('[1] Calling accept()... thread is BLOCKED, waiting for a connection')
print('    → Nothing else can happen on this thread until someone connects')
print('    → In another terminal run: echo hello | nc 127.0.0.1 9999')
print()
conn, addr = s.accept()
print(f'[2] accept() returned! Someone connected from {addr}')
print()

# BLOCKING CALL #2: recv()
# The thread is FROZEN again, waiting for data.
print('[3] Calling recv()... thread is BLOCKED again, waiting for data')
data = conn.recv(1024)
print(f'[4] recv() returned! Got: {data}')
print()

print('=== Takeaway ===')
print('Between [1] and [2], the thread did NOTHING. Completely idle.')
print('Between [3] and [4], same thing. Idle.')
print('With 10,000 clients, you need 10,000 threads all sitting idle like this.')
print('That is the problem async solves.')
"

In Terminal 2:

echo hello | nc 127.0.0.1 9999

Watch the output: the server prints [1], then freezes. Nothing happens until you connect from Terminal 2. Then it prints [2], [3], and freezes again until data arrives. Each freeze = a wasted thread.

See non-blocking behavior

python3 -c "
import socket

print('=== Non-Blocking Demo ===')
print()

s = socket.socket()
s.setblocking(False)  # <-- key difference: non-blocking mode

# Non-blocking connect returns IMMEDIATELY, even though the connection
# isn't established yet. Instead of freezing the thread, it raises an error.
print('[1] Calling connect() on a NON-BLOCKING socket...')
try:
    s.connect(('example.com', 80))
except BlockingIOError as e:
    print(f'[2] connect() returned INSTANTLY with: {e}')
    print('    → The thread was NOT frozen. It got WouldBlock immediately.')
    print('    → The connection is still in progress in the background.')
    print()

# Non-blocking recv also returns immediately if no data is ready.
print('[3] Calling recv() on a NON-BLOCKING socket (no data yet)...')
try:
    s.recv(1024)
except BlockingIOError as e:
    print(f'[4] recv() returned INSTANTLY with: {e}')
    print('    → No data yet, but the thread is FREE to do other work.')
    print()

print('=== Takeaway ===')
print('Non-blocking I/O never freezes the thread.')
print('Instead of waiting, you get WouldBlock and can go handle other connections.')
print('This is what async runtimes (tokio) do under the hood:')
print('  1. Try non-blocking read → WouldBlock')
print('  2. Register with kqueue/epoll: \"tell me when this socket is ready\"')
print('  3. Go handle other tasks')
print('  4. kqueue/epoll wakes you up → try read again → data is there')
"

Blocking vs non-blocking: what happens at the OS level

When you call read() on a blocking socket:

Your code                           OS Kernel
    │                                 │
    ├── read(fd) ──────────────────►  │
    │   (your thread is FROZEN)       │  waiting for data...
    │   (can't do anything else)      │  still waiting...
    │                                 │  data arrives!
    │  ◄── returns data ──────────────┤
    │                                 │

When you call read() on a non-blocking socket:

Your code                           OS Kernel
    │                                 │
    ├── read(fd) ──────────────────►  │
    │  ◄── WouldBlock (instantly) ────┤  no data yet
    │                                 │
    │  (go do other work!)            │
    │                                 │
    ├── read(fd) ──────────────────►  │
    │  ◄── WouldBlock (instantly) ────┤  still no data
    │                                 │
    │  ... later, after kqueue says   │
    │      "fd is ready" ...          │
    │                                 │
    ├── read(fd) ──────────────────►  │
    │  ◄── returns data ──────────────┤  data was ready!

See kqueue/epoll (the OS event system)

On Linux, you can trace the actual syscalls:

strace -e epoll_wait,epoll_ctl,accept,read python3 -c "
import socket
s = socket.socket()
s.bind(('127.0.0.1', 9999))
s.listen()
print('waiting for connection...')
s.accept()
"
# You'll see: epoll_create, epoll_ctl (register fd), epoll_wait (block until event)

On macOS, SIP restricts dtruss/dtrace. Instead, use sample to see where a process is blocked:

# While the blocking server above is waiting on accept():
sudo sample <pid> 1 2>&1 | grep -i 'accept\|kevent\|select'
# You'll see it stuck in accept() or kevent() — the thread is parked in the kernel

To check if syscall tracing is available on your Mac:

csrutil status
# If "System Integrity Protection status: enabled", dtruss is blocked.
# The Python demos above show the same concepts without needing dtruss.

Where async fits

Writing event-driven code by hand is painful — you end up with callback spaghetti:

// Callback hell (event-driven without async; the callback APIs are invented)
socket.on_readable(|data| {
    process(data, |result| {
        socket.on_writable(|_| {
            socket.write(result, |_| {
                // deeply nested, hard to follow
            });
        });
    });
});

Rust’s async/.await gives you event-driven performance with sequential-looking code:

// Same logic, but readable
async fn handle(stream: TcpStream) {
    let data = stream.read().await;   // yields to runtime, doesn't block thread
    let result = process(data);
    stream.write(result).await;       // yields to runtime, doesn't block thread
}

The compiler transforms this into a state machine (Lesson 2). The runtime (tokio) manages the event loop (Lesson 8). You write simple code, get scalable performance.

The mental model

┌─────────────────────────────────────────────────────┐
│                What you write                       │
│                                                     │
│  async fn handle(stream: TcpStream) {               │
│      let data = stream.read().await;                │
│      stream.write(data).await;                      │
│  }                                                  │
└───────────────────┬─────────────────────────────────┘
                    │ compiler transforms
                    ▼
┌───────────────────────────────────────────────────────┐
│              What the compiler generates              │
│                                                       │
│  A state machine enum:                                │
│    State::Reading  → poll read, if not ready: Pending │
│    State::Writing  → poll write, if not ready: Pending│
│    State::Done     → return Ready(())                 │
└───────────────────┬───────────────────────────────────┘
                    │ runtime drives
                    ▼
┌─────────────────────────────────────────────────────┐
│              What the runtime does                  │
│                                                     │
│  loop {                                             │
│      events = kqueue.wait();                        │
│      for event in events {                          │
│          task = lookup(event.fd);                   │
│          task.poll(); // advance the state machine  │
│      }                                              │
│  }                                                  │
└─────────────────────────────────────────────────────┘

The cost of async

Async isn’t free. The trade-off:

                    Threads              Async
─────────────────────────────────────────────────────
Memory per task     2-8 MB (stack)       ~100 bytes (future struct)
Max connections     ~10K                 ~1M
Context switch      OS (expensive)       Userspace (cheap)
Code complexity     Simple               Pin, lifetimes, cancellation
Debugging           Good stack traces    Confusing stack traces
Ecosystem           Everything works     Need async versions of libs
CPU-bound work      Natural              Must use spawn_blocking

Use async when: many concurrent I/O operations (web servers, proxies, chat, databases)

Don’t use async when: CPU-bound work, simple scripts, low concurrency, prototyping

Exercises

Exercise 1: Thread overhead benchmark

Spawn 10,000 threads that each std::thread::sleep(Duration::from_secs(1)). Measure:

  • Wall time: std::time::Instant::now() before and after
  • Peak memory: check with ps or Activity Monitor while running

Then do the same with 10,000 tokio::spawn tasks using tokio::time::sleep. Compare both.

Useful commands while the benchmark runs:

# macOS: check memory of your process
ps -o pid,rss,vsz,comm -p <pid>
# rss = actual memory used (KB), vsz = virtual memory (KB)

# Linux
cat /proc/<pid>/status | grep -E 'VmRSS|VmSize|Threads'

Exercise 2: Max threads

Keep spawning std::thread::spawn in a loop until it fails. Print the count and the error.

# Check your system limits
ulimit -u    # max user processes
sysctl kern.num_taskthreads  # macOS max threads per process

Exercise 3: Blocking vs non-blocking syscalls

Write two programs that connect to a TCP server and read data:

  1. Using std::net::TcpStream (blocking)
  2. Using std::net::TcpStream with set_nonblocking(true)

Trace the syscalls:

# macOS (dtruss needs SIP disabled — see the kqueue/epoll section above)
sudo dtruss -f ./target/debug/my-binary 2>&1 | grep -E 'read|recvfrom|kevent'

# Linux
strace -e read,recvfrom,epoll_wait ./target/debug/my-binary

In the blocking version, you’ll see read() that takes seconds to return. In the non-blocking version, you’ll see read() returning immediately with EAGAIN/EWOULDBLOCK.

Lesson 1: Futures by Hand

Real-life analogy: the buzzer

You’re at a burger joint. You order at the counter and they hand you a buzzer.

You:     "Can I have a burger?"
Counter: "Not ready yet. Here's a buzzer." (Poll::Pending + Waker)
You:     Go sit down, check your phone, chat with friends.
         ...
Buzzer:  *BZZZ* (waker.wake())
You:     Walk to counter. "Is my burger ready?"
Counter: "Yes, here it is!" (Poll::Ready(burger))

Without the buzzer (no waker), you’d have to keep walking to the counter every 10 seconds asking “is it ready yet?” — wasteful. Without async (blocking), you’d stand frozen at the counter unable to do anything until the burger is done.

The Future trait is the burger order. The Waker is the buzzer. The executor (runtime) is you, managing multiple buzzer orders at once.

What is a Future?

A future is a value that might not be ready yet. It’s Rust’s core async abstraction — the single most important trait in async Rust:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),    // "Here's your result"
    Pending,     // "Not ready yet, I'll buzz you"
}

That’s the entire trait. One method: poll. When called:

  • Return Poll::Ready(value) → the result is available, we’re done
  • Return Poll::Pending → not ready yet, will notify via waker when ready

The three parts of poll()

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
         ─────────────────  ──────────────────────    ────────────────────
                │                     │                        │
                │                     │                        │
         Pin<&mut Self>          Context                   Poll<T>
         "I promise not        "Here's a waker            "Ready or
          to move this          (buzzer) so you            Pending?"
          future in memory"     can notify me"

  • Pin<&mut Self> — we’ll cover this in Lesson 5. For now: it prevents the future from being moved in memory. Ignore it for simple futures.
  • Context — contains the Waker. The future uses cx.waker() to notify the runtime when it should be polled again.
  • Poll<T> — the result: either done (Ready) or not yet (Pending).

What .await does

When you write:

let value = some_future.await;

The compiler transforms it into roughly:

loop {
    match some_future.poll(cx) {
        Poll::Ready(value) => break value,     // done! use the value
        Poll::Pending => yield_to_runtime(),   // give control back, wait for wake
    }
}

.await = “keep polling until ready, yielding control between attempts.”

Visualizing the poll cycle

Executor                          Future (CountdownFuture { count: 3 })
   │                                │
   ├── poll() ──────────────────►   │  count=3 → decrement → count=2
   │  ◄── Pending ──────────────────┤  wake_by_ref() → "poll me again"
   │                                │
   ├── poll() ──────────────────►   │  count=2 → decrement → count=1
   │  ◄── Pending ──────────────────┤  wake_by_ref() → "poll me again"
   │                                │
   ├── poll() ──────────────────►   │  count=1 → decrement → count=0
   │  ◄── Pending ──────────────────┤  wake_by_ref() → "poll me again"
   │                                │
   ├── poll() ──────────────────►   │  count=0 → done!
   │  ◄── Ready(()) ───────────────┤  future is complete
   │                                │
   │  (never poll again)            │

The contract

Three rules that futures MUST follow:

  1. Don’t poll after Ready — once a future returns Ready, it’s done. The Future trait makes no guarantees about polling it again: it may panic, block forever, or misbehave. Treat the result as consumed.

  2. Pending MUST wake — if you return Pending, you MUST arrange for cx.waker().wake() to be called eventually. Otherwise the executor will never poll you again and the task hangs forever. This is the most common async bug.

  3. Poll should be cheap — do a small amount of work, then return. Don’t block the thread (no std::thread::sleep, no blocking I/O). If you block inside poll, you freeze the entire executor.

Rule 2 visualized — what happens if you forget to wake:

Executor                          Future
   │                                │
   ├── poll() ────────────────►     │
   │  ◄── Pending ──────────────────┤  forgot to call wake()!
   │                                │
   │  ... executor waits ...        │  ... future waits ...
   │  ... nobody wakes anybody ...  │  ... nobody wakes anybody ...
   │                                │
   │  💀 DEADLOCK — task hangs forever

What a Waker actually is (preview)

You’ll build one from scratch in Lesson 3. For now, the key idea:

┌──────────────────────────────────────────────────────────┐
│  Waker = a callback handle                               │
│                                                          │
│  waker.wake()      → tells the executor: "re-poll me!"   │
│  waker.wake_by_ref() → same, without consuming the waker │
│  waker.clone()     → copy it, store it for later         │
│                                                          │
│  Internally: a function pointer + data pointer           │
│  The executor provides the implementation                │
│  The future just calls .wake() — doesn't know the details│
└──────────────────────────────────────────────────────────┘

For this lesson’s exercises, we’ll use a noop waker — a waker that does nothing when called. This is enough for manual polling in a loop.

Noop waker (use this for exercises)

Since Rust 1.85, you can use:

use std::task::Waker;
let waker = Waker::noop();  // note: returns a &'static Waker

If your Rust version is older:

use std::task::{RawWaker, RawWakerVTable, Waker};

fn noop_waker() -> Waker {
    fn no_op(_: *const ()) {}
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

Exercises

Exercise 1: CountdownFuture

Implement a future that counts down from N to 0. Each poll decrements the counter and returns Pending. When it hits 0, return Ready(()).

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct CountdownFuture {
    count: u32,
}

impl Future for CountdownFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // TODO:
        // if count > 0: decrement, call cx.waker().wake_by_ref(), return Pending
        // if count == 0: return Ready(())
    }
}

Why wake_by_ref() when we return Pending? Because without it, the executor doesn’t know to poll us again. In a real future, you’d only wake when something actually happens (data arrives, timer fires). Here, we always want to be re-polled immediately — so we wake every time.

Exercise 2: ReadyFuture

Implement a future that immediately returns a value on first poll:

struct ReadyFuture<T>(Option<T>);

  • First poll: take the value out of the Option, return Ready(value)
  • The Option ensures the value is only returned once

This is what std::future::ready(42) does internally.

Exercise 3: Poll manually

Don’t use any executor. Use the noop waker to manually poll futures in a loop:

use std::pin::pin;
use std::task::{Context, Poll, Waker};

// Waker::noop() returns a &'static Waker, so pass it straight to from_waker.
let mut cx = Context::from_waker(Waker::noop());
let mut future = pin!(CountdownFuture { count: 5 });

loop {
    match future.as_mut().poll(&mut cx) {
        Poll::Ready(()) => { println!("Done!"); break; }
        Poll::Pending => { println!("Not ready yet..."); }
    }
}

See the state change with each poll. This is literally what an executor does — just a poll loop.

Lesson 2: State Machines

Real-life analogy: the vending machine

A vending machine is a state machine:

┌─────────┐  insert coin    ┌──────────┐   press button  ┌────────────┐
│  Idle   │ ──────────────► │ HasCoin  │ ──────────────► │ Dispensing │
│         │                 │          │                 │            │
└─────────┘                 └──────────┘                 └─────┬──────┘
     ▲                                                         │
     │                    item drops out                       │
     └─────────────────────────────────────────────────────────┘

At any moment, the machine is in one state. An event causes a transition to the next state. It never skips states or goes backwards unexpectedly.

async fn works the same way. Each .await is a state transition. The compiler turns your sequential code into an enum where each variant is a state.

What async fn compiles to

When you write:

async fn fetch_data() -> String {
    let url = build_url().await;         // await #1
    let response = http_get(url).await;  // await #2
    response.body
}

The compiler generates something like:

enum FetchData {
    // State 0: haven't started yet
    Start,
    // State 1: waiting for build_url() to complete
    // Holds the sub-future for build_url
    WaitingForUrl {
        build_url_future: BuildUrlFuture,
    },
    // State 2: waiting for http_get() to complete
    // `url` was moved into the http_get sub-future, so only that is stored
    WaitingForResponse {
        http_get_future: HttpGetFuture,
    },
    // State 3: done
    Done,
}

And implements Future for it:

impl Future for FetchData {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        loop {
            match self.as_mut().get_mut() {
                FetchData::Start => {
                    // Create the sub-future for build_url
                    *self = FetchData::WaitingForUrl {
                        build_url_future: build_url(),
                    };
                }
                FetchData::WaitingForUrl { build_url_future } => {
                    match Pin::new(build_url_future).poll(cx) {
                        Poll::Pending => return Poll::Pending,  // not ready, yield
                        Poll::Ready(url) => {
                            // Transition to next state
                            *self = FetchData::WaitingForResponse {
                                http_get_future: http_get(url),
                            };
                        }
                    }
                }
                FetchData::WaitingForResponse { http_get_future } => {
                    match Pin::new(http_get_future).poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(response) => {
                            *self = FetchData::Done;
                            return Poll::Ready(response.body);
                        }
                    }
                }
                FetchData::Done => panic!("polled after completion"),
            }
        }
    }
}

Visualizing the state transitions

poll #1:  Start → WaitingForUrl → poll build_url → Pending
                                                     ↑ return to executor

poll #2:  WaitingForUrl → poll build_url → Ready(url)
          → transition to WaitingForResponse
          → poll http_get → Pending
                              ↑ return to executor

poll #3:  WaitingForResponse → poll http_get → Ready(response)
          → transition to Done
          → return Ready(response.body)

Each poll() call resumes exactly where the last one left off. No stack needed — the enum variant holds all the state.

Why this matters

Memory: enum vs thread stack

Thread:
┌────────────────────┐
│  Stack: 8 MB       │  Fixed allocation, mostly empty.
│  (99% wasted)      │  Every thread gets this whether it
│                    │  needs it or not.
└────────────────────┘

Async state machine:
┌────────────────────┐
│  Enum: ~100 bytes  │  Size = largest variant.
│  (nothing wasted)  │  Only stores what the current
│                    │  state actually needs.
└────────────────────┘

10,000 tasks:
  Threads:    10,000 × 8 MB   = 80 GB
  Async:      10,000 × 100 B  = 1 MB

The compiler does the hard work

Writing state machines by hand is tedious and error-prone. async/.await gives you:

  • Readability of sequential code
  • Performance of hand-written state machines
  • Safety guaranteed by the compiler

A simpler example: add_slowly

Let’s desugar a simple async function step by step:

// The async version (what you write):
async fn add_slowly(a: u32, b: u32) -> u32 {
    let x = yield_once(a).await;   // yields once, returns a
    let y = yield_once(b).await;   // yields once, returns b
    x + y
}
}

Where yield_once is a future that returns Pending once, then Ready(value):

struct YieldOnce<T> {
    value: Option<T>,
    yielded: bool,
}

The state machine for add_slowly:

           ┌──────────────────┐
           │   State: Start   │  holds: a, b
           │                  │
           └────────┬─────────┘
                    │ create YieldOnce(a), poll it → Pending
                    ▼
           ┌──────────────────┐
           │ State: YieldingA │  holds: b, yield_future_a
           │                  │
           └────────┬─────────┘
                    │ poll yield_future_a → Ready(x)
                    │ create YieldOnce(b), poll it → Pending
                    ▼
           ┌──────────────────┐
           │ State: YieldingB │  holds: x, yield_future_b
           │                  │
           └────────┬─────────┘
                    │ poll yield_future_b → Ready(y)
                    │ compute x + y
                    ▼
           ┌──────────────────┐
           │   State: Done    │  return Ready(x + y)
           └──────────────────┘

Notice: each state only holds what’s needed going forward. State YieldingB holds x (needed for the final addition) but NOT a (already consumed).

See what the compiler generates

# Install cargo-expand
cargo install cargo-expand

# Write a simple async fn and expand it
cargo expand --bin 2-state-machines 2>/dev/null | head -100

The output is verbose but you’ll see an enum with variants matching the await points.

Exercises

Exercise 1: YieldOnce future

Implement YieldOnce<T> — a future that returns Pending on the first poll (and wakes), then Ready(value) on the second poll. This simulates one async operation completing.

Exercise 2: Manual AddSlowly state machine

Implement AddSlowly as an enum with the states shown above. Implement Future for it by hand — match on the current state, poll sub-futures, transition states.

Run it with poll_to_completion from Lesson 1 and verify it returns the correct sum.

Exercise 3: Async version comparison

Write the same logic as async fn add_slowly using actual async/.await. Run both (your hand-written state machine and the async version). Verify they produce the same result.

Exercise 4: Future sizes

Print the size of various futures:

async fn no_awaits() -> u32 { 42 }
async fn one_await() -> u32 { yield_once(42).await }
async fn holds_big_data() -> u32 {
    let buf = [0u8; 1024];
    yield_once(0).await;
    buf[0] as u32
}

Use std::mem::size_of_val(&future). Compare the sizes — the future that holds [u8; 1024] across an await will be ~1KB larger.

Lesson 3: Wakers & Waking

Real-life analogy: notification systems

Think about all the ways you get notified in daily life:

Scenario                    Blocking (bad)              Waker (good)
─────────────────────────────────────────────────────────────────────
Pizza delivery              Stand at the door            Doorbell rings
                            staring at the street

Laundry                     Sit in front of the          Washer beeps
                            machine watching it spin     when done

Doctor's office             Stand at the counter         They call
                            asking "is it my turn?"      your name

Package delivery            Refresh tracking page        Push notification
                            every 5 seconds              "delivered!"

In all cases, the waker pattern lets you do other things while waiting. The notification system (doorbell, beep, name call, push notification) is the Waker.

In async Rust:

  • Your future is you (waiting for pizza)
  • The executor is your brain (managing all your tasks)
  • The waker is the doorbell
  • wake() = doorbell rings → your brain says “go check the door”

The problem

When a future returns Pending, the executor needs to know when to poll it again.

Option A: Busy-polling (wasteful)

  Executor: "Are you ready?"  →  Future: "No"
  Executor: "Are you ready?"  →  Future: "No"
  Executor: "Are you ready?"  →  Future: "No"
  Executor: "Are you ready?"  →  Future: "No"
  Executor: "Are you ready?"  →  Future: "YES!"

  (CPU at 100% doing nothing useful)

Option B: Wakers (efficient)

  Executor: "Here's my number (waker). Call me when ready."
  Future: (stores the waker, waits for I/O)
  ... executor goes to sleep or handles other tasks ...
  Future: (I/O ready!) waker.wake() → "Hey executor, poll me!"
  Executor: "Oh! Let me check." → Future: "Ready!"

  (CPU at 0% while waiting)

How Waker works internally

A Waker is built from a RawWaker — basically two pointers:

┌─────────────────────────────────────────────────────────┐
│  RawWaker                                               │
│                                                         │
│  ┌──────────────────┐   ┌───────────────────────────┐   │
│  │ data: *const ()  │   │ vtable: &RawWakerVTable   │   │
│  │                  │   │                           │   │
│  │ Points to the    │   │ Function pointers:        │   │
│  │ task/executor    │   │   clone()  → copy waker   │   │
│  │ state. Could be  │   │   wake()   → notify exec  │   │
│  │ an Arc<Task>,    │   │   wake_by_ref() → same    │   │
│  │ a task ID, etc.  │   │   drop()   → cleanup      │   │
│  └──────────────────┘   └───────────────────────────┘   │
│                                                         │
└─────────────────────────────────────────────────────────┘

The vtable is like a trait object — function pointers that define behavior. Different executors provide different vtable implementations:

Noop executor:     wake() = do nothing
Thread executor:   wake() = thread::unpark(thread_handle)
Tokio:             wake() = push task to scheduler queue

The future doesn’t know or care how the waker works internally. It just calls waker.wake() and trusts that the executor will re-poll.

Building a Waker from scratch

Step 1: Define the vtable functions

#![allow(unused)]
fn main() {
// Each function receives the `data` pointer from RawWaker
unsafe fn clone(data: *const ()) -> RawWaker {
    // Create a copy of the waker (increment refcount, clone Arc, etc.)
}
unsafe fn wake(data: *const ()) {
    // Notify the executor: "re-poll the task associated with this data"
}
unsafe fn wake_by_ref(data: *const ()) {
    // Same as wake, but doesn't consume the waker
}
unsafe fn drop(data: *const ()) {
    // Clean up (decrement refcount, free memory, etc.)
}
}

Step 2: Create the vtable

#![allow(unused)]
fn main() {
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
}

Step 3: Create the waker

#![allow(unused)]
fn main() {
let raw_waker = RawWaker::new(data_ptr, &VTABLE);
let waker = unsafe { Waker::from_raw(raw_waker) };
}

The three wakers you’ll build

1. Noop waker

All vtable functions do nothing. Useful for testing — you manually poll in a loop.

wake() → (does nothing)
Used in: Lesson 1 exercises, simple tests

2. Counting waker

wake() increments an atomic counter. Lets you verify how many times a future called wake.

wake() → counter.fetch_add(1)
Used in: testing that futures wake correctly

3. Thread-parking waker

wake() calls thread::unpark() on a specific thread. The executor thread parks itself after Pending, then the waker unparks it.

Executor thread          Future                    Waker
     │                     │                         │
     ├── poll() ────────►  │                         │
     │  ◄── Pending ───────┤  stores waker           │
     │                     │                         │
     ├── thread::park()    │                         │
     │   (sleeping)        │                         │
     │                     │  (I/O ready)            │
     │                     ├── waker.wake() ────────►│
     │                     │                         ├── thread::unpark()
     │  ◄── (wakes up!) ──────────────────────────── │
     │                     │                         │
     ├── poll() ────────►  │                         │
     │  ◄── Ready(val) ────┤                         │

This is how real single-threaded executors work. You’ll use this pattern for block_on in Lesson 5.

Exercises

Exercise 1: Noop waker

Build a noop waker from RawWaker + vtable where all functions do nothing. Use it to manually poll the CountdownFuture from Lesson 1. Print each poll result.

Exercise 2: Counting waker

Build a waker that stores an Arc<AtomicU32> as the data pointer. Each wake() call increments the counter. Poll a CountdownFuture(5) and verify the waker was called 5 times.

Hints:

  • Arc::into_raw(arc) converts Arc<T> to *const T (which you can cast to *const ())
  • Arc::from_raw(ptr) converts back (for clone and drop)
  • Be careful with reference counting — clone should increment, drop should decrement

Exercise 3: Thread-parking waker

Build a waker that calls thread.unpark() when wake() is called.

  1. Get the current thread handle: std::thread::current()
  2. Store it in the waker’s data pointer
  3. wake() calls thread_handle.unpark()
  4. In your poll loop: after Pending, call std::thread::park()
  5. The waker’s wake() will unpark you, and you’ll poll again

This is a real executor pattern. Test it with CountdownFuture — but note: the future calls wake_by_ref() synchronously during poll(), so the unpark happens before you even call park(). This is fine — park() returns immediately when an unpark is already pending.

Exercise 4: Waker contract verification

Create a future that intentionally doesn’t call wake() when returning Pending. Use your thread-parking waker. Show that the executor hangs — thread::park() blocks forever because nobody calls unpark(). This demonstrates why Rule 2 (“Pending must wake”) exists.

Interrupt with Ctrl+C after a few seconds.

Lesson 4: Tasks

Real-life analogy: the post office

A letter (future) is just content — it can’t deliver itself. To get it somewhere, you put it in an envelope (task) with:

  • The letter inside (the future)
  • A return address (the waker — how to notify when done)
  • A tracking number (so the system can find it)
  • A destination queue (which mailbag it goes in)

The postal worker (executor) doesn’t handle raw letters. They handle envelopes — because envelopes have all the metadata needed for delivery.

Letter (Future):                    Envelope (Task):
┌──────────────┐                    ┌───────────────────────────┐
│ Dear Bob,    │                    │ To: task queue            │
│ ...content...│                    │ Return addr: waker        │
│              │                    │ Tracking: Arc<Task>       │
└──────────────┘                    │ ┌──────────────┐          │
                                    │ │ Dear Bob,    │          │
 Can't deliver itself.              │ │ ...content...│          │
                                    │ └──────────────┘          │
                                    └───────────────────────────┘
                                     The system can route this.

Future vs Task

A future is a struct implementing Future. It’s passive — it just sits there until someone calls poll().

A task is a future wrapped with executor metadata: a waker, a queue reference, and shared ownership via Arc. It’s the unit of work the executor manages.

spawn(future) → Task → pushed to queue → executor polls it

You write futures. The executor manages tasks.

What a Task looks like in Rust

#![allow(unused)]
fn main() {
struct Task {
    /// The future, pinned and boxed.
    /// - Box: because different tasks hold different future types (type erasure)
    /// - Pin: because futures may be self-referential (Lesson 6)
    /// - Mutex: because the executor thread and waker may access concurrently
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,

    /// Reference to the executor's task queue.
    /// The waker uses this to push the task back when wake() is called.
    queue: Arc<Mutex<VecDeque<Arc<Task>>>>,
}
}

Why each part:

  • dyn Future<Output = ()> — type erasure. The executor holds many different future types in one queue.
  • Box — puts the future on the heap (required for dyn).
  • Pin — prevents moving the future after first poll (required by the Future trait).
  • Mutex — interior mutability. We need &mut access to poll, but the task is shared via Arc.
  • Send — the task might move between threads (multi-threaded executor).
  • Arc<Task> — shared ownership. Both the executor queue and the waker hold references.

The lifecycle of a task

                        spawn(my_future)
                              │
                              ▼
               ┌──────────────────────────┐
               │ Task created             │
               │  future = Box::pin(f)    │
               │  queue = executor.queue  │
               └────────────┬─────────────┘
                            │
                            ▼
               ┌──────────────────────────┐
               │ Pushed to executor queue │
               │  queue: [... , task]     │
               └────────────┬─────────────┘
                            │
                            ▼
               ┌──────────────────────────┐
               │ Executor pops task       │◄──────────────────┐
               │ Builds waker for it      │                   │
               │ Calls task.future.poll() │                   │
               └────────────┬─────────────┘                   │
                            │                                 │
                   ┌────────┴────────┐                        │
                   │                 │                        │
                   ▼                 ▼                        │
            Poll::Ready        Poll::Pending                  │
               │                    │                         │
               ▼                    ▼                         │
           Task done.        Task NOT in queue.               │
           Drop it.          Waiting for event.               │
                                    │                         │
                                    ▼                         │
                             Event fires                      │
                             waker.wake()  ───────────────────┘
                             pushes Arc<Task> back to queue

The waker-task connection

This is the critical piece. Each task gets a waker whose wake() pushes the task back into the queue:

#![allow(unused)]
fn main() {
fn create_waker_for_task(task: Arc<Task>) -> Waker {
    // The waker's data pointer is the Arc<Task>
    // wake() does: task.queue.lock().push_back(task.clone())
}
}

When a future inside a task calls cx.waker().wake_by_ref():

  1. The waker grabs its Arc<Task>
  2. Pushes the Arc<Task> into the executor’s queue
  3. The executor wakes up (if parked) and polls the task again

Future calls:          cx.waker().wake_by_ref()
                              │
Waker does:            queue.lock().push_back(arc_task.clone())
                              │
Executor sees:         queue is non-empty → pop task → poll it

The 'static requirement

When you call tokio::spawn(future), the future must be 'static. Why?

#![allow(unused)]
fn main() {
fn bad_example() {
    let data = vec![1, 2, 3];

    tokio::spawn(async {
        println!("{:?}", data);  // ERROR: `data` doesn't live long enough
    });

    // `data` is dropped here, but the task might still be running!
}
}

The task lives independently — it might outlive the function that spawned it. So it can’t borrow local variables. It must own everything it needs.

Fix: move ownership into the task:

#![allow(unused)]
fn main() {
fn good_example() {
    let data = vec![1, 2, 3];

    tokio::spawn(async move {  // `move` transfers ownership
        println!("{:?}", data);  // task owns `data`
    });
    // `data` has been moved, can't use it here
}
}

The Send requirement

For multi-threaded executors, tasks must be Send — they might be polled on different threads.

#![allow(unused)]
fn main() {
// This WON'T compile with tokio::spawn:
let rc = Rc::new(42);  // Rc is !Send
tokio::spawn(async move {
    println!("{}", rc);  // ERROR: Rc cannot be sent between threads
});

// Fix: use Arc instead of Rc
let arc = Arc::new(42);  // Arc is Send
tokio::spawn(async move {
    println!("{}", arc);  // OK
});
}

A future is Send if all values it holds across .await points are Send. If you hold a MutexGuard (which is !Send) across an .await, the future becomes !Send and can’t be spawned.

.await is NOT a new task

A common confusion:

#![allow(unused)]
fn main() {
tokio::spawn(async {        // ← ONE task
    let a = foo().await;    // ← state transition within the task
    let b = bar().await;    // ← another state transition, same task
    a + b                   // all inside one task
});

tokio::spawn(async {        // ← SECOND task (independent)
    baz().await;
});
}

spawn() creates a task. .await is a yield point within a task. Two awaits in one async block = one task with two state transitions. Two spawn() calls = two tasks that run concurrently.

JoinHandle: getting a result from a task

spawn() returns a JoinHandle — a future that resolves when the task completes:

#![allow(unused)]
fn main() {
let handle = tokio::spawn(async { 42 });
let result = handle.await.unwrap();  // 42
}

Conceptually, a minimal JoinHandle looks like this (tokio’s real implementation is more involved):

#![allow(unused)]
fn main() {
struct JoinHandle<T> {
    result: Arc<Mutex<Option<T>>>,  // shared with the task
    waker: Arc<Mutex<Option<Waker>>>,  // notified when task completes
}
}

When the task finishes, it stores the result and wakes the JoinHandle’s waker. When you .await the handle, it checks if the result is ready.

Exercises

Exercise 1: Build a Task struct

Define a Task struct with:

  • A pinned, boxed, type-erased future
  • A reference to a shared task queue

Create a task from a CountdownFuture. Don’t poll it yet — just verify you can construct it.

Exercise 2: Create a waker for a task

Build a Waker whose wake() pushes Arc<Task> back into the queue.

  1. Store Arc<Task> as the waker’s data pointer (Arc::into_raw)
  2. wake() recovers the Arc, locks the queue, pushes the task
  3. Poll a task using this waker. Verify the task re-appears in the queue after returning Pending.

Exercise 3: Task lifecycle

Implement the full lifecycle:

  1. Create a task queue (Arc<Mutex<VecDeque<Arc<Task>>>>)
  2. Spawn a CountdownFuture(3) into it
  3. Loop: pop task, create waker, poll, check queue
  4. Print the queue state after each poll
  5. Verify: task appears in queue after Pending, disappears after Ready

Exercise 4: JoinHandle

Implement a simple JoinHandle<T>:

  1. spawn() returns a JoinHandle alongside the task
  2. Both share an Arc<Mutex<Option<T>>>
  3. When the task’s future completes, store the result
  4. JoinHandle implements Future — polls check if result is available
  5. Test: spawn a future that returns 42, await the handle, assert 42

Exercise 5: 'static and Send verification

Write futures that fail to compile and understand why:

  1. A future that borrows a local variable → fails 'static
  2. A future that holds Rc<T> across an await → fails Send
  3. Fix both — use move and Arc respectively

Lesson 5: A Minimal Executor

Prerequisites: Lesson 4 (Tasks) — you should understand what a Task is before building the executor that drives them.

What is a Task? (quick recap)

Lesson 4 defined tasks in detail. Here’s the short version again, since the executor is built around them.

A future is a state machine that can be polled. It’s just a struct that implements Future. It doesn’t know about executors, queues, or scheduling. It’s passive — it sits there until someone calls poll().

A task is a future that has been spawned onto an executor. It’s the executor’s unit of work — a future wrapped with everything the executor needs to manage it.

Future alone:                    Task (future + executor metadata):
┌──────────────────┐             ┌──────────────────────────────┐
│  impl Future     │             │  Task                        │
│                  │             │                              │
│  poll() → Ready  │             │  future: Pin<Box<dyn Future>>│
│        → Pending │             │  waker: Waker                │
│                  │             │  state: Running | Completed  │
└──────────────────┘             │  queue: Arc<Mutex<VecDeque>> │
                                 │                              │
 Just a struct.                  └──────────────────────────────┘
 Can't run itself.
                                  Knows how to re-schedule itself.
                                  The executor polls this.

Analogy

  • Future = a recipe (instructions for making a dish)
  • Task = a kitchen ticket (recipe + order number + “notify table 5 when done” + position in the queue)

The chef (executor) works with tickets, not raw recipes. The ticket tracks everything needed to manage the order.

At the Rust code level

Here’s what a Task looks like in a real executor:

#![allow(unused)]
fn main() {
struct Task {
    /// The future this task is driving to completion.
    /// Pinned because futures may be self-referential (Lesson 6).
    /// Boxed because different tasks hold different future types.
    /// Mutex because the executor and waker may access it from different contexts.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,

    /// A reference to the executor's task queue.
    /// When waker.wake() is called, the task pushes itself back into this queue.
    queue: Arc<Mutex<VecDeque<Arc<Task>>>>,
}
}

And spawn creates a task from a future:

#![allow(unused)]
fn main() {
fn spawn(
    future: impl Future<Output = ()> + Send + 'static,
    queue: &Arc<Mutex<VecDeque<Arc<Task>>>>,  // the executor's shared queue
) -> Arc<Task> {
    let task = Arc::new(Task {
        future: Mutex::new(Box::pin(future)),
        queue: queue.clone(),
    });
    queue.lock().unwrap().push_back(task.clone());
    task
}
}

The lifecycle of a task

1. spawn(my_future)
   │
   ▼
2. Task created: wraps future + gets a queue reference
   │
   ▼
3. Task pushed to executor's queue
   │
   ▼
4. Executor pops task, builds a Waker for it, calls task.future.poll(cx)
   │
   ├── Poll::Ready → task is done, drop it
   │
   └── Poll::Pending → task is NOT in the queue
       │
       ▼
5. ... time passes, I/O event or timer fires ...
   │
   ▼
6. waker.wake() → pushes Arc<Task> back into the queue
   │
   ▼
7. Executor pops it again → back to step 4

The key insight: the waker closes over the task. When you call waker.wake(), it pushes Arc<Task> back into the queue. This is the connection between the future world (poll/wake) and the executor world (queue/schedule).

Task vs Future: when people say “task”

In async Rust conversations:

  • “Spawn a task” = call tokio::spawn(future) — wraps the future in a task and schedules it
  • “The task is blocked” = the task returned Pending and is waiting for a wake
  • “Task-local storage” = per-task data (like thread-local, but per task)
  • “Task dump” = list all tasks and what state they’re in (debugging)

Every tokio::spawn() creates one task. Every .await inside that task is a state transition within the same task — NOT a new task.

#![allow(unused)]
fn main() {
tokio::spawn(async {        // ← this is ONE task
    let a = foo().await;    // ← state transition within the task
    let b = bar().await;    // ← another state transition, same task
    a + b
});

tokio::spawn(async {        // ← this is a SECOND task
    baz().await;
});
}

Real-life analogy: the project manager

A project manager doesn’t do the work — they coordinate. They have a list of tasks and a team:

Project Manager (executor):
  ┌─────────────────────────────────────┐
  │ Task list:                          │
  │   [ ] Design mockups (waiting)      │
  │   [ ] Write API (waiting)           │
  │   [→] Review PR (in progress)       │
  │   [✓] Deploy staging (done)         │
  └─────────────────────────────────────┘

Loop:
  1. Pick the next task that needs attention
  2. Ask: "are you done yet?" (poll)
  3. If done → mark complete, move on
  4. If not → task says "I'll ping you when ready" (waker)
  5. If nothing needs attention → take a nap (park)
  6. Get pinged (waker.wake()) → wake up, go to step 1

That’s an executor. It’s a loop that polls futures and sleeps when there’s nothing to do.

Two levels of executor

Level 1: block_on (runs one future)

The simplest possible executor. Runs a single future on the current thread:

block_on(future):
  ┌──────────────────────────────────────┐
  │  loop {                              │
  │      poll(future)                    │
  │      if Ready → return result        │
  │      if Pending → park thread        │
  │      ... waker fires → unpark ...    │
  │  }                                   │
  └──────────────────────────────────────┘

This is what tokio::runtime::Runtime::block_on() does at its core.

Level 2: Multi-task executor (runs many futures)

Adds a task queue. spawn() adds futures to the queue. The executor polls them round-robin:

Executor:
  ┌──────────────────────────────────────────────┐
  │                                              │
  │  Task Queue: [task_1, task_2, task_3]        │
  │                                              │
  │  loop {                                      │
  │      task = queue.pop()                      │
  │      poll(task)                              │
  │      if Ready → done, don't re-queue         │
  │      if Pending → waker will re-queue it     │
  │      if queue empty → park thread            │
  │  }                                           │
  └──────────────────────────────────────────────┘

The key insight: the Waker for each task pushes it back into the queue when wake() is called. The executor only polls tasks that are ready to make progress.

Task 1 returns Pending
  → future stores waker
  → task is NOT in the queue (nothing to do)
  → ... time passes ...
  → I/O event fires → waker.wake()
  → task is pushed back into queue
  → executor pops it, polls it → Ready!

The waker-queue connection

This is the part that makes executors work:

spawn(future):
  1. Wrap future in a Task (Arc<Task>)
  2. Create a Waker whose wake() pushes Arc<Task> to the queue
  3. Push task to queue

poll(task):
  1. Pop task from queue
  2. Build Context with the task's waker
  3. Call future.poll(cx)
  4. If Pending → nothing (waker will re-queue when ready)
  5. If Ready → done

┌─────────────┐      ┌──────────────────┐
│  Executor   │      │  Task            │
│             │      │                  │
│  queue: ────┤      │  future: ...     │
│  [t1,t2,t3] │      │  waker: ─────────┼─┐
│             │      │                  │ │
└─────────────┘      └──────────────────┘ │
       ▲                                  │
       │            wake() pushes         │
       └────────── task back to queue ────┘

The DelayFuture: a real timer

Now that you have an executor with a real waker (not noop), you can build a future that actually waits for real time:

#![allow(unused)]
fn main() {
struct DelayFuture {
    message: String,
    deadline: Instant,
    waker_set: bool,
}
}

How it works:

  1. First poll: spawn a background thread that sleeps until the deadline, then calls waker.wake()
  2. Return Pending
  3. Background thread wakes up → calls waker.wake() → executor re-polls
  4. Second poll: deadline has passed → return Ready(message)

Executor                    DelayFuture               Background Thread
   │                           │                           │
   ├── poll() ───────────►     │                           │
   │                           │── spawn thread ──────────►│
   │                           │   (sleeps 2 seconds)      │
   │  ◄── Pending ─────────────┤                           │
   │                           │                           │
   ├── park() (sleeping)       │                    (sleeping)
   │                           │                           │
   │                           │              ... 2 sec ...│
   │                           │                           │
   │                           │  ◄── waker.wake() ────────┤
   │  ◄── unpark! ─────────────────────────────────────────┤
   │                           │                           │
   ├── poll() ───────────►     │                           │
   │  ◄── Ready("done!") ──────┤                           │

This is how tokio::time::sleep works — except tokio uses a timer wheel instead of spawning a thread per timer.

Exercises

Exercise 1: block_on

Implement block_on<F: Future>(future: F) -> F::Output:

  1. Build a thread-parking waker (from Lesson 3)
  2. Loop: poll the future
  3. If Ready → return the value
  4. If Pending → thread::park() (waker will unpark)

Test with CountdownFuture.

Exercise 2: Multi-task executor

Implement an Executor with:

  • spawn(future) — wraps the future in an Arc<Task>, adds to queue
  • run() — pops tasks, polls them, sleeps when empty

The tricky part: building a waker whose wake() pushes the Arc<Task> back into a shared queue (Arc<Mutex<VecDeque<Arc<Task>>>>).

Test: spawn 3 CountdownFutures with different counts. Print when each completes. They should interleave.

Exercise 3: DelayFuture (real timer)

Implement DelayFuture:

  1. First poll: clone the waker, spawn a thread that sleeps then calls waker.wake()
  2. Return Pending
  3. Second poll: check if deadline passed → Ready(message)

Test with block_on:

#![allow(unused)]
fn main() {
block_on(DelayFuture::new(Duration::from_secs(2), "hello from the future!"))
}

This should print after exactly 2 seconds — proving that the executor slept efficiently (not busy-polling).

Exercise 4: Spawn DelayFutures concurrently

Using your multi-task executor, spawn three delays:

#![allow(unused)]
fn main() {
executor.spawn(DelayFuture::new(Duration::from_secs(3), "slow"));
executor.spawn(DelayFuture::new(Duration::from_secs(1), "fast"));
executor.spawn(DelayFuture::new(Duration::from_secs(2), "medium"));
}

They should complete in order: fast (1s), medium (2s), slow (3s) — all finishing within ~3 seconds total (concurrent), not 6 seconds (sequential).

Exercise 5: JoinHandle

Make spawn() return a JoinHandle<T> — a future that resolves to the spawned task’s output.

#![allow(unused)]
fn main() {
let handle = executor.spawn(async { 42 });
let result = block_on(handle);
assert_eq!(result, 42);
}

Implement with Arc<Mutex<Option<T>>> shared between the task and the handle, plus a waker for the handle to be notified when the task completes.

Lesson 6: Pinning

Prerequisites: Lesson 5 (Executor) – you should understand how an executor polls futures before learning why those futures must be pinned.

Real-life analogy: the house with an address

Imagine you buy a house at 123 Elm Street. Your friends, your bank, the post office – everyone has your address written down. Now imagine a magical crane lifts your entire house and drops it on a different lot. Your friends show up at 123 Elm Street and find an empty plot. The mail goes nowhere. Every reference to your location is now wrong.

Pin = a city ordinance that says “this house will not move.” Once pinned, everyone who has your address can trust it forever.

Before Pin:                          After Pin:
┌──────────────────────┐             ┌──────────────────────┐
│  House @ 123 Elm St  │             │  House @ 123 Elm St  │
│  (can be moved)      │             │  PINNED -- no moving │
│                      │  crane      │                      │
│  Friends: "123 Elm"  │──────X      │  Friends: "123 Elm"  │ -- always valid
│  Bank:    "123 Elm"  │             │  Bank:    "123 Elm"  │
└──────────────────────┘             └──────────────────────┘

In Rust terms:

  • House = a value in memory
  • Address = a pointer/reference to that value
  • Moving the house = mem::swap, mem::replace, or just assignment to a different variable
  • Pin = a wrapper that prevents you from getting &mut T (which would let you move the value)

The problem: self-referential structs

A self-referential struct has a field that points to another field inside the same struct. This is the core problem Pin solves.

#![allow(unused)]
fn main() {
struct SelfRef {
    data: String,
    ptr: *const String,  // points to `data` above
}
}

After initialization, ptr points to data’s memory location:

                    SelfRef (at address 0x1000)
                    ┌─────────────────────────────────┐
                    │  data: String { "hello" }       │  <-- lives at 0x1000
                    │  ptr:  0x1000 ──────────────┐   │
                    │                             │   │
                    │                             ▼   │
                    │         (points to data) ───┘   │
                    └─────────────────────────────────┘
                    ptr == &data  -- correct!

Now move the struct (e.g., by returning it from a function, pushing to a Vec, or mem::swap):

    BEFORE MOVE                              AFTER MOVE

    0x1000 (old location)                    0x2000 (new location)
    ┌───────────────────────┐                ┌───────────────────────┐
    │  data: "hello"        │                │  data: "hello"        │  <-- now at 0x2000
    │  ptr:  0x1000 ────┐   │   ──move──►    │  ptr:  0x1000 ────────┼──┐
    │                   │   │                │                       │  │
    │                   ▼   │                │       DANGLING!       │  │
    │       (self)  ────┘   │                └───────────────────────┘  │
    └───────────────────────┘                                          │
                                             0x1000 (old location)     │
                                             ┌───────────────────────┐  │
                                             │  (freed/garbage)      │◄─┘
                                             └───────────────────────┘
                                             ptr still points here!

The data moved to 0x2000, but ptr still says 0x1000. Dangling pointer. Undefined behavior.

Concrete code example

#![allow(unused)]
fn main() {
use std::ptr;

struct SelfRef {
    data: String,
    ptr: *const String,
}

impl SelfRef {
    fn new(text: &str) -> Self {
        let mut s = SelfRef {
            data: text.to_string(),
            ptr: ptr::null(),
        };
        // NOTE: returning `s` below moves it, so this pointer is stale
        // the moment `new` returns -- the caller must re-set it.
        s.ptr = &s.data as *const String;
        s
    }

    fn data_addr(&self) -> *const String {
        &self.data as *const String
    }

    fn ptr_value(&self) -> *const String {
        self.ptr
    }
}

let mut a = SelfRef::new("hello");
a.ptr = &a.data;                       // fix pointer after construction
println!("a.data addr: {:p}", &a.data); // e.g., 0x1000
println!("a.ptr value: {:p}", a.ptr);   // 0x1000 -- matches!

let mut b = a;                          // MOVE a into b
println!("b.data addr: {:p}", &b.data); // e.g., 0x2000 (new location)
println!("b.ptr value: {:p}", b.ptr);   // still 0x1000 -- DANGLING!
}

What Pin<&mut T> does

Pin<&mut T> wraps a mutable reference and removes your ability to get &mut T back (for non-Unpin types). Without &mut T, you cannot:

  • mem::swap the value with another
  • mem::replace the value
  • Move it by assignment
    Normal &mut T:                   Pin<&mut T>:
    ┌──────────────────┐             ┌──────────────────┐
    │  &mut T          │             │  Pin<&mut T>     │
    │                  │             │                  │
    │  Can do:         │             │  Can do:         │
    │  - read          │             │  - read          │
    │  - write fields  │             │  - write fields  │
    │  - mem::swap !!  │             │                  │
    │  - move out !!   │             │  CANNOT:         │
    │                  │             │  - get &mut T    │
    └──────────────────┘             │  - mem::swap     │
                                     │  - move out      │
                                     └──────────────────┘

The signature of Future::poll requires Pin<&mut Self>:

#![allow(unused)]
fn main() {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

This is not an accident. The executor must guarantee that once it starts polling a future, it will never move that future again.

The Unpin trait

Unpin is an auto-trait that says “this type is safe to move even when pinned.” It is a promise that the type has no self-references.

    Most types:              async fn futures:
    ┌─────────────────┐     ┌─────────────────────────┐
    │  i32             │     │  async fn foo() {       │
    │  String          │     │      let x = 1;         │
    │  Vec<T>          │     │      let r = &x;        │
    │  HashMap<K,V>    │     │      bar().await;       │
    │  ...             │     │      use(r);            │
    │                  │     │  }                      │
    │  All Unpin       │     │                         │
    │  Pin::new works  │     │  !Unpin (not Unpin)     │
    └─────────────────┘     │  needs Box::pin or pin! │
                            └─────────────────────────┘

  • Unpin types: Pin<&mut T> freely gives you &mut T via Pin::get_mut(). Pin has no effect.
  • !Unpin types: Pin<&mut T> does NOT give you &mut T. The value is truly pinned.

You can opt out of Unpin manually with PhantomPinned:

#![allow(unused)]
fn main() {
use std::marker::PhantomPinned;

struct MyStruct {
    data: String,
    _pin: PhantomPinned,  // makes MyStruct: !Unpin
}
}

Pin in practice

Pin::new – only for Unpin types

#![allow(unused)]
fn main() {
let mut x = 42_i32;  // i32: Unpin
let pinned = Pin::new(&mut x);  // works fine
}

This does NOT work for !Unpin types – the compiler will reject it.

Box::pin – heap-pin any value

#![allow(unused)]
fn main() {
let future = async { 42 };
let pinned: Pin<Box<dyn Future<Output = i32>>> = Box::pin(future);
// future is on the heap and cannot be moved
}

This is what executors use: Pin<Box<dyn Future>>.

pin! macro – stack-pin a value

#![allow(unused)]
fn main() {
use std::pin::pin;

let future = async { 42 };
let mut pinned = pin!(future);  // pins to the stack
// pinned: Pin<&mut impl Future<Output = i32>>
}

The macro shadows the binding so you cannot access the original (un-pinned) value.

Why futures need Pin

An async fn compiles to a state machine struct. If the function holds a reference to a local variable across an .await, the struct becomes self-referential:

#![allow(unused)]
fn main() {
async fn process() {
    let buffer = vec![1, 2, 3];
    let slice = &buffer;          // reference to local
    network_send(slice).await;    // <-- .await here
    println!("{:?}", slice);      // slice must still be valid
}
}

The compiler generates something like:

#![allow(unused)]
fn main() {
enum ProcessFuture {
    // State 0: before the await
    State0 {
        buffer: Vec<i32>,
        slice: *const Vec<i32>,  // <-- points at `buffer` above! (self-reference)
    },
    // State 1: after the await
    State1 {
        buffer: Vec<i32>,
        slice: *const Vec<i32>,
    },
    Done,
}
}

State0 is self-referential: slice points into buffer. If you move ProcessFuture after the first poll(), the slice pointer dangles.

    ProcessFuture after first poll (State0):

    ┌───────────────────────────────────────┐
    │  buffer: Vec [1, 2, 3]  ◄──────────┐  │   at address 0xA000
    │  slice:  0xA000 ───────────────────┘  │   points at buffer: valid
    └───────────────────────────────────────┘

    If moved to new address 0xB000:

    ┌───────────────────────────────────────┐
    │  buffer: Vec [1, 2, 3]                │   now at 0xB000
    │  slice:  0xA000 ──────────────► ???   │   still says 0xA000: DANGLING!
    └───────────────────────────────────────┘

This is why Future::poll takes Pin<&mut Self>: the executor pins the future before the first poll and never moves it again.

The lifecycle

    1. Create future:       let f = process();           // future exists, not yet polled
    2. Pin it:              let f = Box::pin(f);         // pinned on heap, cannot move
    3. First poll:          f.as_mut().poll(cx);         // enters State0, creates self-ref
    4. Returns Pending:     slice points to buffer       // self-ref is valid
    5. ...                  (future stays at same address)
    6. Second poll:         f.as_mut().poll(cx);         // slice still valid!
    7. Returns Ready:       done

Without Pin, step 5 could be “move the future into a different Vec slot” and step 6 would be undefined behavior.

Summary

Concept                   What it means
Self-referential struct   A struct with a pointer to its own field
Move                      Copies bytes to new location, old location invalid
Dangling pointer          Pointer to old location after a move
Pin<&mut T>               Wrapper that prevents getting &mut T (for !Unpin)
Unpin                     Auto-trait: “safe to move when pinned” (most types)
!Unpin                    Not safe to move when pinned (async fn futures)
PhantomPinned             Opt out of Unpin manually
Pin::new                  Pin an Unpin type (no-op, just wraps)
Box::pin                  Pin any type on the heap
pin!                      Pin any type on the stack

Exercises

Exercise 1: Demonstrate the dangling pointer

Create a SelfRef struct with a String and a *const String raw pointer. Initialize the pointer to point at the data field. Move the struct. Print the addresses before and after – show that the pointer no longer matches the data’s actual address.

Exercise 2: Pin prevents the move

Use Pin and PhantomPinned to create a !Unpin struct. Try to move it after pinning. Observe the compiler error. Then show that you can safely read through the pointer because the value hasn’t moved.

Exercise 3: Unpin types are unaffected

Show that Pin::new works on Unpin types (like String, i32). Demonstrate that you can still get &mut T from Pin<&mut T> when T: Unpin, so Pin is effectively a no-op for these types.

Exercise 4: Why async futures need Pin

Write an async function that holds a reference across an .await. Box::pin it, poll it manually with a waker, and show it works. Then explain (in a comment) why moving the future between polls would break the internal reference.

Lesson 7: Combinators — join and select

Real-life analogy: cooking dinner

Join = make salad AND soup, serve when BOTH are done

You’re cooking dinner. You put soup on the stove and start chopping salad. You check the soup, chop some more, check the soup again. You keep switching between the two tasks. When both are done, you serve dinner.

You (the executor):

  "Is the soup done?"   → No, keep simmering     (Poll::Pending)
  "Is the salad done?"  → No, still chopping      (Poll::Pending)
  "Is the soup done?"   → No, keep simmering      (Poll::Pending)
  "Is the salad done?"  → Yes! Set it aside.       (Poll::Ready)
  "Is the soup done?"   → No, keep simmering      (Poll::Pending)
  "Is the soup done?"   → Yes! Take it off heat.   (Poll::Ready)

  Both done → serve dinner!  → return (soup, salad)

Join waits for all futures to complete. It returns a tuple of all results.

Select = order a taxi AND a bus ticket, take whichever arrives first

You need to get across town. You order a taxi on your phone and walk to the bus stop. Whichever arrives first, you take it — and cancel the other.

You (the executor):

  "Has the taxi arrived?"  → No                    (Poll::Pending)
  "Has the bus arrived?"   → No                    (Poll::Pending)
  "Has the taxi arrived?"  → No                    (Poll::Pending)
  "Has the bus arrived?"   → Yes! Get on the bus.  (Poll::Ready)

  Bus won → cancel the taxi!  → DROP the taxi future
  return bus

Select waits for the first future to complete. It returns that result and drops the loser.

How join works internally

join!(a, b) creates a single future that, each time it is polled, polls both a and b. It tracks which ones are done with flags. When all are Ready, it returns the collected results.

join!(soup, salad) — one combined future
─────────────────────────────────────────────────────────────────
  poll #1:
  ┌─────────────────────────────────────────────────────┐
  │  poll soup  → Pending    (soup_done = false)        │
  │  poll salad → Pending    (salad_done = false)       │
  │  → return Pending                                   │
  └─────────────────────────────────────────────────────┘

  poll #2:
  ┌─────────────────────────────────────────────────────┐
  │  poll soup  → Pending    (soup_done = false)        │
  │  poll salad → Ready(S)   (salad_done = true)        │
  │  → return Pending  (soup not done yet)              │
  └─────────────────────────────────────────────────────┘

  poll #3:
  ┌─────────────────────────────────────────────────────┐
  │  poll soup  → Ready(S)   (soup_done = true)         │
  │  skip salad (already done)                          │
  │  → return Ready((soup, salad))  ALL DONE!           │
  └─────────────────────────────────────────────────────┘

Key insight: join is one future that polls multiple sub-futures. There is no parallelism — it is concurrent on a single thread. Each call to poll on the join future polls each child that is not yet done.

Pseudocode, made compilable

A version that actually compiles, simplified by requiring Unpin on the sub-futures and their outputs so the fields can be reached through the Pin:

#![allow(unused)]
fn main() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MyJoin<A: Future, B: Future> {
    a: A,
    b: B,
    a_result: Option<A::Output>,
    b_result: Option<B::Output>,
}

impl<A, B> Future for MyJoin<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
    A::Output: Unpin,
    B::Output: Unpin,
{
    type Output = (A::Output, B::Output);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();  // OK: every field is Unpin here
        if this.a_result.is_none() {
            if let Poll::Ready(val) = Pin::new(&mut this.a).poll(cx) {
                this.a_result = Some(val);
            }
        }
        if this.b_result.is_none() {
            if let Poll::Ready(val) = Pin::new(&mut this.b).poll(cx) {
                this.b_result = Some(val);
            }
        }
        if this.a_result.is_some() && this.b_result.is_some() {
            Poll::Ready((this.a_result.take().unwrap(), this.b_result.take().unwrap()))
        } else {
            Poll::Pending
        }
    }
}
}

How select works internally

select!(a, b) creates a single future that polls both a and b each time. As soon as one returns Ready, it returns that value and drops the other future.

select!(taxi, bus) — one combined future
─────────────────────────────────────────────────────────────────
  poll #1:
  ┌─────────────────────────────────────────────────────┐
  │  poll taxi → Pending                                │
  │  poll bus  → Pending                                │
  │  → return Pending                                   │
  └─────────────────────────────────────────────────────┘

  poll #2:
  ┌─────────────────────────────────────────────────────┐
  │  poll taxi → Pending                                │
  │  poll bus  → Ready(bus_result)                      │
  │  → return Ready(Right(bus_result))                  │
  └─────────────────────────────────────────────────────┘

  After returning:
  ┌─────────────────────────────────────────────────────┐
  │  MySelect is dropped                                │
  │  → taxi future is DROPPED  ← CANCELLATION!         │
  │  → taxi's Drop impl runs                           │
  │  → any resources taxi held are freed                │
  └─────────────────────────────────────────────────────┘

The drop / cancellation problem (preview of Lesson 24)

When select drops the losing future, that future is cancelled mid-execution. Whatever state it was in — gone. This has real consequences:

  • If the future had written half a message to a buffer — that buffer is now incomplete
  • If the future had acquired a lock — the lock guard is dropped (unlocked), but any partial work under the lock is lost
  • If the future was a network request — the request is abandoned; the server may still process it

Cancelled future's lifecycle:

  poll #1: started work, allocated buffer        ┐
  poll #2: filled half the buffer                │  all of this
  poll #3: about to finish...                    │  state is LOST
           ↓                                     │
  DROP: future is destroyed, buffer freed        ┘

This is cancellation safety — a topic we will cover in depth in Lesson 24. For now, remember: select drops the loser, and the loser may have done partial work.

Rule of thumb: a future is cancellation-safe if dropping it at any await point does not lose data or leave things in an inconsistent state.

FuturesUnordered (brief mention)

What if you have not 2 but 100 futures, and you want to process results as they complete? FuturesUnordered from the futures crate is a collection that polls all contained futures and yields results in completion order.

#![allow(unused)]
fn main() {
use futures::stream::FuturesUnordered;
use futures::StreamExt;

let mut futs = FuturesUnordered::new();
futs.push(fetch("url1"));  // `fetch` here stands for any async fn of yours
futs.push(fetch("url2"));
futs.push(fetch("url3"));

while let Some(result) = futs.next().await {
    println!("Got: {result:?}");
}
}

Think of it as a dynamic select over many futures. We will use it in later lessons.

Exercises

Exercise 1: MyJoin

Implement MyJoin<A, B> — a future that polls two sub-futures and returns (A::Output, B::Output) when both are done. Implement the Future trait by hand.

Hints:

  • Store each sub-future and an Option for each result
  • On each poll, poll any sub-future whose result is still None
  • Return Ready only when both Options are Some
  • Remember to call cx.waker().wake_by_ref() when returning Pending

Exercise 2: MySelect

Implement MySelect<A, B> — a future that polls two sub-futures and returns whichever completes first. Use an enum Either<L, R> for the return type.

Make both sub-futures NamedFuture structs that print a message when dropped. Run the demo and observe the loser printing its drop message.

Exercise 3: join_all for a Vec

Implement a JoinAll<F> future that takes a Vec<F> and returns Vec<F::Output>. This generalizes MyJoin from 2 futures to N futures.

Hints:

  • Store Vec<Option<F>> for the futures and Vec<Option<F::Output>> for results
  • On each poll, iterate and poll any future that is still Some
  • When a future completes, store its result and replace the future with None
  • Return Ready when all results are Some

Exercise 4: map combinator

Implement a Map<F, Func> future that wraps a future F and applies a function Func to its output when ready. This lets you write:

#![allow(unused)]
fn main() {
let doubled = Map::new(CountdownFuture { count: 3 }, |()| 42);
// polls the countdown, then applies the function → returns 42
}

Hints:

  • Store the inner future and an Option<Func> (take the function out on Ready)
  • On poll: poll the inner future. If Ready, apply the function and return the mapped result. If Pending, return Pending.

Lesson 8: Async I/O Foundations

Real-life analogy: fishing with multiple rods

Imagine you have 10 fishing rods and a lake full of fish.

Blocking I/O = hold one rod at a time, stare at it:

You:   [hold rod 1........waiting........waiting........FISH!]
       [hold rod 2........waiting........waiting........FISH!]
       [hold rod 3........waiting............................]

Total: you can only fish one rod at a time.
       9 rods sit idle while you stare at one.

Non-blocking I/O = plant all 10 rods, walk between them checking each:

You:   [check rod 1: nothing]
       [check rod 2: nothing]
       [check rod 3: FISH! → reel it in]
       [check rod 4: nothing]
       [check rod 5: nothing]
       [check rod 1: nothing]  ← loop back
       [check rod 2: FISH! → reel it in]
       ...

Total: you catch fish faster, but you burn energy walking
       back and forth even when nothing is biting (busy-wait).

Event-driven I/O = attach a bell to each rod, sit and wait for a bell:

You:   [sitting..............................RING! rod 3]
       [reel in rod 3]
       [sitting..........RING! rod 7, rod 1]
       [reel in rod 7, reel in rod 1]
       ...

Total: zero wasted effort. You only act when something
       is actually ready. This is kqueue / epoll.

The bell system is exactly how modern async I/O works:

  1. Plant rods (open sockets, register with kqueue/epoll)
  2. Sit and wait (call kqueue_wait / epoll_wait)
  3. Bell rings (OS says “fd 7 is readable”)
  4. Reel in (read the data)

How the OS tells you a socket is ready

When your program calls read() on a TCP socket, the kernel checks if any data has arrived in the socket’s receive buffer. If not, blocking read() puts your thread to sleep. In async, you cannot afford to sleep — you need the OS to notify you instead.

kqueue (macOS) / epoll (Linux)

These are kernel APIs for event notification:

  1. Create an event queue: kqueue() or epoll_create()
  2. Register interest: “tell me when fd 5 is readable”
  3. Wait: block until ANY registered fd has an event
  4. Process: handle the ready fds
  5. Loop back to step 3

// Pseudocode (kqueue)
int kq = kqueue();
register(kq, socket_fd, EVFILT_READ);  // "notify me when readable"

loop {
    int n = kevent(kq, NULL, 0, events, MAX, NULL);  // wait
    for (int i = 0; i < n; i++) {
        int ready_fd = events[i].ident;
        // ready_fd has data — read without blocking
    }
}

One kevent() call can watch thousands of file descriptors simultaneously. This is why nginx and tokio can handle 100K connections on a single thread.

The three syscall patterns

Pattern 1: Blocking read

Thread          Kernel
  │                │
  │── read(fd) ───>│
  │   (blocked)    │  ...waiting for data...
  │   (blocked)    │  ...still waiting...
  │<── data ───────│
  │                │

Thread is frozen. Cannot do anything else. One thread per connection.

Pattern 2: Non-blocking read (poll loop)

Thread              Kernel
  │                    │
  │── read(fd) ───────>│
  │<── WouldBlock ─────│  (no data yet)
  │                    │
  │── read(fd) ───────>│
  │<── WouldBlock ─────│  (still no data)
  │                    │
  │── read(fd) ───────>│
  │<── 42 bytes ───────│  (data arrived!)
  │                    │

Thread is not frozen, but it wastes CPU spinning in a loop. This is the “walking between fishing rods” approach.

Pattern 3: Event notification (kqueue/epoll)

Thread              Kernel (kqueue)
  │                    │
  │── register(fd) ───>│  "watch fd for readability"
  │<── ok ─────────────│
  │                    │
  │── wait() ─────────>│
  │   (sleeping)       │  ...kernel watches all fds...
  │   (sleeping)       │  ...data arrives on fd...
  │<── [fd ready] ─────│  "fd has data"
  │                    │
  │── read(fd) ───────>│
  │<── 42 bytes ───────│  (guaranteed not to block)
  │                    │

Thread sleeps efficiently (no CPU usage). Kernel wakes it only when something is ready. One thread watches thousands of fds.

Timeline comparison

Time ──────────────────────────────────────────────────>

Blocking (3 sockets, 3 threads):
  Thread 1: ████████████████░░░░░░░░░░░  (blocked on fd 1)
  Thread 2: ░░░░░░░░████████████████░░░  (blocked on fd 2)
  Thread 3: ░░░░░░░░░░░░░░░░████████░░░  (blocked on fd 3)
  Cost: 3 threads, 24 MB stack memory

Non-blocking (3 sockets, 1 thread):
  Thread 1: ○○○○○●○○○○○●○○●○○○○○○○○○○○  (● = data, ○ = WouldBlock)
  Cost: 1 thread, but 100% CPU usage spinning

Event-driven (3 sockets, 1 thread):
  Thread 1: ___________●____●__●________  (● = event, _ = sleeping)
  Cost: 1 thread, near-zero CPU when idle

Non-blocking sockets in Rust

Standard library sockets are blocking by default. You flip them to non-blocking mode with one call:

#![allow(unused)]
fn main() -> std::io::Result<()> {
use std::net::TcpStream;
use std::io::{self, Read};

let mut stream = TcpStream::connect("127.0.0.1:8080")?;
stream.set_nonblocking(true)?;  // ← the magic switch

let mut buf = [0u8; 1024];
match stream.read(&mut buf) {
    Ok(n) => println!("got {n} bytes"),
    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
        println!("no data yet — try again later");
    }
    Err(e) => eprintln!("real error: {e}"),
}
Ok(())
}

WouldBlock is not a failure — it means “nothing to read right now.” The key insight: in non-blocking mode, you get to choose when to retry instead of having the OS freeze your thread.

kqueue/epoll explained

The event loop pattern is the same on every OS, just different syscalls:

┌─────────────────────────────────────────────────────┐
│                    Event Loop                        │
│                                                      │
│   ┌──────────────┐     ┌──────────────┐             │
│   │  Register fd │────>│   kqueue /   │             │
│   │  + interest  │     │   epoll      │             │
│   └──────────────┘     │  (kernel)    │             │
│                        └──────┬───────┘             │
│                               │                      │
│                        ┌──────▼───────┐             │
│                        │    wait()    │             │
│                        │   (sleeps)   │             │
│                        └──────┬───────┘             │
│                               │ wakes up            │
│                        ┌──────▼───────┐             │
│                        │ ready fds:   │             │
│                        │ [3, 7, 12]   │             │
│                        └──────┬───────┘             │
│                               │                      │
│                        ┌──────▼───────┐             │
│                        │ process each │─── loop ──┐ │
│                        │   ready fd   │           │ │
│                        └──────────────┘           │ │
│                               ▲                    │ │
│                               └────────────────────┘ │
└─────────────────────────────────────────────────────┘

Steps:

  1. Create: kqueue() returns a file descriptor for the event queue itself
  2. Register: kevent(kq, &changes, ...) — add fds you care about
  3. Wait: kevent(kq, NULL, 0, &events, max, timeout) — blocks until ready
  4. Process: iterate over returned events, read/write the ready fds
  5. Repeat: go back to step 3

The mio crate

mio (Metal I/O) is a thin, cross-platform wrapper around kqueue/epoll/IOCP. Tokio is built on top of mio. The core types:

Type          Purpose
Poll          Owns the kqueue/epoll fd. You call poll.poll() to wait.
Events        Buffer that poll() fills with ready events.
Token(usize)  Your label for each fd. When an event fires, you get the token back.
Interest      What you care about: READABLE, WRITABLE, or both.
Registry      Obtained from poll.registry(). Used to register/deregister fds.

#![allow(unused)]
fn main() -> Result<(), Box<dyn std::error::Error>> {
use mio::{Poll, Events, Token, Interest};
use mio::net::TcpListener;

let mut poll = Poll::new()?;
let mut events = Events::with_capacity(128);

let addr = "127.0.0.1:9000".parse()?;
let mut listener = TcpListener::bind(addr)?;

// Register: "tell me when listener has a new connection"
poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;

loop {
    // Wait: sleep until something is ready
    poll.poll(&mut events, None)?;

    for event in events.iter() {
        match event.token() {
            Token(0) => {
                // Listener is readable → accept new connection
                let (mut conn, addr) = listener.accept()?;
                println!("new connection from {addr}");

                // Register the new connection too.
                // (A real server hands out a unique token per
                // connection; see Exercise 3.)
                poll.registry().register(
                    &mut conn,
                    Token(1),
                    Interest::READABLE,
                )?;
            }
            Token(1) => {
                // Connection is readable → read data
            }
            _ => unreachable!(),
        }
    }
}
}

How this connects to the async executor

The bridge from OS events to futures looks like this:

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    Kernel     │     │   Reactor    │     │   Executor   │
│  (kqueue/     │     │  (mio Poll   │     │  (task queue  │
│   epoll)      │     │   loop)      │     │   + polling)  │
└──────┬───────┘     └──────┬───────┘     └──────┬───────┘
       │                     │                     │
       │  fd 7 readable      │                     │
       │────────────────────>│                     │
       │                     │  waker.wake()       │
       │                     │  for task on fd 7   │
       │                     │────────────────────>│
       │                     │                     │  poll task's
       │                     │                     │  future
       │                     │                     │──┐
       │                     │                     │  │ Future::poll()
       │                     │                     │<─┘ → Ready(data)

  1. Future calls read() on a non-blocking socket → gets WouldBlock
  2. Future registers the fd with the reactor and stores the Waker
  3. Future returns Poll::Pending
  4. Reactor’s mio poll loop eventually gets an event for that fd
  5. Reactor calls waker.wake() for the associated task
  6. Executor re-polls the future
  7. This time, read() succeeds → future returns Poll::Ready(data)

This is the complete chain. Lesson 9 builds the reactor. This lesson gives you the foundation: raw I/O primitives that the reactor wraps.

Exercises

Exercise 1: Raw non-blocking socket

Create a TCP listener. Accept a connection with set_nonblocking(true). Try to read in a loop — print each WouldBlock and sleep 100ms between retries. When data arrives, print it and exit. Use only std::net, no mio.

Exercise 2: kqueue/mio event loop

Replace the busy-wait loop from Exercise 1 with mio::Poll. Register the accepted connection for READABLE interest. Call poll.poll() to sleep until data arrives. Read and print. Compare CPU usage with Exercise 1.

Exercise 3: mio TCP echo server

Build a multi-client echo server using mio. The listener gets Token(0). Each accepted connection gets Token(next_id). Store connections in a HashMap<Token, TcpStream>. On a readable event, read data and write it back. Handle client disconnections by deregistering and removing from the map.

Exercise 4: Connect reactor to waker

Extend Exercise 2: instead of reading directly in the event handler, store a Waker when you register the fd. When the event fires, call waker.wake() instead of reading. In a separate “executor” loop, poll the future which then does the actual read. This is the reactor pattern that Lesson 9 will build in full.

Project 1: Mini Executor + TCP Echo Server

Prerequisites: Lessons 1-8 (futures, state machines, wakers, tasks, executor, pinning, combinators, async I/O, reactor). This project ties them all together into a working system.

Overview

Build a single-threaded async runtime from scratch that runs a real TCP echo server. No tokio. No external async runtime. You write every layer: executor, reactor, async TCP types. Then you run real network I/O on it.

This is the moment everything clicks. You’ve built futures, wakers, an executor, and a reactor in isolation. Now you combine them into one program that handles multiple concurrent TCP clients on a single thread.

Architecture

                        Your Runtime
┌──────────────────────────────────────────────────────┐
│                                                      │
│  ┌────────────────────────────────────────────────┐  │
│  │                 Executor                       │  │
│  │                                                │  │
│  │  task queue: [task_1, task_2, task_3, ...]     │  │
│  │                                                │  │
│  │  loop {                                        │  │
│  │      drain queue → poll each task              │  │
│  │      if queue empty → ask reactor to wait      │  │
│  │  }                                             │  │
│  └──────────────────────┬─────────────────────────┘  │
│                         │                            │
│                    wake(task)                         │
│                         │                            │
│  ┌──────────────────────┴─────────────────────────┐  │
│  │                  Reactor                        │  │
│  │                                                │  │
│  │  mio::Poll  ←──  kqueue / epoll               │  │
│  │  wakers: HashMap<Token, Waker>                 │  │
│  │                                                │  │
│  │  poll() → event on Token(3)                    │  │
│  │        → look up waker for Token(3)            │  │
│  │        → waker.wake() → task back in queue     │  │
│  └────────┬──────────┬───────────────────────────┘  │
│           │          │                               │
│     ┌─────┴───┐ ┌────┴────┐                         │
│     │Listener │ │ Stream  │  ← async wrappers       │
│     │Token(0) │ │Token(1) │    around std sockets    │
│     └────┬────┘ └────┬────┘    set to non-blocking   │
│          │           │                               │
└──────────┼───────────┼───────────────────────────────┘
           │           │
     ┌─────┴─────┐ ┌───┴───┐
     │ Client A  │ │Client B│    multiple concurrent
     │ nc ..8080 │ │nc ..   │    connections on one
     └───────────┘ └────────┘    thread

Data flow for one echo

1. Client sends "hello\n" over TCP
2. kqueue/epoll fires → reactor sees Token(3) is READABLE
3. Reactor looks up waker for Token(3) → waker.wake()
4. Executor re-polls the task that owns Token(3)
5. Task calls stream.read() → gets "hello" (non-blocking, data is ready)
6. Task calls stream.write("hello") → registers WRITABLE if needed
7. Data goes back to the client

What you implement

Component    Description
block_on     Runs the top-level future to completion on the current thread
spawn        Wraps a future in a Task, pushes it to the executor queue
Reactor      Owns mio::Poll, maps Tokens to Wakers, dispatches events
TcpListener  Async wrapper around mio::net::TcpListener, returns TcpStream on accept
TcpStream    Async read/write around mio::net::TcpStream with reactor registration
Event loop   The main loop: drain tasks, poll reactor, repeat

The goal

When you’re done, this code runs:

#![allow(unused)]
fn main() {
my_runtime::block_on(async {
    let listener = my_runtime::TcpListener::bind("127.0.0.1:8080").await;
    loop {
        let stream = listener.accept().await;
        my_runtime::spawn(handle_client(stream));
    }
});

async fn handle_client(mut stream: my_runtime::TcpStream) {
    let mut buf = [0u8; 1024];
    loop {
        let n = stream.read(&mut buf).await;
        if n == 0 { return; }          // client disconnected
        stream.write_all(&buf[..n]).await;
    }
}
}

Multiple clients connect at the same time. Each gets its own task. All tasks run concurrently on a single thread.

Step-by-step implementation guide

Step 1: Start with block_on (from Lesson 5)

Copy your block_on from Lesson 5. It should:

  1. Pin the future
  2. Create a waker that calls thread::unpark()
  3. Loop: poll → Ready? return. Pending? thread::park()

At this point you can run a single future to completion.

Step 2: Add the Reactor (from Lesson 8)

Create a global Reactor (use thread_local! or a static with OnceLock):

#![allow(unused)]
fn main() {
struct Reactor {
    poll: mio::Poll,
    wakers: HashMap<Token, Waker>,
    next_token: usize,
}
}

It needs three methods:

  • register(source, interest) -> Token – register a socket, return a token
  • set_waker(token, waker) – store the waker for a token
  • wait() – call poll.poll(), then for each event, call the stored waker

Step 3: Implement TcpListener

Wrap mio::net::TcpListener:

#![allow(unused)]
fn main() {
struct TcpListener {
    inner: mio::net::TcpListener,
    token: Token,
}
}

bind() creates the listener, registers it with the reactor for READABLE. accept() returns a future that:

  • Tries inner.accept() – if it returns a connection, wrap it in TcpStream and return Ready
  • If WouldBlock – store the waker with the reactor, return Pending

Step 4: Implement TcpStream

Wrap mio::net::TcpStream:

#![allow(unused)]
fn main() {
struct TcpStream {
    inner: mio::net::TcpStream,
    token: Token,
}
}

read() and write_all() are futures:

  • Try the operation – if it succeeds, return Ready
  • If WouldBlock – register waker, return Pending

Step 5: Add spawn + task queue

Add a shared task queue (just like Lesson 5’s multi-task executor):

#![allow(unused)]
fn main() {
static TASK_QUEUE: Mutex<VecDeque<Arc<Task>>> = ...;
}

spawn(future) wraps the future in a Task, pushes it to the queue.

Update block_on to also drain the task queue on each iteration:

  1. Poll the main future
  2. Drain and poll all queued tasks
  3. If nothing is ready, call reactor.wait() (which blocks until an event)
  4. Repeat
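The spawn-plus-queue half of this can be sketched without the reactor. A self-contained version (assumes Rust 1.68+ for the const `VecDeque::new`; `run_until_idle` simply returns where the real loop would call `reactor.wait()`):

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Wake, Waker};

// Global FIFO run queue, as in the step above.
static TASK_QUEUE: Mutex<VecDeque<Arc<Task>>> = Mutex::new(VecDeque::new());

struct Task {
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        // Waking a task = pushing it back onto the queue.
        TASK_QUEUE.lock().unwrap().push_back(self);
    }
}

fn spawn(future: impl Future<Output = ()> + Send + 'static) {
    let task = Arc::new(Task { future: Mutex::new(Box::pin(future)) });
    TASK_QUEUE.lock().unwrap().push_back(task);
}

/// Drain the queue until it stays empty. The real executor would call
/// reactor.wait() here instead of returning.
fn run_until_idle() {
    loop {
        let next = TASK_QUEUE.lock().unwrap().pop_front(); // lock released here
        let Some(task) = next else { return };
        let waker = Waker::from(task.clone());
        let mut cx = Context::from_waker(&waker);
        let _ = task.future.lock().unwrap().as_mut().poll(&mut cx);
    }
}

static DONE: AtomicUsize = AtomicUsize::new(0);

fn main() {
    spawn(async { DONE.fetch_add(1, Ordering::SeqCst); });
    spawn(async { DONE.fetch_add(1, Ordering::SeqCst); });
    run_until_idle();
    assert_eq!(DONE.load(Ordering::SeqCst), 2);
    println!("both tasks ran");
}
```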

Step 6: Wire reactor events to wakers

This is where it all connects. When reactor.wait() fires:

  • It gets events from mio::Poll
  • For each event, it finds the stored waker and calls waker.wake()
  • wake() pushes the task back into the queue
  • The executor loop picks it up and polls it

Test the full loop:

  1. Start the server
  2. Connect with nc
  3. Type a message
  4. See it echoed back
  5. Connect a second client – both work concurrently

Testing

Manual testing with netcat

Terminal 1 – start the server:

cargo run -p async-lessons --bin p1-echo-server -- run

Terminal 2 – connect a client:

nc 127.0.0.1 8080
hello           # type this
hello           # server echoes it back

Terminal 3 – connect another client simultaneously:

echo "world" | nc 127.0.0.1 8080
world           # echoed back

Automated self-test

The test subcommand spawns the server, connects a client, sends data, and checks the echo:

cargo run -p async-lessons --bin p1-echo-server -- test

Exercises

Exercise 1: Basic echo server

Implement all the components and get the goal code running. One client at a time is fine for this exercise. Confirm with nc.

Success criteria: type a line into nc, see it echoed back.

Exercise 2: Multiple concurrent clients

Make spawn work so multiple clients are served concurrently. Open 3-4 nc sessions and verify they all echo independently without blocking each other.

Success criteria: send messages from multiple nc sessions interleaved – each gets its own echo back immediately.

Exercise 3: Add a timeout

Add an AsyncTimer future (backed by a background thread or the reactor’s poll timeout). Disconnect clients that send nothing for 10 seconds:

#![allow(unused)]
fn main() {
async fn handle_client(mut stream: my_runtime::TcpStream) {
    let mut buf = [0u8; 1024];
    loop {
        match my_runtime::timeout(Duration::from_secs(10), stream.read(&mut buf)).await {
            Ok(0) => return,
            Ok(n) => stream.write_all(&buf[..n]).await,
            Err(_timeout) => {
                eprintln!("client timed out");
                return;
            }
        }
    }
}
}

Success criteria: connect with nc, wait 10 seconds without typing, connection drops.

Lesson 9: Event Loop + Reactor

Prerequisites: Lesson 5 (Executor), Lesson 8 (Async I/O). You need to understand both the executor’s poll loop and how kqueue/epoll works before combining them.

Real-life analogy: the hotel front desk

A hotel front desk manages hundreds of rooms:

┌─────────────────────────────────────────────────────┐
│  Front Desk (Reactor)                               │
│                                                     │
│  Room Registry:                                     │
│    Room 101 → Guest A's wake-up call at 7am        │
│    Room 205 → Guest B wants fresh towels            │
│    Room 312 → Guest C ordered room service          │
│                                                     │
│  Loop:                                              │
│    1. Wait for any event (phone rings, bell rings)  │
│    2. Look up which room it's for                   │
│    3. Notify the right guest                        │
│    4. Back to waiting                               │
│                                                     │
│  The desk doesn't DO the work (cooking, cleaning).  │
│  It just routes notifications to the right person.  │
└─────────────────────────────────────────────────────┘

The reactor works the same way:

  • Rooms = file descriptors (sockets)
  • Guests = tasks (futures waiting for I/O)
  • Registry = HashMap<Token, Waker>
  • Phone ringing = kqueue/epoll event
  • Notifying the guest = waker.wake()

The reactor doesn’t read or write data. It just tells futures “your socket is ready — try now.”

What is a Reactor?

The reactor is the component that connects the OS event system to your async runtime:

┌───────────────────────────────────────────────────────────┐
│                        Reactor                            │
│                                                           │
│   ┌──────────┐         ┌─────────────────────────┐       │
│   │ mio::Poll│         │ wakers: HashMap          │       │
│   │          │         │   Token(0) → Waker_A     │       │
│   │ wraps    │         │   Token(1) → Waker_B     │       │
│   │ kqueue / │         │   Token(2) → Waker_C     │       │
│   │ epoll    │         │                           │       │
│   └─────┬────┘         └──────────┬──────────────┘       │
│         │                         │                       │
│         │  poll() returns         │  look up waker        │
│         │  [Token(1), Token(2)]   │  for each token       │
│         │                         │                       │
│         └────────────┬────────────┘                       │
│                      │                                    │
│                      ▼                                    │
│              waker_b.wake()                               │
│              waker_c.wake()                               │
│              → tasks re-queued in executor                │
│                                                           │
└───────────────────────────────────────────────────────────┘

The Reactor struct

#![allow(unused)]
fn main() {
struct Reactor {
    /// The OS event system (wraps kqueue on macOS, epoll on Linux)
    poll: mio::Poll,

    /// Maps tokens to wakers. When an event fires for Token(N),
    /// we look up the waker here and call wake().
    wakers: HashMap<mio::Token, Waker>,

    /// Counter for assigning unique tokens to new sockets
    next_token: usize,
}
}

Three core methods:

register — “watch this socket”

#![allow(unused)]
fn main() {
fn register(&mut self, source: &mut impl Source, interest: Interest) -> mio::Token {
    let token = mio::Token(self.next_token);
    self.next_token += 1;
    self.poll.registry().register(source, token, interest).unwrap();
    token
}
}

Called when a future first needs to wait for I/O. The future says “tell me when socket X is readable.”

set_waker — “here’s how to notify me”

#![allow(unused)]
fn main() {
fn set_waker(&mut self, token: mio::Token, waker: Waker) {
    self.wakers.insert(token, waker);
}
}

Called during poll() when a future returns Pending. The future stores its waker so the reactor can notify it later.

wait — “block until something happens”

#![allow(unused)]
fn main() {
fn wait(&mut self) {
    let mut events = mio::Events::with_capacity(64);
    self.poll.poll(&mut events, None).unwrap();  // blocks!

    for event in events.iter() {
        if let Some(waker) = self.wakers.get(&event.token()) {
            waker.wake_by_ref();  // re-queue the task
        }
    }
}
}

This is where the thread sleeps. poll() blocks until the OS says a socket is ready. Then we wake the corresponding tasks.

How Reactor + Executor work together

Executor                         Reactor                        OS
   │                                │                            │
   ├── poll task_A ──►              │                            │
   │   task_A tries read()          │                            │
   │   → WouldBlock                 │                            │
   │   task_A calls:                │                            │
   │     reactor.set_waker(tok, wk) │                            │
   │   ◄── Pending ────────────────│                            │
   │                                │                            │
   ├── queue empty                  │                            │
   ├── reactor.wait() ────────────►│                            │
   │                                ├── poll.poll() ────────────►│
   │                                │   (thread sleeps)          │
   │                                │                     data arrives!
   │                                │   ◄── Token(0) ready ─────┤
   │                                │                            │
   │                                ├── waker.wake() ──►        │
   │   ◄── task_A re-queued ────────┤                            │
   │                                │                            │
   ├── poll task_A ──►              │                            │
   │   task_A tries read()          │                            │
   │   → got data!                  │                            │
   │   ◄── Ready(data) ───────────│                            │

The key insight: the executor never busy-polls. When there’s nothing to do, it calls reactor.wait() which blocks until the OS has an event. Zero CPU usage while waiting.

Where does the Reactor live?

The reactor needs to be accessible from:

  1. Futures — to call register() and set_waker() during poll()
  2. The executor — to call wait() when the queue is empty

Common patterns:

Option A: Thread-local (single-threaded runtime)
  thread_local! { static REACTOR: RefCell<Reactor> = ... }

Option B: Arc<Mutex<Reactor>> (shared, but lock contention)

Option C: Global static with OnceLock (initialized once)
  static REACTOR: OnceLock<Mutex<Reactor>> = OnceLock::new();

Tokio is more sophisticated: the I/O driver is shared across the worker threads, and whichever worker runs out of work parks on it and dispatches its events. For our mini-runtime, thread-local is simplest.
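A minimal sketch of Option A. The stub `Reactor` here stands in for the real one (which owns `mio::Poll`); the access pattern is the point, and the `with_reactor` helper name is an assumption:

```rust
use std::cell::RefCell;

// Stand-in for the real Reactor; only the field we need for the demo.
struct Reactor {
    next_token: usize,
}

thread_local! {
    // One reactor per thread; RefCell gives interior mutability without a lock.
    static REACTOR: RefCell<Reactor> = RefCell::new(Reactor { next_token: 0 });
}

/// Run a closure with mutable access to this thread's reactor.
/// Futures call this from poll(); the executor calls it when idle.
fn with_reactor<R>(f: impl FnOnce(&mut Reactor) -> R) -> R {
    REACTOR.with(|r| f(&mut r.borrow_mut()))
}

fn main() {
    let a = with_reactor(|r| { let t = r.next_token; r.next_token += 1; t });
    let b = with_reactor(|r| { let t = r.next_token; r.next_token += 1; t });
    assert_eq!((a, b), (0, 1));
    println!("tokens handed out in order");
}
```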

Readiness vs Completion

The reactor uses the readiness model:

  • “Socket is readable” = you CAN try read() and it probably won’t block
  • It does NOT mean “data has been read into your buffer”
  • You still need to call read() yourself — it might still return WouldBlock

This is different from the completion model (used by io_uring on Linux):

  • “Read is complete” = data is already in your buffer
  • No need to call read() again

Readiness (kqueue/epoll/mio):
  1. Register: "tell me when fd is readable"
  2. Event fires: "fd is readable"
  3. You call read(fd) → get data (usually)

Completion (io_uring):
  1. Submit: "read fd into this buffer"
  2. Event fires: "read is done, data is in your buffer"
  3. Just use the buffer

Tokio uses readiness (mio). This matters because your futures must handle WouldBlock even after being woken — the event was a hint, not a guarantee.

Exercises

Exercise 1: Single-socket reactor

Build a minimal Reactor that watches one TcpListener:

  1. Create mio::Poll and mio::Events
  2. Create a mio::net::TcpListener, register it for READABLE
  3. Loop: poll.poll(), accept connections, print their address
  4. No executor, no wakers — just the raw event loop

Test: run the program, connect with nc 127.0.0.1 8080 from another terminal.

Exercise 2: Multi-socket reactor

Extend Exercise 1 to handle multiple connections:

  1. Accept connections, register each for READABLE
  2. Map each Token to a TcpStream (use a HashMap<Token, TcpStream>)
  3. When a stream is readable, read data and echo it back
  4. Handle disconnection (read returns 0)

Test: connect 3 clients, type in each — all should echo independently.

Exercise 3: Waker integration

Wire the reactor into a waker:

  1. Create a reactor with a HashMap<Token, Waker>
  2. Create a ReadableFuture that:
    • First poll: registers the socket with the reactor, stores waker, returns Pending
    • Later polls: tries read() — if data, returns Ready(data), if WouldBlock, returns Pending
  3. Use block_on from Lesson 5 to run the future
  4. The reactor’s wait() should be called when the executor has nothing to do

This is the bridge between Course 1 (individual components) and Course 2 (integrated runtime).

Exercise 4: Deregistration and cleanup

Add proper cleanup:

  1. When a connection closes, deregister its token from mio::Poll
  2. Remove the waker from the HashMap
  3. Verify no leaked entries — print the waker map size periodically
  4. Handle the case where a waker fires for a token that was already deregistered (it should be a no-op)

Lesson 10: Task Scheduling

Prerequisites: Lesson 4 (Tasks), Lesson 5 (Executor), Lesson 9 (Reactor). You need to understand what a task is and how the reactor wakes them.

Real-life analogy: the hospital ER

An emergency room triages patients:

┌──────────────────────────────────────────────────────┐
│  ER Waiting Room (Run Queue)                         │
│                                                      │
│  [Patient A: broken arm]  ← arrived first            │
│  [Patient B: headache]    ← arrived second           │
│  [Patient C: chest pain]  ← arrived third            │
│                                                      │
│  Triage Nurse (Scheduler):                           │
│    FIFO: treat A first (arrived first)               │
│    Priority: treat C first (most urgent)             │
│    Fair: give each patient 5 minutes, rotate         │
│                                                      │
│  Doctor (Executor):                                  │
│    Takes next patient from queue                     │
│    Examines them (poll)                              │
│    If done → discharge (Ready)                       │
│    If not → send back to waiting room (Pending)      │
│    If needs X-ray → nurse will call them (waker)     │
└──────────────────────────────────────────────────────┘

Different scheduling strategies:

  • FIFO: first in, first out. Simple, fair-ish, what we build first.
  • Priority: urgent tasks first. Used for I/O vs timers.
  • Round-robin: each task gets a time slice. Prevents starvation.
  • Work-stealing: multiple doctors, idle ones steal from busy queues. Lesson 14.

What is a scheduler?

The scheduler decides which task to poll next. In Lesson 5, we had a simple loop:

#![allow(unused)]
fn main() {
// Lesson 5: naive approach
while let Some(task) = queue.pop_front() {
    poll(task);  // poll whatever's next in the queue
}
}

This works but has problems:

  • A task that always returns Pending and immediately wakes itself monopolizes the CPU
  • No way to prioritize I/O-ready tasks over timer tasks
  • No fairness guarantee

The run queue

The run queue is where tasks wait to be polled. When waker.wake() is called, the task is pushed to the back of the queue:

                    Run Queue (VecDeque<Arc<Task>>)
                    ┌───────────────────────────────┐
        push back → │ task_C │ task_A │ task_B │    │ ← pop front
                    └───────────────────────────────┘

Executor loop:
  1. Pop task_B from front
  2. Poll it
  3. If Pending → waker will push it back later
  4. If Ready → done, drop the task
  5. Pop next (task_A)
  6. ...

Fairness

A task that wakes itself immediately goes to the back of the queue. This gives other tasks a chance to run:

Queue: [A, B, C]
  Poll A → Pending, wakes self → queue: [B, C, A]
  Poll B → Pending, wakes self → queue: [C, A, B]
  Poll C → Ready → queue: [A, B]
  Poll A → Ready → queue: [B]
  Poll B → Ready → queue: []

Each task gets one turn per cycle. This is cooperative multitasking — tasks must yield (return Pending) to let others run. A task that never yields starves everyone.
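A minimal sketch of such a yield point (the same idea as Lesson 2's YieldOnce), driven by hand here with a no-op waker so it runs standalone:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Pending on the first poll (after scheduling a wake-up), Ready after that.
/// Returning Pending once sends the task to the back of the run queue.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // re-queue ourselves immediately
            Poll::Pending
        }
    }
}

fn main() {
    // Drive it by hand with a waker that does nothing.
    struct Noop;
    impl Wake for Noop {
        fn wake(self: Arc<Self>) {}
    }
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);

    let mut fut = YieldOnce { yielded: false };
    let mut fut = Pin::new(&mut fut);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Pending);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(()));
    println!("yielded once, then completed");
}
```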

The ArcWake pattern

In Lesson 3, we built wakers manually with RawWaker. There’s a cleaner way — implement the Wake trait:

#![allow(unused)]
fn main() {
use std::task::Wake;

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        // Push ourselves back into the queue
        self.queue.lock().unwrap().push_back(self.clone());
    }
}
}

Then create a waker from an Arc:

#![allow(unused)]
fn main() {
let waker = Waker::from(task.clone());  // calls task.wake() when woken
}

No unsafe code. The Wake trait handles the vtable for you.

Lesson 3 (manual):                    Lesson 10 (Wake trait):
  RawWaker { data, vtable }            impl Wake for Task {
  unsafe fn clone/wake/drop                fn wake(self: Arc<Self>) { ... }
  → error-prone, lots of unsafe        }
                                        Waker::from(arc_task)
                                        → safe, clean, idiomatic

JoinHandle: getting results from tasks

When you spawn(), you want to get the result back:

#![allow(unused)]
fn main() {
let handle = spawn(async { 42 });
let result = handle.await;  // 42
}

JoinHandle<T> is a future that resolves when the task completes:

┌─────────────────────────────────────────────────────┐
│  Shared state (Arc<Mutex<JoinState>>)               │
│                                                     │
│  result: Option<T>       ← None until task finishes │
│  waker: Option<Waker>   ← set by JoinHandle::poll  │
│                                                     │
│  Task writes result, wakes JoinHandle               │
│  JoinHandle checks result                           │
└──────────────────┬──────────────────────────────────┘
                   │
          shared by both sides
                   │
    ┌──────────────┴──────────────┐
    │                             │
    ▼                             ▼
┌─────────┐                ┌──────────────┐
│  Task   │                │  JoinHandle  │
│         │                │  (a Future)  │
│ runs    │                │              │
│ future  │   completes    │ poll():      │
│ stores  │ ──────────────►│  result set? │
│ result  │                │  yes → Ready │
│ wakes   │                │  no → Pending│
│ handle  │                │              │
└─────────┘                └──────────────┘

Implementation:

#![allow(unused)]
fn main() {
struct JoinState<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

struct JoinHandle<T> {
    state: Arc<Mutex<JoinState<T>>>,
}

impl<T> Future for JoinHandle<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<T> {
        let mut state = self.state.lock().unwrap();
        if let Some(result) = state.result.take() {
            Poll::Ready(result)
        } else {
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
}

Putting it together: the full executor

#![allow(unused)]
fn main() {
struct Executor {
    queue: Arc<Mutex<VecDeque<Arc<Task>>>>,
    reactor: Reactor,
}

impl Executor {
    fn spawn<T>(&self, future: impl Future<Output=T> + Send + 'static) -> JoinHandle<T> {
        // 1. Create shared JoinState
        // 2. Wrap future: when it completes, store result and wake handle
        // 3. Create Task with the wrapped future
        // 4. Push to queue
        // 5. Return JoinHandle
    }

    fn run(&mut self) {
        loop {
            // 1. Drain queue, poll each task
            let tasks: Vec<_> = self.queue.lock().unwrap().drain(..).collect();

            if tasks.is_empty() {
                // 2. Nothing to do → block on reactor
                self.reactor.wait();
                continue;
            }

            for task in tasks {
                let waker = Waker::from(task.clone());
                let mut cx = Context::from_waker(&waker);
                let mut future = task.future.lock().unwrap();
                let _ = future.as_mut().poll(&mut cx);
                // If Ready → task dropped (not re-queued)
                // If Pending → waker will re-queue later
            }
        }
    }
}
}
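One way to fill in those numbered spawn steps, as a self-contained sketch: `run_until_idle` stands in for the real `run()` (it stops when the queue is empty instead of calling `reactor.wait()`), and the handles are polled by hand at the end:

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct JoinState<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

struct JoinHandle<T> {
    state: Arc<Mutex<JoinState<T>>>,
}

impl<T> Future for JoinHandle<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut state = self.state.lock().unwrap();
        match state.result.take() {
            Some(result) => Poll::Ready(result),
            None => {
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

struct Task {
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    queue: Arc<Mutex<VecDeque<Arc<Task>>>>,
}

impl Wake for Task {
    fn wake(self: Arc<Self>) {
        let queue = self.queue.clone();
        queue.lock().unwrap().push_back(self);
    }
}

struct Executor {
    queue: Arc<Mutex<VecDeque<Arc<Task>>>>,
}

impl Executor {
    fn spawn<T: Send + 'static>(
        &self,
        future: impl Future<Output = T> + Send + 'static,
    ) -> JoinHandle<T> {
        // 1. Create shared JoinState
        let state = Arc::new(Mutex::new(JoinState { result: None, waker: None }));
        // 2. Wrap the future: on completion, store the result, wake the handle
        let task_state = state.clone();
        let wrapped = async move {
            let result = future.await;
            let waker = {
                let mut s = task_state.lock().unwrap();
                s.result = Some(result);
                s.waker.take()
            };
            if let Some(w) = waker {
                w.wake(); // wake outside the lock
            }
        };
        // 3 + 4. Create the Task, push it to the queue
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(wrapped)),
            queue: self.queue.clone(),
        });
        self.queue.lock().unwrap().push_back(task);
        // 5. Return the JoinHandle
        JoinHandle { state }
    }

    fn run_until_idle(&self) {
        loop {
            let next = self.queue.lock().unwrap().pop_front();
            let Some(task) = next else { break };
            let waker = Waker::from(task.clone());
            let mut cx = Context::from_waker(&waker);
            let _ = task.future.lock().unwrap().as_mut().poll(&mut cx);
        }
    }
}

fn main() {
    let exec = Executor { queue: Arc::new(Mutex::new(VecDeque::new())) };
    let a = exec.spawn(async { 10 });
    let b = exec.spawn(async { 20 });
    exec.run_until_idle();

    // Poll the handles by hand (a real program would .await them in a task).
    struct Noop;
    impl Wake for Noop { fn wake(self: Arc<Self>) {} }
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);
    let (mut a, mut b) = (Box::pin(a), Box::pin(b));
    assert_eq!(a.as_mut().poll(&mut cx), Poll::Ready(10));
    assert_eq!(b.as_mut().poll(&mut cx), Poll::Ready(20));
    println!("joined: 10 + 20");
}
```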

Exercises

Exercise 1: Spawn and join

Implement spawn() returning a JoinHandle<T>. Spawn three tasks that each return a number. Await all three handles, assert the sum.

#![allow(unused)]
fn main() {
let a = executor.spawn(async { 10 });
let b = executor.spawn(async { 20 });
let c = executor.spawn(async { 30 });
// a.await + b.await + c.await == 60
}

Exercise 2: Task fairness

Spawn a “greedy” task that yields 1000 times (returns Pending + wakes each time) and a “quick” task that completes in one poll. Track the order of completions. The quick task should complete within a few polls, not after 1000 — proving FIFO fairness.

Exercise 3: Starvation detection

Spawn a task that never yields (infinite loop inside poll). Show that other tasks never run. Then add a yield point (YieldOnce from Lesson 2). Show that fairness is restored.

This demonstrates why async is cooperative — tasks must yield voluntarily.

Exercise 4: JoinHandle cancellation

Drop a JoinHandle before the task completes. Add a cancelled flag to JoinState — when the handle is dropped, set the flag. The task checks the flag on each poll and aborts early if cancelled.

Test: spawn a task that increments a counter each poll. Drop the handle after 3 polls. Assert the counter is 3, not more.

Lesson 11: AsyncRead / AsyncWrite

Prerequisites: Lesson 9 (Reactor), Lesson 10 (Task Scheduling). You need a working reactor that maps tokens to wakers and a scheduler that polls tasks.

Real-life analogy: the drive-through window

At a drive-through:

Blocking:
  You:     "One burger please"
  Window:  (silence for 5 minutes while they cook)
  You:     (frozen, can't do anything, car is blocked)
  Window:  "Here's your burger"

Non-blocking:
  You:     "One burger please"
  Window:  "Not ready, come back later" (WouldBlock)
  You:     Drive away, do other errands
  You:     Come back: "Ready yet?"
  Window:  "Not ready" (WouldBlock again)
  You:     Come back: "Ready yet?"
  Window:  "Here's your burger!"

Async (what we're building):
  You:     "One burger please. Call me when it's ready."
  Window:  (stores your number)    → reactor.set_waker(token, waker)
  You:     Go do other errands     → return Poll::Pending
           ...
  Window:  (rings your phone)      → waker.wake()
  You:     Drive back, pick it up  → read() → Ready(burger)

AsyncRead/AsyncWrite wraps the “non-blocking + callback” pattern into a clean .await-able API.

The pattern: try → WouldBlock → register → Pending

Every async I/O operation follows the same pattern:

fn poll_read(cx, buf) → Poll<usize>:
  ┌──────────────────────────────┐
  │ Try read(buf)                │
  │                              │
  │ Got data (n bytes)?          │
  │   → return Poll::Ready(n)   │
  │                              │
  │ Got WouldBlock?              │
  │   → reactor.set_waker(token, cx.waker())
  │   → return Poll::Pending    │
  │                              │
  │ Got error?                   │
  │   → return Poll::Ready(Err) │
  └──────────────────────────────┘

This is the readiness pattern:

  1. Try the operation (non-blocking socket, so it returns immediately)
  2. If success → return the data
  3. If WouldBlock → register with the reactor, yield to the executor
  4. When the reactor fires → executor re-polls → back to step 1

The AsyncTcpStream

#![allow(unused)]
fn main() {
struct AsyncTcpStream {
    /// The underlying non-blocking socket
    inner: mio::net::TcpStream,
    /// Token registered with the reactor (for event matching)
    token: Token,
}
}

poll_read

#![allow(unused)]
fn main() {
impl AsyncTcpStream {
    fn poll_read(
        &mut self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        match self.inner.read(buf) {
            Ok(n) => Poll::Ready(Ok(n)),          // got data
            Err(e) if e.kind() == WouldBlock => {
                reactor().set_waker(self.token, cx.waker().clone());
                Poll::Pending                      // wait for reactor
            }
            Err(e) => Poll::Ready(Err(e)),        // real error
        }
    }
}
}

The read future

To make this .await-able, wrap it in a future:

#![allow(unused)]
fn main() {
struct ReadFuture<'a> {
    stream: &'a mut AsyncTcpStream,
    buf: &'a mut [u8],
}

impl<'a> Future for ReadFuture<'a> {
    type Output = io::Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_read(cx, self.buf)
    }
}

// Usage:
let n = stream.read(&mut buf).await?;
}
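The read() method itself just captures the two borrows and hands back the future. A self-contained sketch of that pattern, with an in-memory FakeStream (an assumption for testability) in place of the real socket:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// In-memory stand-in for AsyncTcpStream: no socket or reactor, just enough
/// state to show how read() returns a borrowing future.
struct FakeStream {
    data: Vec<u8>,
    pos: usize,
}

struct ReadFuture<'a> {
    stream: &'a mut FakeStream,
    buf: &'a mut [u8],
}

impl<'a> Future for ReadFuture<'a> {
    type Output = usize;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        // A real poll would try inner.read() and return Pending on WouldBlock.
        let this = &mut *self;
        let remaining = &this.stream.data[this.stream.pos..];
        let n = remaining.len().min(this.buf.len());
        this.buf[..n].copy_from_slice(&remaining[..n]);
        this.stream.pos += n;
        Poll::Ready(n)
    }
}

impl FakeStream {
    /// The method application code calls: borrows stream and buffer,
    /// returns a future that does the work when polled.
    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a> {
        ReadFuture { stream: self, buf }
    }
}

fn main() {
    struct Noop;
    impl Wake for Noop { fn wake(self: Arc<Self>) {} }
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);

    let mut stream = FakeStream { data: b"hello".to_vec(), pos: 0 };
    let mut buf = [0u8; 3];
    let mut fut = stream.read(&mut buf);
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready(3));
    drop(fut);
    assert_eq!(&buf, b"hel");
    println!("read 3 bytes");
}
```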

poll_write

Same pattern, but for writing:

#![allow(unused)]
fn main() {
fn poll_write(&mut self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
    match self.inner.write(buf) {
        Ok(n) => Poll::Ready(Ok(n)),
        Err(e) if e.kind() == WouldBlock => {
            reactor().set_waker(self.token, cx.waker().clone());
            Poll::Pending
        }
        Err(e) => Poll::Ready(Err(e)),
    }
}
}

Visualizing the data flow

Application code:
  let n = stream.read(&mut buf).await;

Calls:
  ReadFuture::poll()
    │
    ├── stream.inner.read(buf)
    │     │
    │     ├── Ok(n) ──────────────────► Poll::Ready(Ok(n))
    │     │                              executor receives data
    │     │
    │     └── Err(WouldBlock) ────────► reactor.set_waker(token, waker)
    │                                   Poll::Pending
    │                                    │
    │         ... executor does other    │
    │             tasks ...              │
    │                                    │
    │         OS: data arrives on socket │
    │         reactor: event for Token(N)│
    │         reactor: waker.wake()      │
    │                                    │
    └── (re-polled by executor) ────────┘
        stream.inner.read(buf) → Ok(n)
        Poll::Ready(Ok(n))

write_all: a higher-level helper

A single write() may not write all bytes (partial write). write_all loops until everything is written:

#![allow(unused)]
fn main() {
async fn write_all(stream: &mut AsyncTcpStream, mut buf: &[u8]) -> io::Result<()> {
    while !buf.is_empty() {
        let n = stream.write(buf).await?;
        buf = &buf[n..];
    }
    Ok(())
}
}

Each .await might yield if the socket’s write buffer is full. The reactor wakes us when there’s space.

read_exact: fill the buffer completely

#![allow(unused)]
fn main() {
async fn read_exact(stream: &mut AsyncTcpStream, buf: &mut [u8]) -> io::Result<()> {
    let mut filled = 0;
    while filled < buf.len() {
        let n = stream.read(&mut buf[filled..]).await?;
        if n == 0 {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "connection closed"));
        }
        filled += n;
    }
    Ok(())
}
}

The AsyncTcpListener

#![allow(unused)]
fn main() {
struct AsyncTcpListener {
    inner: mio::net::TcpListener,
    token: Token,
}

impl AsyncTcpListener {
    fn poll_accept(&mut self, cx: &mut Context) -> Poll<io::Result<(AsyncTcpStream, SocketAddr)>> {
        match self.inner.accept() {
            Ok((mut stream, addr)) => {
                let token = reactor().register(&mut stream, Interest::READABLE);
                Poll::Ready(Ok((AsyncTcpStream { inner: stream, token }, addr)))
            }
            Err(e) if e.kind() == WouldBlock => {
                reactor().set_waker(self.token, cx.waker().clone());
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }
}
}

Same pattern: try accept → got connection? Ready. WouldBlock? Register and Pending.

Edge-triggered vs level-triggered

mio uses edge-triggered events by default: you get notified once when the state changes (not readable → readable). After handling the event, you must drain all available data, otherwise you might miss events.

Level-triggered:                    Edge-triggered:
  "socket IS readable"               "socket BECAME readable"
  (keeps firing until you read)       (fires once on transition)

  Safe: you can read one byte         Must read ALL available data
  and you'll be notified again.       or you might not be notified again.

In practice: after a wake-up, keep performing I/O until a call actually returns WouldBlock before you go back to waiting for events. Returning Ready(n) with whatever you read is fine, because the caller polls again right away; what you must not do is return Pending without having seen WouldBlock, since the edge already fired and will not fire again for data that is still sitting in the kernel buffer. This ensures you don't miss data.
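Here is the drain loop in isolation, demonstrated on a plain std::net socket set to non-blocking (no reactor involved; the helper names are illustrative):

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::time::{Duration, Instant};

/// Read everything currently available from a non-blocking socket.
/// Stopping only at WouldBlock is what makes this safe under
/// edge-triggered notifications: no readable byte is left behind.
fn drain(stream: &mut TcpStream, out: &mut Vec<u8>) -> io::Result<()> {
    let mut buf = [0u8; 4096];
    loop {
        match stream.read(&mut buf) {
            Ok(0) => return Ok(()),                    // peer closed
            Ok(n) => out.extend_from_slice(&buf[..n]), // keep reading
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(()),
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let mut client = TcpStream::connect(addr)?;
    let (mut server_side, _) = listener.accept()?;
    server_side.set_nonblocking(true)?;

    client.write_all(b"hello world")?;

    // Poll until the loopback has delivered all 11 bytes (or we give up).
    let deadline = Instant::now() + Duration::from_secs(2);
    let mut data = Vec::new();
    while data.len() < 11 && Instant::now() < deadline {
        drain(&mut server_side, &mut data)?;
        std::thread::sleep(Duration::from_millis(10));
    }

    assert_eq!(data, b"hello world".to_vec());
    println!("drained {} bytes", data.len());
    Ok(())
}
```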

Cancellation safety

What happens if a ReadFuture is dropped mid-await?

#![allow(unused)]
fn main() {
let read_future = stream.read(&mut buf);
// Drop it before completion (e.g., select! picked the other branch)
drop(read_future);
}

With our design, this is safe: no data is lost because we haven’t actually read anything yet. The read only happens inside poll(). If we never poll, no bytes are consumed.

But read_exact is NOT cancellation-safe: if you’ve read 50 of 100 bytes and the future is dropped, those 50 bytes are gone. This is Lesson 24’s topic.

Exercises

Exercise 1: AsyncTcpStream

Implement AsyncTcpStream with poll_read and poll_write. Create read() and write() methods that return futures. Test with a simple echo server on your runtime from Lesson 5.

Exercise 2: read_exact and write_all

Build read_exact and write_all as async functions. Test: send a 10KB message in small chunks, verify read_exact receives all of it.

Exercise 3: Async echo server

Combine AsyncTcpListener + AsyncTcpStream + your executor. Accept connections, spawn a task per connection, echo data back. Test with multiple nc clients.

Exercise 4: Throughput test

Implement async fn copy(src, dst) that pipes bytes from one stream to another. Measure throughput: connect two streams, send 1MB, time it. Compare with blocking std::io::copy.

Lesson 12: Timers

Prerequisites: Lesson 9 (Reactor), Lesson 10 (Task Scheduling). Timers integrate with the reactor’s poll timeout and the executor’s task queue.

Real-life analogy: the kitchen timer rack

A chef has a rack of kitchen timers:

┌──────────────────────────────────────────────────────┐
│  Timer Rack (TimerHeap)                              │
│                                                      │
│  ⏰ Pasta: 8 min    (soonest → at the top)           │
│  ⏰ Sauce: 15 min                                    │
│  ⏰ Bread: 25 min                                    │
│                                                      │
│  Chef's loop:                                        │
│    1. Check: "which timer is closest?"  → Pasta (8m) │
│    2. Set a kitchen alarm for 8 minutes              │
│    3. Do other work until alarm rings                │
│    4. Alarm! → drain pasta                           │
│    5. Next closest: Sauce (15m) → set alarm for 7m  │
│    6. Continue...                                    │
└──────────────────────────────────────────────────────┘

The chef doesn’t check every timer every second. They set ONE alarm for the nearest timer, then work on other things. When it rings, they handle it and set the next alarm.

This is exactly how async timers work:

  • Timer rack = BinaryHeap of (Instant, Waker) entries
  • Nearest timer = the heap’s minimum (the peek())
  • Kitchen alarm = mio::Poll::poll(timeout) — the reactor blocks until this timeout
  • Alarm rings = poll returns, we check for expired timers and wake them

How timers integrate with the reactor

The reactor already has a wait() method that calls poll.poll(). We add a timeout:

                    Executor loop
                        │
                        ▼
        ┌───────────────────────────────┐
        │ Drain task queue, poll tasks  │
        │                               │
        │ Queue empty?                  │
        │   │                           │
        │   ▼                           │
        │ Check timer heap:             │
        │   Nearest deadline: 200ms     │
        │                               │
        │ reactor.wait(timeout: 200ms)  │──► mio::poll(200ms)
        │                               │    blocks at most 200ms
        │ ◄─────────────────────────────│
        │                               │
        │ Check expired timers:         │
        │   pasta timer expired → wake  │
        │                               │
        │ Back to draining queue        │
        └───────────────────────────────┘

Without timers, the reactor blocks forever (no timeout). With timers, the reactor blocks until either:

  1. An I/O event fires, OR
  2. The nearest timer expires

Whichever comes first.

The TimerHeap

#![allow(unused)]
fn main() {
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use std::task::Waker;
use std::time::Instant;

struct TimerEntry {
    deadline: Instant,
    waker: Waker,
}

// BinaryHeap requires Ord, but Waker isn't comparable —
// so we order entries by deadline alone.
impl PartialEq for TimerEntry {
    fn eq(&self, other: &Self) -> bool { self.deadline == other.deadline }
}
impl Eq for TimerEntry {}
impl PartialOrd for TimerEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
}
impl Ord for TimerEntry {
    fn cmp(&self, other: &Self) -> Ordering { self.deadline.cmp(&other.deadline) }
}

struct TimerHeap {
    heap: BinaryHeap<Reverse<TimerEntry>>,
}
}

We use Reverse so the heap is a min-heap — the soonest deadline is at the top.
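A quick, self-contained check of that claim — `BinaryHeap` is a max-heap, and wrapping entries in `Reverse` flips the comparison so the smallest value surfaces first (deadlines are stood in for by plain millisecond counts here):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    let mut heap = BinaryHeap::new();
    // Push "deadlines" out of order.
    for millis in [250u64, 80, 500] {
        heap.push(Reverse(millis));
    }
    // peek() now returns the SOONEST deadline, not the latest.
    assert_eq!(heap.peek(), Some(&Reverse(80)));
    // Draining yields ascending order — exactly what fire_expired relies on.
    let drained: Vec<u64> =
        std::iter::from_fn(|| heap.pop().map(|Reverse(m)| m)).collect();
    assert_eq!(drained, vec![80, 250, 500]);
    println!("{drained:?}");
}
```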

Three operations:

#![allow(unused)]
fn main() {
use std::task::Waker;
use std::time::{Duration, Instant};

impl TimerHeap {
    /// Add a timer. When deadline passes, the waker will be called.
    fn push(&mut self, deadline: Instant, waker: Waker) {
        self.heap.push(Reverse(TimerEntry { deadline, waker }));
    }

    /// How long until the next timer fires? Used as poll timeout.
    fn next_timeout(&self) -> Option<Duration> {
        self.heap.peek().map(|Reverse(entry)| {
            entry.deadline.saturating_duration_since(Instant::now())
        })
    }

    /// Wake all timers that have expired.
    fn fire_expired(&mut self) {
        let now = Instant::now();
        while let Some(Reverse(entry)) = self.heap.peek() {
            if entry.deadline <= now {
                let Reverse(entry) = self.heap.pop().unwrap();
                entry.waker.wake();
            } else {
                break;  // remaining timers are in the future
            }
        }
    }
}
}

The Sleep future

#![allow(unused)]
fn main() {
struct Sleep {
    deadline: Instant,
    registered: bool,
}

impl Sleep {
    fn new(duration: Duration) -> Self {
        Self {
            deadline: Instant::now() + duration,
            registered: false,
        }
    }
}

impl Future for Sleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        if Instant::now() >= self.deadline {
            Poll::Ready(())
        } else {
            if !self.registered {
                // Register once; this assumes the task's waker is stable
                // across polls (true for our executor — one waker per task).
                timer_heap().push(self.deadline, cx.waker().clone());
                self.registered = true;
            }
            Poll::Pending
        }
    }
}
}

Usage (via a small helper, fn sleep(d: Duration) -> Sleep { Sleep::new(d) }): sleep(Duration::from_secs(2)).await;

How it works end-to-end

1. Task calls sleep(2s).await
2. Sleep::poll() → deadline is in the future
   → push(deadline, waker) to timer heap
   → return Pending
3. Executor: queue empty
   → timer_heap.next_timeout() = 2s
   → reactor.wait(timeout: 2s)
   → mio::poll blocks for 2 seconds
4. poll returns (timeout expired)
   → timer_heap.fire_expired()
   → waker.wake() → task re-queued
5. Executor polls task again
   → Sleep::poll() → Instant::now() >= deadline
   → return Ready(())
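The end-to-end flow above can be exercised without a reactor at all. Below is a toy, self-contained `block_on` that stands in for the executor: instead of a timer heap driving `mio::poll(timeout)`, it just parks for a short tick and re-polls, so this simplified `Sleep` skips waker registration entirely. A sketch under those assumptions, not our real runtime:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

// Simplified Sleep: no waker registration, because the toy executor
// below re-polls on a fixed tick instead of waiting on a timer heap.
struct Sleep { deadline: Instant }

impl Future for Sleep {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<()> {
        if Instant::now() >= self.deadline { Poll::Ready(()) } else { Poll::Pending }
    }
}

// A Waker that unparks the thread running block_on.
struct ThreadWaker(thread::Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) { self.0.unpark(); }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            // Stand-in for reactor.wait(timeout): park briefly, then re-poll.
            Poll::Pending => thread::park_timeout(Duration::from_millis(10)),
        }
    }
}

fn main() {
    let start = Instant::now();
    block_on(Sleep { deadline: Instant::now() + Duration::from_millis(100) });
    assert!(start.elapsed() >= Duration::from_millis(100));
    println!("slept for {:?}", start.elapsed());
}
```

The real executor replaces the fixed 10ms tick with `timer_heap.next_timeout()`, which is the whole point of the lesson — but the Ready/Pending/park loop is the same shape.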

The timeout combinator

Wrap any future with a deadline:

#![allow(unused)]
fn main() {
async fn timeout<F: Future>(duration: Duration, future: F) -> Result<F::Output, TimedOut> {
    select! {
        result = future => Ok(result),
        _ = sleep(duration) => Err(TimedOut),
    }
}
}

Or without select, as a manual future:

#![allow(unused)]
fn main() {
struct Timeout<F> {
    future: F,
    sleep: Sleep,
}

impl<F: Future + Unpin> Future for Timeout<F> {
    type Output = Result<F::Output, TimedOut>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // Poll the inner future first (Pin::new is fine because F: Unpin)
        if let Poll::Ready(val) = Pin::new(&mut self.future).poll(cx) {
            return Poll::Ready(Ok(val));
        }
        // Then check the timer
        if let Poll::Ready(()) = Pin::new(&mut self.sleep).poll(cx) {
            return Poll::Ready(Err(TimedOut));
        }
        Poll::Pending
    }
}
}

Exercises

Exercise 1: Sleep and ordering

Implement Sleep and the TimerHeap. Spawn three tasks:

#![allow(unused)]
fn main() {
spawn(async { sleep(Duration::from_secs(3)).await; println!("C: 3s"); });
spawn(async { sleep(Duration::from_secs(1)).await; println!("A: 1s"); });
spawn(async { sleep(Duration::from_secs(2)).await; println!("B: 2s"); });
}

They should print in order: A, B, C. Total time should be ~3 seconds (concurrent), not 6 (sequential).

Exercise 2: Timer integration with reactor

Modify your executor’s main loop:

  1. After draining the task queue, call timer_heap.fire_expired()
  2. Compute timer_heap.next_timeout()
  3. Pass it to reactor.wait(timeout)

Verify: spawn a sleep(1s) task alongside an I/O task. Both should work correctly — the reactor wakes up for either I/O events or timer expiry.

Exercise 3: Timeout combinator

Implement timeout(duration, future). Test:

#![allow(unused)]
fn main() {
let result = timeout(Duration::from_millis(100), sleep(Duration::from_secs(10))).await;
assert!(result.is_err());  // timed out!

let result = timeout(Duration::from_secs(10), sleep(Duration::from_millis(100))).await;
assert!(result.is_ok());   // completed in time
}

Exercise 4: Interval

Implement interval(duration) that yields () at regular intervals. Use it to print a heartbeat every 500ms while another task does a 3-second sleep.

#![allow(unused)]
fn main() {
spawn(async {
    let mut i = interval(Duration::from_millis(500));
    loop {
        i.tick().await;
        println!("heartbeat");
    }
});
spawn(async {
    sleep(Duration::from_secs(3)).await;
    println!("done, shutting down");
    // TODO: cancel the heartbeat task
});
}

Lesson 13: Channels

Prerequisites: Lesson 4 (Tasks), Lesson 10 (Task Scheduling). Channels are how tasks communicate — they need wakers to notify receivers.

Real-life analogy: the mailbox

Oneshot channel = a one-time letter:
┌────────────────┐                      ┌────────────────┐
│  Sender        │   sends ONE letter   │  Receiver      │
│                │ ────────────────────► │                │
│  "Here's your  │                      │  Waits at      │
│   blood test   │                      │  mailbox       │
│   result"      │                      │  until letter  │
│                │                      │  arrives       │
└────────────────┘                      └────────────────┘
  Used once, then both sides are done.

MPSC channel = a mailbox with multiple senders:
┌────────────────┐
│  Sender A      │──┐
└────────────────┘  │
┌────────────────┐  │    ┌────────────────┐
│  Sender B      │──┼───►│  Receiver      │
└────────────────┘  │    │  (one mailbox) │
┌────────────────┐  │    └────────────────┘
│  Sender C      │──┘
└────────────────┘
  Multiple producers, single consumer.
  Messages queue up if receiver is busy.

The key difference from std::sync::mpsc: async channels wake the receiver when a message arrives instead of blocking a thread.

How async channels work

The core pattern: shared state + waker.

┌────────────────────────────────────────────────┐
│  Shared State (Arc<Mutex<Inner>>)              │
│                                                │
│  queue: VecDeque<T>     ← messages waiting     │
│  rx_waker: Option<Waker> ← receiver's waker   │
│  closed: bool           ← sender dropped?      │
│                                                │
│  Sender writes:                                │
│    1. Lock inner                               │
│    2. Push message to queue                    │
│    3. If rx_waker is Some → wake it            │
│                                                │
│  Receiver reads:                               │
│    1. Lock inner                               │
│    2. Pop from queue → got message? Ready      │
│    3. Queue empty? Store waker, return Pending │
│                                                │
└────────────────────────────────────────────────┘

The sequence

Sender                  Shared State              Receiver
  │                        │                         │
  │                        │          poll() ◄───────┤
  │                        │  queue empty             │
  │                        │  store waker ◄───────────┤
  │                        │                Pending ──►
  │                        │                         │
  │  send("hello") ──────►│                         │
  │  push to queue         │                         │
  │  wake receiver ────────┼──► waker.wake()         │
  │                        │                         │
  │                        │          poll() ◄───────┤
  │                        │  pop "hello"             │
  │                        │         Ready("hello") ──►

Oneshot channel

The simplest async channel: one message, one sender, one receiver.

#![allow(unused)]
fn main() {
struct Inner<T> {
    value: Option<T>,
    rx_waker: Option<Waker>,
    closed: bool,
}

struct Sender<T> {
    inner: Arc<Mutex<Inner<T>>>,
}

struct Receiver<T> {
    inner: Arc<Mutex<Inner<T>>>,
}
}

Sender::send

#![allow(unused)]
fn main() {
impl<T> Sender<T> {
    fn send(self, value: T) -> Result<(), T> {
        let mut inner = self.inner.lock().unwrap();
        if inner.closed {
            return Err(value);  // receiver dropped
        }
        inner.value = Some(value);
        if let Some(waker) = inner.rx_waker.take() {
            waker.wake();  // notify receiver
        }
        Ok(())
    }
}
}

Note: send consumes self — you can only send once.

Receiver as a Future

#![allow(unused)]
fn main() {
impl<T> Future for Receiver<T> {
    type Output = Result<T, RecvError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let mut inner = self.inner.lock().unwrap();
        if let Some(value) = inner.value.take() {
            Poll::Ready(Ok(value))
        } else if inner.closed {
            Poll::Ready(Err(RecvError))  // sender dropped without sending
        } else {
            inner.rx_waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
}

Usage

#![allow(unused)]
fn main() {
let (tx, rx) = oneshot::channel();
spawn(async move { tx.send(42).unwrap(); });
let value = rx.await.unwrap();  // 42
}

MPSC channel

Multiple senders, one receiver. Messages buffer up in a queue.

Key differences from oneshot:

  • Queue instead of single value
  • Sender is Clone — track how many senders exist
  • Bounded vs unbounded — bounded adds backpressure (Lesson 23)
  • All senders drop → channel closed
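The "all senders drop → channel closed" rule falls out of reference counting on clone/drop. A minimal sketch of hypothetical MPSC internals (field names are ours, mirroring the oneshot `Inner` above; send/recv are elided):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::Waker;

// Hypothetical MPSC shared state: a queue instead of a single slot,
// plus a count of live senders.
struct Inner<T> {
    queue: VecDeque<T>,
    rx_waker: Option<Waker>,
    senders: usize,
}

struct Sender<T> { inner: Arc<Mutex<Inner<T>>> }

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        self.inner.lock().unwrap().senders += 1;
        Sender { inner: self.inner.clone() }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let mut inner = self.inner.lock().unwrap();
        inner.senders -= 1;
        if inner.senders == 0 {
            // Last sender gone: wake the receiver so recv() can return None.
            if let Some(w) = inner.rx_waker.take() { w.wake(); }
        }
    }
}

fn main() {
    let inner = Arc::new(Mutex::new(Inner::<i32> {
        queue: VecDeque::new(), rx_waker: None, senders: 1,
    }));
    let tx = Sender { inner: inner.clone() };
    let tx2 = tx.clone();
    assert_eq!(inner.lock().unwrap().senders, 2);
    drop(tx);
    drop(tx2);
    assert_eq!(inner.lock().unwrap().senders, 0); // channel now counts as closed
    println!("all senders dropped");
}
```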

Closed channel detection

When the sender drops without sending, the receiver should be woken with an error:

#![allow(unused)]
fn main() {
impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let mut inner = self.inner.lock().unwrap();
        inner.closed = true;
        if let Some(waker) = inner.rx_waker.take() {
            waker.wake();  // wake receiver so it sees the closure
        }
    }
}
}

Exercises

Exercise 1: Oneshot channel

Implement oneshot::channel() → (Sender<T>, Receiver<T>).

  • Sender has send(value) that consumes self
  • Receiver implements Future
  • Test: send 42 from one task, receive in another

Exercise 2: Oneshot drop detection

Test that dropping the sender without sending wakes the receiver with an error:

#![allow(unused)]
fn main() {
let (tx, rx) = oneshot::channel::<i32>();
drop(tx);
assert!(rx.await.is_err());
}

Exercise 3: MPSC channel

Implement mpsc::channel() → (Sender<T>, Receiver<T>).

  • Sender is Clone, has send(value)
  • Receiver has async fn recv() → Option<T> (None when all senders dropped)
  • Test: 3 senders each send 10 messages, receiver collects all 30

Exercise 4: Bounded MPSC

Add capacity to your MPSC: mpsc::channel(cap).

  • send() returns Pending when the queue is full
  • recv() wakes one blocked sender when it pops a message
  • Test: channel(2), send 3 messages — third blocks until receiver pops one

Lesson 14: Work-Stealing Scheduler

Prerequisites: Lesson 10 (Task Scheduling), Lesson 13 (Channels). You need a working single-threaded executor before going multi-threaded.

Real-life analogy: supermarket checkout lanes

Single-threaded executor = one checkout lane:
┌──────────────────────────────────────────┐
│  Lane 1: [customer] [customer] [customer]│  One cashier.
│                                          │  Long line.
└──────────────────────────────────────────┘

Multi-threaded (no stealing) = N lanes, unbalanced:
┌──────────────────────────────────────────┐
│  Lane 1: [customer] [customer] [customer]│  Busy!
│  Lane 2: [customer]                      │  Almost idle.
│  Lane 3:                                 │  Empty!
│  Lane 4: [customer] [customer]           │
└──────────────────────────────────────────┘
  Some lanes are jammed while others are empty.

Work-stealing = lanes help each other:
┌──────────────────────────────────────────┐
│  Lane 1: [customer] [customer]           │
│  Lane 2: [customer] ←── stole from lane 1│
│  Lane 3: [customer] ←── stole from lane 1│
│  Lane 4: [customer]                      │
└──────────────────────────────────────────┘
  Idle cashiers walk to busy lanes and take customers.
  Result: balanced load, shorter wait times.

Why work-stealing?

With N worker threads, you need a strategy for distributing tasks:

Strategy 1: Shared queue (simple, bad)
  All workers pop from ONE queue.
  Problem: lock contention. N threads fighting over one Mutex.

  ┌────────┐  ┌────────┐  ┌────────┐
  │Worker 0│  │Worker 1│  │Worker 2│
  └───┬────┘  └───┬────┘  └───┬────┘
      │           │           │
      └─────┬─────┴─────┬────┘
            │            │
       ┌────▼────────────▼────┐
       │  Shared Queue (Mutex)│  ← contention!
       └──────────────────────┘

Strategy 2: Local queues + work stealing (complex, fast)
  Each worker has its own queue. No contention for local work.
  When idle, steal from a random busy worker.

  ┌────────┐  ┌────────┐  ┌────────┐
  │Worker 0│  │Worker 1│  │Worker 2│
  │[t1,t2] │  │[t3]    │  │[]      │ ← empty, will steal
  └────────┘  └────────┘  └───┬────┘
                               │ steal half from Worker 0
                               ▼
  ┌────────┐  ┌────────┐  ┌────────┐
  │Worker 0│  │Worker 1│  │Worker 2│
  │[t1]    │  │[t3]    │  │[t2]    │ ← balanced!
  └────────┘  └────────┘  └────────┘

Architecture

┌────────────────────────────────────────────────────────┐
│  Work-Stealing Runtime                                 │
│                                                        │
│  ┌──────────────────────────────────────────────────┐  │
│  │  Global Queue (for overflow + cross-thread spawn)│  │
│  │  Arc<Mutex<VecDeque<Arc<Task>>>>                  │  │
│  └──────────┬──────────────┬──────────┬─────────────┘  │
│             │              │          │                 │
│  ┌──────────▼───┐  ┌──────▼──────┐  ┌▼────────────┐  │
│  │  Worker 0    │  │  Worker 1   │  │  Worker 2   │  │
│  │              │  │             │  │             │  │
│  │  local queue │  │ local queue │  │ local queue │  │
│  │  [t1, t2]   │  │ [t3]        │  │ []          │  │
│  │              │  │             │  │   ↑ steal   │  │
│  │  thread 0    │  │  thread 1   │  │  thread 2   │  │
│  └──────────────┘  └─────────────┘  └─────────────┘  │
│                                                        │
│  Each worker:                                          │
│    1. Pop from local queue                             │
│    2. If empty → check global queue                    │
│    3. If empty → steal from random worker              │
│    4. If nothing → park (sleep)                        │
└────────────────────────────────────────────────────────┘

The worker loop

#![allow(unused)]
fn main() {
fn worker_loop(worker_id: usize, workers: &[Worker], global: &GlobalQueue) {
    let me = &workers[worker_id];
    loop {
        // 1. Try local queue first (no contention)
        if let Some(task) = me.local_queue.pop() {
            poll(task);
            continue;
        }

        // 2. Try global queue (shared, needs lock)
        if let Some(task) = global.pop() {
            poll(task);
            continue;
        }

        // 3. Try stealing from a random peer
        let victim = random_worker(workers, worker_id);
        if let Some(task) = victim.local_queue.steal() {
            poll(task);
            continue;
        }

        // 4. Nothing to do — park
        thread::park();
    }
}
}

The Chase-Lev deque

The data structure that makes work-stealing efficient:

Owner (the worker thread):
  push to back:   O(1), no synchronization
  pop from back:  O(1), no synchronization (LIFO — most recent first)

Stealers (other workers):
  steal from front: O(1), uses atomic CAS (compare-and-swap)

┌──────────────────────────────────┐
│  front (stealers)                │
│  ↓                               │
│  [task_A] [task_B] [task_C]      │
│                              ↑   │
│                    back (owner)   │
└──────────────────────────────────┘

Owner pushes/pops from back (fast, no lock).
Stealers steal from front (atomic, minimal contention).
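The two-ended access pattern can be illustrated with a plain `VecDeque` — a toy only, since a real Chase-Lev deque is lock-free and this uses a `Mutex` purely to show which end each party touches:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Tasks pushed in order A, B, C — C is the most recent.
    let dq = Arc::new(Mutex::new(VecDeque::from(["A", "B", "C"])));

    // Owner pops from the BACK: LIFO, most recently pushed first
    // (good cache locality — it just touched that task's data).
    let owner_got = dq.lock().unwrap().pop_back().unwrap();
    assert_eq!(owner_got, "C");

    // A stealer takes from the FRONT: the oldest, "coldest" task.
    let dq2 = dq.clone();
    let stolen = thread::spawn(move || dq2.lock().unwrap().pop_front().unwrap())
        .join()
        .unwrap();
    assert_eq!(stolen, "A");
    println!("owner={owner_got}, stolen={stolen}");
}
```

Because owner and stealers work opposite ends, they only conflict when the deque is nearly empty — which is what lets the real structure get away with a single CAS on the steal path.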

In Rust, use the crossbeam-deque crate:

#![allow(unused)]
fn main() {
use crossbeam_deque::{Worker, Stealer};

let worker = Worker::new_fifo();
let stealer = worker.stealer();  // can be cloned to other threads

worker.push(task);         // owner pushes
worker.pop();              // owner pops
stealer.steal();           // other thread steals
stealer.steal_batch(&other_worker);  // steal half
}

Spawn: where does the task go?

spawn(future):
  Am I on a worker thread?
    Yes → push to my local queue (fast path)
    No  → push to global queue (cross-thread spawn)

Tokio uses thread-local storage to detect if spawn is called from a worker thread. If so, the task goes directly into the local queue (no lock, no contention).
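A minimal sketch of that detection trick, assuming a hypothetical `WORKER_ID` thread-local that each worker thread sets on startup (the name and shape are ours, not tokio's):

```rust
use std::cell::Cell;

thread_local! {
    // Hypothetical marker: Some(worker_id) on worker threads, None elsewhere.
    static WORKER_ID: Cell<Option<usize>> = Cell::new(None);
}

// spawn() would call this to pick the fast path or the global queue.
fn current_worker() -> Option<usize> {
    WORKER_ID.with(|id| id.get())
}

fn main() {
    // On a non-worker thread: no id → cross-thread spawn via global queue.
    assert_eq!(current_worker(), None);

    // Pretend this thread is worker 0 (a worker would do this at startup).
    WORKER_ID.with(|id| id.set(Some(0)));

    // Now spawn() can take the fast path: push straight to local queue 0.
    assert_eq!(current_worker(), Some(0));
    println!("fast path for worker {:?}", current_worker());
}
```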

Parking and unparking

When a worker has nothing to do, it should sleep (not busy-poll):

#![allow(unused)]
fn main() {
// Worker with nothing to do:
thread::park();  // sleep, 0% CPU

// When a new task is spawned or a waker fires:
worker_thread.unpark();  // wake up!
}

The tricky part: deciding WHICH worker to unpark. Options:

  • Unpark one random idle worker (tokio’s approach)
  • Unpark all idle workers (simple but thundering herd)
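Park/unpark is just `std::thread` machinery. This self-contained demo parks a "worker" until new work arrives; the flag loop guards against spurious wakeups, which `park` is allowed to have:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let has_work = Arc::new(AtomicBool::new(false));
    let flag = has_work.clone();
    let start = Instant::now();

    let worker = thread::spawn(move || {
        // Idle worker: blocked with ~0% CPU. park() may return spuriously,
        // so re-check the condition in a loop.
        while !flag.load(Ordering::Acquire) {
            thread::park();
        }
        start.elapsed()
    });

    thread::sleep(Duration::from_millis(100)); // ...later, a task is spawned
    has_work.store(true, Ordering::Release);
    worker.thread().unpark();                  // wake exactly this worker

    let waited = worker.join().unwrap();
    assert!(waited >= Duration::from_millis(100));
    println!("worker parked for {waited:?}");
}
```

Note the order: set the flag first, then unpark — otherwise the worker could re-check, see `false`, and park again right after the unpark token is consumed.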

Exercises

Exercise 1: Two-worker runtime

Build a work-stealing runtime with 2 worker threads:

  1. Each worker has a VecDeque local queue
  2. A shared global queue for overflow
  3. Workers pop local → pop global → steal from peer → park
  4. Spawn 100 tasks that record std::thread::current().id()
  5. Verify both threads ran tasks

Exercise 2: Steal half

Implement “steal half” — when stealing, take half the victim’s local queue:

#![allow(unused)]
fn main() {
fn steal_batch(victim: &LocalQueue, thief: &mut LocalQueue) {
    let count = victim.len() / 2;
    for _ in 0..count {
        if let Some(task) = victim.steal_front() {
            thief.push_back(task);
        }
    }
}
}

Test: push 50 tasks to Worker 0, none to Worker 1. Run. Assert Worker 1 processed some.

Exercise 3: Benchmark

Compare throughput:

  • Single-threaded executor (Lesson 10): spawn 10K tasks, measure time
  • Work-stealing with 4 workers: same 10K tasks

Tasks should be trivial (increment atomic counter). Print tasks/second for each.

Exercise 4: crossbeam-deque

Replace your VecDeque local queue with crossbeam_deque::Worker. Compare performance. The crossbeam deque uses lock-free atomics instead of a Mutex — it should be faster under contention.

Add to Cargo.toml: crossbeam-deque = "0.8"

Lesson 15: Select Internals

Prerequisites: Lesson 7 (Combinators), Lesson 13 (Channels). You built a basic MySelect in Lesson 7. Now we go deeper into cancellation semantics.

Real-life analogy: race to the airport

You need to get to the airport. You have two options running in parallel:

┌────────────────┐     ┌────────────────┐
│ Option A: Taxi │     │ Option B: Bus  │
│ ETA: unknown   │     │ ETA: 20 min    │
└───────┬────────┘     └───────┬────────┘
        │                      │
        │    ... waiting ...   │
        │                      │
        │                      ▼
        │               Bus arrives!
        │               You get on.
        │
        ▼
  Cancel taxi!  ← this is the DROP
  (taxi driver goes home)

Select = race multiple futures, take the winner, cancel the losers.

The “cancel” part is where it gets tricky. What if the taxi was already en route? What if it had already picked you up but you hadn’t noticed? Cancellation has consequences.

How select works

#![allow(unused)]
fn main() {
async fn select<A, B>(fut_a: A, fut_b: B) -> Either<A::Output, B::Output>
where A: Future, B: Future
{
    // Internally, each poll():
    //   1. Poll fut_a → Ready? return Left(value), DROP fut_b
    //   2. Poll fut_b → Ready? return Right(value), DROP fut_a
    //   3. Both Pending → Pending
}
}
poll #1:
  poll A → Pending
  poll B → Pending
  → return Pending

poll #2:
  poll A → Pending
  poll B → Ready(value)
  → drop A (cancelled!)
  → return Right(value)

The cancellation problem

When a future is dropped mid-execution, any work it did between its last Pending and the drop is lost:

Cancellation-SAFE (no data loss):
  recv() → Pending (no message consumed)
         → dropped (nothing lost)

Cancellation-UNSAFE (data loss):
  read_exact(buf, 100 bytes)
         → read 50 bytes, Pending (50 bytes consumed from socket)
         → dropped (those 50 bytes are GONE, next read misses them)
┌──────────────────────────────────────────────────────┐
│  Cancellation Safety Rules                           │
│                                                      │
│  SAFE to use in select:                              │
│    ✓ channel.recv()    — no data consumed until Ready│
│    ✓ sleep()           — no side effects             │
│    ✓ listener.accept() — no connection consumed      │
│                                                      │
│  UNSAFE in select:                                   │
│    ✗ read_exact()      — partial reads lost          │
│    ✗ read_line()       — partial line lost           │
│    ✗ collect from stream — partial results lost      │
│                                                      │
│  Rule: if dropping after Pending loses data          │
│        that can't be recovered, it's UNSAFE.         │
└──────────────────────────────────────────────────────┘

This is covered in depth in Lesson 24 (Cancellation Safety).

Polling bias

Naive select always polls A first:

#![allow(unused)]
fn main() {
// BIASED: A always gets polled first
if let Ready(v) = fut_a.poll(cx) { return Left(v); }
if let Ready(v) = fut_b.poll(cx) { return Right(v); }
}

If A is always ready, B is starved — never gets a chance. Solutions:

  • Random order: pick which to poll first randomly each time (what tokio::select! does by default)
  • Round-robin: alternate A-first and B-first
  • tokio::select! also accepts a biased; prefix to opt into deterministic, in-order polling
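A toy model makes the starvation math concrete. Pretend both branches are always ready, so whichever is polled first always wins; round-robin flips the order each iteration and splits the wins evenly, where a biased select would give every win to A:

```rust
fn main() {
    let mut a_first = true;
    let mut wins = [0u32; 2]; // wins[0] = branch A, wins[1] = branch B

    for _ in 0..10 {
        // Both branches "Ready" — polling order alone decides the winner.
        let winner = if a_first { 0 } else { 1 };
        wins[winner] += 1;
        a_first = !a_first; // round-robin: alternate who is polled first
    }

    assert_eq!(wins, [5, 5]); // neither branch is starved
    println!("A won {} times, B won {} times", wins[0], wins[1]);
}
```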

Fuse: poll-after-completion guard

The Future contract says: don’t poll after Ready. But in a select loop, you might accidentally poll a completed future:

#![allow(unused)]
fn main() {
loop {
    select! {
        msg = channel.recv() => handle(msg),
        _ = timer.tick() => println!("tick"),
    }
    // After timer fires, the next iteration polls timer again —
    // but it already returned Ready. Polling a completed future
    // violates the Future contract and may panic.
}
}

Fuse wraps a future to return Pending forever after completing:

#![allow(unused)]
fn main() {
struct Fuse<F> {
    future: Option<F>,  // None after completion
}

impl<F: Future + Unpin> Future for Fuse<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<F::Output> {
        if let Some(mut f) = self.future.take() {
            match Pin::new(&mut f).poll(cx) {
                Poll::Ready(v) => Poll::Ready(v),  // done — future stays None
                Poll::Pending => {
                    self.future = Some(f);         // put it back for next poll
                    Poll::Pending
                }
            }
        } else {
            Poll::Pending  // already completed, safe to poll forever
        }
    }
}
}

Exercises

Exercise 1: Binary select

Implement select(fut_a, fut_b) -> Either<A, B>:

  • Poll both futures
  • Return whichever is Ready first
  • Drop the loser

Test: select(sleep(100ms), sleep(500ms)) should return Left after ~100ms.

Exercise 2: Cancellation demo

Create a CountingFuture that increments an Arc<AtomicU32> each time it’s polled. Select it against an immediately-ready future. Assert the counter shows it was polled exactly once before being dropped.

Add a Drop impl that prints “cancelled!” to make the drop visible.

Exercise 3: Select loop with channel

#![allow(unused)]
fn main() {
let (tx, mut rx) = mpsc::channel();
let mut interval = Interval::new(Duration::from_millis(200));

loop {
    select! {
        msg = rx.recv() => match msg {
            Some(m) => println!("got: {m}"),
            None => break,  // channel closed
        },
        _ = interval.tick() => println!("tick"),
    }
}
}

Implement this pattern. Spawn a task that sends 5 messages with delays, then drops the sender. The select loop should print ticks between messages and exit when the channel closes.

Exercise 4: Fuse

Implement the Fuse wrapper. Test: fuse a future that returns Ready(42). Poll it three times. The first poll returns Ready(42); the second and third return Pending instead of violating the Future contract.

Project 2: Multi-threaded Chat Server

Prerequisites: Lessons 9-15 (reactor, scheduling, async I/O, timers, channels, work-stealing, select). This project combines them all.

Overview

Build a fully working chat server on top of the async runtime you built in Lessons 9-15. No tokio, no async-std — just your reactor, executor, channels, and timers. This project proves your runtime can handle real concurrent I/O.

Architecture

┌─────────────────────────────────────────────────────────┐
│  Chat Server (your runtime)                              │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │  Accept Loop (one task)                            │  │
│  │    TcpListener.accept() → spawn client task        │  │
│  └──────┬─────────────────────────────────────────────┘  │
│         │ spawn                                          │
│         ▼                                                │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │ Client A     │  │ Client B     │  │ Client C     │  │
│  │              │  │              │  │              │  │
│  │ read loop:   │  │ read loop:   │  │ read loop:   │  │
│  │  select! {   │  │  select! {   │  │  select! {   │  │
│  │   stream.read│  │   stream.read│  │   stream.read│  │
│  │   inbox.recv │  │   inbox.recv │  │   inbox.recv │  │
│  │  }           │  │  }           │  │  }           │  │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘  │
│         │                 │                  │           │
│         └────────┬────────┴────────┬─────────┘           │
│                  │                 │                      │
│           ┌──────▼─────────────────▼──────┐              │
│           │  Broker Task                  │              │
│           │  (owns the client map)        │              │
│           │                               │              │
│           │  events channel:              │              │
│           │    Join(id, nick, inbox_tx)   │              │
│           │    Leave(id)                  │              │
│           │    Message(id, text)          │              │
│           │                               │              │
│           │  On Message: fan out to all   │              │
│           │  client inboxes except sender │              │
│           └───────────────────────────────┘              │
└──────────────────────────────────────────────────────────┘

Why a broker?

Instead of sharing a HashMap<ClientId, Sender> behind a Mutex (which every client would lock), we use a broker task that owns the state exclusively. Clients communicate with the broker through a channel.

Without broker:                       With broker:
  Client A locks HashMap              Client A sends Event to broker
  Client B waits for lock             Client B sends Event to broker
  Client C waits for lock             Broker processes events sequentially
  → lock contention                   → no contention, no Mutex
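The broker's event loop is just a match over an event enum against state only it can touch. A minimal synchronous sketch (the event variants mirror the architecture diagram; per-client inbox senders are elided, and in the real server the events arrive over your MPSC channel rather than a Vec):

```rust
use std::collections::HashMap;

// Hypothetical broker events, as in the diagram above.
enum Event {
    Join { id: u64, nick: String },
    Leave { id: u64 },
    Message { id: u64, text: String },
}

fn main() {
    // The broker OWNS the client map — no Mutex, because only
    // this loop ever touches it.
    let mut clients: HashMap<u64, String> = HashMap::new();

    let events = vec![
        Event::Join { id: 1, nick: "alice".into() },
        Event::Join { id: 2, nick: "bob".into() },
        Event::Message { id: 1, text: "hi".into() },
        Event::Leave { id: 2 },
    ];

    let mut broadcasts = 0;
    for ev in events {
        match ev {
            Event::Join { id, nick } => { clients.insert(id, nick); }
            Event::Leave { id } => { clients.remove(&id); }
            Event::Message { id, .. } => {
                // Fan out to everyone EXCEPT the sender.
                broadcasts += clients.keys().filter(|&&k| k != id).count();
            }
        }
    }

    assert_eq!(clients.len(), 1); // only alice remains
    assert_eq!(broadcasts, 1);    // "hi" went to bob alone
    println!("clients={}, broadcasts={}", clients.len(), broadcasts);
}
```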

What you’ll build

A TCP chat server where:

  • Multiple clients connect via telnet or nc
  • Messages from any client are broadcast to all other connected clients
  • Each client has a nickname (default: user-N), changeable with /nick name
  • The server detects disconnects (EOF or broken pipe) and announces departures
  • The server runs on your work-stealing runtime across multiple threads

Feature list

  • Accept loop – AsyncTcpListener accepts connections and spawns a task per client
  • Broadcast – an mpsc channel per client; incoming messages fan out to every other client’s channel
  • Commands – /nick <name> changes display name, /who lists connected users, /quit disconnects
  • Disconnect detection – read returning 0 bytes or an error triggers cleanup and a “user left” broadcast
  • Graceful shutdown – Ctrl-C sets a flag; the accept loop exits and all client tasks drain

Key concepts

  • Shared state – a HashMap<ClientId, ClientHandle> behind an async-aware mutex or accessed from a dedicated broker task
  • Broker pattern – one task owns the client map and receives events (join / leave / message) over a channel, avoiding shared mutable state
  • Backpressure – bounded per-client channels prevent a slow reader from exhausting memory
  • Cancellation – when a client disconnects, its task is dropped; select ensures no leaked futures
  • Testing – spawn the server in a background task, connect with multiple AsyncTcpStream clients from test tasks, and assert message delivery

Exercises

  1. Basic chat – implement the accept loop, per-client read loop, and broadcast. Connect two nc sessions and verify messages flow both ways.

  2. Commands and nicks – add /nick, /who, and /quit. Verify that broadcast messages show the updated nickname after a /nick change.

  3. Load test – spawn 100 client tasks that each send 10 messages. Assert every client receives all 990 messages from the others (100 clients × 10 messages − its own 10). Measure total time on your work-stealing runtime.

Lesson 16: Tokio Architecture

Prerequisites: Courses 1-2. You’ve built your own runtime — now see how the production one is designed.

Real-life analogy: a factory

┌──────────────────────────────────────────────────────────┐
│  Factory (Tokio Runtime)                                 │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │  Factory Floor (Scheduler)                         │  │
│  │                                                    │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐          │  │
│  │  │ Worker 1 │ │ Worker 2 │ │ Worker 3 │          │  │
│  │  │ (thread) │ │ (thread) │ │ (thread) │          │  │
│  │  └──────────┘ └──────────┘ └──────────┘          │  │
│  │                                                    │  │
│  │  Work-stealing: idle workers take from busy ones   │  │
│  └────────────────────────┬───────────────────────────┘  │
│                           │                              │
│  ┌────────────────────────▼───────────────────────────┐  │
│  │  Utility Room (Drivers)                            │  │
│  │                                                    │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐          │  │
│  │  │ I/O      │ │ Timer    │ │ Signal   │          │  │
│  │  │ Driver   │ │ Driver   │ │ Driver   │          │  │
│  │  │ (mio)    │ │ (wheel)  │ │ (unix)   │          │  │
│  │  └──────────┘ └──────────┘ └──────────┘          │  │
│  │                                                    │  │
│  │  Handles external events → wakes tasks             │  │
│  └────────────────────────────────────────────────────┘  │
│                                                          │
│  Front Office: Runtime::block_on(), spawn(), Handle      │
└──────────────────────────────────────────────────────────┘

The factory has:

  • Workers (threads) on the factory floor processing tasks
  • A utility room (drivers) that monitors external events (power, deliveries, alarms)
  • A front office (API) where you submit work orders

Tokio’s internal structure

tokio::runtime::Runtime
  │
  ├── Scheduler
  │     ├── current_thread::Scheduler (single-threaded)
  │     └── multi_thread::Scheduler (work-stealing)
  │           ├── Worker 0 (thread + local queue)
  │           ├── Worker 1
  │           └── Worker N
  │
  ├── Driver (layered)
  │     ├── Signal Driver (outermost)
  │     │     └── Time Driver
  │     │           └── I/O Driver (innermost, wraps mio::Poll)
  │     │
  │     │  Each park() call propagates down:
  │     │    signal.park() → time.park() → io.park() → mio.poll()
  │     │
  │     │  On return, each layer checks its events:
  │     │    mio events → wake I/O tasks
  │     │    expired timers → wake timer tasks
  │     │    signals → wake signal listeners
  │
  └── Handle (cloneable, Send + Sync)
        ├── spawn() — submit a task from anywhere
        ├── block_on() — enter the runtime on current thread
        └── spawn_blocking() — run sync code on a dedicated thread pool

current_thread vs multi_thread

current_thread:                      multi_thread:
┌──────────────────┐                 ┌──────────────────┐
│  One thread      │                 │  N threads       │
│                  │                 │                  │
│  ┌────────────┐  │                 │  ┌────┐ ┌────┐  │
│  │  Scheduler │  │                 │  │ W0 │ │ W1 │  │
│  │  + Driver  │  │                 │  └────┘ └────┘  │
│  │  (same     │  │                 │  ┌────┐ ┌────┐  │
│  │   thread)  │  │                 │  │ W2 │ │ W3 │  │
│  └────────────┘  │                 │  └────┘ └────┘  │
│                  │                 │                  │
│  Pros:           │                 │  Pros:           │
│  - No Send req   │                 │  - Uses all CPUs │
│  - No sync cost  │                 │  - Work stealing │
│  - Deterministic │                 │  - Production    │
│                  │                 │                  │
│  Cons:           │                 │  Cons:           │
│  - One CPU core  │                 │  - Send required │
│  - !Send tasks   │                 │  - More complex  │
│    need LocalSet │                 │  - Non-determ.   │
│    + spawn_local │                 │                  │
└──────────────────┘                 └──────────────────┘

When to use each

  • current_thread: tests, WASM, simple CLI tools, apps with !Send types
  • multi_thread: web servers, database proxies, anything with high concurrency

The Runtime Builder

#![allow(unused)]
fn main() {
// Multi-threaded (default for #[tokio::main])
let rt = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(4)        // default: num_cpus
    .max_blocking_threads(512) // for spawn_blocking
    .enable_io()              // I/O driver (mio)
    .enable_time()            // timer driver
    .thread_name("my-worker")
    .on_thread_start(|| println!("worker started"))
    .build()?;

// Single-threaded
let rt = tokio::runtime::Builder::new_current_thread()
    .enable_all()
    .build()?;
}

What enable_io() and enable_time() do

enable_io():   creates mio::Poll, starts I/O driver
               without it: TcpStream, UdpSocket, etc. panic

enable_time(): creates timer wheel, starts time driver
               without it: tokio::time::sleep panics

enable_all():  enables both I/O and time

The Driver stack

The driver is layered — each layer wraps the one below:

park() call flow:

  SignalDriver::park()
    │
    ├── check for pending signals
    │
    └── TimeDriver::park()
          │
          ├── compute timeout from next timer deadline
          │
          └── IoDriver::park(timeout)
                │
                └── mio::Poll::poll(timeout)
                      │
                      └── OS: kqueue / epoll (blocks)

return flow:

  mio returns events
    │
    └── IoDriver: wake I/O tasks
          │
          └── TimeDriver: fire expired timers, wake timer tasks
                │
                └── SignalDriver: dispatch signals

This is why enable_io() and enable_time() matter — without them, those driver layers don’t exist.
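
The layering above can be sketched with a toy Park trait. All type names here are illustrative, not tokio's real (private) driver types:

```rust
use std::time::Duration;

// Illustrative sketch of the layered-driver idea. Tokio's real drivers
// have this shape but live in private tokio::runtime modules.
trait Park {
    fn park(&mut self, timeout: Option<Duration>);
}

// Innermost layer, standing in for the I/O driver around mio::Poll.
struct IoDriver;
impl Park for IoDriver {
    fn park(&mut self, timeout: Option<Duration>) {
        // real driver: mio::Poll::poll(&mut events, timeout), then wake I/O tasks
        println!("io: polling with timeout {timeout:?}");
    }
}

// The outer timeout is clamped to the next timer deadline: whichever
// comes first bounds how long the OS poll may block.
fn merge_timeout(outer: Option<Duration>, deadline: Option<Duration>) -> Option<Duration> {
    match (outer, deadline) {
        (Some(t), Some(d)) => Some(t.min(d)),
        (t, d) => t.or(d),
    }
}

// The time driver clamps the sleep, parks the inner layer, then fires
// expired timers on the way back up — the "return flow" above.
struct TimeDriver<P: Park> {
    inner: P,
    next_deadline: Option<Duration>,
}
impl<P: Park> Park for TimeDriver<P> {
    fn park(&mut self, timeout: Option<Duration>) {
        let timeout = merge_timeout(timeout, self.next_deadline);
        self.inner.park(timeout);
        println!("time: firing expired timers, waking timer tasks");
    }
}

fn main() {
    let mut driver = TimeDriver {
        inner: IoDriver,
        next_deadline: Some(Duration::from_millis(50)),
    };
    // No outer timeout, so the 50ms timer deadline bounds the inner poll.
    driver.park(None);
}
```

This is why a missing enable_time() means sleep can never fire: its layer simply isn't in the stack to clamp the timeout or fire timers.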

Handle: spawning from anywhere

#![allow(unused)]
fn main() {
let rt = Runtime::new()?;
let handle = rt.handle().clone();  // Handle is Send + Sync + Clone

// From another thread:
handle.spawn(async { /* runs on the runtime */ });

// From inside async code:
let handle = tokio::runtime::Handle::current();
handle.spawn(async { /* also works */ });
}

Tracing through tokio source

A fun exercise: trace tokio::spawn(my_future) through the source code.

tokio::spawn(future)
  → context::spawn(future)        // get the current runtime context
  → scheduler.spawn(task)          // submit to the scheduler
  → worker.schedule(task)          // push to local queue (or global)
  → if worker idle: worker.unpark() // wake a sleeping worker

Exercises

Exercise 1: current_thread echo server

Build a TCP echo server on current_thread. Use spawn_local for tasks. Verify it works but only uses one CPU core.

Exercise 2: multi_thread thread distribution

Spawn 100 tasks on a multi_thread runtime with 4 workers. Each task records std::thread::current().id(). Print how many tasks ran on each thread — should be roughly balanced.

Exercise 3: Missing driver

Create a runtime with only enable_io() (no time). Try tokio::time::sleep(1s).await. What error do you get?

Then create with only enable_time() (no I/O). Try TcpListener::bind. What happens?

Exercise 4: Handle from another thread

#![allow(unused)]
fn main() {
let rt = Runtime::new()?;
let handle = rt.handle().clone();

std::thread::spawn(move || {
    handle.spawn(async {
        println!("running on tokio from a std thread!");
    });
});
}

Exercise 5: Throughput comparison

Build a simple echo server. Benchmark with nc or a load tester:

  • current_thread with 1000 concurrent connections
  • multi_thread (4 workers) with 1000 concurrent connections

Measure requests/second. How much does multi_thread help?

Lesson 17: Tokio’s I/O Driver

Prerequisites: Lesson 9 (Reactor), Lesson 11 (AsyncRead/AsyncWrite), Lesson 16 (Tokio Architecture).

Real-life analogy: the switchboard operator

Old telephone system:
┌──────────────────────────────────────────────────┐
│  Switchboard (I/O Driver)                        │
│                                                  │
│  Plug board:                                     │
│    Jack 1 → Room 101 (Alice's phone)             │
│    Jack 2 → Room 205 (Bob's phone)               │
│    Jack 3 → (empty)                              │
│                                                  │
│  Operator loop:                                  │
│    1. Watch all jacks for incoming signals        │
│    2. Jack 2 lights up → "Bob has a call!"       │
│    3. Ring Bob's room (wake his task)            │
│                                                  │
│  Registration:                                   │
│    New guest checks in → operator plugs a jack   │
│    Guest checks out → operator unplugs           │
└──────────────────────────────────────────────────┘

Tokio’s I/O driver is the switchboard:

  • Jacks = file descriptors registered with mio
  • Plugging in = Registration::new() → mio::Poll::register()
  • Light up = readiness event from kqueue/epoll
  • Ring the room = waker.wake()

How tokio wraps mio

Your Lesson 9 reactor and tokio’s I/O driver do the same thing — but tokio adds a layer of abstraction:

Your reactor (Lesson 9):              Tokio's I/O driver:
  mio::Poll                             mio::Poll
  HashMap<Token, Waker>                 Slab<ScheduledIo>
  register/deregister manually          Registration handles lifecycle
  you call wait()                       driver calls park()

The Registration type

In tokio, every I/O resource (TcpStream, UdpSocket, etc.) holds a Registration:

#![allow(unused)]
fn main() {
// Simplified from tokio source
struct Registration {
    handle: Handle,     // reference to the I/O driver
    token: usize,       // slab index for event dispatch
}
}

When you create a tokio::net::TcpStream, it calls Registration::new():

  1. Registers the fd with mio::Poll
  2. Allocates a slot in the driver’s slab
  3. Returns a Registration that derefs to wake/interest methods

The readiness flow

Application:  stream.read(&mut buf).await

Tokio TcpStream::read():
  │
  ├── poll_read_ready()              // check if driver says readable
  │     │
  │     ├── already ready? → try read()
  │     │
  │     └── not ready? → register waker with driver
  │                      return Pending
  │
  ├── (later) I/O driver: mio::Poll returns event
  │     │
  │     └── driver looks up ScheduledIo by token
  │         calls waker.wake()
  │
  └── re-polled: poll_read_ready() → ready!
      try read() → Ok(n) → Ready(n)

Interest and Ready

#![allow(unused)]
fn main() {
// Interest: what events you want
Interest::READABLE    // want to know when data is available
Interest::WRITABLE    // want to know when write buffer has space
Interest::READABLE | Interest::WRITABLE  // both

// Ready: what actually happened
if ready.is_readable() { /* data available */ }
if ready.is_writable() { /* can write */ }
if ready.is_read_closed() { /* peer closed their write half */ }
if ready.is_write_closed() { /* peer closed their read half */ }
}

Spurious wakeups

The driver might wake you when there’s nothing to do:

Driver says: "fd 5 is readable!"
You call read(buf):  → WouldBlock  (nothing actually available)

This happens because:

  • Edge-triggered events can be delivered before data fully arrives
  • Multiple events can coalesce
  • The OS might report readiness optimistically

Your I/O code MUST handle WouldBlock by returning Pending and re-registering — never assume the operation will succeed just because you were woken.

Exercises

Exercise 1: mio echo server

Build a raw mio echo server (no tokio). Register a listener, accept connections, register each for READABLE, echo data. This is what tokio does internally.

Exercise 2: Tokio echo server

Convert the mio echo server to tokio. Observe how much code disappears — tokio handles registration, wakers, and the event loop for you.

Exercise 3: Readiness exploration

#![allow(unused)]
fn main() {
use tokio::net::TcpStream;
use tokio::io::Interest;

let stream = TcpStream::connect("127.0.0.1:8080").await?;
let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
println!("readable: {}, writable: {}", ready.is_readable(), ready.is_writable());
}

Connect to a server, check readiness. Write data, check again. Close the peer, check for is_read_closed().

Exercise 4: Spurious wakeup handling

Write a stream wrapper that logs every WouldBlock. Connect to a server, start reading. How many WouldBlocks do you see? This shows why the retry loop is essential.

Lesson 18: tokio::sync Internals — Mutex, RwLock, Semaphore, Notify

Prerequisites: Lessons 13-17. You know how channels work — now see the lower-level primitives.

Real-life analogy: bathroom keys at a gas station

┌─────────────────────────────────────────────────────────────────┐
│  Gas Station Bathroom Access                                    │
│                                                                 │
│  Mutex (one key):                                               │
│    🔑 One key on the counter. Take it, use bathroom, return.   │
│    If key is gone → WAIT (don't block the road, sit in car).   │
│                                                                 │
│  Semaphore (multiple keys):                                     │
│    🔑🔑🔑 Three keys → three stalls. Fourth person waits.      │
│    Each key = one permit.                                       │
│                                                                 │
│  RwLock (museum analogy):                                       │
│    Many visitors can READ the exhibit simultaneously.           │
│    But the curator needs EXCLUSIVE access to rearrange.         │
│    Readers share, writers exclude everyone.                     │
│                                                                 │
│  Notify (doorbell):                                             │
│    Ring the bell → someone waiting inside wakes up.             │
│    No data transferred, just "hey, something happened."         │
└─────────────────────────────────────────────────────────────────┘

Why not std::sync in async code?

std::sync::Mutex::lock()              tokio::sync::Mutex::lock()
┌──────────────────────┐              ┌───────────────────────┐
│ Thread blocks (OS)   │              │ Task yields (Pending) │
│ Worker thread frozen │              │ Worker thread FREE    │
│ Other tasks starve   │              │ Runs other tasks      │
│ Deadlock risk!       │              │ Wakes when lock free  │
└──────────────────────┘              └───────────────────────┘

Rule of thumb:
  - Lock NOT held across .await → std::sync::Mutex is fine (faster)
  - Lock held across .await     → MUST use tokio::sync::Mutex

Mutex vs RwLock

Aspect     Mutex<T>              RwLock<T>
Readers    One at a time         Many concurrent
Writers    Exclusive             Exclusive
Overhead   Lower                 Higher (tracking readers)
Use when   Writes are frequent   Reads dominate

#![allow(unused)]
fn main() {
use std::collections::HashMap;
use tokio::sync::RwLock;

let lock = RwLock::new(HashMap::new());

// Many tasks can read concurrently
let guard = lock.read().await;
drop(guard); // release the read lock — write() below would wait forever otherwise

// Only one task can write
let mut guard = lock.write().await;
guard.insert("key", "value");
}

Semaphore — the universal primitive

Semaphore(3):       ┌───┬───┬───┐
  Available permits │ ● │ ● │ ● │
                    └───┴───┴───┘

Task A acquires 1:  ┌───┬───┬───┐
                    │   │ ● │ ● │  (2 left)
                    └───┴───┴───┘

Task B acquires 2:  ┌───┬───┬───┐
                    │   │   │   │  (0 left)
                    └───┴───┴───┘

Task C acquires 1:  WAITS... (no permits)
Task A drops:       Task C wakes up, gets permit

Semaphore is the building block for:

  • Rate limiting — permits = max concurrent operations
  • Connection pooling — permits = max connections
  • Bounded channels — tokio uses it internally
#![allow(unused)]
fn main() {
use std::sync::Arc;
use tokio::sync::Semaphore;

let sem = Arc::new(Semaphore::new(10));
let permit = sem.acquire().await?; // waits (yields, not blocks) if all 10 are held
do_work().await;
drop(permit); // returns the permit, wakes a waiter
}

Notify — async signaling

Notify is the simplest primitive: no data, just “wake up.”

Notifier                    Waiter
   │                          │
   │                          ├── notified().await  (Pending)
   │                          │       ...sleeping...
   ├── notify_one() ──────────┤
   │                          ├── wakes up!

Building a simple async mutex with Notify

#![allow(unused)]
fn main() {
use tokio::sync::Notify;
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};

struct SimpleMutex<T> {
    locked: AtomicBool,
    notify: Notify,
    data: UnsafeCell<T>,
}

// The pattern: spin on atomic + sleep on Notify
// Real tokio::sync::Mutex uses a wait queue, not spinning
}

OwnedPermit and OwnedMutexGuard

When you need to move a guard into a spawned task:

#![allow(unused)]
fn main() {
let mutex = Arc::new(tokio::sync::Mutex::new(0));
let owned_guard = mutex.clone().lock_owned().await; // 'static lifetime
tokio::spawn(async move {
    // owned_guard is Send + 'static — works in spawned tasks
    drop(owned_guard);
});
}

Exercises

Exercise 1: Shared counter with tokio::sync::Mutex

Spawn 100 tasks that each increment a tokio::sync::Mutex<u64> counter. Verify the final count is 100.

Exercise 2: Rate limiter with Semaphore

Create a Semaphore::new(5). Spawn 20 tasks that each acquire a permit, do tokio::time::sleep(100ms), then release. Verify at most 5 run concurrently at any time.

Exercise 3: Producer-consumer with Notify

Build a queue using std::sync::Mutex<VecDeque<T>> + Notify. The producer pushes items and calls notify_one(). The consumer loops on notified().await + try-pop.

Exercise 4: Build a simple async Mutex using Notify

Implement a SimpleMutex<T> that uses AtomicBool for state and Notify for waking. It won’t be production-quality but demonstrates the concept.

Lesson 19: tokio::net — TcpListener, TcpStream, and the Reactor

Prerequisites: Lessons 8-9 (async I/O, reactor), Lesson 17 (I/O driver).

Real-life analogy: phone vs walkie-talkie

TCP = Phone call                       UDP = Walkie-talkie
┌─────────────────────┐                ┌─────────────────────┐
│ 1. Dial (connect)   │                │ 1. Press button     │
│ 2. Ring (SYN/ACK)   │                │ 2. Talk (send_to)   │
│ 3. "Hello?" (est.)  │                │ 3. Release button   │
│ 4. Conversation     │                │                     │
│ 5. "Bye" (FIN)      │                │ No connection.      │
│                     │                │ No guarantee.       │
│ Reliable, ordered,  │                │ Fast, unordered,    │
│ bidirectional stream│                │ fire-and-forget     │
└─────────────────────┘                └─────────────────────┘

How tokio::net wraps mio

Your code                  Tokio                         OS
─────────────────────────────────────────────────────────────────
TcpStream::connect()  →  mio::net::TcpStream::connect()  →  connect(2)
stream.read(&buf)     →  poll_read()                      →  read(2)
  │                        │
  │  if WouldBlock:        │
  │  ┌─────────────────┐   │
  │  │ Register with    │   │
  │  │ I/O driver       │   │
  │  │ (mio::Registry)  │   │
  │  │ Return Pending   │   │
  │  └─────────────────┘   │
  │                        │
  │  Later, epoll/kqueue   │
  │  says "fd ready" ──────┤
  │  → wake task           │
  │  → poll_read() again   │
  │  → data available!     │

TcpListener accept loop

The fundamental server pattern:

#![allow(unused)]
fn main() {
use tokio::net::TcpListener;

let listener = TcpListener::bind("0.0.0.0:8080").await?;
loop {
    let (stream, addr) = listener.accept().await?;
    tokio::spawn(async move {
        handle_connection(stream, addr).await;
    });
}
}

Internally, accept() calls poll_accept() which:

  1. Tries mio::TcpListener::accept() (non-blocking)
  2. On WouldBlock → registers with reactor, returns Pending
  3. Reactor wakes task when a new connection arrives
  4. accept() retries → succeeds this time

Splitting a TcpStream

                    TcpStream
                   ┌─────────┐
                   │  read   │
                   │  write  │
                   │  (one   │
                   │   fd)   │
                   └────┬────┘
                        │
          ┌─────────────┴─────────────┐
          │  split() (borrowed)        │  into_split() (owned)
          ▼                            ▼
  ┌───────────────┐             ┌───────────────┐
  │ ReadHalf<'_>  │             │ OwnedReadHalf │ ← Send + 'static
  │ WriteHalf<'_> │             │OwnedWriteHalf │ ← Send + 'static
  └───────────────┘             └───────────────┘
  Same task only                Different tasks OK

  • split() — borrows the stream, returns ReadHalf<'_> + WriteHalf<'_>. The halves hold a borrow, so they cannot be moved into spawned tasks; use them within one task.
  • into_split() — consumes the stream, returns owned halves (Send + 'static). Use when the reader and writer live in different tasks.
#![allow(unused)]
fn main() {
// Two tasks: one reads, one writes
let (reader, writer) = stream.into_split();
tokio::spawn(read_loop(reader));
tokio::spawn(write_loop(writer));
}

UDP sockets

#![allow(unused)]
fn main() {
let socket = tokio::net::UdpSocket::bind("0.0.0.0:3000").await?;
// No connection — just send/receive datagrams
socket.send_to(b"ping", "127.0.0.1:4000").await?;

let mut buf = [0u8; 1024];
let (len, addr) = socket.recv_from(&mut buf).await?;
}

Exercises

Exercise 1: TCP echo with split

Write a TCP echo server that uses into_split() to put reading and writing in separate tasks. The reader reads lines and sends them through a tokio::sync::mpsc channel to the writer.

Exercise 2: UDP ping-pong

Create two UDP sockets. Socket A sends “ping” to Socket B. Socket B replies “pong”. Print the round-trip time.

Exercise 3: Connection counter

Build a TCP server that tracks total connections with an Arc<AtomicUsize>. Increment the counter on each accept and print “Connection #N from {addr}”. Echo data back until the client disconnects.

Exercise 4: Multi-client chat (simple)

TCP server where each client’s message is broadcast to all others. Use into_split() + a shared Vec<OwnedWriteHalf> behind a tokio::sync::Mutex.

Lesson 20: Task-Local Storage — tokio::task::LocalKey

Prerequisites: Lesson 4 (tasks), Lesson 14 (work-stealing).

Real-life analogy: name tags at a conference

┌───────────────────────────────────────────────────────────────┐
│  Conference Venue                                             │
│                                                               │
│  Thread-locals = room assignments:                            │
│    Room A has whiteboard "Project X"                          │
│    Room B has whiteboard "Project Y"                          │
│    If attendee MOVES rooms → sees wrong whiteboard!           │
│                                                               │
│  Task-locals = name tags on each person:                      │
│    Alice wears "Request #42"                                  │
│    Bob wears "Request #99"                                    │
│    No matter which room they walk into,                       │
│    their name tag follows them.                               │
│                                                               │
│  In async: tasks migrate between OS threads (rooms).          │
│  Thread-locals follow the room. Task-locals follow the task.  │
└───────────────────────────────────────────────────────────────┘

The problem with thread-locals in async

Thread 1              Thread 2
┌────────────┐        ┌────────────┐
│ TLS: "abc" │        │ TLS: "xyz" │
│            │        │            │
│ Task A     │        │            │
│  reads TLS │        │            │
│  → "abc"   │        │            │
│  .await    │───────►│ Task A     │   (work-stealing moved it!)
│            │        │  reads TLS │
│            │        │  → "xyz"   │   WRONG! Expected "abc"
└────────────┘        └────────────┘

In a multi-thread tokio runtime, a task can resume on any worker thread after .await. Thread-local values belong to the thread, not the task.

task_local! to the rescue

#![allow(unused)]
fn main() {
tokio::task_local! {
    static REQUEST_ID: String;
}

async fn handle_request(id: String) {
    REQUEST_ID.scope(id, async {
        // Value is set for the duration of this future
        do_work().await;        // survives .await
        do_more_work().await;   // still correct
    }).await;
}

async fn do_work() {
    REQUEST_ID.with(|id| {
        println!("Processing request: {id}");
    });
}
}

Scoping rules

REQUEST_ID.scope("req-42", async {
    │
    │  REQUEST_ID.with(|id| ...)    → Ok("req-42")
    │
    │  REQUEST_ID.scope("req-99", async {   // nested: shadows outer
    │      │
    │      │  REQUEST_ID.with(|id| ...)  → Ok("req-99")
    │      │
    │  }).await;
    │
    │  REQUEST_ID.with(|id| ...)    → Ok("req-42")  (restored)
    │
}).await;

// Outside any scope:
REQUEST_ID.with(|id| ...)           → PANIC!
REQUEST_ID.try_with(|id| ...)       → Err(AccessError)

Key limitations

Limitation                     Why
No set() method                Values are immutable within a scope
Not inherited by child tasks   tokio::spawn creates a fresh context
Must use .scope()              Cannot set from outside an async context
One value per scope            Nesting shadows, does not merge

Child tasks do NOT inherit

#![allow(unused)]
fn main() {
REQUEST_ID.scope("req-42".into(), async {
    tokio::spawn(async {
        // PANIC! REQUEST_ID is not set here.
        REQUEST_ID.with(|id| println!("{id}"));
    }).await;
}).await;
}

You must explicitly pass values to child tasks via .scope() or function arguments.

Common patterns

Request ID propagation

#![allow(unused)]
fn main() {
task_local! { static REQ_ID: u64; }

async fn middleware(req_id: u64, handler: impl Future<Output = ()>) {
    REQ_ID.scope(req_id, handler).await;
}

async fn log_something() {
    REQ_ID.with(|id| println!("[req={id}] doing something"));
}
}

Exercises

Exercise 1: Task-local survives .await

Define a REQUEST_ID task-local. Set it with .scope(), call an async function that reads it after an .await point. Verify the value is correct.

Exercise 2: Isolation between tasks

Spawn 10 tasks, each with a unique task-local value. Each task prints its value after a yield_now(). Verify no task sees another’s value.

Exercise 3: Child task does NOT inherit

Demonstrate that tokio::spawn inside a .scope() does NOT have access to the parent’s task-local. Use try_with to show it returns Err.

Lesson 21: Graceful Shutdown — CancellationToken, Signal Handling, Drain Pattern

Prerequisites: Lessons 15 (select!), 18 (tokio::sync).

Real-life analogy: closing a restaurant

┌────────────────────────────────────────────────────────────────┐
│  Closing Time Protocol                                         │
│                                                                │
│  Phase 1: STOP ACCEPTING                                       │
│    Manager locks the front door.                               │
│    No new customers allowed.                                   │
│    → Stop calling listener.accept()                            │
│                                                                │
│  Phase 2: SIGNAL WORKERS                                       │
│    Manager tells all waiters: "We're closing."                 │
│    → CancellationToken::cancel()                               │
│                                                                │
│  Phase 3: DRAIN                                                │
│    Let current diners finish their meals.                      │
│    Waiters complete current tables, don't start new ones.      │
│    → Wait for in-flight tasks to complete                      │
│                                                                │
│  Phase 4: HARD DEADLINE                                        │
│    "Kitchen closes in 5 minutes, eat or leave."                │
│    → tokio::time::timeout on the drain                         │
│                                                                │
│  Phase 5: CLEANUP                                              │
│    Turn off lights, lock up, save state.                       │
│    → Flush logs, close DB connections                          │
└────────────────────────────────────────────────────────────────┘

Architecture

                    ┌──────────────┐
  Ctrl+C ──────────►  Signal       │
  SIGTERM ─────────►  Handler      │
                    └──────┬───────┘
                           │ cancel()
                    ┌──────▼────────┐
                    │ Cancellation  │
                    │ Token (root)  │
                    └──────┬────────┘
                           │
              ┌────────────┼────────────┐
              ▼            ▼            ▼
        child_token   child_token  child_token
              │            │            │
          ┌───▼───┐   ┌───▼───┐   ┌───▼───┐
          │Task A │   │Task B │   │Task C │
          │select!│   │select!│   │select!│
          │cancel │   │cancel │   │cancel │
          │or work│   │or work│   │or work│
          └───────┘   └───────┘   └───────┘

Signal handling

#![allow(unused)]
fn main() {
use tokio::signal;

// Simple: wait for Ctrl+C
signal::ctrl_c().await?;

// In a server: use select! to race shutdown against work
tokio::select! {
    _ = signal::ctrl_c() => {
        println!("shutdown signal received");
    }
    _ = server.run() => {
        println!("server exited on its own");
    }
}
}

CancellationToken

From tokio_util::sync (but we can build the pattern with tokio::sync::Notify):

#![allow(unused)]
fn main() {
// Using Notify as a poor-man's CancellationToken
let shutdown = Arc::new(Notify::new());

// In signal handler:
shutdown.notify_waiters();   // wake ALL waiters

// In each worker task:
tokio::select! {
    _ = shutdown.notified() => {
        println!("shutting down");
        return;
    }
    result = do_work() => {
        // process result
    }
}
}

The drain pattern

#![allow(unused)]
fn main() {
let shutdown = Arc::new(Notify::new());
let in_flight = Arc::new(AtomicUsize::new(0));
let all_done = Arc::new(Notify::new());

// Worker tasks:
in_flight.fetch_add(1, Ordering::SeqCst);
// ... do work ...
if in_flight.fetch_sub(1, Ordering::SeqCst) == 1 {
    all_done.notify_one();  // last task done
}

// Shutdown sequence:
shutdown.notify_waiters();             // Phase 2: signal
tokio::time::timeout(                  // Phase 4: hard deadline
    Duration::from_secs(30),
    all_done.notified()                // Phase 3: wait for drain
).await;
}

DropGuard pattern

Ensure cancellation even if the shutdown logic panics:

#![allow(unused)]
fn main() {
// Wrapping in a struct whose Drop triggers cleanup
struct ShutdownGuard {
    notify: Arc<Notify>,
}
impl Drop for ShutdownGuard {
    fn drop(&mut self) {
        self.notify.notify_waiters();
    }
}
}

Exercises

Exercise 1: Ctrl+C echo server

Build a TCP echo server that stops accepting on Ctrl+C, finishes active connections, then exits.

Exercise 2: Notify-based shutdown hierarchy

Create a parent Notify and 5 worker tasks. Each worker does a loop of work with select! checking for shutdown. Cancel all workers, verify they all exit.

Exercise 3: Drain with in-flight counter

Track in-flight requests with AtomicUsize. After signaling shutdown, wait until the counter reaches zero (use Notify). Add a 5-second hard timeout.

Exercise 4: Graceful shutdown with state persistence

On shutdown, serialize current application state (e.g., a counter value) to a file. On restart, load it back. This simulates real-world graceful shutdown where you save progress.

Lesson 22: Tracing & Debugging Async Code

Prerequisites: Lesson 16 (Tokio architecture), Lesson 20 (task-locals).

Real-life analogy: security cameras in a building

┌────────────────────────────────────────────────────────────────┐
│  Building Security System                                      │
│                                                                │
│  println!/log = writing on paper:                              │
│    "Something happened at some time."                          │
│    No context, no trail, hard to correlate.                    │
│                                                                │
│  tracing = security cameras with zones:                        │
│    Camera in Lobby (span: "lobby")                             │
│      └── sees Person A enter at 9:01 (event)                  │
│    Camera in Office (span: "office_204")                       │
│      └── sees Person A sit down at 9:03 (event)               │
│      └── sees Person A make a call at 9:05 (event)            │
│                                                                │
│    You can follow Person A across cameras (spans).             │
│    Each camera records structured data (fields).               │
│    Zoom in on one zone or see the whole building.              │
│                                                                │
│  Spans follow a task across .await points,                     │
│  just like cameras track a person across rooms.                │
└────────────────────────────────────────────────────────────────┘

tracing vs log

log crate:                           tracing crate:
┌───────────────────────┐            ┌───────────────────────────┐
│ info!("got request")  │            │ info!(id=42, "got req")   │
│                       │            │                           │
│ Plain text messages.  │            │ Structured fields.        │
│ No context tracking.  │            │ Span context across .await│
│ Thread-local at best. │            │ Composable layers.        │
│                       │            │ Works with tokio-console. │
└───────────────────────┘            └───────────────────────────┘

Spans and events

Span: "handle_request" (id=42)
  │
  ├── Event: INFO "received request" { method: "GET", path: "/api" }
  │
  ├── Span: "db_query" (table="users")
  │     └── Event: DEBUG "query executed" { rows: 5, ms: 12 }
  │
  └── Event: INFO "response sent" { status: 200, ms: 15 }

Spans are like folders. Events are like log lines inside folders.
Spans carry context that events inherit.

The #[instrument] macro

#![allow(unused)]
fn main() {
use tracing::{info, instrument};

#[instrument(skip(stream), fields(addr = %addr))]
async fn handle_connection(stream: TcpStream, addr: SocketAddr) {
    info!("new connection");           // automatically tagged with addr
    let data = read_data(&stream).await;
    info!(bytes = data.len(), "read complete");
}
}

#[instrument] automatically:

  • Creates a span named after the function
  • Records function arguments as span fields
  • Enters/exits the span around .await points

Subscribers and layers

┌──────────────────────────────────────────────────────────────┐
│  tracing-subscriber stack                                     │
│                                                               │
│  ┌─────────────────────┐                                      │
│  │  EnvFilter layer    │ ← RUST_LOG=info,my_crate=debug       │
│  └──────────┬──────────┘                                      │
│  ┌──────────▼──────────┐                                      │
│  │  fmt::Layer         │ ← pretty-prints to stderr            │
│  └──────────┬──────────┘                                      │
│  ┌──────────▼──────────┐                                      │
│  │  (optional: JSON)   │ ← machine-readable output            │
│  └──────────┬──────────┘                                      │
│  ┌──────────▼──────────┐                                      │
│  │  Registry           │ ← collects spans + events            │
│  └─────────────────────┘                                      │
└──────────────────────────────────────────────────────────────┘
#![allow(unused)]
fn main() {
use tracing_subscriber::{fmt, EnvFilter, prelude::*};

tracing_subscriber::registry()
    .with(EnvFilter::from_default_env())
    .with(fmt::layer())
    .init();
}

tokio-console

A live diagnostic tool for async applications:

1. Add console-subscriber to your app
2. Build with: RUSTFLAGS="--cfg tokio_unstable" cargo build
3. Run your app
4. In another terminal: tokio-console

You see:
  - All tasks: name, state (idle/running/waiting), polls, durations
  - Waker counts: how often each task was woken
  - Task details: where it was spawned, what it's waiting on

Common debugging patterns

Problem                         Tool
“Where is time spent?”          #[instrument] + span timing
“Why is this task stuck?”       tokio-console task list
“What’s the call chain?”        Nested spans in log output
“Which request caused this?”    Span fields (request_id)
“What are all tasks doing?”     Handle::dump() (tokio_unstable)

Exercises

Exercise 1: Add structured tracing to a server

Add tracing and tracing-subscriber to a TCP echo server. Log connection events with structured fields (addr, bytes_read, duration). Use #[instrument].

Exercise 2: Custom event counter layer

Build a simple tracing layer (implement the Layer trait) that counts events per level (INFO, WARN, ERROR). Print the counts on shutdown.

Exercise 3: Request ID propagation with spans

Create a span with a request_id field for each incoming request. Log from nested functions — verify the request_id appears in all log lines automatically.

Project 3: HTTP Load Tester (mini wrk/hey)

Combines: Lessons 18-22 (tokio::sync, tokio::net, task-locals, graceful shutdown, tracing).

What you’ll build

A CLI tool that hammers an HTTP endpoint, controls concurrency with a Semaphore, handles Ctrl+C gracefully, propagates request IDs via task-locals, and reports latency percentiles.

Architecture

┌───────────────────────────────────────────────────────────────┐
│  load-tester --url http://example.com --requests 100 -c 10   │
└──────────────────────────┬────────────────────────────────────┘
                           │
                    ┌──────▼────────┐
                    │  CLI Parser   │ (clap)
                    │  --url        │
                    │  --requests   │
                    │  --concurrency│
                    └──────┬────────┘
                           │
              ┌────────────┼────────────┐
              │            │            │
     ┌────────▼───┐  ┌────▼─────┐  ┌──▼──────────┐
     │ Semaphore  │  │ Shutdown │  │ Result      │
     │ (c permits)│  │ (Notify) │  │ Collector   │
     │            │  │ Ctrl+C   │  │ (Mutex<Vec>)│
     └────────┬───┘  └────┬─────┘  └──┬──────────┘
              │            │           │
              └────────────┼───────────┘
                           │
              ┌────────────▼────────────────┐
              │  for each request 1..N:      │
              │    tokio::spawn {            │
              │      acquire semaphore       │
              │      set task-local req_id   │
              │      select! {               │
              │        shutdown => return    │
              │        send_request => {     │
              │          record latency      │
              │          record status       │
              │        }                     │
              │      }                       │
              │    }                         │
              └────────────┬────────────────┘
                           │
                    ┌──────▼──────┐
                    │  Report     │
                    │  p50, p90   │
                    │  p99, max   │
                    │  throughput │
                    │  status map │
                    └─────────────┘

CLI interface

load-tester --url https://example.com/api --requests 1000 --concurrency 50

Flag           Short   Default    Description
--url          -u      required   Target URL
--requests     -n      100        Total requests to send
--concurrency  -c      10         Max concurrent requests

Sample output

Target: https://example.com/api
Requests: 1000, Concurrency: 50

Running...  [1000/1000] done

Results:
  Total:      1000 requests
  Succeeded:  985
  Failed:     15
  Duration:   2.34s
  Throughput: 427.35 req/s

Latency:
  p50:    4.2ms
  p90:   12.1ms
  p99:   45.3ms
  max:   102.7ms

Status codes:
  200: 985
  503: 15

Key implementation details

Concurrency with Semaphore

#![allow(unused)]
fn main() {
let sem = Arc::new(Semaphore::new(concurrency));
for i in 0..total_requests {
    let permit = sem.clone().acquire_owned().await?;
    tokio::spawn(async move {
        let result = send_request(&url).await;
        drop(permit); // release slot
        result
    });
}
}

Latency percentiles

#![allow(unused)]
fn main() {
fn percentile(sorted: &[Duration], p: f64) -> Duration {
    let idx = ((sorted.len() as f64) * p / 100.0) as usize;
    sorted[idx.min(sorted.len() - 1)]
}
}
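As a quick sanity check of the nearest-rank math (the helper is duplicated here so the snippet runs standalone):

```rust
use std::time::Duration;

// Same nearest-rank helper as above, duplicated for a standalone check.
fn percentile(sorted: &[Duration], p: f64) -> Duration {
    let idx = ((sorted.len() as f64) * p / 100.0) as usize;
    sorted[idx.min(sorted.len() - 1)]
}

fn main() {
    let samples: Vec<Duration> = [3u64, 5, 8, 12, 40]
        .into_iter()
        .map(Duration::from_millis)
        .collect();
    // 5 samples: p50 -> index 2 (2.5 truncated), p99 -> index 4 (4.95 truncated)
    assert_eq!(percentile(&samples, 50.0), Duration::from_millis(8));
    assert_eq!(percentile(&samples, 99.0), Duration::from_millis(40));
    println!("p50={:?} p99={:?}", percentile(&samples, 50.0), percentile(&samples, 99.0));
}
```

Note that truncation makes this a low-biased nearest-rank estimate; real tools often interpolate or use HdrHistogram instead.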

Graceful Ctrl+C

#![allow(unused)]
fn main() {
let shutdown = Arc::new(Notify::new());
tokio::spawn({
    let s = shutdown.clone();
    async move {
        tokio::signal::ctrl_c().await.ok();
        s.notify_waiters();
    }
});
}

Making HTTP requests with raw TCP

Since we don’t have reqwest, we use raw TcpStream with minimal HTTP/1.1:

#![allow(unused)]
fn main() {
async fn http_get(url: &str) -> Result<(u16, Duration)> {
    // host, port, and path are parsed from `url` (parsing omitted here)
    let start = Instant::now();
    let mut stream = TcpStream::connect((host, port)).await?;
    stream.write_all(
        format!("GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n").as_bytes(),
    ).await?;
    // Read the response and parse the status line, e.g. "HTTP/1.1 200 OK"
    Ok((status_code, start.elapsed()))
}
}

Exercises

Exercise 1: Basic load tester

Implement the full load tester with Semaphore-based concurrency, latency collection, and percentile reporting. Use raw TCP for HTTP requests.

Exercise 2: Ctrl+C graceful shutdown

Add tokio::signal::ctrl_c() handling. On Ctrl+C, stop spawning new requests, let in-flight ones finish, then print partial results.

Exercise 3: Live progress reporting

Print a progress line that updates every 100ms showing completed/total requests and current throughput. Use a separate task with tokio::time::interval.

Lesson 23: Backpressure – Bounded Channels, Semaphore, poll_ready

Prerequisites: Lessons 13 (channels), 18 (tokio::sync), 21 (graceful shutdown).

Real-life analogy: water pressure in pipes

UNBOUNDED (no backpressure):

  Fire hydrant          Tiny garden hose
  =============>>>>>>>>>=============>  BURST!
  (fast producer)       (slow consumer)

  The producer pushes water faster than the pipe can carry.
  Pressure builds. The pipe bursts. Your system OOMs.


BOUNDED (with backpressure):

  Fire hydrant      Pressure valve      Garden hose
  =============>>>> |  BLOCKS  | >>>>=============>
  (fast producer)   (capacity=100)     (slow consumer)

  When the pipe is full, the valve closes.
  The producer STOPS until the consumer drains some water.
  Nobody bursts. The system stays alive.

Architecture: bounded channel flow

           Producer                  Consumer
              |                         |
              v                         |
     tx.send(item).await                |
              |                         |
    +---------v---------+               |
    |  Bounded Buffer   |  capacity=4   |
    |  [X] [X] [X] [X]  |  FULL!        |
    +---------+---------+               |
              |                         |
    Producer SUSPENDS here       rx.recv().await
    until consumer takes one            |
              |                         v
    +---------v---------+        processes item
    |  [X] [X] [X] [_]  |  one slot free
    +---------+---------+
              |
    Producer WAKES UP, sends next item

Bounded channels

#![allow(unused)]
fn main() {
let (tx, rx) = tokio::sync::mpsc::channel(100); // buffer of 100

// Producer blocks (awaits) when buffer is full
tx.send(item).await?;

// try_send for non-blocking "drop if full" strategy
match tx.try_send(item) {
    Ok(()) => { /* sent */ }
    Err(TrySendError::Full(item)) => { /* channel full, drop or retry */ }
    Err(TrySendError::Closed(item)) => { /* receiver gone */ }
}
}

The send().await suspends when the buffer is full, naturally slowing the producer.

Semaphore-based flow control

When you do not use channels (e.g., spawned tasks hitting an API):

#![allow(unused)]
fn main() {
let sem = Arc::new(Semaphore::new(100));
loop {
    // acquire_owned gives a 'static permit that can move into the task
    let permit = sem.clone().acquire_owned().await?; // waits if 100 tasks in-flight
    tokio::spawn(async move {
        do_work().await;
        drop(permit); // releases the slot
    });
}
}

Strategies for overload

Strategy       Mechanism            When to use
Block (await)  send().await         Default – propagate pressure
Drop newest    try_send + discard   Metrics, telemetry
Drop oldest    VecDeque pop front   Real-time video frames
Return error   HTTP 503             Load shedding at the edge

TCP analogy

TCP’s receive window is backpressure at the OS level. When the receiver’s buffer fills, the sender’s write() blocks. Tokio’s bounded channels are the application-level equivalent.

Exercises

Exercise 1: Bounded vs unbounded memory

Create a producer that generates items 10x faster than the consumer. Compare memory usage between a bounded channel (capacity 10) and an unbounded channel after 100,000 items.

Exercise 2: Semaphore rate limiter

Build a rate limiter using Semaphore that allows at most 5 concurrent HTTP-style requests. Log when a request waits for a permit vs gets one immediately.

Exercise 3: Drop-oldest policy

Implement a “drop oldest” buffer using VecDeque behind a Mutex. When the buffer is full, pop the front before pushing to the back.

Exercise 4: Chained backpressure

Chain two bounded channels: producer -> transformer -> consumer. Slow down the consumer and observe backpressure propagating all the way to the producer.

Lesson 24: Cancellation Safety — Dropped Futures, Data Loss, Safe Patterns

What you’ll learn

  • What “cancellation safe” means for async functions
  • How select! can cause data loss with unsafe futures
  • Identifying cancellation-unsafe operations
  • Patterns to make code cancellation-safe

Key concepts

The danger

When tokio::select! picks one branch, the other futures are dropped. If a dropped future had partially completed work (e.g., read some bytes into a buffer), that work is lost.

#![allow(unused)]
fn main() {
// UNSAFE: if sleep wins, partial read data is lost
tokio::select! {
    result = reader.read(&mut buf) => { /* ... */ }
    _ = tokio::time::sleep(timeout) => { /* timeout */ }
}
}

Cancellation-safe vs unsafe

Safe                                  Unsafe
tokio::sync::mpsc::Receiver::recv()   tokio::io::AsyncReadExt::read_exact()
TcpListener::accept()                 tokio::io::AsyncBufReadExt::read_line()
tokio::time::sleep()                  Futures that do partial work before the first .await

A future is cancellation-safe if dropping it after any .await point loses no data.

Safe patterns

Pattern 1: Use cancellation-safe alternatives

#![allow(unused)]
fn main() {
// Use recv() in select — it's cancellation-safe
tokio::select! {
    msg = rx.recv() => { /* no data loss */ }
    _ = token.cancelled() => { return; }
}
}

Pattern 2: Move work into a spawned task

#![allow(unused)]
fn main() {
// The spawned task owns the buffer and runs to completion even if
// the select! below takes the shutdown branch.
let mut handle = tokio::spawn(async move {
    let mut buf = vec![0u8; 1024];
    reader.read_exact(&mut buf).await.map(|_| buf)
});
tokio::select! {
    result = &mut handle => { /* ... */ }
    _ = shutdown => { /* the task still runs to completion */ }
}
}

Pattern 3: Pin and reuse the future

#![allow(unused)]
fn main() {
// Create the future once, pin it, then poll the SAME future on every
// iteration: its partial progress survives losing a select! round.
let mut read_fut = std::pin::pin!(reader.read_exact(&mut buf));
let result = loop {
    tokio::select! {
        res = &mut read_fut => break res,
        _ = interval.tick() => { /* periodic work; the read keeps its progress */ }
    }
};
}

Exercises

  1. Demonstrate data loss: use select! with read_exact and a timer, show bytes vanish
  2. Fix the above using a spawned task
  3. Write a cancellation-safe message reader that accumulates bytes across select! iterations
  4. Audit tokio::sync docs — list which methods are cancellation-safe and which are not
  5. Implement a CancellationSafeReader wrapper that buffers partial reads

Lesson 25: Bridging Sync and Async — block_on, spawn_blocking, Handle

What you’ll learn

  • Calling async code from sync code (block_on, Handle::block_on)
  • Calling sync/blocking code from async code (spawn_blocking)
  • The blocking thread pool and its configuration
  • Common pitfalls and anti-patterns

Key concepts

Async from sync: block_on

#![allow(unused)]
fn main() {
let rt = tokio::runtime::Runtime::new()?;
let result = rt.block_on(async {
    fetch_data().await
});
}

block_on parks the current thread until the future completes. Never call it from inside an async context: Tokio detects this and panics rather than letting the worker thread deadlock.

Sync from async: spawn_blocking

#![allow(unused)]
fn main() {
let hash = tokio::task::spawn_blocking(move || {
    // CPU-heavy or blocking I/O — runs on dedicated thread pool
    compute_bcrypt_hash(&password)
}).await?;
}

The blocking pool has up to 512 threads by default. Tasks here do not block the async worker threads.

Handle for deferred async access

#![allow(unused)]
fn main() {
let handle = tokio::runtime::Handle::current();

std::thread::spawn(move || {
    // From a plain OS thread, run async code:
    handle.block_on(async {
        client.get(url).send().await
    });
});
}

Anti-patterns

Anti-pattern            Problem              Fix
block_on inside async   Panic or deadlock    Use .await
Blocking in async task  Starves workers      spawn_blocking
spawn_blocking for I/O  Wastes pool threads  Use async I/O
Nested runtimes         Panic                Use Handle::current()

Exercises

  1. Call an async HTTP client from a synchronous main using block_on
  2. Use spawn_blocking to offload a CPU-heavy Fibonacci computation
  3. Pass a Handle to a std thread and use it to run async DNS resolution
  4. Demonstrate the deadlock when calling block_on inside an async task
  5. Configure the blocking thread pool size with max_blocking_threads and observe behavior under load

Lesson 26: Streams — Async Iteration, StreamExt, Backpressure

What you’ll learn

  • The Stream trait as the async equivalent of Iterator
  • Useful combinators from StreamExt and TryStreamExt
  • Creating streams from channels, iterators, and async generators
  • Backpressure considerations with streams

Key concepts

The Stream trait

#![allow(unused)]
fn main() {
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}
}

Like Iterator, but poll_next can return Pending.

StreamExt combinators

#![allow(unused)]
fn main() {
use tokio_stream::StreamExt;

let mut stream = tokio_stream::iter(vec![1, 2, 3])
    .map(|x| x * 2)
    .filter(|x| *x > 2)
    .take(5);

while let Some(item) = stream.next().await {
    println!("{item}");
}
}

Key combinators: map, filter, take, merge, chain, throttle, chunks, timeout.

Creating streams

  • tokio_stream::iter() — from an iterator
  • ReceiverStream::new(rx) — from an mpsc::Receiver
  • async_stream::stream! — from an async block with yield
  • BroadcastStream — from a broadcast::Receiver

Backpressure in streams

Streams are pull-based: the consumer calls next().await, so the producer only runs when demanded. This provides natural backpressure. Combine with buffer_unordered(n) (a combinator from the futures crate's StreamExt, not tokio_stream's) to bound concurrency:

#![allow(unused)]
fn main() {
stream
    .map(|url| async move { fetch(url).await })
    .buffer_unordered(10)  // at most 10 concurrent fetches
}

Exercises

  1. Convert an mpsc::Receiver into a stream and process items with StreamExt::map
  2. Use buffer_unordered to fetch 100 URLs with max 10 concurrent requests
  3. Implement a custom Stream that yields Fibonacci numbers with a delay
  4. Use stream.chunks(10) to batch database inserts
  5. Merge two streams and process items in arrival order

Lesson 27: Connection Pooling — Reuse, Health Checks, Idle Timeout

What you’ll learn

  • Why connection pooling matters (TCP handshake, TLS, auth overhead)
  • Building a pool with Semaphore + VecDeque
  • Health checking idle connections
  • Idle timeout and eviction strategies

Key concepts

Why pool?

Each new TCP connection costs: DNS lookup, TCP handshake (1 RTT), TLS handshake (1-2 RTT), and often authentication. Reusing connections amortizes this cost.

Pool architecture

checkout() -> acquire Semaphore permit
           -> pop from idle queue (or create new)
           -> health check
           -> return PooledConnection

drop(PooledConnection) -> health check
                       -> push to idle queue
                       -> release Semaphore permit

Core components

#![allow(unused)]
fn main() {
struct Pool {
    idle: Mutex<VecDeque<Connection>>,
    semaphore: Semaphore,        // limits total connections
    max_idle: usize,
    idle_timeout: Duration,
}
}

Health checks

  • On checkout — ping before returning to caller
  • On return — verify connection is still usable
  • Background — periodic sweep of idle connections

Idle timeout

Connections sitting idle too long may be closed by the server or a firewall. Evict them:

#![allow(unused)]
fn main() {
// Background task
loop {
    tokio::time::sleep(Duration::from_secs(30)).await;
    pool.evict_idle_older_than(idle_timeout);
}
}

Real-world pools

  • deadpool — generic async pool
  • bb8 — based on r2d2’s design
  • sqlx — built-in pool for database connections

Exercises

  1. Build a simple TCP connection pool using Semaphore and VecDeque
  2. Add a health check that sends a ping before returning a connection
  3. Implement idle timeout eviction with a background reaper task
  4. Add metrics: total connections, idle connections, wait time
  5. Stress test: 100 concurrent tasks sharing a pool of 10 connections

Lesson 28: Testing Async Code — tokio::test, Time Mocking, Deterministic Testing

What you’ll learn

  • Using #[tokio::test] for async unit tests
  • Mocking time with tokio::time::pause() and advance()
  • Deterministic testing with current_thread runtime
  • Testing patterns for channels, tasks, and I/O

Key concepts

#[tokio::test]

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_something() {
    let result = my_async_fn().await;
    assert_eq!(result, 42);
}
}

By default, uses current_thread runtime. For multi-thread: #[tokio::test(flavor = "multi_thread")].

Time mocking

pause() freezes time; advance() moves it forward instantly:

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_timeout() {
    tokio::time::pause();

    let start = Instant::now();
    tokio::time::sleep(Duration::from_secs(3600)).await;

    // Completes instantly — time is mocked
    assert!(start.elapsed() >= Duration::from_secs(3600));
}
}

Auto-advance: when all tasks are waiting on time, the runtime jumps to the next timer automatically.

Deterministic testing

current_thread runtime is deterministic — tasks run in a predictable order. Useful for reproducing race conditions.

Testing patterns

Pattern              Approach
Test a spawned task  Await its JoinHandle and assert on the result
Test channels        Create channel, send, recv, assert
Test shutdown        Create CancellationToken, cancel, verify cleanup
Test I/O             Use tokio::io::duplex() for in-memory streams
Test timeouts        Pause time, advance past the deadline

tokio::io::duplex

#![allow(unused)]
fn main() {
let (client, server) = tokio::io::duplex(1024);
// Use client and server as AsyncRead + AsyncWrite
// No real TCP needed
}

Exercises

  1. Write a #[tokio::test] that verifies an async function returns the correct value
  2. Use time::pause() and advance() to test a retry function with exponential backoff (no real waiting)
  3. Test a producer-consumer pipeline using mpsc channels
  4. Use tokio::io::duplex() to test a protocol parser without real sockets
  5. Test graceful shutdown: cancel a token, verify all tasks exit cleanly

Project 4: Async Job Queue with Workers, Backpressure, Graceful Shutdown

What you’ll learn

  • Designing a multi-stage async pipeline
  • Backpressure with bounded channels
  • spawn_blocking for CPU-bound work
  • Cancellation safety across the pipeline
  • Testing async systems with time mocking

Specification

Architecture

Producer(s)
    |
    v
bounded mpsc channel (backpressure)
    |
    v
Dispatcher
    |
    +---> Worker 1 (spawn_blocking for CPU work)
    +---> Worker 2
    +---> Worker N
    |
    v
Results channel --> Result collector

Job definition

#![allow(unused)]
fn main() {
struct Job {
    id: u64,
    payload: Vec<u8>,
    job_type: JobType,
}

enum JobType {
    CpuBound { iterations: u64 },   // use spawn_blocking
    AsyncIo { url: String },         // use async I/O
    Delayed { delay: Duration },     // sleep then execute
}

struct JobResult {
    job_id: u64,
    duration: Duration,
    status: ResultStatus,
}
}

Components

Component             Responsibility
Producer              Generates jobs, sends to bounded channel; blocks on backpressure
Dispatcher            Receives jobs, assigns to workers via Semaphore(N)
Worker                Executes the job; uses spawn_blocking for CPU-bound work
Collector             Receives results, tracks stats, reports progress
Shutdown coordinator  CancellationToken hierarchy; drain pattern

Cancellation safety

  • Dispatcher uses cancellation-safe recv() in select!
  • Workers check cancellation between stages
  • On shutdown: stop accepting new jobs, drain in-flight, collect final results

Testing requirements

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_backpressure() {
    // Slow workers, fast producer -> producer blocks
}

#[tokio::test]
async fn test_graceful_shutdown() {
    tokio::time::pause();
    // Submit jobs, cancel token, advance time, verify all complete
}

#[tokio::test]
async fn test_worker_pool_concurrency() {
    // Submit N jobs, verify at most W run concurrently
}
}

Key concepts

  • Bounded channel for Producer -> Dispatcher backpressure
  • Semaphore in Dispatcher to limit concurrent workers
  • spawn_blocking for JobType::CpuBound to avoid starving async workers
  • CancellationToken hierarchy: root -> dispatcher -> workers
  • time::pause + advance for deterministic testing of delays and timeouts

Exercises

  1. Implement the basic pipeline: Producer -> Channel -> Dispatcher -> Workers -> Collector
  2. Add backpressure: bounded channel with capacity 10, verify producer blocks when queue is full
  3. Implement spawn_blocking for CPU-bound jobs; verify async workers are not starved
  4. Add graceful shutdown with CancellationToken and drain pattern
  5. Write tests using time::pause() for the Delayed job type
  6. Add a dead-letter queue for failed jobs with retry logic (max 3 retries, exponential backoff)
  7. Add metrics: jobs/sec, average latency, queue depth, worker utilization

Pattern 1: Task-per-Connection

Real-life analogy: the hotel concierge desk

Guest arrives → front desk assigns a personal concierge
  Concierge 1: handles Guest A (room service, wake-up call, taxi)
  Concierge 2: handles Guest B (restaurant booking, laundry)
  Concierge 3: handles Guest C (tour arrangements)

Each concierge handles ONE guest's entire stay.
When the guest checks out, the concierge is free.

This is task-per-connection:
  connection arrives → spawn a task → task handles everything → task ends

The pattern

The simplest and most common async architecture. For every incoming connection, spawn a dedicated task:

#![allow(unused)]
fn main() {
loop {
    let (stream, addr) = listener.accept().await?;
    tokio::spawn(async move {
        handle_client(stream).await;
    });
}
}
┌──────────────────────────────────────────────────┐
│  Task-per-Connection                             │
│                                                  │
│  Listener                                        │
│    │                                             │
│    ├── accept() → spawn(handle(client_1))        │
│    ├── accept() → spawn(handle(client_2))        │
│    ├── accept() → spawn(handle(client_3))        │
│    └── ...                                       │
│                                                  │
│  Each task:                                      │
│    read request → process → write response       │
│    → loop or disconnect                          │
│                                                  │
│  Tasks are independent. One slow client          │
│  doesn't affect others.                          │
└──────────────────────────────────────────────────┘

When to use

  • Web servers — each HTTP request gets a task
  • Database servers — each client connection gets a task
  • Chat servers — each user gets a task
  • Proxies — each proxied connection gets a task
  • Game servers — each player gets a task

Basically: anything that accepts connections and handles them independently.

When NOT to use

  • When connections need to share heavy state (use actors instead)
  • When you need to limit concurrency precisely (add a semaphore)
  • When tasks need to coordinate tightly (use channels between tasks)

The concurrency limit problem

Spawning unlimited tasks can exhaust memory:

10,000 connections → 10,000 tasks → fine
100,000 connections → 100,000 tasks → maybe fine
1,000,000 connections → 1,000,000 tasks → might OOM

Solution: limit concurrent connections with a semaphore:

#![allow(unused)]
fn main() {
let semaphore = Arc::new(Semaphore::new(10_000)); // max 10K concurrent

loop {
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    let (stream, _) = listener.accept().await?;
    tokio::spawn(async move {
        handle_client(stream).await;
        drop(permit); // release the slot
    });
}
}

Shared state between tasks

Tasks often need shared state (user list, config, counters). Three approaches:

Option A: Arc<Mutex<T>>
  Simple. Lock contention if many tasks write.
  Good for: counters, small config objects.

Option B: Arc<RwLock<T>>
  Many readers, few writers.
  Good for: shared config that rarely changes.

Option C: Dedicated state task (Actor pattern → next chapter)
  One task owns the state, others send messages.
  Good for: complex state, no lock contention.

Code exercise: TCP Chat Server

Build a chat server where each client gets a task:

┌──────────┐     ┌──────────┐     ┌──────────┐
│ Client A │     │ Client B │     │ Client C │
│  (task)  │     │  (task)  │     │  (task)  │
└────┬─────┘     └────┬─────┘     └────┬─────┘
     │                │                │
     └───────┬────────┴────────┬───────┘
             │                 │
     ┌───────▼─────────────────▼───────┐
     │  Shared state:                  │
     │  HashMap<ClientId, Sender>      │
     │  (behind Arc<Mutex>)            │
     └─────────────────────────────────┘

Requirements:

  1. Accept TCP connections, spawn a task per client
  2. Each task reads lines from its client
  3. Broadcast messages to all other clients
  4. Handle disconnect (remove from shared state)
  5. Limit to 100 concurrent connections with a semaphore

Starter code:

use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    let semaphore = Arc::new(Semaphore::new(100));
    // TODO: shared state for connected clients

    loop {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        let (stream, addr) = listener.accept().await.unwrap();
        // TODO: clone shared state

        tokio::spawn(async move {
            println!("{addr} connected");
            // TODO: handle client (read lines, broadcast, disconnect)
            drop(permit);
        });
    }
}

Test with: nc 127.0.0.1 8080 in multiple terminals.

Pattern 2: Actor Model

Real-life analogy: office departments

┌──────────────┐  memo  ┌──────────────┐  memo  ┌──────────────┐
│  Sales Dept  │ ──────►│  Accounting  │ ──────►│  Shipping    │
│              │        │              │        │              │
│  Inbox: 📬   │        │  Inbox: 📬   │        │  Inbox: 📬   │
│  State: leads│        │  State: books│        │ State: orders│
│  Staff: 1    │        │  Staff: 1    │        │  Staff: 1    │
└──────────────┘        └──────────────┘        └──────────────┘

Each department:
  - Has its own inbox (channel)
  - Processes memos one at a time (no multitasking within a dept)
  - Has private state (no other dept can touch it)
  - Communicates only via memos (messages)

Nobody walks into Accounting and grabs the books.
They send a memo and wait for a reply.

The pattern

An actor is a task that:

  1. Owns its state exclusively (no shared memory)
  2. Receives messages through a channel (its “inbox”)
  3. Processes messages one at a time (sequential, no locks needed)
  4. Can send messages to other actors
#![allow(unused)]
fn main() {
struct BankAccount {
    balance: u64,
    inbox: mpsc::Receiver<AccountMessage>,
}

enum AccountMessage {
    Deposit { amount: u64 },
    Withdraw { amount: u64, reply: oneshot::Sender<Result<(), String>> },
    GetBalance { reply: oneshot::Sender<u64> },
}

impl BankAccount {
    async fn run(mut self) {
        while let Some(msg) = self.inbox.recv().await {
            match msg {
                AccountMessage::Deposit { amount } => {
                    self.balance += amount;
                }
                AccountMessage::Withdraw { amount, reply } => {
                    if self.balance >= amount {
                        self.balance -= amount;
                        let _ = reply.send(Ok(()));
                    } else {
                        let _ = reply.send(Err("insufficient funds".into()));
                    }
                }
                AccountMessage::GetBalance { reply } => {
                    let _ = reply.send(self.balance);
                }
            }
        }
    }
}
}
┌────────────────────────────────────────────────────────┐
│  Actor Model                                           │
│                                                        │
│  ┌─────────────┐  msg   ┌─────────────┐               │
│  │  Client     │ ──────►│  Actor      │               │
│  │  (any task) │        │             │               │
│  │             │◄───────│  - inbox    │               │
│  │  sends msg  │ reply  │  - state    │               │
│  │  + oneshot  │        │  - run loop │               │
│  └─────────────┘        └─────────────┘               │
│                                                        │
│  No locks. No shared state. No data races.             │
│  The actor processes one message at a time.            │
│  State is private — only the actor touches it.         │
└────────────────────────────────────────────────────────┘

Actor vs Shared State

Shared state (Arc<Mutex<T>>):         Actor:
  ┌──────────┐ ┌──────────┐           ┌──────────┐
  │ Task A   │ │ Task B   │           │ Task A   │──msg──┐
  │ lock()   │ │ lock()   │           │          │       │
  │ modify   │ │ BLOCKED  │           └──────────┘       ▼
  │ unlock() │ │ ...      │           ┌──────────┐  ┌─────────┐
  └──────────┘ └──────────┘           │ Task B   │──►  Actor  │
                                      │          │  │ (no lock│
  Lock contention.                    └──────────┘  │  needed)│
  Deadlock risk.                                    └─────────┘
  Complex error handling.              No contention.
                                       Sequential processing.
                                       Simple reasoning.

When to use

  • Stateful services — user sessions, game entities, connection managers
  • When state is complex — a mutex would be held too long or across .await
  • Isolation — each actor can fail independently without corrupting shared state
  • Erlang/Elixir-style systems — the actor model is their core abstraction

When NOT to use

  • Simple shared counters — AtomicU64 or Arc<Mutex<u64>> is simpler
  • Read-heavy workloads — actors serialize all access; RwLock allows concurrent reads
  • Fire-and-forget operations — if you don’t need a reply, a plain spawn is simpler

The request-reply pattern

To get data back from an actor, send a oneshot::Sender with the message:

#![allow(unused)]
fn main() {
// Client side:
let (reply_tx, reply_rx) = oneshot::channel();
actor_tx.send(AccountMessage::GetBalance { reply: reply_tx }).await?;
let balance = reply_rx.await?;  // waits for the actor to respond
}

Code exercise: Bank System

Build a bank with account actors:

┌──────────┐     ┌────────────────┐
│  Client  │────►│ Account "alice"│  (actor)
│  (task)  │     │ balance: 1000  │
└──────────┘     └────────────────┘
     │
     │           ┌────────────────┐
     └──────────►│ Account "bob"  │  (actor)
                 │ balance: 500   │
                 └────────────────┘

Requirements:

  1. Each bank account is an actor (a task with a channel inbox)
  2. Support: Deposit, Withdraw, GetBalance, Transfer(to_account, amount)
  3. Transfer is atomic: withdraw from A, deposit to B. If withdraw fails, B is unchanged.
  4. Multiple clients can interact with accounts concurrently — no locks.

Starter code:

#![allow(unused)]
fn main() {
use tokio::sync::{mpsc, oneshot};

enum AccountMsg {
    Deposit { amount: u64 },
    Withdraw { amount: u64, reply: oneshot::Sender<Result<(), String>> },
    Balance { reply: oneshot::Sender<u64> },
}

#[derive(Clone)]
struct AccountHandle {
    tx: mpsc::Sender<AccountMsg>,
}

impl AccountHandle {
    async fn deposit(&self, amount: u64) {
        self.tx.send(AccountMsg::Deposit { amount }).await.unwrap();
    }

    async fn balance(&self) -> u64 {
        let (tx, rx) = oneshot::channel();
        self.tx.send(AccountMsg::Balance { reply: tx }).await.unwrap();
        rx.await.unwrap()
    }

    // TODO: withdraw, transfer
}

fn spawn_account(name: &str, initial_balance: u64) -> AccountHandle {
    let (tx, mut rx) = mpsc::channel(32);
    let name = name.to_string();

    tokio::spawn(async move {
        let mut balance = initial_balance;
        while let Some(msg) = rx.recv().await {
            match msg {
                AccountMsg::Deposit { amount } => balance += amount,
                // TODO: handle other messages
                _ => todo!(),
            }
        }
        println!("{name} actor shutting down");
    });

    AccountHandle { tx }
}
}

Test: create Alice (1000) and Bob (500). Transfer 200 from Alice to Bob. Check balances: Alice=800, Bob=700.
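The atomicity requirement (withdraw first, deposit only on success) is worth seeing in isolation. A synchronous sketch of the ordering — in the real exercise the two steps become awaited actor messages, and `transfer` is a hypothetical helper name:

```rust
// Step 1: withdraw from A. If this fails, B is never touched.
// Step 2: deposit to B. Deposit cannot fail, so the transfer is
// atomic from the clients' point of view.
fn transfer(a: &mut u64, b: &mut u64, amount: u64) -> Result<(), String> {
    if *a < amount {
        return Err("insufficient funds".into());
    }
    *a -= amount;
    *b += amount;
    Ok(())
}

fn main() {
    let (mut alice, mut bob) = (1000u64, 500u64);
    transfer(&mut alice, &mut bob, 200).unwrap();
    assert_eq!((alice, bob), (800, 700)); // matches the test above

    // A failed transfer leaves both balances unchanged.
    assert!(transfer(&mut alice, &mut bob, 9999).is_err());
    assert_eq!((alice, bob), (800, 700));
    println!("alice={alice}, bob={bob}");
}
```

The ordering matters: if you deposited first and the withdraw then failed, you would have to send a compensating withdrawal to B.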

Pattern 3: Pipeline / Stream Processing

Real-life analogy: the car factory assembly line

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ Station 1│───►│ Station 2│───►│ Station 3│───►│ Station 4│
│ Weld     │    │ Paint    │    │ Install  │    │ Quality  │
│ frame    │    │ body     │    │ engine   │    │ check    │
└──────────┘    └──────────┘    └──────────┘    └──────────┘

Car 1: [quality check]
Car 2:           [install engine]
Car 3:                     [paint]
Car 4:                               [weld frame]

All stations work simultaneously on different cars.
If painting is slow, welding pauses (backpressure).

The pattern

A chain of tasks connected by channels. Each stage processes items and passes them to the next:

producer → channel → transform → channel → filter → channel → sink
┌──────────────────────────────────────────────────────────┐
│  Pipeline                                                │
│                                                          │
│  ┌────────┐  ch  ┌──────────┐  ch  ┌────────┐  ch  ┌──┐│
│  │Producer│ ───► │Transform │ ───► │ Filter │ ───► │Out││
│  │        │      │          │      │        │      │  ││
│  │read    │      │parse     │      │errors  │      │  ││
│  │lines   │      │JSON      │      │only    │      │  ││
│  └────────┘      └──────────┘      └────────┘      └──┘│
│                                                          │
│  Each stage is a separate task.                          │
│  Bounded channels provide backpressure.                  │
│  If the filter is slow, transform pauses.                │
└──────────────────────────────────────────────────────────┘
#![allow(unused)]
fn main() {
let (tx1, rx1) = mpsc::channel(100);  // producer → parser
let (tx2, rx2) = mpsc::channel(100);  // parser → filter
let (tx3, rx3) = mpsc::channel(100);  // filter → output

// Stage 1: produce
tokio::spawn(async move {
    for line in read_lines("access.log").await {
        tx1.send(line).await.unwrap();
    }
});

// Stage 2: parse
tokio::spawn(async move {
    while let Some(line) = rx1.recv().await {
        if let Ok(record) = parse_json(&line) {
            tx2.send(record).await.unwrap();
        }
    }
});

// Stage 3: filter
tokio::spawn(async move {
    while let Some(record) = rx2.recv().await {
        if record.status >= 500 {
            tx3.send(record).await.unwrap();
        }
    }
});

// Stage 4: output
tokio::spawn(async move {
    while let Some(error) = rx3.recv().await {
        println!("ERROR: {} {}", error.path, error.status);
    }
});
}

Why bounded channels matter

Unbounded channels (BAD):
  Producer: 1,000,000 items/sec
  Consumer: 100 items/sec
  → channel grows to 999,900 items → OOM

Bounded channels (GOOD):
  channel(100)
  Producer: 1,000,000 items/sec
  Consumer: 100 items/sec
  → channel fills to 100 → producer.send().await BLOCKS
  → producer slows down to match consumer
  → memory stays constant
  → this is BACKPRESSURE

When to use

  • Log/event processing — parse, filter, aggregate, alert
  • ETL pipelines — extract, transform, load
  • Video/audio processing — decode → transform → encode
  • Network packet processing — capture → parse → analyze → store

When NOT to use

  • Request/response — a pipeline flows one direction; use task-per-connection for req/res
  • Simple transformations — if it’s one step, just do it inline (no pipeline needed)
  • When order doesn’t matter — use fan-out/fan-in instead (next pattern)

Code exercise: Log Analyzer

Build a pipeline that processes web server access logs:

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ Read     │───►│ Parse    │───►│ Filter   │───►│ Aggregate│
│ lines    │    │ fields   │    │ errors   │    │ count by │
│ from file│    │ (split)  │    │ (5xx)    │    │ endpoint │
└──────────┘    └──────────┘    └──────────┘    └──────────┘

Input (access.log):

GET /api/users 200 12ms
POST /api/login 401 5ms
GET /api/data 500 1502ms
GET /api/users 200 8ms
GET /api/data 503 30000ms
POST /api/upload 500 250ms

Output:

Error summary:
  /api/data:   2 errors (500, 503)
  /api/upload: 1 error  (500)
  Total: 3 errors out of 6 requests

Requirements:

  1. Four pipeline stages, each a separate task
  2. Bounded channels (capacity 100) between stages
  3. Producer reads lines from a file (or generates them)
  4. Parser extracts: method, path, status, latency
  5. Filter passes only 5xx status codes
  6. Aggregator counts errors by endpoint, prints summary when done
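Stage 2's core logic (requirement 4) is a pure function you can write and test before wiring up any channels. A sketch against the log format above; `Record` and `parse_line` are illustrative names:

```rust
// Parses one access-log line like "GET /api/data 500 1502ms".
#[derive(Debug, PartialEq)]
struct Record {
    method: String,
    path: String,
    status: u16,
    latency_ms: u64,
}

// Returns None for malformed lines so the pipeline can skip them.
fn parse_line(line: &str) -> Option<Record> {
    let mut parts = line.split_whitespace();
    let method = parts.next()?.to_string();
    let path = parts.next()?.to_string();
    let status = parts.next()?.parse().ok()?;
    let latency_ms = parts.next()?.strip_suffix("ms")?.parse().ok()?;
    Some(Record { method, path, status, latency_ms })
}

fn main() {
    let rec = parse_line("GET /api/data 500 1502ms").unwrap();
    assert_eq!(rec.path, "/api/data");
    assert_eq!(rec.latency_ms, 1502);
    assert!(rec.status >= 500); // the filter stage would pass this through
    assert!(parse_line("garbage").is_none());
    println!("parsed: {rec:?}");
}
```

Once this works, the parser task is just `recv` → `parse_line` → `send`.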

Pattern 4: Fan-out / Fan-in

Real-life analogy: the research team

Professor assigns papers to read:
  ┌──────────┐
  │Professor │
  │(dispatch)│
  └────┬─────┘
       │ assigns
  ┌────┼────────────┐
  │    │            │
  ▼    ▼            ▼
┌────┐ ┌────┐ ┌────┐
│TA 1│ │TA 2│ │TA 3│   ← fan-out: work distributed
│read│ │read│ │read│
│ 10 │ │ 10 │ │ 10 │
└──┬─┘ └──┬─┘ └──┬─┘
   │      │      │
   └──────┼──────┘
          │ summaries
          ▼
   ┌──────────┐
   │Professor │              ← fan-in: results collected
   │(collect) │
   │write     │
   │report    │
   └──────────┘

30 papers read in parallel, not sequentially.
Total time: time of slowest TA, not sum of all.

The pattern

One task distributes work to N workers, another collects results:

┌──────────────────────────────────────────────────────────┐
│  Fan-out / Fan-in                                        │
│                                                          │
│  ┌──────────┐                                            │
│  │Dispatcher│                                            │
│  └────┬─────┘                                            │
│       │ fan-out                                          │
│  ┌────┼────────────┐                                     │
│  │    │            │                                     │
│  ▼    ▼            ▼                                     │
│ ┌──┐ ┌──┐ ┌──┐ ┌──┐                                     │
│ │W1│ │W2│ │W3│ │W4│  ← N workers (concurrent)           │
│ └┬─┘ └┬─┘ └┬─┘ └┬─┘                                     │
│  │    │    │    │                                        │
│  └────┼────┼────┘                                        │
│       │    │ fan-in                                      │
│  ┌────▼────▼───┐                                         │
│  │  Collector  │                                         │
│  └─────────────┘                                         │
└──────────────────────────────────────────────────────────┘

In Rust: JoinSet

#![allow(unused)]
fn main() {
use tokio::task::JoinSet;

let urls = vec!["https://a.com", "https://b.com", "https://c.com"];
let mut set = JoinSet::new();

// Fan-out: spawn one task per URL
for url in urls {
    set.spawn(async move {
        reqwest::get(url).await
    });
}

// Fan-in: collect results as they complete
while let Some(result) = set.join_next().await {
    match result {
        Ok(Ok(response)) => println!("Got: {}", response.status()),
        Ok(Err(e)) => println!("Request failed: {e}"),
        Err(e) => println!("Task panicked: {e}"),
    }
}
}

Concurrency limiting

Without a limit, fan-out can overwhelm the target:

Fan-out 10,000 HTTP requests simultaneously:
  → target server returns 429 Too Many Requests
  → or your machine runs out of file descriptors

Solution: Semaphore limits concurrent workers
#![allow(unused)]
fn main() {
let semaphore = Arc::new(Semaphore::new(50)); // max 50 concurrent

for url in urls {
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    set.spawn(async move {
        let result = fetch(url).await;
        drop(permit); // release the slot
        result
    });
}
}

When to use

  • Parallel HTTP requests — fetch 100 URLs, collect results
  • Batch processing — process 10,000 records, N at a time
  • Map-reduce — transform items in parallel, aggregate results
  • Health checks — ping N services, report which are up

When NOT to use

  • Sequential dependencies — if step 2 depends on step 1’s output, use a pipeline instead
  • Single resource — if all workers hit the same bottleneck (one database), parallelism doesn’t help
  • Ordering matters — fan-in collects in completion order, not submission order

Code exercise: Web Crawler

Build a concurrent web crawler:

┌──────────┐
│ Seed URL │
│ list     │
└────┬─────┘
     │ fan-out (max 10 concurrent)
┌────┼──────────────────┐
│    │         │        │
▼    ▼         ▼        ▼
┌──┐ ┌──┐ ┌──┐ ┌──┐
│F1│ │F2│ │F3│ │F4│   fetch pages
└┬─┘ └┬─┘ └┬─┘ └┬─┘
 │    │    │    │     fan-in
 └────┼────┼────┘
      ▼
┌──────────┐
│ Results  │
│ - URL    │
│ - status │
│ - size   │
│ - time   │
└──────────┘

Requirements:

  1. Read a list of URLs (from a file or hardcoded)
  2. Fetch each URL concurrently (fan-out)
  3. Limit concurrency to 10 with a semaphore
  4. Collect results (fan-in): URL, HTTP status, response size, latency
  5. Print a summary table when all are done

Starter code:

use tokio::task::JoinSet;
use tokio::sync::Semaphore;
use std::sync::Arc;
use std::time::{Duration, Instant};

struct CrawlResult {
    url: String,
    status: u16,
    size: usize,
    latency: Duration,
}

async fn fetch(url: &str) -> CrawlResult {
    let start = Instant::now();
    // TODO: make HTTP request (use tokio::net::TcpStream + raw HTTP, or reqwest)
    // Return CrawlResult
    todo!()
}

#[tokio::main]
async fn main() {
    let urls = vec![
        "http://example.com",
        "http://httpbin.org/get",
        "http://httpbin.org/delay/2",
        // add more
    ];
    let semaphore = Arc::new(Semaphore::new(10));
    let mut set = JoinSet::new();

    for url in urls {
        let sem = semaphore.clone();
        set.spawn(async move {
            let _permit = sem.acquire().await.unwrap();
            fetch(url).await
        });
    }

    // TODO: collect results, print summary table
}

Expected output:

URL                          Status  Size     Latency
─────────────────────────────────────────────────────
http://example.com           200     1256B    120ms
http://httpbin.org/get       200     432B     89ms
http://httpbin.org/delay/2   200     312B     2045ms

Total: 3 URLs, 3 success, 0 failed

Pattern 5: Supervisor Tree

Real-life analogy: the corporate hierarchy

CEO (root supervisor)
  │
  ├── VP Engineering (supervisor)
  │     ├── Team Lead Backend (supervisor)
  │     │     ├── Developer 1 (worker) ← quits
  │     │     ├── Developer 2 (worker)     Team Lead hires
  │     │     └── Developer 3 (worker)     a replacement
  │     │
  │     └── Team Lead Frontend (supervisor)
  │           ├── Developer 4 (worker)
  │           └── Developer 5 (worker)
  │
  └── VP Operations (supervisor)
        └── SRE Team Lead (supervisor)
              ├── Oncall 1 (worker)
              └── Oncall 2 (worker) ← quits → Team Lead hires replacement

When a developer quits (task crashes):
  - Their team lead (supervisor) hires a replacement (restart)
  - The VP doesn't even know it happened
  - If the team lead quits → VP restarts the team lead + all their reports

The pattern

A supervisor monitors child tasks and restarts them when they fail:

┌──────────────────────────────────────────────────────────┐
│  Supervisor Tree                                         │
│                                                          │
│  ┌───────────┐                                           │
│  │ Supervisor│                                           │
│  │           │                                           │
│  │ children: │                                           │
│  │  [W1, W2] │                                           │
│  └─────┬─────┘                                           │
│        │                                                 │
│    ┌───┴───┐                                             │
│    │       │                                             │
│    ▼       ▼                                             │
│  ┌───┐  ┌───┐                                            │
│  │W1 │  │W2 │  ← W2 panics!                             │
│  └───┘  └───┘                                            │
│              │                                           │
│    Supervisor detects W2 exit                             │
│    Logs the failure                                      │
│    Spawns a NEW W2                                       │
│              │                                           │
│           ┌──▼──┐                                        │
│           │W2'  │  ← replacement, same job               │
│           └─────┘                                        │
│                                                          │
│  Restart strategies:                                     │
│    one-for-one: restart only the crashed child            │
│    one-for-all: restart ALL children (for dependent tasks)│
│    rest-for-one: restart crashed + all after it           │
└──────────────────────────────────────────────────────────┘
#![allow(unused)]
fn main() {
async fn supervisor(num_workers: usize) {
    let mut set = JoinSet::new();

    // Spawn initial workers
    for id in 0..num_workers {
        set.spawn(worker(id));
    }

    // Monitor and restart
    loop {
        match set.join_next().await {
            Some(Ok(())) => {
                println!("Worker finished normally");
            }
            Some(Err(e)) => {
                println!("Worker crashed: {e}. Restarting...");
                set.spawn(worker(next_id()));
            }
            None => break, // all workers done, supervisor exits
        }
    }
}

async fn worker(id: usize) {
    loop {
        // do work...
        // might panic!
    }
}
}

Erlang’s “let it crash” philosophy

Traditional approach:              Erlang/supervisor approach:
  fn process() {                     fn process() {
    if error {                         // just do the work
      handle_error();                  // if something goes wrong,
      recover();                       // let it crash
      retry();                         // the supervisor will restart us
      log();                         }
      // 50 lines of error handling
    }
  }

  Complex, error-prone.              Simple, robust.
  Every function handles its          Errors bubble up to supervisor.
  own errors.                         Supervisor has ONE job: restart.

When to use

  • Long-running services — web servers, message brokers, game servers
  • Worker pools — N workers processing a queue; crashed ones are replaced
  • Unreliable external dependencies — task talks to flaky API; crashes get restarted
  • Fault isolation — one bad request crashes one task, not the whole server

When NOT to use

  • Short-lived programs — CLI tools, scripts (just exit on error)
  • Errors that should propagate — if the whole program should stop on error
  • Debugging — supervisors can mask bugs by restarting endlessly (add restart limits)

Restart limits

Prevent infinite restart loops:

#![allow(unused)]
fn main() {
let mut restart_count = 0;
let max_restarts = 5;
let reset_interval = Duration::from_secs(60);
let mut window_start = Instant::now();

// In the supervisor loop:
if window_start.elapsed() > reset_interval {
    restart_count = 0;
    window_start = Instant::now();
}
restart_count += 1;
if restart_count > max_restarts {
    eprintln!("Too many restarts in {}s. Giving up.", reset_interval.as_secs());
    break;
}
}

Code exercise: Resilient Worker Pool

Build a supervisor that manages a pool of workers:

┌──────────────┐
│  Supervisor  │
│              │
│  max_restart:│
│  5 per 60s   │
└──────┬───────┘
       │
  ┌────┼────┐
  │    │    │
  ▼    ▼    ▼
┌──┐ ┌──┐ ┌──┐
│W1│ │W2│ │W3│  ← workers process jobs from a shared queue
└──┘ └──┘ └──┘

Requirements:

  1. Supervisor spawns 3 workers
  2. Each worker pulls jobs from a shared mpsc channel
  3. Workers randomly “crash” (panic) on some jobs
  4. Supervisor detects the crash and spawns a replacement
  5. Replacement worker connects to the same job channel
  6. After 5 restarts in 60 seconds, supervisor gives up and exits
  7. Remaining jobs are processed by surviving workers

Starter code:

use tokio::task::JoinSet;
use tokio::sync::mpsc;
use std::time::{Duration, Instant};

async fn worker(id: usize, mut jobs: mpsc::Receiver<String>) {
    while let Some(job) = jobs.recv().await {
        // Simulate random crashes
        if rand::random::<f32>() < 0.1 {
            panic!("Worker {id} crashed on job: {job}");
        }
        println!("[Worker {id}] processed: {job}");
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

#[tokio::main]
async fn main() {
    // TODO: create job channel, spawn workers, supervise
}

Expected output:

[Worker 0] processed: job-1
[Worker 1] processed: job-2
[Worker 2] processed: job-3
Worker 1 crashed: panicked. Restarting... (restart 1/5)
[Worker 3] processed: job-4
[Worker 0] processed: job-5
...
All jobs processed. Total restarts: 3

Pattern 6: Event Bus / Pub-Sub

Real-life analogy: the radio station

┌──────────────┐
│ Radio Station│  broadcasts on 101.5 FM
│ (publisher)  │
└──────┬───────┘
       │ broadcast signal
  ┌────┼──────────────────┐
  │    │                  │
  ▼    ▼                  ▼
┌────┐ ┌────┐       ┌────┐
│Car │ │Home│       │Gym │  ← anyone tuned to 101.5 hears it
│ 🚗 │ │ 🏠 │       │ 🏋️ │
└────┘ └────┘       └────┘

Publisher sends once.
All subscribers receive.
New subscribers can join anytime.
Publisher doesn't know (or care) who's listening.

The pattern

A publisher emits events. Multiple subscribers receive them. Publisher and subscribers are decoupled — they don’t know about each other.

┌──────────────────────────────────────────────────────────┐
│  Event Bus                                               │
│                                                          │
│  ┌────────────┐    ┌─────────────┐                       │
│  │ Publisher  │───►│  broadcast  │                       │
│  │ (metrics)  │    │  channel    │                       │
│  └────────────┘    └──────┬──────┘                       │
│                           │                              │
│               ┌───────────┼───────────┐                  │
│               │           │           │                  │
│               ▼           ▼           ▼                  │
│         ┌──────────┐ ┌──────────┐ ┌──────────┐          │
│         │Dashboard │ │Logger    │ │Alerter   │          │
│         │subscriber│ │subscriber│ │subscriber│          │
│         │shows live│ │writes to │ │sends     │          │
│         │graphs    │ │disk      │ │Slack msg │          │
│         └──────────┘ └──────────┘ └──────────┘          │
│                                                          │
│  Publisher doesn't know who listens.                     │
│  Subscribers don't know who publishes.                   │
│  New subscribers can join anytime.                       │
└──────────────────────────────────────────────────────────┘

In Rust: tokio::sync::broadcast

#![allow(unused)]
fn main() {
use tokio::sync::broadcast;

let (tx, _) = broadcast::channel(100); // buffer 100 events

// Subscriber 1
let mut rx1 = tx.subscribe();
tokio::spawn(async move {
    while let Ok(event) = rx1.recv().await {
        println!("[dashboard] {event}");
    }
});

// Subscriber 2
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
    while let Ok(event) = rx2.recv().await {
        println!("[logger] {event}");
    }
});

// Publisher
tx.send("user_login".to_string()).unwrap();
tx.send("page_view".to_string()).unwrap();
// Both subscribers receive both events
}

broadcast vs mpsc

mpsc (multi-producer, single consumer):
  10 senders → 1 receiver
  Each message consumed by ONE receiver
  Use for: work queues, actor inboxes

broadcast (single producer, multi consumer):
  1 sender → N receivers
  Each message received by ALL receivers
  Use for: events, notifications, pub-sub

watch (single value, multi reader):
  1 writer → N readers
  Readers always see the LATEST value (not a queue)
  Use for: config changes, "current state" sharing

When to use

  • Event-driven systems — user actions, system events, state changes
  • Real-time updates — live dashboards, notification feeds
  • Microservice communication — services emit events, others react
  • Logging/monitoring — multiple consumers of the same event stream
  • UI frameworks — component A changes → components B, C, D update

When NOT to use

  • Reliable delivery — broadcast drops messages if a subscriber is slow (use mpsc for guaranteed delivery)
  • Point-to-point — if only one consumer should handle each message, use mpsc
  • Large payloads — broadcast clones the message for each subscriber (expensive for big data)

Filtering events

Subscribers often only care about certain event types:

#![allow(unused)]
fn main() {
#[derive(Clone, Debug)]
enum Event {
    UserLogin { user: String },
    PageView { path: String },
    Error { message: String },
}

// Subscriber that only cares about errors:
tokio::spawn(async move {
    while let Ok(event) = rx.recv().await {
        if let Event::Error { message } = event {
            send_slack_alert(&message).await;
        }
    }
});
}

Code exercise: Real-time Dashboard

Build a system where services emit metrics and a dashboard displays them live:

┌───────────┐     ┌───────────┐
│ Web Server│     │ DB Service│
│ emit:     │     │ emit:     │
│ req_count │     │ query_ms  │
│ latency   │     │ conn_count│
└─────┬─────┘     └─────┬─────┘
      │                 │
      └────────┬────────┘
               ▼
         ┌───────────┐
          │ broadcast │
          │ channel   │
         └─────┬─────┘
               │
     ┌─────────┼─────────┐
     │         │         │
     ▼         ▼         ▼
┌─────────┐ ┌────────┐ ┌────────┐
│Dashboard│ │Logger  │ │Alerter │
│(print   │ │(write  │ │(if     │
│ every   │ │ to file│ │ error  │
│ 1 sec)  │ │ all)   │ │ rate > │
│         │ │        │ │ 10/min)│
└─────────┘ └────────┘ └────────┘

Requirements:

  1. Define a Metric enum: RequestCount, Latency(ms), QueryTime(ms), ErrorCount
  2. Two publisher tasks emit random metrics every 100ms
  3. Dashboard subscriber: prints a summary every second
  4. Logger subscriber: writes every metric to stdout
  5. Alerter subscriber: prints ALERT if error count exceeds threshold

Starter code:

use tokio::sync::broadcast;
use std::time::Duration;

#[derive(Clone, Debug)]
enum Metric {
    Request { path: String, latency_ms: u64 },
    Error { message: String },
    DbQuery { query: String, duration_ms: u64 },
}

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<Metric>(256);

    // Publisher: web server metrics
    let tx1 = tx.clone();
    tokio::spawn(async move {
        loop {
            tx1.send(Metric::Request {
                path: "/api/data".into(),
                latency_ms: rand::random::<u64>() % 200,
            }).ok();
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });

    // TODO: more publishers, dashboard subscriber, logger, alerter
}

Expected output:

[logger] Request { path: "/api/data", latency_ms: 42 }
[logger] DbQuery { query: "SELECT *", duration_ms: 12 }
[logger] Error { message: "timeout" }
[ALERT] Error rate: 3/min — threshold exceeded!
[dashboard] === 1s summary ===
  Requests: 10, avg latency: 87ms
  DB queries: 5, avg: 15ms
  Errors: 1
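The alerter's threshold check (requirement 5) is a small sliding-window counter you can build and test in isolation; `ErrorRate` is an illustrative name:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Tracks error timestamps; fires when more than `max` fall inside `window`.
struct ErrorRate {
    window: Duration,
    max: usize,
    times: VecDeque<Instant>,
}

impl ErrorRate {
    fn new(window: Duration, max: usize) -> Self {
        Self { window, max, times: VecDeque::new() }
    }

    // Record one error; returns true if the threshold is now exceeded.
    fn record(&mut self, now: Instant) -> bool {
        self.times.push_back(now);
        // Drop timestamps that have aged out of the window.
        while let Some(&t) = self.times.front() {
            if now.duration_since(t) > self.window {
                self.times.pop_front();
            } else {
                break;
            }
        }
        self.times.len() > self.max
    }
}

fn main() {
    let mut rate = ErrorRate::new(Duration::from_secs(60), 2);
    let t0 = Instant::now();
    assert!(!rate.record(t0));                          // 1 error: ok
    assert!(!rate.record(t0 + Duration::from_secs(1))); // 2 errors: ok
    assert!(rate.record(t0 + Duration::from_secs(2)));  // 3 errors: ALERT
    // 70s later the early errors have aged out of the window.
    assert!(!rate.record(t0 + Duration::from_secs(70)));
    println!("alert logic works");
}
```

The alerter subscriber then calls `record` on every `Metric::Error` it receives and prints the ALERT line when it returns true.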

Choosing a Pattern

Decision table

I need to...                                    → Use
─────────────────────────────────────────────────────────────────
Handle many independent clients                 → Task-per-Connection
Manage complex state without locks              → Actor Model
Process data through multiple stages            → Pipeline
Do the same thing to many items in parallel     → Fan-out / Fan-in
Keep services running despite crashes           → Supervisor Tree
Broadcast events to multiple consumers          → Event Bus / Pub-Sub

Choosing by symptom

Problem                              Solution
─────────────────────────────────────────────────────────────────
"I have lock contention"             → Actor (eliminate shared state)
"My pipeline stage is a bottleneck"  → Fan-out (parallelize that stage)
"Tasks crash and the system dies"    → Supervisor (auto-restart)
"I need to notify many components"   → Event Bus (decouple with broadcast)
"Each client needs isolated state"   → Task-per-Connection + Actor
"I process a stream of data"        → Pipeline with bounded channels

Combining patterns

Real systems use multiple patterns together. Here’s a typical web application:

┌──────────────────────────────────────────────────────────────┐
│  Web Application                                             │
│                                                              │
│  TCP Listener (task-per-connection)                          │
│    └── spawn(handle_request) for each HTTP request           │
│                                                              │
│  Request Handler                                             │
│    ├── Reads from Database Actor (actor model)               │
│    ├── Writes to Cache Actor (actor model)                   │
│    └── Emits metrics to Event Bus (pub-sub)                  │
│                                                              │
│  Background Jobs                                             │
│    ├── Job Queue → Workers (fan-out/fan-in)                  │
│    └── Supervised by a restart manager (supervisor tree)     │
│                                                              │
│  Log Pipeline (pipeline)                                     │
│    └── access log → parse → filter → ship to logging service │
│                                                              │
│  Metrics Dashboard (event bus subscriber)                    │
│    └── Receives metrics, displays live graphs                │
└──────────────────────────────────────────────────────────────┘

Pattern comparison

Pattern              Concurrency    State         Communication  Failure
─────────────────────────────────────────────────────────────────────────
Task-per-Connection  per client     per task      shared/channels dies alone
Actor                per entity     per actor     messages        isolated
Pipeline             per stage      per stage     channels        stage stops
Fan-out/Fan-in       per item       none shared   JoinSet         retry item
Supervisor           per worker     per worker    restart signal  auto-restart
Event Bus            per subscriber none shared   broadcast       drops msgs

Anti-patterns

Don’t spawn without joining

#![allow(unused)]
fn main() {
// BAD: fire and forget — the task is detached; errors and panics vanish silently
tokio::spawn(async { do_work().await });

// GOOD: track the handle
let handle = tokio::spawn(async { do_work().await });
handle.await??; // first ? propagates a panic (JoinError), second ? the task's own error
}

Don’t use actors for everything

If you have a simple counter, Arc<AtomicU64> is better than an actor. Actors add overhead (a channel, a spawned task, per-message latency). Use them when state is complex or when you’d otherwise hold a Mutex across .await.

Don’t use unbounded channels in production

#![allow(unused)]
fn main() {
// BAD: unbounded — OOM if consumer is slow
let (tx, rx) = mpsc::unbounded_channel();

// GOOD: bounded — backpressure if consumer is slow
let (tx, rx) = mpsc::channel(100);
}

Don’t block the executor

#![allow(unused)]
fn main() {
// BAD: blocks the worker thread
tokio::spawn(async {
    std::thread::sleep(Duration::from_secs(5)); // BLOCKS!
});

// GOOD: use async sleep or spawn_blocking
tokio::spawn(async {
    tokio::time::sleep(Duration::from_secs(5)).await; // yields
});

// GOOD: for CPU-heavy or blocking I/O
tokio::task::spawn_blocking(|| {
    std::fs::read("big-file.dat") // OK to block here
});
}

Capstone: Distributed Task Queue

Prerequisites: All patterns from Course 5. This project combines every pattern into one system.

What you’re building

A job processing system — like a mini Celery, Sidekiq, or Bull. Clients submit jobs via HTTP, workers process them concurrently, a supervisor keeps workers alive, and a dashboard shows live status.

# Start the system:
cargo run -p async-lessons --bin p5-task-queue

# Submit jobs:
curl -X POST http://127.0.0.1:8080/jobs -d '{"type":"resize","file":"img.png"}'
curl -X POST http://127.0.0.1:8080/jobs -d '{"type":"email","to":"bob@example.com"}'

# Check status:
curl http://127.0.0.1:8080/status
# {"pending": 5, "running": 3, "completed": 42, "failed": 1, "workers": 4}

# Dashboard (in terminal):
# Workers: 4/4 alive
# Queue: 5 pending, 3 running, 42 completed, 1 failed
# Throughput: 14 jobs/sec
# Last failure: "email to bob@example.com — timeout" (2s ago, auto-retried)

Architecture

Every pattern has a role:

┌──────────────────────────────────────────────────────────────┐
│  Distributed Task Queue                                      │
│                                                              │
│  ┌───────────────────────────────────────────────┐           │
│  │  API Server (task-per-connection)              │           │
│  │  Accept HTTP requests → enqueue jobs           │           │
│  │  GET /status → query state actor               │           │
│  └─────────────────┬─────────────────────────────┘           │
│                    │ submit job                               │
│                    ▼                                          │
│  ┌───────────────────────────────────────────────┐           │
│  │  State Manager (actor model)                   │           │
│  │  Owns: job queue, status map, counters         │           │
│  │  No locks — processes messages sequentially     │           │
│  │  Messages: Enqueue, Dequeue, UpdateStatus,     │           │
│  │            GetStats                            │           │
│  └─────────────────┬─────────────────────────────┘           │
│                    │ job ready                                │
│                    ▼                                          │
│  ┌───────────────────────────────────────────────┐           │
│  │  Dispatcher (pipeline)                         │           │
│  │  Reads jobs from state manager                 │           │
│  │  Routes to appropriate worker                  │           │
│  │  Handles retries for failed jobs               │           │
│  └─────────────────┬─────────────────────────────┘           │
│                    │ distribute                               │
│              ┌─────┼─────┐                                   │
│              │     │     │                                   │
│              ▼     ▼     ▼                                   │
│  ┌──────────────────────────────────────────────┐            │
│  │  Workers (fan-out/fan-in)                     │            │
│  │  N worker tasks process jobs concurrently     │            │
│  │  CPU-heavy jobs use spawn_blocking            │            │
│  │  Report results back to state manager         │            │
│  └──────────────────────────────────────────────┘            │
│              │     │     │                                   │
│              │ supervised by                                 │
│              ▼                                               │
│  ┌──────────────────────────────────────────────┐            │
│  │  Supervisor (supervisor tree)                 │            │
│  │  Monitors workers via JoinSet                 │            │
│  │  Restarts crashed workers                     │            │
│  │  Max 5 restarts per minute                    │            │
│  └──────────────────────────────────────────────┘            │
│                                                              │
│  ┌───────────────────────────────────────────────┐           │
│  │  Event Bus (pub-sub)                           │           │
│  │  Events: JobEnqueued, JobStarted,              │           │
│  │          JobCompleted, JobFailed,              │           │
│  │          WorkerCrashed, WorkerRestarted        │           │
│  │                                                │           │
│  │  Subscribers:                                  │           │
│  │    Dashboard → live terminal output            │           │
│  │    Logger → write events to file               │           │
│  │    Alerter → Slack notification on failures    │           │
│  └───────────────────────────────────────────────┘           │
└──────────────────────────────────────────────────────────────┘

Pattern map

Component        Pattern               Why
─────────────────────────────────────────────────────────
API Server       Task-per-Connection   Each HTTP request = one task
State Manager    Actor                 Owns all mutable state, no locks
Dispatcher       Pipeline              Route jobs through stages
Workers          Fan-out/Fan-in        N workers process concurrently
Supervisor       Supervisor Tree       Restart crashed workers
Events           Event Bus             Dashboard, logger, alerter subscribe

Job lifecycle

   Client POSTs job
        │
        ▼
   API task sends Enqueue message to State Actor
        │
        ▼
   State Actor: job status = Pending, pushes to queue
        │
        ▼
   Dispatcher: pulls from queue, assigns to available worker
        │
        ▼
   State Actor: job status = Running
        │
        ▼
   Worker processes the job
        │
   ┌────┴────┐
   │         │
   ▼         ▼
 Success    Failure
   │         │
   ▼         ▼
 State:    State: Failed
 Completed  Dispatcher: retry? (max 3 attempts)
             │
             └── if retries exhausted → Dead Letter Queue

Implementation guide

Step 1: Define the job types

#![allow(unused)]
fn main() {
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct Job {
    id: String,
    job_type: String,
    payload: serde_json::Value,
    attempts: u32,
    max_retries: u32,
}

#[derive(Clone, Debug)]
enum JobStatus {
    Pending,
    Running { worker_id: usize },
    Completed { result: String },
    Failed { error: String, retries_left: u32 },
}
}

Step 2: State Manager Actor

#![allow(unused)]
fn main() {
enum StateMsg {
    Enqueue { job: Job, reply: oneshot::Sender<String> },
    Dequeue { reply: oneshot::Sender<Option<Job>> },
    UpdateStatus { job_id: String, status: JobStatus },
    GetStats { reply: oneshot::Sender<Stats> },
}

async fn state_actor(mut rx: mpsc::Receiver<StateMsg>, event_tx: broadcast::Sender<Event>) {
    let mut queue: VecDeque<Job> = VecDeque::new();
    let mut statuses: HashMap<String, JobStatus> = HashMap::new();

    while let Some(msg) = rx.recv().await {
        match msg {
            StateMsg::Enqueue { job, reply } => {
                let id = job.id.clone();
                statuses.insert(id.clone(), JobStatus::Pending);
                queue.push_back(job);
                event_tx.send(Event::JobEnqueued { id: id.clone() }).ok();
                reply.send(id).ok();
            }
            // TODO: handle other messages
            _ => todo!(),
        }
    }
}
}

Step 3: Worker + Supervisor

#![allow(unused)]
fn main() {
async fn worker(id: usize, state: StateHandle, event_tx: broadcast::Sender<Event>) {
    loop {
        let job = state.dequeue().await;
        match job {
            Some(job) => {
                event_tx.send(Event::JobStarted { id: job.id.clone(), worker: id }).ok();
                match process_job(&job).await {
                    Ok(result) => {
                        state.update_status(job.id, JobStatus::Completed { result }).await;
                    }
                    Err(e) => {
                        state.update_status(job.id, JobStatus::Failed {
                            error: e.to_string(),
                            retries_left: job.max_retries.saturating_sub(job.attempts), // avoid underflow
                        }).await;
                    }
                }
            }
            None => tokio::time::sleep(Duration::from_millis(100)).await,
        }
    }
}
}

Step 4: API Server

#![allow(unused)]
fn main() {
// Minimal HTTP server (raw TCP, parse HTTP manually)
async fn handle_http(stream: TcpStream, state: StateHandle) {
    // Parse: POST /jobs → enqueue
    // Parse: GET /status → get stats
    // Return JSON responses
}
}
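Hand-parsing can start from a helper like this (deliberately naive: it ignores Content-Length, header casing, and requests split across reads, which is fine for curl-sized payloads but not for production):

```rust
// Naive HTTP request splitter: returns (method, path, body).
// Assumes the whole request arrived in one read and headers are well-formed.
fn parse_request(raw: &str) -> Option<(&str, &str, &str)> {
    // Headers end at the first blank line
    let (head, body) = raw.split_once("\r\n\r\n")?;
    // Request line looks like: "POST /jobs HTTP/1.1"
    let mut parts = head.lines().next()?.split_whitespace();
    let method = parts.next()?;
    let path = parts.next()?;
    Some((method, path, body))
}
```

handle_http would read from the TcpStream into a buffer, call this, then match on (method, path) to route to the enqueue or stats logic.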

Step 5: Event Bus + Dashboard

#![allow(unused)]
fn main() {
tokio::spawn(async move {
    let mut rx = event_tx.subscribe();
    let mut stats = Stats::default();
    // Use an interval, not a fresh sleep inside select!: the sleep future
    // would be dropped and restarted every time an event wins the race,
    // so a busy bus would starve the dashboard tick.
    let mut tick = tokio::time::interval(Duration::from_secs(1));

    loop {
        tokio::select! {
            event = rx.recv() => {
                match event {
                    Ok(Event::JobCompleted { .. }) => stats.completed += 1,
                    Ok(Event::JobFailed { .. }) => stats.failed += 1,
                    // ... other events, plus Err(Lagged) if the bus outpaces us
                    _ => {}
                }
            }
            _ = tick.tick() => {
                print_dashboard(&stats);
            }
        }
    }
});
}

Exercises

Exercise 1: Basic queue

Implement State Actor + Workers + Dispatcher. Submit 100 jobs, process them with 4 workers. Print completion count.

Exercise 2: Add supervisor

Workers randomly crash (1% chance). Supervisor restarts them. All 100 jobs should still complete.

Exercise 3: Add HTTP API

Accept jobs via POST /jobs, return status via GET /status. Test with curl.

Exercise 4: Add event bus + dashboard

Broadcast events, subscribe with a dashboard that prints live stats every second.

Exercise 5: Retry with backoff

Failed jobs are retried up to 3 times with exponential backoff (1s, 2s, 4s). After 3 failures → dead letter queue.
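A starting point for the delay schedule is a shifted power of two; the cap on the exponent below is an arbitrary safety margin against overflow:

```rust
use std::time::Duration;

// attempt 0 → 1s, attempt 1 → 2s, attempt 2 → 4s, ...
fn backoff_delay(attempt: u32) -> Duration {
    // cap the exponent so 1 << n cannot overflow u64
    Duration::from_secs(1u64 << attempt.min(6))
}
```

The retry path then sleeps for backoff_delay(job.attempts) before re-enqueueing, and moves the job to the dead letter queue once attempts reaches max_retries.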