Confused about Send, Arc, and Mutex when passing dyn trait function parameters

⚓ Rust    📅 2026-06-10    👤 surdeus    👁️ 1      

surdeus

For a while now I've been able to skirt the intricacies of multithread programming in Rust by simply moving data into Tokio tasks as they spawn and/or letting the compiler auto-implement Send (presumably). Today, however, I hit a snag while trying to pass a boxed dyn trait to an async API in the context of a Tokio spawned thread that requires its futures to be Send.

error: future cannot be sent between threads safely
   --> src\main.rs:153:11
    |
153 |     tasks.spawn(async move {
    |           ^^^^^ future created by async block is not `Send`
    |
    = help: within `{async block@src\main.rs:153:17: 153:27}`, the trait `Send` is not implemented for `NonNull<rclite::rc::RcInner<RefCell<Subscribers<Box<dyn DynObserver<f64, Infallible>>>>>>`
note: future is not `Send` as this value is used across an await
   --> src\main.rs:188:10
    |
159 |         let task_progess_subscription = task_progress_observer.clone().subscribe(move |v: f64| {
    |             ------------------------- has type `SubjectSubscription<MutRc<Subscribers<Box<dyn DynObserver<f64, Infallible>>>>, LocalScheduler, CellRc<SubscriptionState>>` which is not `Send`
...
188 |         .await
    |          ^^^^^ await occurs here, with `task_progess_subscription` maybe used later
note: required by a bound in `JoinSet::<T>::spawn`
   --> ...tokio-1.48.0\src\task\join_set.rs:145:12
    |
142 |     pub fn spawn<F>(&mut self, task: F) -> AbortHandle
    |            ----- required by a bound in this associated function
...
145 |         F: Send + 'static,
    |            ^^^^ required by this bound in `JoinSet::<T>::spawn`

The idea is to pass a payload to a remote address alongside an rxrust Subject which will act as an Observer and receive Observer::next() calls with percentage progress updates, but even though everything is local to the task and inside the async move block, I get the error above. Here's the first iteration of the code:

use rxrust::prelude::*;
use tokio::task;
...
pub type ProgressPercentageObserver = Box<dyn Observer<f64, std::convert::Infallible>>;
...
let mut tasks = task::JoinSet::new();
tasks.spawn(async move {
    let addr = String::from("test_ip");
    let payload = Vec::new();
    let task_progress_observer = Local::subject();
    let subscription_addr = addr.clone();
    let task_progess_subscription = task_progress_observer.clone().subscribe(move |v: f64| {
        tracing::info!("Progress for {}: {:.2}", subscription_addr, v);
    });
    let boxed_task_progress_observer: ProgressPercentageObserver =
        Box::new(task_progress_observer);

    let res = match upload(
        &addr,
        &payload,
        &mut Some(boxed_task_progress_observer),
    )
    .await
    {
        Ok(()) => Ok(format!(
            "Successfully uploaded {} bytes to address {}.",
            &payload.len(),
            &addr
        )),
        Err(e) => anyhow::bail!(e),
    };
    task_progess_subscription.unsubscribe();
    res
});

I figured I didn't need to use the rxrust Shared context because everything was essentially local to the current task, wherever said task might be running. I tried using Shared::subject() instead of Local::subject() in the above just in case it helped, and got the same error anyway, which is really puzzling. Evidently the issue is about my ProgressPercentageObserver type, which is really a boxed dyn Observer trait that expects f64 events. I tried just adding + Send to its declaration as Box<dyn Observer<f64, std::convert::Infallible> + Send>, but that didn't seem to change anything on its own.

Meanwhile, if I use a bespoke callback handler instead of an rxrust dynamically implemented Observer, everything seems fine:

use tokio::task;
...
pub trait ProgressHandling {
    fn handle_progress_event();
}
#[derive(Default)]
pub struct ProgressHandler {}
impl ProgressHandling for ProgressHandler {
    fn handle_progress_event(addr: &String, progress: f64) {
       tracing::info!("Progress for {}: {:.2}", addr, progress);
    }
}
let progress = ProgressHandler::default();
...
let mut tasks = task::JoinSet::new();
tasks.spawn(async move {
    let addr = String::from("test_ip");
    let payload = Vec::new();

    let res = match upload(
        &addr,
        &payload,
        &mut Some(progress),
    )
    .await
    {
        Ok(()) => Ok(format!(
            "Successfully uploaded {} bytes to address {}.",
            &payload.len(),
            &addr
        )),
        Err(e) => anyhow::bail!(e),
    };
    res
});

This approach is apparently fine, with the modified async upload() expecting an Option<impl ProgressHandling>. I also found that the compiler seems happy with a ProgressPercentageObserver that specifies + Send AND is actually used in the rxrust Shared context; either of those modifications without the other frustratingly results in the same error from above. Finally, wrapping the ProgressPercentageObserver in a Mutex wrapped in an Arc and modifying the upload() API to expect a mutable reference to the underlying ProgressPercentageObserver (to avoid attempts to move the underlying Box out of the Mutex, which it doesn't allow) also seems to work without having to modify the ProgressPercentageObserver type declaration or using rxrust Shared context.

So after that journey, I'm left with several questions:

  1. What does the compiler actually mean by note: future is not Send as this value is used across an await? Is the across an await part referring to the fact that the async move block could theoretically access the ProgressPercentageObserver again after upload().await? Why isn't that a problem when ProgressPercentageObserver is Send via + Send in its declaration plus using Shared::subject(), or Send implicitly via Tokio Mutex? It still could theoretically be used after the await.
  2. Why is Tokio spawn() happy with String and Vector and impl trait, but not dyn trait? Does it have something to do with limitations on the compiler auto-generating Send for dyn traits?
  3. Why does adding + Send to the dyn trait declaration combined with using rxrust's Shared context satisfy the Sendable future constraint? Why does locking the dyn trait behind a Mutex satisfy the Sendable future constaint? Why can't the future be Send implicitly simply because all its data is effectively thread and task local because it all lives inside the spawned task's async move block?
  4. What is the conceptual difference between Arc and Mutex with respect to Send? I think the main difference comes down to lifetime (made thread-safe by Arc) vs. access (made thread-safe by Mutex) and to be fully thread safe you need both?
  5. In terms of API design, is it preferred to enforce that the dyn trait parameter of a library function is Send via + Send in its declaration so that it can be used in multithreaded contexts or to leave the responsibility of making the dyn trait Sendable when necessary up to the calling client? I'm leaning towards the latter since nothing about the library function implies multithread vs. single thread usage, but the former has slightly cleaner syntax.

1 post - 1 participant

Read full topic

🏷️ Rust_feed