Info
This post is auto-generated from RSS feed The Rust Programming Language Forum - Latest topics. Source: Rust PDK Filter: Backend returns 415 Unsupported Media Type after forwarding request
Hi everyone,
I'm developing an HTTP filter in Rust using a PDK (Proxy Development Kit). The filter's main job is to act as an authentication proxy.
Here's the logic:
X-Serverless-Authorization
header to the request. The authentication part works perfectly, and I'm able to get the token. However, for POST
and PUT
requests, my backend service consistently replies with a 415 Unsupported Media Type
error.
This strongly suggests that the Content-Type
header is being lost, modified, or is missing when the request reaches the final destination. My goal is to be a transparent proxy that doesn't alter the body and correctly preserves the original headers.
Here is the core function handling the request:
```rust
async fn on_request_entry(
    state: RequestState,
    client: HttpClient,
    config: &Config,
) -> Flow {
    // 1. Access headers and original request info
    let headers_state = state.into_headers_state().await;
    let original_method = Method::from_str(headers_state.method()).unwrap_or(Method::GET);
    let original_path = headers_state.path();
    // 2. Get the body
    let body = headers_state.into_body_state().await.into_bytes();
    // 3. Fetch the auth token (this part works fine)
    let token = match get_id_token(&client, config).await {
        Ok(t) => t,
        Err(_) => return Flow::Break(Response::new(500).with_body("Token error.")),
    };
    // 4. Prepare headers for the backend
    // I start with the original headers to preserve Content-Type
    let mut backend_headers: Vec<(String, String)> = headers_state.handler().headers();
    // Filter out headers that will be recalculated
    backend_headers.retain(|(key, _)| !matches!(key.to_lowercase().as_str(), "host" | "content-length"));
    // Add my new authorization header
    let token_header = format!("Bearer {}", token);
    backend_headers.push(("X-Serverless-Authorization".to_string(), token_header));
    let backend_headers_ref: Vec<(&str, &str)> = backend_headers
        .iter()
        .map(|(k, v)| (k.as_str(), v.as_str()))
        .collect();
    // 5. Build the new path and call the backend
    let backend_path = format!("/{}", original_path.split('/').skip(2).collect::<Vec<&str>>().join("/"));
    let response = call_backend(
        &client,
        &backend_headers_ref,
        &backend_path,
        &original_method,
        &body,
        &config.backend_url,
    ).await;
    Flow::Break(response)
}
```
My question is: What is the idiomatic way within this PDK's lifecycle (RequestState
-> HeadersState
-> BodyState
) to ensure the Content-Type
header is correctly preserved or even added if it's missing on a request that has a body? I feel like I'm doing the right thing by reading the headers first, but the backend's response tells me I'm missing a step.
mod generated;
use anyhow::{anyhow, Result};
use http::Method;
use std::str::FromStr;
use pdk::script::PayloadBinding;
use pdk::hl::{
Configuration,
EntityState,
HeadersState,
entrypoint,
Flow,
HttpClient,
IntoBodyState,
Launcher,
on_request,
RequestState,
Response,
Service,
HeadersHandler,
};
use pdk::logger;
use crate::generated::config::Config;
// ===============================
// Header utility functions
// ===============================
/// Copies every header exposed by `handler` into an owned list of
/// `(name, value)` string pairs.
///
/// The pairs are returned in the order `handler.headers()` yields them.
fn collect_headers(handler: &dyn HeadersHandler) -> Vec<(String, String)> {
    let mut owned: Vec<(String, String)> = Vec::new();
    for (name, value) in handler.headers() {
        owned.push((name.to_string(), value.to_string()));
    }
    owned
}
/// Removes, in place, every `(key, value)` pair whose key equals
/// `key_to_remove`.
///
/// Comparison uses `K`'s `PartialEq` implementation, so for `String` keys the
/// match is exact (case-sensitive). The relative order of the remaining
/// entries is preserved.
fn remove_by_key<K: PartialEq, V>(vec: &mut Vec<(K, V)>, key_to_remove: &K) {
    vec.retain(|entry| entry.0 != *key_to_remove);
}
/// Replaces the value of the first header whose name matches `key`.
///
/// HTTP header names are case-insensitive, and proxies commonly hand them
/// over lower-cased (`content-type`), so the name comparison ignores ASCII
/// case. The stored header name itself is left untouched; only its value is
/// overwritten.
///
/// Returns `true` when a header was found and updated, and `false` when no
/// header with that name exists. In the latter case `headers` is not modified:
/// this function never inserts a new header.
fn modify_header(headers: &mut Vec<(String, String)>, key: &str, new_value: &str) -> bool {
    match headers
        .iter_mut()
        .find(|(name, _)| name.eq_ignore_ascii_case(key))
    {
        Some((_, value)) => {
            *value = new_value.to_string();
            true
        }
        None => false,
    }
}
// =======================
// Fetches the GCP identity token
// =======================
async fn get_id_token(client: &HttpClient, config: &Config) -> Result {
let response = client
.request(&config.metadata_url)
.headers(vec![("Metadata-Flavor", "Google")])
.get()
.await
.map_err(|e| anyhow!("HTTP request to metadata server failed: {:?}", e))?;
if response.status_code() != 200 {
return Err(anyhow!(
"Metadata call failed with status: {} and body: {:?}",
response.status_code(),
std::str::from_utf8(response.body()).unwrap_or("<invalid utf-8>")
));
}
Ok(std::str::from_utf8(response.body())?.to_string())
}
// ===================================
// Calls the backend with the given headers and Service
// ===================================
/// Sends one request to `backend_service` and converts the outcome into a
/// [`Response`] for the original caller.
///
/// * `headers` – header pairs attached to the outgoing request.
/// * `path` – request path used for the backend call.
/// * `method` – HTTP method; GET, POST, PUT and DELETE are dispatched as such,
///   any other method is logged and sent as GET.
/// * `body` – payload attached only for POST and PUT; an empty payload is
///   replaced by the literal `{}`.
///
/// On success the backend's status code and body are copied into the returned
/// response. On a transport error a 503 is returned whose body is the error
/// text.
async fn call_backend(
    client: &HttpClient,
    headers: &Vec<(&str, &str)>,
    path: &str,
    method: &Method,
    body: &[u8],
    backend_service: &Service,
) -> Response {
    logger::info!("Calling backend {} {}", method, path);

    let builder = client
        .request(backend_service)
        .path(path)
        .headers(headers.to_vec());

    // Only POST and PUT carry a payload; everything else goes out without one.
    let sends_body = matches!(*method, Method::POST | Method::PUT);
    let request_builder = if sends_body {
        let payload: &[u8] = if body.is_empty() { b"{}" } else { body };
        builder.body(payload)
    } else {
        builder
    };

    // PATCH is intentionally not dispatched here; like every other unlisted
    // method it takes the GET fallback below.
    let pending = match *method {
        Method::GET => request_builder.get(),
        Method::POST => request_builder.post(),
        Method::PUT => request_builder.put(),
        Method::DELETE => request_builder.delete(),
        _ => {
            logger::warn!("Unexpected HTTP method {:?}, defaulting to GET", method);
            request_builder.get()
        }
    };

    let resp = match pending.await {
        Ok(resp) => resp,
        Err(e) => {
            logger::error!("Backend call failed: {:?}", e);
            return Response::new(503).with_body(e.to_string());
        }
    };

    logger::info!(
        "Backend response status: {} ({} bytes)",
        resp.status_code(),
        resp.body().len()
    );
    Response::new(resp.status_code()).with_body(resp.body())
}
// ====================================================
// Entry point: process headers before the body
// ====================================================
/// Builds the header list forwarded to the backend from the inbound headers.
///
/// * Pseudo-headers (names starting with `:`), `host` and `content-length`
///   are dropped: they describe the inbound request, not the outgoing backend
///   call. NOTE(review): this assumes the HTTP client sets its own
///   method/path/authority and length for the outgoing call — confirm against
///   the PDK version in use.
/// * `metadata-flavor` is dropped so it is never leaked to the backend.
/// * Any client-supplied `x-serverless-authorization` is dropped and replaced
///   by `token_header`, so exactly one value reaches the backend.
/// * The original `Content-Type` is preserved. When the outgoing request will
///   carry a body (`sends_body`) and no `Content-Type` is present,
///   `application/json` is added as a default.
///
/// All name comparisons ignore ASCII case, because HTTP header names are
/// case-insensitive.
fn build_backend_headers(
    mut headers: Vec<(String, String)>,
    token_header: String,
    sends_body: bool,
) -> Vec<(String, String)> {
    const DROPPED: [&str; 4] = [
        "host",
        "content-length",
        "metadata-flavor",
        "x-serverless-authorization",
    ];

    headers.retain(|(name, _)| {
        !name.starts_with(':') && !DROPPED.iter().any(|d| name.eq_ignore_ascii_case(d))
    });

    let has_content_type = headers
        .iter()
        .any(|(name, _)| name.eq_ignore_ascii_case("content-type"));
    if sends_body && !has_content_type {
        headers.push(("Content-Type".to_string(), "application/json".to_string()));
    }

    headers.push(("X-Serverless-Authorization".to_string(), token_header));
    headers
}

