How to create lock free ring buffer MPSC channel?

⚓ Rust    📅 2026-03-02    👤 surdeus    👁️ 2      

surdeus

I'm learning lock-free programming in Rust. Is this correct? I need help verifying the code. Also, is there another way so I don't have to push elements one by one in fn new? I'm trying to use MaybeUninit with a loop and write to fill the values in new. Which is better between using a Vec or MaybeUninit for initialization with minimal overhead if the channel size is large?

My progress is like this

use std::sync::atomic::{AtomicUsize, Ordering};
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;

#[repr(align(64))]
struct CachePaddedAtomic(AtomicUsize);

#[repr(align(64))]
struct Slot<T> {
    status: AtomicUsize,
    data: UnsafeCell<T>,
}

pub struct MpscBuffer<T, const N: usize> {
    buffer: Box<[Slot<T>]>,
    head: CachePaddedAtomic,
    tail: CachePaddedAtomic,
}

unsafe impl<T: Send, const N: usize> Sync for MpscBuffer<T, N> {}

impl<T: Default, const N: usize> MpscBuffer<T, N> {
    const CHECK: () = assert!(N > 0 && (N & (N - 1) == 0), "N must be a power of two");
    
    pub fn new() -> Self {
        let _ = Self::CHECK;
        let mut vec = Vec::with_capacity(N);
        
        for i in 0..N {
            vec.push(Slot {
                status: AtomicUsize::new(i),
                data: UnsafeCell::new(T::default()),
            });
        }

        Self {
            buffer: vec.into_boxed_slice(),
            head: CachePaddedAtomic(AtomicUsize::new(0)),
            tail: CachePaddedAtomic(AtomicUsize::new(0)),
        }
    }

    pub fn push(&self, item: T) -> bool {
        let mut tail = self.tail.0.load(Ordering::Relaxed);
        let mask = N - 1;
        
        loop {
            let slot = unsafe { self.buffer.get_unchecked(tail & mask) };
            let step = slot.status.load(Ordering::Acquire);
            let diff = step as isize - tail as isize;

            if diff == 0 {
                let actual_tail = self.tail.0.fetch_add(1, Ordering::Relaxed);
                let actual_slot = unsafe { self.buffer.get_unchecked(actual_tail & mask) };
                
                while (actual_slot.status.load(Ordering::Acquire) as isize - actual_tail as isize) != 0 {
                    std::hint::spin_loop();
                }

                unsafe { *actual_slot.data.get() = item; }
                actual_slot.status.store(actual_tail + 1, Ordering::Release);
                return true;
            } else if diff < 0 {
                return false;
            } else {
                tail = self.tail.0.load(Ordering::Relaxed);
            }
        }
    }

    pub fn pop(&self) -> Option<T> {
        let head = self.head.0.load(Ordering::Relaxed);
        let mask = N - 1;
        let slot = unsafe { self.buffer.get_unchecked(head & mask) };
        let step = slot.status.load(Ordering::Acquire);
        let diff = step as isize - (head + 1) as isize;

        if diff == 0 {
            let item = unsafe { std::ptr::read(slot.data.get()) };
            
            slot.status.store(head.wrapping_add(mask + 1), Ordering::Release);
            self.head.0.store(head + 1, Ordering::Relaxed);
            return Some(item);
        }
        None
    }
    
    pub fn is_empty(&self) -> bool {
        let head = self.head.0.load(Ordering::Relaxed);
        let mask = N - 1;
        let slot = unsafe { self.buffer.get_unchecked(head & mask) };
        let step = slot.status.load(Ordering::Acquire);

        (step as isize - (head + 1) as isize) < 0
    }
    
}

1 post - 1 participant

Read full topic

🏷️ Rust_feed