Very confusing async error: async closure is not the same as itself

⚓ Rust    📅 2025-10-28    👤 surdeus    👁️ 5      

surdeus

Warning

This post was published 35 days ago. The information described in this article may have changed.

I would love to provide a playground link, but I believe the following code uses crates that are not available there:

use std::{
    any::{Any, TypeId},
    hash::{DefaultHasher, Hash, Hasher},
    sync::Arc,
    time::{Duration, Instant},
};

/// Entry point: runs the reproducer once with a default cache and a default client.
#[tokio::main]
async fn main() {
    let cache = GrpcCache::default();
    let client = Client::default();
    myfunc(&cache, &client).await;
}

/// Stand-in client that keeps the reproducer self-contained.
///
/// It carries no state; `Default` is derived so `main` can construct it directly.
#[derive(Default)]
struct Client {
    // Placeholder
}

impl Client {
    async fn do_query(&self, query: &Query) -> Result<String, tonic::Status> {
        // Placeholder implementation, really this is actually a tonic GRPC client
        Ok(query.value.to_string())
    }
}

/// Placeholder request type.
///
/// It derives `Eq` and `Hash` (and is `Send + Sync + 'static`), so it satisfies the
/// blanket `DynKey` impl and can be used as a cache key.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct Query {
    // Placeholder
    value: u32,
}

/// Reproducer: passes an async closure that borrows `client` to
/// `GrpcCache::query_with_cache`.
async fn myfunc(cache: &GrpcCache, client: &Client) {
    // The closure takes the query by reference and awaits the client call.
    let runner = async move |query: &Query| client.do_query(query).await;

    // Calling the closure directly works fine:
    //let grpc_response =
    //    runner(&Query { value: 42 }).await;

    // Passing it to `query_with_cache` doesn't work:
    let grpc_response = cache
        .query_with_cache(
            Box::new(Query { value: 42 }),
            Duration::from_secs(30),
            runner,
        )
        .await;

    // Reproducer only: the response is never used.
    todo!()
}

/// Type-erased cache key: any boxed value implementing `DynKey`.
type CacheKey = Box<dyn DynKey>;
/// Type-erased, shareable response as stored in the cache.
type TypeErasedValue = Arc<dyn Any + Send + Sync>;
/// Cache entry: the entry's time-to-live (read back by `ExpiryPolicy`) paired with the
/// cached outcome, which is either the type-erased response or the error status.
type CacheValue = (std::time::Duration, Result<TypeErasedValue, tonic::Status>);

/// A cache for queries.
///
/// Keys and values are both type-erased, so one cache instance can hold entries for
/// different request/response types.
#[derive(Debug)]
pub struct GrpcCache {
    /// Generic cache mapping type-erased requests to `(ttl, outcome)` entries.
    cache: moka::future::Cache<CacheKey, CacheValue>,
}

impl Default for GrpcCache {
    /// Builds the cache with a maximum and initial capacity of 1024 and with
    /// `ExpiryPolicy` deciding how long each entry lives.
    fn default() -> Self {
        let builder = moka::future::Cache::builder()
            .max_capacity(1024)
            .initial_capacity(1024)
            .expire_after(ExpiryPolicy);

        Self {
            cache: builder.build(),
        }
    }
}

