Rust PDK Filter: Backend returns 415 Unsupported Media Type after forwarding request

⚓ Rust    📅 2025-10-13    👤 surdeus    👁️ 1      

surdeus

Hi everyone,

I'm developing an HTTP filter in Rust using a PDK (Proxy Development Kit). The filter's main job is to act as an authentication proxy.

Here's the logic:

  1. It intercepts an incoming request.
  2. It calls the GCP metadata server to fetch an identity token.
  3. It adds this token as an X-Serverless-Authorization header to the request.
  4. It then forwards the request, including the original method, path, headers, and body, to the backend service.

The authentication part works perfectly, and I'm able to get the token. However, for POST and PUT requests, my backend service consistently replies with a 415 Unsupported Media Type error.

This strongly suggests that the Content-Type header is being lost, modified, or is missing when the request reaches the final destination. My goal is to be a transparent proxy that doesn't alter the body and correctly preserves the original headers.

Here is the core function handling the request:

`Rustasync fn on_request_entry(
** state: RequestState,**
** client: HttpClient,**
** config: &Config,**
) -> Flow {
** // 1. Access headers and original request info**
** let headers_state = state.into_headers_state().await;**
** let original_method = Method::from_str(headers_state.method()).unwrap_or(Method::GET);**
** let original_path = headers_state.path();**


** // 2. Get the body**
** let body = headers_state.into_body_state().await.into_bytes();**

** // 3. Fetch the auth token (this part works fine)**
** let token = match get_id_token(&client, config).await {**
** Ok(t) => t,**
** Err(_) => return Flow::Break(Response::new(500).with_body("Token error.")),**
** };**

** // 4. Prepare headers for the backend**
** // I start with the original headers to preserve Content-Type**
** let mut backend_headers: Vec<(String, String)> = headers_state.handler().headers();**


** // Filter out headers that will be recalculated**
** backend_headers.retain(|(key, _)| !matches!(key.to_lowercase().as_str(), "host" | "content-length"));**


** // Add my new authorization header**
** let token_header = format!("Bearer {}", token);**
** backend_headers.push(("X-Serverless-Authorization".to_string(), token_header));**

** let backend_headers_ref: Vec<(&str, &str)> = backend_headers**
** .iter()**
** .map(|(k, v)| (k.as_str(), v.as_str()))**
** .collect();**

** // 5. Build the new path and call the backend**
** let backend_path = format!("/{}", original_path.split('/').skip(2).collect::<Vec<&str>>().join("/"));**

** let response = call_backend(**
** &client,**
** &backend_headers_ref,**
** &backend_path,**
** &original_method,**
** &body,**
** &config.backend_url,**
** ).await;**

** Flow::Break(response)**
}`

My question is: What is the idiomatic way within this PDK's lifecycle (RequestState -> HeadersState -> BodyState) to ensure the Content-Type header is correctly preserved or even added if it's missing on a request that has a body? I feel like I'm doing the right thing by reading the headers first, but the backend's response tells me I'm missing a step.

mod generated;

use anyhow::{anyhow, Result};
use http::Method;
use std::str::FromStr;
use pdk::script::PayloadBinding;

use pdk::hl::{
Configuration,
EntityState,
HeadersState,
entrypoint,
Flow,
HttpClient,
IntoBodyState,
Launcher,
on_request,
RequestState,
Response,
Service,
HeadersHandler,
};
use pdk::logger;

use crate::generated::config::Config;

// ===============================
// Fonctions utilitaires pour headers
// ===============================
fn collect_headers(handler: &dyn HeadersHandler) -> Vec<(String, String)> {
handler.headers()
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect()
}

fn remove_by_key<K, V>(vec: &mut Vec<(K, V)>, key_to_remove: &K)
where
K: PartialEq,
{
vec.retain(|(key, _)| key != key_to_remove);
}

fn modify_header(headers: &mut Vec<(String, String)>, key: &str, new_value: &str) -> bool {
if let Some((_, v)) = headers.iter_mut().find(|(k, _)| k == key) {
*v = new_value.to_string();
true
} else {
false
}
}

// =======================
// Récupère le token GCP
// =======================
async fn get_id_token(client: &HttpClient, config: &Config) -> Result {
let response = client
.request(&config.metadata_url)
.headers(vec![("Metadata-Flavor", "Google")])
.get()
.await
.map_err(|e| anyhow!("HTTP request to metadata server failed: {:?}", e))?;

if response.status_code() != 200 {
    return Err(anyhow!(
        "Metadata call failed with status: {} and body: {:?}",
        response.status_code(),
        std::str::from_utf8(response.body()).unwrap_or("<invalid utf-8>")
    ));
}

Ok(std::str::from_utf8(response.body())?.to_string())

}

