How to tar and compress a Stream as a pipeline?

⚓ Rust    📅 2025-08-13    👤 surdeus    👁️ 4      

surdeus

How can I archive (tar) and compress a stream of file paths as a pipeline, without using channels?
By "pipeline" I mean pipelining: the goal is to minimize how long the data stays in memory.
I use async_compression for async compression and astral-tokio-tar for async tar:

use std::path::PathBuf;

use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use bytes::Bytes;
use futures::SinkExt; // provides sink_map_err
use tokio::{
    io::{AsyncWriteExt, BufWriter},
    sync::mpsc::{Receiver, Sender},
};
use tokio_tar::Builder; // astral-tokio-tar, imported as tokio_tar
use tokio_util::{
    io::{CopyToBytes, SinkWriter},
    sync::PollSender,
};

pub async fn archive_compress(
    archive_source: String,
    archive_stream_sx: Sender<Bytes>,
    mut file_rx: Receiver<PathBuf>,
) -> anyhow::Result<()> {
    // Turn the Bytes channel into an AsyncWrite: PollSender is a Sink<Bytes>,
    // CopyToBytes adapts it to Sink<&[u8]>, SinkWriter makes it AsyncWrite.
    let sink = PollSender::new(archive_stream_sx)
        .sink_map_err(|_| std::io::Error::from(std::io::ErrorKind::BrokenPipe));
    let writer = BufWriter::new(SinkWriter::new(CopyToBytes::new(sink)));
    let mut builder = Builder::new(GzipEncoder::new(writer));

    // The prefix is the same for every file, so compute it once.
    let source_path = PathBuf::from(&archive_source);
    let unused_prefix = source_path
        .parent()
        .context("archive source path has no parent")?;

    while let Some(path) = file_rx.recv().await {
        let package_path = path.strip_prefix(unused_prefix)?;
        builder
            .append_path_with_name(&path, package_path)
            .await
            .with_context(|| format!("failed to append {}", path.display()))?;
    }

    // into_inner() writes the tar end-of-archive blocks and returns the encoder.
    // finish() alone never shuts the GzipEncoder down, so the gzip trailer
    // (and anything still buffered in BufWriter) would never be written.
    let mut encoder = builder
        .into_inner()
        .await
        .with_context(|| format!("failed to finish archive {archive_source}"))?;
    encoder
        .shutdown()
        .await
        .with_context(|| format!("failed to finish gzip stream for {archive_source}"))?;
    Ok(())
}

Ideally, I imagine the function looking something like this:

fn archive_compression(archive_source: PathBuf, stream: impl Stream<Item = PathBuf>) -> impl Stream<Item = Bytes>;
