This post is auto-generated from RSS feed The Rust Programming Language Forum - Latest topics. Source: How to compression and tar the Stream pipelining?
How can one archive and compress a stream of file paths within a pipeline without using channels?
By 'pipeline' I mean pipelining: each stage should process data as it arrives, so that data spends as little time in memory as possible.
Crates used: async_compression for async compression, astral-tokio-tar for async tar.
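use std::path::PathBuf;

use async_compression::tokio::write::GzipEncoder;
use bytes::Bytes;
use futures::SinkExt; // provides `sink_map_err`
use tokio::{
    io::{AsyncWriteExt, BufWriter}, // `AsyncWriteExt` provides `shutdown`
    sync::mpsc::{Receiver, Sender},
};
use tokio_util::{
    io::{CopyToBytes, SinkWriter},
    sync::PollSender,
};
use tokio_tar::Builder;

// `SftpWorkerError` is the project's own error type, defined elsewhere.
pub async fn archive_compress(
    archive_source: String,
    archive_stream_sx: Sender<Bytes>,
    mut file_rx: Receiver<PathBuf>,
) -> anyhow::Result<()> {
    // Writer chain: tar builder -> gzip encoder -> buffer -> channel of `Bytes`.
    let sink = PollSender::new(archive_stream_sx)
        .sink_map_err(|_| std::io::Error::from(std::io::ErrorKind::BrokenPipe));
    let sinkwriter = SinkWriter::new(CopyToBytes::new(sink));
    let writer = BufWriter::new(sinkwriter);
    let encoder = GzipEncoder::new(writer);
    let mut builder = Builder::new(encoder);

    // Entry names are made relative to the parent of the archive source.
    // This is computed once, outside the loop.
    let source_path = PathBuf::from(&archive_source);
    let unused_prefix = match source_path.parent() {
        Some(parent) => parent,
        None => {
            // `.into()` converts the project error into `anyhow::Error`.
            return Err(SftpWorkerError::Other(
                "archive source path has no parent".to_string(),
            )
            .into());
        }
    };

    while let Some(path) = file_rx.recv().await {
        let package_path = path.strip_prefix(unused_prefix)?;
        builder
            .append_path_with_name(path.clone(), package_path)
            .await
            .map_err(|e| SftpWorkerError::PathIo {
                file: path,
                source: e,
            })?;
    }

    let io_err = |e: std::io::Error| SftpWorkerError::Io {
        file: archive_source.clone(),
        source: e,
    };
    // Write the tar trailer.
    builder.finish().await.map_err(&io_err)?;
    // Take the encoder back and shut it down; otherwise the gzip trailer and
    // any bytes still buffered are never sent to the channel.
    let mut encoder = builder.into_inner().await.map_err(&io_err)?;
    encoder.shutdown().await.map_err(&io_err)?;
    Ok(())
}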
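To make the naming rule in the function above concrete, here is a minimal std-only sketch of how an entry name is derived: the parent of the archive source is stripped from each file path. The helper name `entry_name` and the example paths are hypothetical and only serve as illustration; they are not part of the original code.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper: returns the name a file would get inside the archive,
/// i.e. its path relative to the parent directory of `archive_source`.
/// Returns `None` if the source has no parent or the file lies outside it.
fn entry_name(archive_source: &Path, file: &Path) -> Option<PathBuf> {
    let unused_prefix = archive_source.parent()?;
    file.strip_prefix(unused_prefix).ok().map(Path::to_path_buf)
}

fn main() {
    // Example paths are made up for illustration.
    let source = Path::new("/data/project");

    // The source directory's own name is kept as the top-level entry.
    let name = entry_name(source, Path::new("/data/project/src/main.rs"));
    assert_eq!(name, Some(PathBuf::from("project/src/main.rs")));

    // A file outside the parent of the source cannot be named.
    assert_eq!(entry_name(source, Path::new("/tmp/other.txt")), None);

    // The filesystem root has no parent, which is the error case above.
    assert_eq!(entry_name(Path::new("/"), Path::new("/etc/hosts")), None);
}
```

Keeping this step as a pure function means it can be checked without touching the filesystem or the async writer chain.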
In my mind, the ideal function would look like this:
fn archive_compression(archive_source: PathBuf, stream: impl Stream<Item = PathBuf>) -> impl Stream<Item = Bytes>;