Awaiting a oneshot channel takes a long time (dozens of ms)
⚓ Rust 📅 2025-10-19 👤 surdeus 👁️ 2
Hi,
I have the following issue. I'm using a tokio mpsc channel to send messages to a single worker started with tokio::task::spawn; the worker processes the messages and returns the responses to the caller on a oneshot channel.
This is the method that sends the messages to the tokio mpsc channel. The messages are 'Job' structs that include the oneshot sender 'resp_tx', so the worker can return the response:
```rust
async fn read(&self, first_dir: u8, second_dir: u32, filename: u32, length: u32, offset: u32, op: char) -> Result<Option<Bytes>, IOUringError> {
    let (resp_tx, resp_rx) = oneshot::channel::<Option<Bytes>>();
    // Send the 'job' request to an async tokio channel. The 'Job' struct carries 'resp_tx',
    // the sending half of the oneshot on which the worker will return the response.
    let job = Job::new(first_dir, second_dir, filename, offset, length, 0, 0, 0, Some(resp_tx), op);
    if let Err((err, _entries)) = self.send_job(job, CONFIG.cache.io_uring.read.timeouts.send_timeout_ms).await {
        return Err(err);
    }
    // Await the response on the oneshot channel.
    let start = Instant::now();
    match resp_rx.await {
        Ok(result) => {
            println!("id = {}/{}/{}, now = {:?}, receive found = {}us", first_dir, second_dir, filename, Instant::now(), start.elapsed().as_micros());
            Ok(result) // Either Some(document) or None (not found)
        }
        Err(_) => {
            // RecvError: the worker dropped 'resp_tx' without sending a response.
            println!("id = {}/{}/{}, receive not found = {}us", first_dir, second_dir, filename, start.elapsed().as_micros());
            Err(IOUringError::ChannelReceiveError("receiver closed".to_string()))
        }
    }
}
```
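The request/response shape above (a job that carries its own reply sender) can be sketched without tokio. This is a minimal, hypothetical analogue, not the poster's code: it uses std threads and `std::sync::mpsc` instead of `tokio::task::spawn` and `tokio::sync::oneshot`, a single-use `mpsc::channel` stands in for the oneshot, and the "even filenames exist" lookup is invented purely for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical job type: the request payload plus a reply sender,
/// mirroring how `Job` carries `resp_tx` in the post.
struct Job {
    filename: u32,
    resp_tx: Option<mpsc::Sender<Option<Vec<u8>>>>,
}

/// Spawns a worker thread that answers every job on its own reply channel.
fn start_worker() -> mpsc::Sender<Job> {
    let (tx, rx) = mpsc::channel::<Job>();
    thread::spawn(move || {
        // The loop ends when every request sender has been dropped.
        for mut job in rx {
            // Hypothetical lookup: even filenames "exist", odd ones do not.
            let document = if job.filename % 2 == 0 {
                Some(job.filename.to_le_bytes().to_vec())
            } else {
                None
            };
            if let Some(reply) = job.resp_tx.take() {
                // Ignore the error: it only means the caller stopped waiting.
                let _ = reply.send(document);
            }
        }
    });
    tx
}

/// Caller side: create a single-use reply channel, send the job, wait for the answer.
fn read(worker: &mpsc::Sender<Job>, filename: u32) -> Result<Option<Vec<u8>>, String> {
    let (resp_tx, resp_rx) = mpsc::channel();
    worker
        .send(Job { filename, resp_tx: Some(resp_tx) })
        .map_err(|_| "worker stopped".to_string())?;
    resp_rx.recv().map_err(|_| "reply sender dropped".to_string())
}

fn main() {
    let worker = start_worker();
    println!("{:?}", read(&worker, 4)); // Ok(Some([4, 0, 0, 0]))
    println!("{:?}", read(&worker, 5)); // Ok(None)
}
```

Because each job owns its reply sender, the worker never needs to know who asked; dropping the sender without replying is what surfaces as a receive error on the caller's side.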
Here, 'send_job' simply selects one of the tokio mpsc channels and sends the job:
```rust
async fn send_job(&self, job: Job, send_timeout_ms: u16) -> Result<usize, (IOUringError, usize)> {
    // NOTE: 'entries' is not defined in the snippet as posted; assumed here to be
    // the number of jobs handed to the channel (one per call).
    let entries: usize = 1;
    let channel_idx = get_channel_idx(job.filename);
    let channel_reader = &self.channel_vec[channel_idx];
    let tx = &channel_reader.channel;
    // Fast path: non-blocking send, no waiting.
    match tx.try_send(job) {
        Ok(()) => Ok(entries),
        Err(TrySendError::Closed(_job)) => {
            // Channel closed
            Err((IOUringError::ChannelSendError(t!("iouring.channel.msg_closed").to_string()), entries))
        }
        Err(TrySendError::Full(job)) => {
            // Slow path: the channel is full, so wait up to 'send_timeout_ms' for capacity.
            match timeout(Duration::from_millis(send_timeout_ms as u64), tx.send(job)).await {
                Ok(Ok(())) => Ok(entries),
                Ok(Err(err)) => Err((IOUringError::ChannelSendError(err.to_string()), entries)),
                // Elapsed: no capacity was freed before the timeout.
                Err(err) => Err((IOUringError::ChannelSendTimeout(err.to_string()), entries)),
            }
        }
    }
}
```
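The "try a non-blocking send first, then wait a bounded time" logic in `send_job` can be illustrated with std's bounded channel. This is a hypothetical sketch, not tokio's behaviour: `std::sync::mpsc::SyncSender` has no stable send-with-timeout, so the slow path here polls `try_send` with a short sleep, whereas the post awaits `tx.send(job)` under `tokio::time::timeout`. `send_with_deadline` and `SendJobError` are names invented for this example.

```rust
use std::sync::mpsc::{self, SyncSender, TrySendError};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum SendJobError {
    Closed,
    Timeout,
}

/// Tries a non-blocking send first; if the bounded channel is full, keeps
/// retrying (polling with a short sleep) until `max_wait` has elapsed.
fn send_with_deadline<T>(tx: &SyncSender<T>, mut item: T, max_wait: Duration) -> Result<(), SendJobError> {
    let start = Instant::now();
    loop {
        match tx.try_send(item) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Disconnected(_)) => return Err(SendJobError::Closed),
            Err(TrySendError::Full(back)) => {
                if start.elapsed() >= max_wait {
                    return Err(SendJobError::Timeout);
                }
                // A full channel hands the item back, so we can retry with it.
                item = back;
                thread::sleep(Duration::from_micros(100));
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::sync_channel::<u32>(1);
    // Capacity 1: the first send succeeds on the fast path.
    assert_eq!(send_with_deadline(&tx, 1, Duration::from_millis(5)), Ok(()));
    // Nobody is receiving, so the second send times out.
    assert_eq!(send_with_deadline(&tx, 2, Duration::from_millis(5)), Err(SendJobError::Timeout));
    // Once the receiver is gone, the send reports a closed channel.
    drop(rx);
    assert_eq!(send_with_deadline(&tx, 3, Duration::from_millis(5)), Err(SendJobError::Closed));
    println!("ok");
}
```

As in the post, ownership of the job returns with the "full" error, which is what makes the fallback possible without cloning.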
And this is the worker:
```rust
fn start_reader(ring_vec_idx: usize, mut rx: mpsc::Receiver<Job>, ready_tx: std::sync::mpsc::Sender<()>) {
    tokio::task::spawn(async move {
        // ... init variables ...
        while !IO_URING_READ.is_stop() {
            // Try to get a complete batch of 'packet_size' elements.
            Self::get_packet(&mut ctx, ring_vec_idx, channel_size, max_packet_size, packet_size_timeout_duration, &mut rx).await;
            // If the batch is not empty, send it to io_uring: all open/read/close_unlink operations
            // are performed and each response is returned directly on its job's oneshot channel.
            if ctx.are_jobs() {
                perform_open_read_close_operation(ring_vec_idx, &mut ctx, CONFIG.cache.io_uring.read.timeouts.disk_timeout_ms, cqe_max_wait_time_duration).await;
                ctx.clear();
            }
        }
    });
}
```
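The body of `get_packet` is not shown, so the following is only a hypothetical sketch of one common way to collect a batch: block for the first job, then wait at most a fixed budget for the rest. It uses std's `Receiver::recv_timeout` rather than tokio, and `get_batch`, `max_batch` and `max_wait` are invented names; whether `get_packet` behaves this way is an assumption.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Collects up to `max_batch` items. Blocks for the first item, then waits at
/// most `max_wait` (measured from the first item) for the rest of the batch.
/// Returns an empty Vec only when the channel is closed and drained.
fn get_batch<T>(rx: &Receiver<T>, max_batch: usize, max_wait: Duration) -> Vec<T> {
    let mut batch = Vec::with_capacity(max_batch);
    // Block until at least one job arrives (or all senders are gone).
    match rx.recv() {
        Ok(first) => batch.push(first),
        Err(_) => return batch,
    }
    let deadline = Instant::now() + max_wait;
    while batch.len() < max_batch {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            // Deadline reached or channel closed: hand over what we have.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    batch
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..5 {
        tx.send(i).unwrap();
    }
    // Enough items are queued, so a full batch comes back immediately.
    assert_eq!(get_batch(&rx, 3, Duration::from_millis(50)), vec![0, 1, 2]);
    // Only two items remain: the batch is returned once the wait budget runs out.
    let start = Instant::now();
    assert_eq!(get_batch(&rx, 3, Duration::from_millis(20)), vec![3, 4]);
    println!("partial batch took {:?}", start.elapsed());
}
```

With this shape, a lone request that arrives while the queue is otherwise empty waits up to `max_wait` before its batch is handed on, so that budget becomes part of the latency the caller observes.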
Here, 'get_packet' reads the jobs received on the '&mut mpsc::Receiver' and passes them to 'perform_open_read_close_operation', which performs the I/O operations and finally writes each job's response to its oneshot channel like this:
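```rust
// 'document' and 'now' come from the surrounding function (not shown).
if let Some(channel) = job.resp_tx.take() {
    // send() fails only if the caller already dropped 'resp_rx'; the result is ignored here.
    let _ = channel.send(document);
    println!("set_success_result :: id = {}, sent response SOME in {}us", job.filename, now.elapsed().as_micros());
    return;
} else {
    prometheus::inc_failed_response();
}
```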
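The `Option::take` pattern above guarantees at most one reply per job. The hypothetical sketch below (std `mpsc` standing in for tokio's oneshot; `Job` and `respond` are invented names) returns the outcome of `send` instead of discarding it, which makes it possible to tell "reply delivered" apart from "caller already gone" and "already answered".

```rust
use std::sync::mpsc::{self, Sender};

/// Hypothetical job holding an optional reply sender, like `resp_tx` in the post.
struct Job {
    resp_tx: Option<Sender<Option<String>>>,
}

/// Replies at most once: `take()` moves the sender out, so a second call finds `None`.
/// Returns true only if the reply was delivered to a still-waiting caller.
fn respond(job: &mut Job, document: Option<String>) -> bool {
    match job.resp_tx.take() {
        Some(channel) => channel.send(document).is_ok(),
        // Already answered; the post increments a failure counter in this case.
        None => false,
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut job = Job { resp_tx: Some(tx) };
    assert!(respond(&mut job, Some("doc".to_string())));
    assert_eq!(rx.recv().unwrap(), Some("doc".to_string()));
    // Second reply attempt: the sender was already taken.
    assert!(!respond(&mut job, None));

    // If the caller gave up and dropped its receiver, send() reports an error.
    let (tx2, rx2) = mpsc::channel::<Option<String>>();
    drop(rx2);
    let mut job2 = Job { resp_tx: Some(tx2) };
    assert!(!respond(&mut job2, None));
    println!("ok");
}
```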
Here is what's happening. If I print a timestamp after 'perform_open_read_close_operation', all of this work (including returning the response with "let _ = channel.send(document)") takes only a few microseconds, below 100us.
But the elapsed time printed after "match resp_rx.await" in the "read" method, where the oneshot response is received, is in the milliseconds; I have even seen hundreds of milliseconds.
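One detail worth keeping in mind when comparing the two numbers: in `read`, `start` is taken right after `send_job` returns, so `start.elapsed()` covers the time the job spends queued and batched in the worker, the I/O itself, and the wake-up of the awaiting task, not just the oneshot hand-off. A more direct measurement stamps the `Instant` into the reply at send time. The sketch below is a hypothetical illustration using std threads and `std::sync::mpsc` instead of tokio tasks and `tokio::sync::oneshot`; `measure_handoff` is an invented name.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Sends `n` messages from a worker thread, each stamped with the Instant at
/// which it was sent, and returns the send-to-receive delay observed for each.
fn measure_handoff(n: usize) -> Vec<Duration> {
    let (tx, rx) = mpsc::channel::<(usize, Instant)>();
    let worker = thread::spawn(move || {
        for i in 0..n {
            // The stamp is taken just before send, like a timestamp next to channel.send().
            tx.send((i, Instant::now())).unwrap();
            thread::sleep(Duration::from_millis(1));
        }
        // `tx` is dropped here, which ends the receiver's iterator below.
    });
    let mut delays = Vec::with_capacity(n);
    for (_i, sent_at) in rx.iter() {
        // Time between the worker's send and this thread holding the value.
        delays.push(sent_at.elapsed());
    }
    worker.join().unwrap();
    delays
}

fn main() {
    let delays = measure_handoff(5);
    let worst = delays.iter().max().unwrap();
    println!("handoffs: {}, worst send-to-receive delay: {:?}", delays.len(), worst);
}
```

In an async runtime, the same idea (carrying the send-time `Instant` in the payload) separates "slow to produce the reply" from "slow to be woken and polled after the reply was sent".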
I'm stuck on this. I replaced the oneshot with a tokio mpsc channel of size 1 and got the same result.
I don't understand why it takes so long to wake up on this oneshot channel. In our tests the CPU is 90% idle, so the server is not stressed at all.
Any idea what could be happening?
Thanks,
Joan.