Info
This post is auto-generated from RSS feed The Rust Programming Language Forum - Latest topics. Source: Send references across scoped threads
I'm trying to build a multi-threaded processor using `crossbeam::scope` to share a read-only `HashMap`. The idea is to send references to values inside the map through a channel to avoid cloning.
The following minimal version compiles and works fine:
/// Worker body for the minimal example.
///
/// Receives exactly two keys from `rx`. For each key it prints the whole
/// `map` (not the key) and, when the key is present, sends references to the
/// two stored bytes through `tx`.
///
/// # Panics
/// Panics if `rx.recv()` returns an error for either of the two keys.
fn processor_work<'a, 'b>(
    rx: Receiver<usize>,
    tx: Sender<(&'a u8, &'a u8)>,
    map: &'b HashMap<usize, (u8, u8)>,
) where
    'b: 'a,
{
    // Two rounds: the first and the second piece of data.
    for _ in 0..2 {
        let key = rx.recv().unwrap();
        println!("received {:?} inside worker", map);
        if let Some(pair) = map.get(&key) {
            // The send result is deliberately ignored, so a failed send is
            // silently skipped.
            let _ = tx.send((&pair.0, &pair.1));
        }
    }
}
/// Spawns `threads` workers on `scope`, each running `processor_work` over
/// the shared `map`.
///
/// Returns the sender used to feed keys to the workers and the receiver on
/// which the workers publish `(&u8, &u8)` pairs borrowed from `map`.
///
/// The spawned handles are collected but never joined or returned here; they
/// are dropped when this function returns.
fn processor<'scope, 'map>(
    scope: &'scope Scope<'scope, '_>,
    map: &'map HashMap<usize, (u8, u8)>,
    threads: usize,
) -> (Sender<usize>, Receiver<(&'scope u8, &'scope u8)>)
where
    'map: 'scope,
{
    let (key_tx, key_rx) = unbounded();
    let (pair_tx, pair_rx) = unbounded();

    // Every worker gets its own clone of the key receiver and the pair sender.
    let _workers: Vec<_> = (0..threads)
        .map(|_| {
            let (rx, tx) = (key_rx.clone(), pair_tx.clone());
            scope.spawn(move || processor_work(rx, tx, map))
        })
        .collect();

    (key_tx, pair_rx)
}
However, when I apply the same idea to my real use case involving a `HashMap<Bytes, (Bytes, Bytes)>`, the compiler complains about lifetimes:
use std::collections::HashMap;
use std::thread::Scope;
use anyhow::Result;
use bytes::Bytes;
use crossbeam_channel::bounded;
use crossbeam_channel::Receiver;
use crossbeam_channel::Sender;
fn processor_work<'a, 'b>(
rx: Receiver<Vec<Bytes>>,
tx: Sender<(&'a Bytes, &'a Bytes)>,
map: &'b HashMap<Bytes, (Bytes, Bytes)>,
) -> Result<()>
where
'b: 'a,
{
while let Ok(records) = rx.recv() {
for record in records {
// sequence length, taxid, lca
if let Some((taxid, lca)) = map.get(&record) {
tx.send((taxid, lca))?;
}
}
}
Ok(())
}
/// Spawns `threads` workers on `scope`, each running `processor_work` over
/// the shared `map`, connected through two bounded channels of capacity 100.
///
/// Returns the sender used to submit record batches and the receiver on
/// which workers publish `(&Bytes, &Bytes)` pairs borrowed from `map`.
///
/// The spawned handles are collected but never joined or returned here, so
/// each worker's `Result` is not inspected by this function.
fn processor<'scope, 'map>(
    scope: &'scope Scope<'scope, '_>,
    map: &'map HashMap<Bytes, (Bytes, Bytes)>,
    threads: usize,
) -> (Sender<Vec<Bytes>>, Receiver<(&'scope Bytes, &'scope Bytes)>)
where
    'map: 'scope,
{
    // Workers -> writer: borrowed result pairs.
    let (processor_tx, writer_rx) = bounded::<(&'scope Bytes, &'scope Bytes)>(100);
    // Reader -> workers: batches of records to look up.
    let (reader_tx, processor_rx) = bounded::<Vec<Bytes>>(100);

    // Every worker gets its own clone of both channel endpoints.
    let _workers: Vec<_> = (0..threads)
        .map(|_| {
            let (rx, tx) = (processor_rx.clone(), processor_tx.clone());
            scope.spawn(move || processor_work(rx, tx, map))
        })
        .collect();

    (reader_tx, writer_rx)
}
Why does the minimal version using `&u8` compile, but the version using `&Bytes` fail, even though I use the same `'b: 'a` constraint and pass the references through scoped threads?
Any clarification would be deeply appreciated!
3 posts - 2 participants
🏷️ rust_feed