What is the most performant way to handle more than 5 million Tokio tasks with a time limit, and then drop the ones that don't finish within that duration?

⚓ Rust    📅 2025-09-22    👤 surdeus    👁️ 8      



I've got two ways to do what's mentioned in the title, and I've been benchmarking them, but I'm not fully sure my benchmarking approach is correct. Is there anyone here who could add more insight?

Both approaches share code like this:

pub struct Pool {
    tx: flume::Sender<tokio::net::TcpStream>,
    rx: flume::Receiver<tokio::net::TcpStream>,
    i: std::sync::atomic::AtomicUsize,
    host: String,
    req: String,
    max: usize,
}

impl Pool {
    // async fn new ....

    pub async fn get(&self) -> Option<tokio::net::TcpStream> {
        if let Ok(conn) = self.rx.try_recv() {
            Some(conn)
        } else {
            let current = self.i.load(std::sync::atomic::Ordering::Relaxed);
            if current < self.max
                && let Ok(_) = self.i.compare_exchange(
                    current,
                    current + 1,
                    std::sync::atomic::Ordering::AcqRel,
                    std::sync::atomic::Ordering::Relaxed,
                )
            {
                Some(tokio::net::TcpStream::connect(&self.host).await.unwrap())
            } else {
                None
            }
        }
    }

    pub async fn put(&self, conn: tokio::net::TcpStream) {
        self.tx.send_async(conn).await.unwrap();
    }
}

Then I reuse the connections to make HTTP requests.
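For context, the get-or-create logic above can be sketched synchronously, with `usize` standing in for `tokio::net::TcpStream` and `std::sync::mpsc` in place of flume, so it runs without a network (the `Pool::new` signature here is a simplified stand-in):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;

// Simplified, synchronous sketch of the pool's get-or-create logic.
struct Pool {
    tx: mpsc::Sender<usize>,
    rx: mpsc::Receiver<usize>,
    i: AtomicUsize,
    max: usize,
}

impl Pool {
    fn new(max: usize) -> Self {
        let (tx, rx) = mpsc::channel();
        Pool { tx, rx, i: AtomicUsize::new(0), max }
    }

    // Reuse an idle "connection" if one is queued; otherwise create a
    // new one as long as the total stays under `max`.
    fn get(&self) -> Option<usize> {
        if let Ok(conn) = self.rx.try_recv() {
            return Some(conn);
        }
        let current = self.i.load(Ordering::Relaxed);
        if current < self.max
            && self
                .i
                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
        {
            Some(current) // stand-in for TcpStream::connect(...)
        } else {
            None
        }
    }

    fn put(&self, conn: usize) {
        let _ = self.tx.send(conn);
    }
}

fn main() {
    let pool = Pool::new(2);
    let a = pool.get().unwrap(); // creates "connection" 0
    let b = pool.get().unwrap(); // creates "connection" 1
    assert!(pool.get().is_none()); // at max, nothing idle
    pool.put(a);
    assert_eq!(pool.get(), Some(0)); // reused from the queue
    let _ = b;
    println!("ok");
}
```

Note that if `compare_exchange` fails under contention, this version (like the original) returns `None` rather than retrying.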

The first approach is one outer tokio timeout plus 5 million inner tokio timeouts:

tokio::time::timeout(
    tokio::time::Duration::from_secs(30),
    tokio::spawn(async move {
        loop {
            if http_context_ref
                .count
                .load(std::sync::atomic::Ordering::Relaxed)
                < max_concurrent
            {
                let http_context_ref = http_context_ref.clone();

                tokio::time::timeout(
                    tokio::time::Duration::from_secs(30),
                    tokio::spawn(async move {
                        http_context_ref
                            .count
                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                        let _permit = http_context_ref.semaphore.acquire().await.unwrap();

                        let request_start = tokio::time::Instant::now();

                        let (cek, conn) = http_context_ref.client.send_get().await;

                        http_context_ref
                            .count
                            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
                        http_context_ref.notify.notify_one();

                        if let Some(val) = conn {
                            let data = if cek {
                                crate::Data {
                                    time: Some(request_start.elapsed()),
                                    total_send: 1,
                                }
                            } else {
                                crate::Data {
                                    time: None,
                                    total_send: 1,
                                }
                            };

                            http_context_ref.client.put(val).await;
                            http_context_ref.s.send(data).unwrap();
                        }
                    }),
                )
                .await
                .unwrap()
                .unwrap();
            } else {
                http_context_ref.notify.notified().await;
            }
        }
    }),
)
.await
.unwrap()
.unwrap();

The second approach uses a JoinSet to save the handles, then drops the JoinSet once the deadline is reached:

tokio::spawn(async move {
    let mut join_set = tokio::task::JoinSet::new();
    // `start` is a tokio::time::Instant captured before spawning
    let deadline = start + tokio::time::Duration::from_secs(http_config.max_duration as u64);

    while tokio::time::Instant::now() < deadline {
        if http_context_ref
            .count
            .load(std::sync::atomic::Ordering::Relaxed)
            < max_concurrent
        {
            let http_context_ref = http_context_ref.clone();

            // spawn onto the JoinSet so the handle is actually saved;
            // a plain tokio::spawn here would leave the set empty
            join_set.spawn(async move {
                http_context_ref
                    .count
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                let _permit = http_context_ref.semaphore.acquire().await.unwrap();

                let request_start = tokio::time::Instant::now();

                let (cek, conn) = http_context_ref.client.send_get().await;

                http_context_ref
                    .count
                    .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
                http_context_ref.notify.notify_one();

                if let Some(val) = conn {
                    let data = if cek {
                        crate::Data {
                            time: Some(request_start.elapsed()),
                            total_send: 1,
                        }
                    } else {
                        crate::Data {
                            time: None,
                            total_send: 1,
                        }
                    };

                    http_context_ref.client.put(val).await;
                    http_context_ref.s.send(data).unwrap();
                }
            });

            if join_set.len() > 100 {
                while join_set.try_join_next().is_some() {}
                tokio::task::yield_now().await;
            }
        } else {
            http_context_ref.notify.notified().await;
        }
    }

    // dropping the set aborts any tasks that are still running
    drop(join_set);
})
.await
.unwrap();

My additional question for the second approach: does saving 5 million handles consume a lot of RAM and potentially cause resource exhaustion? Which is better, draining the join set every time the threshold is crossed, or not? If I drain it at every threshold like in this part, will that block a single thread from picking up tasks for a noticeable amount of time when the condition is met?

if join_set.len() > 100 {
    while join_set.try_join_next().is_some() {}
    tokio::task::yield_now().await;
}

The issue I run into if I don't drop the tasks is that when a task takes longer than the time limit, the program won't finish until all tasks are done. I think it might be because the channel isn't fully closed. When I drop all the tasks after the time limit ends, it no longer gets stuck. What I want is for the program to finish within the time limit, even if some tasks are still running. Which method can spawn and finish the most tasks without exceeding the time limit?
