How to lazily split a non-cloneable stream into two streams

⚓ Rust    📅 2025-09-02    👤 surdeus    👁️ 1      

surdeus

`incorrect_stream_split` has a bug: the `tx` moved into the `FnMut` closure is never dropped, so the second stream never finishes.
`correct_stream_split` works, but it is not lazy.

#![allow(warnings)]

use futures; // 0.3.31
use futures::{SinkExt, Stream, StreamExt, stream, pin_mut};
use tokio::sync::mpsc::channel;
use tokio_stream; // 0.1.17
use tokio_stream::wrappers::ReceiverStream;
use rand;


/// Placeholder predicate used to decide which half of a split an item goes to.
///
/// The argument is ignored; the answer is a freshly generated random `bool`,
/// so repeated calls with the same value may disagree.
fn pred<T>(_value: &T) -> bool {
    rand::random::<bool>()
}

fn incorrect_stream_split<T>(
    stream: impl Stream<Item = T>,
) -> (impl Stream<Item = T>, impl Stream<Item = T>) {
    let (tx, rx) = channel(1024);
    let st1 = stream.filter_map(move |x| { 
        let tx_c = tx.clone();
        async move {
            if pred(&x) {
                Some(x)
            } else {
                tx_c.send(x).await.unwrap();
                None
            }
        }
    });
    let st2 = ReceiverStream::new(rx);

    return (st1, st2);
}

/// Splits `stream` in two according to [`pred`] by draining it in a spawned
/// Tokio task.
///
/// Items for which `pred` returns `true` are forwarded to the first returned
/// stream, all others to the second. Each half is backed by a bounded channel
/// with capacity 1024.
///
/// The split is eager: the task starts pulling from `stream` as soon as it is
/// scheduled, whether or not either returned stream is polled. It must be
/// called from within a Tokio runtime because it uses `tokio::spawn`.
///
/// Dropping one of the returned streams does not disturb the other: items
/// destined for the dropped half are discarded. The task stops once `stream`
/// is exhausted or both halves have been dropped.
fn correct_stream_split<T>(
    mut stream: impl Stream<Item = T> + Send + Unpin + 'static,
) -> (impl Stream<Item = T>, impl Stream<Item = T>)
where
    T: Send + 'static,
{
    let (tx1, rx1) = channel(1024);
    let (tx2, rx2) = channel(1024);
    tokio::spawn(async move {
        while let Some(value) = stream.next().await {
            let target = if pred(&value) { &tx1 } else { &tx2 };
            // A failed send only means that half's receiver is gone; unwrapping
            // here would panic the task and cut the other half short.
            let _ = target.send(value).await;
            // Nobody is listening any more: stop consuming the upstream.
            if tx1.is_closed() && tx2.is_closed() {
                break;
            }
        }
    });

    (ReceiverStream::new(rx1), ReceiverStream::new(rx2))
}

/// Demo: splits a small stream of integers and prints each half in turn.
#[tokio::main]
async fn main() {
    let data = stream::iter(vec![1, 2, 3, 4, 5, 6]);

    let (s1, mut s2) = incorrect_stream_split(data);
    {
        // `pin_mut!` moves `s1` into this scope, so it is dropped at the
        // closing brace. Dropping it releases the channel sender it owns;
        // without that, the `s2` loop below would wait forever.
        pin_mut!(s1);
        while let Some(num) = s1.next().await {
            println!("get num {} from s1", num);
        }
    }
    while let Some(num) = s2.next().await {
        println!("get num {} from s2", num);
    }
}


1 post - 1 participant

Read full topic

🏷️ Rust_feed