On Mon, 2025-08-04 at 10:20 +0800, Xu Kuohai wrote:
[...]
@@ -278,6 +293,92 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n) return cnt; } +static int64_t ringbuf_process_overwrite_ring(struct ring *r, size_t n) +{
- int err;
- uint32_t *len_ptr, len;
- /* 64-bit to avoid overflow in case of extreme application behavior */
- int64_t cnt = 0;
- size_t size, offset;
- unsigned long cons_pos, prod_pos, over_pos, tmp_pos;
- bool got_new_data;
- void *sample;
- bool copied;
- size = r->mask + 1;
- cons_pos = smp_load_acquire(r->consumer_pos);
- do {
got_new_data = false;
/* grab a copy of data */
prod_pos = smp_load_acquire(r->producer_pos);
do {
over_pos = READ_ONCE(*r->overwrite_pos);
/* prod_pos may be outdated now */
if (over_pos < prod_pos) {
tmp_pos = max(cons_pos, over_pos);
/* smp_load_acquire(r->producer_pos) before
* READ_ONCE(*r->overwrite_pos) ensures that
* over_pos + r->mask < prod_pos never occurs,
* so size is never larger than r->mask
*/
size = prod_pos - tmp_pos;
if (!size)
goto done;
memcpy(r->read_buffer,
r->data + (tmp_pos & r->mask), size);
copied = true;
} else {
copied = false;
}
prod_pos = smp_load_acquire(r->producer_pos);
/* retry if data is overwritten by producer */
} while (!copied || prod_pos - tmp_pos > r->mask);
Could you please elaborate a bit on why this condition is sufficient to guarantee that r->overwrite_pos did not change while memcpy() was executing?
cons_pos = tmp_pos;
for (offset = 0; offset < size; offset += roundup_len(len)) {
len_ptr = r->read_buffer + (offset & r->mask);
len = *len_ptr;
if (len & BPF_RINGBUF_BUSY_BIT)
goto done;
got_new_data = true;
cons_pos += roundup_len(len);
if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
err = r->sample_cb(r->ctx, sample, len);
if (err < 0) {
/* update consumer pos and bail out */
smp_store_release(r->consumer_pos,
cons_pos);
return err;
}
cnt++;
}
if (cnt >= n)
goto done;
}
- } while (got_new_data);
+done:
- smp_store_release(r->consumer_pos, cons_pos);
- return cnt;
+}
[...]