StreamExt::buffered

⚓ Rust    📅 2025-08-15    👤 surdeus    👁️ 5      


Info

This post is auto-generated from RSS feed The Rust Programming Language Forum - Latest topics. Source: StreamExt::buffered

I have a Stream of values representing work items. Each item has an id. The ids aren't sequential, but they always arrive in order (I might get item 36, then 37, then 483, then 991, ...).

I want to process these items concurrently, and always be able to answer the question "what's the maximum id X such that item X and all preceding items are done?"
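To make that question concrete, here is a minimal sketch of a tracker that answers it regardless of how the work is scheduled. The `Watermark` type and its `start` and `finish` methods are hypothetical names invented for illustration, not part of the futures crate, and the sketch assumes ids are `u64` values that arrive in strictly increasing order.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical helper: tracks the highest id such that it and every
/// earlier id have finished.
pub struct Watermark {
    /// Ids that have started but not yet been retired, in arrival order.
    pending: VecDeque<u64>,
    /// Ids that have finished but are still blocked behind an unfinished earlier id.
    finished: HashSet<u64>,
    /// Highest id with no unfinished predecessor, if any.
    done_up_to: Option<u64>,
}

impl Watermark {
    pub fn new() -> Self {
        Watermark {
            pending: VecDeque::new(),
            finished: HashSet::new(),
            done_up_to: None,
        }
    }

    /// Record that `id` has been taken from the stream.
    /// Assumes ids arrive in strictly increasing order.
    pub fn start(&mut self, id: u64) {
        debug_assert!(self.pending.back().map_or(true, |&last| last < id));
        self.pending.push_back(id);
    }

    /// Record that `id` has finished and return the current watermark.
    pub fn finish(&mut self, id: u64) -> Option<u64> {
        self.finished.insert(id);
        // Retire ids from the front for as long as they are finished.
        while let Some(&front) = self.pending.front() {
            if !self.finished.remove(&front) {
                break;
            }
            self.pending.pop_front();
            self.done_up_to = Some(front);
        }
        self.done_up_to
    }
}
```

With the ids from the example above (36, 37, 483, 991), finishing 37 first leaves the watermark empty, and finishing 36 afterwards moves it straight to 37. Because the tracker only cares about arrival order, it does not require the work itself to complete in order.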

I've got some code using StreamExt::buffered from the futures crate. It's one way of achieving concurrency, but I just found out that the n you pass to .buffered(n) is not the number of futures you can expect to have running concurrently, because finished futures whose results have not been yielded yet still count against the limit!

If your futures finish strictly in the order they are inserted, you'll get n running at all times.

If the latency instead follows an exponential distribution, so that futures have a "half-life", roughly half of the buffered futures at any given time will already be finished, so you'll only get about n/2 concurrency.

If you get unlucky, and job 0 of n takes much longer to finish than the others, then once the other n-1 futures have finished you'll have no concurrency at all until job 0 completes.
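The first and last of these scenarios can be reproduced with a small discrete-time model. This is a simplified sketch, not the futures crate's implementation: it assumes a window of at most `n` slots, that a slot is released only when the job at the front of the window has finished, and that every job runs for a whole number of ticks. The function name `simulate_ordered_window` is made up for illustration.

```rust
use std::collections::VecDeque;

/// Simplified model (an assumption, not the futures crate's code):
/// an ordered window holds at most `n` jobs, and a slot is released
/// only when the job at the front of the window has finished.
/// `durations[i]` is how many ticks job `i` runs (assumed >= 1).
/// Returns the number of jobs actually running at each tick.
pub fn simulate_ordered_window(durations: &[u32], n: usize) -> Vec<usize> {
    let mut window: VecDeque<u32> = VecDeque::new(); // remaining ticks per slot
    let mut next = 0; // index of the next job to admit
    let mut running_per_tick = Vec::new();

    loop {
        // Release finished jobs, but only from the front (ordered output).
        while window.front() == Some(&0) {
            window.pop_front();
        }
        // Admit new jobs while the window has free slots.
        while window.len() < n && next < durations.len() {
            window.push_back(durations[next]);
            next += 1;
        }
        if window.is_empty() {
            break;
        }
        // Finished jobs stuck behind a slow head occupy slots but do no work.
        running_per_tick.push(window.iter().filter(|&&r| r > 0).count());
        // Advance time by one tick.
        for remaining in window.iter_mut() {
            *remaining = remaining.saturating_sub(1);
        }
    }
    running_per_tick
}
```

Under this model, four jobs of 2 ticks each with n = 2 keep two jobs running on every tick. With durations `[5, 1, 1, 1, 1, 1, 1, 1]` and n = 4, the running count per tick is `[4, 1, 1, 1, 1, 4]`: after the first tick only the slow head is doing any work, and the remaining jobs are not admitted until it finishes.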

Has anyone run into this? Is there a better tool for what I'm doing?
