Send references across scoped threads

⚓ Rust    📅 2025-07-11    👤 surdeus    👁️ 3      

surdeus

I'm trying to build a multi-threaded processor using crossbeam::scope to share a read-only HashMap. The idea is to send references to values inside the map through a channel to avoid cloning.

The following minimal version compiles and works fine:


fn processor_work<'a, 'b>(
    rx: Receiver<usize>,
    tx: Sender<(&'a u8, &'a u8)>,
    map: &'b HashMap<usize, (u8, u8)>,
) where
    'b: 'a,
{
    // Process the first data.
    let key = rx.recv().unwrap();
    println!("received {:?} inside worker", map);
    if let Some((first, second)) = map.get(&key) {
        let _ = tx.send((first, second));
    }

    // Process the second data.
    let key = rx.recv().unwrap();
    println!("received {:?} inside worker", map);
    if let Some((first, second)) = map.get(&key) {
        let _ = tx.send((first, second));
    }
}

fn processor<'scope, 'map>(
    scope: &'scope Scope<'scope, '_>,
    map: &'map HashMap<usize, (u8, u8)>,
    threads: usize,
) -> (Sender<usize>, Receiver<(&'scope u8, &'scope u8)>)
where
    'map: 'scope,
{
    let (main_tx, processor_rx) = unbounded();
    let (processor_tx, main_rx) = unbounded();

    let mut processor_handles = Vec::with_capacity(threads);
    for _ in 0 .. threads {
        let rx = processor_rx.clone();
        let tx = processor_tx.clone();
        let handle = scope.spawn(move || processor_work(rx, tx, map));
        processor_handles.push(handle);
    }

    (main_tx, main_rx)
}

However, when I apply the same idea to my real use case involving a HashMap<Bytes, (Bytes, Bytes)>, the compiler complains about lifetimes:

use std::collections::HashMap;
use std::thread::Scope;
use anyhow::Result;
use bytes::Bytes;
use crossbeam_channel::bounded;
use crossbeam_channel::Receiver;
use crossbeam_channel::Sender;

fn processor_work<'a, 'b>(
    rx: Receiver<Vec<Bytes>>,
    tx: Sender<(&'a Bytes, &'a Bytes)>,
    map: &'b HashMap<Bytes, (Bytes, Bytes)>,
) -> Result<()>
where
    'b: 'a,
{
    while let Ok(records) = rx.recv() {
        for record in records {
            // sequence length, taxid, lca
            if let Some((taxid, lca)) = map.get(&record) {
                tx.send((taxid, lca))?;
            }
        }
    }
    Ok(())
}

fn processor<'scope, 'map>(
    scope: &'scope Scope<'scope, '_>,
    map: &'map HashMap<Bytes, (Bytes, Bytes)>,
    threads: usize,
) -> (Sender<Vec<Bytes>>, Receiver<(&'scope Bytes, &'scope Bytes)>)
where
    'map: 'scope,
{
    let (processor_tx, writer_rx): (
        Sender<(&'scope Bytes, &'scope Bytes)>,
        Receiver<(&'scope Bytes, &'scope Bytes)>,
    ) = bounded(100);

    let (reader_tx, processor_rx): (Sender<Vec<Bytes>>, Receiver<Vec<Bytes>>) = bounded(100);

    let mut processor_handles = Vec::with_capacity(threads);
    for _ in 0 .. threads {
        let rx = processor_rx.clone();
        let tx = processor_tx.clone();
        let handle = scope.spawn(move || processor_work(rx, tx, map));
        processor_handles.push(handle);
    }
    (reader_tx, writer_rx)
}

image

Why does the minimal version using &u8 compile, but the version using &Bytes fails, even though I use the same 'b: 'a constraint and pass the references through scoped threads?

Any clarification would be deeply appreciated!

3 posts - 2 participants

Read full topic

🏷️ rust_feed