How to create lock free ring buffer MPSC channel?
⚓ Rust 📅 2026-03-02 👤 surdeus 👁️ 2I'm learning lock-free programming in Rust. Is this correct? I need help verifying the code. Also, is there another way so I don't have to push elements one by one in fn new? I'm trying to use MaybeUninit with a loop and write to fill the values in new. Which is better between using a Vec or MaybeUninit for initialization with minimal overhead if the channel size is large?
My progress is like this
use std::sync::atomic::{AtomicUsize, Ordering};
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
#[repr(align(64))]
struct CachePaddedAtomic(AtomicUsize);
#[repr(align(64))]
struct Slot<T> {
status: AtomicUsize,
data: UnsafeCell<T>,
}
pub struct MpscBuffer<T, const N: usize> {
buffer: Box<[Slot<T>]>,
head: CachePaddedAtomic,
tail: CachePaddedAtomic,
}
unsafe impl<T: Send, const N: usize> Sync for MpscBuffer<T, N> {}
impl<T: Default, const N: usize> MpscBuffer<T, N> {
const CHECK: () = assert!(N > 0 && (N & (N - 1) == 0), "N must be a power of two");
pub fn new() -> Self {
let _ = Self::CHECK;
let mut vec = Vec::with_capacity(N);
for i in 0..N {
vec.push(Slot {
status: AtomicUsize::new(i),
data: UnsafeCell::new(T::default()),
});
}
Self {
buffer: vec.into_boxed_slice(),
head: CachePaddedAtomic(AtomicUsize::new(0)),
tail: CachePaddedAtomic(AtomicUsize::new(0)),
}
}
pub fn push(&self, item: T) -> bool {
let mut tail = self.tail.0.load(Ordering::Relaxed);
let mask = N - 1;
loop {
let slot = unsafe { self.buffer.get_unchecked(tail & mask) };
let step = slot.status.load(Ordering::Acquire);
let diff = step as isize - tail as isize;
if diff == 0 {
let actual_tail = self.tail.0.fetch_add(1, Ordering::Relaxed);
let actual_slot = unsafe { self.buffer.get_unchecked(actual_tail & mask) };
while (actual_slot.status.load(Ordering::Acquire) as isize - actual_tail as isize) != 0 {
std::hint::spin_loop();
}
unsafe { *actual_slot.data.get() = item; }
actual_slot.status.store(actual_tail + 1, Ordering::Release);
return true;
} else if diff < 0 {
return false;
} else {
tail = self.tail.0.load(Ordering::Relaxed);
}
}
}
pub fn pop(&self) -> Option<T> {
let head = self.head.0.load(Ordering::Relaxed);
let mask = N - 1;
let slot = unsafe { self.buffer.get_unchecked(head & mask) };
let step = slot.status.load(Ordering::Acquire);
let diff = step as isize - (head + 1) as isize;
if diff == 0 {
let item = unsafe { std::ptr::read(slot.data.get()) };
slot.status.store(head.wrapping_add(mask + 1), Ordering::Release);
self.head.0.store(head + 1, Ordering::Relaxed);
return Some(item);
}
None
}
pub fn is_empty(&self) -> bool {
let head = self.head.0.load(Ordering::Relaxed);
let mask = N - 1;
let slot = unsafe { self.buffer.get_unchecked(head & mask) };
let step = slot.status.load(Ordering::Acquire);
(step as isize - (head + 1) as isize) < 0
}
}
1 post - 1 participant
🏷️ Rust_feed