/// Request entry point: authenticates against the metadata service and
/// proxies the inbound request to the configured backend.
///
/// Steps:
/// 1. Read method, path and headers from the headers state.
/// 2. Fetch an identity token; on failure answer 500 with the error text.
/// 3. Build the backend headers (see [`build_backend_headers`]).
/// 4. Read the body, which consumes the headers state.
/// 5. Drop the first path segment and call the backend.
///
/// Always returns `Flow::Break`, i.e. the response produced here (either the
/// backend's answer or an error response) is what the client receives.
async fn on_request_entry(
    _state: RequestState,
    client: HttpClient,
    config: &Config,
) -> Flow {
    let headers_state = _state.into_headers_state().await;

    // Capture everything needed from the headers state before it is consumed
    // by `into_body_state` below. An unparsable method falls back to GET.
    let original_method = Method::from_str(&headers_state.method()).unwrap_or(Method::GET);
    let original_request_path = headers_state.path();
    let incoming_headers: Vec<(String, String)> = headers_state.handler().headers();

    // Fetch the GCP identity token.
    let token = match get_id_token(&client, config).await {
        Ok(t) => t,
        Err(err) => return Flow::Break(Response::new(500).with_body(err.to_string())),
    };
    let token_header = format!("Bearer {}", token);

    // `call_backend` attaches a body for POST and PUT only, so those are the
    // requests that need a Content-Type.
    let sends_body = original_method == Method::POST || original_method == Method::PUT;
    let backend_headers_owned = build_backend_headers(incoming_headers, token_header, sends_body);
    let backend_headers_ref: Vec<(&str, &str)> = backend_headers_owned
        .iter()
        .map(|(k, v)| (k.as_str(), v.as_str()))
        .collect();

    // Read the body (consumes `headers_state`).
    let body_state = headers_state.into_body_state().await;
    let original_body_slice = body_state.as_bytes();
    logger::info!(
        "Incoming request: {} {} with body size: {}",
        original_method,
        original_request_path,
        original_body_slice.len()
    );

    // Build the backend path: splitting "/a/b/c" yields ["", "a", "b", "c"],
    // so skipping two items drops the leading empty piece and the first
    // segment, giving "/b/c".
    let backend_path = {
        let segments: Vec<&str> = original_request_path.split('/').skip(2).collect();
        format!("/{}", segments.join("/"))
    };

    let backend_service = &config.backend_url;
    let response = call_backend(
        &client,
        &backend_headers_ref,
        &backend_path,
        &original_method,
        &original_body_slice,
        backend_service,
    )
    .await;

    Flow::Break(response)
}
// ===============================
// Main PDK entrypoint
// ===============================
/// Policy entrypoint: parses the JSON configuration and registers the request
/// filter with the launcher.
///
/// # Errors
///
/// Returns an error when the configuration bytes cannot be deserialized into
/// [`Config`] (the raw configuration and the cause are included in the
/// message), or when launching the filter fails.
#[entrypoint]
async fn configure(launcher: Launcher, Configuration(bytes): Configuration) -> Result<()> {
    let config: Config = match serde_json::from_slice(&bytes) {
        Ok(parsed) => parsed,
        Err(err) => {
            return Err(anyhow!(
                "Failed to parse configuration '{}'. Cause: {}",
                String::from_utf8_lossy(&bytes),
                err
            ));
        }
    };

    // Every incoming request is handled by `on_request_entry` with the
    // parsed configuration.
    let filter = on_request(|req: RequestState, client: HttpClient| {
        on_request_entry(req, client, &config)
    });

    launcher.launch(filter).await?;
    Ok(())
}
Any advice would be greatly appreciated. Thanks!
2 posts - 2 participants
🏷️ Rust_feed