Understanding futures: Make the `async_io::Timer` resettable for `async-std`

⚓ Rust    📅 2025-10-03    👤 surdeus    👁️ 8      

surdeus

We have a SleepProvider containing a sleep() method which just calls async_io::Timer::after(duration) for async-std. In this MR I am trying to implement a new SleepFuture trait with a reset() method allowing us to change the duration of the sleep after it was already constructed.

Because my experience with futures in Rust is limited I wasn't sure about how to do this and made a too complex implementation for async-std.

I did some studying trying to understand futures better and based on this video and inspired by this example I got from reading the Async Rust Book I came up with a new way.

I'd like to know if:

  • The new implementation seems correct.
  • My understanding is on point based on the comments in my code.
  • If it still can be improved.

Thanks in advance!

// Wrapper around `async_io::Timer` to make it resettable.
pub struct ResettableTimer {
    // Actual future that completes after a duration.
    timer: async_io::Timer,

    // The waker for the task that `ResettableTimer` is running on.
    // We can use this after setting `completed = true` to tell
    // `ResettableTimer`'s task to wake up, see that `completed = true`, and
    // move forward.
    waker: Option<std::task::Waker>,

    // Whether or not the timer has elapsed.
    completed: bool,
}

impl ResettableTimer {
    // Create a new `ResettableTimer` that completes after `duration`.
    pub fn new(duration: std::time::Duration) -> Self {
        Self {
            timer: async_io::Timer::after(duration),
            waker: None,
            completed: false,
        }
    }

    // Reset the timer to complete after `duration` from now.
    pub fn reset(&mut self, duration: std::time::Duration) {
        self.timer = async_io::Timer::after(duration);
        self.completed = false;

        // Wake up the task that `ResettableTimer` is running on, if there is one.
        if let Some(waker) = &self.waker {
            waker.wake_by_ref();
        }
    }
}

impl Future for ResettableTimer {
    type Output = ();

    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        // Check if timer is already completed.
        if self.completed {
            return std::task::Poll::Ready(());
        }

        // Poll the underlying future (the async_io timer).
        match std::pin::Pin::new(&mut self.timer).poll(cx) {
            std::task::Poll::Ready(_) => {
                // Mark as ready so other tasks polling `ResettableTimer` can see the completion status.
                self.completed = true;
                std::task::Poll::Ready(())
            }
            std::task::Poll::Pending => {
                // Set waker so that we can wake up the current task
                // when the timer has completed, ensuring that the future is polled
                // again and sees that `completed = true`.
                //
                // We use `will_wake` to avoid cloning the waker unnecessarily.
                if self.waker.as_ref().map_or(true, |old| !old.will_wake(cx.waker())) {
                    self.waker = Some(cx.waker().clone());
                }

                std::task::Poll::Pending
            },
        }
    }
}

// Simple example.
#[async_std::main]
async fn main() {
    let mut timer = ResettableTimer::new(std::time::Duration::from_secs(15));
    timer.reset(std::time::Duration::from_secs(2));
    println!("Timer started...");
    timer.await;
    println!("Timer completed!");
}

/* More complex example demonstrating that multiple tasks can await
 * the same `ResettableTimer` instance, and that resetting the timer
 * affects all tasks. 
 * Important because of: https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3287#note_3267354
 *
 * Due to the Mutex lock task 2 awaits the timer after task 1, but
 * both will complete after 5 seconds because of the reset and the
 * completion status being shared.
 *
#[async_std::main]
async fn main() {
    let timer = ResettableTimer::new(std::time::Duration::from_secs(25));
    let pinned = std::sync::Arc::new(async_std::sync::Mutex::new(Box::pin(timer)));

    let p1 = pinned.clone();
    let f1 = async move {
        println!("Task 1 awaiting sleep...");
        let mut fut = p1.lock().await;
        (&mut *fut).await;
        println!("Task 1 woke up!");
    };

    let p2 = pinned.clone();
    let f2 = async move {
        println!("Task 2 awaiting sleep...");
        let mut fut = p2.lock().await;
        (&mut *fut).await;
        println!("Task 2 woke up!");
    };

    {
        let mut fut = pinned.lock().await;
        std::pin::Pin::new(&mut **fut).reset(std::time::Duration::from_secs(5));
    }

    futures::join!(f1, f2);
}
 */
[package]
name = "resettable-timer-demo"
version = "0.1.0"
edition = "2024"

[dependencies]
async-io = "2.6.0"
async-std = { version = "1.13.2", features = ["attributes"] }
futures = "0.3.31"

1 post - 1 participant

Read full topic

🏷️ Rust_feed