Invoke subprocess with piped output and direct sending of Ctrl+C

⚓ Rust    📅 2025-06-24    👤 surdeus    👁️ 5      

surdeus

Warning

This post was published 49 days ago. The information described in this article may have changed.

I am currently writing a wrapper around a third-party console program (to manage firmware updates on a Zigbee chip).

In order to read out the current firmware version, I need to invoke the third-party program with certain parameters and capture its output.
Due to the nature of the third-party program, I need to terminate it via Ctrl+C, since it always starts into an interactive session, which I do not need for this scenario. And, no, there isn't a flag to not launch an interactive session of said program.
Luckily it terminates after sending Ctrl+C reliably, so I came up with these extension traits:

ctrl_c.rs

use std::io::{Error, ErrorKind, Result, Write};
use std::process::Child;

const CTRL_C: u8 = 0x03; // ASCII Control-C

pub trait CtrlC: Sized {
    /// Sends Ctrl-C to the STDIN.
    fn ctrl_c(self) -> Result<Self>;
}

impl CtrlC for Child {
    fn ctrl_c(mut self) -> Result<Self> {
        let Some(ref mut stdin) = self.stdin else {
            self.kill()?;
            return Err(Error::new(ErrorKind::Other, "Failed to open STDIN"))?;
        };

        stdin.write_all(&[CTRL_C])?;
        Ok(self)
    }
}

z3gateway_host.rs

use std::process::{Command, Stdio};

const Z3GATEWAY_HOST: &str = "/usr/bin/Z3GatewayHost";

/// Represents a host for the Z3 Gateway, which is used to communicate with Silicon Labs devices.
pub trait Z3GatewayHost {
    fn z3gateway_host() -> Self;
}

impl Z3GatewayHost for Command {
    /// Creates a new instance of `Z3GatewayHost`.
    fn z3gateway_host() -> Self {
        let mut command = Self::new(Z3GATEWAY_HOST);
        command
            .stdin(Stdio::piped())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped());
        command
    }
}

Library code that uses the above traits (in status()):

use std::ffi::OsStr;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::process::{Command, Output};
use std::str::FromStr;

use log::error;
use regex::{Captures, Regex};
use semver::{BuildMetadata, Version};

use ctrl_c::CtrlC;
use z3gateway_host::Z3GatewayHost;

use crate::FirmwareUpdater;

mod ctrl_c;
mod manifest;
mod z3gateway_host;

const BAUD_RATE: u32 = 115200;
const VERSION_REGEX: &str = r"\[(\d+\.\d+\.\d+) (?:.+) build (\d+)\]";

/// Represents the Silicon Labs MGM210P22A device.
#[derive(Debug)]
pub struct MGM210P22A {
    tty: PathBuf,
}

impl MGM210P22A {
    /// Creates a new instance of `MGM210P22A`.
    ///
    /// # Arguments
    ///
    /// * `tty` - The path to the TTY device.
    pub fn new(tty: PathBuf) -> Self {
        Self { tty }
    }

    /// Returns the TTY path for the device.
    pub fn tty(&self) -> &Path {
        &self.tty
    }

    /// Read out the status of the device connected to the specified TTY.
    fn status(&self) -> std::io::Result<Output> {
        Command::z3gateway_host()
            .arg("-n")
            .arg(1.to_string())
            .arg("-b")
            .arg(BAUD_RATE.to_string())
            .arg("-f")
            .arg("x")
            .arg("-p")
            .arg(self.tty())
            .spawn()?
            .ctrl_c()?
            .wait_with_output()
    }

    fn read_version(&self) -> std::io::Result<Version> {
        let output = self.status()?;
        let stdout = String::from_utf8_lossy(&output.stdout);
        let regex = Regex::new(VERSION_REGEX)
            .map_err(|error| std::io::Error::new(ErrorKind::InvalidData, error))?;
        stdout
            .lines()
            .find_map(|line| regex.captures(line).and_then(capture_version))
            .ok_or_else(|| std::io::Error::new(ErrorKind::NotFound, "Version not found"))
    }

    fn list_versions(&self) -> std::io::Result<Vec<Version>> {
        Self::base_dir().read_dir().map(|elements| {
            elements
                // Exclude invalid entries.
                .filter_map(|entry| entry.inspect_err(|error| error!("{error}")).ok())
                // Filter for files only.
                .filter_map(|entry| {
                    if entry.path().is_file() {
                        Some(entry.path())
                    } else {
                        None
                    }
                })
                // Extract the file stem and parse it as a version.
                .filter_map(|path| {
                    path.file_stem().and_then(OsStr::to_str).and_then(|stem| {
                        Version::from_str(stem)
                            .inspect_err(|error| error!("Invalid version in file name: {error}"))
                            .ok()
                    })
                })
                .collect()
        })
    }
}

impl FirmwareUpdater for MGM210P22A {
    const BASE_DIR: &'static str = "MGM210P22A";

    type Version = Version;

    fn current_version(&self) -> std::io::Result<Self::Version> {
        self.read_version()
    }

    fn latest_version(&self) -> Option<Self::Version> {
        self.available_versions().into_iter().next_back()
    }

    fn available_versions(&self) -> Vec<Self::Version> {
        let mut versions: Vec<Version> = self
            .list_versions()
            .inspect_err(|error| error!("Error reading directory: {error}"))
            .unwrap_or_default();
        versions.sort();
        versions
    }

    fn install(&self, _version: &Self::Version) -> std::io::Result<()> {
        todo!()
    }
}

fn capture_version(captures: Captures) -> Option<Version> {
    Version::parse(captures.get(1)?.as_str())
        .inspect_err(|error| error!("Invalid version: {error}"))
        .ok()
        .and_then(|mut version| {
            BuildMetadata::from_str(captures.get(2)?.as_str())
                .inspect_err(|error| error!("Invalid build metadata: {error}"))
                .ok()
                .map(|build| {
                    version.build = build;
                    version
                })
        })
}

#[cfg(test)]
mod tests {
    use super::{VERSION_REGEX, capture_version};
    use regex::Regex;
    use semver::{BuildMetadata, Version};

    const VERSION_LINE: &str = "ezsp ver 0x08 stack type 0x02 stack ver. [6.10.3 GA build 297]";

    #[test]
    fn test_capture_version() {
        let mut version = Version::new(6, 10, 3);
        Version::new(6, 10, 3);
        version.build = BuildMetadata::new(297.to_string().as_str()).unwrap();
        assert_eq!(
            capture_version(
                Regex::new(VERSION_REGEX)
                    .unwrap()
                    .captures(VERSION_LINE)
                    .unwrap()
            ),
            Some(version)
        );
    }
}

Any thoughts on this approach?

3 posts - 2 participants

Read full topic

🏷️ rust_feed