A “ping-pong buffer” is a data structure which is very useful in systems programming. The ping-pong buffer is a two-element buffer that allows simultaneous access by a single producer and a single consumer. One element is reserved for writing by the producer, and the other element is reserved for reading by the consumer. When writing and reading are finished, the roles of the two elements are swapped (i.e. the element that was just written will be read next, and the one that was just read will be overwritten next). This way, the data that is written doesn’t have to be copied to a different memory location in order to be read.
Jason Sachs has an excellent EmbeddedRelated post about ping-pong buffers, although he calls them “revolving fireplaces”. I highly recommend reading that post (and indeed, every article that Jason has written).
Lately, the Rust programming language has been an object of my fascination. As a low-level systems language, Rust excels at providing expressiveness and abstraction without overhead. It’s possible to implement a ping-pong buffer as a Rust “smart pointer” type (similar to the built-in RefCell), which preserves Rust’s memory safety guarantees with a minimum of runtime overhead.
I wrote a Rust library called “atomic_pingpong”, which provides a generic implementation of ping-pong buffer functionality. It uses the built-in atomic types to provide lock-free thread-safety between the producer and consumer. The library is published and documented through crates.io, with source code available from GitHub.
Usage
Let’s create a ping-pong buffer where each element is an array of 100 integers.
let buffer = atomic_pingpong::Buffer::new([0; 100]);
To write data into the buffer, we obtain a mutable reference to a buffer element by calling Buffer::write(). (Actually, write() returns an Option: the call to write() might fail, because the ping-pong buffer only allows a single producer to write data at a time. If we know that we are the only producer, we can unwrap() the result of write(). Otherwise, we would have to handle the case of a None return value.)
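fn write_values(buffer: &atomic_pingpong::Buffer<[i32; 100]>, new_val: i32) {
    // We are the only producer, so write() cannot fail here.
    let mut arr = buffer.write().unwrap();
    for val in arr.iter_mut() {
        *val = new_val;
    }
    // `arr` is dropped here, which finishes the write.
}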
Similarly, to read data from the buffer, we obtain a reference to a buffer element by calling Buffer::read(). (And once again, this actually returns an Option, because only a single consumer is allowed to read data at a time.)
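fn read_values(buffer: &atomic_pingpong::Buffer<[i32; 100]>) {
    // We are the only consumer, so read() cannot fail here.
    let arr = buffer.read().unwrap();
    for (i, val) in arr.iter().enumerate() {
        println!("arr[{}] = {}", i, val);
    }
}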
In this example, the ping-pong buffer has three critical attributes:
- We can call write_values() and read_values() concurrently, from different threads. As long as each function is called from only one thread at a time, both will receive a reference for reading or writing on every call, without any blocking or critical sections.
- The read_values() function will never see partially complete data (from a write_values() call that is in progress at the same time). Data is only read after the entire buffer element has been written.
- The buffer never needs to copy data in memory. This means it would scale to, e.g., billion-element arrays, while maintaining the same constant overhead for reading and writing.
Implementation
The state of the buffer consists of four boolean values, which are stored as four bits in an AtomicU8:
use core::sync::atomic;
struct BufferState(atomic::AtomicU8);
impl BufferState {
    // Bits of the bitmask:
    const LOCK_READ: u8 = 0b0000_0001; // reading is in progress
    const LOCK_WRITE: u8 = 0b0000_0010; // writing is in progress
    const MODE_IS_FLIPPED: u8 = 0b0000_0100; // buffer is flip-flopped
    const WANT_MODE_CHANGE: u8 = 0b0000_1000; // buffer is ready to flip-flop
    const fn new() -> Self {
        Self(atomic::AtomicU8::new(0))
    }
(A fifth bit of state, NEW_DATA_READY, is present in the library to support applications in which each buffer element should be read precisely once after being written. For simplicity, it’s omitted from this explanation.)
To begin reading (BufferState::lock_read()) or writing (BufferState::lock_write()), we check whether the corresponding lock bit is already set. If it is, we return None: we can’t start the requested operation because it’s already in progress. Otherwise, we set the lock bit (the check and the update happen together in a single atomic fetch_update()) and return Some(true) if the buffer is currently flip-flopped, or Some(false) if it’s not.
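    fn lock(&self, condition: fn(u8) -> bool, action: fn(u8) -> u8) -> Option<bool> {
        let mut new_flags = None::<u8>;
        let _ = self.0.fetch_update(
            atomic::Ordering::Acquire,
            atomic::Ordering::Relaxed,
            |flags| {
                // fetch_update may call this closure more than once, so recompute
                // new_flags on every attempt; a stale Some from an earlier attempt
                // must not survive a retry in which the lock is already held.
                new_flags = if condition(flags) { Some(action(flags)) } else { None };
                new_flags
            },
        );
        new_flags.map(|f| f & Self::MODE_IS_FLIPPED != 0)
    }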
    // (The library's versions take an extra parameter related to NEW_DATA_READY,
    // which is omitted here.)
    fn lock_read(&self) -> Option<bool> {
        self.lock(
            |flags| flags & Self::LOCK_READ == 0,
            |flags| flags | Self::LOCK_READ,
        )
    }
    fn lock_write(&self) -> Option<bool> {
        self.lock(
            |flags| flags & Self::LOCK_WRITE == 0,
            |flags| flags | Self::LOCK_WRITE,
        )
    }
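To make the Option<bool> convention concrete, here is a minimal self-contained sketch of just the locking step, written against a bare AtomicU8 with the same bit values as BufferState. The free function try_lock is hypothetical and is not part of atomic_pingpong. It also shows a possible alternative to the captured new_flags variable: fetch_update() already returns Ok(previous value) on success and Err on abort, and since setting a lock bit never touches MODE_IS_FLIPPED, the previous value carries the current mode.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Same bit values as the BufferState constants above.
const LOCK_READ: u8 = 0b0000_0001;
const LOCK_WRITE: u8 = 0b0000_0010;
const MODE_IS_FLIPPED: u8 = 0b0000_0100;

/// Hypothetical stand-alone version of the locking step: atomically sets `bit`
/// if it is clear. Returns Some(mode) on success, or None if `bit` was already set.
fn try_lock(state: &AtomicU8, bit: u8) -> Option<bool> {
    state
        .fetch_update(Ordering::Acquire, Ordering::Relaxed, |flags| {
            // Returning None from the closure aborts the update, leaving `state` unchanged.
            if flags & bit == 0 {
                Some(flags | bit)
            } else {
                None
            }
        })
        .ok()
        // Ok(prev) carries the previous value; setting a lock bit never changes MODE_IS_FLIPPED.
        .map(|prev| prev & MODE_IS_FLIPPED != 0)
}

#[test]
fn second_lock_attempt_is_refused() {
    let state = AtomicU8::new(MODE_IS_FLIPPED); // flipped mode, no locks held
    assert_eq!(try_lock(&state, LOCK_WRITE), Some(true)); // producer gets the lock
    assert_eq!(try_lock(&state, LOCK_WRITE), None); // a second producer is refused
    assert_eq!(try_lock(&state, LOCK_READ), Some(true)); // the consumer is independent
}
```

Because the closure keeps no state between attempts, there is nothing that can go stale if fetch_update() has to retry.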
When reading or writing is finished, we clear the corresponding lock flag and also check whether the buffer needs to be flip-flopped:
- When writing finishes, we want to flip-flop the buffer. However, if reading is in progress at the time when writing finishes, all we can do is set the “ready to flip-flop” flag (WANT_MODE_CHANGE), so that the buffer is flip-flopped later.
- When reading finishes, we check whether the “ready to flip-flop” flag was previously set. If so, and if writing is not in progress, we flip-flop the buffer.
    fn release(&self, action: fn(u8) -> u8) {
        let _ = self.0.fetch_update(
            atomic::Ordering::Release,
            atomic::Ordering::Relaxed,
            |flags| Some(action(flags)),
        );
    }
    fn release_read(&self) {
        self.release(|mut flags| {
            flags &= !Self::LOCK_READ;
            if flags & (Self::LOCK_WRITE | Self::WANT_MODE_CHANGE) == Self::WANT_MODE_CHANGE {
                flags &= !Self::WANT_MODE_CHANGE;
                flags ^= Self::MODE_IS_FLIPPED;
            }
            flags
        })
    }
    fn release_write(&self) {
        self.release(|mut flags| {
            flags &= !Self::LOCK_WRITE;
            if flags & Self::LOCK_READ == 0 {
                flags &= !Self::WANT_MODE_CHANGE;
                flags ^= Self::MODE_IS_FLIPPED;
            } else {
                flags |= Self::WANT_MODE_CHANGE;
            }
            flags
        })
    }
}
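The interaction between the two release functions is easiest to see by tracing the flag byte by hand. The sketch below copies the closure bodies of release_read() and release_write() into pure functions (after_release_read and after_release_write are hypothetical names made up for this illustration), so the transitions can be checked without any threads or atomics.

```rust
// Same bit values as the BufferState constants above.
const LOCK_READ: u8 = 0b0000_0001;
const LOCK_WRITE: u8 = 0b0000_0010;
const MODE_IS_FLIPPED: u8 = 0b0000_0100;
const WANT_MODE_CHANGE: u8 = 0b0000_1000;

/// The closure body of release_read(), as a pure function on the flag byte.
fn after_release_read(mut flags: u8) -> u8 {
    flags &= !LOCK_READ;
    // Flip only if a flip was requested and the writer is not busy.
    if flags & (LOCK_WRITE | WANT_MODE_CHANGE) == WANT_MODE_CHANGE {
        flags &= !WANT_MODE_CHANGE;
        flags ^= MODE_IS_FLIPPED;
    }
    flags
}

/// The closure body of release_write(), as a pure function on the flag byte.
fn after_release_write(mut flags: u8) -> u8 {
    flags &= !LOCK_WRITE;
    if flags & LOCK_READ == 0 {
        // No reader: flip right away.
        flags &= !WANT_MODE_CHANGE;
        flags ^= MODE_IS_FLIPPED;
    } else {
        // Reader busy: leave a note so the reader flips when it finishes.
        flags |= WANT_MODE_CHANGE;
    }
    flags
}

#[test]
fn write_finishing_during_a_read_defers_the_flip() {
    // Both sides hold their locks in the unflipped mode.
    let flags = LOCK_READ | LOCK_WRITE;
    // The writer finishes first: the reader is busy, so only WANT_MODE_CHANGE is set.
    let flags = after_release_write(flags);
    assert_eq!(flags, LOCK_READ | WANT_MODE_CHANGE);
    // The reader finishes: the deferred flip happens now.
    let flags = after_release_read(flags);
    assert_eq!(flags, MODE_IS_FLIPPED);
}
```

In every path, MODE_IS_FLIPPED only changes in a release that leaves both lock bits clear, which is why a reader or writer holding its lock always sees a stable mode.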
Calls to Buffer::read() and Buffer::write() will, of course, call BufferState::lock_read() and BufferState::lock_write() as needed to begin reading or writing. But how can we make sure release_read() or release_write() gets called when reading or writing is finished?
The answer to this question is Rust’s “smart pointer” concept.
Instead of returning Option<&T> from Buffer<T>::read(), we’ll return an Option<Ref<T>>. The type Ref<T> implements the Deref trait, so it can be used in the same way as a regular &T reference, but it has a special Drop handler which updates the buffer state when it goes out of scope.
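The Drop-based pattern that Ref relies on is ordinary RAII, and it can be shown on its own. The Guard type below is hypothetical and much simpler than Ref (a single AtomicBool instead of BufferState), but the mechanics are the same: acquiring the guard sets a flag, and the compiler inserts the drop() call that clears it when the guard goes out of scope.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical guard type: while it is alive, `busy` stays true.
struct Guard<'a> {
    busy: &'a AtomicBool,
}

impl<'a> Guard<'a> {
    /// Takes the flag if it is free; returns None if another guard holds it.
    fn acquire(busy: &'a AtomicBool) -> Option<Self> {
        busy.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .ok()
            .map(|_| Guard { busy })
    }
}

impl Drop for Guard<'_> {
    // Runs automatically when the guard goes out of scope,
    // including early returns and unwinding panics.
    fn drop(&mut self) {
        self.busy.store(false, Ordering::Release);
    }
}

#[test]
fn flag_is_released_at_end_of_scope() {
    let busy = AtomicBool::new(false);
    {
        let _g = Guard::acquire(&busy).unwrap();
        assert!(Guard::acquire(&busy).is_none()); // still held inside the scope
    } // `_g` is dropped here
    assert!(!busy.load(Ordering::Relaxed));
}
```

One practical consequence applies to Ref and RefMut as well: binding a guard to `_` (as in `let _ = buffer.write();`) drops it at the end of that statement, whereas a named binding such as `_g` keeps it alive until the end of the enclosing scope.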
use core::ops::Deref;
pub struct Ref<'a, T> {
    ptr: &'a T,
    state: &'a BufferState,
}
impl<'a, T> Deref for Ref<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.ptr
    }
}
impl<'a, T> Drop for Ref<'a, T> {
    fn drop(&mut self) {
        // Finishing the read may also flip-flop the buffer (see release_read()).
        self.state.release_read();
    }
}
For Buffer::write(), we similarly define RefMut, which implements DerefMut in addition to Deref (DerefMut requires Deref as a supertrait) and whose Drop handler calls release_write() instead of release_read().
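A sketch of RefMut along those lines follows; it is an illustration, not the library's source. To keep it self-contained, it includes a simplified stand-in for BufferState whose release_write() only clears the write-lock bit, whereas the real release_write() shown above also flip-flops the buffer.

```rust
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU8, Ordering};

/// Simplified stand-in for BufferState: release_write() here only clears
/// the write-lock bit (the real one also flip-flops the buffer).
struct BufferState(AtomicU8);

impl BufferState {
    const LOCK_WRITE: u8 = 0b0000_0010;
    fn release_write(&self) {
        self.0.fetch_and(!Self::LOCK_WRITE, Ordering::Release);
    }
}

pub struct RefMut<'a, T> {
    ptr: &'a mut T,
    state: &'a BufferState,
}

// DerefMut has Deref as a supertrait, so RefMut needs both impls.
impl<'a, T> Deref for RefMut<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        &*self.ptr
    }
}

impl<'a, T> DerefMut for RefMut<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut *self.ptr
    }
}

impl<'a, T> Drop for RefMut<'a, T> {
    fn drop(&mut self) {
        // Finishing the write releases the lock (and, in the real code, may flip-flop).
        self.state.release_write();
    }
}

#[test]
fn writes_land_and_lock_is_released() {
    let state = BufferState(AtomicU8::new(BufferState::LOCK_WRITE));
    let mut value = [0i32; 4];
    {
        let mut guard = RefMut { ptr: &mut value, state: &state };
        guard[0] = 7; // indexing auto-derefs through DerefMut
    }
    assert_eq!(value, [7, 0, 0, 0]);
    assert_eq!(state.0.load(Ordering::Relaxed), 0);
}
```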
Now we’re ready to define the Buffer struct:
use core::cell::UnsafeCell;
pub struct Buffer<T> {
    ping: UnsafeCell<T>,
    pong: UnsafeCell<T>,
    state: BufferState,
}
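We use UnsafeCell<T> instead of T because the Buffer type has interior mutability: you are allowed to mutate a Buffer by calling read() or write(), even if you only have an immutable reference to the buffer. In particular, write() hands out a mutable reference to an element while only borrowing &self, and UnsafeCell::get() is what allows us to obtain a *mut T pointer in that situation.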
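One consequence of using UnsafeCell is not spelled out above: UnsafeCell<T> is not Sync, so a struct containing it cannot be shared between threads until its author makes an unsafe promise that access is properly synchronized. The sketch below shows that pattern on a hypothetical one-element Slot type whose AtomicBool plays the role of a lock bit. The T: Send bound mirrors the one std uses for Mutex<T>; whether atomic_pingpong uses exactly this bound for Buffer<T> is an assumption.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical one-element container: `busy` guards `value`
/// the way BufferState's lock bits guard ping and pong.
struct Slot<T> {
    value: UnsafeCell<T>,
    busy: AtomicBool,
}

// UnsafeCell<T> is !Sync, so Slot<T> would not be shareable across threads
// without this promise. It is sound only because try_update() never lets two
// threads touch `value` at once; T: Send because the value is effectively
// handed from thread to thread.
unsafe impl<T: Send> Sync for Slot<T> {}

impl<T> Slot<T> {
    fn new(value: T) -> Self {
        Slot { value: UnsafeCell::new(value), busy: AtomicBool::new(false) }
    }

    /// Mutates the contents through `&self`; returns false if the slot is busy.
    fn try_update(&self, f: impl FnOnce(&mut T)) -> bool {
        if self.busy.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
            return false;
        }
        // SAFETY: `busy` is held, so no other reference to the contents exists.
        f(unsafe { &mut *self.value.get() });
        self.busy.store(false, Ordering::Release);
        true
    }
}

#[test]
fn concurrent_updates_are_not_lost() {
    let slot = Slot::new(0u64);
    std::thread::scope(|s| {
        for _ in 0..4 {
            s.spawn(|| {
                let mut done = 0;
                while done < 1000 {
                    // A busy slot just means "try again later".
                    if slot.try_update(|v| *v += 1) {
                        done += 1;
                    }
                }
            });
        }
    });
    let mut total = 0;
    assert!(slot.try_update(|v| total = *v));
    assert_eq!(total, 4000);
}
```

Without the `unsafe impl`, the std::thread::scope call in the test would fail to compile, because spawning a closure that borrows `slot` requires Slot<u64>: Sync.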
The last bits of plumbing are straightforward:
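impl<T> Buffer<T> {
    fn get_pointer(&self, state: bool, read: bool) -> *mut T {
        // state = false => read ping and write pong
        // state = true  => read pong and write ping
        (if state ^ read { &self.ping } else { &self.pong }).get()
    }
    pub fn read(&self) -> Option<Ref<'_, T>> {
        let mode = self.state.lock_read()?;
        Some(Ref {
            // SAFETY: LOCK_READ is held, so there is no other reader, the mode cannot
            // flip until we release, and the writer only touches the other element.
            ptr: unsafe { &*self.get_pointer(mode, true) },
            state: &self.state,
        })
    }
    pub fn write(&self) -> Option<RefMut<'_, T>> {
        let mode = self.state.lock_write()?;
        Some(RefMut {
            // SAFETY: LOCK_WRITE is held, so there is no other writer, the mode cannot
            // flip until we release, and the reader only touches the other element.
            ptr: unsafe { &mut *self.get_pointer(mode, false) },
            state: &self.state,
        })
    }
}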
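Because the XOR in get_pointer() is easy to misread, here is a tiny check of its truth table. The function name element and its string labels are hypothetical stand-ins for the two pointers; the selection rule is the same expression used in get_pointer().

```rust
/// Mirrors get_pointer()'s selection rule, returning a label instead of a pointer.
fn element(mode_is_flipped: bool, read: bool) -> &'static str {
    if mode_is_flipped ^ read { "ping" } else { "pong" }
}

#[test]
fn reader_and_writer_never_share_an_element() {
    for mode in [false, true] {
        // For a given mode, reading and writing always resolve to different elements...
        assert_ne!(element(mode, true), element(mode, false));
        // ...and flipping the mode turns the last write target into the next read source.
        assert_eq!(element(!mode, true), element(mode, false));
    }
}
```

The second assertion is the whole point of the ping-pong design: after a flip, the consumer reads exactly the element the producer just finished, with no copying.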