Info
This post is auto-generated from RSS feed The Rust Programming Language Forum - Latest topics. Source: How to handle HTTP cancellation and cleanup in hyper
Hi,
Iโm building a web server based on hyper, and some of my API endpoints need to run a long loop. Sometimes the client may cancel the request, and in that case Iโd like to perform some cleanup after the request is fully dropped.
The problem is: Iโm not sure how hyper handles HTTP request cancellation, or how I can hook into it from my code.
Hereโs a simplified version of my setup:
// api.rs
pub trait Action: Send + Sync {
fn execute(&self, req: Req) -> impl Future<Output = ServerResult<Response<BoxBody>>> + Send;
/// default requires authentication
fn needs_auth(&self) -> bool {
true
}
/// default not cancellable
// TODO: handle request cancellation
fn cancellable(&self) -> bool {
false
}
fn auth(&self, req: &Req) -> ServerResult<()> {
if let Some(auth_header) = req.headers().get(header::AUTHORIZATION)
&& !auth_header.is_empty()
&& auth::authorize(auth_header.to_str().unwrap_or_default())
{
return Ok(());
}
Err(ServerError { kind: ServerErrorKind::Unauthorized })
}
}
And the server side:
// server.rs
type BoxBody = http_body_util::combinators::BoxBody<Bytes, Infallible>;
type ServerResult<T> = std::result::Result<T, Infallible>;
type Req = Request<Incoming>;
macro_rules! define_routes {
($($path: literal => $action_type: ty),* $(,)?) => {
async fn route(req: Req, socket_addr: SocketAddr) -> ServerResult<Response<BoxBody>> {
if Config::read(|c| c.is_only_inner_ip && !is_inner_ip(socket_addr)).await {
return Ok(ResultResponse::error_with_code(StatusCode::FORBIDDEN));
}
match req.uri().path() {
$(
$path => {
let action = <$action_type>::default();
if action.needs_auth() {
if let Err(e) = action.auth(&req) {
return handle(e);
}
}
match action.execute(req).await {
Ok(res) => Ok(res),
Err(e) => handle(e),
}
}
)*
_ => match api::asset_api::AssetAPI.execute(req).await {
Ok(res) => Ok(res),
Err(e) => handle(e),
}
}
}
};
}
define_routes! {
"/path/to/endpoint" => api::OneActionType,
// other apis ...
}
And the server run loop:
pub async fn run(
mut shutdown_rx: broadcast::Receiver<()>,
port: u16,
) -> std::result::Result<(), Error> {
let addr: SocketAddr = ([127, 0, 0, 1], port).into();
let listener = TcpListener::bind(addr).await.map_err(CommonError::from)?;
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
info!("Listening on http://{addr}");
loop {
select! {
accept_result = listener.accept().fuse() => {
match accept_result {
Ok((stream, socket_addr)) => {
let io = TokioIo::new(stream);
let conn = http1::Builder::new().serve_connection(io, service_fn(move |req| {
route(req, socket_addr)
}));
let fut = graceful.watch(conn);
tokio::spawn(async move {
if let Err(e) = fut.await {
error!("Error serving connection: {e}");
}
});
}
Err(e) => {
error!("Failed to accept connection: {e:?}");
continue;
}
}
}
_ = shutdown_rx.recv().fuse() => {
drop(listener);
info!("Server received shutdown signal");
break;
}
}
}
graceful.shutdown().await;
Ok(())
}
Suppose I have one API action like this:
pub struct OneAPI;
impl Action for OneAPI {
async fn execute(&self, req: Req) -> ServerResult<Response<BoxBody>> {
loop {
if meets_some_condition {
break;
}
}
Ok(ResultResponse::success())
}
}
Now, if the client cancels the request while this loop is running, I want to:
Whatโs the recommended way to do this in hyper, ideally with minimal changes to my current structure?
Thanks!
2 posts - 2 participants
๐ท๏ธ Rust_feed