This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, master, has been updated via 6ce60cac1fea6d65803740f5f9b6627abf6814b3 (commit) via f2fd33bd4a83f92f080826accc064df94e263210 (commit) via cb93abaef977a8014e81b296e8656c9faa43eaf2 (commit) via 3330513ba71f43869538ce6e97962c59f331a5ad (commit) from 6adce4648bf228b60fc65312964a21c84771f58d (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log ----------------------------------------------------------------- commit 6ce60cac1fea6d65803740f5f9b6627abf6814b3 Author: Petri Savolainen petri.savolainen@linaro.org Date: Fri Aug 24 16:19:48 2018 +0300
linux-gen: ring_spsc: move ring mask and data pointer
Store the mask and data pointer in the queue entry instead of the ring structure. These values are constant and can be stored among other frequently used read-only data. Other ring types also use the same variables.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/include/odp_ring_spsc_internal.h b/platform/linux-generic/include/odp_ring_spsc_internal.h index e38bda1d..de122bf5 100644 --- a/platform/linux-generic/include/odp_ring_spsc_internal.h +++ b/platform/linux-generic/include/odp_ring_spsc_internal.h @@ -29,31 +29,27 @@ extern "C" { typedef struct { odp_atomic_u32_t head; odp_atomic_u32_t tail; - uint32_t mask; - uint32_t *data;
} ring_spsc_t;
/* Initialize ring. Ring size must be a power of two. */ -static inline void ring_spsc_init(ring_spsc_t *ring, uint32_t *data, - uint32_t size) +static inline void ring_spsc_init(ring_spsc_t *ring) { odp_atomic_init_u32(&ring->head, 0); odp_atomic_init_u32(&ring->tail, 0); - ring->mask = size - 1; - ring->data = data; }
/* Dequeue data from the ring head. Max_num is smaller than ring size.*/ -static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[], +static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, + uint32_t *ring_data, + uint32_t ring_mask, uint32_t data[], uint32_t max_num) { - uint32_t head, tail, mask, idx; + uint32_t head, tail, idx; uint32_t num, i;
tail = odp_atomic_load_acq_u32(&ring->tail); head = odp_atomic_load_u32(&ring->head); - mask = ring->mask; num = tail - head;
/* Empty */ @@ -63,11 +59,11 @@ static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[], if (num > max_num) num = max_num;
- idx = head & mask; + idx = head & ring_mask;
for (i = 0; i < num; i++) { - data[i] = ring->data[idx]; - idx = (idx + 1) & mask; + data[i] = ring_data[idx]; + idx = (idx + 1) & ring_mask; }
odp_atomic_store_rel_u32(&ring->head, head + num); @@ -77,16 +73,17 @@ static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[],
/* Enqueue data into the ring tail. Num_data is smaller than ring size. */ static inline uint32_t ring_spsc_enq_multi(ring_spsc_t *ring, + uint32_t *ring_data, + uint32_t ring_mask, const uint32_t data[], uint32_t num_data) { - uint32_t head, tail, mask, size, idx; + uint32_t head, tail, size, idx; uint32_t num, i;
head = odp_atomic_load_acq_u32(&ring->head); tail = odp_atomic_load_u32(&ring->tail); - mask = ring->mask; - size = mask + 1; + size = ring_mask + 1; num = size - (tail - head);
/* Full */ @@ -96,11 +93,11 @@ static inline uint32_t ring_spsc_enq_multi(ring_spsc_t *ring, if (num > num_data) num = num_data;
- idx = tail & mask; + idx = tail & ring_mask;
for (i = 0; i < num; i++) { - ring->data[idx] = data[i]; - idx = (idx + 1) & mask; + ring_data[idx] = data[i]; + idx = (idx + 1) & ring_mask; }
odp_atomic_store_rel_u32(&ring->tail, tail + num); diff --git a/platform/linux-generic/odp_queue_spsc.c b/platform/linux-generic/odp_queue_spsc.c index 0fd8d85a..002561a4 100644 --- a/platform/linux-generic/odp_queue_spsc.c +++ b/platform/linux-generic/odp_queue_spsc.c @@ -49,7 +49,8 @@ static inline int spsc_enq_multi(odp_queue_t handle, return -1; }
- return ring_spsc_enq_multi(ring_spsc, buf_idx, num); + return ring_spsc_enq_multi(ring_spsc, queue->s.ring_data, + queue->s.ring_mask, buf_idx, num); }
static inline int spsc_deq_multi(odp_queue_t handle, @@ -68,7 +69,8 @@ static inline int spsc_deq_multi(odp_queue_t handle, return -1; }
- num_deq = ring_spsc_deq_multi(ring_spsc, buf_idx, num); + num_deq = ring_spsc_deq_multi(ring_spsc, queue->s.ring_data, + queue->s.ring_mask, buf_idx, num);
if (num_deq == 0) return 0; @@ -127,6 +129,7 @@ void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size)
offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size;
- ring_spsc_init(&queue->s.ring_spsc, &queue_glb->ring_data[offset], - queue_size); + queue->s.ring_data = &queue_glb->ring_data[offset]; + queue->s.ring_mask = queue_size - 1; + ring_spsc_init(&queue->s.ring_spsc); }
commit f2fd33bd4a83f92f080826accc064df94e263210 Author: Petri Savolainen petri.savolainen@linaro.org Date: Fri Aug 24 16:04:54 2018 +0300
linux-gen: ring_st: move ring mask and data pointer
Store the mask and data pointer in the queue entry instead of the ring structure. These values are constant and can be stored among other frequently used read-only data. Other ring types also use the same variables.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/include/odp_ring_st_internal.h b/platform/linux-generic/include/odp_ring_st_internal.h index 5fb37d4e..1bc18cda 100644 --- a/platform/linux-generic/include/odp_ring_st_internal.h +++ b/platform/linux-generic/include/odp_ring_st_internal.h @@ -19,30 +19,25 @@ extern "C" { typedef struct { uint32_t head; uint32_t tail; - uint32_t mask; - uint32_t *data; - } ring_st_t;
/* Initialize ring. Ring size must be a power of two. */ -static inline void ring_st_init(ring_st_t *ring, uint32_t *data, uint32_t size) +static inline void ring_st_init(ring_st_t *ring) { ring->head = 0; ring->tail = 0; - ring->mask = size - 1; - ring->data = data; }
/* Dequeue data from the ring head. Max_num is smaller than ring size.*/ -static inline uint32_t ring_st_deq_multi(ring_st_t *ring, uint32_t data[], +static inline uint32_t ring_st_deq_multi(ring_st_t *ring, uint32_t *ring_data, + uint32_t ring_mask, uint32_t data[], uint32_t max_num) { - uint32_t head, tail, mask, idx; + uint32_t head, tail, idx; uint32_t num, i;
head = ring->head; tail = ring->tail; - mask = ring->mask; num = tail - head;
/* Empty */ @@ -52,11 +47,11 @@ static inline uint32_t ring_st_deq_multi(ring_st_t *ring, uint32_t data[], if (num > max_num) num = max_num;
- idx = head & mask; + idx = head & ring_mask;
for (i = 0; i < num; i++) { - data[i] = ring->data[idx]; - idx = (idx + 1) & mask; + data[i] = ring_data[idx]; + idx = (idx + 1) & ring_mask; }
ring->head = head + num; @@ -65,16 +60,17 @@ static inline uint32_t ring_st_deq_multi(ring_st_t *ring, uint32_t data[], }
/* Enqueue data into the ring tail. Num_data is smaller than ring size. */ -static inline uint32_t ring_st_enq_multi(ring_st_t *ring, const uint32_t data[], +static inline uint32_t ring_st_enq_multi(ring_st_t *ring, uint32_t *ring_data, + uint32_t ring_mask, + const uint32_t data[], uint32_t num_data) { - uint32_t head, tail, mask, size, idx; + uint32_t head, tail, size, idx; uint32_t num, i;
head = ring->head; tail = ring->tail; - mask = ring->mask; - size = mask + 1; + size = ring_mask + 1; num = size - (tail - head);
/* Full */ @@ -84,11 +80,11 @@ static inline uint32_t ring_st_enq_multi(ring_st_t *ring, const uint32_t data[], if (num > num_data) num = num_data;
- idx = tail & mask; + idx = tail & ring_mask;
for (i = 0; i < num; i++) { - ring->data[idx] = data[i]; - idx = (idx + 1) & mask; + ring_data[idx] = data[i]; + idx = (idx + 1) & ring_mask; }
ring->tail = tail + num; diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c index 8b9a70bb..7e8b7e34 100644 --- a/platform/linux-generic/odp_queue_basic.c +++ b/platform/linux-generic/odp_queue_basic.c @@ -687,7 +687,8 @@ static inline int _sched_queue_enq_multi(odp_queue_t handle, return -1; }
- num_enq = ring_st_enq_multi(ring_st, buf_idx, num); + num_enq = ring_st_enq_multi(ring_st, queue->s.ring_data, + queue->s.ring_mask, buf_idx, num);
if (odp_unlikely(num_enq == 0)) { UNLOCK(queue); @@ -728,7 +729,8 @@ int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num, return -1; }
- num_deq = ring_st_deq_multi(ring_st, buf_idx, max_num); + num_deq = ring_st_deq_multi(ring_st, queue->s.ring_data, + queue->s.ring_mask, buf_idx, max_num);
if (num_deq == 0) { /* Already empty queue */ @@ -875,8 +877,10 @@ static int queue_init(queue_entry_t *queue, const char *name, } else { queue->s.enqueue = sched_queue_enq; queue->s.enqueue_multi = sched_queue_enq_multi; - ring_st_init(&queue->s.ring_st, - &queue_glb->ring_data[offset], queue_size); + + queue->s.ring_data = &queue_glb->ring_data[offset]; + queue->s.ring_mask = queue_size - 1; + ring_st_init(&queue->s.ring_st); } }
commit cb93abaef977a8014e81b296e8656c9faa43eaf2 Author: Petri Savolainen petri.savolainen@linaro.org Date: Tue Aug 21 10:40:59 2018 +0300
linux-gen: queue: use mpmc ring in plain queues
Change the plain queue implementation to use ring_mpmc instead of a ticket lock and the ring_st ring. Performance and scalability improve, especially on 64-bit ARM.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/include/odp_queue_basic_internal.h b/platform/linux-generic/include/odp_queue_basic_internal.h index 15e49772..46b74795 100644 --- a/platform/linux-generic/include/odp_queue_basic_internal.h +++ b/platform/linux-generic/include/odp_queue_basic_internal.h @@ -22,6 +22,7 @@ extern "C" { #include <odp/api/hints.h> #include <odp/api/ticketlock.h> #include <odp_config_internal.h> +#include <odp_ring_mpmc_internal.h> #include <odp_ring_st_internal.h> #include <odp_ring_spsc_internal.h> #include <odp_queue_lf.h> @@ -33,22 +34,29 @@ extern "C" { #define QUEUE_STATUS_SCHED 4
struct queue_entry_s { - odp_ticketlock_t ODP_ALIGNED_CACHE lock; - union { - ring_st_t ring_st; - ring_spsc_t ring_spsc; - }; - int status; - + /* The first cache line is read only */ queue_enq_fn_t ODP_ALIGNED_CACHE enqueue; queue_deq_fn_t dequeue; queue_enq_multi_fn_t enqueue_multi; queue_deq_multi_fn_t dequeue_multi; - queue_deq_multi_fn_t orig_dequeue_multi; + uint32_t *ring_data; + uint32_t ring_mask; + uint32_t index; + odp_queue_t handle; + odp_queue_type_t type; + + /* MPMC ring (2 cache lines). */ + ring_mpmc_t ring_mpmc;
- uint32_t index; - odp_queue_t handle; - odp_queue_type_t type; + odp_ticketlock_t lock; + union { + ring_st_t ring_st; + ring_spsc_t ring_spsc; + }; + + int status; + + queue_deq_multi_fn_t orig_dequeue_multi; odp_queue_param_t param; odp_pktin_queue_t pktin; odp_pktout_queue_t pktout; diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c index e5d91564..8b9a70bb 100644 --- a/platform/linux-generic/odp_queue_basic.c +++ b/platform/linux-generic/odp_queue_basic.c @@ -400,8 +400,10 @@ static int queue_destroy(odp_queue_t handle)
if (queue->s.spsc) empty = ring_spsc_is_empty(&queue->s.ring_spsc); - else + else if (queue->s.type == ODP_QUEUE_TYPE_SCHED) empty = ring_st_is_empty(&queue->s.ring_st); + else + empty = ring_mpmc_is_empty(&queue->s.ring_mpmc);
if (!empty) { UNLOCK(queue); @@ -490,28 +492,19 @@ static inline int _plain_queue_enq_multi(odp_queue_t handle, { queue_entry_t *queue; int ret, num_enq; - ring_st_t *ring_st; + ring_mpmc_t *ring_mpmc; uint32_t buf_idx[num];
queue = qentry_from_handle(handle); - ring_st = &queue->s.ring_st; + ring_mpmc = &queue->s.ring_mpmc;
if (sched_fn->ord_enq_multi(handle, (void **)buf_hdr, num, &ret)) return ret;
buffer_index_from_buf(buf_idx, buf_hdr, num);
- LOCK(queue); - - if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { - UNLOCK(queue); - ODP_ERR("Bad queue status\n"); - return -1; - } - - num_enq = ring_st_enq_multi(ring_st, buf_idx, num); - - UNLOCK(queue); + num_enq = ring_mpmc_enq_multi(ring_mpmc, queue->s.ring_data, + queue->s.ring_mask, buf_idx, num);
return num_enq; } @@ -521,23 +514,14 @@ static inline int _plain_queue_deq_multi(odp_queue_t handle, { int num_deq; queue_entry_t *queue; - ring_st_t *ring_st; + ring_mpmc_t *ring_mpmc; uint32_t buf_idx[num];
queue = qentry_from_handle(handle); - ring_st = &queue->s.ring_st; + ring_mpmc = &queue->s.ring_mpmc;
- LOCK(queue); - - if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { - /* Bad queue, or queue has been destroyed. */ - UNLOCK(queue); - return -1; - } - - num_deq = ring_st_deq_multi(ring_st, buf_idx, num); - - UNLOCK(queue); + num_deq = ring_mpmc_deq_multi(ring_mpmc, queue->s.ring_data, + queue->s.ring_mask, buf_idx, num);
if (num_deq == 0) return 0; @@ -883,13 +867,17 @@ static int queue_init(queue_entry_t *queue, const char *name, queue->s.dequeue = plain_queue_deq; queue->s.dequeue_multi = plain_queue_deq_multi; queue->s.orig_dequeue_multi = plain_queue_deq_multi; + + queue->s.ring_data = &queue_glb->ring_data[offset]; + queue->s.ring_mask = queue_size - 1; + ring_mpmc_init(&queue->s.ring_mpmc); + } else { queue->s.enqueue = sched_queue_enq; queue->s.enqueue_multi = sched_queue_enq_multi; + ring_st_init(&queue->s.ring_st, + &queue_glb->ring_data[offset], queue_size); } - - ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset], - queue_size); }
return 0;
commit 3330513ba71f43869538ce6e97962c59f331a5ad Author: Petri Savolainen petri.savolainen@linaro.org Date: Mon Aug 20 17:38:18 2018 +0300
linux-gen: ring_mpmc: new multi-producer, multi-consumer ring
The ring is similar to the one in ring_internal.h, but it also checks for ring fullness. This ring can be used for storing events in a queue, because an enqueue may be attempted on an already full queue.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 80f96875..ab0b755d 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -127,6 +127,7 @@ noinst_HEADERS = \ include/odp_queue_lf.h \ include/odp_queue_scalable_internal.h \ include/odp_ring_internal.h \ + include/odp_ring_mpmc_internal.h \ include/odp_ring_spsc_internal.h \ include/odp_ring_st_internal.h \ include/odp_schedule_if.h \ diff --git a/platform/linux-generic/include/odp_ring_mpmc_internal.h b/platform/linux-generic/include/odp_ring_mpmc_internal.h new file mode 100644 index 00000000..74bbb8fc --- /dev/null +++ b/platform/linux-generic/include/odp_ring_mpmc_internal.h @@ -0,0 +1,169 @@ +/* Copyright (c) 2018, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_MPMC_INTERNAL_H_ +#define ODP_RING_MPMC_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp/api/atomic.h> +#include <odp/api/cpu.h> +#include <odp/api/hints.h> +#include <odp_align_internal.h> +#include <odp/api/plat/atomic_inlines.h> +#include <odp/api/plat/cpu_inlines.h> + +/* Ring of uint32_t data + * + * Ring stores head and tail counters. Ring indexes are formed from these + * counters with a mask (mask = ring_size - 1), which requires that ring size + * must be a power of two. 
+ * + * 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + * | E | E | | | | | | | | | | E | E | E | E | E | + * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + * ^ ^ ^ ^ + * | | | | + * r_tail r_head w_tail w_head + * + */ +typedef struct { + odp_atomic_u32_t ODP_ALIGNED_CACHE r_head; + odp_atomic_u32_t r_tail; + + odp_atomic_u32_t ODP_ALIGNED_CACHE w_head; + odp_atomic_u32_t w_tail; + +} ring_mpmc_t; + +static inline int ring_mpmc_cas_u32(odp_atomic_u32_t *atom, + uint32_t *old_val, uint32_t new_val) +{ + return __atomic_compare_exchange_n(&atom->v, old_val, new_val, + 0 /* strong */, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED); +} + +/* Initialize ring */ +static inline void ring_mpmc_init(ring_mpmc_t *ring) +{ + odp_atomic_init_u32(&ring->w_head, 0); + odp_atomic_init_u32(&ring->w_tail, 0); + odp_atomic_init_u32(&ring->r_head, 0); + odp_atomic_init_u32(&ring->r_tail, 0); +} + +/* Dequeue data from the ring head. Num is smaller than ring size. */ +static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t *ring, + uint32_t *ring_data, + uint32_t ring_mask, + uint32_t data[], + uint32_t num) +{ + uint32_t old_head, new_head, w_tail, num_data, i; + + /* Load acquires ensure that w_tail load happens after r_head load, + * and thus r_head value is always behind or equal to w_tail value. + * When CAS operation succeeds, this thread owns data between old + * and new r_head. */ + do { + old_head = odp_atomic_load_acq_u32(&ring->r_head); + odp_prefetch(&ring_data[(old_head + 1) & ring_mask]); + w_tail = odp_atomic_load_acq_u32(&ring->w_tail); + num_data = w_tail - old_head; + + /* Ring is empty */ + if (num_data == 0) + return 0; + + /* Try to take all available */ + if (num > num_data) + num = num_data; + + new_head = old_head + num; + + } while (odp_unlikely(ring_mpmc_cas_u32(&ring->r_head, &old_head, + new_head) == 0)); + + /* Read data. This will not move above load acquire of r_head. 
*/ + for (i = 0; i < num; i++) + data[i] = ring_data[(old_head + 1 + i) & ring_mask]; + + /* Wait until other readers have updated the tail */ + while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != old_head)) + odp_cpu_pause(); + + /* Release the new reader tail, writers acquire it. */ + odp_atomic_store_rel_u32(&ring->r_tail, new_head); + + return num; +} + +/* Enqueue multiple data into the ring tail. Num is smaller than ring size. */ +static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t *ring, + uint32_t *ring_data, + uint32_t ring_mask, + const uint32_t data[], + uint32_t num) +{ + uint32_t old_head, new_head, r_tail, num_free, i; + uint32_t size = ring_mask + 1; + + /* Load acquires ensure that w_head load happens after r_tail load, + * and thus r_tail value is always behind or equal to w_head value. + * When CAS operation succeeds, this thread owns data between old + * and new w_head. */ + do { + r_tail = odp_atomic_load_acq_u32(&ring->r_tail); + old_head = odp_atomic_load_acq_u32(&ring->w_head); + + num_free = size - (old_head - r_tail); + + /* Ring is full */ + if (num_free == 0) + return 0; + + /* Try to use all available */ + if (num > num_free) + num = num_free; + + new_head = old_head + num; + + } while (odp_unlikely(ring_mpmc_cas_u32(&ring->w_head, &old_head, + new_head) == 0)); + + /* Write data. This will not move above load acquire of w_head. */ + for (i = 0; i < num; i++) + ring_data[(old_head + 1 + i) & ring_mask] = data[i]; + + /* Wait until other writers have updated the tail */ + while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head)) + odp_cpu_pause(); + + /* Release the new writer tail, readers acquire it. 
*/ + odp_atomic_store_rel_u32(&ring->w_tail, new_head); + + return num; +} + +/* Check if ring is empty */ +static inline int ring_mpmc_is_empty(ring_mpmc_t *ring) +{ + uint32_t head = odp_atomic_load_u32(&ring->r_head); + uint32_t tail = odp_atomic_load_u32(&ring->w_tail); + + return head == tail; +} + +#ifdef __cplusplus +} +#endif + +#endif
-----------------------------------------------------------------------
Summary of changes: platform/linux-generic/Makefile.am | 1 + .../include/odp_queue_basic_internal.h | 30 ++-- .../linux-generic/include/odp_ring_mpmc_internal.h | 169 +++++++++++++++++++++ .../linux-generic/include/odp_ring_spsc_internal.h | 33 ++-- .../linux-generic/include/odp_ring_st_internal.h | 34 ++--- platform/linux-generic/odp_queue_basic.c | 56 +++---- platform/linux-generic/odp_queue_spsc.c | 11 +- 7 files changed, 250 insertions(+), 84 deletions(-) create mode 100644 platform/linux-generic/include/odp_ring_mpmc_internal.h
hooks/post-receive