UnixStream blocking on read even when messages present in socket

⚓ rust    📅 2025-05-05    👤 surdeus    👁️ 7      

surdeus

Warning

This post was published 60 days ago. The information described in this article may have changed.

I am trying to create an async runtime. Currently trying to test and see if reactor is working correctly. This is the test code:

    use std::{io::{Read, Write}, os::unix::net::UnixStream, pin::Pin, task::{Context, Poll}, thread};

    use crate::lib::reactor::IoEventType;

    use super::{Executor, Task};

    enum State {
        Step1,
        Step2,
        Step3,
        Step4,
        Done,
    }

    // TODO: Enforce that a compatible future must hold a key
    pub struct MultiStep {
        fd: UnixStream,
        state: State,
        key: Option<usize>
    }

    impl MultiStep {
        pub fn new(fd: UnixStream) -> Self {
            Self {
                fd,
                state: State::Step1,
                key: None
            }
        }
    }

    impl Future for MultiStep {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            use State::*;

            match self.state {
                Step1 => {
                    println!("Step 1 in progress...");
                    self.state = Step2;
                    cx.waker().wake_by_ref();  // Simulate readiness for next step
                    Poll::Pending
                }
                Step2 => {
                    println!("Step 2 in progress...");
                    self.state = Step3;
                    cx.waker().wake_by_ref();  // Simulate readiness again
                    Poll::Pending
                }
                Step3 => {
                    println!("Step 3 in progress...");
                    self.state = Step4;
                    let task: *const Task = cx.waker().data().cast();
                    if !task.is_aligned(){
                        panic!("Task is not aligned");
                    }

                    // Register Read event in reactor
                    unsafe {
                        let executor = (*task).executor.clone();
                        self.key = Some(executor.reactor.register(
                            &self.fd,
                            IoEventType::Readable,
                            cx.waker().clone()
                        ).unwrap());
                    }

                    Poll::Pending
                }
                Step4 => {
                    println!("Step 4 in progress...");
                    // Simulate a read operation
                    let mut buf = [0; 1024];
                    self.fd.read_exact(&mut buf).unwrap();

                    println!("Read data: {}", String::from_utf8_lossy(&buf));
                    self.state = Done;

                    let task: *const Task = cx.waker().data().cast();
                    if !task.is_aligned(){
                        panic!("Task is not aligned");
                    }
                    // Unregister the event
                    unsafe {
                        let executor = (*task).executor.clone();
                        executor.reactor.unregister(
                            self.key.unwrap(),
                            &self.fd
                        ).unwrap();
                    }

                    cx.waker().wake_by_ref();  // Simulate readiness again
                    Poll::Pending
                }
                Done => {
                    println!("All steps complete!");
                    Poll::Ready(())
                }
            }
        }
    }

    #[test]
    fn test_executor() {
        // Create a UnixStream for testing
        let (mut sender, receiver) = UnixStream::pair().unwrap();

        println!("Sender: {:?}", sender.local_addr().unwrap());
        println!("Receiver: {:?}", receiver.local_addr().unwrap());
        
        let future = Box::pin(MultiStep::new(receiver));
        let handle = thread::spawn(|| {
            Executor::init(future);
        });

        thread::sleep(std::time::Duration::from_secs(1));
        sender.write_all(b"Hello, world!").unwrap();
        println!("Data sent!");
        handle.join().unwrap();
    } 

when running the test with cargo test -- --nocapture, I am getting output till "Step 4 is in progress" but it is blocking indefinitely and not reading data at self.fd.read_exact(...). This is the wait and wake up method in the reactor:

    pub fn wait_and_wake(&self) -> Result<(), std::io::Error> {
        let mut events = Events::new();
        // TODO: Need to look into if timeout is required
        self.poller.wait(&mut events, None)?;

        let mut event_map = self.event_map.lock().unwrap();
        for event in events.iter() {
            if let Some(waker) = event_map.get(&event.key) {
                waker.wake_by_ref();
                event_map.remove(&event.key);
            }
        }

        Ok(())
    }

I have tried sending data multiple times in the test function by just duplicating the line

sender.write_all(b"Hello, world!").unwrap();

multiple times so that it executes after we enter the step 4 block. Yet, it is still blocked indefinitely. What is causing this issue and how do I fix it?

link to repo: GitHub - MercMayhem/rust-async-runtime at testing

3 posts - 3 participants

Read full topic

🏷️ rust_feed