buf_ring: Keep the full head and tail values

If a thread reads the head but then sleeps for long enough that
another thread fills the ring and leaves the new head with the
expected value, then the cmpset can pass when it should have failed.

To work around this, keep the full head and tail values and use the
upper bits as a generation count.

Reviewed by:	kib
Sponsored by:	Arm Ltd
Differential Revision:	https://reviews.freebsd.org/D46151
This commit is contained in:
Andrew Turner
2024-08-19 10:06:44 +01:00
parent 17a597bc13
commit 3cc603909e
+56 -31
View File
@@ -40,6 +40,15 @@
#include <sys/mutex.h>
#endif
/*
* We only apply the mask to the head and tail values when calculating the
* index into br_ring to access. This means the upper bits can be used as
* an epoch to reduce the chance the atomic_cmpset succeeds when it should
* fail, e.g. when the head wraps while the CPU is in an interrupt. This
* is a probabilistic fix as there is still a very unlikely chance the
* value wraps back to the expected value.
*
*/
struct buf_ring {
volatile uint32_t br_prod_head;
volatile uint32_t br_prod_tail;
@@ -63,28 +72,28 @@ struct buf_ring {
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
uint32_t prod_head, prod_next, cons_tail;
#ifdef DEBUG_BUFRING
int i;
uint32_t prod_head, prod_next, prod_idx;
uint32_t cons_tail, mask;
mask = br->br_prod_mask;
#ifdef DEBUG_BUFRING
/*
* Note: It is possible to encounter an mbuf that was removed
* via drbr_peek(), and then re-added via drbr_putback() and
* trigger a spurious panic.
*/
for (i = br->br_cons_head; i != br->br_prod_head;
i = ((i + 1) & br->br_cons_mask))
if (br->br_ring[i] == buf)
for (uint32_t i = br->br_cons_head; i != br->br_prod_head; i++)
if (br->br_ring[i & mask] == buf)
panic("buf=%p already enqueue at %d prod=%d cons=%d",
buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
critical_enter();
do {
prod_head = br->br_prod_head;
prod_next = (prod_head + 1) & br->br_prod_mask;
prod_next = prod_head + 1;
cons_tail = br->br_cons_tail;
if (prod_next == cons_tail) {
if ((int32_t)(cons_tail + br->br_prod_size - prod_next) < 1) {
rmb();
if (prod_head == br->br_prod_head &&
cons_tail == br->br_cons_tail) {
@@ -95,11 +104,12 @@ buf_ring_enqueue(struct buf_ring *br, void *buf)
continue;
}
} while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
prod_idx = prod_head & mask;
#ifdef DEBUG_BUFRING
if (br->br_ring[prod_head] != NULL)
if (br->br_ring[prod_idx] != NULL)
panic("dangling value in enqueue");
#endif
br->br_ring[prod_head] = buf;
br->br_ring[prod_idx] = buf;
/*
* If there are other enqueues in progress
@@ -120,23 +130,26 @@ buf_ring_enqueue(struct buf_ring *br, void *buf)
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
uint32_t cons_head, cons_next;
uint32_t cons_head, cons_next, cons_idx;
uint32_t mask;
void *buf;
critical_enter();
mask = br->br_cons_mask;
do {
cons_head = br->br_cons_head;
cons_next = (cons_head + 1) & br->br_cons_mask;
cons_next = cons_head + 1;
if (cons_head == br->br_prod_tail) {
critical_exit();
return (NULL);
}
} while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
cons_idx = cons_head & mask;
buf = br->br_ring[cons_head];
buf = br->br_ring[cons_idx];
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
br->br_ring[cons_idx] = NULL;
#endif
/*
* If there are other dequeues in progress
@@ -160,8 +173,8 @@ buf_ring_dequeue_mc(struct buf_ring *br)
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
uint32_t cons_head, cons_next;
uint32_t prod_tail;
uint32_t cons_head, cons_next, cons_idx;
uint32_t prod_tail, mask;
void *buf;
/*
@@ -189,6 +202,7 @@ buf_ring_dequeue_sc(struct buf_ring *br)
*
* <1> Load (on core 1) from br->br_ring[cons_head] can be reordered (speculatively read) by the CPU.
*/
mask = br->br_cons_mask;
#if defined(__arm__) || defined(__aarch64__)
cons_head = atomic_load_acq_32(&br->br_cons_head);
#else
@@ -196,16 +210,17 @@ buf_ring_dequeue_sc(struct buf_ring *br)
#endif
prod_tail = atomic_load_acq_32(&br->br_prod_tail);
cons_next = (cons_head + 1) & br->br_cons_mask;
cons_next = cons_head + 1;
if (cons_head == prod_tail)
if (cons_head == prod_tail)
return (NULL);
cons_idx = cons_head & mask;
br->br_cons_head = cons_next;
buf = br->br_ring[cons_head];
buf = br->br_ring[cons_idx];
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
br->br_ring[cons_idx] = NULL;
#ifdef _KERNEL
if (!mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
@@ -226,18 +241,21 @@ buf_ring_dequeue_sc(struct buf_ring *br)
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
uint32_t cons_head, cons_next;
uint32_t prod_tail;
uint32_t cons_head, cons_next, prod_tail;
#ifdef DEBUG_BUFRING
uint32_t mask;
mask = br->br_cons_mask;
#endif
cons_head = br->br_cons_head;
prod_tail = br->br_prod_tail;
cons_next = (cons_head + 1) & br->br_cons_mask;
if (cons_head == prod_tail)
cons_next = cons_head + 1;
if (cons_head == prod_tail)
return;
br->br_cons_head = cons_next;
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
br->br_ring[cons_head & mask] = NULL;
#endif
br->br_cons_tail = cons_next;
}
@@ -261,9 +279,12 @@ buf_ring_advance_sc(struct buf_ring *br)
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
KASSERT(br->br_cons_head != br->br_prod_tail,
uint32_t mask;
mask = br->br_cons_mask;
KASSERT((br->br_cons_head & mask) != (br->br_prod_tail & mask),
("Buf-Ring has none in putback")) ;
br->br_ring[br->br_cons_head] = new;
br->br_ring[br->br_cons_head & mask] = new;
}
/*
@@ -274,11 +295,13 @@ buf_ring_putback_sc(struct buf_ring *br, void *new)
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
uint32_t mask;
#if defined(DEBUG_BUFRING) && defined(_KERNEL)
if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
#endif
mask = br->br_cons_mask;
/*
* I believe it is safe to not have a memory barrier
* here because we control cons and tail is worst case
@@ -288,12 +311,13 @@ buf_ring_peek(struct buf_ring *br)
if (br->br_cons_head == br->br_prod_tail)
return (NULL);
return (br->br_ring[br->br_cons_head]);
return (br->br_ring[br->br_cons_head & mask]);
}
static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
uint32_t mask;
void *ret;
#if defined(DEBUG_BUFRING) && defined(_KERNEL)
@@ -301,6 +325,7 @@ buf_ring_peek_clear_sc(struct buf_ring *br)
panic("lock not held on single consumer dequeue");
#endif
mask = br->br_cons_mask;
if (br->br_cons_head == br->br_prod_tail)
return (NULL);
@@ -318,13 +343,13 @@ buf_ring_peek_clear_sc(struct buf_ring *br)
atomic_thread_fence_acq();
#endif
ret = br->br_ring[br->br_cons_head];
ret = br->br_ring[br->br_cons_head & mask];
#ifdef DEBUG_BUFRING
/*
* Single consumer, i.e. cons_head will not move while we are
* running, so atomic_swap_ptr() is not necessary here.
*/
br->br_ring[br->br_cons_head] = NULL;
br->br_ring[br->br_cons_head & mask] = NULL;
#endif
return (ret);
}
@@ -333,7 +358,7 @@ static __inline int
buf_ring_full(struct buf_ring *br)
{
return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
return (br->br_prod_head == br->br_cons_tail + br->br_cons_size - 1);
}
static __inline int