Using async/await internally with no internal runtime, but exposing a nonblocking poll API: is this a reasonable design?

⚓ Rust    📅 2026-04-23    👤 surdeus    👁️ 3

surdeus

I'm designing a database driver in Rust, and I'm exploring a hybrid design where:

  • async/await is used internally to implement protocol logic
  • but the public API is a libpq-style nonblocking poll interface

So instead of exposing async functions, the API looks more like:

  • start an operation
  • poll for progress
  • if it would block, report whether the caller should wait for read or write readiness
  • poll again
  • eventually return the result

One of my main goals is to expose this library to Python with good async support.

I've looked at approaches that embed a Rust async runtime (for example via pyo3 plus tokio) and expose it through Python async APIs. However, in my benchmarks this did not perform as well as I expected, likely because two event loops (asyncio's and tokio's) end up interacting.

Because of that, I'm intentionally trying to avoid embedding or depending on any async runtime inside the library.

Instead, I want:

  • the Rust library to stay runtime-agnostic
  • no internal executor
  • and the outer environment (for example Python's asyncio) to drive IO readiness

This is partly inspired by libpq/psycopg, where libpq's nonblocking API can be integrated into Python async code.

Internally, though, I would still like to use async/await, because writing the protocol as a manual FSM is quite verbose and hard to maintain once there are multiple write/read steps.

So the design I'm considering is:

  • async/await is used purely as an implementation technique
  • futures are stored internally and manually polled
  • no executor is used; futures are polled directly
  • when IO would block, the code records whether it needs read or write readiness
  • the public poll() API returns that information to the caller
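Stripped of real IO, the mechanism in these bullets can be demonstrated with the standard library alone. `FakeIo` and `drive` are hypothetical stand-ins (an IO operation that would block once before succeeding, and the caller's poll loop); `Waker::noop` (stable since Rust 1.85) replaces the executor-provided waker.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum IoInterest {
    Read,
    Write,
}

// Hypothetical stand-in for an IO operation that reports WouldBlock
// (Poll::Pending) once, recording its interest, then succeeds.
struct FakeIo {
    remaining: u32,
    want: IoInterest,
    interest: Rc<Cell<Option<IoInterest>>>,
}

impl Future for FakeIo {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        if this.remaining == 0 {
            Poll::Ready(())
        } else {
            this.remaining -= 1;
            this.interest.set(Some(this.want));
            Poll::Pending
        }
    }
}

// Drive a stored async block to completion by hand, collecting the
// interest recorded at each Pending. No executor is involved.
fn drive() -> (Vec<IoInterest>, u64) {
    let interest = Rc::new(Cell::new(None));
    let sensor = interest.clone();

    // async/await used purely as an implementation technique
    let mut fut: Pin<Box<dyn Future<Output = u64>>> = Box::pin(async move {
        // "send the query": blocks once needing write readiness
        FakeIo { remaining: 1, want: IoInterest::Write, interest: sensor.clone() }.await;
        // "read the response": blocks once needing read readiness
        FakeIo { remaining: 1, want: IoInterest::Read, interest: sensor }.await;
        42 // "rows_affected"
    });

    // the caller re-polls explicitly, so no real waker is needed
    let mut cx = Context::from_waker(Waker::noop());
    let mut pendings = Vec::new();
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(rows) => return (pendings, rows),
            Poll::Pending => {
                pendings.push(interest.take().expect("pending without recorded interest"));
            }
        }
    }
}

fn main() {
    let (pendings, rows) = drive();
    println!("pendings: {pendings:?}, rows: {rows}");
}
```

One caveat this makes visible: because the waker is a no-op, progress happens only when the caller polls again, so every internal future must be readiness-driven; anything that parks itself and relies on being woken (timers, channels) would stall.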

The following example shows the control-flow shape of the design:

use std::cell::Cell;
use std::future::Future;
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

// I'm using Tokio's IO traits so that I can leverage tokio-util's
// codec support: tokio-postgres uses Framed and a Codec, and they
// seemed like very convenient utilities.
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoInterest {
    Read,
    Write,
}

#[derive(Debug, Clone)]
pub struct SocketInterest(Rc<Cell<Option<IoInterest>>>);

#[derive(Debug)]
pub struct Socket {
    inner: TcpStream,
    interest: SocketInterest,
}

impl Socket {
    pub fn new(stream: TcpStream) -> io::Result<Self> {
        stream.set_nonblocking(true)?;
        Ok(Self {
            inner: stream,
            interest: SocketInterest(Rc::new(Cell::new(None))),
        })
    }

    pub fn interest(&self) -> SocketInterest {
        self.interest.clone()
    }
}

impl AsyncRead for Socket {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        let unfilled = buf.initialize_unfilled();

        match this.inner.read(unfilled) {
            Ok(0) => Poll::Ready(Ok(())),
            Ok(n) => {
                buf.advance(n);
                Poll::Ready(Ok(()))
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                this.interest.0.set(Some(IoInterest::Read));
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }
}

impl AsyncWrite for Socket {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();

        match this.inner.write(buf) {
            Ok(n) => Poll::Ready(Ok(n)),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                this.interest.0.set(Some(IoInterest::Write));
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();

        match this.inner.flush() {
            Ok(()) => Poll::Ready(Ok(())),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                this.interest.0.set(Some(IoInterest::Write));
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        Poll::Ready(self.get_mut().inner.shutdown(std::net::Shutdown::Write))
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientResponse {
    CommandComplete { rows_affected: u64 },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientPoll {
    Idle,
    Pending(IoInterest),
    Ready(ClientResponse),
}

type ClientFuture =
    Pin<Box<dyn Future<Output = io::Result<(Socket, ClientResponse)>>>>;

enum ClientState {
    Idle(Socket),
    Executing(ClientFuture, SocketInterest),
    Transitioning,
}

pub struct Client {
    state: ClientState,
}

async fn do_execute(mut socket: Socket) -> io::Result<(Socket, ClientResponse)> {
    // toy protocol:
    //
    // client -> server:
    //   [b'Q'][len: u8][query bytes...]
    //
    // server -> client:
    //   [b'C'][rows_affected: u64 big-endian]

    let query = b"SELECT 1";
    let query_len = u8::try_from(query.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "query too long"))?;

    socket.write_all(&[b'Q', query_len]).await?;
    socket.write_all(query).await?;
    socket.flush().await?;

    let mut tag = [0u8; 1];
    socket.read_exact(&mut tag).await?;
    if tag[0] != b'C' {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "unexpected response tag",
        ));
    }

    let mut rows_buf = [0u8; 8];
    socket.read_exact(&mut rows_buf).await?;
    let rows_affected = u64::from_be_bytes(rows_buf);

    Ok((
        socket,
        ClientResponse::CommandComplete { rows_affected },
    ))
}

impl Client {
    pub fn new(socket: Socket) -> Self {
        Self {
            state: ClientState::Idle(socket),
        }
    }

    pub fn start_execute(&mut self) -> io::Result<()> {
        let old_state = std::mem::replace(&mut self.state, ClientState::Transitioning);

        match old_state {
            ClientState::Idle(socket) => {
                let sensor = socket.interest();
                let fut = Box::pin(do_execute(socket));
                self.state = ClientState::Executing(fut, sensor);
                Ok(())
            }
            other => {
                self.state = other;
                Err(io::Error::new(io::ErrorKind::Other, "client is busy"))
            }
        }
    }

    pub fn poll(&mut self) -> io::Result<ClientPoll> {
        let ClientState::Executing(fut, sensor) = &mut self.state else {
            return Ok(ClientPoll::Idle);
        };

        // no executor: the caller explicitly drives progress
        let waker = Waker::noop();
        let mut cx = Context::from_waker(waker);

        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(result) => {
                let (socket, response) = result?;
                self.state = ClientState::Idle(socket);
                Ok(ClientPoll::Ready(response))
            }
            Poll::Pending => {
                let interest = sensor.0.take().unwrap_or(IoInterest::Read);
                Ok(ClientPoll::Pending(interest))
            }
        }
    }
}
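The listing above ultimately rests on one std-level fact: a nonblocking read with no data available fails with ErrorKind::WouldBlock, which is exactly what Socket::poll_read translates into recording IoInterest::Read and returning Poll::Pending. A minimal std-only probe of that mapping (no tokio; probe_read_interest is an illustrative name):

```rust
use std::io::{self, Read};
use std::net::{TcpListener, TcpStream};

// Connect a loopback pair, send nothing, and probe a nonblocking read.
fn probe_read_interest() -> io::Result<&'static str> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut stream = TcpStream::connect(listener.local_addr()?)?;
    let _peer = listener.accept()?.0; // keep the peer alive, but idle
    stream.set_nonblocking(true)?;

    let mut buf = [0u8; 16];
    match stream.read(&mut buf) {
        // the exact condition poll_read turns into
        // IoInterest::Read + Poll::Pending
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok("wait for read readiness"),
        Ok(_) => Ok("readable"),
        Err(e) => Err(e),
    }
}

fn main() -> io::Result<()> {
    println!("{}", probe_read_interest()?);
    Ok(())
}
```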

My questions are:

  1. Does this seem like a reasonable architecture?
    (async/await internally, nonblocking poll API externally, no runtime)
  2. What kinds of subtle issues might I be missing?
  3. In practice, would it be better to implement the protocol as a manual FSM?
  4. Are there existing projects using a similar approach?
  5. Any advice would be greatly appreciated.

3 posts - 2 participants

๐Ÿท๏ธ Rust_feed