How to handle HTTP cancellation and cleanup in hyper

⚓ Rust    📅 2025-09-01    👤 surdeus    👁️ 3

surdeus

Hi,

I'm building a web server based on hyper, and some of my API endpoints need to run a long loop. Sometimes the client may cancel the request, and in that case I'd like to perform some cleanup after the request is fully dropped.

The problem is: I'm not sure how hyper handles HTTP request cancellation, or how I can hook into it from my code.

Here's a simplified version of my setup:

// api.rs
pub trait Action: Send + Sync {
    fn execute(&self, req: Req) -> impl Future<Output = ServerResult<Response<BoxBody>>> + Send;

    /// default requires authentication
    fn needs_auth(&self) -> bool {
        true
    }

    /// default not cancellable
    // TODO: handle request cancellation
    fn cancellable(&self) -> bool {
        false
    }

    fn auth(&self, req: &Req) -> ServerResult<()> {
        if let Some(auth_header) = req.headers().get(header::AUTHORIZATION)
            && !auth_header.is_empty()
            && auth::authorize(auth_header.to_str().unwrap_or_default())
        {
            return Ok(());
        }
        Err(ServerError { kind: ServerErrorKind::Unauthorized })
    }
}

And the server side:

// server.rs
type BoxBody = http_body_util::combinators::BoxBody<Bytes, Infallible>;
type ServerResult<T> = std::result::Result<T, Infallible>;
type Req = Request<Incoming>;

macro_rules! define_routes {
    ($($path: literal => $action_type: ty),* $(,)?) => {
        async fn route(req: Req, socket_addr: SocketAddr) -> ServerResult<Response<BoxBody>> {
            if Config::read(|c| c.is_only_inner_ip && !is_inner_ip(socket_addr)).await {
                return Ok(ResultResponse::error_with_code(StatusCode::FORBIDDEN));
            }
            match req.uri().path() {
                $(
                    $path => {
                        let action = <$action_type>::default();
                        if action.needs_auth() {
                            if let Err(e) = action.auth(&req) {
                                return handle(e);
                            }
                        }
                        match action.execute(req).await {
                            Ok(res) => Ok(res),
                            Err(e) => handle(e),
                        }
                    }
                )*
                _ => match api::asset_api::AssetAPI.execute(req).await {
                    Ok(res) => Ok(res),
                    Err(e) => handle(e),
                }
            }
        }
    };
}

define_routes! {
    "/path/to/endpoint" => api::OneActionType,
    // other apis ...
}

And the server run loop:

pub async fn run(
    mut shutdown_rx: broadcast::Receiver<()>,
    port: u16,
) -> std::result::Result<(), Error> {
    let addr: SocketAddr = ([127, 0, 0, 1], port).into();
    let listener = TcpListener::bind(addr).await.map_err(CommonError::from)?;
    let graceful = hyper_util::server::graceful::GracefulShutdown::new();
    info!("Listening on http://{addr}");

    loop {
        select! {
            accept_result = listener.accept().fuse() => {
                match accept_result {
                    Ok((stream, socket_addr)) => {
                        let io = TokioIo::new(stream);
                        let conn = http1::Builder::new().serve_connection(io, service_fn(move |req| {
                            route(req, socket_addr)
                        }));
                        let fut = graceful.watch(conn);
                        tokio::spawn(async move {
                            if let Err(e) = fut.await {
                                error!("Error serving connection: {e}");
                            }
                        });
                    }
                    Err(e) => {
                        error!("Failed to accept connection: {e:?}");
                        continue;
                    }
                }
            }

            _ = shutdown_rx.recv().fuse() => {
                drop(listener);
                info!("Server received shutdown signal");
                break;
            }
        }
    }

    graceful.shutdown().await;
    Ok(())
}

Suppose I have one API action like this:

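// `Default` is needed because the routing macro calls `<$action_type>::default()`.
#[derive(Default)]
pub struct OneAPI;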

impl Action for OneAPI {
    async fn execute(&self, req: Req) -> ServerResult<Response<BoxBody>> {
        loop {
            if meets_some_condition {
                break;
            }
        }
        Ok(ResultResponse::success())
    }
}

Now, if the client cancels the request while this loop is running, I want to:

  1. Detect that the request was cancelled.
  2. Break out of the loop.
  3. Run some cleanup logic before returning.

What's the recommended way to do this in hyper, ideally with minimal changes to my current structure?

Thanks!
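
One possible direction for the three steps above, sketched under assumptions rather than taken from hyper's documentation: as far as I understand, with the default HTTP/1 settings hyper stops serving a connection once it sees the client disconnect, and the in-flight service future is then dropped instead of being polled to completion. If that holds, "detect, break, clean up" collapses into a `Drop` guard held inside `execute`. The example below uses only the standard library so it runs without hyper or tokio. `CancelGuard`, `YieldNow`, `long_running`, and `drive` are hypothetical names, and `drive` merely simulates the drop that hyper would perform.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical guard: runs cleanup on drop unless the handler finished.
struct CancelGuard {
    cleaned_up: Arc<AtomicBool>,
    completed: bool,
}

impl Drop for CancelGuard {
    fn drop(&mut self) {
        if !self.completed {
            // Cleanup goes here. It must be synchronous: `drop` cannot `.await`.
            self.cleaned_up.store(true, Ordering::SeqCst);
        }
    }
}

/// Std-only stand-in for an await point such as `tokio::task::yield_now()`.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Stand-in for `Action::execute`: loops until `done` is set.
async fn long_running(done: Arc<AtomicBool>, cleaned_up: Arc<AtomicBool>) -> &'static str {
    let mut guard = CancelGuard { cleaned_up, completed: false };
    while !done.load(Ordering::SeqCst) {
        // Without an await point the future can never be dropped mid-loop.
        YieldNow(false).await;
    }
    guard.completed = true; // normal completion: skip the cancellation cleanup
    "success"
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the handler at most `max_polls` times, then drops it. The drop simulates
/// a connection going away. Returns (handler output, whether cleanup ran).
fn drive(finish_immediately: bool, max_polls: usize) -> (Option<&'static str>, bool) {
    let done = Arc::new(AtomicBool::new(finish_immediately));
    let cleaned_up = Arc::new(AtomicBool::new(false));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(long_running(done, cleaned_up.clone()));

    let mut output = None;
    for _ in 0..max_polls {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            output = Some(value);
            break;
        }
    }
    drop(fut); // a cancelled request looks like this from the handler's side
    (output, cleaned_up.load(Ordering::SeqCst))
}
```

In a real handler the guard would be created at the top of `execute`, and the loop would need at least one `.await` per iteration; a loop that never yields cannot be interrupted by dropping the future. Because `Drop` is synchronous, any async cleanup would have to be handed off from `drop`, for example to a spawned task, and whether that fits depends on the runtime setup.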
