# Lesson 26: Streams — Async Iteration, StreamExt, Backpressure
## What you’ll learn

- The `Stream` trait as the async equivalent of `Iterator`
- Useful combinators from `StreamExt` and `TryStreamExt`
- Creating streams from channels, iterators, and async generators
- Backpressure considerations with streams
## Key concepts

### The Stream trait
```rust
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}
```

Like `Iterator`, but `poll_next` can return `Poll::Pending` when the next item is not ready yet.
### StreamExt combinators
```rust
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(vec![1, 2, 3])
        .map(|x| x * 2)       // 2, 4, 6
        .filter(|x| *x > 2)   // 4, 6
        .take(5);

    while let Some(item) = stream.next().await {
        println!("{item}");
    }
}
```
Key combinators: `map`, `filter`, `take`, `merge`, `chain`, `throttle`, `chunks`, `timeout`.
### Creating streams

- `tokio_stream::iter()` — from an iterator
- `ReceiverStream::new(rx)` — from an `mpsc::Receiver`
- `async_stream::stream!` — from an async block with `yield`
- `BroadcastStream` — from a `broadcast::Receiver`
### Backpressure in streams

Streams are pull-based: the consumer calls `next().await`, so the producer only runs when demanded. This provides natural backpressure. Combine with `buffer_unordered(n)` to bound concurrency:
```rust
// Note: buffer_unordered comes from the futures crate's StreamExt,
// not tokio_stream's.
use futures::stream::StreamExt;

stream
    .map(|url| async move { fetch(url).await })
    .buffer_unordered(10) // at most 10 concurrent fetches
```
## Exercises

- Convert an `mpsc::Receiver` into a stream and process items with `StreamExt::map`
- Use `buffer_unordered` to fetch 100 URLs with max 10 concurrent requests
- Implement a custom `Stream` that yields Fibonacci numbers with a delay
- Use `stream.chunks(10)` to batch database inserts
- Merge two streams and process items in arrival order