Detect ANSI control characters?

⚓ Rust    📅 2026-02-23    👤 surdeus    👁️ 1      

Hi

I am looking for a safe way to remove control characters from a string by replacing them with the replacement character (U+FFFD).

My implementation should mimic a Python reference implementation (collapsed in the original post).

  • I use the unicode-general-category crate with the Unicode v16.0 database.
  • Is the detection mechanism in my filter function safe?
  • Is my thread handling okay?
  • On line 39 of the code below (the read buffer in the stderr thread) I use buf = [0u8; 1024]. If an adversary put a multi-byte UTF-8 character in the payload, couldn't they bypass my filter, since a checked chunk might not contain the whole escape code?

use anyhow::{Context, Result};
use std::io::{Read, Write};
use std::process::{Command, Stdio};

fn filter_ansi_escape_codes(s: &str) -> String {
    fn is_safe(c: char) -> bool {
        !matches!(
            unicode_general_category::get_general_category(c),
            unicode_general_category::GeneralCategory::Control
                | unicode_general_category::GeneralCategory::Format
                | unicode_general_category::GeneralCategory::PrivateUse
                | unicode_general_category::GeneralCategory::Unassigned
                | unicode_general_category::GeneralCategory::LineSeparator
                | unicode_general_category::GeneralCategory::ParagraphSeparator
        )
    }

    // Replace every character in a rejected category with U+FFFD.
    s.chars()
        .map(|c| if is_safe(c) { c } else { '\u{FFFD}' })
        .collect()
}

fn main() -> Result<()> {
    let mut child = Command::new("sh")
        .arg("-c")
        .arg(
            "printf '\\033[31mRED\\033[0m plain\\n' >&2; \\
             printf '\\033[32mGREEN\\033[0m plain\\n' >&2",
        )
        .stdout(Stdio::null())
        .stderr(Stdio::piped())
        .spawn()
        .context("Failed to spawn child process")?;

    let mut stderr = child.stderr.take().context("Failed to open child stderr")?;

    let stderr_thread = std::thread::spawn(move || -> Result<()> {
        let mut stderr_out = std::io::stderr().lock();
        let mut buf = [0u8; 1024];

        loop {
            let n = stderr.read(&mut buf).context("Failed to read child stderr")?;
            if n == 0 {
                break;
            }

            let s = std::str::from_utf8(&buf[..n]).context("Failed to decode UTF-8")?;
            let filtered = filter_ansi_escape_codes(s);

            stderr_out
                .write_all(filtered.as_bytes())
                .context("Failed to write filtered stderr")?;
            stderr_out.flush().context("Failed to flush stderr")?;
        }

        Ok(())
    });

    let status = child.wait().context("Failed to wait for child")?;
    stderr_thread
        .join()
        .expect("stderr thread panicked")
        .context("stderr thread failed")?;

    eprintln!("child exited with: {status}");
    Ok(())
}
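
For experimenting with the per-character replacement without the crate, here is a minimal std-only sketch. It assumes char::is_control (general category Cc only) as a stand-in for the category lookup above, so it does not cover the Format, PrivateUse, Unassigned, or separator categories. The names is_safe_std and replace_controls are hypothetical, and letting '\n' and '\t' through is an assumption: both are in category Cc, so a filter that replaces every Control character also replaces line breaks.

```rust
/// Hypothetical std-only stand-in for `is_safe`.
/// `char::is_control` is true for general category Cc (C0 and C1 controls).
/// Assumption: '\n' and '\t' are allowed through so line structure survives.
fn is_safe_std(c: char) -> bool {
    c == '\n' || c == '\t' || !c.is_control()
}

/// Replaces every character rejected by `is_safe_std` with U+FFFD.
fn replace_controls(s: &str) -> String {
    s.chars()
        .map(|c| if is_safe_std(c) { c } else { '\u{FFFD}' })
        .collect()
}
```

With this sketch, the ESC byte of a sequence such as ESC [ 3 1 m is replaced, while the remaining "[31m" stays in the output as visible plain text.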

[dependencies]
anyhow = "1.0.102"
unicode-general-category = "1.1.0"
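
Regarding the fixed 1024-byte buffer: a single read can end in the middle of a multi-byte UTF-8 sequence, in which case std::str::from_utf8 on that chunk returns an error. The following is a rough sketch of one way to carry the incomplete tail over to the next read, using Utf8Error::valid_up_to and Utf8Error::error_len from the standard library. The function name read_utf8_chunks, the chunk_size parameter, and the on_text callback are hypothetical, and replacing invalid bytes with U+FFFD instead of failing is an assumption.

```rust
use std::io::{self, Read};

/// Hypothetical helper: reads `reader` in chunks of `chunk_size` bytes and
/// passes only complete, valid UTF-8 text to `on_text`.
/// Assumption: `chunk_size` is greater than zero.
fn read_utf8_chunks<R: Read>(
    mut reader: R,
    chunk_size: usize,
    mut on_text: impl FnMut(&str),
) -> io::Result<()> {
    let mut buf = vec![0u8; chunk_size];
    // Bytes received so far that have not been handed to `on_text` yet.
    let mut pending: Vec<u8> = Vec::new();

    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break; // end of stream
        }
        pending.extend_from_slice(&buf[..n]);

        let mut start = 0;
        loop {
            match std::str::from_utf8(&pending[start..]) {
                Ok(s) => {
                    on_text(s);
                    start = pending.len();
                    break;
                }
                Err(e) => {
                    let valid_end = start + e.valid_up_to();
                    // This prefix was just validated, so decoding cannot fail.
                    on_text(std::str::from_utf8(&pending[start..valid_end]).unwrap());
                    match e.error_len() {
                        // Definitely invalid bytes: emit U+FFFD and skip them.
                        Some(len) => {
                            on_text("\u{FFFD}");
                            start = valid_end + len;
                        }
                        // Incomplete sequence at the end: wait for more bytes.
                        None => {
                            start = valid_end;
                            break;
                        }
                    }
                }
            }
        }
        // Keep only the unfinished tail for the next read.
        pending.drain(..start);
    }

    // The stream ended in the middle of a sequence.
    if !pending.is_empty() {
        on_text("\u{FFFD}");
    }
    Ok(())
}
```

In the reader thread, the callback would be the place to run the filter and write the result. A &[u8] slice implements Read, so small chunk sizes can be exercised without spawning a child process.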

Thanks!
