On Thu, Aug 14, 2025 at 12:34 PM Eduard Zingerman <eddyz87@gmail.com> wrote:
On Mon, 2025-08-04 at 10:20 +0800, Xu Kuohai wrote:
[...]
@@ -278,6 +293,92 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n) return cnt; }
+static int64_t ringbuf_process_overwrite_ring(struct ring *r, size_t n) +{
int err;
uint32_t *len_ptr, len;
/* 64-bit to avoid overflow in case of extreme application behavior */
int64_t cnt = 0;
size_t size, offset;
unsigned long cons_pos, prod_pos, over_pos, tmp_pos;
bool got_new_data;
void *sample;
bool copied;
size = r->mask + 1;
cons_pos = smp_load_acquire(r->consumer_pos);
do {
got_new_data = false;
/* grab a copy of data */
prod_pos = smp_load_acquire(r->producer_pos);
do {
over_pos = READ_ONCE(*r->overwrite_pos);
/* prod_pos may be outdated now */
if (over_pos < prod_pos) {
tmp_pos = max(cons_pos, over_pos);
/* smp_load_acquire(r->producer_pos) before
* READ_ONCE(*r->overwrite_pos) ensures that
* over_pos + r->mask < prod_pos never occurs,
* so size is never larger than r->mask
*/
size = prod_pos - tmp_pos;
if (!size)
goto done;
memcpy(r->read_buffer,
r->data + (tmp_pos & r->mask), size);
copied = true;
} else {
copied = false;
}
prod_pos = smp_load_acquire(r->producer_pos);
/* retry if data is overwritten by producer */
} while (!copied || prod_pos - tmp_pos > r->mask);
Could you please elaborate a bit, why this condition is sufficient to guarantee that r->overwrite_pos had not changed while memcpy() was executing?
It isn't sufficient to guarantee that, but does it need to be? The concern is that the data being memcpy-ed might have been overwritten, right? This condition is sufficient to guarantee that such an overwrite cannot happen without forcing another loop iteration.
For the producer to overwrite a memcpy-ed byte, it must have looped around the entire buffer, so r->producer_pos will be at least r->mask + 1 more than tmp_pos. The +1 is because r->producer_pos first had to produce the byte that got overwritten for it to be included in the memcpy, then produce it a second time to overwrite it.
Since the code rereads r->producer_pos before making the check, if any bytes have been overwritten, prod_pos - tmp_pos will be at least r->mask + 1, so the loop condition will evaluate to true, the loop will iterate again, and a new memcpy will be performed.
cons_pos = tmp_pos;
for (offset = 0; offset < size; offset += roundup_len(len)) {
len_ptr = r->read_buffer + (offset & r->mask);
len = *len_ptr;
if (len & BPF_RINGBUF_BUSY_BIT)
goto done;
got_new_data = true;
cons_pos += roundup_len(len);
if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
err = r->sample_cb(r->ctx, sample, len);
if (err < 0) {
/* update consumer pos and bail out */
smp_store_release(r->consumer_pos,
cons_pos);
return err;
}
cnt++;
}
if (cnt >= n)
goto done;
}
} while (got_new_data);
+done:
smp_store_release(r->consumer_pos, cons_pos);
return cnt;
+}
[...]