// ===================================
// Appelle le backend avec headers et Service
// ===================================
async fn call_backend(
client: &HttpClient,
headers: &Vec<(&str, &str)>,
path: &str,
method: &Method,
body: &[u8],
backend_service: &Service,
) -> Response {
logger::info!("Calling backend {} {}", method, path);

let mut request_builder = client
    .request(backend_service)
    .path(path)
    .headers(headers.clone());
   // .body(body_to_send)
if method == &Method::POST || method == &Method::PUT {
    let body_to_send = if body.is_empty() { b"{}".as_ref() } else { body };
    request_builder = request_builder.body(body_to_send);
}

let final_request = match *method {
    Method::GET => request_builder.get(),
    Method::POST => request_builder.post(),
    Method::PUT => request_builder.put(),
    //Method::PATCH => request_builder.patch(),
    Method::DELETE => request_builder.delete(),
    _ => {
        logger::warn!("Unexpected HTTP method {:?}, defaulting to GET", method);
        request_builder.get()
    }
};

match final_request.await {
    Ok(resp) => {
        logger::info!(
            "Backend response status: {} ({} bytes)",
            resp.status_code(),
            resp.body().len()
        );
        Response::new(resp.status_code()).with_body(resp.body())
    }
    Err(e) => {
        logger::error!("Backend call failed: {:?}", e);
        Response::new(503).with_body(e.to_string())
    }
}

}

// ====================================================
// Point d’entrée : traiter headers avant body
// ====================================================
async fn on_request_entry(
_state: RequestState,
client: HttpClient,
config: &Config,
) -> Flow {
// :small_blue_diamond: Récupérer headers_state
let headers_state = _state.into_headers_state().await;
let headers_handler = headers_state.handler();

// 🔹 Récupérer les informations avant move

// let original_method: Method = headers_state.method();
let original_method = Method::from_str(&headers_state.method()).unwrap_or(Method::GET);
let original_request_path = headers_state.path();
let mut backend_headers_owned: Vec<(String, String)> = headers_handler.headers();
// :small_blue_diamond: Récupérer le token GCP
let token = match get_id_token(&client, config).await {
Ok(t) => t,
Err(err) => return Flow::Break(Response::new(500).with_body(err.to_string())),
};
let token_header = format!("Bearer {}", token);
// Supprimer le header de Google
// headers_handler.remove_header("Metadata-Flavor");
remove_by_key(&mut backend_headers_owned, &"Metadata-Flavor".to_string());

// 🔹 Définir les headers
modify_header(&mut backend_headers_owned, "Content-Type", "application/json");
modify_header(&mut backend_headers_owned, "X-Serverless-Authorization", &token_header);



// 🔹 Collecte immédiate des headers modifiés pour éviter les problèmes de borrow
// Ligne commentée : 19h13 let backend_headers_owned: Vec<(String, String)> = headers_handler.headers();
let backend_headers_ref=  backend_headers_owned
    .iter()
    .map(|(k, v)| (k.as_str(), v.as_str()))
    .collect();

// 🔹 Récupérer le body (consomme headers_state)
let body_state = headers_state.into_body_state().await;
let original_body_slice = body_state.as_bytes();

logger::info!(
    "Incoming request: {} {} with body size: {}",
    original_method,
    original_request_path,
    original_body_slice.len()
);

// 🔹 Construire le path pour le backend
let backend_path = {
    let segments: Vec<&str> = original_request_path.split('/').skip(2).collect();
    format!("/{}", segments.join("/"))
};

let backend_service = &config.backend_url;

let response = call_backend(
    &client,
    &backend_headers_ref,
    &backend_path,
    &original_method,
    &original_body_slice,
    backend_service,
)
.await;

Flow::Break(response)

}

// ===============================
// Entrypoint principal PDK
// ===============================
#[entrypoint]
async fn configure(launcher: Launcher, Configuration(bytes): Configuration) -> Result<()> {
let config: Config = serde_json::from_slice(&bytes).map_err(|err| {
anyhow!(
"Failed to parse configuration '{}'. Cause: {}",
String::from_utf8_lossy(&bytes),
err
)
})?;

launcher
    .launch(on_request(|req: RequestState, client: HttpClient| {
        on_request_entry(req, client, &config)
    }))
    .await?;

Ok(())

}

Any advice would be greatly appreciated. Thanks!

2 posts - 2 participants

Read full topic

🏷️ Rust_feed