This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, master has been updated via 5cab9bd4190cec9535f4784c57c9ea9001543516 (commit) via 0a5eff079dfa17ecb5bc67244eeb6983e1bcf296 (commit) from b3cacf62c188e2d2696bb6993a9328127274b9b0 (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log ----------------------------------------------------------------- commit 5cab9bd4190cec9535f4784c57c9ea9001543516 Author: Matias Elo matias.elo@nokia.com Date: Thu Apr 11 14:44:23 2019 +0300
linux-gen: pool: use pointer ring to store buffer headers
Modify pool implementation to store buffer headers instead of indices. This removes the need for pointer <-> index conversion in buffer alloc/free functions.
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/include/odp_pool_internal.h b/platform/linux-generic/include/odp_pool_internal.h index a44ad94f5..42a2cf160 100644 --- a/platform/linux-generic/include/odp_pool_internal.h +++ b/platform/linux-generic/include/odp_pool_internal.h @@ -23,7 +23,7 @@ extern "C" {
#include <odp_buffer_internal.h> #include <odp_config_internal.h> -#include <odp_ring_u32_internal.h> +#include <odp_ring_ptr_internal.h> #include <odp/api/plat/strong_types.h>
typedef struct ODP_ALIGNED_CACHE pool_cache_t { @@ -38,10 +38,10 @@ typedef struct ODP_ALIGNED_CACHE pool_cache_t { /* Buffer header ring */ typedef struct ODP_ALIGNED_CACHE { /* Ring header */ - ring_u32_t hdr; + ring_ptr_t hdr;
/* Ring data: buffer handles */ - uint32_t buf[CONFIG_POOL_MAX_NUM + 1]; + odp_buffer_hdr_t *buf_hdr[CONFIG_POOL_MAX_NUM + 1];
} pool_ring_t;
diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c index 136f82c5e..6fc2e0d91 100644 --- a/platform/linux-generic/odp_pool.c +++ b/platform/linux-generic/odp_pool.c @@ -20,7 +20,7 @@ #include <odp_packet_dpdk.h> #include <odp_config_internal.h> #include <odp_debug_internal.h> -#include <odp_ring_u32_internal.h> +#include <odp_ring_ptr_internal.h> #include <odp_global_data.h> #include <odp_libconfig_internal.h> #include <odp_shm_internal.h> @@ -132,14 +132,14 @@ static inline void cache_push(pool_cache_t *cache, odp_buffer_hdr_t *buf_hdr[], static void cache_flush(pool_cache_t *cache, pool_t *pool) { odp_buffer_hdr_t *buf_hdr; - ring_u32_t *ring; + ring_ptr_t *ring; uint32_t mask;
ring = &pool->ring->hdr; mask = pool->ring_mask;
while (cache_pop(cache, &buf_hdr, 1)) - ring_u32_enq(ring, mask, buf_hdr->index.buffer); + ring_ptr_enq(ring, mask, buf_hdr); }
static int read_config_file(pool_table_t *pool_tbl) @@ -350,7 +350,7 @@ static void init_buffers(pool_t *pool) void *uarea = NULL; uint8_t *data; uint32_t offset; - ring_u32_t *ring; + ring_ptr_t *ring; uint32_t mask; int type; uint64_t page_size; @@ -429,9 +429,9 @@ static void init_buffers(pool_t *pool) buf_hdr->buf_end = &data[offset + pool->seg_len + pool->tailroom];
- /* Store buffer index into the global pool */ + /* Store buffer into the global pool */ if (!skip) - ring_u32_enq(ring, mask, i); + ring_ptr_enq(ring, mask, buf_hdr); } pool->skipped_blocks = skipped_blocks; } @@ -654,7 +654,7 @@ static odp_pool_t pool_create(const char *name, odp_pool_param_t *params, pool->uarea_base_addr = odp_shm_addr(pool->uarea_shm); }
- ring_u32_init(&pool->ring->hdr); + ring_ptr_init(&pool->ring->hdr); init_buffers(pool);
/* Create zero-copy DPDK memory pool. NOP if zero-copy is disabled. */ @@ -890,7 +890,7 @@ int odp_pool_info(odp_pool_t pool_hdl, odp_pool_info_t *info) int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int max_num) { pool_cache_t *cache = local.cache[pool->pool_idx]; - ring_u32_t *ring; + ring_ptr_t *ring; odp_buffer_hdr_t *hdr; uint32_t mask, num_ch, i; uint32_t num_deq = 0; @@ -908,13 +908,12 @@ int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int max_num) if (odp_unlikely(num_deq > burst_size)) burst = num_deq;
- uint32_t data[burst]; + odp_buffer_hdr_t *hdr_tmp[burst];
- /* Temporary copy to data[] needed since odp_buffer_t is - * uintptr_t and not uint32_t. */ ring = &pool->ring->hdr; mask = pool->ring_mask; - burst = ring_u32_deq_multi(ring, mask, data, burst); + burst = ring_ptr_deq_multi(ring, mask, (void **)hdr_tmp, + burst); cache_num = burst - num_deq;
if (odp_unlikely(burst < num_deq)) { @@ -925,23 +924,14 @@ int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int max_num) for (i = 0; i < num_deq; i++) { uint32_t idx = num_ch + i;
- hdr = buf_hdr_from_index(pool, data[i]); + hdr = hdr_tmp[i]; odp_prefetch(hdr); buf_hdr[idx] = hdr; }
/* Cache possible extra buffers. Cache is currently empty. */ - if (cache_num) { - odp_buffer_hdr_t *buf_hdr[cache_num]; - - for (i = 0; i < cache_num; i++) { - uint32_t idx = num_deq + i; - - buf_hdr[i] = buf_hdr_from_index(pool, - data[idx]); - } - cache_push(cache, buf_hdr, cache_num); - } + if (cache_num) + cache_push(cache, &hdr_tmp[num_deq], cache_num); }
return num_ch + num_deq; @@ -951,22 +941,17 @@ static inline void buffer_free_to_pool(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int num) { pool_cache_t *cache = local.cache[pool->pool_idx]; - ring_u32_t *ring; - int i; + ring_ptr_t *ring; uint32_t cache_num, mask; uint32_t cache_size = cache->size;
/* Special case of a very large free. Move directly to * the global pool. */ if (odp_unlikely(num > (int)cache_size)) { - uint32_t buf_index[num]; - ring = &pool->ring->hdr; mask = pool->ring_mask; - for (i = 0; i < num; i++) - buf_index[i] = buf_hdr[i]->index.buffer;
- ring_u32_enq_multi(ring, mask, buf_index, num); + ring_ptr_enq_multi(ring, mask, (void **)buf_hdr, num);
return; } @@ -986,17 +971,11 @@ static inline void buffer_free_to_pool(pool_t *pool, if (odp_unlikely((uint32_t)num > cache_num)) burst = cache_num;
- /* Temporary copy needed since odp_buffer_t is - * uintptr_t and not uint32_t. */ odp_buffer_hdr_t *buf_hdr[burst]; - uint32_t data[burst];
cache_pop(cache, buf_hdr, burst);
- for (i = 0; i < burst; i++) - data[i] = buf_hdr[i]->index.buffer; - - ring_u32_enq_multi(ring, mask, data, burst); + ring_ptr_enq_multi(ring, mask, (void **)buf_hdr, burst); }
cache_push(cache, buf_hdr, num);
commit 0a5eff079dfa17ecb5bc67244eeb6983e1bcf296 Author: Matias Elo matias.elo@nokia.com Date: Wed Apr 10 15:31:23 2019 +0300
linux-gen: ring: enable storing pointers in ring data
Added new ring type ring_ptr_t, which uses the same implementation as old ring_t, but stores pointers instead of indices. The old ring_t type is renamed to ring_u32_t.
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 08b6a671f..517ea258b 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -126,10 +126,13 @@ noinst_HEADERS = \ include/odp_queue_basic_internal.h \ include/odp_queue_lf.h \ include/odp_queue_scalable_internal.h \ + include/odp_ring_common.h \ include/odp_ring_internal.h \ include/odp_ring_mpmc_internal.h \ + include/odp_ring_ptr_internal.h \ include/odp_ring_spsc_internal.h \ include/odp_ring_st_internal.h \ + include/odp_ring_u32_internal.h \ include/odp_schedule_if.h \ include/odp_schedule_scalable_config.h \ include/odp_schedule_scalable.h \ diff --git a/platform/linux-generic/include/odp_pool_internal.h b/platform/linux-generic/include/odp_pool_internal.h index 67d7e7b7b..a44ad94f5 100644 --- a/platform/linux-generic/include/odp_pool_internal.h +++ b/platform/linux-generic/include/odp_pool_internal.h @@ -23,7 +23,7 @@ extern "C" {
#include <odp_buffer_internal.h> #include <odp_config_internal.h> -#include <odp_ring_internal.h> +#include <odp_ring_u32_internal.h> #include <odp/api/plat/strong_types.h>
typedef struct ODP_ALIGNED_CACHE pool_cache_t { @@ -38,7 +38,7 @@ typedef struct ODP_ALIGNED_CACHE pool_cache_t { /* Buffer header ring */ typedef struct ODP_ALIGNED_CACHE { /* Ring header */ - ring_t hdr; + ring_u32_t hdr;
/* Ring data: buffer handles */ uint32_t buf[CONFIG_POOL_MAX_NUM + 1]; diff --git a/platform/linux-generic/include/odp_ring_common.h b/platform/linux-generic/include/odp_ring_common.h new file mode 100644 index 000000000..88e6bf880 --- /dev/null +++ b/platform/linux-generic/include/odp_ring_common.h @@ -0,0 +1,21 @@ +/* Copyright (c) 2019, Nokia + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_COMMON_H_ +#define ODP_RING_COMMON_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#define _ODP_RING_TYPE_U32 1 +#define _ODP_RING_TYPE_PTR 2 + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h index ad2f37ef2..af6b3294e 100644 --- a/platform/linux-generic/include/odp_ring_internal.h +++ b/platform/linux-generic/include/odp_ring_internal.h @@ -1,9 +1,13 @@ /* Copyright (c) 2016-2018, Linaro Limited + * Copyright (c) 2019, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause */
+/* This header should NOT be included directly. There are no include guards for + * the function definitions! */ + #ifndef ODP_RING_INTERNAL_H_ #define ODP_RING_INTERNAL_H_
@@ -17,8 +21,9 @@ extern "C" { #include <odp_align_internal.h> #include <odp/api/plat/atomic_inlines.h> #include <odp/api/plat/cpu_inlines.h> +#include <odp_ring_common.h>
-/* Ring of uint32_t data +/* Generic ring implementation * * Ring stores head and tail counters. Ring indexes are formed from these * counters with a mask (mask = ring_size - 1), which requires that ring size @@ -26,7 +31,8 @@ extern "C" { * number of data items that will be stored on it as write operations are * assumed to succeed eventually (after readers complete their current * operations). */ -typedef struct ODP_ALIGNED_CACHE { + +struct ring_common { /* Writer head and tail */ odp_atomic_u32_t w_head; odp_atomic_u32_t w_tail; @@ -35,9 +41,17 @@ typedef struct ODP_ALIGNED_CACHE { /* Reader head and tail */ odp_atomic_u32_t r_head; odp_atomic_u32_t r_tail; +};
+typedef struct ODP_ALIGNED_CACHE { + struct ring_common r; uint32_t data[0]; -} ring_t; +} ring_u32_t; + +typedef struct ODP_ALIGNED_CACHE { + struct ring_common r; + void *data[0]; +} ring_ptr_t;
/* 32-bit CAS with memory order selection */ static inline int cas_mo_u32(odp_atomic_u32_t *atom, uint32_t *old_val, @@ -49,35 +63,69 @@ static inline int cas_mo_u32(odp_atomic_u32_t *atom, uint32_t *old_val, mo_failure); }
+#endif /* End of include guards */ + +#undef _ring_gen_t +#undef _ring_data_t +#undef _RING_INIT +#undef _RING_DEQ +#undef _RING_DEQ_MULTI +#undef _RING_ENQ +#undef _RING_ENQ_MULTI + +/* Remap generic types and function names to ring data type specific ones. One + * should never use the generic names (e.g. _RING_INIT) directly. */ + +#if _ODP_RING_TYPE == _ODP_RING_TYPE_U32 + #define _ring_gen_t ring_u32_t + #define _ring_data_t uint32_t + + #define _RING_INIT ring_u32_init + #define _RING_DEQ ring_u32_deq + #define _RING_DEQ_MULTI ring_u32_deq_multi + #define _RING_ENQ ring_u32_enq + #define _RING_ENQ_MULTI ring_u32_enq_multi +#elif _ODP_RING_TYPE == _ODP_RING_TYPE_PTR + #define _ring_gen_t ring_ptr_t + #define _ring_data_t void * + + #define _RING_INIT ring_ptr_init + #define _RING_DEQ ring_ptr_deq + #define _RING_DEQ_MULTI ring_ptr_deq_multi + #define _RING_ENQ ring_ptr_enq + #define _RING_ENQ_MULTI ring_ptr_enq_multi +#endif + /* Initialize ring */ -static inline void ring_init(ring_t *ring) +static inline void _RING_INIT(_ring_gen_t *ring) { - odp_atomic_init_u32(&ring->w_head, 0); - odp_atomic_init_u32(&ring->w_tail, 0); - odp_atomic_init_u32(&ring->r_head, 0); - odp_atomic_init_u32(&ring->r_tail, 0); + odp_atomic_init_u32(&ring->r.w_head, 0); + odp_atomic_init_u32(&ring->r.w_tail, 0); + odp_atomic_init_u32(&ring->r.r_head, 0); + odp_atomic_init_u32(&ring->r.r_tail, 0); }
/* Dequeue data from the ring head */ -static inline uint32_t ring_deq(ring_t *ring, uint32_t mask, uint32_t *data) +static inline uint32_t _RING_DEQ(_ring_gen_t *ring, uint32_t mask, + _ring_data_t *data) { uint32_t head, tail, new_head;
/* Load/CAS acquire of r_head ensures that w_tail load happens after * r_head load, and thus head value is always behind or equal to tail * value. */ - head = odp_atomic_load_acq_u32(&ring->r_head); + head = odp_atomic_load_acq_u32(&ring->r.r_head);
/* Move reader head. This thread owns data at the new head. */ do { - tail = odp_atomic_load_acq_u32(&ring->w_tail); + tail = odp_atomic_load_acq_u32(&ring->r.w_tail);
if (head == tail) return 0;
new_head = head + 1;
- } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head, + } while (odp_unlikely(cas_mo_u32(&ring->r.r_head, &head, new_head, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0));
@@ -85,29 +133,29 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask, uint32_t *data) *data = ring->data[new_head & mask];
/* Wait until other readers have updated the tail */ - while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != head)) + while (odp_unlikely(odp_atomic_load_u32(&ring->r.r_tail) != head)) odp_cpu_pause();
/* Update the tail. Writers acquire it. */ - odp_atomic_store_rel_u32(&ring->r_tail, new_head); + odp_atomic_store_rel_u32(&ring->r.r_tail, new_head);
return 1; }
/* Dequeue multiple data from the ring head. Num is smaller than ring size. */ -static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask, - uint32_t data[], uint32_t num) +static inline uint32_t _RING_DEQ_MULTI(_ring_gen_t *ring, uint32_t mask, + _ring_data_t data[], uint32_t num) { uint32_t head, tail, new_head, i;
/* Load/CAS acquire of r_head ensures that w_tail load happens after * r_head load, and thus head value is always behind or equal to tail * value. */ - head = odp_atomic_load_acq_u32(&ring->r_head); + head = odp_atomic_load_acq_u32(&ring->r.r_head);
/* Move reader head. This thread owns data at the new head. */ do { - tail = odp_atomic_load_acq_u32(&ring->w_tail); + tail = odp_atomic_load_acq_u32(&ring->r.w_tail);
/* Ring is empty */ if (head == tail) @@ -119,7 +167,7 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
new_head = head + num;
- } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head, + } while (odp_unlikely(cas_mo_u32(&ring->r.r_head, &head, new_head, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0));
@@ -128,29 +176,30 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask, data[i] = ring->data[(head + 1 + i) & mask];
/* Wait until other readers have updated the tail */ - while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != head)) + while (odp_unlikely(odp_atomic_load_u32(&ring->r.r_tail) != head)) odp_cpu_pause();
/* Update the tail. Writers acquire it. */ - odp_atomic_store_rel_u32(&ring->r_tail, new_head); + odp_atomic_store_rel_u32(&ring->r.r_tail, new_head);
return num; }
/* Enqueue data into the ring tail */ -static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data) +static inline void _RING_ENQ(_ring_gen_t *ring, uint32_t mask, + _ring_data_t data) { uint32_t old_head, new_head; uint32_t size = mask + 1;
/* Reserve a slot in the ring for writing */ - old_head = odp_atomic_fetch_inc_u32(&ring->w_head); + old_head = odp_atomic_fetch_inc_u32(&ring->r.w_head); new_head = old_head + 1;
/* Wait for the last reader to finish. This prevents overwrite when * a reader has been left behind (e.g. due to an interrupt) and is * still reading the same slot. */ - while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r_tail) + while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r.r_tail) >= size)) odp_cpu_pause();
@@ -158,28 +207,28 @@ static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data) ring->data[new_head & mask] = data;
/* Wait until other writers have updated the tail */ - while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head)) + while (odp_unlikely(odp_atomic_load_u32(&ring->r.w_tail) != old_head)) odp_cpu_pause();
/* Release the new writer tail, readers acquire it. */ - odp_atomic_store_rel_u32(&ring->w_tail, new_head); + odp_atomic_store_rel_u32(&ring->r.w_tail, new_head); }
/* Enqueue multiple data into the ring tail. Num is smaller than ring size. */ -static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[], - uint32_t num) +static inline void _RING_ENQ_MULTI(_ring_gen_t *ring, uint32_t mask, + _ring_data_t data[], uint32_t num) { uint32_t old_head, new_head, i; uint32_t size = mask + 1;
/* Reserve a slot in the ring for writing */ - old_head = odp_atomic_fetch_add_u32(&ring->w_head, num); + old_head = odp_atomic_fetch_add_u32(&ring->r.w_head, num); new_head = old_head + 1;
/* Wait for the last reader to finish. This prevents overwrite when * a reader has been left behind (e.g. due to an interrupt) and is * still reading these slots. */ - while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r_tail) + while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r.r_tail) >= size)) odp_cpu_pause();
@@ -188,15 +237,13 @@ static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[], ring->data[(new_head + i) & mask] = data[i];
/* Wait until other writers have updated the tail */ - while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head)) + while (odp_unlikely(odp_atomic_load_u32(&ring->r.w_tail) != old_head)) odp_cpu_pause();
/* Release the new writer tail, readers acquire it. */ - odp_atomic_store_rel_u32(&ring->w_tail, old_head + num); + odp_atomic_store_rel_u32(&ring->r.w_tail, old_head + num); }
#ifdef __cplusplus } #endif - -#endif diff --git a/platform/linux-generic/include/odp_ring_ptr_internal.h b/platform/linux-generic/include/odp_ring_ptr_internal.h new file mode 100644 index 000000000..13b2b2fbf --- /dev/null +++ b/platform/linux-generic/include/odp_ring_ptr_internal.h @@ -0,0 +1,25 @@ +/* Copyright (c) 2019, Nokia + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_PTR_INTERNAL_H_ +#define ODP_RING_PTR_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp_ring_common.h> + +#undef _ODP_RING_TYPE +#define _ODP_RING_TYPE _ODP_RING_TYPE_PTR + +#include <odp_ring_internal.h> + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/include/odp_ring_u32_internal.h b/platform/linux-generic/include/odp_ring_u32_internal.h new file mode 100644 index 000000000..baa02e4ca --- /dev/null +++ b/platform/linux-generic/include/odp_ring_u32_internal.h @@ -0,0 +1,25 @@ +/* Copyright (c) 2019, Nokia + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_U32_INTERNAL_H_ +#define ODP_RING_U32_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp_ring_common.h> + +#undef _ODP_RING_TYPE +#define _ODP_RING_TYPE _ODP_RING_TYPE_U32 + +#include <odp_ring_internal.h> + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c index 832991d32..136f82c5e 100644 --- a/platform/linux-generic/odp_pool.c +++ b/platform/linux-generic/odp_pool.c @@ -20,7 +20,7 @@ #include <odp_packet_dpdk.h> #include <odp_config_internal.h> #include <odp_debug_internal.h> -#include <odp_ring_internal.h> +#include <odp_ring_u32_internal.h> #include <odp_global_data.h> #include <odp_libconfig_internal.h> #include <odp_shm_internal.h> @@ -132,14 +132,14 @@ static inline void cache_push(pool_cache_t *cache, odp_buffer_hdr_t *buf_hdr[], static void cache_flush(pool_cache_t *cache, pool_t *pool) { odp_buffer_hdr_t *buf_hdr; - ring_t *ring; + ring_u32_t *ring; uint32_t mask;
ring = &pool->ring->hdr; mask = pool->ring_mask;
while (cache_pop(cache, &buf_hdr, 1)) - ring_enq(ring, mask, buf_hdr->index.buffer); + ring_u32_enq(ring, mask, buf_hdr->index.buffer); }
static int read_config_file(pool_table_t *pool_tbl) @@ -350,7 +350,7 @@ static void init_buffers(pool_t *pool) void *uarea = NULL; uint8_t *data; uint32_t offset; - ring_t *ring; + ring_u32_t *ring; uint32_t mask; int type; uint64_t page_size; @@ -431,7 +431,7 @@ static void init_buffers(pool_t *pool)
/* Store buffer index into the global pool */ if (!skip) - ring_enq(ring, mask, i); + ring_u32_enq(ring, mask, i); } pool->skipped_blocks = skipped_blocks; } @@ -654,7 +654,7 @@ static odp_pool_t pool_create(const char *name, odp_pool_param_t *params, pool->uarea_base_addr = odp_shm_addr(pool->uarea_shm); }
- ring_init(&pool->ring->hdr); + ring_u32_init(&pool->ring->hdr); init_buffers(pool);
/* Create zero-copy DPDK memory pool. NOP if zero-copy is disabled. */ @@ -890,7 +890,7 @@ int odp_pool_info(odp_pool_t pool_hdl, odp_pool_info_t *info) int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int max_num) { pool_cache_t *cache = local.cache[pool->pool_idx]; - ring_t *ring; + ring_u32_t *ring; odp_buffer_hdr_t *hdr; uint32_t mask, num_ch, i; uint32_t num_deq = 0; @@ -914,7 +914,7 @@ int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int max_num) * uintptr_t and not uint32_t. */ ring = &pool->ring->hdr; mask = pool->ring_mask; - burst = ring_deq_multi(ring, mask, data, burst); + burst = ring_u32_deq_multi(ring, mask, data, burst); cache_num = burst - num_deq;
if (odp_unlikely(burst < num_deq)) { @@ -951,7 +951,7 @@ static inline void buffer_free_to_pool(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int num) { pool_cache_t *cache = local.cache[pool->pool_idx]; - ring_t *ring; + ring_u32_t *ring; int i; uint32_t cache_num, mask; uint32_t cache_size = cache->size; @@ -966,7 +966,7 @@ static inline void buffer_free_to_pool(pool_t *pool, for (i = 0; i < num; i++) buf_index[i] = buf_hdr[i]->index.buffer;
- ring_enq_multi(ring, mask, buf_index, num); + ring_u32_enq_multi(ring, mask, buf_index, num);
return; } @@ -996,7 +996,7 @@ static inline void buffer_free_to_pool(pool_t *pool, for (i = 0; i < burst; i++) data[i] = buf_hdr[i]->index.buffer;
- ring_enq_multi(ring, mask, data, burst); + ring_u32_enq_multi(ring, mask, data, burst); }
cache_push(cache, buf_hdr, num); diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index d20fd7356..4b054b86c 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -1,4 +1,5 @@ /* Copyright (c) 2013-2018, Linaro Limited + * Copyright (c) 2019, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause @@ -24,7 +25,7 @@ #include <odp_align_internal.h> #include <odp/api/sync.h> #include <odp/api/packet_io.h> -#include <odp_ring_internal.h> +#include <odp_ring_u32_internal.h> #include <odp_timer_internal.h> #include <odp_queue_basic_internal.h> #include <odp_libconfig_internal.h> @@ -125,7 +126,7 @@ typedef struct ODP_ALIGNED_CACHE { uint16_t ev_index; uint32_t qi; odp_queue_t queue; - ring_t *ring; + ring_u32_t *ring; odp_event_t ev[STASH_SIZE]; } stash;
@@ -151,7 +152,7 @@ typedef struct ODP_ALIGNED_CACHE { /* Priority queue */ typedef struct ODP_ALIGNED_CACHE { /* Ring header */ - ring_t ring; + ring_u32_t ring;
/* Ring data: queue indexes */ uint32_t queue_index[MAX_RING_SIZE]; @@ -423,7 +424,7 @@ static int schedule_init_global(void) prio_queue_t *prio_q;
prio_q = &sched->prio_q[grp][i][j]; - ring_init(&prio_q->ring); + ring_u32_init(&prio_q->ring); } } } @@ -461,10 +462,12 @@ static int schedule_term_global(void) for (grp = 0; grp < NUM_SCHED_GRPS; grp++) { for (i = 0; i < NUM_PRIO; i++) { for (j = 0; j < MAX_SPREAD; j++) { - ring_t *ring = &sched->prio_q[grp][i][j].ring; + ring_u32_t *ring; uint32_t qi;
- while (ring_deq(ring, ring_mask, &qi)) { + ring = &sched->prio_q[grp][i][j].ring; + + while (ring_u32_deq(ring, ring_mask, &qi)) { odp_event_t events[1]; int num;
@@ -651,9 +654,9 @@ static int schedule_sched_queue(uint32_t queue_index) int grp = sched->queue[queue_index].grp; int prio = sched->queue[queue_index].prio; int spread = sched->queue[queue_index].spread; - ring_t *ring = &sched->prio_q[grp][prio][spread].ring; + ring_u32_t *ring = &sched->prio_q[grp][prio][spread].ring;
- ring_enq(ring, sched->ring_mask, queue_index); + ring_u32_enq(ring, sched->ring_mask, queue_index); return 0; }
@@ -682,10 +685,10 @@ static void schedule_pktio_start(int pktio_index, int num_pktin, static inline void release_atomic(void) { uint32_t qi = sched_local.stash.qi; - ring_t *ring = sched_local.stash.ring; + ring_u32_t *ring = sched_local.stash.ring;
/* Release current atomic queue */ - ring_enq(ring, sched->ring_mask, qi); + ring_u32_enq(ring, sched->ring_mask, qi);
/* We don't hold sync context anymore */ sched_local.sync_ctx = NO_SYNC_CONTEXT; @@ -980,7 +983,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], int num; uint8_t sync_ctx, ordered; odp_queue_t handle; - ring_t *ring; + ring_u32_t *ring; int pktin; uint16_t max_deq = burst_def; int stashed = 1; @@ -1000,7 +1003,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], /* Get queue index from the priority queue */ ring = &sched->prio_q[grp][prio][id].ring;
- if (ring_deq(ring, ring_mask, &qi) == 0) { + if (ring_u32_deq(ring, ring_mask, &qi) == 0) { /* Priority queue empty */ i++; id++; @@ -1053,7 +1056,8 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], continue;
if (num_pkt == 0 || !direct_recv) { - ring_enq(ring, ring_mask, qi); + ring_u32_enq(ring, ring_mask, + qi); break; }
@@ -1079,7 +1083,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], sched_local.ordered.src_queue = qi;
/* Continue scheduling ordered queues */ - ring_enq(ring, ring_mask, qi); + ring_u32_enq(ring, ring_mask, qi); sched_local.sync_ctx = sync_ctx;
} else if (sync_ctx == ODP_SCHED_SYNC_ATOMIC) { @@ -1089,7 +1093,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], sched_local.sync_ctx = sync_ctx; } else { /* Continue scheduling the queue */ - ring_enq(ring, ring_mask, qi); + ring_u32_enq(ring, ring_mask, qi); }
handle = queue_from_index(qi); diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index e7b378950..1c52cc18d 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -1,4 +1,5 @@ /* Copyright (c) 2016-2018, Linaro Limited + * Copyright (c) 2019, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause @@ -18,7 +19,7 @@ #include <odp_debug_internal.h> #include <odp_align_internal.h> #include <odp_config_internal.h> -#include <odp_ring_internal.h> +#include <odp_ring_u32_internal.h> #include <odp_timer_internal.h> #include <odp_queue_basic_internal.h>
@@ -74,7 +75,7 @@ typedef struct ODP_ALIGNED_CACHE sched_cmd_t {
typedef struct ODP_ALIGNED_CACHE { /* Ring header */ - ring_t ring; + ring_u32_t ring;
/* Ring data: queue indexes */ uint32_t ring_idx[RING_SIZE]; @@ -189,7 +190,7 @@ static int init_global(void)
for (i = 0; i < NUM_GROUP; i++) for (j = 0; j < NUM_PRIO; j++) - ring_init(&sched_global->prio_queue[i][j].ring); + ring_u32_init(&sched_global->prio_queue[i][j].ring);
sched_group = &sched_global->sched_group; odp_ticketlock_init(&sched_group->s.lock); @@ -411,7 +412,7 @@ static inline void add_tail(sched_cmd_t *cmd) uint32_t idx = cmd->s.ring_idx;
prio_queue = &sched_global->prio_queue[group][prio]; - ring_enq(&prio_queue->ring, RING_MASK, idx); + ring_u32_enq(&prio_queue->ring, RING_MASK, idx); }
static inline sched_cmd_t *rem_head(int group, int prio) @@ -422,7 +423,7 @@ static inline sched_cmd_t *rem_head(int group, int prio)
prio_queue = &sched_global->prio_queue[group][prio];
- if (ring_deq(&prio_queue->ring, RING_MASK, &ring_idx) == 0) + if (ring_u32_deq(&prio_queue->ring, RING_MASK, &ring_idx) == 0) return NULL;
pktio = index_from_ring_idx(&index, ring_idx);
-----------------------------------------------------------------------
Summary of changes: platform/linux-generic/Makefile.am | 3 + platform/linux-generic/include/odp_pool_internal.h | 6 +- .../linux-generic/include/odp_ring_common.h | 9 +- platform/linux-generic/include/odp_ring_internal.h | 115 +++++++++++++++------ .../linux-generic/include/odp_ring_ptr_internal.h | 25 +++++ .../linux-generic/include/odp_ring_u32_internal.h | 25 +++++ platform/linux-generic/odp_pool.c | 55 +++------- platform/linux-generic/odp_schedule_basic.c | 34 +++--- platform/linux-generic/odp_schedule_sp.c | 11 +- 9 files changed, 184 insertions(+), 99 deletions(-) copy include/odp/arch/x86_32-linux/odp/api/abi/cpu.h => platform/linux-generic/include/odp_ring_common.h (52%) create mode 100644 platform/linux-generic/include/odp_ring_ptr_internal.h create mode 100644 platform/linux-generic/include/odp_ring_u32_internal.h
hooks/post-receive