Lifetime Inference Issue around Async and HRTB

⚓ Rust    📅 2025-12-04    👤 surdeus    👁️ 2      

surdeus

I want to explain my overall problem domain a bit before giving the failing code example to avoid an xy problem.


I am working on a web application that uses async-graphql as a frontend interface and the official mongodb-rust-driver as a database interface.

My application has quite strong consistency guarantees, i.e. a user must always "read their writes". To that end, I implemented a cache for the mongodb::ClientSession indexed by the users. The session must not be used in parallel, so I needed to wrap it as

pub struct Session(Arc<tokio::sync::Mutex<mongodb::ClientSession>>);

for synchronization. The tokio::sync::Mutex is needed, since the mongodb API forces me to hold the &mut ClientSession, and thus the lock, across .await points.

I've read that you should avoid the n+1 query problem in async-graphql via dataloaders, so I implemented them. In my first implementation, when a GraphQL request comes in, a dataloader is created and it holds a Session. In its Loader implementation, it would acquire a lock from the Session and use it for the database request.


Now for the problem in the application domain: I have noticed quite slow queries. I investigated and found out, that async-graphql batches the calls to the Loader::load method. That way, instead of one dataloader task per GraphQL query depth, multiple tasks try to acquire the lock and cause up to 5 times the lock contention than without batching.

To avoid this contention, a chat bot recommended to me to use the actor pattern, which I tried. I implemented an actor task that gets spawned when the dataloader gets created. Instead of the Session, the dataloader holds the Sender end of a channel. The Receiver end is held by the actor task together with the Session. When a dataloader gets invoked, it sends a "future" to the actor via the channel, and the actor can lock the Session once and execute multiple "futures" with out constantly locking and unlocking, reducing contention in my hopes.

The problem with this approach comes with these "futures" that the actor should act on. They are really closures that consume a &mut ClientSession and return a future. The type is this:

type LoadFutureFactory = Box<
    dyn for<'session> FnOnce(
            &'session mut mongodb::ClientSession,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'session>>
        + Send,
>;

I am pretty sure that is conceptually the correct type to send through the channel. But I cannot seem to actually construct this type without the compiler complaining about the lifetimes.

I get this error:

error: lifetime may not live long enough
  --> src/main.rs:85:13
   |
76 |         let factory = move |session: &mut mongodb::ClientSession| {
   |                                      -                          - return type of closure is Pin<Box<(dyn Future<Output = ()> + Send + '2)>>
   |                                      |
   |                                      let's call the lifetime of this reference `'1`
...
85 |             fut
   |             ^^^ returning this value requires that `'1` must outlive `'2`

The weird part is, I agree with the error message:

  • The reference to the ClientSession must indeed outlive the returned future. That is what I tried to express with the + 'session trait bound in the LoadFutureFactory definition.

What I don't understand is why the compiler seems to give the future another livetime. Unfortunately, I cannot specify the type of the factory closure in which I can express that the ClientSession and dyn Future are linked in lifetime.

Here is the whole code. You will need to add tokio and mongodb as dependencies. Unfortunately, mongodb is not among the 100 most downloaded crates, so the Rust Playground does not support it. Maybe I'll post a stripped down version of this in the comments without the dependencies but with the same errors. As I said, I wanted to include the original design as much as possible to avoid xy problems.

use std::collections::HashMap;
use std::io::Error;
use std::ops::DerefMut;
use std::{pin::Pin, sync::Arc};

pub struct Session(Arc<tokio::sync::Mutex<mongodb::ClientSession>>);

impl Session {
    async fn lock(&self) -> tokio::sync::MutexGuard<'_, mongodb::ClientSession> {
        self.0.lock().await
    }
}

pub struct LoadFutureSender {
    sender: tokio::sync::mpsc::UnboundedSender<LoadFutureFactory>,
}

type LoadFutureFactory = Box<
    dyn for<'session> FnOnce(
            &'session mut mongodb::ClientSession,
        ) -> Pin<Box<dyn Future<Output = ()> + Send + 'session>>
        + Send,
>;

impl LoadFutureSender {
    pub fn new(session: Session) -> Self {
        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<LoadFutureFactory>();
        let load_future_receiver_task = async move {
            let mut load_tasks = Vec::new();
            while let Some(load_task) = receiver.recv().await {
                let mut session_lock = session.lock().await;
                let future = load_task(session_lock.deref_mut());
                future.await;
                // Keep the lock and continue executing futures, if any exist
                if receiver.is_empty() {
                    drop(session_lock);
                    continue;
                }
                while !receiver.is_empty() {
                    receiver.recv_many(&mut load_tasks, 100).await;
                    while let Some(load_task) = load_tasks.pop() {
                        let future = load_task(session_lock.deref_mut());
                        future.await;
                    }
                }
                drop(session_lock);
            }
        };
        tokio::task::spawn(load_future_receiver_task);
        Self { sender }
    }

    pub fn send(&self, factory: LoadFutureFactory) {
        self.sender
            .send(factory)
            .expect("the receiver lives as long as self")
    }
}

#[derive(Clone, Copy, Debug)]
struct Id(i64);

impl From<Id> for mongodb::bson::Bson {
    fn from(Id(id): Id) -> Self {
        mongodb::bson::Bson::Int64(id)
    }
}

struct DbObjectLoader(LoadFutureSender);

impl DbObjectLoader {
    /// This is roughly the api of the dataloader trait of async_graphql
    async fn load(&self, keys: &[Id]) -> Result<HashMap<Id, String>, Error> {
        let (sender, receiver) = tokio::sync::oneshot::channel();
        let query = mongodb::bson::doc! { "$match": { "_id": keys }};
        let factory = move |session: &mut mongodb::ClientSession| {
            let future = async move {
                // Go to mongodb, use the session and fetch something
                let _query = query;
                let _session = session;
                let result = HashMap::new();
                sender.send(Ok(result));
            };
            let fut: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(future);
            fut
        };
        self.0.send(Box::new(factory));
        receiver.await.expect(
            "the receiver will eventually yield the hashmap, as the sender is in the future that is executed, and the executor is the LoadFutureReceiver task that lives as long as the LoadFutureSender is alive, i.e. as long as self",
        )
    }
}

fn main() {
    println!("Hello, world!");
}

4 posts - 3 participants

Read full topic

🏷️ Rust_feed