Using async/await internally with no internal runtime, but exposing a nonblocking poll API: is this a reasonable design?
Rust | 2026-04-23 | surdeus

I'm designing a database driver in Rust, and I'm exploring a hybrid design where:
- async/await is used internally to implement protocol logic
- but the public API is a libpq-style nonblocking poll interface
So instead of exposing async functions, the API looks more like:
- start an operation
- poll for progress
- if it would block, report whether the caller should wait for read or write readiness
- poll again
- eventually return the result
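From the caller's side, the steps above might be driven by a loop like the following. This is only a sketch: `Interest`, `Step`, `PollOp`, `drive`, and `MockQuery` are hypothetical names made up for illustration, and the `wait` callback stands in for whatever actually blocks until the socket is readable or writable (select/poll/epoll, or an event loop).

```rust
use std::io;

/// What the caller must wait for before polling again (hypothetical name).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Interest {
    Read,
    Write,
}

/// Outcome of a single poll step (hypothetical name).
#[derive(Debug, PartialEq, Eq)]
pub enum Step<T> {
    Pending(Interest),
    Ready(T),
}

/// Hypothetical shape of a started, poll-style operation.
pub trait PollOp {
    type Output;
    fn poll(&mut self) -> io::Result<Step<Self::Output>>;
}

/// Polls `op` until it completes, calling `wait` whenever it would block.
pub fn drive<O: PollOp>(
    op: &mut O,
    mut wait: impl FnMut(Interest) -> io::Result<()>,
) -> io::Result<O::Output> {
    loop {
        match op.poll()? {
            Step::Ready(out) => return Ok(out),
            Step::Pending(interest) => wait(interest)?,
        }
    }
}

/// Mock operation: blocks once on write, once on read, then yields a row count.
pub struct MockQuery {
    polls: u32,
}

impl MockQuery {
    pub fn start() -> Self {
        Self { polls: 0 }
    }
}

impl PollOp for MockQuery {
    type Output = u64;

    fn poll(&mut self) -> io::Result<Step<u64>> {
        self.polls += 1;
        Ok(match self.polls {
            1 => Step::Pending(Interest::Write),
            2 => Step::Pending(Interest::Read),
            _ => Step::Ready(1),
        })
    }
}
```

In a Python binding, the `wait` step would presumably be replaced by registering the file descriptor with the event loop and awaiting readiness, rather than blocking.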
One of my main goals is to expose this library to Python with good async support.
I've looked at approaches that embed a Rust async runtime (for example via pyo3 + tokio) and then expose that as Python async APIs. However, in my benchmark, this did not perform as well as I expected, likely because two runtimes end up interacting.
Because of that, I'm intentionally trying to avoid embedding or depending on any async runtime inside the library.
Instead, I want:
- the Rust library to stay runtime-agnostic
- no internal executor
- and the outer environment (for example Python's asyncio) to drive IO readiness
This is partly inspired by libpq/psycopg, where libpq's nonblocking API can be integrated into Python async code.
Internally, though, I would still like to use async/await, because writing the protocol as a manual FSM is quite verbose and hard to maintain once there are multiple write/read steps.
So the design I'm considering is:
- async/await is used purely as an implementation technique
- futures are stored internally and manually polled
- no executor is used; futures are polled directly
- when IO would block, the code records whether it needs read or write readiness
- the public poll() API returns that information to the caller
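Stripped of any real IO, the mechanism in this list can be sketched with the standard library alone. Everything here is hypothetical and for illustration only: `BlockOnce` is a fake leaf future that pretends the socket would block once, `exchange` stands in for protocol logic, and `NoopWake` is a hand-rolled no-op waker (on recent toolchains `Waker::noop()` does the same job).

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Interest {
    Read,
    Write,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Step<T> {
    Pending(Interest),
    Ready(T),
}

/// Shared slot where leaf futures record what they are blocked on.
pub type Slot = Rc<Cell<Option<Interest>>>;

/// Waker that does nothing: the caller decides when to poll again.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Fake leaf future: returns Pending once (recording its interest), then Ready.
struct BlockOnce {
    slot: Slot,
    interest: Interest,
    blocked: bool,
}

impl Future for BlockOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.blocked {
            return Poll::Ready(());
        }
        self.blocked = true;
        self.slot.set(Some(self.interest));
        Poll::Pending
    }
}

fn block_once(slot: &Slot, interest: Interest) -> BlockOnce {
    BlockOnce { slot: slot.clone(), interest, blocked: false }
}

/// Protocol logic written as straight-line async code.
pub async fn exchange(slot: Slot) -> u64 {
    block_once(&slot, Interest::Write).await; // stands in for "send request"
    block_once(&slot, Interest::Read).await; // stands in for "read response"
    1
}

/// Polls `fut` exactly once, without an executor.
pub fn poll_once<F: Future + ?Sized>(fut: Pin<&mut F>, slot: &Slot) -> Step<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match fut.poll(&mut cx) {
        Poll::Ready(out) => Step::Ready(out),
        // A Pending result is only meaningful if a leaf recorded why.
        Poll::Pending => Step::Pending(slot.take().expect("leaf future recorded no interest")),
    }
}
```

One consequence of this shape is that every `Pending` must originate from a leaf future that records an interest; a future that returns `Pending` for any other reason (a timer, a channel) would leave the caller with nothing to wait on.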
The following example shows the control-flow shape of the design.
use std::cell::Cell;
use std::future::Future;
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};
// The reason I'm using Tokio's traits is to leverage tokio-util's Codec.
// I found that tokio-postgres uses Framed and Codec, and they seemed
// like very convenient utilities.
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoInterest {
    Read,
    Write,
}

#[derive(Debug, Clone)]
pub struct SocketInterest(Rc<Cell<Option<IoInterest>>>);

#[derive(Debug)]
pub struct Socket {
    inner: TcpStream,
    interest: SocketInterest,
}

impl Socket {
    pub fn new(stream: TcpStream) -> io::Result<Self> {
        stream.set_nonblocking(true)?;
        Ok(Self {
            inner: stream,
            interest: SocketInterest(Rc::new(Cell::new(None))),
        })
    }

    pub fn interest(&self) -> SocketInterest {
        self.interest.clone()
    }
}

impl AsyncRead for Socket {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        let unfilled = buf.initialize_unfilled();
        match this.inner.read(unfilled) {
            Ok(0) => Poll::Ready(Ok(())),
            Ok(n) => {
                buf.advance(n);
                Poll::Ready(Ok(()))
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                this.interest.0.set(Some(IoInterest::Read));
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }
}

impl AsyncWrite for Socket {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        match this.inner.write(buf) {
            Ok(n) => Poll::Ready(Ok(n)),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                this.interest.0.set(Some(IoInterest::Write));
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        match this.inner.flush() {
            Ok(()) => Poll::Ready(Ok(())),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                this.interest.0.set(Some(IoInterest::Write));
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        Poll::Ready(self.get_mut().inner.shutdown(std::net::Shutdown::Write))
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientResponse {
    CommandComplete { rows_affected: u64 },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientPoll {
    Idle,
    Pending(IoInterest),
    Ready(ClientResponse),
}

type ClientFuture =
    Pin<Box<dyn Future<Output = io::Result<(Socket, ClientResponse)>>>>;

enum ClientState {
    Idle(Socket),
    Executing(ClientFuture, SocketInterest),
    Transitioning,
}

pub struct Client {
    state: ClientState,
}

async fn do_execute(mut socket: Socket) -> io::Result<(Socket, ClientResponse)> {
    // toy protocol:
    //
    // client -> server:
    //   [b'Q'][len: u8][query bytes...]
    //
    // server -> client:
    //   [b'C'][rows_affected: u64 big-endian]
    let query = b"SELECT 1";
    let query_len = u8::try_from(query.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "query too long"))?;
    socket.write_all(&[b'Q', query_len]).await?;
    socket.write_all(query).await?;
    socket.flush().await?;

    let mut tag = [0u8; 1];
    socket.read_exact(&mut tag).await?;
    if tag[0] != b'C' {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "unexpected response tag",
        ));
    }

    let mut rows_buf = [0u8; 8];
    socket.read_exact(&mut rows_buf).await?;
    let rows_affected = u64::from_be_bytes(rows_buf);
    Ok((
        socket,
        ClientResponse::CommandComplete { rows_affected },
    ))
}

impl Client {
    pub fn new(socket: Socket) -> Self {
        Self {
            state: ClientState::Idle(socket),
        }
    }

    pub fn start_execute(&mut self) -> io::Result<()> {
        let old_state = std::mem::replace(&mut self.state, ClientState::Transitioning);
        match old_state {
            ClientState::Idle(socket) => {
                let sensor = socket.interest();
                let fut = Box::pin(do_execute(socket));
                self.state = ClientState::Executing(fut, sensor);
                Ok(())
            }
            other => {
                self.state = other;
                Err(io::Error::new(io::ErrorKind::Other, "client is busy"))
            }
        }
    }

    pub fn poll(&mut self) -> io::Result<ClientPoll> {
        let ClientState::Executing(fut, sensor) = &mut self.state else {
            return Ok(ClientPoll::Idle);
        };
        // no executor: the caller explicitly drives progress
        let waker = Waker::noop();
        let mut cx = Context::from_waker(waker);
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(result) => {
                // The future has completed and must not be polled again, so
                // leave `Executing` before `?` can return early. On error the
                // socket is gone and the client stays in `Transitioning`.
                self.state = ClientState::Transitioning;
                let (socket, response) = result?;
                self.state = ClientState::Idle(socket);
                Ok(ClientPoll::Ready(response))
            }
            Poll::Pending => {
                let interest = sensor.0.take().unwrap_or(IoInterest::Read);
                Ok(ClientPoll::Pending(interest))
            }
        }
    }
}
My questions are:
- Does this seem like a reasonable architecture? (async/await internally, nonblocking poll API externally, no runtime)
- What kinds of subtle issues might I be missing?
- In practice, would it be better to implement the protocol as a manual FSM?
- Are there existing projects using a similar approach?
- Any advice would be greatly appreciated.
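For comparison, here is roughly what the manual-FSM alternative could look like for just the response half of the toy protocol (`[b'C'][rows_affected: u64 big-endian]`). It is a sketch under assumptions, not a recommendation: `ResponseFsm` and `feed` are hypothetical names, and the parser is written so that it never touches the socket and only consumes bytes the caller hands it.

```rust
/// Hand-written parser state for the toy response (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum ResponseFsm {
    WantTag,
    WantRows { buf: [u8; 8], filled: usize },
    Done { rows_affected: u64 },
    Failed,
}

impl ResponseFsm {
    pub fn new() -> Self {
        ResponseFsm::WantTag
    }

    /// Feeds bytes that arrived from the socket and returns how many were
    /// consumed. Partial input is fine; state is kept between calls.
    pub fn feed(&mut self, mut input: &[u8]) -> Result<usize, &'static str> {
        let start = input.len();
        loop {
            match self {
                ResponseFsm::WantTag => {
                    let Some((&tag, rest)) = input.split_first() else {
                        break; // need more bytes
                    };
                    if tag != b'C' {
                        *self = ResponseFsm::Failed;
                        return Err("unexpected response tag");
                    }
                    input = rest;
                    *self = ResponseFsm::WantRows { buf: [0; 8], filled: 0 };
                }
                ResponseFsm::WantRows { buf, filled } => {
                    // Copy as much of the 8-byte field as is available.
                    let n = input.len().min(8 - *filled);
                    buf[*filled..*filled + n].copy_from_slice(&input[..n]);
                    *filled += n;
                    input = &input[n..];
                    if *filled < 8 {
                        break; // need more bytes
                    }
                    let rows_affected = u64::from_be_bytes(*buf);
                    *self = ResponseFsm::Done { rows_affected };
                }
                ResponseFsm::Done { .. } => break,
                ResponseFsm::Failed => return Err("parser already failed"),
            }
        }
        Ok(start - input.len())
    }
}
```

Even for two fields, the partial-read bookkeeping (`filled`, the explicit state transitions) is what `read_exact(...).await` hides in the async version, which is the verbosity concern described earlier in this post.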