Using tokio channel permits in a tower service

⚓ Rust    📅 2026-04-22    👤 surdeus    👁️ 2      

surdeus

Hi,

I am designing a tower service that sends requests through a tokio mpsc channel and I am trying my best not to store boxed futures to avoid the extra heap allocation. I came to the idea of using permits with the poll_ready function for backpressure. This is the design I came up with

use std::pin::Pin;
use std::task::{Context, Poll};
use hyper::{Request, Response};
use tokio::sync::mpsc::{channel, OwnedPermit, Receiver, Sender};
use tower::Service;
use crate::php::Job;

#[derive(Debug)]
struct PhpService {
    sender: Sender<Job>,
    permit: Option<OwnedPermit<Job>>
}

impl PhpService {
    pub fn new(sender: Sender<Job>) -> Self {
        Self {
            sender,
            permit: None
        }
    }
}

impl Service<Request<()>> for PhpService {
    type Response = Response<String>;
    type Error = ();
    type Future = PhpFuture;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let fut = self.sender.clone().reserve_owned();

        tokio::pin!(fut);

        match fut.poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(_)) => Poll::Ready(Err(())),
            Poll::Ready(Ok(permit)) => {
                self.permit = Some(permit);

                Poll::Ready(Ok(()))
            }
        }
    }

    fn call(&mut self, req: Request<()>) -> Self::Future {
        let permit = self.permit.take().expect("poll_ready not called");

        let (tx_resp, rx_resp) = channel::<Self::Response>(10);
        let job = Job {
            request: req,
            respond_to: tx_resp
        };

        permit.send(job);

        PhpFuture {
            receiver: rx_resp,
        }

    }
}

struct PhpFuture {
    receiver: Receiver<Response<String>>,
}

impl Future for PhpFuture {
    type Output = Result<Response<String>, ()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let fut = self.receiver.recv();

        tokio::pin!(fut);

        fut.poll(cx).map(|o| o.ok_or(()))
    }
}

My question would be if it is okay that every call to PhpService::poll_ready (or PhpFuture::poll for that matter) creates a new future. Do these futures hold some kind of state and need to be stored? and if so, is there any way around Box::pin?

Many thanks in advance.

2 posts - 2 participants

Read full topic

🏷️ Rust_feed