Info
This post is auto-generated from RSS feed The Rust Programming Language Forum - Latest topics. Source: How to lazily split a Non-cloneable stream to two stream
`incorrect_stream_split` has a bug: the `tx` moved into the `FnMut` closure is never dropped, so the second stream never ends.
`correct_stream_split` terminates properly, but it is not lazy.
#![allow(warnings)]
use futures; // 0.3.31
use futures::{SinkExt, Stream, StreamExt, stream, pin_mut};
use tokio::sync::mpsc::channel;
use tokio_stream; // 0.1.17
use tokio_stream::wrappers::ReceiverStream;
use rand;
/// Demo predicate used to route items between the two output streams.
///
/// The argument is ignored; the result is a freshly generated random `bool`,
/// so calling it twice with the same value may give different answers.
fn pred<T>(_value: &T) -> bool {
    rand::random::<bool>()
}
fn incorrect_stream_split<T>(
stream: impl Stream<Item = T>,
) -> (impl Stream<Item = T>, impl Stream<Item = T>) {
let (tx, rx) = channel(1024);
let st1 = stream.filter_map(move |x| {
let tx_c = tx.clone();
async move {
if pred(&x) {
Some(x)
} else {
tx_c.send(x).await.unwrap();
None
}
}
});
let st2 = ReceiverStream::new(rx);
return (st1, st2);
}
/// Eagerly splits `stream` into two streams according to [`pred`].
///
/// A background task is spawned with `tokio::spawn` that drains `stream` and
/// forwards each item to one of two bounded channels: items for which `pred`
/// returns `true` go to the first returned stream, the rest to the second.
/// Both senders are owned by the task, so both output streams end once the
/// upstream is exhausted.
///
/// This is not lazy: the task starts pulling from `stream` as soon as it is
/// scheduled, regardless of whether the outputs are being polled.
///
/// # Caveats
///
/// * Uses `tokio::spawn`, so it has to be called from within a Tokio runtime.
/// * Each channel holds at most 1024 items. If one output is not consumed and
///   its channel fills up, the task waits on it and the other output stalls
///   as well.
/// * Dropping one output no longer panics the task: items destined for the
///   dropped side are discarded and the other side keeps receiving. The task
///   stops early once both outputs are gone.
fn correct_stream_split<T>(
    mut stream: impl Stream<Item = T> + Send + Unpin + 'static,
) -> (impl Stream<Item = T>, impl Stream<Item = T>)
where
    T: Send + 'static,
{
    let (tx1, rx1) = channel(1024);
    let (tx2, rx2) = channel(1024);

    tokio::spawn(async move {
        while let Some(value) = stream.next().await {
            let target = if pred(&value) { &tx1 } else { &tx2 };
            // A failed send means that receiver was dropped. Keep serving the
            // other side, and stop pulling from upstream only when nobody is
            // listening at all.
            if target.send(value).await.is_err() && tx1.is_closed() && tx2.is_closed() {
                break;
            }
        }
    });

    (ReceiverStream::new(rx1), ReceiverStream::new(rx2))
}
/// Demo driver: splits the numbers 1..=6 with `incorrect_stream_split`, then
/// prints everything from the first stream followed by everything from the
/// second.
#[tokio::main]
async fn main() {
    let data = stream::iter(vec![1, 2, 3, 4, 5, 6]);
    let (s1, mut s2) = incorrect_stream_split(data);

    // Drain `s1` inside its own scope. `pin_mut!` moves `s1` into this block,
    // so it is dropped at the closing brace together with any channel sender
    // it still owns. Without that, `s2` would never observe the channel
    // closing and the loop below would wait forever.
    {
        pin_mut!(s1);
        while let Some(num) = s1.next().await {
            println!("get num {} from s1", num);
        }
    }

    while let Some(num) = s2.next().await {
        println!("get num {} from s2", num);
    }
}
1 post - 1 participant
🏷️ Rust_feed