impl GrpcCache {
    /// Execute a GRPC query with caching.
    ///
    /// The actual resolving function is provided as a FnOnce closure,
    /// which will be called if the response was not cached.
    ///
    /// * `query` - the request; it doubles as the (type-erased) cache key.
    /// * `ttl` - how long the outcome of this call is kept in the cache.
    /// * `f` - resolver invoked with the request on a cache miss.
    ///
    /// Both successful responses and error statuses are stored, so a cached error is
    /// returned as-is until its entry expires.
    ///
    /// # Errors
    ///
    /// Returns the status produced by `f` (or a previously cached one), or an internal
    /// status if the cached value for this key is not of type `Response`.
    ///
    /// # Panics
    ///
    /// Panics if the key cannot be downcast back to `Query` after the cache lookup; this
    /// would be an internal invariant violation since the key was just cast from it.
    // Allow trivial casts, casting to dyn Any is trivial but needed to guide the type system.
    #[allow(trivial_casts)]
    pub async fn query_with_cache<Query, F, Response, Fut>(
        &self,
        query: Box<Query>,
        ttl: Duration,
        f: F,
    ) -> std::result::Result<Arc<Response>, tonic::Status>
    where
        Query: DynKey + std::fmt::Debug,
        F: FnOnce(&Query) -> Fut,
        Fut: std::future::Future<Output = std::result::Result<Response, tonic::Status>>,
        Response: Any + Clone + Send + Sync + 'static,
    {
        let query = query as Box<dyn DynKey>;

        // The lookup yields an owned entry, so the cached outcome can be returned
        // directly without cloning the status again.
        if let Some((_, cached)) = self.cache.get(&query).await {
            return cached.and_then(|response| {
                response.downcast::<Response>().map_err(|_| {
                    tonic::Status::internal("Cache hit with invalid unexpected type")
                })
            });
        }

        // Not in cache, run the query
        // Hilariously, since the Box<dyn DynKey> cast above took ownership, we need to
        // cast it back to the known concrete type:
        let query = (query as Box<dyn Any + 'static>)
            .downcast::<Query>()
            .expect("Internal error: we just cast this...");

        let resp = f(&*query).await.map(Arc::new);
        // Store a type-erased copy (success or error) and hand the typed one to the caller.
        let dyn_resp = resp.clone().map(|inner| inner as TypeErasedValue);
        self.cache
            .insert(query as Box<dyn DynKey>, (ttl, dyn_resp))
            .await;
        resp
    }
}

/// A key for a type-erased hashmap or cache.
///
/// Based on https://stackoverflow.com/questions/64838355/how-do-i-create-a-hashmap-with-type-erased-keys but with
/// improvements that are possible after 5 years of Rust development.
pub trait DynKey: Any + Send + Sync {
    /// Returns `true` when `other` has the same concrete type as `self` and compares
    /// equal to it.
    fn eq(&self, other: &dyn DynKey) -> bool;
    /// Returns a hash of the key that also incorporates its concrete type.
    fn hash(&self) -> u64;
    /// Returns the name of the key's concrete type (for diagnostics).
    fn type_name(&self) -> &'static str;
}

impl<T: Eq + Send + Sync + Hash + 'static> DynKey for T {
    fn eq(&self, other: &dyn DynKey) -> bool {
        // Upcast to `dyn Any` so the other key can be downcast to our concrete type;
        // keys of a different type are never equal.
        let other: &dyn Any = other;
        other
            .downcast_ref::<T>()
            .is_some_and(|same_type| self == same_type)
    }

    fn hash(&self) -> u64 {
        let mut h = DefaultHasher::new();
        // mix the typeid of T into the hash to make distinct types
        // provide distinct hashes
        Hash::hash(&(TypeId::of::<T>(), self), &mut h);
        h.finish()
    }

    fn type_name(&self) -> &'static str {
        std::any::type_name::<T>()
    }
}

// NOTE: `Box<dyn DynKey>` is itself `Eq + Hash + Send + Sync + 'static`, so the blanket
// impl above also applies to the box. The impls below therefore dereference to the inner
// trait object explicitly (`&**self`) so calls dispatch to the concrete key rather than
// to the `DynKey` impl of the box.

impl PartialEq for Box<dyn DynKey> {
    fn eq(&self, other: &Self) -> bool {
        DynKey::eq(&**self, &**other)
    }
}

impl Eq for Box<dyn DynKey> {}

impl Hash for Box<dyn DynKey> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let key_hash = DynKey::hash(&**self);
        state.write_u64(key_hash);
    }
}

impl std::fmt::Debug for Box<dyn DynKey> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A plain `self.type_name()` would resolve to the blanket impl for the box and
        // report the box's own type name instead of the concrete key type.
        let inner: &dyn DynKey = &**self;
        f.debug_struct("DynKey")
            .field("type", &inner.type_name())
            .finish_non_exhaustive()
    }
}

