On Sun, Aug 3, 2025 at 7:27 PM Xu Kuohai <xukuohai@huaweicloud.com> wrote:
>
> From: Xu Kuohai <xukuohai@huawei.com>
>
> In overwrite mode, the producer does not wait for the consumer, so the
> consumer is responsible for handling conflicts. An optimistic method is
> used to resolve them: the consumer first reads consumer_pos,
> producer_pos and overwrite_pos, then calculates a read window and
> copies the data in that window out of the ring buffer. After copying,
> it checks the positions again to decide whether the data in the copy
> window has been overwritten by the producer. If so, it discards the
> copy and tries again. On success, the consumer processes the events in
> the copy.
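
Just to restate the window logic for anyone skimming: everything below
overwrite_pos may already have been reclaimed by the producer, so the
valid copy window is [max(consumer_pos, overwrite_pos), producer_pos).
A tiny standalone illustration with made-up position values (not code
from this patch):

	#include <stdio.h>

	int main(void)
	{
		unsigned long cons_pos = 10; /* where the consumer stopped */
		unsigned long over_pos = 16; /* oldest byte still intact */
		unsigned long prod_pos = 20; /* one past the newest byte */

		/* bytes in [cons_pos, over_pos) are already overwritten,
		 * so the copy starts at the larger of the two positions
		 */
		unsigned long start = over_pos > cons_pos ? over_pos : cons_pos;
		unsigned long size = prod_pos - start;

		printf("copy window: [%lu, %lu), %lu bytes\n",
		       start, prod_pos, size);
		return 0;
	}

Here the consumer skips the six overwritten bytes in [10, 16) and
copies only the four bytes that are still valid.
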
> Signed-off-by: Xu Kuohai <xukuohai@huawei.com>
> ---
>  tools/lib/bpf/ringbuf.c | 103 +++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 102 insertions(+), 1 deletion(-)
>
> diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> index 9702b70da444..9c072af675ff 100644
> --- a/tools/lib/bpf/ringbuf.c
> +++ b/tools/lib/bpf/ringbuf.c
> @@ -27,10 +27,13 @@ struct ring {
>  	ring_buffer_sample_fn sample_cb;
>  	void *ctx;
>  	void *data;
> +	void *read_buffer;
>  	unsigned long *consumer_pos;
>  	unsigned long *producer_pos;
> +	unsigned long *overwrite_pos;
>  	unsigned long mask;
>  	int map_fd;
> +	bool overwrite_mode;
>  };
>  
>  struct ring_buffer {
> @@ -69,6 +72,9 @@ static void ringbuf_free_ring(struct ring_buffer *rb, struct ring *r)
>  		r->producer_pos = NULL;
>  	}
>  
> +	if (r->read_buffer)
> +		free(r->read_buffer);
> +
>  	free(r);
>  }
>  
> @@ -119,6 +125,14 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
>  	r->sample_cb = sample_cb;
>  	r->ctx = ctx;
>  	r->mask = info.max_entries - 1;
> +	r->overwrite_mode = info.map_flags & BPF_F_OVERWRITE;
> +	if (unlikely(r->overwrite_mode)) {
> +		r->read_buffer = malloc(info.max_entries);
> +		if (!r->read_buffer) {
> +			err = -ENOMEM;
> +			goto err_out;
> +		}
> +	}
>  
>  	/* Map writable consumer page */
>  	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED,
>  		   map_fd, 0);
> @@ -148,6 +162,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
>  		goto err_out;
>  	}
>  	r->producer_pos = tmp;
> +	r->overwrite_pos = r->producer_pos + 1; /* overwrite_pos is next to producer_pos */
>  	r->data = tmp + rb->page_size;
>  
>  	e = &rb->events[rb->ring_cnt];
> @@ -232,7 +247,7 @@ static inline int roundup_len(__u32 len)
>  	return (len + 7) / 8 * 8;
>  }
>  
> -static int64_t ringbuf_process_ring(struct ring *r, size_t n)
> +static int64_t ringbuf_process_normal_ring(struct ring *r, size_t n)
>  {
>  	int *len_ptr, len, err;
>  	/* 64-bit to avoid overflow in case of extreme application behavior */
> @@ -278,6 +293,92 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
>  	return cnt;
>  }
>  
> +static int64_t ringbuf_process_overwrite_ring(struct ring *r, size_t n)
> +{
> +	int err;
> +	uint32_t *len_ptr, len;
> +	/* 64-bit to avoid overflow in case of extreme application behavior */
> +	int64_t cnt = 0;
> +	size_t size, offset;
> +	unsigned long cons_pos, prod_pos, over_pos, tmp_pos;
> +	bool got_new_data;
> +	void *sample;
> +	bool copied;
> +
> +	size = r->mask + 1;
> +
> +	cons_pos = smp_load_acquire(r->consumer_pos);
> +	do {
> +		got_new_data = false;
> +
> +		/* grab a copy of data */
> +		prod_pos = smp_load_acquire(r->producer_pos);
> +		do {
> +			over_pos = READ_ONCE(*r->overwrite_pos);
> +			/* prod_pos may be outdated now */
> +			if (over_pos < prod_pos) {
> +				tmp_pos = max(cons_pos, over_pos);
> +				/* smp_load_acquire(r->producer_pos) before
> +				 * READ_ONCE(*r->overwrite_pos) ensures that
> +				 * over_pos + r->mask < prod_pos never occurs,
> +				 * so size is never larger than r->mask
> +				 */
> +				size = prod_pos - tmp_pos;
> +				if (!size)
> +					goto done;
> +				memcpy(r->read_buffer,
> +				       r->data + (tmp_pos & r->mask), size);
> +				copied = true;
> +			} else {
> +				copied = false;
> +			}
> +			prod_pos = smp_load_acquire(r->producer_pos);
> +		/* retry if data is overwritten by producer */
> +		} while (!copied || prod_pos - tmp_pos > r->mask);
This seems to allow for a situation where a call to process the ring
can loop indefinitely if the producers are producing and overwriting
fast enough. That seems suboptimal to me.

Should there be a timeout or a maximum number of attempts that returns
-EBUSY or another error to the user?
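
Roughly this shape, maybe (untested sketch on top of this patch's inner
copy loop; MAX_COPY_RETRIES and the choice of -EBUSY are placeholders,
not the patch's code):

	int retries = 0;

	prod_pos = smp_load_acquire(r->producer_pos);
	do {
		if (retries++ >= MAX_COPY_RETRIES) {
			/* the producer kept overwriting the window we
			 * were copying; save progress and let the caller
			 * decide whether to come back
			 */
			smp_store_release(r->consumer_pos, cons_pos);
			return -EBUSY;
		}
		over_pos = READ_ONCE(*r->overwrite_pos);
		if (over_pos < prod_pos) {
			tmp_pos = max(cons_pos, over_pos);
			size = prod_pos - tmp_pos;
			if (!size)
				goto done;
			memcpy(r->read_buffer,
			       r->data + (tmp_pos & r->mask), size);
			copied = true;
		} else {
			copied = false;
		}
		prod_pos = smp_load_acquire(r->producer_pos);
	} while (!copied || prod_pos - tmp_pos > r->mask);
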
> +
> +		cons_pos = tmp_pos;
> +		for (offset = 0; offset < size; offset += roundup_len(len)) {
> +			len_ptr = r->read_buffer + (offset & r->mask);
> +			len = *len_ptr;
> +			if (len & BPF_RINGBUF_BUSY_BIT)
> +				goto done;
> +			got_new_data = true;
> +			cons_pos += roundup_len(len);
> +			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
> +				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
> +				err = r->sample_cb(r->ctx, sample, len);
> +				if (err < 0) {
> +					/* update consumer pos and bail out */
> +					smp_store_release(r->consumer_pos,
> +							  cons_pos);
> +					return err;
> +				}
> +				cnt++;
> +			}
> +			if (cnt >= n)
> +				goto done;
> +		}
> +	} while (got_new_data);
> +done:
> +	smp_store_release(r->consumer_pos, cons_pos);
> +	return cnt;
> +}
> +
> +static int64_t ringbuf_process_ring(struct ring *r, size_t n)
> +{
> +	if (likely(!r->overwrite_mode))
> +		return ringbuf_process_normal_ring(r, n);
> +	else
> +		return ringbuf_process_overwrite_ring(r, n);
> +}
> +
>  /* Consume available ring buffer(s) data without event polling, up to n
>   * records.
>
> --
> 2.43.0
-- 2.43.0