Unsafe messaging to running Future

⚓ Rust    📅 2025-08-26    👤 surdeus    👁️ 4      

surdeus

Hi,
I'm currently working on an embedded rust project where I implement a super-server style server (just a never-ending future) that spawns servers that implement the specific protocol.
These sub-servers obviously need to receive the following messages.
I think that I'd need a channel or something like a Arc<Mutex<BinaryHeap<Msg>>> for this in safe rust.

To circumvent the channel and locking mechanisms, I've created the following unsafe code.
Is this actually safe and could something like this be implemented in safe rust?

struct Running {
    queue: BinaryHeap<Msg>,
    task: Box<dyn Future<Output = ()>>,
}

impl Running {
    pub fn new<FN: FnOnce(MessageStream) -> Box<dyn Future<Output = ()>>>(
        task_creation_fn: FN,
    ) -> Pin<Box<Self>> {
        #[allow(invalid_value)]
        //SAEFTY is initialized below
        let mut this = Box::new(unsafe { MaybeUninit::<Self>::uninit().assume_init() });

        this.queue = BinaryHeap::new();
        this.task = task_creation_fn(MessageStream(NonNull::from_mut(&mut this.queue)));

        Box::into_pin(this)
    }
    fn enqueue(self: Pin<&mut Self>, msg: Msg) {
        //SAEFTY nothing is moved out
        let this = unsafe { self.get_unchecked_mut() };
        this.queue.push(msg);
    }
}
impl Future for Running {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        //SAEFTY nothing is moved out
        let task = unsafe { self.map_unchecked_mut(|x| x.task.as_mut()) };
        task.poll(cx)
    }
}

struct MessageStream(NonNull<BinaryHeap<Msg>>);

impl MessageStream {
    fn poll_recv(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Msg> {
        let this = self.get_mut();
        // SAEFTY pointer is valid because the corresponding `Running` is pinned and not dropped.
        let queue = unsafe { this.0.as_mut() };

        match queue.pop() {
            Some(x) => Poll::Ready(x),
            None => Poll::Pending,
        }
    }
}

6 posts - 4 participants

Read full topic

🏷️ Rust_feed