/// Expiry policy for use with moka: each entry carries its own time-to-live.
struct ExpiryPolicy;

impl moka::Expiry<CacheKey, CacheValue> for ExpiryPolicy {
    /// Uses the duration stored alongside the cached value as the entry's lifetime.
    fn expire_after_create(
        &self,
        _key: &CacheKey,
        value: &CacheValue,
        _created_at: Instant,
    ) -> Option<Duration> {
        let (ttl, _outcome) = value;
        Some(*ttl)
    }
}

Gives the following error:

error[E0308]: mismatched types
  --> src/main.rs:37:25
   |
32 |       let runner = async move |query: &Query| client.do_query(query).await;
   |                                               ----------------------------
   |                                               |
   |                                               the expected `async` closure body
   |                                               the found `async` closure body
...
37 |       let grpc_response = cache
   |  _________________________^
38 | |         .query_with_cache(
39 | |             Box::new(Query { value: 42 }),
40 | |             Duration::from_secs(30),
41 | |             runner,
42 | |         )
   | |_________^ one type is more general than the other
   |
   = note: expected `async` closure body `{async closure body@src/main.rs:32:45: 32:73}`
              found `async` closure body `{async closure body@src/main.rs:32:45: 32:73}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object
note: the lifetime requirement is introduced here
  --> src/main.rs:87:30
   |
87 |         F: FnOnce(&Query) -> Fut,
   |                              ^^^

error[E0308]: mismatched types
  --> src/main.rs:43:10
   |
32 |     let runner = async move |query: &Query| client.do_query(query).await;
   |                                             ----------------------------
   |                                             |
   |                                             the expected `async` closure body
   |                                             the found `async` closure body
...
43 |         .await;
   |          ^^^^^ one type is more general than the other
   |
   = note: expected `async` closure body `{async closure body@src/main.rs:32:45: 32:73}`
              found `async` closure body `{async closure body@src/main.rs:32:45: 32:73}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object
note: the lifetime requirement is introduced here
  --> src/main.rs:87:30
   |
87 |         F: FnOnce(&Query) -> Fut,
   |                              ^^^

For more information about this error, try `rustc --explain E0308`.
warning: `async_reproducer` (bin "async_reproducer") generated 1 warning
error: could not compile `async_reproducer` (bin "async_reproducer") due to 2 previous errors; 1 warning emitted

Very strange that {async closure body@src/main.rs:32:45: 32:73} is not equal to itself!

Use the following Cargo.toml:

[package]
name = "async_reproducer"
version = "0.1.0"
edition = "2024"

[dependencies]
moka = { version = "0.12.11", features = ["future"] }
tokio = { version = "1.48.0", features = ["rt-multi-thread", "macros"] }
tonic = "0.14.2"

I'm very confused by the error. I have tried pinning the runner closure using pin! before calling query_with_cache, but that doesn't work either:

error[E0277]: expected a `FnOnce(&Query)` closure, found `Pin<&mut {async closure@src/main.rs:32:18: 32:44}>`
  --> src/main.rs:40:25
   |
40 |       let grpc_response = cache
   |  _________________________^
41 | |         .query_with_cache(
42 | |             Box::new(Query { value: 42 }),
43 | |             Duration::from_secs(30),
44 | |             runner,
45 | |         )
   | |_________^ expected an `FnOnce(&Query)` closure, found `Pin<&mut {async closure@src/main.rs:32:18: 32:44}>`
   |

If I just call the closure directly (commented out in the code), it works fine. It only fails when the closure is passed to query_with_cache.

I don't understand the trait object suggestion either: I do not want to allocate the future on the heap, and I want inlining here (as the generics will make query_with_cache monomorphised anyway, I might as well get the advantage of inlining the future). I suspect it is just a bogus suggestion.

Any help is highly appreciated!

4 posts - 2 participants

Read full topic

🏷️ Rust_feed