Capstone: Distributed Task Queue

Prerequisites: All patterns from Course 5. This project combines every pattern into one system.

What you’re building

A job processing system — like a mini Celery, Sidekiq, or Bull. Clients submit jobs via HTTP, workers process them concurrently, a supervisor keeps workers alive, and a dashboard shows live status.

# Start the system:
cargo run -p async-lessons --bin p5-task-queue

# Submit jobs:
curl -X POST http://127.0.0.1:8080/jobs -d '{"type":"resize","file":"img.png"}'
curl -X POST http://127.0.0.1:8080/jobs -d '{"type":"email","to":"bob@example.com"}'

# Check status:
curl http://127.0.0.1:8080/status
# {"pending": 5, "running": 3, "completed": 42, "failed": 1, "workers": 4}

# Dashboard (in terminal):
# Workers: 4/4 alive
# Queue: 5 pending, 3 running, 42 completed, 1 failed
# Throughput: 14 jobs/sec
# Last failure: "email to bob@example.com — timeout" (2s ago, auto-retried)

Architecture

Every pattern has a role:

┌──────────────────────────────────────────────────────────────┐
│  Distributed Task Queue                                      │
│                                                              │
│  ┌───────────────────────────────────────────────┐           │
│  │  API Server (task-per-connection)              │           │
│  │  Accept HTTP requests → enqueue jobs           │           │
│  │  GET /status → query state actor               │           │
│  └─────────────────┬─────────────────────────────┘           │
│                    │ submit job                               │
│                    ▼                                          │
│  ┌───────────────────────────────────────────────┐           │
│  │  State Manager (actor model)                   │           │
│  │  Owns: job queue, status map, counters         │           │
│  │  No locks — processes messages sequentially     │           │
│  │  Messages: Enqueue, Dequeue, UpdateStatus,     │           │
│  │            GetStats                            │           │
│  └─────────────────┬─────────────────────────────┘           │
│                    │ job ready                                │
│                    ▼                                          │
│  ┌───────────────────────────────────────────────┐           │
│  │  Dispatcher (pipeline)                         │           │
│  │  Reads jobs from state manager                 │           │
│  │  Routes to appropriate worker                  │           │
│  │  Handles retries for failed jobs               │           │
│  └─────────────────┬─────────────────────────────┘           │
│                    │ distribute                               │
│              ┌─────┼─────┐                                   │
│              │     │     │                                   │
│              ▼     ▼     ▼                                   │
│  ┌──────────────────────────────────────────────┐            │
│  │  Workers (fan-out/fan-in)                     │            │
│  │  N worker tasks process jobs concurrently     │            │
│  │  CPU-heavy jobs use spawn_blocking            │            │
│  │  Report results back to state manager         │            │
│  └──────────────────────────────────────────────┘            │
│              │     │     │                                   │
│              │ supervised by                                 │
│              ▼                                               │
│  ┌──────────────────────────────────────────────┐            │
│  │  Supervisor (supervisor tree)                 │            │
│  │  Monitors workers via JoinSet                 │            │
│  │  Restarts crashed workers                     │            │
│  │  Max 5 restarts per minute                    │            │
│  └──────────────────────────────────────────────┘            │
│                                                              │
│  ┌───────────────────────────────────────────────┐           │
│  │  Event Bus (pub-sub)                           │           │
│  │  Events: JobEnqueued, JobStarted,              │           │
│  │          JobCompleted, JobFailed,              │           │
│  │          WorkerCrashed, WorkerRestarted        │           │
│  │                                                │           │
│  │  Subscribers:                                  │           │
│  │    Dashboard → live terminal output            │           │
│  │    Logger → write events to file               │           │
│  │    Alerter → Slack notification on failures    │           │
│  └───────────────────────────────────────────────┘           │
└──────────────────────────────────────────────────────────────┘

Pattern map

Component        Pattern               Why
─────────────────────────────────────────────────────────
API Server       Task-per-Connection   Each HTTP request = one task
State Manager    Actor                 Owns all mutable state, no locks
Dispatcher       Pipeline              Route jobs through stages
Workers          Fan-out/Fan-in        N workers process concurrently
Supervisor       Supervisor Tree       Restart crashed workers
Events           Event Bus             Dashboard, logger, alerter subscribe

Job lifecycle

   Client POSTs job
        │
        ▼
   API task sends Enqueue message to State Actor
        │
        ▼
   State Actor: job status = Pending, pushes to queue
        │
        ▼
   Dispatcher: pulls from queue, assigns to available worker
        │
        ▼
   State Actor: job status = Running
        │
        ▼
   Worker processes the job
        │
   ┌────┴────┐
   │         │
   ▼         ▼
 Success    Failure
   │         │
   ▼         ▼
 State:    State: Failed
 Completed  Dispatcher: retry? (max 3 attempts)
             │
             └── if retries exhausted → Dead Letter Queue
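The failure branch of the lifecycle comes down to one decision: retry, or move the job to the dead letter queue. The sketch below is one possible way to express it. The names `FailureAction` and `on_failure` are hypothetical, and it assumes `attempts_made` counts every run so far, including the one that just failed, with a limit of 3 as in the diagram.

```rust
/// What to do with a job whose latest attempt failed (hypothetical type).
#[derive(Debug, PartialEq)]
enum FailureAction {
    /// Put the job back on the queue; `attempts_left` runs remain.
    Retry { attempts_left: u32 },
    /// No attempts remain: park the job in the dead letter queue.
    DeadLetter,
}

/// Decide the next step after a failed run.
/// Assumption: `attempts_made` includes the run that just failed.
fn on_failure(attempts_made: u32, max_attempts: u32) -> FailureAction {
    // saturating_sub keeps the result at 0 instead of underflowing.
    let attempts_left = max_attempts.saturating_sub(attempts_made);
    if attempts_left == 0 {
        FailureAction::DeadLetter
    } else {
        FailureAction::Retry { attempts_left }
    }
}
```

With a limit of 3, `on_failure(1, 3)` asks for a retry with two attempts left, and `on_failure(3, 3)` sends the job to the dead letter queue.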

Implementation guide

Step 1: Define the job types

// A unit of work submitted by a client.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct Job {
    id: String,
    job_type: String,           // e.g. "resize" or "email"
    payload: serde_json::Value, // job-specific fields such as "file" or "to"
    attempts: u32,
    max_retries: u32,
}

// Where a job currently is in its lifecycle.
#[derive(Clone, Debug)]
enum JobStatus {
    Pending,
    Running { worker_id: usize },
    Completed { result: String },
    Failed { error: String, retries_left: u32 },
}

Step 2: State Manager Actor

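use std::collections::{HashMap, VecDeque};
use tokio::sync::{broadcast, mpsc, oneshot};

// Messages understood by the state actor. Requests that expect an answer
// carry a oneshot sender for the reply.
enum StateMsg {
    Enqueue { job: Job, reply: oneshot::Sender<String> },
    Dequeue { reply: oneshot::Sender<Option<Job>> },
    UpdateStatus { job_id: String, status: JobStatus },
    GetStats { reply: oneshot::Sender<Stats> },
}

// The actor owns the queue and the status map outright, so no locks are needed.
// It handles one message at a time until every sender has been dropped.
async fn state_actor(mut rx: mpsc::Receiver<StateMsg>, event_tx: broadcast::Sender<Event>) {
    let mut queue: VecDeque<Job> = VecDeque::new();
    let mut statuses: HashMap<String, JobStatus> = HashMap::new();

    while let Some(msg) = rx.recv().await {
        match msg {
            StateMsg::Enqueue { job, reply } => {
                let id = job.id.clone();
                statuses.insert(id.clone(), JobStatus::Pending);
                queue.push_back(job);
                // send() errors only when nobody is subscribed; safe to ignore.
                event_tx.send(Event::JobEnqueued { id: id.clone() }).ok();
                // The caller may have gone away; ignore a dropped reply channel.
                reply.send(id).ok();
            }
            // TODO: handle Dequeue, UpdateStatus, and GetStats.
            // Until then, todo!() panics on the first such message.
            _ => todo!(),
        }
    }
}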
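One way to approach the remaining message arms is to keep the bookkeeping in a plain struct that the actor loop owns, so each arm becomes a single method call that can be tested without a runtime. The sketch below is an assumption, not the course's solution. `QueueState` is a hypothetical name, `Job`, `JobStatus`, and `Stats` are simplified stand-ins for the tutorial's types, and counting an attempt on dequeue is a design choice made here.

```rust
use std::collections::{HashMap, VecDeque};

// Simplified stand-ins for the tutorial's types (assumptions).
#[derive(Clone, Debug, PartialEq)]
struct Job { id: String, attempts: u32 }

#[derive(Clone, Debug, PartialEq)]
enum JobStatus { Pending, Running, Completed, Failed }

#[derive(Clone, Debug, Default, PartialEq)]
struct Stats { pending: usize, running: usize, completed: usize, failed: usize }

/// State owned by the actor task; only the actor ever touches it.
#[derive(Default)]
struct QueueState {
    queue: VecDeque<Job>,
    statuses: HashMap<String, JobStatus>,
}

impl QueueState {
    /// Enqueue arm: record the job as Pending and return its id.
    fn enqueue(&mut self, job: Job) -> String {
        let id = job.id.clone();
        self.statuses.insert(id.clone(), JobStatus::Pending);
        self.queue.push_back(job);
        id
    }

    /// Dequeue arm: hand out the oldest job and count the attempt.
    fn dequeue(&mut self) -> Option<Job> {
        let mut job = self.queue.pop_front()?;
        job.attempts += 1;
        Some(job)
    }

    /// UpdateStatus arm: overwrite a known job's status; false for unknown ids.
    fn update_status(&mut self, job_id: &str, status: JobStatus) -> bool {
        match self.statuses.get_mut(job_id) {
            Some(slot) => { *slot = status; true }
            None => false,
        }
    }

    /// GetStats arm: count jobs per status.
    fn stats(&self) -> Stats {
        let mut stats = Stats::default();
        for status in self.statuses.values() {
            match status {
                JobStatus::Pending => stats.pending += 1,
                JobStatus::Running => stats.running += 1,
                JobStatus::Completed => stats.completed += 1,
                JobStatus::Failed => stats.failed += 1,
            }
        }
        stats
    }
}
```

Inside the actor, each arm would call the matching method and send the result back through the message's `reply` channel.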

Step 3: Worker + Supervisor

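use std::time::Duration;
use tokio::sync::broadcast;

// One worker task: repeatedly pull a job, run it, and report the outcome.
async fn worker(id: usize, state: StateHandle, event_tx: broadcast::Sender<Event>) {
    loop {
        let job = state.dequeue().await;
        match job {
            Some(job) => {
                // Record the Running state, as the job lifecycle requires.
                state.update_status(job.id.clone(), JobStatus::Running { worker_id: id }).await;
                event_tx.send(Event::JobStarted { id: job.id.clone(), worker: id }).ok();
                match process_job(&job).await {
                    Ok(result) => {
                        state.update_status(job.id, JobStatus::Completed { result }).await;
                    }
                    Err(e) => {
                        state.update_status(job.id, JobStatus::Failed {
                            error: e.to_string(),
                            // saturating_sub avoids a u32 underflow panic
                            // if attempts ever exceeds max_retries.
                            retries_left: job.max_retries.saturating_sub(job.attempts),
                        }).await;
                    }
                }
            }
            // Queue is empty: wait briefly instead of spinning.
            None => tokio::time::sleep(Duration::from_millis(100)).await,
        }
    }
}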
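The supervisor's restart limit from the architecture diagram (max 5 restarts per minute) can be sketched as a small sliding-window counter. This is an illustration under assumptions, not the course's implementation. `RestartBudget` and `try_restart` are hypothetical names, and the current time is passed in so the logic can be exercised without waiting.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Sliding-window restart limit, e.g. 5 restarts per 60 seconds.
struct RestartBudget {
    max_restarts: usize,
    window: Duration,
    recent: VecDeque<Instant>, // restart times still inside the window
}

impl RestartBudget {
    fn new(max_restarts: usize, window: Duration) -> Self {
        Self { max_restarts, window, recent: VecDeque::new() }
    }

    /// Records a restart at `now` and returns true if the budget allows it.
    /// Returns false when the supervisor should stop restarting (or escalate).
    fn try_restart(&mut self, now: Instant) -> bool {
        // Forget restarts that have aged out of the window.
        while let Some(&oldest) = self.recent.front() {
            if now.duration_since(oldest) >= self.window {
                self.recent.pop_front();
            } else {
                break;
            }
        }
        if self.recent.len() >= self.max_restarts {
            return false;
        }
        self.recent.push_back(now);
        true
    }
}
```

In a tokio supervisor this check would sit in the loop that awaits `JoinSet::join_next()`. When a worker task ends with an error or panic, the supervisor calls `try_restart(Instant::now())` and spawns a replacement worker only if it returns true.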

Step 4: API Server

use tokio::net::TcpStream;

// Minimal HTTP server (raw TCP, parse HTTP manually).
// Each accepted connection runs this handler in its own task.
async fn handle_http(stream: TcpStream, state: StateHandle) {
    // Parse: POST /jobs → enqueue
    // Parse: GET /status → get stats
    // Return JSON responses
}
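The parsing step in the handler can be kept as pure string functions, separate from the socket I/O. The sketch below is one possible shape. `Route`, `route`, and `response` are hypothetical names, and it assumes the whole request has already been read into one string. A real handler would keep reading until it has received `Content-Length` bytes of body.

```rust
/// The two endpoints from this chapter, plus error cases (hypothetical type).
#[derive(Debug, PartialEq)]
enum Route<'a> {
    SubmitJob { body: &'a str },
    Status,
    NotFound,
    BadRequest,
}

/// Classify a raw HTTP/1.1 request by its request line.
fn route(request: &str) -> Route<'_> {
    // Headers and body are separated by an empty line (CRLF CRLF).
    let (head, body) = match request.split_once("\r\n\r\n") {
        Some(parts) => parts,
        None => return Route::BadRequest,
    };
    // Request line: METHOD SP PATH SP VERSION
    let mut parts = head.lines().next().unwrap_or("").split_whitespace();
    match (parts.next(), parts.next()) {
        (Some("POST"), Some("/jobs")) => Route::SubmitJob { body },
        (Some("GET"), Some("/status")) => Route::Status,
        (Some(_), Some(_)) => Route::NotFound,
        _ => Route::BadRequest,
    }
}

/// Build a complete JSON response; `status` is e.g. "200 OK".
fn response(status: &str, json: &str) -> String {
    format!(
        "HTTP/1.1 {status}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{json}",
        json.len() // Content-Length is counted in bytes
    )
}
```

`handle_http` would then read from the stream, call `route`, talk to the state actor for `SubmitJob` and `Status`, and write back the string produced by `response`.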

Step 5: Event Bus + Dashboard

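use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;

// Subscribe before spawning so no events are missed while the task starts.
let mut rx = event_tx.subscribe();
tokio::spawn(async move {
    let mut stats = Stats::default();
    // An interval ticks once per second regardless of event traffic. A sleep()
    // created inside select! restarts on every event and may never fire under load.
    let mut tick = tokio::time::interval(Duration::from_secs(1));

    loop {
        tokio::select! {
            event = rx.recv() => {
                match event {
                    Ok(Event::JobCompleted { .. }) => stats.completed += 1,
                    Ok(Event::JobFailed { .. }) => stats.failed += 1,
                    // ... count the remaining event kinds here
                    Ok(_) => {}
                    // The dashboard fell behind and some events were dropped.
                    Err(RecvError::Lagged(_)) => {}
                    // Every sender is gone: nothing more to display.
                    Err(RecvError::Closed) => break,
                }
            }
            _ = tick.tick() => {
                print_dashboard(&stats);
            }
        }
    }
});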
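The `print_dashboard` helper is left undefined above. A minimal sketch of the formatting, modelled on the sample dashboard output at the top of this chapter, might look as follows. The `Stats` fields, the `throughput` helper, and the `render_dashboard` signature are assumptions made for illustration.

```rust
/// Counters shown on the dashboard (field names are assumptions).
#[derive(Default, Debug, Clone, Copy)]
struct Stats { pending: u64, running: u64, completed: u64, failed: u64 }

/// Jobs completed per second between two dashboard ticks.
fn throughput(prev_completed: u64, now_completed: u64, elapsed_secs: f64) -> f64 {
    if elapsed_secs <= 0.0 {
        return 0.0;
    }
    now_completed.saturating_sub(prev_completed) as f64 / elapsed_secs
}

/// Render the dashboard as text; the caller decides where to print it.
fn render_dashboard(stats: &Stats, alive: usize, total: usize, jobs_per_sec: f64) -> String {
    format!(
        "Workers: {alive}/{total} alive\nQueue: {} pending, {} running, {} completed, {} failed\nThroughput: {jobs_per_sec:.0} jobs/sec",
        stats.pending, stats.running, stats.completed, stats.failed
    )
}
```

Returning a `String` instead of printing directly keeps the formatting testable. The dashboard task would call `println!("{}", render_dashboard(...))` on each tick.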

Exercises

Exercise 1: Basic queue

Implement State Actor + Workers + Dispatcher. Submit 100 jobs, process them with 4 workers. Print completion count.

Exercise 2: Add supervisor

Workers randomly crash (1% chance). Supervisor restarts them. All 100 jobs should still complete.

Exercise 3: Add HTTP API

Accept jobs via POST /jobs, return status via GET /status. Test with curl.

Exercise 4: Add event bus + dashboard

Broadcast events, subscribe with a dashboard that prints live stats every second.

Exercise 5: Retry with backoff

Failed jobs are retried up to 3 times with exponential backoff (1s, 2s, 4s). After 3 failures → dead letter queue.
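
As a starting point for Exercise 5, the delay schedule (1s, 2s, 4s) can be computed by doubling a one-second base for each retry. The function below is a sketch. `backoff_delay` is a hypothetical name, retries are numbered from 1, and `None` is used here to signal that the job should go to the dead letter queue.

```rust
use std::time::Duration;

/// Delay before retry number `retry` (1-based): 1s, 2s, 4s, ...
/// Returns None when `retry` is 0 or beyond `max_retries`.
fn backoff_delay(retry: u32, max_retries: u32) -> Option<Duration> {
    if retry == 0 || retry > max_retries {
        return None;
    }
    // 2^(retry - 1) seconds; checked_shl returns None for shifts of 64 or more.
    let secs = 1u64.checked_shl(retry - 1)?;
    Some(Duration::from_secs(secs))
}
```

The dispatcher could await `tokio::time::sleep(delay)` before re-enqueueing the job, ideally in a separate task so one slow retry does not hold up other jobs.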