Lifetime bounds while passing mutable references across async boundaries

⚓ Rust    📅 2025-10-21    👤 surdeus    👁️ 7      

surdeus

I am trying to write a util function to do a retry until success loop on an api call on a mutable reference

// Helper trait to express the HRTB requirement for the work closure.
pub trait WorkFn<'a, T, ResT>: Send + 'static {
    type Future: Future<Output = Result<ResT, tonic::Status>> + Send + 'a;
    fn call_mut(&mut self, t: &'a mut T) -> Self::Future;
}

// Blanket implementation to automatically satisfy WorkFn for any closure F
impl<'a, T, ResT, F, Fut> WorkFn<'a, T, ResT> for F
where
    F: FnMut(&'a mut T) -> Fut + Send + 'static,
    // The Future must live at least as long as the short-lived mutable reference.
    Fut: Future<Output = Result<ResT, tonic::Status>> + Send + 'a,
    T: 'a,
    ResT: 'a,
{
    type Future = Fut;
    fn call_mut(&mut self, t: &'a mut T) -> Self::Future {
        self(t)
    }
}

async fn async_mut_retry<T, F, S, R, ResT>(
    // 1. Mutable reference to the resource (e.g., &mut Client)
    resource: &mut T,
    // 2. The shutdown signal
    shutdown_notify: Arc<Notify>,
    // 3. The retry strategy (an iterator of Durations)
    mut strategy: S,
    // 4. The work to be done (takes &mut T)
    mut work: F,
    // 5. The custom retry logic (takes the error)
    mut is_retryable: R,
) -> Result<ResT, tonic::Status>
where
    F: for<'a> WorkFn<'a, T, ResT>, // Work closure
    ResT: Send + Sized,               // Success type
    S: Iterator<Item = Duration>,     // Strategy iterator
    R: FnMut(&tonic::Status) -> bool, // Retry logic closure
    T: Send + 'static,
    ResT: Send + 'static,
{
    loop {
        // --- 1. NON-BLOCKING SHUTDOWN CHECK BEFORE WORK ---
        tokio::select! {
            _ = shutdown_notify.notified() => {
                return Err(tonic::Status::cancelled("Shutdown requested before attempt"));
            }
            _ = tokio::task::yield_now() => {}
        }

        // --- 2. PERFORM THE WORK ---
        let result = work.call_mut(resource).await;

        match result {
            // Success
            Ok(output) => return Ok(output),

            // Failure: Check custom retry logic
            Err(e) => {
                if is_retryable(&e) {
                    // Get the next delay
                    let delay = match strategy.next() {
                        Some(d) => d,
                        None => {
                            // Strategy exhausted
                            return Err(e);
                        }
                    };

                    println!("Retryable error: {}. Retrying in {:?}...", e, delay);

                    // --- 3. AWAIT BACKOFF OR SHUTDOWN ---
                    tokio::select! {
                        _ = tokio::time::sleep(delay) => {
                            continue; // Delay finished, loop continues
                        }
                        _ = shutdown_notify.notified() => {
                            // Shutdown arrived during delay
                            return Err(tonic::Status::cancelled("Shutdown requested during backoff"));
                        }
                    }
                } else {
                    // Non-retryable error
                    return Err(e);
                }
            }
        }
    }
}

I call this util with a client like this

        let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(INITIAL_RETRY_DELAY_MS)
            .max_delay(Duration::from_millis(MAX_RETRY_DELAY_MS));

        let result = async_mut_retry(
            &mut client,
            shutdown_notify.clone(),
            retry_strategy,
            |client: &mut TonicGeneratedClient<u64>| {
//               let mut client_clone = client.clone();
//               async move {
//                    let request = TonicApiRequest {};
//                    client_clone.api_call(request).await
//               }
                 let request = TonicApiRequest {};
                 client.api_call(request)
            },
            |e| is_recoverable_error(e),
        )
        .await;

My generated tonic code is roughly like this

        pub async fn api_call(
            &mut self,
            request: impl tonic::IntoRequest<super::TonicApiRequest>,
        ) -> std::result::Result<
            tonic::Response<tonic::codec::Streaming<super::TonicApiResponse>>,
            tonic::Status,
        > {
            self.inner
                .ready()
                .await
                .map_err(|e| {
                    tonic::Status::unknown(
                        format!("Service was not ready: {}", e.into()),
                    )
                })?;
            let codec = tonic_prost::ProstCodec::default();
            let path = http::uri::PathAndQuery::from_static(
                "/dummy.DummyService/endpoint",
            );
            let mut req = request.into_request();
            req.extensions_mut()
                .insert(GrpcMethod::new("dummy.DummyService", "Endpoint"));
            self.inner.server_streaming(req, path, codec).await
        }

The current version of the call does not work and has lifetime errors as follows (on the client.api_call line)

  • lifetime may not live long enough: returning this value requires that '1 must outlive '2
  • let's call the lifetime of this reference '1
  • return type of closure impl futures::Future<Output = Result<tonic::Response<Streaming<TonicApiResponse>>, Status>> contains a lifetime '2

However, the commented code works. I understand that it works because the clone creates an owned version and the lifetime of the return value is no longer tied to the passed mutable reference . I am trying to not clone the client if possible. Arc<> does not work since the api_call method needs &mut self. Any suggestions on how to fix this ? I do not understand Rust lifetime parameters and bounds very well but I am thinking there is a fix somehow if I specify correct bounds on the helper api method.

The code also works if I inline the entire function at the call site

5 posts - 3 participants

Read full topic

🏷️ Rust_feed