This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, api-next has been updated
       via  7a5813042d58598e1c66243d8cfed548302edfc4 (commit)
       via  759a54bf66317fee539f2b2649c283be937d832a (commit)
       via  9fd48a9215a7831ca951839b7187bd1eb3f7bb06 (commit)
       via  bf554b2d9472ed8ed04580904d0aa906cdbe1e83 (commit)
       via  ac89f9c568830498b186dff33e908053dbdc88c7 (commit)
       via  5b08cc290a4a84ed0ffbc4a591427f1891b9ace3 (commit)
      from  5551c0d76bd721110a0fa4ba1423fe1edb0fd530 (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log -----------------------------------------------------------------
commit 7a5813042d58598e1c66243d8cfed548302edfc4
Author: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Date:   Fri Jun 23 16:04:41 2017 -0500
travis: add scalable scheduler in CI
Added running tests with the scalable scheduler to CI
Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Brian Brooks <brian.brooks@arm.com>
Reviewed-and-tested-by: Yi He <yi.he@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
diff --git a/.travis.yml b/.travis.yml
index f74c564f..7304e0ef 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -62,6 +62,7 @@ env:
     - CONF="--disable-abi-compat"
     - CONF="--enable-schedule-sp"
     - CONF="--enable-schedule-iquery"
+    - CONF="--enable-schedule-scalable"
 install:
     - echo 1000 | sudo tee /proc/sys/vm/nr_hugepages
commit 759a54bf66317fee539f2b2649c283be937d832a
Author: Brian Brooks <brian.brooks@arm.com>
Date:   Fri Jun 23 16:04:40 2017 -0500
linux-gen: sched scalable: add scalable scheduler
Signed-off-by: Brian Brooks <brian.brooks@arm.com>
Signed-off-by: Kevin Wang <kevin.wang@arm.com>
Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Signed-off-by: Ola Liljedahl <ola.liljedahl@arm.com>
Reviewed-and-tested-by: Yi He <yi.he@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
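For readers less familiar with ODP: the scalable scheduler plugs in behind the unchanged public scheduling API, so applications are served by it without source changes once it is selected at configure time. The sketch below is a minimal illustration, not part of this patch, of the kind of worker loop the scheduler drives; the queue name, sync mode, group and stop flag are arbitrary placeholders.

#include <odp_api.h>

/* Minimal worker loop served by whichever scheduler was selected at
 * build time. Illustrative only: "work_q", the sync mode and the stop
 * flag are placeholders, not something defined by this patch.
 */
static int worker_loop(volatile int *stop)
{
	odp_queue_param_t param;
	odp_queue_t q;

	odp_queue_param_init(&param);
	param.type = ODP_QUEUE_TYPE_SCHED;
	param.sched.prio = ODP_SCHED_PRIO_DEFAULT;
	param.sched.sync = ODP_SCHED_SYNC_ATOMIC; /* or PARALLEL/ORDERED */
	param.sched.group = ODP_SCHED_GROUP_WORKER;

	q = odp_queue_create("work_q", &param);
	if (q == ODP_QUEUE_INVALID)
		return -1;

	while (!*stop) {
		odp_queue_t from;
		odp_event_t ev = odp_schedule(&from, ODP_SCHED_NO_WAIT);

		if (ev == ODP_EVENT_INVALID)
			continue;
		/* ... process the event, possibly enqueue to other queues ... */
		odp_event_free(ev);
	}
	return 0;
}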
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 19e2241b..82ab4642 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -185,9 +185,13 @@ noinst_HEADERS = \ ${srcdir}/include/odp_pool_internal.h \ ${srcdir}/include/odp_posix_extensions.h \ ${srcdir}/include/odp_queue_internal.h \ + ${srcdir}/include/odp_queue_scalable_internal.h \ ${srcdir}/include/odp_ring_internal.h \ ${srcdir}/include/odp_queue_if.h \ ${srcdir}/include/odp_schedule_if.h \ + ${srcdir}/include/odp_schedule_scalable.h \ + ${srcdir}/include/odp_schedule_scalable_config.h \ + ${srcdir}/include/odp_schedule_scalable_ordered.h \ ${srcdir}/include/odp_sorted_list_internal.h \ ${srcdir}/include/odp_shm_internal.h \ ${srcdir}/include/odp_time_internal.h \ @@ -259,12 +263,15 @@ __LIB__libodp_linux_la_SOURCES = \ odp_pool.c \ odp_queue.c \ odp_queue_if.c \ + odp_queue_scalable.c \ odp_rwlock.c \ odp_rwlock_recursive.c \ odp_schedule.c \ odp_schedule_if.c \ odp_schedule_sp.c \ odp_schedule_iquery.c \ + odp_schedule_scalable.c \ + odp_schedule_scalable_ordered.c \ odp_shared_memory.c \ odp_sorted_list.c \ odp_spinlock.c \ diff --git a/platform/linux-generic/include/odp/api/plat/schedule_types.h b/platform/linux-generic/include/odp/api/plat/schedule_types.h index 535fd6d0..4e75f9ee 100644 --- a/platform/linux-generic/include/odp/api/plat/schedule_types.h +++ b/platform/linux-generic/include/odp/api/plat/schedule_types.h @@ -18,6 +18,8 @@ extern "C" { #endif
+#include <odp/api/std_types.h>
+
 /** @addtogroup odp_scheduler
  * @{
  */
@@ -44,7 +46,7 @@ typedef int odp_schedule_sync_t;
 typedef int odp_schedule_group_t;
 /* These must be kept in sync with thread_globals_t in odp_thread.c */
-#define ODP_SCHED_GROUP_INVALID -1
+#define ODP_SCHED_GROUP_INVALID ((odp_schedule_group_t)-1)
 #define ODP_SCHED_GROUP_ALL     0
 #define ODP_SCHED_GROUP_WORKER  1
 #define ODP_SCHED_GROUP_CONTROL 2
diff --git a/platform/linux-generic/include/odp_config_internal.h b/platform/linux-generic/include/odp_config_internal.h
index dadd59e7..3cff0045 100644
--- a/platform/linux-generic/include/odp_config_internal.h
+++ b/platform/linux-generic/include/odp_config_internal.h
@@ -7,10 +7,6 @@
 #ifndef ODP_CONFIG_INTERNAL_H_
 #define ODP_CONFIG_INTERNAL_H_
-#ifdef __cplusplus
-extern "C" {
-#endif
-
 /*
  * Maximum number of pools
  */
@@ -22,6 +18,13 @@ extern "C" {
 #define ODP_CONFIG_QUEUES 1024
 /*
+ * Maximum queue depth. Maximum number of elements that can be stored in a
+ * queue. This value is used only when the size is not explicitly provided
+ * during queue creation.
+ */
+#define CONFIG_QUEUE_SIZE 4096
+
+/*
  * Maximum number of ordered locks per queue
  */
 #define CONFIG_QUEUE_MAX_ORD_LOCKS 4
@@ -139,8 +142,4 @@ extern "C" {
  */
 #define CONFIG_POOL_CACHE_SIZE 256
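The new CONFIG_QUEUE_SIZE above is only a fallback: queue_init() in odp_queue_scalable.c, further down in this patch, rounds an application-supplied odp_queue_param_t size up to the next power of two and uses this default when the size is left at zero. A minimal sketch of the application side, with an arbitrary queue name and depth:

#include <odp_api.h>

/* Illustrative only: shows how an explicit size request interacts with
 * the CONFIG_QUEUE_SIZE default. The name and depth are arbitrary.
 */
static odp_queue_t create_deep_queue(void)
{
	odp_queue_param_t param;

	odp_queue_param_init(&param);
	param.type = ODP_QUEUE_TYPE_PLAIN;
	param.size = 5000; /* scalable queue rounds this up to 8192 ring slots */

	/* Leaving param.size at 0 would give CONFIG_QUEUE_SIZE (4096) slots */
	return odp_queue_create("deep_queue", &param);
}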
-#ifdef __cplusplus -} -#endif - #endif diff --git a/platform/linux-generic/include/odp_queue_scalable_internal.h b/platform/linux-generic/include/odp_queue_scalable_internal.h new file mode 100644 index 00000000..f15314b2 --- /dev/null +++ b/platform/linux-generic/include/odp_queue_scalable_internal.h @@ -0,0 +1,104 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_QUEUE_SCALABLE_INTERNAL_H_ +#define ODP_QUEUE_SCALABLE_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp/api/queue.h> +#include <odp_forward_typedefs_internal.h> +#include <odp_queue_if.h> +#include <odp_buffer_internal.h> +#include <odp_align_internal.h> +#include <odp/api/packet_io.h> +#include <odp/api/align.h> +#include <odp/api/hints.h> +#include <odp/api/ticketlock.h> +#include <odp_config_internal.h> +#include <odp_schedule_scalable.h> +#include <odp_schedule_scalable_ordered.h> + +#define QUEUE_STATUS_FREE 0 +#define QUEUE_STATUS_DESTROYED 1 +#define QUEUE_STATUS_READY 2 + +struct queue_entry_s { + sched_elem_t sched_elem; + + odp_ticketlock_t lock ODP_ALIGNED_CACHE; + int status; + + queue_enq_fn_t enqueue ODP_ALIGNED_CACHE; + queue_deq_fn_t dequeue; + queue_enq_multi_fn_t enqueue_multi; + queue_deq_multi_fn_t dequeue_multi; + + uint32_t index; + odp_queue_t handle; + odp_queue_type_t type; + odp_queue_param_t param; + odp_pktin_queue_t pktin; + odp_pktout_queue_t pktout; + char name[ODP_QUEUE_NAME_LEN]; +}; + +union queue_entry_u { + struct queue_entry_s s; + uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))]; +}; + +int _odp_queue_deq(sched_elem_t *q, odp_buffer_hdr_t *buf_hdr[], int num); +int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num); +int _odp_queue_deq_mc(sched_elem_t *q, odp_event_t *evp, int num); + +/* Round up memory size to next cache line size to + * align all memory addresses on cache line boundary. + */ +static inline void *shm_pool_alloc_align(_odp_ishm_pool_t *pool, uint32_t size) +{ + void *addr; + + addr = _odp_ishm_pool_alloc(pool, ROUNDUP_CACHE_LINE(size)); + ODP_ASSERT(((uintptr_t)addr & (ODP_CACHE_LINE_SIZE - 1)) == 0); + + return addr; +} + +static inline uint32_t queue_to_id(odp_queue_t handle) +{ + return _odp_typeval(handle) - 1; +} + +static inline queue_entry_t *qentry_from_int(queue_t handle) +{ + return (queue_entry_t *)(void *)(handle); +} + +static inline queue_t qentry_to_int(queue_entry_t *qentry) +{ + return (queue_t)(qentry); +} + +static inline odp_queue_t queue_get_handle(queue_entry_t *queue) +{ + return queue->s.handle; +} + +static inline reorder_window_t *queue_get_rwin(queue_entry_t *queue) +{ + return queue->s.sched_elem.rwin; +} + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index 5d10cd37..5877a1cd 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -12,7 +12,7 @@ extern "C" { #endif
 #include <odp/api/queue.h>
-#include <odp_queue_internal.h>
+#include <odp_queue_if.h>
 #include <odp/api/schedule.h>
typedef void (*schedule_pktio_start_fn_t)(int pktio_index, int num_in_queue, diff --git a/platform/linux-generic/include/odp_schedule_scalable.h b/platform/linux-generic/include/odp_schedule_scalable.h new file mode 100644 index 00000000..8a2d70da --- /dev/null +++ b/platform/linux-generic/include/odp_schedule_scalable.h @@ -0,0 +1,139 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_SCHEDULE_SCALABLE_H +#define ODP_SCHEDULE_SCALABLE_H + +#include <odp/api/align.h> +#include <odp/api/schedule.h> +#include <odp/api/ticketlock.h> + +#include <odp_schedule_scalable_config.h> +#include <odp_schedule_scalable_ordered.h> +#include <odp_llqueue.h> + +/* + * ODP_SCHED_PRIO_HIGHEST/NORMAL/LOWEST/DEFAULT are compile time + * constants, but not ODP_SCHED_PRIO_NUM. The current API for this + * is odp_schedule_num_prio(). The other schedulers also define + * this internally as NUM_PRIO. + */ +#define ODP_SCHED_PRIO_NUM 8 + +typedef struct { + union { + struct { + struct llqueue llq; + uint32_t prio; + }; + char line[ODP_CACHE_LINE_SIZE]; + }; +} sched_queue_t ODP_ALIGNED_CACHE; + +#define TICKET_INVALID (uint16_t)(~0U) + +typedef struct { + int32_t numevts; + uint16_t wrr_budget; + uint8_t cur_ticket; + uint8_t nxt_ticket; +} qschedstate_t ODP_ALIGNED(sizeof(uint64_t)); + +typedef uint32_t ringidx_t; + +#ifdef CONFIG_SPLIT_PRODCONS +#define SPLIT_PC ODP_ALIGNED_CACHE +#else +#define SPLIT_PC +#endif + +#define ODP_NO_SCHED_QUEUE (ODP_SCHED_SYNC_ORDERED + 1) + +typedef struct { + struct llnode node; /* must be first */ + sched_queue_t *schedq; +#ifdef CONFIG_QSCHST_LOCK + odp_ticketlock_t qschlock; +#endif + qschedstate_t qschst; + uint16_t pop_deficit; + uint16_t qschst_type; + ringidx_t prod_read SPLIT_PC; + ringidx_t prod_write; + ringidx_t prod_mask; + odp_buffer_hdr_t **prod_ring; + ringidx_t cons_write SPLIT_PC; + ringidx_t cons_read; + reorder_window_t *rwin; + void *user_ctx; +#ifdef CONFIG_SPLIT_PRODCONS + odp_buffer_hdr_t **cons_ring; + ringidx_t cons_mask; + uint16_t cons_type; +#else +#define cons_mask prod_mask +#define cons_ring prod_ring +#define cons_type qschst_type +#endif +} sched_elem_t ODP_ALIGNED_CACHE; + +/* Number of scheduling groups */ +#define MAX_SCHED_GROUP (sizeof(sched_group_mask_t) * CHAR_BIT) + +typedef bitset_t sched_group_mask_t; + +typedef struct { + /* Threads currently associated with the sched group */ + bitset_t thr_actual[ODP_SCHED_PRIO_NUM] ODP_ALIGNED_CACHE; + bitset_t thr_wanted; + /* Used to spread queues over schedq's */ + uint32_t xcount[ODP_SCHED_PRIO_NUM]; + /* Number of schedq's per prio */ + uint32_t xfactor; + char name[ODP_SCHED_GROUP_NAME_LEN]; + /* ODP_SCHED_PRIO_NUM * xfactor. Must be last. 
*/ + sched_queue_t schedq[1] ODP_ALIGNED_CACHE; +} sched_group_t; + +/* Number of reorder contexts per thread */ +#define TS_RVEC_SIZE 16 + +typedef struct { + /* Atomic queue currently being processed or NULL */ + sched_elem_t *atomq; + /* Current reorder context or NULL */ + reorder_context_t *rctx; + uint8_t pause; + uint8_t out_of_order; + uint8_t tidx; + uint8_t pad; + uint32_t dequeued; /* Number of events dequeued from atomic queue */ + uint16_t pktin_next; /* Next pktin tag to poll */ + uint16_t pktin_poll_cnts; + uint16_t ticket; /* Ticket for atomic queue or TICKET_INVALID */ + uint16_t num_schedq; + uint16_t sg_sem; /* Set when sg_wanted is modified by other thread */ +#define SCHEDQ_PER_THREAD (MAX_SCHED_GROUP * ODP_SCHED_PRIO_NUM) + sched_queue_t *schedq_list[SCHEDQ_PER_THREAD]; + /* Current sched_group membership */ + sched_group_mask_t sg_actual[ODP_SCHED_PRIO_NUM]; + /* Future sched_group membership. */ + sched_group_mask_t sg_wanted[ODP_SCHED_PRIO_NUM]; + bitset_t priv_rvec_free; + /* Bitset of free entries in rvec[] */ + bitset_t rvec_free ODP_ALIGNED_CACHE; + /* Reordering contexts to allocate from */ + reorder_context_t rvec[TS_RVEC_SIZE] ODP_ALIGNED_CACHE; +} sched_scalable_thread_state_t ODP_ALIGNED_CACHE; + +void sched_update_enq(sched_elem_t *q, uint32_t actual); +void sched_update_enq_sp(sched_elem_t *q, uint32_t actual); +sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t prio); +void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio); + +#endif /* ODP_SCHEDULE_SCALABLE_H */ diff --git a/platform/linux-generic/include/odp_schedule_scalable_config.h b/platform/linux-generic/include/odp_schedule_scalable_config.h new file mode 100644 index 00000000..b9a9a55f --- /dev/null +++ b/platform/linux-generic/include/odp_schedule_scalable_config.h @@ -0,0 +1,52 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_SCHEDULE_SCALABLE_CONFIG_H_ +#define ODP_SCHEDULE_SCALABLE_CONFIG_H_ + +/* + * Default scaling factor for the scheduler group + * + * This scaling factor is used when the application creates a scheduler + * group with no worker threads. + */ +#define CONFIG_DEFAULT_XFACTOR 4 + +/* + * Default weight (in events) for WRR in scalable scheduler + * + * This controls the per-queue weight for WRR between queues of the same + * priority in the scalable scheduler + * A higher value improves throughput while a lower value increases fairness + * and thus likely decreases latency + * + * If WRR is undesired, set the value to ~0 which will use the largest possible + * weight + * + * Note: an API for specifying this on a per-queue basis would be useful but is + * not yet available + */ +#define CONFIG_WRR_WEIGHT 64 + +/* + * Split queue producer/consumer metadata into separate cache lines. + * This is beneficial on e.g. Cortex-A57 but not so much on A53. + */ +#define CONFIG_SPLIT_PRODCONS + +/* + * Use locks to protect queue (ring buffer) and scheduler state updates + * On x86, this decreases overhead noticeably. 
+ */ +#if !defined(__arm__) && !defined(__aarch64__) +#define CONFIG_QSCHST_LOCK +/* Keep all ring buffer/qschst data together when using locks */ +#undef CONFIG_SPLIT_PRODCONS +#endif + +#endif /* ODP_SCHEDULE_SCALABLE_CONFIG_H_ */ diff --git a/platform/linux-generic/include/odp_schedule_scalable_ordered.h b/platform/linux-generic/include/odp_schedule_scalable_ordered.h new file mode 100644 index 00000000..941304b7 --- /dev/null +++ b/platform/linux-generic/include/odp_schedule_scalable_ordered.h @@ -0,0 +1,132 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_SCHEDULE_SCALABLE_ORDERED_H +#define ODP_SCHEDULE_SCALABLE_ORDERED_H + +#include <odp/api/shared_memory.h> + +#include <odp_internal.h> +#include <odp_align_internal.h> +#include <odp_bitset.h> +#include <_ishmpool_internal.h> + +/* High level functioning of reordering + * Datastructures - + * Reorder Window - Every ordered queue is associated with a reorder window. + * Reorder window stores reorder contexts from threads that + * have completed processing out-of-order. + * Reorder Context - Reorder context consists of events that a thread + * wants to enqueue while processing a batch of events + * from an ordered queue. + * + * Algorithm - + * 1) Thread identifies the ordered queue. + * 2) It 'reserves a slot in the reorder window and dequeues the + * events' atomically. Atomicity is achieved by using a ticket-lock + * like design where the reorder window slot is the ticket. + * 3a) Upon order-release/next schedule call, the thread + * checks if it's slot (ticket) equals the head of the reorder window. + * If yes, enqueues the events to the destination queue till + * i) the reorder window is empty or + * ii) there is a gap in the reorder window + * If no, the reorder context is stored in the reorder window at + * the reserved slot. + * 3b) Upon the first enqueue, the thread checks if it's slot (ticket) + * equals the head of the reorder window. + * If yes, enqueues the events immediately to the destination queue + * If no, these (and subsequent) events are stored in the reorder context + * (in the application given order) + */ + +/* Head and change indicator variables are used to synchronise between + * concurrent insert operations in the reorder window. A thread performing + * an in-order insertion must be notified about the newly inserted + * reorder contexts so that it doesn’t halt the retire process too early. + * A thread performing an out-of-order insertion must correspondingly + * notify the thread doing in-order insertion of the new waiting reorder + * context, which may need to be handled by that thread. + * + * Also, an out-of-order insertion may become an in-order insertion if the + * thread doing an in-order insertion completes before this thread completes. + * We need a point of synchronisation where this knowledge and potential state + * change can be transferred between threads. + */ +typedef struct hc { + /* First missing context */ + uint32_t head; + /* Change indicator */ + uint32_t chgi; +} hc_t ODP_ALIGNED(sizeof(uint64_t)); + +/* Number of reorder contects in the reorder window. + * Should be at least one per CPU. 
+ */ +#define RWIN_SIZE 32 +ODP_STATIC_ASSERT(CHECK_IS_POWER2(RWIN_SIZE), "RWIN_SIZE is not a power of 2"); + +typedef struct reorder_context reorder_context_t; + +typedef struct reorder_window { + /* head and change indicator */ + hc_t hc; + uint32_t winmask; + uint32_t tail; + uint32_t turn; + uint32_t olock[CONFIG_QUEUE_MAX_ORD_LOCKS]; + uint16_t lock_count; + /* Reorder contexts in this window */ + reorder_context_t *ring[RWIN_SIZE]; +} reorder_window_t; + +/* Number of events that can be stored in a reorder context. + * This size is chosen so that there is no space left unused at the end + * of the last cache line (for 64b architectures and 64b handles). + */ +#define RC_EVT_SIZE 18 + +struct reorder_context { + /* Reorder window to which this context belongs */ + reorder_window_t *rwin; + /* Pointer to TS->rvec_free */ + bitset_t *rvec_free; + /* Our slot number in the reorder window */ + uint32_t sn; + uint8_t olock_flags; + /* Our index in thread_state rvec array */ + uint8_t idx; + /* Use to link reorder contexts together */ + uint8_t next_idx; + /* Current reorder context to save events in */ + uint8_t cur_idx; + /* Number of events stored in this reorder context */ + uint8_t numevts; + /* Events stored in this context */ + odp_buffer_hdr_t *events[RC_EVT_SIZE]; + queue_entry_t *destq[RC_EVT_SIZE]; +} ODP_ALIGNED_CACHE; + +reorder_window_t *rwin_alloc(_odp_ishm_pool_t *pool, + unsigned lock_count); +int rwin_free(_odp_ishm_pool_t *pool, reorder_window_t *rwin); +bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn); +void rwin_insert(reorder_window_t *rwin, + reorder_context_t *rctx, + uint32_t sn, + void (*callback)(reorder_context_t *)); +void rctx_init(reorder_context_t *rctx, uint16_t idx, + reorder_window_t *rwin, uint32_t sn); +void rctx_free(const reorder_context_t *rctx); +void olock_unlock(const reorder_context_t *rctx, reorder_window_t *rwin, + uint32_t lock_index); +void olock_release(const reorder_context_t *rctx); +void rctx_retire(reorder_context_t *first); +void rctx_release(reorder_context_t *rctx); +int rctx_save(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); + +#endif /* ODP_SCHEDULE_SCALABLE_ORDERED_H */ diff --git a/platform/linux-generic/m4/odp_schedule.m4 b/platform/linux-generic/m4/odp_schedule.m4 index 91c19f21..d862b8b2 100644 --- a/platform/linux-generic/m4/odp_schedule.m4 +++ b/platform/linux-generic/m4/odp_schedule.m4 @@ -1,13 +1,44 @@ -AC_ARG_ENABLE([schedule-sp], - [ --enable-schedule-sp enable strict priority scheduler], - [if test x$enableval = xyes; then - schedule_sp_enabled=yes - ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP" - fi]) +# Checks for --enable-schedule-sp and defines ODP_SCHEDULE_SP and adds +# -DODP_SCHEDULE_SP to CFLAGS. +AC_ARG_ENABLE( + [schedule_sp], + [AC_HELP_STRING([--enable-schedule-sp], + [enable strict priority scheduler])], + [if test "x$enableval" = xyes; then + schedule_sp=true + ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP" + else + schedule_sp=false + fi], + [schedule_sp=false]) +AM_CONDITIONAL([ODP_SCHEDULE_SP], [test x$schedule_sp = xtrue])
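The reordering scheme described in odp_schedule_scalable_ordered.h above reduces to a reserve/insert pair bracketing the out-of-order work. The following is a compressed, hypothetical sketch of that flow, built only from the declarations and the algorithm comment in that header; the real call sites in odp_schedule_scalable*.c carry additional state (ordered-lock flags, rvec bookkeeping) that is omitted here.

#include <odp_queue_scalable_internal.h>

/* Hypothetical condensed flow for one ordered-queue batch: reserve a
 * slot (the ticket), process while capturing enqueues in the reorder
 * context, then insert the context in or out of order.
 */
static void ordered_batch_sketch(reorder_window_t *rwin,
				 reorder_context_t *rctx, uint16_t rctx_idx,
				 void (*retire_cb)(reorder_context_t *))
{
	uint32_t sn;

	/* Step 2 of the algorithm comment: atomically reserve a slot */
	if (!rwin_reserve(rwin, &sn))
		return; /* window full, caller must back off */

	/* Bind this thread's reorder context to the reserved slot */
	rctx_init(rctx, rctx_idx, rwin, sn);

	/* ... dequeue and process the batch; enqueues issued while
	 * out-of-order are captured via rctx_save() instead of going
	 * straight to the destination ring ...
	 */

	/* Step 3: if sn is at the head of the window the stored events
	 * are retired immediately, otherwise the context is parked in
	 * the window until the in-order thread reaches it.
	 */
	rwin_insert(rwin, rctx, sn, retire_cb);
}

The slot number is what makes the window behave like a ticket lock: only the context holding the current head slot may retire events directly; every other thread leaves its context behind for the head to pick up.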
-AC_ARG_ENABLE([schedule-iquery], - [ --enable-schedule-iquery enable interests query (sparse bitmap) scheduler], - [if test x$enableval = xyes; then - schedule_iquery_enabled=yes - ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY" - fi]) +# Checks for --enable-schedule-iquery and defines ODP_SCHEDULE_IQUERY and adds +# -DODP_SCHEDULE_IQUERY to CFLAGS. +AC_ARG_ENABLE( + [schedule_iquery], + [AC_HELP_STRING([--enable-schedule-iquery], + [enable interests query (sparse bitmap) scheduler])], + [if test "x$enableval" = xyes; then + schedule_iquery=true + ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY" + else + schedule_iquery=false + fi], + [schedule_iquery=false]) +AM_CONDITIONAL([ODP_SCHEDULE_IQUERY], [test x$schedule_iquery = xtrue]) + +# Checks for --enable-schedule-scalable and defines ODP_SCHEDULE_SCALABLE and +# adds -DODP_SCHEDULE_SCALABLE to CFLAGS. +AC_ARG_ENABLE( + [schedule_scalable], + [AC_HELP_STRING([--enable-schedule-scalable], + [enable scalable scheduler])], + [if test "x$enableval" = xyes; then + schedule_scalable=true + ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SCALABLE" + else + schedule_scalable=false + fi], + [schedule_scalable=false]) +AM_CONDITIONAL([ODP_SCHEDULE_SCALABLE], [test x$schedule_scalable = xtrue]) diff --git a/platform/linux-generic/odp_queue_if.c b/platform/linux-generic/odp_queue_if.c index c91f00eb..d7471dfc 100644 --- a/platform/linux-generic/odp_queue_if.c +++ b/platform/linux-generic/odp_queue_if.c @@ -6,11 +6,19 @@
#include <odp_queue_if.h>
+extern const queue_api_t queue_scalable_api; +extern const queue_fn_t queue_scalable_fn; + extern const queue_api_t queue_default_api; extern const queue_fn_t queue_default_fn;
+#ifdef ODP_SCHEDULE_SCALABLE +const queue_api_t *queue_api = &queue_scalable_api; +const queue_fn_t *queue_fn = &queue_scalable_fn; +#else const queue_api_t *queue_api = &queue_default_api; const queue_fn_t *queue_fn = &queue_default_fn; +#endif
odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param) { diff --git a/platform/linux-generic/odp_queue_scalable.c b/platform/linux-generic/odp_queue_scalable.c new file mode 100644 index 00000000..f95f5f93 --- /dev/null +++ b/platform/linux-generic/odp_queue_scalable.c @@ -0,0 +1,1022 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include <odp/api/hints.h> +#include <odp/api/plat/ticketlock_inlines.h> +#include <odp/api/queue.h> +#include <odp/api/schedule.h> +#include <odp/api/shared_memory.h> +#include <odp/api/sync.h> +#include <odp/api/traffic_mngr.h> + +#include <odp_internal.h> +#include <odp_config_internal.h> +#include <odp_debug_internal.h> + +#include <odp_buffer_inlines.h> +#include <odp_packet_io_internal.h> +#include <odp_packet_io_queue.h> +#include <odp_pool_internal.h> +#include <odp_queue_scalable_internal.h> +#include <odp_schedule_if.h> +#include <_ishm_internal.h> +#include <_ishmpool_internal.h> + +#include <string.h> +#include <inttypes.h> + +#define NUM_INTERNAL_QUEUES 64 + +#define MIN(a, b) \ + ({ \ + __typeof__(a) tmp_a = (a); \ + __typeof__(b) tmp_b = (b); \ + tmp_a < tmp_b ? tmp_a : tmp_b; \ + }) + +#define LOCK(a) _odp_ticketlock_lock(a) +#define UNLOCK(a) _odp_ticketlock_unlock(a) +#define LOCK_INIT(a) odp_ticketlock_init(a) + +extern __thread sched_scalable_thread_state_t *sched_ts; + +typedef struct queue_table_t { + queue_entry_t queue[ODP_CONFIG_QUEUES]; +} queue_table_t; + +static queue_table_t *queue_tbl; +_odp_ishm_pool_t *queue_shm_pool; + +static inline odp_queue_t queue_from_id(uint32_t queue_id) +{ + return _odp_cast_scalar(odp_queue_t, queue_id + 1); +} + +static queue_t queue_from_ext(odp_queue_t handle); +static int _queue_enq(queue_t handle, odp_buffer_hdr_t *buf_hdr); +static odp_buffer_hdr_t *_queue_deq(queue_t handle); +static int _queue_enq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[], + int num); +static int _queue_deq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[], + int num); + +static queue_entry_t *get_qentry(uint32_t queue_id) +{ + return &queue_tbl->queue[queue_id]; +} + +static int _odp_queue_disable_enq(sched_elem_t *q) +{ + ringidx_t old_read, old_write, new_write; + uint32_t size; + + old_write = q->prod_write; + size = q->prod_mask + 1; + do { + /* Need __atomic_load to avoid compiler reordering */ + old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE); + if (old_write != old_read) { + /* Queue is not empty, cannot claim all elements + * Cannot disable enqueue. + */ + return -1; + } + /* Claim all elements in ring */ + new_write = old_write + size; + } while (!__atomic_compare_exchange_n(&q->prod_write, + &old_write, /* Updated on failure */ + new_write, + true, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); + /* All remaining elements claimed, no one else can enqueue */ + return 0; +} + +static int queue_init(queue_entry_t *queue, const char *name, + const odp_queue_param_t *param) +{ + ringidx_t ring_idx; + sched_elem_t *sched_elem; + uint32_t ring_size; + odp_buffer_hdr_t **ring; + uint32_t size; + + sched_elem = &queue->s.sched_elem; + ring_size = param->size > 0 ? + ROUNDUP_POWER2_U32(param->size) : CONFIG_QUEUE_SIZE; + strncpy(queue->s.name, name ? 
name : "", ODP_QUEUE_NAME_LEN - 1); + queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0; + memcpy(&queue->s.param, param, sizeof(odp_queue_param_t)); + + size = ring_size * sizeof(odp_buffer_hdr_t *); + ring = (odp_buffer_hdr_t **)shm_pool_alloc_align(queue_shm_pool, size); + if (NULL == ring) + return -1; + + for (ring_idx = 0; ring_idx < ring_size; ring_idx++) + ring[ring_idx] = NULL; + + queue->s.type = queue->s.param.type; + queue->s.enqueue = _queue_enq; + queue->s.dequeue = _queue_deq; + queue->s.enqueue_multi = _queue_enq_multi; + queue->s.dequeue_multi = _queue_deq_multi; + queue->s.pktin = PKTIN_INVALID; + + sched_elem->node.next = NULL; +#ifdef CONFIG_QSCHST_LOCK + LOCK_INIT(&sched_elem->qschlock); +#endif + sched_elem->qschst.numevts = 0; + sched_elem->qschst.wrr_budget = CONFIG_WRR_WEIGHT; + sched_elem->qschst.cur_ticket = 0; + sched_elem->qschst.nxt_ticket = 0; + sched_elem->pop_deficit = 0; + if (queue->s.type == ODP_QUEUE_TYPE_SCHED) + sched_elem->qschst_type = queue->s.param.sched.sync; + else + sched_elem->qschst_type = ODP_NO_SCHED_QUEUE; + /* 2nd cache line - enqueue */ + sched_elem->prod_read = 0; + sched_elem->prod_write = 0; + sched_elem->prod_ring = ring; + sched_elem->prod_mask = ring_size - 1; + /* 3rd cache line - dequeue */ + sched_elem->cons_read = 0; + sched_elem->cons_write = 0; + sched_elem->rwin = NULL; + sched_elem->schedq = NULL; + sched_elem->user_ctx = queue->s.param.context; +#ifdef CONFIG_SPLIT_PRODCONS + sched_elem->cons_ring = ring; + sched_elem->cons_mask = ring_size - 1; + sched_elem->cons_type = sched_elem->qschst_type; +#endif + + /* Queue initialized successfully, add it to the sched group */ + if (queue->s.type == ODP_QUEUE_TYPE_SCHED) { + if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) { + sched_elem->rwin = + rwin_alloc(queue_shm_pool, + queue->s.param.sched.lock_count); + if (sched_elem->rwin == NULL) { + ODP_ERR("Reorder window not created\n"); + goto rwin_create_failed; + } + } + sched_elem->schedq = + schedq_from_sched_group(param->sched.group, + param->sched.prio); + } + + return 0; + +rwin_create_failed: + _odp_ishm_pool_free(queue_shm_pool, ring); + + return -1; +} + +static int queue_init_global(void) +{ + uint32_t i; + uint64_t pool_size; + uint64_t min_alloc; + uint64_t max_alloc; + + ODP_DBG("Queue init ... "); + + /* Attach to the pool if it exists */ + queue_shm_pool = _odp_ishm_pool_lookup("queue_shm_pool"); + if (queue_shm_pool == NULL) { + /* Create shared memory pool to allocate shared memory for the + * queues. Use the default queue size. + */ + /* Add size of the array holding the queues */ + pool_size = sizeof(queue_table_t); + /* Add storage required for queues */ + pool_size += (CONFIG_QUEUE_SIZE * sizeof(odp_buffer_hdr_t *)) * + ODP_CONFIG_QUEUES; + /* Add the reorder window size */ + pool_size += sizeof(reorder_window_t) * ODP_CONFIG_QUEUES; + /* Choose min_alloc and max_alloc such that buddy allocator is + * is selected. 
+ */ + min_alloc = 0; + max_alloc = CONFIG_QUEUE_SIZE * sizeof(odp_buffer_hdr_t *); + queue_shm_pool = _odp_ishm_pool_create("queue_shm_pool", + pool_size, + min_alloc, max_alloc, + _ODP_ISHM_SINGLE_VA); + if (queue_shm_pool == NULL) { + ODP_ERR("Failed to allocate shared memory pool for" + " queues\n"); + goto queue_shm_pool_create_failed; + } + } + + queue_tbl = (queue_table_t *) + shm_pool_alloc_align(queue_shm_pool, + sizeof(queue_table_t)); + if (queue_tbl == NULL) { + ODP_ERR("Failed to reserve shared memory for queue table\n"); + goto queue_tbl_ishm_alloc_failed; + } + + memset(queue_tbl, 0, sizeof(queue_table_t)); + + for (i = 0; i < ODP_CONFIG_QUEUES; i++) { + /* init locks */ + queue_entry_t *queue; + + queue = get_qentry(i); + LOCK_INIT(&queue->s.lock); + queue->s.index = i; + queue->s.handle = queue_from_id(i); + } + + ODP_DBG("done\n"); + ODP_DBG("Queue init global\n"); + ODP_DBG(" struct queue_entry_s size %zu\n", + sizeof(struct queue_entry_s)); + ODP_DBG(" queue_entry_t size %zu\n", + sizeof(queue_entry_t)); + ODP_DBG("\n"); + + return 0; + +queue_shm_pool_create_failed: + +queue_tbl_ishm_alloc_failed: + _odp_ishm_pool_destroy(queue_shm_pool); + + return -1; +} + +static int queue_term_global(void) +{ + int ret = 0; + int rc = 0; + queue_entry_t *queue; + int i; + + for (i = 0; i < ODP_CONFIG_QUEUES; i++) { + queue = &queue_tbl->queue[i]; + if (__atomic_load_n(&queue->s.status, + __ATOMIC_RELAXED) != QUEUE_STATUS_FREE) { + ODP_ERR("Not destroyed queue: %s\n", queue->s.name); + rc = -1; + } + } + + _odp_ishm_pool_free(queue_shm_pool, queue_tbl); + + ret = _odp_ishm_pool_destroy(queue_shm_pool); + if (ret < 0) { + ODP_ERR("Failed to destroy shared memory pool for queues\n"); + rc = -1; + } + + return rc; +} + +static int queue_init_local(void) +{ + return 0; +} + +static int queue_term_local(void) +{ + return 0; +} + +static int queue_capability(odp_queue_capability_t *capa) +{ + memset(capa, 0, sizeof(odp_queue_capability_t)); + + /* Reserve some queues for internal use */ + capa->max_queues = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES; + capa->max_ordered_locks = sched_fn->max_ordered_locks(); + capa->max_sched_groups = sched_fn->num_grps(); + capa->sched_prios = odp_schedule_num_prio(); + capa->plain.max_num = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES; + capa->plain.max_size = 0; + capa->sched.max_num = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES; + capa->sched.max_size = 0; + + return 0; +} + +static odp_queue_type_t queue_type(odp_queue_t handle) +{ + return qentry_from_int(queue_from_ext(handle))->s.type; +} + +static odp_schedule_sync_t queue_sched_type(odp_queue_t handle) +{ + return qentry_from_int(queue_from_ext(handle))->s.param.sched.sync; +} + +static odp_schedule_prio_t queue_sched_prio(odp_queue_t handle) +{ + return qentry_from_int(queue_from_ext(handle))->s.param.sched.prio; +} + +static odp_schedule_group_t queue_sched_group(odp_queue_t handle) +{ + return qentry_from_int(queue_from_ext(handle))->s.param.sched.group; +} + +static int queue_lock_count(odp_queue_t handle) +{ + queue_entry_t *queue = qentry_from_int(queue_from_ext(handle)); + + return queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED ? 
+ (int)queue->s.param.sched.lock_count : -1; +} + +static odp_queue_t queue_create(const char *name, + const odp_queue_param_t *param) +{ + int queue_idx; + odp_queue_t handle = ODP_QUEUE_INVALID; + queue_entry_t *queue; + odp_queue_param_t default_param; + + if (param == NULL) { + odp_queue_param_init(&default_param); + param = &default_param; + } + + for (queue_idx = 0; queue_idx < ODP_CONFIG_QUEUES; queue_idx++) { + queue = &queue_tbl->queue[queue_idx]; + + if (queue->s.status != QUEUE_STATUS_FREE) + continue; + + LOCK(&queue->s.lock); + if (queue->s.status == QUEUE_STATUS_FREE) { + if (queue_init(queue, name, param)) { + UNLOCK(&queue->s.lock); + return handle; + } + queue->s.status = QUEUE_STATUS_READY; + handle = queue->s.handle; + UNLOCK(&queue->s.lock); + break; + } + UNLOCK(&queue->s.lock); + } + return handle; +} + +static int queue_destroy(odp_queue_t handle) +{ + queue_entry_t *queue; + sched_elem_t *q; + + if (handle == ODP_QUEUE_INVALID) + return -1; + + queue = qentry_from_int(queue_from_ext(handle)); + LOCK(&queue->s.lock); + if (queue->s.status != QUEUE_STATUS_READY) { + UNLOCK(&queue->s.lock); + return -1; + } + q = &queue->s.sched_elem; + +#ifdef CONFIG_QSCHST_LOCK + LOCK(&q->qschlock); +#endif + if (_odp_queue_disable_enq(q)) { + /* Producer side not empty */ +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&q->qschlock); +#endif + UNLOCK(&queue->s.lock); + return -1; + } + /* Enqueue is now disabled */ + if (q->cons_read != q->cons_write) { + /* Consumer side is not empty + * Roll back previous change, enable enqueue again. + */ + uint32_t size; + + size = q->prod_mask + 1; + __atomic_fetch_sub(&q->prod_write, size, __ATOMIC_RELAXED); +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&q->qschlock); +#endif + UNLOCK(&queue->s.lock); + return -1; + } +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&q->qschlock); +#endif + /* Producer and consumer sides empty, enqueue disabled + * Now wait until schedq state is empty and no outstanding tickets + */ + while (__atomic_load_n(&q->qschst.numevts, __ATOMIC_RELAXED) != 0 || + __atomic_load_n(&q->qschst.cur_ticket, __ATOMIC_RELAXED) != + __atomic_load_n(&q->qschst.nxt_ticket, __ATOMIC_RELAXED)) { + sevl(); + while (wfe() && monitor32((uint32_t *)&q->qschst.numevts, + __ATOMIC_RELAXED) != 0) + doze(); + } + + /* Adjust the spread factor for the queues in the schedule group */ + if (queue->s.type == ODP_QUEUE_TYPE_SCHED) + sched_group_xcount_dec(queue->s.param.sched.group, + queue->s.param.sched.prio); + + _odp_ishm_pool_free(queue_shm_pool, q->prod_ring); + + if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) { + if (rwin_free(queue_shm_pool, q->rwin) < 0) { + ODP_ERR("Failed to free reorder window\n"); + UNLOCK(&queue->s.lock); + return -1; + } + } + queue->s.status = QUEUE_STATUS_FREE; + UNLOCK(&queue->s.lock); + return 0; +} + +static int queue_context_set(odp_queue_t handle, void *context, + uint32_t len ODP_UNUSED) +{ + odp_mb_full(); + qentry_from_int(queue_from_ext(handle))->s.param.context = context; + odp_mb_full(); + return 0; +} + +static void *queue_context(odp_queue_t handle) +{ + return qentry_from_int(queue_from_ext(handle))->s.param.context; +} + +static odp_queue_t queue_lookup(const char *name) +{ + uint32_t i; + + for (i = 0; i < ODP_CONFIG_QUEUES; i++) { + queue_entry_t *queue = &queue_tbl->queue[i]; + + if (queue->s.status == QUEUE_STATUS_FREE || + queue->s.status == QUEUE_STATUS_DESTROYED) + continue; + + LOCK(&queue->s.lock); + if (strcmp(name, queue->s.name) == 0) { + /* found it */ + UNLOCK(&queue->s.lock); + return queue->s.handle; + } 
+ UNLOCK(&queue->s.lock); + } + + return ODP_QUEUE_INVALID; +} + +#ifndef CONFIG_QSCHST_LOCK +static inline int _odp_queue_enq(sched_elem_t *q, + odp_buffer_hdr_t *buf_hdr[], + int num) +{ + ringidx_t old_read; + ringidx_t old_write; + ringidx_t new_write; + int actual; + uint32_t mask; + odp_buffer_hdr_t **ring; + + mask = q->prod_mask; + ring = q->prod_ring; + + /* Load producer ring state (read & write index) */ + old_write = __atomic_load_n(&q->prod_write, __ATOMIC_RELAXED); + do { + /* Consumer does store-release prod_read, we need + * load-acquire. + */ + old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE); + + actual = MIN(num, (int)((mask + 1) - (old_write - old_read))); + if (odp_unlikely(actual <= 0)) + return 0; + + new_write = old_write + actual; + } while (!__atomic_compare_exchange_n(&q->prod_write, + &old_write, /* Updated on failure */ + new_write, + true, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); + +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->cons_write, 0, 0); +#endif + /* Store our event(s) in the ring */ + do { + ring[old_write & mask] = *buf_hdr++; + } while (++old_write != new_write); + old_write -= actual; + +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->node, 1, 0); +#endif + /* Wait for our turn to signal consumers */ + if (odp_unlikely(__atomic_load_n(&q->cons_write, + __ATOMIC_RELAXED) != old_write)) { + sevl(); + while (wfe() && monitor32(&q->cons_write, + __ATOMIC_RELAXED) != old_write) + doze(); + } + + /* Signal consumers that events are available (release events) + * Enable other producers to continue + */ + /* Wait for writes (to ring slots) to complete */ + atomic_store_release(&q->cons_write, new_write, /*readonly=*/false); + + return actual; +} + +#else + +static inline int _odp_queue_enq_sp(sched_elem_t *q, + odp_buffer_hdr_t *buf_hdr[], + int num) +{ + ringidx_t old_read; + ringidx_t old_write; + ringidx_t new_write; + int actual; + uint32_t mask; + odp_buffer_hdr_t **ring; + + mask = q->prod_mask; + ring = q->prod_ring; + + /* Load producer ring state (read & write index) */ + old_write = q->prod_write; + /* Consumer does store-release prod_read, we need load-acquire */ + old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE); + actual = MIN(num, (int)((mask + 1) - (old_write - old_read))); + if (odp_unlikely(actual <= 0)) + return 0; + + new_write = old_write + actual; + q->prod_write = new_write; + + /* Store our event(s) in the ring */ + do { + ring[old_write & mask] = *buf_hdr++; + } while (++old_write != new_write); + old_write -= actual; + +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->node, 1, 0); +#endif + + /* Signal consumers that events are available (release events) + * Enable other producers to continue + */ +#ifdef CONFIG_QSCHST_LOCK + q->cons_write = new_write; +#else + atomic_store_release(&q->cons_write, new_write, /*readonly=*/false); +#endif + + return actual; +} +#endif + +static int _queue_enq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + int actual; + queue_entry_t *queue; + sched_scalable_thread_state_t *ts; + + queue = qentry_from_int(handle); + ts = sched_ts; + if (ts && odp_unlikely(ts->out_of_order)) { + actual = rctx_save(queue, buf_hdr, num); + return actual; + } + +#ifdef CONFIG_QSCHST_LOCK + LOCK(&queue->s.sched_elem.qschlock); + actual = _odp_queue_enq_sp(&queue->s.sched_elem, buf_hdr, num); +#else + actual = _odp_queue_enq(&queue->s.sched_elem, buf_hdr, num); +#endif + + if (odp_likely(queue->s.sched_elem.schedq != NULL && actual != 0)) { + /* Perform 
scheduler related updates. */ +#ifdef CONFIG_QSCHST_LOCK + sched_update_enq_sp(&queue->s.sched_elem, actual); +#else + sched_update_enq(&queue->s.sched_elem, actual); +#endif + } + +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&queue->s.sched_elem.qschlock); +#endif + return actual; +} + +static int _queue_enq(queue_t handle, odp_buffer_hdr_t *buf_hdr) +{ + return odp_likely( + _queue_enq_multi(handle, &buf_hdr, 1) == 1) ? 0 : -1; +} + +static int queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num) +{ + odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; + queue_entry_t *queue; + int i; + + if (num > QUEUE_MULTI_MAX) + num = QUEUE_MULTI_MAX; + + queue = qentry_from_int(queue_from_ext(handle)); + + for (i = 0; i < num; i++) + buf_hdr[i] = buf_hdl_to_hdr(odp_buffer_from_event(ev[i])); + + return queue->s.enqueue_multi(qentry_to_int(queue), buf_hdr, num); +} + +static int queue_enq(odp_queue_t handle, odp_event_t ev) +{ + odp_buffer_hdr_t *buf_hdr; + queue_entry_t *queue; + + queue = qentry_from_int(queue_from_ext(handle)); + buf_hdr = buf_hdl_to_hdr(odp_buffer_from_event(ev)); + + return queue->s.enqueue(qentry_to_int(queue), buf_hdr); +} + +/* Single-consumer dequeue. */ +int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num) +{ + int actual; + ringidx_t old_read; + ringidx_t old_write; + ringidx_t new_read; + uint32_t mask; + odp_buffer_hdr_t **ring; + + /* Load consumer ring state (read & write index). */ + old_read = q->cons_read; + /* Producer does store-release cons_write, we need load-acquire */ + old_write = __atomic_load_n(&q->cons_write, __ATOMIC_ACQUIRE); + actual = MIN(num, (int)(old_write - old_read)); + + if (odp_unlikely(actual <= 0)) + return 0; + +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->node, 1, 0); +#endif + new_read = old_read + actual; + q->cons_read = new_read; + + mask = q->cons_mask; + ring = q->cons_ring; + do { + *evp++ = odp_buffer_to_event( + odp_hdr_to_buf(ring[old_read & mask])); + } while (++old_read != new_read); + + /* Signal producers that empty slots are available + * (release ring slots). Enable other consumers to continue. + */ +#ifdef CONFIG_QSCHST_LOCK + q->prod_read = new_read; +#else + /* Wait for loads (from ring slots) to complete. */ + atomic_store_release(&q->prod_read, new_read, /*readonly=*/true); +#endif + return actual; +} + +inline int _odp_queue_deq(sched_elem_t *q, odp_buffer_hdr_t *buf_hdr[], int num) +{ + int actual; + ringidx_t old_read; + ringidx_t old_write; + ringidx_t new_read; + uint32_t mask; + odp_buffer_hdr_t **ring; + odp_buffer_hdr_t **p_buf_hdr; + + mask = q->cons_mask; + ring = q->cons_ring; + + /* Load consumer ring state (read & write index) */ + old_read = __atomic_load_n(&q->cons_read, __ATOMIC_RELAXED); + do { + /* Need __atomic_load to avoid compiler reordering + * Producer does store-release cons_write, we need + * load-acquire. 
+ */ + old_write = __atomic_load_n(&q->cons_write, __ATOMIC_ACQUIRE); + /* Prefetch ring buffer array */ + __builtin_prefetch(&q->cons_ring[old_read & mask], 0, 0); + + actual = MIN(num, (int)(old_write - old_read)); + if (odp_unlikely(actual <= 0)) + return 0; + + /* Attempt to free ring slot(s) */ + new_read = old_read + actual; + } while (!__atomic_compare_exchange_n(&q->cons_read, + &old_read, /* Updated on failure */ + new_read, + true, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->prod_read, 0, 0); +#endif + p_buf_hdr = buf_hdr; + do { + *p_buf_hdr++ = ring[old_read & mask]; + } while (++old_read != new_read); + old_read -= actual; + +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->node, 1, 0); +#endif + /* Wait for our turn to signal producers */ + if (odp_unlikely(__atomic_load_n(&q->prod_read, __ATOMIC_RELAXED) != + old_read)) { + sevl(); + while (wfe() && monitor32(&q->prod_read, + __ATOMIC_RELAXED) != old_read) + doze(); + } + + /* Signal producers that empty slots are available + * (release ring slots) + * Enable other consumers to continue + */ + /* Wait for loads (from ring slots) to complete */ + atomic_store_release(&q->prod_read, new_read, /*readonly=*/true); + + return actual; +} + +inline int _odp_queue_deq_mc(sched_elem_t *q, odp_event_t *evp, int num) +{ + int ret, evt_idx; + odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX]; + + if (num > QUEUE_MULTI_MAX) + num = QUEUE_MULTI_MAX; + + ret = _odp_queue_deq(q, hdr_tbl, num); + if (odp_likely(ret != 0)) { + for (evt_idx = 0; evt_idx < num; evt_idx++) + evp[evt_idx] = odp_buffer_to_event( + odp_hdr_to_buf(hdr_tbl[evt_idx])); + } + + return ret; +} + +static int _queue_deq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + sched_elem_t *q; + queue_entry_t *queue; + + queue = qentry_from_int(handle); + q = &queue->s.sched_elem; + return _odp_queue_deq(q, buf_hdr, num); +} + +static odp_buffer_hdr_t *_queue_deq(queue_t handle) +{ + sched_elem_t *q; + odp_buffer_hdr_t *buf_hdr; + queue_entry_t *queue; + + queue = qentry_from_int(handle); + q = &queue->s.sched_elem; + if (_odp_queue_deq(q, &buf_hdr, 1) == 1) + return buf_hdr; + else + return NULL; +} + +static int queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num) +{ + queue_entry_t *queue; + odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; + int i, ret; + + if (num > QUEUE_MULTI_MAX) + num = QUEUE_MULTI_MAX; + + queue = qentry_from_int(queue_from_ext(handle)); + + ret = queue->s.dequeue_multi(qentry_to_int(queue), buf_hdr, num); + + for (i = 0; i < ret; i++) + events[i] = odp_buffer_to_event(buf_hdr[i]->handle.handle); + + return ret; +} + +static odp_event_t queue_deq(odp_queue_t handle) +{ + queue_entry_t *queue; + odp_buffer_hdr_t *buf_hdr; + + queue = qentry_from_int(queue_from_ext(handle)); + buf_hdr = queue->s.dequeue(qentry_to_int(queue)); + + if (buf_hdr) + return odp_buffer_to_event(buf_hdr->handle.handle); + + return ODP_EVENT_INVALID; +} + +static void queue_param_init(odp_queue_param_t *params) +{ + memset(params, 0, sizeof(odp_queue_param_t)); + params->type = ODP_QUEUE_TYPE_PLAIN; + params->enq_mode = ODP_QUEUE_OP_MT; + params->deq_mode = ODP_QUEUE_OP_MT; + params->sched.prio = ODP_SCHED_PRIO_DEFAULT; + params->sched.sync = ODP_SCHED_SYNC_PARALLEL; + params->sched.group = ODP_SCHED_GROUP_ALL; +} + +static int queue_info(odp_queue_t handle, odp_queue_info_t *info) +{ + uint32_t queue_id; + queue_entry_t *queue; + int status; + + if (odp_unlikely(info == NULL)) { + ODP_ERR("Unable to store 
info, NULL ptr given\n"); + return -1; + } + + queue_id = queue_to_id(handle); + + if (odp_unlikely(queue_id >= ODP_CONFIG_QUEUES)) { + ODP_ERR("Invalid queue handle:%" PRIu64 "\n", + odp_queue_to_u64(handle)); + return -1; + } + + queue = get_qentry(queue_id); + + LOCK(&queue->s.lock); + status = queue->s.status; + + if (odp_unlikely(status == QUEUE_STATUS_FREE || + status == QUEUE_STATUS_DESTROYED)) { + UNLOCK(&queue->s.lock); + ODP_ERR("Invalid queue status:%d\n", status); + return -1; + } + + info->name = queue->s.name; + info->param = queue->s.param; + + UNLOCK(&queue->s.lock); + + return 0; +} + +static uint64_t queue_to_u64(odp_queue_t hdl) +{ + return _odp_pri(hdl); +} + +static odp_pktout_queue_t queue_get_pktout(queue_t handle) +{ + return qentry_from_int(handle)->s.pktout; +} + +static void queue_set_pktout(queue_t handle, odp_pktio_t pktio, int index) +{ + qentry_from_int(handle)->s.pktout.pktio = pktio; + qentry_from_int(handle)->s.pktout.index = index; +} + +static odp_pktin_queue_t queue_get_pktin(queue_t handle) +{ + return qentry_from_int(handle)->s.pktin; +} + +static void queue_set_pktin(queue_t handle, odp_pktio_t pktio, int index) +{ + qentry_from_int(handle)->s.pktin.pktio = pktio; + qentry_from_int(handle)->s.pktin.index = index; +} + +static void queue_set_enq_func(queue_t handle, queue_enq_fn_t func) +{ + qentry_from_int(handle)->s.enqueue = func; +} + +static void queue_set_enq_multi_func(queue_t handle, queue_enq_multi_fn_t func) +{ + qentry_from_int(handle)->s.enqueue_multi = func; +} + +static void queue_set_deq_func(queue_t handle, queue_deq_fn_t func) +{ + qentry_from_int(handle)->s.dequeue = func; +} + +static void queue_set_deq_multi_func(queue_t handle, queue_deq_multi_fn_t func) +{ + qentry_from_int(handle)->s.dequeue_multi = func; +} + +static void queue_set_type(queue_t handle, odp_queue_type_t type) +{ + qentry_from_int(handle)->s.type = type; +} + +static queue_t queue_from_ext(odp_queue_t handle) +{ + uint32_t queue_id; + + queue_id = queue_to_id(handle); + return qentry_to_int(get_qentry(queue_id)); +} + +static odp_queue_t queue_to_ext(queue_t handle) +{ + return qentry_from_int(handle)->s.handle; +} + +/* API functions */ +queue_api_t queue_scalable_api = { + .queue_create = queue_create, + .queue_destroy = queue_destroy, + .queue_lookup = queue_lookup, + .queue_capability = queue_capability, + .queue_context_set = queue_context_set, + .queue_context = queue_context, + .queue_enq = queue_enq, + .queue_enq_multi = queue_enq_multi, + .queue_deq = queue_deq, + .queue_deq_multi = queue_deq_multi, + .queue_type = queue_type, + .queue_sched_type = queue_sched_type, + .queue_sched_prio = queue_sched_prio, + .queue_sched_group = queue_sched_group, + .queue_lock_count = queue_lock_count, + .queue_to_u64 = queue_to_u64, + .queue_param_init = queue_param_init, + .queue_info = queue_info +}; + +/* Functions towards internal components */ +queue_fn_t queue_scalable_fn = { + .init_global = queue_init_global, + .term_global = queue_term_global, + .init_local = queue_init_local, + .term_local = queue_term_local, + .from_ext = queue_from_ext, + .to_ext = queue_to_ext, + .enq = _queue_enq, + .enq_multi = _queue_enq_multi, + .deq = _queue_deq, + .deq_multi = _queue_deq_multi, + .get_pktout = queue_get_pktout, + .set_pktout = queue_set_pktout, + .get_pktin = queue_get_pktin, + .set_pktin = queue_set_pktin, + .set_enq_fn = queue_set_enq_func, + .set_enq_multi_fn = queue_set_enq_multi_func, + .set_deq_fn = queue_set_deq_func, + .set_deq_multi_fn = 
queue_set_deq_multi_func, + .set_type = queue_set_type +}; diff --git a/platform/linux-generic/odp_schedule_if.c b/platform/linux-generic/odp_schedule_if.c index a9ede98d..2f07aafe 100644 --- a/platform/linux-generic/odp_schedule_if.c +++ b/platform/linux-generic/odp_schedule_if.c @@ -15,12 +15,18 @@ extern const schedule_api_t schedule_default_api; extern const schedule_fn_t schedule_iquery_fn; extern const schedule_api_t schedule_iquery_api;
+extern const schedule_fn_t schedule_scalable_fn; +extern const schedule_api_t schedule_scalable_api; + #ifdef ODP_SCHEDULE_SP const schedule_fn_t *sched_fn = &schedule_sp_fn; const schedule_api_t *sched_api = &schedule_sp_api; #elif defined(ODP_SCHEDULE_IQUERY) const schedule_fn_t *sched_fn = &schedule_iquery_fn; const schedule_api_t *sched_api = &schedule_iquery_api; +#elif defined(ODP_SCHEDULE_SCALABLE) +const schedule_fn_t *sched_fn = &schedule_scalable_fn; +const schedule_api_t *sched_api = &schedule_scalable_api; #else const schedule_fn_t *sched_fn = &schedule_default_fn; const schedule_api_t *sched_api = &schedule_default_api; diff --git a/platform/linux-generic/odp_schedule_scalable.c b/platform/linux-generic/odp_schedule_scalable.c new file mode 100644 index 00000000..78159b53 --- /dev/null +++ b/platform/linux-generic/odp_schedule_scalable.c @@ -0,0 +1,1980 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include <odp/api/align.h> +#include <odp/api/atomic.h> +#include <odp/api/cpu.h> +#include <odp/api/hints.h> +#include <odp/api/schedule.h> +#include <odp/api/shared_memory.h> +#include <odp/api/sync.h> +#include <odp/api/thread.h> +#include <odp/api/thrmask.h> +#include <odp/api/time.h> + +#include <odp_internal.h> +#include <odp_config_internal.h> +#include <odp_debug_internal.h> +#include <_ishm_internal.h> +#include <_ishmpool_internal.h> + +#include <odp_align_internal.h> +#include <odp_buffer_inlines.h> +#include <odp_llqueue.h> +#include <odp_queue_scalable_internal.h> +#include <odp_schedule_if.h> +#include <odp_bitset.h> +#include <odp_packet_io_internal.h> + +#include <limits.h> +#include <stdbool.h> +#include <string.h> + +#include <odp/api/plat/ticketlock_inlines.h> +#define LOCK(a) _odp_ticketlock_lock((a)) +#define UNLOCK(a) _odp_ticketlock_unlock((a)) + +#define TAG_EMPTY 0U +#define TAG_USED (1U << 15) +#define TAG_BUSY (1U << 31) +#define PKTIO_QUEUE_2_TAG(p, q) ((p) << 16 | (q) | TAG_USED) +#define TAG_2_PKTIO(t) (((t) >> 16) & 0x7FFF) +#define TAG_2_QUEUE(t) ((t) & 0x7FFF) +#define TAG_IS_READY(t) (((t) & (TAG_USED | TAG_BUSY)) == TAG_USED) +#define PKTIN_MAX (ODP_CONFIG_PKTIO_ENTRIES * PKTIO_MAX_QUEUES) +#define MAXTHREADS ATOM_BITSET_SIZE + +static _odp_ishm_pool_t *sched_shm_pool; +static uint32_t pktin_num; +static uint32_t pktin_hi; +static uint16_t pktin_count[ODP_CONFIG_PKTIO_ENTRIES]; +static uint32_t pktin_tags[PKTIN_MAX] ODP_ALIGNED_CACHE; + +#define __atomic_fetch_max(var, v, mo) do { \ + /* Evalulate 'v' once */ \ + __typeof__(v) tmp_v = (v); \ + __typeof__(*var) old_var = \ + __atomic_load_n((var), __ATOMIC_RELAXED); \ + while (tmp_v > old_var) { \ + /* Attempt to store 'v' in '*var' */ \ + if (__atomic_compare_exchange_n((var), &old_var, \ + tmp_v, true, (mo), \ + (mo))) \ + break; \ + } \ + /* v <= old_var, nothing to do */ \ + } while (0) + +ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (ODP_SCHED_PRIO_NUM - 1), + "lowest_prio_does_not_match_with_num_prios"); + +ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && + (ODP_SCHED_PRIO_NORMAL < (ODP_SCHED_PRIO_NUM - 1)), + "normal_prio_is_not_between_highest_and_lowest"); + +ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES), + "Number_of_queues_is_not_power_of_two"); + +/* + * Scheduler group related variables. 
+ */ +/* Currently used scheduler groups */ +static sched_group_mask_t sg_free; +static sched_group_t *sg_vec[MAX_SCHED_GROUP]; +/* Group lock for MT-safe APIs */ +odp_spinlock_t sched_grp_lock; + +#define SCHED_GROUP_JOIN 0 +#define SCHED_GROUP_LEAVE 1 + +/* + * Per thread state + */ +static sched_scalable_thread_state_t thread_state[MAXTHREADS]; +__thread sched_scalable_thread_state_t *sched_ts; + +/* + * Forward declarations. + */ +static int thread_state_init(int tidx) +{ + sched_scalable_thread_state_t *ts; + uint32_t i; + + ODP_ASSERT(tidx < MAXTHREADS); + ts = &thread_state[tidx]; + ts->atomq = NULL; + ts->rctx = NULL; + ts->pause = false; + ts->out_of_order = false; + ts->tidx = tidx; + ts->dequeued = 0; + ts->pktin_next = 0; + ts->pktin_poll_cnts = 0; + ts->ticket = TICKET_INVALID; + ts->priv_rvec_free = 0; + ts->rvec_free = (1ULL << TS_RVEC_SIZE) - 1; + ts->num_schedq = 0; + ts->sg_sem = 1; /* Start with sched group semaphore changed */ + memset(ts->sg_actual, 0, sizeof(ts->sg_actual)); + for (i = 0; i < TS_RVEC_SIZE; i++) { + ts->rvec[i].rvec_free = &ts->rvec_free; + ts->rvec[i].idx = i; + } + sched_ts = ts; + + return 0; +} + +static void insert_schedq_in_list(sched_scalable_thread_state_t *ts, + sched_queue_t *schedq) +{ + /* Find slot for schedq */ + for (uint32_t i = 0; i < ts->num_schedq; i++) { + /* Lower value is higher priority and closer to start of list */ + if (schedq->prio <= ts->schedq_list[i]->prio) { + /* This is the slot! */ + sched_queue_t *tmp; + + tmp = ts->schedq_list[i]; + ts->schedq_list[i] = schedq; + schedq = tmp; + /* Continue the insertion procedure with the + * new schedq. + */ + } + } + if (ts->num_schedq == SCHEDQ_PER_THREAD) + ODP_ABORT("Too many schedqs\n"); + ts->schedq_list[ts->num_schedq++] = schedq; +} + +static void remove_schedq_from_list(sched_scalable_thread_state_t *ts, + sched_queue_t *schedq) +{ + /* Find schedq */ + for (uint32_t i = 0; i < ts->num_schedq; i++) + if (ts->schedq_list[i] == schedq) { + /* Move remaining schedqs */ + for (uint32_t j = i + 1; j < ts->num_schedq; j++) + ts->schedq_list[j - 1] = ts->schedq_list[j]; + ts->num_schedq--; + return; + } + ODP_ABORT("Cannot find schedq\n"); +} + +/******************************************************************************* + * Scheduler queues + ******************************************************************************/ +#ifndef odp_container_of +#define odp_container_of(pointer, type, member) \ + ((type *)(void *)(((char *)pointer) - offsetof(type, member))) +#endif + +static inline void schedq_init(sched_queue_t *schedq, uint32_t prio) +{ + llqueue_init(&schedq->llq); + schedq->prio = prio; +} + +static inline sched_elem_t *schedq_peek(sched_queue_t *schedq) +{ + struct llnode *ptr; + + ptr = llq_head(&schedq->llq); + return odp_container_of(ptr, sched_elem_t, node); +} + +static inline odp_bool_t schedq_cond_pop(sched_queue_t *schedq, + sched_elem_t *elem) +{ + return llq_dequeue_cond(&schedq->llq, &elem->node); +} + +static inline void schedq_push(sched_queue_t *schedq, sched_elem_t *elem) +{ + llq_enqueue(&schedq->llq, &elem->node); +} + +static inline odp_bool_t schedq_cond_rotate(sched_queue_t *schedq, + sched_elem_t *elem) +{ + return llq_cond_rotate(&schedq->llq, &elem->node); +} + +static inline bool schedq_elem_on_queue(sched_elem_t *elem) +{ + return llq_on_queue(&elem->node); +} + +/******************************************************************************* + * Shared metadata btwn scheduler and queue + 
******************************************************************************/ + +void sched_update_enq(sched_elem_t *q, uint32_t actual) +{ + qschedstate_t oss, nss; + uint32_t ticket; + + oss = q->qschst; + /* Update event counter, optionally taking a ticket. */ + do { + ticket = TICKET_INVALID; + nss = oss; + nss.numevts += actual; + if (odp_unlikely(oss.numevts <= 0 && nss.numevts > 0)) + /* E -> NE transition */ + if (q->qschst_type != ODP_SCHED_SYNC_ATOMIC || + oss.cur_ticket == oss.nxt_ticket) + /* Parallel or ordered queues: always take + * ticket. + * Atomic queue: only take ticket if one is + * immediately available. + * Otherwise ticket already taken => queue + * processed by some thread. + */ + ticket = nss.nxt_ticket++; + /* Else queue already was non-empty. */ + /* Attempt to update numevts counter and optionally take ticket. */ + } while (!__atomic_compare_exchange( + &q->qschst, &oss, &nss, + true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + if (odp_unlikely(ticket != TICKET_INVALID)) { + /* Wait for our turn to update schedq. */ + if (odp_unlikely( + __atomic_load_n(&q->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ticket)) { + sevl(); + while (wfe() && + monitor8(&q->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ticket) + doze(); + } + /* Enqueue at end of scheduler queue */ + /* We are here because of empty-to-non-empty transition + * This means queue must be pushed to schedq if possible + * but we can't do that if it already is on the schedq + */ + if (odp_likely(!schedq_elem_on_queue(q) && + q->pop_deficit == 0)) { + /* Queue not already on schedq and no pop deficit means + * we can push queue to schedq */ + schedq_push(q->schedq, q); + } else { + /* Missed push => cancels one missed pop */ + q->pop_deficit--; + } + atomic_store_release(&q->qschst.cur_ticket, ticket + 1, + /*readonly=*/false); + } + /* Else queue was not empty or atomic queue already busy. */ +} + +void sched_update_enq_sp(sched_elem_t *q, uint32_t actual) +{ + qschedstate_t oss, nss; + uint32_t ticket; + + oss = q->qschst; + /* Update event counter, optionally taking a ticket. */ + ticket = TICKET_INVALID; + nss = oss; + nss.numevts += actual; + if (odp_unlikely(oss.numevts <= 0 && nss.numevts > 0)) { + /* E -> NE transition */ + if (q->qschst_type != ODP_SCHED_SYNC_ATOMIC || + oss.cur_ticket == oss.nxt_ticket) { + /* Parallel or ordered queues: always take + * ticket. + * Atomic queue: only take ticket if one is + * immediately available. Otherwise ticket already + * taken => queue owned/processed by some thread + */ + ticket = nss.nxt_ticket++; + } + } + /* Else queue already was non-empty. */ + /* Attempt to update numevts counter and optionally take ticket. */ + q->qschst = nss; + + if (odp_unlikely(ticket != TICKET_INVALID)) { + /* Enqueue at end of scheduler queue */ + /* We are here because of empty-to-non-empty transition + * This means queue must be pushed to schedq if possible + * but we can't do that if it already is on the schedq + */ + if (odp_likely(!schedq_elem_on_queue(q) && + q->pop_deficit == 0)) { + /* Queue not already on schedq and no pop deficit means + * we can push queue to schedq */ + schedq_push(q->schedq, q); + } else { + /* Missed push => cancels one missed pop */ + q->pop_deficit--; + } + q->qschst.cur_ticket = ticket + 1; + } + /* Else queue was not empty or atomic queue already busy. */ +} + +#ifndef CONFIG_QSCHST_LOCK +/* The scheduler is the only entity that performs the dequeue from a queue. 
*/ +static void +sched_update_deq(sched_elem_t *q, + uint32_t actual, + bool atomic) __attribute__((always_inline)); +static inline void +sched_update_deq(sched_elem_t *q, + uint32_t actual, bool atomic) +{ + qschedstate_t oss, nss; + uint32_t ticket; + + if (atomic) { + bool pushed = false; + + /* We own this atomic queue, only we can dequeue from it and + * thus decrease numevts. Other threads may enqueue and thus + * increase numevts. + * This means that numevts can't unexpectedly become 0 and + * invalidate a push operation already performed + */ + oss = q->qschst; + do { + ODP_ASSERT(oss.cur_ticket == sched_ts->ticket); + nss = oss; + nss.numevts -= actual; + if (nss.numevts > 0 && !pushed) { + schedq_push(q->schedq, q); + pushed = true; + } + /* Attempt to release ticket expecting our view of + * numevts to be correct + * Unfortunately nxt_ticket will also be included in + * the CAS operation + */ + nss.cur_ticket = sched_ts->ticket + 1; + } while (odp_unlikely(!__atomic_compare_exchange( + &q->qschst, + &oss, &nss, + true, + __ATOMIC_RELEASE, + __ATOMIC_RELAXED))); + return; + } + + oss = q->qschst; + do { + ticket = TICKET_INVALID; + nss = oss; + nss.numevts -= actual; + nss.wrr_budget -= actual; + if ((oss.numevts > 0 && nss.numevts <= 0) || + oss.wrr_budget <= actual) { + /* If we have emptied parallel/ordered queue or + * exchausted its WRR budget, we need a ticket + * for a later pop. + */ + ticket = nss.nxt_ticket++; + /* Reset wrr_budget as we might also push the + * queue to the schedq. + */ + nss.wrr_budget = CONFIG_WRR_WEIGHT; + } + /* Attempt to update numevts and optionally take ticket. */ + } while (!__atomic_compare_exchange( + &q->qschst, &oss, &nss, + true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + if (odp_unlikely(ticket != TICKET_INVALID)) { + ODP_ASSERT(q->qschst_type != ODP_SCHED_SYNC_ATOMIC); + /* Wait for our turn to update schedq. */ + if (odp_unlikely( + __atomic_load_n(&q->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ticket)) { + sevl(); + while (wfe() && + monitor8(&q->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ticket) + doze(); + } + /* We are here because of non-empty-to-empty transition or + * WRR budget exhausted + * This means the queue must be popped from the schedq, now or + * later + * If there was no NE->E transition but instead the WRR budget + * was exhausted, the queue needs to be moved (popped and + * pushed) to the tail of the schedq + */ + if (oss.numevts > 0 && nss.numevts <= 0) { + /* NE->E transition, need to pop */ + if (!schedq_elem_on_queue(q) || + !schedq_cond_pop(q->schedq, q)) { + /* Queue not at head, failed to dequeue + * Missed a pop. 
+ */ + q->pop_deficit++; + } + } else { + /* WRR budget exhausted + * Need to move queue to tail of schedq if possible + */ + if (odp_likely(schedq_elem_on_queue(q))) { + /* Queue is on schedq, try to move it to + * the tail + */ + (void)schedq_cond_rotate(q->schedq, q); + } + /* Else queue not on schedq or not at head of schedq + * No pop => no push + */ + } + atomic_store_release(&q->qschst.cur_ticket, ticket + 1, + /*readonly=*/false); + } +} +#endif + +#ifdef CONFIG_QSCHST_LOCK +static void +sched_update_deq_sc(sched_elem_t *q, + uint32_t actual, + bool atomic) __attribute__((always_inline)); +static inline void +sched_update_deq_sc(sched_elem_t *q, + uint32_t actual, bool atomic) +{ + qschedstate_t oss, nss; + uint32_t ticket; + + if (atomic) { + ODP_ASSERT(q->qschst.cur_ticket == sched_ts->ticket); + ODP_ASSERT(q->qschst.cur_ticket != q->qschst.nxt_ticket); + q->qschst.numevts -= actual; + q->qschst.cur_ticket = sched_ts->ticket + 1; + if (q->qschst.numevts > 0) + schedq_push(q->schedq, q); + return; + } + + oss = q->qschst; + ticket = TICKET_INVALID; + nss = oss; + nss.numevts -= actual; + nss.wrr_budget -= actual; + if ((oss.numevts > 0 && nss.numevts <= 0) || oss.wrr_budget <= actual) { + /* If we emptied the queue or + * if we have served the maximum number of events + * then we need a ticket for a later pop. + */ + ticket = nss.nxt_ticket++; + /* Also reset wrr_budget as we might also push the + * queue to the schedq. + */ + nss.wrr_budget = CONFIG_WRR_WEIGHT; + } + q->qschst = nss; + + if (ticket != TICKET_INVALID) { + if (oss.numevts > 0 && nss.numevts <= 0) { + /* NE->E transition, need to pop */ + if (!schedq_elem_on_queue(q) || + !schedq_cond_pop(q->schedq, q)) { + /* Queue not at head, failed to dequeue. + * Missed a pop. + */ + q->pop_deficit++; + } + } else { + /* WRR budget exhausted + * Need to move queue to tail of schedq if possible + */ + if (odp_likely(schedq_elem_on_queue(q))) { + /* Queue is on schedq, try to move it to + * the tail + */ + (void)schedq_cond_rotate(q->schedq, q); + } + /* Else queue not on schedq or not at head of schedq + * No pop => no push + */ + } + q->qschst.cur_ticket = ticket + 1; + } +} +#endif + +static inline void sched_update_popd_sc(sched_elem_t *elem) +{ + if (elem->pop_deficit != 0 && + schedq_elem_on_queue(elem) && + schedq_cond_pop(elem->schedq, elem)) + elem->pop_deficit--; +} + +#ifndef CONFIG_QSCHST_LOCK +static inline void sched_update_popd(sched_elem_t *elem) +{ + uint32_t ticket = __atomic_fetch_add(&elem->qschst.nxt_ticket, + 1, + __ATOMIC_RELAXED); + if (odp_unlikely(__atomic_load_n(&elem->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ticket)) { + sevl(); + while (wfe() && monitor8(&elem->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ticket) + doze(); + } + sched_update_popd_sc(elem); + atomic_store_release(&elem->qschst.cur_ticket, ticket + 1, + /*readonly=*/false); +} +#endif + +sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t prio) +{ + uint32_t sgi; + sched_group_t *sg; + uint32_t x; + + ODP_ASSERT(grp >= 0 && grp < (odp_schedule_group_t)MAX_SCHED_GROUP); + ODP_ASSERT((sg_free & (1ULL << grp)) == 0); + ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM); + + sgi = grp; + sg = sg_vec[sgi]; + + /* Use xcount to spread queues over the xfactor schedq's + * per priority. + */ + x = __atomic_fetch_add(&sg->xcount[prio], 1, __ATOMIC_RELAXED); + if (x == 0) { + /* First ODP queue for this priority + * Notify all threads in sg->thr_wanted that they + * should join. 
+ */ + sched_group_mask_t thrds = sg->thr_wanted; + + while (!bitset_is_null(thrds)) { + uint32_t thr; + + thr = bitset_ffs(thrds) - 1; + thrds = bitset_clr(thrds, thr); + /* Notify the thread about membership in this + * group/priority. + */ + atom_bitset_set(&thread_state[thr].sg_wanted[prio], + sgi, __ATOMIC_RELEASE); + __atomic_store_n(&thread_state[thr].sg_sem, 1, + __ATOMIC_RELEASE); + } + } + return &sg->schedq[prio * sg->xfactor + x % sg->xfactor]; +} + +void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio) +{ + uint32_t sgi; + sched_group_t *sg; + uint32_t x; + + ODP_ASSERT(grp >= 0 && grp < (odp_schedule_group_t)MAX_SCHED_GROUP); + ODP_ASSERT((sg_free & (1ULL << grp)) == 0); + ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM); + + sgi = grp; + sg = sg_vec[sgi]; + x = __atomic_sub_fetch(&sg->xcount[prio], 1, __ATOMIC_RELAXED); + + if (x == 0) { + /* Last ODP queue for this priority + * Notify all threads in sg->thr_wanted that they + * should leave. + */ + sched_group_mask_t thrds = sg->thr_wanted; + + while (!bitset_is_null(thrds)) { + uint32_t thr; + + thr = bitset_ffs(thrds) - 1; + thrds = bitset_clr(thrds, thr); + /* Notify the thread about membership in this + * group/priority. + */ + atom_bitset_clr(&thread_state[thr].sg_wanted[prio], + sgi, __ATOMIC_RELEASE); + __atomic_store_n(&thread_state[thr].sg_sem, 1, + __ATOMIC_RELEASE); + } + } +} + +static void update_sg_membership(sched_scalable_thread_state_t *ts) +{ + uint32_t p; + sched_group_mask_t sg_wanted; + sched_group_mask_t added; + sched_group_mask_t removed; + uint32_t sgi; + sched_group_t *sg; + uint32_t x; + + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) { + sg_wanted = atom_bitset_load(&ts->sg_wanted[p], + __ATOMIC_ACQUIRE); + if (!bitset_is_eql(ts->sg_actual[p], sg_wanted)) { + /* Our sched_group membership has changed */ + added = bitset_andn(sg_wanted, ts->sg_actual[p]); + while (!bitset_is_null(added)) { + sgi = bitset_ffs(added) - 1; + sg = sg_vec[sgi]; + for (x = 0; x < sg->xfactor; x++) { + /* Include our thread index to shift + * (rotate) the order of schedq's + */ + insert_schedq_in_list + (ts, + &sg->schedq[p * sg->xfactor + + (x + ts->tidx) % sg->xfactor]); + } + atom_bitset_set(&sg->thr_actual[p], ts->tidx, + __ATOMIC_RELAXED); + added = bitset_clr(added, sgi); + } + removed = bitset_andn(ts->sg_actual[p], sg_wanted); + while (!bitset_is_null(removed)) { + sgi = bitset_ffs(removed) - 1; + sg = sg_vec[sgi]; + for (x = 0; x < sg->xfactor; x++) { + remove_schedq_from_list + (ts, + &sg->schedq[p * + sg->xfactor + x]); + } + atom_bitset_clr(&sg->thr_actual[p], ts->tidx, + __ATOMIC_RELAXED); + removed = bitset_clr(removed, sgi); + } + ts->sg_actual[p] = sg_wanted; + } + } +} + +/******************************************************************************* + * Scheduler + ******************************************************************************/ + +static inline void _schedule_release_atomic(sched_scalable_thread_state_t *ts) +{ +#ifdef CONFIG_QSCHST_LOCK + sched_update_deq_sc(ts->atomq, ts->dequeued, true); + ODP_ASSERT(ts->atomq->qschst.cur_ticket != ts->ticket); + ODP_ASSERT(ts->atomq->qschst.cur_ticket == + ts->atomq->qschst.nxt_ticket); +#else + sched_update_deq(ts->atomq, ts->dequeued, true); +#endif + ts->atomq = NULL; + ts->ticket = TICKET_INVALID; +} + +static inline void _schedule_release_ordered(sched_scalable_thread_state_t *ts) +{ + ts->out_of_order = false; + rctx_release(ts->rctx); + ts->rctx = NULL; +} + +static void pktin_poll(sched_scalable_thread_state_t *ts) +{ + uint32_t i, tag, hi, 
npolls = 0; + int pktio_index, queue_index; + + hi = __atomic_load_n(&pktin_hi, __ATOMIC_RELAXED); + if (hi == 0) + return; + + for (i = ts->pktin_next; npolls != hi; i = (i + 1) % hi, npolls++) { + tag = __atomic_load_n(&pktin_tags[i], __ATOMIC_RELAXED); + if (!TAG_IS_READY(tag)) + continue; + if (!__atomic_compare_exchange_n(&pktin_tags[i], &tag, + tag | TAG_BUSY, + true, + __ATOMIC_ACQUIRE, + __ATOMIC_RELAXED)) + continue; + /* Tag grabbed */ + pktio_index = TAG_2_PKTIO(tag); + queue_index = TAG_2_QUEUE(tag); + if (odp_unlikely(sched_cb_pktin_poll(pktio_index, + 1, &queue_index))) { + /* Pktio stopped or closed + * Remove tag from pktin_tags + */ + __atomic_store_n(&pktin_tags[i], + TAG_EMPTY, __ATOMIC_RELAXED); + __atomic_fetch_sub(&pktin_num, + 1, __ATOMIC_RELEASE); + /* Call stop_finalize when all queues + * of the pktio have been removed + */ + if (__atomic_sub_fetch(&pktin_count[pktio_index], 1, + __ATOMIC_RELAXED) == 0) + sched_cb_pktio_stop_finalize(pktio_index); + } else { + /* We don't know whether any packets were found and enqueued + * Write back original tag value to release pktin queue + */ + __atomic_store_n(&pktin_tags[i], tag, __ATOMIC_RELAXED); + /* Do not iterate through all pktin queues every time */ + if ((ts->pktin_poll_cnts & 0xf) != 0) + break; + } + } + ODP_ASSERT(i < hi); + ts->pktin_poll_cnts++; + ts->pktin_next = i; +} + +static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts) +{ + sched_scalable_thread_state_t *ts; + sched_elem_t *atomq; + int num; + uint32_t i; + + ts = sched_ts; + atomq = ts->atomq; + + /* Once an atomic queue has been scheduled to a thread, it will stay + * on that thread until empty or 'rotated' by WRR + */ + if (atomq != NULL) { + ODP_ASSERT(ts->ticket != TICKET_INVALID); +#ifdef CONFIG_QSCHST_LOCK + LOCK(&atomq->qschlock); +#endif +dequeue_atomic: + ODP_ASSERT(ts->ticket == atomq->qschst.cur_ticket); + ODP_ASSERT(ts->ticket != atomq->qschst.nxt_ticket); + /* Atomic queues can be dequeued without lock since this thread + * has the only reference to the atomic queue being processed. + */ + if (ts->dequeued < atomq->qschst.wrr_budget) { + num = _odp_queue_deq_sc(atomq, ev, num_evts); + if (odp_likely(num != 0)) { +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&atomq->qschlock); +#endif + ts->dequeued += num; + /* Allow this thread to continue to 'own' this + * atomic queue until all events have been + * processed and the thread re-invokes the + * scheduler. + */ + if (from) + *from = queue_get_handle( + (queue_entry_t *)atomq); + return num; + } + } + /* Atomic queue was empty or interrupted by WRR, release it. */ + _schedule_release_atomic(ts); +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&atomq->qschlock); +#endif + } + + /* Release any previous reorder context. */ + if (ts->rctx != NULL) + _schedule_release_ordered(ts); + + /* Check for and perform any scheduler group updates. */ + if (odp_unlikely(__atomic_load_n(&ts->sg_sem, __ATOMIC_RELAXED) != 0)) { + (void)__atomic_load_n(&ts->sg_sem, __ATOMIC_ACQUIRE); + ts->sg_sem = 0; + update_sg_membership(ts); + } + + /* Scan our schedq list from beginning to end */ + for (i = 0; i < ts->num_schedq; i++) { + sched_queue_t *schedq = ts->schedq_list[i]; + sched_elem_t *elem; +restart_same: + elem = schedq_peek(schedq); + if (odp_unlikely(elem == NULL)) { + /* Schedq empty, look at next one. */ + continue; + } + + if (elem->cons_type == ODP_SCHED_SYNC_ATOMIC) { + /* Dequeue element only if it is still at head + * of schedq. 
+ */ + if (odp_unlikely(!schedq_cond_pop(schedq, elem))) { + /* Queue not at head of schedq anymore, some + * other thread popped it. + */ + goto restart_same; + } + ts->atomq = elem; + atomq = elem; + ts->dequeued = 0; +#ifdef CONFIG_QSCHST_LOCK + LOCK(&atomq->qschlock); + ts->ticket = atomq->qschst.nxt_ticket++; + ODP_ASSERT(atomq->qschst.cur_ticket == ts->ticket); +#else + /* Dequeued atomic queue from the schedq, only we + * can process it and any qschst updates are our + * responsibility. + */ + /* The ticket taken below will signal producers */ + ts->ticket = __atomic_fetch_add( + &atomq->qschst.nxt_ticket, 1, __ATOMIC_RELAXED); + while (__atomic_load_n( + &atomq->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ts->ticket) { + /* No need to use WFE, spinning here seems + * very infrequent. + */ + odp_cpu_pause(); + } +#endif + goto dequeue_atomic; + } else if (elem->cons_type == ODP_SCHED_SYNC_PARALLEL) { +#ifdef CONFIG_QSCHST_LOCK + LOCK(&elem->qschlock); + num = _odp_queue_deq_sc(elem, ev, num_evts); + if (odp_likely(num != 0)) { + sched_update_deq_sc(elem, num, false); + UNLOCK(&elem->qschlock); + if (from) + *from = + queue_get_handle((queue_entry_t *)elem); + return num; + } + UNLOCK(&elem->qschlock); +#else + num = _odp_queue_deq_mc(elem, ev, num_evts); + if (odp_likely(num != 0)) { + sched_update_deq(elem, num, false); + if (from) + *from = + queue_get_handle((queue_entry_t *)elem); + return num; + } +#endif + } else if (elem->cons_type == ODP_SCHED_SYNC_ORDERED) { + reorder_window_t *rwin; + reorder_context_t *rctx; + uint32_t sn; + uint32_t idx; + + /* The ordered queue has a reorder window so requires + * order restoration. We must use a reorder context to + * collect all outgoing events. Ensure there is at least + * one available reorder context. + */ + if (odp_unlikely(bitset_is_null(ts->priv_rvec_free))) { + ts->priv_rvec_free = atom_bitset_xchg( + &ts->rvec_free, 0, + __ATOMIC_RELAXED); + if (odp_unlikely(bitset_is_null( + ts->priv_rvec_free))) { + /* No free reorder contexts for + * this thread. Look at next schedq, + * hope we find non-ordered queue. + */ + continue; + } + } + /* rwin_reserve and odp_queue_deq must be atomic or + * there will be a potential race condition. + * Allocate a slot in the reorder window. + */ + rwin = queue_get_rwin((queue_entry_t *)elem); + ODP_ASSERT(rwin != NULL); + if (odp_unlikely(!rwin_reserve(rwin, &sn))) { + /* Reorder window full */ + /* Look at next schedq, find other queue */ + continue; + } + /* Wait for our turn to dequeue */ + if (odp_unlikely(__atomic_load_n(&rwin->turn, + __ATOMIC_ACQUIRE) + != sn)) { + sevl(); + while (wfe() && + monitor32(&rwin->turn, __ATOMIC_ACQUIRE) + != sn) + doze(); + } +#ifdef CONFIG_QSCHST_LOCK + LOCK(&elem->qschlock); +#endif + num = _odp_queue_deq_sc(elem, ev, num_evts); + /* Wait for prod_read write in _odp_queue_dequeue_sc() + * to complete before we signal the next consumer + */ + atomic_store_release(&rwin->turn, sn + 1, + /*readonly=*/false); + /* Find and initialise an unused reorder context. */ + idx = bitset_ffs(ts->priv_rvec_free) - 1; + ts->priv_rvec_free = + bitset_clr(ts->priv_rvec_free, idx); + rctx = &ts->rvec[idx]; + /* Need to initialise reorder context or we can't + * release it later. + */ + rctx_init(rctx, idx, rwin, sn); + + /* Was dequeue successful? 
*/ + if (odp_likely(num != 0)) { + /* Perform scheduler related updates */ +#ifdef CONFIG_QSCHST_LOCK + sched_update_deq_sc(elem, num, + /*atomic=*/false); + UNLOCK(&elem->qschlock); +#else + sched_update_deq(elem, num, /*atomic=*/false); +#endif + + /* Are we in-order or out-of-order? */ + ts->out_of_order = sn != rwin->hc.head; + + ts->rctx = rctx; + if (from) + *from = queue_get_handle( + (queue_entry_t *)elem); + return num; + } +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&elem->qschlock); +#endif + /* Since a slot was reserved in the reorder window, + * the reorder context needs to be released and + * inserted into the reorder window. + */ + rctx_release(rctx); + ODP_ASSERT(ts->rctx == NULL); + } + /* Dequeue from parallel/ordered queue failed + * Check if we have a queue at the head of the schedq that needs + * to be popped + */ + if (odp_unlikely(__atomic_load_n(&elem->pop_deficit, + __ATOMIC_RELAXED) != 0)) { +#ifdef CONFIG_QSCHST_LOCK + LOCK(&elem->qschlock); + sched_update_popd_sc(elem); + UNLOCK(&elem->qschlock); +#else + sched_update_popd(elem); +#endif + } + } + + pktin_poll(ts); + return 0; +} + +/******************************************************************************/ + +static void schedule_order_lock(unsigned lock_index) +{ + struct reorder_context *rctx = sched_ts->rctx; + + if (odp_unlikely(rctx == NULL || + rctx->rwin == NULL || + lock_index >= rctx->rwin->lock_count)) { + ODP_ERR("Invalid call to odp_schedule_order_lock\n"); + return; + } + if (odp_unlikely(__atomic_load_n(&rctx->rwin->olock[lock_index], + __ATOMIC_ACQUIRE) != rctx->sn)) { + sevl(); + while (wfe() && + monitor32(&rctx->rwin->olock[lock_index], + __ATOMIC_ACQUIRE) != rctx->sn) + doze(); + } +} + +static void schedule_order_unlock(unsigned lock_index) +{ + struct reorder_context *rctx; + + rctx = sched_ts->rctx; + if (odp_unlikely(rctx == NULL || + rctx->rwin == NULL || + lock_index >= rctx->rwin->lock_count || + rctx->rwin->olock[lock_index] != rctx->sn)) { + ODP_ERR("Invalid call to odp_schedule_order_unlock\n"); + return; + } + atomic_store_release(&rctx->rwin->olock[lock_index], + rctx->sn + 1, + /*readonly=*/false); + rctx->olock_flags |= 1U << lock_index; +} + +static void schedule_release_atomic(void) +{ + sched_scalable_thread_state_t *ts; + + ts = sched_ts; + if (odp_likely(ts->atomq != NULL)) { +#ifdef CONFIG_QSCHST_LOCK + sched_elem_t *atomq; + + atomq = ts->atomq; + LOCK(&atomq->qschlock); +#endif + _schedule_release_atomic(ts); +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&atomq->qschlock); +#endif + } +} + +static void schedule_release_ordered(void) +{ + sched_scalable_thread_state_t *ts; + + ts = sched_ts; + if (ts->rctx != NULL) + _schedule_release_ordered(ts); +} + +static int schedule_multi(odp_queue_t *from, uint64_t wait, odp_event_t ev[], + int num) +{ + sched_scalable_thread_state_t *ts; + int n; + odp_time_t start; + odp_time_t delta; + odp_time_t deadline; + + ts = sched_ts; + if (odp_unlikely(ts->pause)) { + if (ts->atomq != NULL) { +#ifdef CONFIG_QSCHST_LOCK + sched_elem_t *atomq; + + atomq = ts->atomq; + LOCK(&atomq->qschlock); +#endif + _schedule_release_atomic(ts); +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&atomq->qschlock); +#endif + } else if (ts->rctx != NULL) { + _schedule_release_ordered(ts); + } + return 0; + } + + if (wait == ODP_SCHED_NO_WAIT) + return _schedule(from, ev, num); + + if (wait == ODP_SCHED_WAIT) { + for (;;) { + n = _schedule(from, ev, num); + if (odp_likely(n > 0)) + return n; + } + } + + start = odp_time_local(); + + n = _schedule(from, ev, num); + if (odp_likely(n > 
0)) + return n; + + delta = odp_time_local_from_ns(wait); + deadline = odp_time_sum(start, delta); + + while (odp_time_cmp(deadline, odp_time_local()) > 0) { + n = _schedule(from, ev, num); + if (odp_likely(n > 0)) + return n; + } + + return 0; +} + +static odp_event_t schedule(odp_queue_t *from, uint64_t wait) +{ + odp_event_t ev = ODP_EVENT_INVALID; + const int num = 1; + sched_scalable_thread_state_t *ts; + int n; + odp_time_t start; + odp_time_t delta; + odp_time_t deadline; + + ts = sched_ts; + if (odp_unlikely(ts->pause)) { + if (ts->atomq != NULL) { +#ifdef CONFIG_QSCHST_LOCK + sched_elem_t *atomq; + + atomq = ts->atomq; + LOCK(&atomq->qschlock); +#endif + _schedule_release_atomic(ts); +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&atomq->qschlock); +#endif + } else if (ts->rctx != NULL) { + _schedule_release_ordered(ts); + } + return ev; + } + + if (wait == ODP_SCHED_NO_WAIT) { + (void)_schedule(from, &ev, num); + return ev; + } + + if (wait == ODP_SCHED_WAIT) { + for (;;) { + n = _schedule(from, &ev, num); + if (odp_likely(n > 0)) + return ev; + } + } + + start = odp_time_local(); + + n = _schedule(from, &ev, num); + if (odp_likely(n > 0)) + return ev; + + delta = odp_time_local_from_ns(wait); + deadline = odp_time_sum(start, delta); + + while (odp_time_cmp(deadline, odp_time_local()) > 0) { + n = _schedule(from, &ev, num); + if (odp_likely(n > 0)) + return ev; + } + + return ev; +} + +static void schedule_pause(void) +{ + sched_ts->pause = true; +} + +static void schedule_resume(void) +{ + sched_ts->pause = false; +} + +static uint64_t schedule_wait_time(uint64_t ns) +{ + return ns; +} + +static int schedule_num_prio(void) +{ + return ODP_SCHED_PRIO_NUM; +} + +static int schedule_group_update(sched_group_t *sg, + uint32_t sgi, + const odp_thrmask_t *mask, + int join_leave) +{ + int thr; + uint32_t p; + + /* Internal function, do not validate inputs */ + + /* Notify relevant threads about the change */ + thr = odp_thrmask_first(mask); + while (0 <= thr) { + /* Add thread to scheduler group's wanted thread mask */ + if (join_leave == SCHED_GROUP_JOIN) + atom_bitset_set(&sg->thr_wanted, thr, __ATOMIC_RELAXED); + else + atom_bitset_clr(&sg->thr_wanted, thr, __ATOMIC_RELAXED); + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) { + if (sg->xcount[p] != 0) { + /* This priority level has ODP queues + * Notify the thread about membership in + * this group/priority + */ + if (join_leave == SCHED_GROUP_JOIN) + atom_bitset_set( + &thread_state[thr].sg_wanted[p], + sgi, + __ATOMIC_RELEASE); + else + atom_bitset_clr( + &thread_state[thr].sg_wanted[p], + sgi, + __ATOMIC_RELEASE); + __atomic_store_n(&thread_state[thr].sg_sem, + 1, + __ATOMIC_RELEASE); + } + } + thr = odp_thrmask_next(mask, thr); + } + + return 0; +} + +static int _schedule_group_thrmask(sched_group_t *sg, odp_thrmask_t *mask) +{ + bitset_t bs; + uint32_t bit; + + /* Internal function, do not validate inputs */ + + odp_thrmask_zero(mask); + bs = sg->thr_wanted; + while (!bitset_is_null(bs)) { + bit = bitset_ffs(bs) - 1; + bs = bitset_clr(bs, bit); + odp_thrmask_set(mask, bit); + } + + return 0; +} + +static odp_schedule_group_t schedule_group_create(const char *name, + const odp_thrmask_t *mask) +{ + uint32_t sgi; + sched_group_mask_t free; + uint32_t xfactor; + sched_group_t *sg; + uint32_t p; + uint32_t x; + uint32_t size; + + /* Validate inputs */ + if (mask == NULL) + ODP_ABORT("mask is NULL\n"); + + odp_spinlock_lock(&sched_grp_lock); + + /* Allocate a scheduler group */ + free = atom_bitset_load(&sg_free, __ATOMIC_RELAXED); + do { + /* All 
sched_groups in use */ + if (bitset_is_null(free)) + goto no_free_sched_group; + + sgi = bitset_ffs(free) - 1; + /* All sched_groups in use */ + if (sgi >= MAX_SCHED_GROUP) + goto no_free_sched_group; + } while (!atom_bitset_cmpxchg(&sg_free, + &free, + bitset_clr(free, sgi), + true, + __ATOMIC_ACQUIRE, + __ATOMIC_ACQUIRE)); + + /* Compute xfactor (spread factor) from the number of threads + * present in the thread mask. Preferable this would be an + * explicit parameter. + */ + xfactor = odp_thrmask_count(mask); + if (xfactor < 1) + xfactor = CONFIG_DEFAULT_XFACTOR; + + size = sizeof(sched_group_t) + + (ODP_SCHED_PRIO_NUM * xfactor - 1) * sizeof(sched_queue_t); + sg = (sched_group_t *)shm_pool_alloc_align(sched_shm_pool, size); + if (sg == NULL) + goto shm_pool_alloc_failed; + + strncpy(sg->name, name ? name : "", ODP_SCHED_GROUP_NAME_LEN - 1); + sg_vec[sgi] = sg; + memset(sg->thr_actual, 0, sizeof(sg->thr_actual)); + sg->thr_wanted = bitset_null(); + sg->xfactor = xfactor; + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) { + sg->xcount[p] = 0; + for (x = 0; x < xfactor; x++) + schedq_init(&sg->schedq[p * xfactor + x], p); + } + if (odp_thrmask_count(mask) != 0) + schedule_group_update(sg, sgi, mask, SCHED_GROUP_JOIN); + + odp_spinlock_unlock(&sched_grp_lock); + + return (odp_schedule_group_t)(sgi); + +shm_pool_alloc_failed: + /* Free the allocated group index */ + atom_bitset_set(&sg_free, sgi, __ATOMIC_RELAXED); + +no_free_sched_group: + odp_spinlock_unlock(&sched_grp_lock); + + return ODP_SCHED_GROUP_INVALID; +} + +static int schedule_group_destroy(odp_schedule_group_t group) +{ + uint32_t sgi; + sched_group_t *sg; + uint32_t p; + int ret = 0; + + /* Validate inputs */ + if (group < 0 || group >= (odp_schedule_group_t)MAX_SCHED_GROUP) { + ret = -1; + goto invalid_group; + } + + if (sched_ts && + odp_unlikely(__atomic_load_n(&sched_ts->sg_sem, + __ATOMIC_RELAXED) != 0)) { + (void)__atomic_load_n(&sched_ts->sg_sem, + __ATOMIC_ACQUIRE); + sched_ts->sg_sem = 0; + update_sg_membership(sched_ts); + } + odp_spinlock_lock(&sched_grp_lock); + + sgi = (uint32_t)group; + if (bitset_is_set(sg_free, sgi)) { + ret = -1; + goto group_not_found; + } + + sg = sg_vec[sgi]; + /* First ensure all threads have processed group_join/group_leave + * requests. 
+ */ + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) { + if (sg->xcount[p] != 0) { + bitset_t wanted = atom_bitset_load( + &sg->thr_wanted, __ATOMIC_RELAXED); + + sevl(); + while (wfe() && + !bitset_is_eql(wanted, + bitset_monitor(&sg->thr_actual[p], + __ATOMIC_RELAXED))) + doze(); + } + /* Else ignore because no ODP queues on this prio */ + } + + /* Check if all threads/queues have left the group */ + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) { + if (!bitset_is_null(sg->thr_actual[p])) { + ODP_ERR("Group has threads\n"); + ret = -1; + goto thrd_q_present_in_group; + } + if (sg->xcount[p] != 0) { + ODP_ERR("Group has queues\n"); + ret = -1; + goto thrd_q_present_in_group; + } + } + + _odp_ishm_pool_free(sched_shm_pool, sg); + sg_vec[sgi] = NULL; + atom_bitset_set(&sg_free, sgi, __ATOMIC_RELEASE); + + odp_spinlock_unlock(&sched_grp_lock); + + return ret; + +thrd_q_present_in_group: + +group_not_found: + odp_spinlock_unlock(&sched_grp_lock); + +invalid_group: + + return ret; +} + +static odp_schedule_group_t schedule_group_lookup(const char *name) +{ + uint32_t sgi; + odp_schedule_group_t group; + + /* Validate inputs */ + if (name == NULL) + ODP_ABORT("name or mask is NULL\n"); + + group = ODP_SCHED_GROUP_INVALID; + + odp_spinlock_lock(&sched_grp_lock); + + /* Scan through the schedule group array */ + for (sgi = 0; sgi < MAX_SCHED_GROUP; sgi++) { + if ((sg_vec[sgi] != NULL) && + (strncmp(name, sg_vec[sgi]->name, + ODP_SCHED_GROUP_NAME_LEN) == 0)) { + group = (odp_schedule_group_t)sgi; + break; + } + } + + odp_spinlock_unlock(&sched_grp_lock); + + return group; +} + +static int schedule_group_join(odp_schedule_group_t group, + const odp_thrmask_t *mask) +{ + uint32_t sgi; + sched_group_t *sg; + int ret; + + /* Validate inputs */ + if (group < 0 || group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) + return -1; + + if (mask == NULL) + ODP_ABORT("name or mask is NULL\n"); + + odp_spinlock_lock(&sched_grp_lock); + + sgi = (uint32_t)group; + if (bitset_is_set(sg_free, sgi)) { + odp_spinlock_unlock(&sched_grp_lock); + return -1; + } + + sg = sg_vec[sgi]; + ret = schedule_group_update(sg, sgi, mask, SCHED_GROUP_JOIN); + + odp_spinlock_unlock(&sched_grp_lock); + + return ret; +} + +static int schedule_group_leave(odp_schedule_group_t group, + const odp_thrmask_t *mask) +{ + uint32_t sgi; + sched_group_t *sg; + int ret = 0; + + /* Validate inputs */ + if (group < 0 || group >= (odp_schedule_group_t)MAX_SCHED_GROUP) { + ret = -1; + goto invalid_group; + } + + if (mask == NULL) + ODP_ABORT("name or mask is NULL\n"); + + odp_spinlock_lock(&sched_grp_lock); + + sgi = (uint32_t)group; + if (bitset_is_set(sg_free, sgi)) { + ret = -1; + goto group_not_found; + } + + sg = sg_vec[sgi]; + + ret = schedule_group_update(sg, sgi, mask, SCHED_GROUP_LEAVE); + + odp_spinlock_unlock(&sched_grp_lock); + + return ret; + +group_not_found: + odp_spinlock_unlock(&sched_grp_lock); + +invalid_group: + return ret; +} + +static int schedule_group_thrmask(odp_schedule_group_t group, + odp_thrmask_t *mask) +{ + uint32_t sgi; + sched_group_t *sg; + int ret = 0; + + /* Validate inputs */ + if (group < 0 || group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) { + ret = -1; + goto invalid_group; + } + + if (mask == NULL) + ODP_ABORT("name or mask is NULL\n"); + + odp_spinlock_lock(&sched_grp_lock); + + sgi = (uint32_t)group; + if (bitset_is_set(sg_free, sgi)) { + ret = -1; + goto group_not_found; + } + + sg = sg_vec[sgi]; + ret = _schedule_group_thrmask(sg, mask); + + odp_spinlock_unlock(&sched_grp_lock); + + return ret; + 
+group_not_found: + odp_spinlock_unlock(&sched_grp_lock); + +invalid_group: + return ret; +} + +static int schedule_group_info(odp_schedule_group_t group, + odp_schedule_group_info_t *info) +{ + uint32_t sgi; + sched_group_t *sg; + int ret = 0; + + /* Validate inputs */ + if (group < 0 || group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) { + ret = -1; + goto invalid_group; + } + + if (info == NULL) + ODP_ABORT("name or mask is NULL\n"); + + odp_spinlock_lock(&sched_grp_lock); + + sgi = (uint32_t)group; + if (bitset_is_set(sg_free, sgi)) { + ret = -1; + goto group_not_found; + } + + sg = sg_vec[sgi]; + + ret = _schedule_group_thrmask(sg, &info->thrmask); + + info->name = sg->name; + + odp_spinlock_unlock(&sched_grp_lock); + + return ret; + +group_not_found: + odp_spinlock_unlock(&sched_grp_lock); + +invalid_group: + return ret; +} + +static int schedule_init_global(void) +{ + odp_thrmask_t mask; + odp_schedule_group_t tmp_all; + odp_schedule_group_t tmp_wrkr; + odp_schedule_group_t tmp_ctrl; + uint32_t bits; + uint32_t pool_size; + uint64_t min_alloc; + uint64_t max_alloc; + + /* Attach to the pool if it exists */ + sched_shm_pool = _odp_ishm_pool_lookup("sched_shm_pool"); + if (sched_shm_pool == NULL) { + /* Add storage required for sched groups. Assume worst case + * xfactor of MAXTHREADS. + */ + pool_size = (sizeof(sched_group_t) + + (ODP_SCHED_PRIO_NUM * MAXTHREADS - 1) * + sizeof(sched_queue_t)) * MAX_SCHED_GROUP; + /* Choose min_alloc and max_alloc such that slab allocator + * is selected. + */ + min_alloc = sizeof(sched_group_t) + + (ODP_SCHED_PRIO_NUM * MAXTHREADS - 1) * + sizeof(sched_queue_t); + max_alloc = min_alloc; + sched_shm_pool = _odp_ishm_pool_create("sched_shm_pool", + pool_size, + min_alloc, max_alloc, + _ODP_ISHM_SINGLE_VA); + if (sched_shm_pool == NULL) { + ODP_ERR("Failed to allocate shared memory pool " + "for sched\n"); + goto failed_sched_shm_pool_create; + } + } + + odp_spinlock_init(&sched_grp_lock); + + bits = MAX_SCHED_GROUP; + if (MAX_SCHED_GROUP == sizeof(sg_free) * CHAR_BIT) + sg_free = ~0; + else + sg_free = (1 << bits) - 1; + + for (uint32_t i = 0; i < MAX_SCHED_GROUP; i++) + sg_vec[i] = NULL; + for (uint32_t i = 0; i < MAXTHREADS; i++) { + thread_state[i].sg_sem = 0; + for (uint32_t j = 0; j < ODP_SCHED_PRIO_NUM; j++) + thread_state[i].sg_wanted[j] = bitset_null(); + } + + /* Create sched groups for default GROUP_ALL, GROUP_WORKER and + * GROUP_CONTROL groups. 
+ */ + odp_thrmask_zero(&mask); + tmp_all = odp_schedule_group_create("__group_all", &mask); + if (tmp_all != ODP_SCHED_GROUP_ALL) { + ODP_ERR("Could not create ODP_SCHED_GROUP_ALL()\n"); + goto failed_create_group_all; + } + + tmp_wrkr = odp_schedule_group_create("__group_worker", &mask); + if (tmp_wrkr != ODP_SCHED_GROUP_WORKER) { + ODP_ERR("Could not create ODP_SCHED_GROUP_WORKER()\n"); + goto failed_create_group_worker; + } + + tmp_ctrl = odp_schedule_group_create("__group_control", &mask); + if (tmp_ctrl != ODP_SCHED_GROUP_CONTROL) { + ODP_ERR("Could not create ODP_SCHED_GROUP_CONTROL()\n"); + goto failed_create_group_control; + } + + return 0; + +failed_create_group_control: + if (tmp_ctrl != ODP_SCHED_GROUP_INVALID) + odp_schedule_group_destroy(ODP_SCHED_GROUP_CONTROL); + +failed_create_group_worker: + if (tmp_wrkr != ODP_SCHED_GROUP_INVALID) + odp_schedule_group_destroy(ODP_SCHED_GROUP_WORKER); + +failed_create_group_all: + if (tmp_all != ODP_SCHED_GROUP_INVALID) + odp_schedule_group_destroy(ODP_SCHED_GROUP_ALL); + +failed_sched_shm_pool_create: + + return -1; +} + +static int schedule_term_global(void) +{ + /* Destroy sched groups for default GROUP_ALL, GROUP_WORKER and + * GROUP_CONTROL groups. + */ + if (odp_schedule_group_destroy(ODP_SCHED_GROUP_ALL) != 0) + ODP_ERR("Failed to destroy ODP_SCHED_GROUP_ALL\n"); + if (odp_schedule_group_destroy(ODP_SCHED_GROUP_WORKER) != 0) + ODP_ERR("Failed to destroy ODP_SCHED_GROUP_WORKER\n"); + if (odp_schedule_group_destroy(ODP_SCHED_GROUP_CONTROL) != 0) + ODP_ERR("Failed to destroy ODP_SCHED_GROUP_CONTROL\n"); + + _odp_ishm_pool_destroy(sched_shm_pool); + + return 0; +} + +static int schedule_init_local(void) +{ + int thr_id; + odp_thread_type_t thr_type; + odp_thrmask_t mask; + + thr_id = odp_thread_id(); + if (thread_state_init(thr_id)) + goto failed_to_init_ts; + + /* Add this thread to default schedule groups */ + thr_type = odp_thread_type(); + odp_thrmask_zero(&mask); + odp_thrmask_set(&mask, thr_id); + + if (odp_schedule_group_join(ODP_SCHED_GROUP_ALL, &mask) != 0) { + ODP_ERR("Failed to join ODP_SCHED_GROUP_ALL\n"); + goto failed_to_join_grp_all; + } + if (thr_type == ODP_THREAD_CONTROL) { + if (odp_schedule_group_join(ODP_SCHED_GROUP_CONTROL, + &mask) != 0) { + ODP_ERR("Failed to join ODP_SCHED_GROUP_CONTROL\n"); + goto failed_to_join_grp_ctrl; + } + } else { + if (odp_schedule_group_join(ODP_SCHED_GROUP_WORKER, + &mask) != 0) { + ODP_ERR("Failed to join ODP_SCHED_GROUP_WORKER\n"); + goto failed_to_join_grp_wrkr; + } + } + + return 0; + +failed_to_join_grp_wrkr: + +failed_to_join_grp_ctrl: + odp_schedule_group_leave(ODP_SCHED_GROUP_ALL, &mask); + +failed_to_join_grp_all: +failed_to_init_ts: + + return -1; +} + +static int schedule_term_local(void) +{ + int thr_id; + odp_thread_type_t thr_type; + odp_thrmask_t mask; + int rc = 0; + + /* Remove this thread from default schedule groups */ + thr_id = odp_thread_id(); + thr_type = odp_thread_type(); + odp_thrmask_zero(&mask); + odp_thrmask_set(&mask, thr_id); + + if (odp_schedule_group_leave(ODP_SCHED_GROUP_ALL, &mask) != 0) + ODP_ERR("Failed to leave ODP_SCHED_GROUP_ALL\n"); + if (thr_type == ODP_THREAD_CONTROL) { + if (odp_schedule_group_leave(ODP_SCHED_GROUP_CONTROL, + &mask) != 0) + ODP_ERR("Failed to leave ODP_SCHED_GROUP_CONTROL\n"); + } else { + if (odp_schedule_group_leave(ODP_SCHED_GROUP_WORKER, + &mask) != 0) + ODP_ERR("Failed to leave ODP_SCHED_GROUP_WORKER\n"); + } + + update_sg_membership(sched_ts); + + /* Check if the thread is still part of any groups */ + if 
(sched_ts->num_schedq != 0) { + ODP_ERR("Thread %d still part of scheduler group(s)\n", + sched_ts->tidx); + rc = -1; + } + + return rc; +} + +static void pktio_start(int pktio_index, int num_in_queue, int in_queue_idx[]) +{ + int i; + uint32_t old, tag, j; + + for (i = 0; i < num_in_queue; i++) { + /* Try to reserve a slot */ + if (__atomic_fetch_add(&pktin_num, + 1, __ATOMIC_RELAXED) >= PKTIN_MAX) { + __atomic_fetch_sub(&pktin_num, 1, __ATOMIC_RELAXED); + ODP_ABORT("Too many pktio in queues for scheduler\n"); + } + /* A slot has been reserved, now we need to find an empty one */ + for (j = 0; ; j = (j + 1) % PKTIN_MAX) { + if (__atomic_load_n(&pktin_tags[j], + __ATOMIC_RELAXED) != TAG_EMPTY) + /* Slot used, continue with next */ + continue; + /* Empty slot found */ + old = TAG_EMPTY; + tag = PKTIO_QUEUE_2_TAG(pktio_index, in_queue_idx[i]); + if (__atomic_compare_exchange_n(&pktin_tags[j], + &old, + tag, + true, + __ATOMIC_RELEASE, + __ATOMIC_RELAXED)) { + /* Success grabbing slot,update high + * watermark + */ + __atomic_fetch_max(&pktin_hi, + j + 1, __ATOMIC_RELAXED); + /* One more tag (queue) for this pktio + * instance + */ + __atomic_fetch_add(&pktin_count[pktio_index], + 1, __ATOMIC_RELAXED); + /* Continue with next RX queue */ + break; + } + /* Failed to grab slot */ + } + } +} + +static int num_grps(void) +{ + return MAX_SCHED_GROUP; +} + +/* + * Stubs for internal scheduler abstraction layer due to absence of NULL + * checking before calling the function pointer. + */ + +static int thr_add(odp_schedule_group_t group, int thr) +{ + /* This function is a schedule_init_local duplicate. */ + (void)group; + (void)thr; + return 0; +} + +static int thr_rem(odp_schedule_group_t group, int thr) +{ + /* This function is a schedule_term_local duplicate. */ + (void)group; + (void)thr; + return 0; +} + +static int init_queue(uint32_t queue_index, + const odp_schedule_param_t *sched_param) +{ + /* Not used in scalable scheduler. */ + (void)queue_index; + (void)sched_param; + return 0; +} + +static void destroy_queue(uint32_t queue_index) +{ + /* Not used in scalable scheduler. */ + (void)queue_index; +} + +static int sched_queue(uint32_t queue_index) +{ + /* Not used in scalable scheduler. */ + (void)queue_index; + return 0; +} + +static int ord_enq_multi(queue_t handle, void *buf_hdr[], int num, + int *ret) + +{ + queue_entry_t *queue; + sched_scalable_thread_state_t *ts; + int actual; + + ts = sched_ts; + if (ts && odp_unlikely(ts->out_of_order)) { + queue = qentry_from_int(handle); + actual = rctx_save(queue, (odp_buffer_hdr_t **)buf_hdr, num); + *ret = actual; + return 1; + } + return 0; +} + +static void schedule_prefetch(int num) +{ + (void)num; +} + +/* Wait until we are in-order (when processing an ordered queue) + * Note: this function may be called also when processing other queue types + */ +static void order_lock(void) +{ + sched_scalable_thread_state_t *ts; + reorder_window_t *rwin; + uint32_t sn; + + ts = sched_ts; + if (odp_unlikely(ts->out_of_order)) { + /* We are processing ordered queue and are currently + * out-of-order. + * We are in-order when our reorder window slot number (sn) + * equals the head of the reorder window. + */ + ODP_ASSERT(ts->rctx != NULL); + rwin = ts->rctx->rwin; + sn = ts->rctx->sn; + sevl(); + /* Use acquire ordering to be on the safe side even if + * this isn't an acquire/release situation (aka lock). + */ + while (wfe() && + monitor32(&rwin->hc.head, __ATOMIC_ACQUIRE) != sn) + doze(); + } +} + +/* This function is unnecessary. 
+ * The next thread becomes in-order when we release our reorder context + * (i.e. when odp_schedule() is called again. + */ +static void order_unlock(void) +{ +} + +static unsigned schedule_max_ordered_locks(void) +{ + return CONFIG_QUEUE_MAX_ORD_LOCKS; +} + +const schedule_fn_t schedule_scalable_fn = { + .pktio_start = pktio_start, + .thr_add = thr_add, + .thr_rem = thr_rem, + .num_grps = num_grps, + .init_queue = init_queue, + .destroy_queue = destroy_queue, + .sched_queue = sched_queue, + .ord_enq_multi = ord_enq_multi, + .init_global = schedule_init_global, + .term_global = schedule_term_global, + .init_local = schedule_init_local, + .term_local = schedule_term_local, + .order_lock = order_lock, + .order_unlock = order_unlock, + .max_ordered_locks = schedule_max_ordered_locks, +}; + +const schedule_api_t schedule_scalable_api = { + .schedule_wait_time = schedule_wait_time, + .schedule = schedule, + .schedule_multi = schedule_multi, + .schedule_pause = schedule_pause, + .schedule_resume = schedule_resume, + .schedule_release_atomic = schedule_release_atomic, + .schedule_release_ordered = schedule_release_ordered, + .schedule_prefetch = schedule_prefetch, + .schedule_num_prio = schedule_num_prio, + .schedule_group_create = schedule_group_create, + .schedule_group_destroy = schedule_group_destroy, + .schedule_group_lookup = schedule_group_lookup, + .schedule_group_join = schedule_group_join, + .schedule_group_leave = schedule_group_leave, + .schedule_group_thrmask = schedule_group_thrmask, + .schedule_group_info = schedule_group_info, + .schedule_order_lock = schedule_order_lock, + .schedule_order_unlock = schedule_order_unlock, +}; diff --git a/platform/linux-generic/odp_schedule_scalable_ordered.c b/platform/linux-generic/odp_schedule_scalable_ordered.c new file mode 100644 index 00000000..90ddb61c --- /dev/null +++ b/platform/linux-generic/odp_schedule_scalable_ordered.c @@ -0,0 +1,345 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include <odp/api/shared_memory.h> +#include <odp_queue_scalable_internal.h> +#include <odp_schedule_if.h> +#include <odp_bitset.h> + +#include <string.h> + +extern __thread sched_scalable_thread_state_t *sched_ts; + +reorder_window_t *rwin_alloc(_odp_ishm_pool_t *pool, unsigned lock_count) +{ + reorder_window_t *rwin; + uint32_t i; + + rwin = (reorder_window_t *) + shm_pool_alloc_align(pool, sizeof(reorder_window_t)); + if (rwin == NULL) + return NULL; + + rwin->hc.head = 0; + rwin->hc.chgi = 0; + rwin->winmask = RWIN_SIZE - 1; + rwin->tail = 0; + rwin->turn = 0; + rwin->lock_count = (uint16_t)lock_count; + memset(rwin->olock, 0, sizeof(rwin->olock)); + for (i = 0; i < RWIN_SIZE; i++) + rwin->ring[i] = NULL; + + return rwin; +} + +int rwin_free(_odp_ishm_pool_t *pool, reorder_window_t *rwin) +{ + return _odp_ishm_pool_free(pool, rwin); +} + +bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn) +{ + uint32_t head; + uint32_t oldt; + uint32_t newt; + uint32_t winmask; + + /* Read head and tail separately */ + oldt = rwin->tail; + winmask = rwin->winmask; + do { + /* Need __atomic_load to avoid compiler reordering */ + head = __atomic_load_n(&rwin->hc.head, __ATOMIC_RELAXED); + if (odp_unlikely(oldt - head >= winmask)) + return false; + + newt = oldt + 1; + } while (!__atomic_compare_exchange(&rwin->tail, + &oldt, + &newt, + true, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); + *sn = oldt; + + return true; +} + +void rwin_insert(reorder_window_t *rwin, + reorder_context_t *rctx, + uint32_t sn, + void (*callback)(reorder_context_t *)) +{ + /* Initialise to silence scan-build */ + hc_t old = {0, 0}; + hc_t new; + uint32_t winmask; + + __atomic_load(&rwin->hc, &old, __ATOMIC_ACQUIRE); + winmask = rwin->winmask; + if (old.head != sn) { + /* We are out-of-order. Store context in reorder window, + * releasing its content. + */ + ODP_ASSERT(rwin->ring[sn & winmask] == NULL); + atomic_store_release(&rwin->ring[sn & winmask], + rctx, + /*readonly=*/false); + rctx = NULL; + do { + hc_t new; + + new.head = old.head; + new.chgi = old.chgi + 1; /* Changed value */ + /* Update head & chgi, fail if any has changed */ + if (__atomic_compare_exchange(&rwin->hc, + /* Updated on fail */ + &old, + &new, + true, + /* Rel our ring update */ + __ATOMIC_RELEASE, + __ATOMIC_ACQUIRE)) + /* CAS succeeded => head same (we are not + * in-order), chgi updated. + */ + return; + /* CAS failed => head and/or chgi changed. + * We might not be out-of-order anymore. + */ + } while (old.head != sn); + } + + /* old.head == sn => we are now in-order! 
*/ + ODP_ASSERT(old.head == sn); + /* We are in-order so our responsibility to retire contexts */ + new.head = old.head; + new.chgi = old.chgi + 1; + + /* Retire our in-order context (if we still have it) */ + if (rctx != NULL) { + callback(rctx); + new.head++; + } + + /* Retire in-order contexts in the ring + * The first context might actually be ours (if we were originally + * out-of-order) + */ + do { + for (;;) { + rctx = __atomic_load_n(&rwin->ring[new.head & winmask], + __ATOMIC_ACQUIRE); + if (rctx == NULL) + break; + /* We are the only thread that are in-order + * (until head updated) so don't have to use + * atomic load-and-clear (exchange) + */ + rwin->ring[new.head & winmask] = NULL; + callback(rctx); + new.head++; + } + /* Update head&chgi, fail if chgi has changed (head cannot change) */ + } while (!__atomic_compare_exchange(&rwin->hc, + &old, /* Updated on failure */ + &new, + false, /* weak */ + __ATOMIC_RELEASE, /* Release our ring updates */ + __ATOMIC_ACQUIRE)); +} + +void rctx_init(reorder_context_t *rctx, uint16_t idx, + reorder_window_t *rwin, uint32_t sn) +{ + /* rctx->rvec_free and rctx->idx already initialised in + * thread_state_init function. + */ + ODP_ASSERT(rctx->idx == idx); + rctx->rwin = rwin; + rctx->sn = sn; + rctx->olock_flags = 0; + /* First => no next reorder context */ + rctx->next_idx = idx; + /* Where to store next event */ + rctx->cur_idx = idx; + rctx->numevts = 0; +} + +inline void rctx_free(const reorder_context_t *rctx) +{ + const reorder_context_t *const base = &rctx[-(int)rctx->idx]; + const uint32_t first = rctx->idx; + uint32_t next_idx; + + next_idx = rctx->next_idx; + + ODP_ASSERT(rctx->rwin != NULL); + /* Set free bit */ + if (rctx->rvec_free == &sched_ts->rvec_free) + /* Since it is our own reorder context, we can instead + * perform a non-atomic and relaxed update on our private + * rvec_free. 
+ */ + sched_ts->priv_rvec_free = + bitset_set(sched_ts->priv_rvec_free, rctx->idx); + else + atom_bitset_set(rctx->rvec_free, rctx->idx, __ATOMIC_RELEASE); + + /* Can't dereference rctx after the corresponding free bit is set */ + while (next_idx != first) { + rctx = &base[next_idx]; + next_idx = rctx->next_idx; + /* Set free bit */ + if (rctx->rvec_free == &sched_ts->rvec_free) + sched_ts->priv_rvec_free = + bitset_set(sched_ts->priv_rvec_free, rctx->idx); + else + atom_bitset_set(rctx->rvec_free, rctx->idx, + __ATOMIC_RELEASE); + } +} + +inline void olock_unlock(const reorder_context_t *rctx, reorder_window_t *rwin, + uint32_t lock_index) +{ + if ((rctx->olock_flags & (1U << lock_index)) == 0) { + /* Use relaxed ordering, we are not releasing any updates */ + rwin->olock[lock_index] = rctx->sn + 1; + } +} + +void olock_release(const reorder_context_t *rctx) +{ + reorder_window_t *rwin; + int i; + + rwin = rctx->rwin; + + for (i = 0; i < rwin->lock_count; i++) + olock_unlock(rctx, rwin, i); +} + +static void blocking_enqueue(queue_entry_t *q, odp_buffer_hdr_t **evts, int num) +{ + int actual; + + /* Iterate until all events have been successfully enqueued */ + for (;;) { + /* Attempt to enqueue remaining events */ + actual = q->s.enqueue_multi(qentry_to_int(q), evts, num); + if (odp_unlikely(actual < 0)) + ODP_ERR("Failed to enqueue deferred events\n"); + /* Update for potential partial success */ + evts += actual; + num -= actual; + if (num == 0) + break; + /* Back-off to decrease load on the system */ + odp_cpu_pause(); + } +} + +void rctx_retire(reorder_context_t *first) +{ + reorder_context_t *rctx; + queue_entry_t *q; + uint32_t i; + uint32_t j; + uint32_t num; + + rctx = first; + do { + /* Process all events in this reorder context */ + for (i = 0; i < rctx->numevts;) { + q = rctx->destq[i]; + /* Find index of next different destq */ + j = i + 1; + while (j < rctx->numevts && rctx->destq[j] == q) + j++; + num = j - i; + /* Blocking enqueue of events to this destq */ + blocking_enqueue(q, &rctx->events[i], num); + i += num; + } + /* Update rctx pointer to point to 'next_idx' element */ + rctx += (int)rctx->next_idx - (int)rctx->idx; + } while (rctx != first); + olock_release(first); + rctx_free(first); +} + +void rctx_release(reorder_context_t *rctx) +{ + /* Insert reorder context into reorder window, potentially calling the + * rctx_retire function for all pending reorder_contexts. + */ + rwin_insert(rctx->rwin, rctx, rctx->sn, rctx_retire); +} + +/* Save destination queue and events in the reorder context for deferred + * enqueue. + */ +int rctx_save(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) +{ + int i; + sched_scalable_thread_state_t *ts; + reorder_context_t *first; + reorder_context_t *cur; + bitset_t next_idx; + + ts = sched_ts; + first = ts->rctx; + ODP_ASSERT(ts->rctx != NULL); + cur = &first[(int)first->cur_idx - (int)first->idx]; + for (i = 0; i < num; i++) { + if (odp_unlikely(cur->numevts == RC_EVT_SIZE)) { + /* No more space in current reorder context + * Try to allocate another. + */ + if (odp_unlikely( + bitset_is_null(ts->priv_rvec_free))) { + ts->priv_rvec_free = + atom_bitset_xchg( + &ts->rvec_free, + 0, + __ATOMIC_RELAXED); + if (odp_unlikely(bitset_is_null( + ts->priv_rvec_free))) + /* Out of reorder contexts. + * Return the number of events + * stored so far. 
+ */ + return i; + } + next_idx = bitset_ffs(ts->priv_rvec_free) - 1; + ts->priv_rvec_free = + bitset_clr(ts->priv_rvec_free, + next_idx); + /* Link current to next (for eventual + * retiring) + */ + cur->next_idx = next_idx; + /* Link first to next (for next call to + * queue_enq_multi()) + */ + first->cur_idx = next_idx; + /* Update current to next */ + cur = &ts->rvec[next_idx]; + rctx_init(cur, next_idx, NULL, 0); + /* The last rctx (so far) */ + cur->next_idx = first->idx; + } + cur->events[cur->numevts] = buf_hdr[i]; + cur->destq[cur->numevts] = queue; + cur->numevts++; + } + /* All events stored. */ + return num; +}
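For context on how the scheduler added above is driven: the sketch below is an illustrative worker loop using only the public ODP schedule API (odp_schedule(), odp_schedule_wait_time(), odp_schedule_release_atomic()). It is not part of the patch; the 100 ms wait value and the empty event handler are placeholders.

#include <odp_api.h>

/* Minimal sketch of a worker thread driving the scheduler implemented
 * above. Pool/queue setup, thread creation and the event handler are
 * omitted; the 100 ms wait is an arbitrary example value. */
static int worker_loop(void *arg ODP_UNUSED)
{
	uint64_t wait = odp_schedule_wait_time(100 * ODP_TIME_MSEC_IN_NS);

	for (;;) {
		odp_queue_t from;
		odp_event_t ev = odp_schedule(&from, wait);

		if (ev == ODP_EVENT_INVALID)
			continue; /* Timed out, no event available */

		/* ... process the event dequeued from 'from' ... */
		odp_event_free(ev);

		/* For ODP_SCHED_SYNC_ATOMIC queues the atomic context is
		 * held until the next odp_schedule() call; it can be
		 * released earlier once exclusive access to the queue
		 * context is no longer needed. */
		odp_schedule_release_atomic();
	}
	return 0;
}

As the _schedule() implementation above shows, an event received from an ODP_SCHED_SYNC_ATOMIC queue keeps that queue bound to the calling thread (ts->atomq) until the thread re-enters the scheduler or releases the context explicitly.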
commit 9fd48a9215a7831ca951839b7187bd1eb3f7bb06 Author: Brian Brooks brian.brooks@arm.com Date: Fri Jun 23 16:04:39 2017 -0500
linux-gen: sched scalable: add a concurrent queue
Signed-off-by: Ola Liljedahl ola.liljedahl@arm.com Reviewed-by: Brian Brooks brian.brooks@arm.com Reviewed-and-tested-by: Yi He yi.he@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
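The header added below provides the linked-list scheduler queue ("llqueue") with two interchangeable implementations: a lock-free one based on double-word LL/SC (CONFIG_LLDSCD) and a spinlock-protected fallback. As an illustration only (not part of the patch), the usage pattern is to embed a struct llnode in the element type, enqueue the node, and recover the element with a container_of-style cast after dequeue, which is exactly what sched_elem_t and schedq_peek() do in the scheduler above. The 'struct item' type below is invented for the example and assumes odp_llqueue.h is included.

#include <stddef.h>

struct item {
	struct llnode node;	/* embedded list node; next must start as NULL */
	int value;
};

static struct item *item_dequeue(struct llqueue *llq)
{
	struct llnode *n = llq_dequeue(llq);

	if (n == NULL)
		return NULL;
	/* Same container_of idiom as odp_container_of()/schedq_peek() above */
	return (struct item *)((char *)n - offsetof(struct item, node));
}

The SENTINEL next-pointer value distinguishes "last node on a queue" from "not on any queue" (NULL), which is what llq_on_queue() tests.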
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 39322dc1..19e2241b 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -170,6 +170,7 @@ noinst_HEADERS = \ ${srcdir}/include/odp_errno_define.h \ ${srcdir}/include/odp_forward_typedefs_internal.h \ ${srcdir}/include/odp_internal.h \ + ${srcdir}/include/odp_llqueue.h \ ${srcdir}/include/odp_name_table_internal.h \ ${srcdir}/include/odp_packet_internal.h \ ${srcdir}/include/odp_packet_io_internal.h \ diff --git a/platform/linux-generic/include/odp_llqueue.h b/platform/linux-generic/include/odp_llqueue.h new file mode 100644 index 00000000..99b12e66 --- /dev/null +++ b/platform/linux-generic/include/odp_llqueue.h @@ -0,0 +1,311 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_LLQUEUE_H_ +#define ODP_LLQUEUE_H_ + +#include <odp/api/cpu.h> +#include <odp/api/hints.h> +#include <odp/api/spinlock.h> + +#include <odp_config_internal.h> +#include <odp_debug_internal.h> +#include <odp_cpu.h> + +#include <stdint.h> +#include <stdlib.h> + +/****************************************************************************** + * Linked list queues + *****************************************************************************/ + +struct llqueue; +struct llnode; + +static struct llnode *llq_head(struct llqueue *llq); +static void llqueue_init(struct llqueue *llq); +static void llq_enqueue(struct llqueue *llq, struct llnode *node); +static struct llnode *llq_dequeue(struct llqueue *llq); +static odp_bool_t llq_dequeue_cond(struct llqueue *llq, struct llnode *exp); +static odp_bool_t llq_cond_rotate(struct llqueue *llq, struct llnode *node); +static odp_bool_t llq_on_queue(struct llnode *node); + +/****************************************************************************** + * The implementation(s) + *****************************************************************************/ + +#define SENTINEL ((void *)~(uintptr_t)0) + +#ifdef CONFIG_LLDSCD +/* Implement queue operations using double-word LL/SC */ + +/* The scalar equivalent of a double pointer */ +#if __SIZEOF_PTRDIFF_T__ == 4 +typedef uint64_t dintptr_t; +#endif +#if __SIZEOF_PTRDIFF_T__ == 8 +typedef __int128 dintptr_t; +#endif + +struct llnode { + struct llnode *next; +}; + +union llht { + struct { + struct llnode *head, *tail; + } st; + dintptr_t ui; +}; + +struct llqueue { + union llht u; +}; + +static inline struct llnode *llq_head(struct llqueue *llq) +{ + return __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED); +} + +static inline void llqueue_init(struct llqueue *llq) +{ + llq->u.st.head = NULL; + llq->u.st.tail = NULL; +} + +static inline void llq_enqueue(struct llqueue *llq, struct llnode *node) +{ + union llht old, neu; + + ODP_ASSERT(node->next == NULL); + node->next = SENTINEL; + do { + old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED); + neu.st.head = old.st.head == NULL ? 
node : old.st.head; + neu.st.tail = node; + } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELEASE))); + if (old.st.tail != NULL) { + /* List was not empty */ + ODP_ASSERT(old.st.tail->next == SENTINEL); + old.st.tail->next = node; + } +} + +static inline struct llnode *llq_dequeue(struct llqueue *llq) +{ + struct llnode *head; + union llht old, neu; + + /* llq_dequeue() may be used in a busy-waiting fashion + * Read head using plain load to avoid disturbing remote LL/SC + */ + head = __atomic_load_n(&llq->u.st.head, __ATOMIC_ACQUIRE); + if (head == NULL) + return NULL; + /* Read head->next before LL to minimize cache miss latency + * in LL/SC below + */ + (void)__atomic_load_n(&head->next, __ATOMIC_RELAXED); + + do { + old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED); + if (odp_unlikely(old.st.head == NULL)) { + /* Empty list */ + return NULL; + } else if (odp_unlikely(old.st.head == old.st.tail)) { + /* Single-element in list */ + neu.st.head = NULL; + neu.st.tail = NULL; + } else { + /* Multi-element list, dequeue head */ + struct llnode *next; + /* Wait until llq_enqueue() has written true next + * pointer + */ + while ((next = __atomic_load_n(&old.st.head->next, + __ATOMIC_RELAXED)) == + SENTINEL) + odp_cpu_pause(); + neu.st.head = next; + neu.st.tail = old.st.tail; + } + } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED))); + old.st.head->next = NULL; + return old.st.head; +} + +static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq, + struct llnode *exp) +{ + union llht old, neu; + + do { + old.ui = lld(&llq->u.ui, __ATOMIC_ACQUIRE); + if (odp_unlikely(old.st.head == NULL || old.st.head != exp)) { + /* Empty list or wrong head */ + return false; + } else if (odp_unlikely(old.st.head == old.st.tail)) { + /* Single-element in list */ + neu.st.head = NULL; + neu.st.tail = NULL; + } else { + /* Multi-element list, dequeue head */ + struct llnode *next; + + /* Wait until llq_enqueue() has written true next + * pointer */ + while ((next = __atomic_load_n(&old.st.head->next, + __ATOMIC_RELAXED)) == + SENTINEL) + odp_cpu_pause(); + + neu.st.head = next; + neu.st.tail = old.st.tail; + } + } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED))); + old.st.head->next = NULL; + return true; +} + +/* If 'node' is a head of llq then move it to tail */ +static inline odp_bool_t llq_cond_rotate(struct llqueue *llq, + struct llnode *node) +{ + /* Difficult to make this into a single atomic operation + * Instead use existing primitives. 
+ */ + if (odp_likely(llq_dequeue_cond(llq, node))) { + llq_enqueue(llq, node); + return true; + } + return false; +} + +static inline odp_bool_t llq_on_queue(struct llnode *node) +{ + return node->next != NULL; +} + +#else +/* Implement queue operations protected by a spin lock */ + +struct llnode { + struct llnode *next; +}; + +struct llqueue { + struct llnode *head, *tail; + odp_spinlock_t lock; +}; + +static inline struct llnode *llq_head(struct llqueue *llq) +{ + return __atomic_load_n(&llq->head, __ATOMIC_RELAXED); +} + +static inline void llqueue_init(struct llqueue *llq) +{ + llq->head = NULL; + llq->tail = NULL; + odp_spinlock_init(&llq->lock); +} + +static inline void llq_enqueue(struct llqueue *llq, struct llnode *node) +{ + ODP_ASSERT(node->next == NULL); + node->next = SENTINEL; + + odp_spinlock_lock(&llq->lock); + if (llq->head == NULL) { + llq->head = node; + llq->tail = node; + } else { + llq->tail->next = node; + llq->tail = node; + } + odp_spinlock_unlock(&llq->lock); +} + +static inline struct llnode *llq_dequeue(struct llqueue *llq) +{ + struct llnode *head; + struct llnode *node = NULL; + + head = __atomic_load_n(&llq->head, __ATOMIC_RELAXED); + if (head == NULL) + return NULL; + + odp_spinlock_lock(&llq->lock); + if (llq->head != NULL) { + node = llq->head; + if (llq->head == llq->tail) { + ODP_ASSERT(node->next == SENTINEL); + llq->head = NULL; + llq->tail = NULL; + } else { + ODP_ASSERT(node->next != SENTINEL); + llq->head = node->next; + } + node->next = NULL; + } + odp_spinlock_unlock(&llq->lock); + return node; +} + +static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq, + struct llnode *node) +{ + odp_bool_t success = false; + + odp_spinlock_lock(&llq->lock); + if (odp_likely(llq->head != NULL && llq->head == node)) { + success = true; + if (llq->head == llq->tail) { + ODP_ASSERT(node->next == SENTINEL); + llq->head = NULL; + llq->tail = NULL; + } else { + ODP_ASSERT(node->next != SENTINEL); + llq->head = node->next; + } + node->next = NULL; + } + odp_spinlock_unlock(&llq->lock); + return success; +} + +/* If 'node' is a head of llq then move it to tail */ +static inline odp_bool_t llq_cond_rotate(struct llqueue *llq, + struct llnode *node) +{ + odp_bool_t success = false; + + odp_spinlock_lock(&llq->lock); + if (odp_likely(llq->head == node)) { + success = true; + if (llq->tail != node) { + ODP_ASSERT(node->next != SENTINEL); + llq->head = node->next; + llq->tail->next = node; + llq->tail = node; + node->next = SENTINEL; + } + /* Else 'node' is only element on list => nothing to do */ + } + odp_spinlock_unlock(&llq->lock); + return success; +} + +static inline odp_bool_t llq_on_queue(struct llnode *node) +{ + return node->next != NULL; +} + +#endif + +#endif
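As a quick sanity check of the semantics defined above (illustrative only, not part of the patch, and assuming odp_llqueue.h plus ODP_ASSERT are available), the following single-threaded snippet behaves identically with either implementation. In particular, llq_cond_rotate() moves a node to the tail only when it is currently the head, which is how the scheduler rotates a queue whose WRR budget is exhausted.

static void llq_smoke_test(void)
{
	struct llqueue q;
	struct llnode a = { .next = NULL };
	struct llnode b = { .next = NULL };

	llqueue_init(&q);
	llq_enqueue(&q, &a);
	llq_enqueue(&q, &b);
	ODP_ASSERT(llq_head(&q) == &a);
	ODP_ASSERT(llq_cond_rotate(&q, &a));	/* a was head, moves behind b */
	ODP_ASSERT(llq_dequeue(&q) == &b);
	ODP_ASSERT(llq_dequeue(&q) == &a);
	ODP_ASSERT(llq_dequeue(&q) == NULL);
	ODP_ASSERT(!llq_on_queue(&a));		/* dequeue reset a.next to NULL */
}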
commit bf554b2d9472ed8ed04580904d0aa906cdbe1e83 Author: Brian Brooks brian.brooks@arm.com Date: Fri Jun 23 16:04:38 2017 -0500
linux-gen: sched scalable: add a bitset
Signed-off-by: Ola Liljedahl ola.liljedahl@arm.com Reviewed-by: Brian Brooks brian.brooks@arm.com Reviewed-by: Honnappa Nagarahalli honnappa.nagarahalli@arm.com Reviewed-and-tested-by: Yi He yi.he@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 09e25166..39322dc1 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -159,6 +159,7 @@ noinst_HEADERS = \ ${srcdir}/include/odp_atomic_internal.h \ ${srcdir}/include/odp_buffer_inlines.h \ ${srcdir}/include/odp_bitmap_internal.h \ + ${srcdir}/include/odp_bitset.h \ ${srcdir}/include/odp_buffer_internal.h \ ${srcdir}/include/odp_classification_datamodel.h \ ${srcdir}/include/odp_classification_inlines.h \ diff --git a/platform/linux-generic/include/odp_bitset.h b/platform/linux-generic/include/odp_bitset.h new file mode 100644 index 00000000..4b7dd6d6 --- /dev/null +++ b/platform/linux-generic/include/odp_bitset.h @@ -0,0 +1,212 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef _ODP_BITSET_H_ +#define _ODP_BITSET_H_ + +#include <odp_cpu.h> + +#include <limits.h> + +/****************************************************************************** + * bitset abstract data type + *****************************************************************************/ +/* This could be a struct of scalars to support larger bit sets */ + +/* + * Size of atomic bit set. This limits the max number of threads, + * scheduler groups and reorder windows. On ARMv8/64-bit and x86-64, the + * (lock-free) max is 128 + */ + +/* Find a suitable data type that supports lock-free atomic operations */ +#if defined(__aarch64__) && defined(__SIZEOF_INT128__) && \ + __SIZEOF_INT128__ == 16 +#define LOCKFREE16 +typedef __int128 bitset_t; +#define ATOM_BITSET_SIZE (CHAR_BIT * __SIZEOF_INT128__) + +#elif __GCC_ATOMIC_LLONG_LOCK_FREE == 2 && \ + __SIZEOF_LONG_LONG__ != __SIZEOF_LONG__ +typedef unsigned long long bitset_t; +#define ATOM_BITSET_SIZE (CHAR_BIT * __SIZEOF_LONG_LONG__) + +#elif __GCC_ATOMIC_LONG_LOCK_FREE == 2 && __SIZEOF_LONG__ != __SIZEOF_INT__ +typedef unsigned long bitset_t; +#define ATOM_BITSET_SIZE (CHAR_BIT * __SIZEOF_LONG__) + +#elif __GCC_ATOMIC_INT_LOCK_FREE == 2 +typedef unsigned int bitset_t; +#define ATOM_BITSET_SIZE (CHAR_BIT * __SIZEOF_INT__) + +#else +/* Target does not support lock-free atomic operations */ +typedef unsigned int bitset_t; +#define ATOM_BITSET_SIZE (CHAR_BIT * __SIZEOF_INT__) +#endif + +#if ATOM_BITSET_SIZE <= 32 + +static inline bitset_t bitset_mask(uint32_t bit) +{ + return 1UL << bit; +} + +/* Return first-bit-set with StdC ffs() semantics */ +static inline uint32_t bitset_ffs(bitset_t b) +{ + return __builtin_ffsl(b); +} + +/* Load-exclusive with memory ordering */ +static inline bitset_t bitset_monitor(bitset_t *bs, int mo) +{ + return monitor32(bs, mo); +} + +#elif ATOM_BITSET_SIZE <= 64 + +static inline bitset_t bitset_mask(uint32_t bit) +{ + return 1ULL << bit; +} + +/* Return first-bit-set with StdC ffs() semantics */ +static inline uint32_t bitset_ffs(bitset_t b) +{ + return __builtin_ffsll(b); +} + +/* Load-exclusive with memory ordering */ +static inline bitset_t bitset_monitor(bitset_t *bs, int mo) +{ + return monitor64(bs, mo); +} + +#elif ATOM_BITSET_SIZE <= 128 + +static inline bitset_t bitset_mask(uint32_t bit) +{ + if (bit < 64) + return 1ULL << bit; + else + return (unsigned __int128)(1ULL << (bit - 64)) << 64; +} + +/* Return first-bit-set with StdC ffs() semantics */ +static inline uint32_t bitset_ffs(bitset_t b) +{ + if ((uint64_t)b != 0) + return __builtin_ffsll((uint64_t)b); + else if ((b >> 
64) != 0) + return __builtin_ffsll((uint64_t)(b >> 64)) + 64; + else + return 0; +} + +/* Load-exclusive with memory ordering */ +static inline bitset_t bitset_monitor(bitset_t *bs, int mo) +{ + return monitor128(bs, mo); +} + +#else +#error Unsupported size of bit sets (ATOM_BITSET_SIZE) +#endif + +/* Atomic load with memory ordering */ +static inline bitset_t atom_bitset_load(bitset_t *bs, int mo) +{ +#ifdef LOCKFREE16 + return __lockfree_load_16(bs, mo); +#else + return __atomic_load_n(bs, mo); +#endif +} + +/* Atomic bit set with memory ordering */ +static inline void atom_bitset_set(bitset_t *bs, uint32_t bit, int mo) +{ +#ifdef LOCKFREE16 + (void)__lockfree_fetch_or_16(bs, bitset_mask(bit), mo); +#else + (void)__atomic_fetch_or(bs, bitset_mask(bit), mo); +#endif +} + +/* Atomic bit clear with memory ordering */ +static inline void atom_bitset_clr(bitset_t *bs, uint32_t bit, int mo) +{ +#ifdef LOCKFREE16 + (void)__lockfree_fetch_and_16(bs, ~bitset_mask(bit), mo); +#else + (void)__atomic_fetch_and(bs, ~bitset_mask(bit), mo); +#endif +} + +/* Atomic exchange with memory ordering */ +static inline bitset_t atom_bitset_xchg(bitset_t *bs, bitset_t neu, int mo) +{ +#ifdef LOCKFREE16 + return __lockfree_exchange_16(bs, neu, mo); +#else + return __atomic_exchange_n(bs, neu, mo); +#endif +} + +/* Atomic compare&exchange with memory ordering */ +static inline bitset_t atom_bitset_cmpxchg(bitset_t *bs, bitset_t *old, + bitset_t neu, bool weak, + int mo_success, int mo_failure) +{ +#ifdef LOCKFREE16 + return __lockfree_compare_exchange_16( + bs, old, neu, weak, mo_success, mo_failure); +#else + return __atomic_compare_exchange_n( + bs, old, neu, weak, mo_success, mo_failure); +#endif +} + +/* Return a & ~b */ +static inline bitset_t bitset_andn(bitset_t a, bitset_t b) +{ + return a & ~b; +} + +static inline bool bitset_is_eql(bitset_t a, bitset_t b) +{ + return a == b; +} + +static inline bitset_t bitset_clr(bitset_t bs, uint32_t bit) +{ + return bs & ~bitset_mask(bit); +} + +static inline bitset_t bitset_set(bitset_t bs, uint32_t bit) +{ + return bs | bitset_mask(bit); +} + +static inline bitset_t bitset_null(void) +{ + return 0U; +} + +static inline bool bitset_is_null(bitset_t a) +{ + return a == 0U; +} + +static inline bool bitset_is_set(bitset_t a, uint32_t bit) +{ + return (a & bitset_mask(bit)) != 0; +} + +#endif
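A rough sketch of how the bitset primitives above compose follows; it is not taken from the patch, and the shared variable ready_set and the slot numbering are assumptions. One thread publishes a slot with an atomic release set; another snapshots the set, finds the lowest set bit with bitset_ffs() (StdC ffs() semantics: 1-based, 0 means empty) and clears it.

#include <stdint.h>
#include <odp_bitset.h>

static bitset_t ready_set;      /* shared; at most ATOM_BITSET_SIZE slots */

static void slot_mark_ready(uint32_t slot)
{
        atom_bitset_set(&ready_set, slot, __ATOMIC_RELEASE);
}

/* Return a ready slot index, or -1 if none is ready.
 * Note: with multiple consumers a real claim would loop on
 * atom_bitset_cmpxchg() so that only one thread wins a given slot.
 */
static int slot_take_ready(void)
{
        bitset_t snap = atom_bitset_load(&ready_set, __ATOMIC_ACQUIRE);
        uint32_t bit = bitset_ffs(snap);        /* 1-based, 0 == empty */

        if (bit == 0)
                return -1;
        atom_bitset_clr(&ready_set, bit - 1, __ATOMIC_RELAXED);
        return (int)(bit - 1);
}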
commit ac89f9c568830498b186dff33e908053dbdc88c7 Author: Brian Brooks brian.brooks@arm.com Date: Fri Jun 23 16:04:37 2017 -0500
linux-gen: sched scalable: add arch files
Signed-off-by: Brian Brooks brian.brooks@arm.com Signed-off-by: Ola Liljedahl ola.liljedahl@arm.com Reviewed-by: Honnappa Nagarahalli honnappa.nagarahalli@arm.com Reviewed-and-tested-by: Yi He yi.he@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 2293f7f0..09e25166 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -8,6 +8,7 @@ AM_CFLAGS += -I$(srcdir)/include AM_CFLAGS += -I$(top_srcdir)/include AM_CFLAGS += -I$(top_srcdir)/include/odp/arch/@ARCH_ABI@ AM_CFLAGS += -I$(top_builddir)/include +AM_CFLAGS += -I$(top_srcdir)/arch/@ARCH_DIR@ AM_CFLAGS += -Iinclude AM_CFLAGS += -DSYSCONFDIR="@sysconfdir@" AM_CFLAGS += -D_ODP_PKTIO_IPC @@ -198,6 +199,22 @@ noinst_HEADERS = \ ${srcdir}/include/protocols/udp.h \ ${srcdir}/Makefile.inc
+if ARCH_IS_ARM +noinst_HEADERS += ${srcdir}/arch/arm/odp_atomic.h \ + ${srcdir}/arch/arm/odp_cpu.h \ + ${srcdir}/arch/arm/odp_cpu_idling.h \ + ${srcdir}/arch/arm/odp_llsc.h +endif +if ARCH_IS_MIPS64 +noinst_HEADERS += ${srcdir}/arch/mips64/odp_cpu.h +endif +if ARCH_IS_POWERPC +noinst_HEADERS += ${srcdir}/arch/powerpc/odp_cpu.h +endif +if ARCH_IS_X86 +noinst_HEADERS += ${srcdir}/arch/x86/odp_cpu.h +endif + __LIB__libodp_linux_la_SOURCES = \ _fdserver.c \ _ishm.c \ diff --git a/platform/linux-generic/arch/arm/odp_atomic.h b/platform/linux-generic/arch/arm/odp_atomic.h new file mode 100644 index 00000000..3a21a47b --- /dev/null +++ b/platform/linux-generic/arch/arm/odp_atomic.h @@ -0,0 +1,212 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef PLATFORM_LINUXGENERIC_ARCH_ARM_ODP_ATOMIC_H +#define PLATFORM_LINUXGENERIC_ARCH_ARM_ODP_ATOMIC_H + +#ifndef PLATFORM_LINUXGENERIC_ARCH_ARM_ODP_CPU_H +#error This file should not be included directly, please include odp_cpu.h +#endif + +#ifdef CONFIG_DMBSTR + +#define atomic_store_release(loc, val, ro) \ +do { \ + _odp_release_barrier(ro); \ + __atomic_store_n(loc, val, __ATOMIC_RELAXED); \ +} while (0) + +#else + +#define atomic_store_release(loc, val, ro) \ + __atomic_store_n(loc, val, __ATOMIC_RELEASE) + +#endif /* CONFIG_DMBSTR */ + +#ifdef __aarch64__ + +#define HAS_ACQ(mo) ((mo) != __ATOMIC_RELAXED && (mo) != __ATOMIC_RELEASE) +#define HAS_RLS(mo) ((mo) == __ATOMIC_RELEASE || (mo) == __ATOMIC_ACQ_REL || \ + (mo) == __ATOMIC_SEQ_CST) + +#define LL_MO(mo) (HAS_ACQ((mo)) ? __ATOMIC_ACQUIRE : __ATOMIC_RELAXED) +#define SC_MO(mo) (HAS_RLS((mo)) ? __ATOMIC_RELEASE : __ATOMIC_RELAXED) + +#ifndef __ARM_FEATURE_QRDMX /* Feature only available in v8.1a and beyond */ +static inline bool +__lockfree_compare_exchange_16(register __int128 *var, __int128 *exp, + register __int128 neu, bool weak, int mo_success, + int mo_failure) +{ + (void)weak; /* Always do strong CAS or we can't perform atomic read */ + /* Ignore memory ordering for failure, memory order for + * success must be stronger or equal. */ + (void)mo_failure; + register __int128 old; + register __int128 expected; + int ll_mo = LL_MO(mo_success); + int sc_mo = SC_MO(mo_success); + + expected = *exp; + __asm__ volatile("" ::: "memory"); + do { + /* Atomicity of LLD is not guaranteed */ + old = lld(var, ll_mo); + /* Must write back neu or old to verify atomicity of LLD */ + } while (odp_unlikely(scd(var, old == expected ? 
neu : old, sc_mo))); + *exp = old; /* Always update, atomically read value */ + return old == expected; +} + +static inline __int128 __lockfree_exchange_16(__int128 *var, __int128 neu, + int mo) +{ + register __int128 old; + int ll_mo = LL_MO(mo); + int sc_mo = SC_MO(mo); + + do { + /* Atomicity of LLD is not guaranteed */ + old = lld(var, ll_mo); + /* Must successfully write back to verify atomicity of LLD */ + } while (odp_unlikely(scd(var, neu, sc_mo))); + return old; +} + +static inline __int128 __lockfree_fetch_and_16(__int128 *var, __int128 mask, + int mo) +{ + register __int128 old; + int ll_mo = LL_MO(mo); + int sc_mo = SC_MO(mo); + + do { + /* Atomicity of LLD is not guaranteed */ + old = lld(var, ll_mo); + /* Must successfully write back to verify atomicity of LLD */ + } while (odp_unlikely(scd(var, old & mask, sc_mo))); + return old; +} + +static inline __int128 __lockfree_fetch_or_16(__int128 *var, __int128 mask, + int mo) +{ + register __int128 old; + int ll_mo = LL_MO(mo); + int sc_mo = SC_MO(mo); + + do { + /* Atomicity of LLD is not guaranteed */ + old = lld(var, ll_mo); + /* Must successfully write back to verify atomicity of LLD */ + } while (odp_unlikely(scd(var, old | mask, sc_mo))); + return old; +} + +#else + +static inline __int128 casp(__int128 *var, __int128 old, __int128 neu, int mo) +{ + if (mo == __ATOMIC_RELAXED) { + __asm__ volatile("casp %0, %H0, %1, %H1, [%2]" + : "+r" (old) + : "r" (neu), "r" (var) + : "memory"); + } else if (mo == __ATOMIC_ACQUIRE) { + __asm__ volatile("caspa %0, %H0, %1, %H1, [%2]" + : "+r" (old) + : "r" (neu), "r" (var) + : "memory"); + } else if (mo == __ATOMIC_ACQ_REL) { + __asm__ volatile("caspal %0, %H0, %1, %H1, [%2]" + : "+r" (old) + : "r" (neu), "r" (var) + : "memory"); + } else if (mo == __ATOMIC_RELEASE) { + __asm__ volatile("caspl %0, %H0, %1, %H1, [%2]" + : "+r" (old) + : "r" (neu), "r" (var) + : "memory"); + } else { + abort(); + } + return old; +} + +static inline bool +__lockfree_compare_exchange_16(register __int128 *var, __int128 *exp, + register __int128 neu, bool weak, int mo_success, + int mo_failure) +{ + (void)weak; + (void)mo_failure; + __int128 old; + __int128 expected; + + expected = *exp; + old = casp(var, expected, neu, mo_success); + *exp = old; /* Always update, atomically read value */ + return old == expected; +} + +static inline __int128 __lockfree_exchange_16(__int128 *var, __int128 neu, + int mo) +{ + __int128 old; + __int128 expected; + + do { + expected = *var; + old = casp(var, expected, neu, mo); + } while (old != expected); + return old; +} + +static inline __int128 __lockfree_fetch_and_16(__int128 *var, __int128 mask, + int mo) +{ + __int128 old; + __int128 expected; + + do { + expected = *var; + old = casp(var, expected, expected & mask, mo); + } while (old != expected); + return old; +} + +static inline __int128 __lockfree_fetch_or_16(__int128 *var, __int128 mask, + int mo) +{ + __int128 old; + __int128 expected; + + do { + expected = *var; + old = casp(var, expected, expected | mask, mo); + } while (old != expected); + return old; +} + +#endif /* __ARM_FEATURE_QRDMX */ + +static inline __int128 __lockfree_load_16(__int128 *var, int mo) +{ + __int128 old = *var; /* Possibly torn read */ + + /* Do CAS to ensure atomicity + * Either CAS succeeds (writing back the same value) + * Or CAS fails and returns the old value (atomic read) + */ + (void)__lockfree_compare_exchange_16(var, &old, old, false, mo, mo); + return old; +} + +#endif /* __aarch64__ */ + +#endif /* 
PLATFORM_LINUXGENERIC_ARCH_ARM_ODP_ATOMIC_H */ diff --git a/platform/linux-generic/arch/arm/odp_cpu.h b/platform/linux-generic/arch/arm/odp_cpu.h new file mode 100644 index 00000000..8ef50da4 --- /dev/null +++ b/platform/linux-generic/arch/arm/odp_cpu.h @@ -0,0 +1,71 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef PLATFORM_LINUXGENERIC_ARCH_ARM_ODP_CPU_H +#define PLATFORM_LINUXGENERIC_ARCH_ARM_ODP_CPU_H + +#if !defined(__arm__) && !defined(__aarch64__) +#error Use this file only when compiling for ARM architecture +#endif + +#include <odp_debug_internal.h> + +/* + * Use LLD/SCD atomic primitives instead of lock-based code path in llqueue + * LLD/SCD is on ARM the fastest way to enqueue and dequeue elements from a + * linked list queue. + */ +#define CONFIG_LLDSCD + +/* + * Use DMB;STR instead of STRL on ARM + * On early ARMv8 implementations (e.g. Cortex-A57) this is noticeably more + * performant than using store-release. + * This also allows for load-only barriers (DMB ISHLD) which are much cheaper + * than a full barrier + */ +#define CONFIG_DMBSTR + +/* + * Use ARM event signalling mechanism + * Event signalling minimises spinning (busy waiting) which decreases + * cache coherency traffic when spinning on shared locations (thus faster and + * more scalable) and enables the CPU to enter a sleep state (lower power + * consumption). + */ +#define CONFIG_WFE + +static inline void dmb(void) +{ + __asm__ volatile("dmb" : : : "memory"); +} + +#ifdef __aarch64__ + +/* Only ARMv8 supports DMB ISHLD */ +/* A load only barrier is much cheaper than full barrier */ +#define _odp_release_barrier(ro) \ +do { \ + if (ro) \ + __asm__ volatile("dmb ishld" ::: "memory"); \ + else \ + __asm__ volatile("dmb ish" ::: "memory"); \ +} while (0) + +#else + +#define _odp_release_barrier(ro) \ + __atomic_thread_fence(__ATOMIC_RELEASE) + +#endif /* __aarch64__ */ + +#include "odp_llsc.h" +#include "odp_atomic.h" +#include "odp_cpu_idling.h" + +#endif /* PLATFORM_LINUXGENERIC_ARCH_ARM_ODP_CPU_H */ diff --git a/platform/linux-generic/arch/arm/odp_cpu_idling.h b/platform/linux-generic/arch/arm/odp_cpu_idling.h new file mode 100644 index 00000000..3ae1028c --- /dev/null +++ b/platform/linux-generic/arch/arm/odp_cpu_idling.h @@ -0,0 +1,53 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef PLATFORM_LINUXGENERIC_ARCH_ARM_CPU_IDLING_H +#define PLATFORM_LINUXGENERIC_ARCH_ARM_CPU_IDLING_H + +#ifndef PLATFORM_LINUXGENERIC_ARCH_ARM_ODP_CPU_H +#error This file should not be included directly, please include odp_cpu.h +#endif + +static inline void sevl(void) +{ +#ifdef CONFIG_WFE + __asm__ volatile("sevl" : : : ); +#endif +} + +static inline int wfe(void) +{ +#ifdef CONFIG_WFE + __asm__ volatile("wfe" : : : "memory"); +#endif + return 1; +} + +static inline void doze(void) +{ +#ifndef CONFIG_WFE + /* When using WFE do not stall the pipeline using other means */ + odp_cpu_pause(); +#endif +} + +#ifdef CONFIG_WFE +#ifdef __aarch64__ +#define monitor128(addr, mo) lld((addr), (mo)) +#endif +#define monitor64(addr, mo) ll64((addr), (mo)) +#define monitor32(addr, mo) ll32((addr), (mo)) +#define monitor8(addr, mo) ll8((addr), (mo)) +#else +#define monitor128(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor64(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor32(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor8(addr, mo) __atomic_load_n((addr), (mo)) +#endif + +#endif /* PLATFORM_LINUXGENERIC_ARCH_ARM_CPU_IDLING_H */ diff --git a/platform/linux-generic/arch/arm/odp_llsc.h b/platform/linux-generic/arch/arm/odp_llsc.h new file mode 100644 index 00000000..73c4d7b1 --- /dev/null +++ b/platform/linux-generic/arch/arm/odp_llsc.h @@ -0,0 +1,253 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef PLATFORM_LINUXGENERIC_ARCH_ARM_LLSC_H +#define PLATFORM_LINUXGENERIC_ARCH_ARM_LLSC_H + +#ifndef PLATFORM_LINUXGENERIC_ARCH_ARM_ODP_CPU_H +#error This file should not be included directly, please include odp_cpu.h +#endif + +#ifdef __arm__ + +static inline uint32_t ll8(uint8_t *var, int mm) +{ + uint8_t old; + + __asm__ volatile("ldrexb %0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + /* Barrier after an acquiring load */ + if (mm == __ATOMIC_ACQUIRE) + dmb(); + return old; +} + +static inline uint32_t ll(uint32_t *var, int mm) +{ + uint32_t old; + + __asm__ volatile("ldrex %0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + /* Barrier after an acquiring load */ + if (mm == __ATOMIC_ACQUIRE) + dmb(); + return old; +} + +#define ll32(a, b) ll((a), (b)) + +/* Return 0 on success, 1 on failure */ +static inline uint32_t sc(uint32_t *var, uint32_t neu, int mm) +{ + uint32_t ret; + + /* Barrier before a releasing store */ + if (mm == __ATOMIC_RELEASE) + dmb(); + __asm__ volatile("strex %0, %1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : ); + return ret; +} + +#define sc32(a, b, c) sc((a), (b), (c)) + +static inline uint64_t lld(uint64_t *var, int mm) +{ + uint64_t old; + + __asm__ volatile("ldrexd %0, %H0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + /* Barrier after an acquiring load */ + if (mm == __ATOMIC_ACQUIRE) + dmb(); + return old; +} + +#define ll64(a, b) lld((a), (b)) + +/* Return 0 on success, 1 on failure */ +static inline uint32_t scd(uint64_t *var, uint64_t neu, int mm) +{ + uint32_t ret; + + /* Barrier before a releasing store */ + if (mm == __ATOMIC_RELEASE) + dmb(); + __asm__ volatile("strexd %0, %1, %H1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : ); + return ret; +} + +#define sc64(a, b, c) scd((a), (b), (c)) + +#endif /* __arm__ */ + +#ifdef __aarch64__ + +static inline uint16_t ll8(uint8_t *var, int mm) +{ + uint16_t old; + + if (mm == __ATOMIC_ACQUIRE) + __asm__ 
volatile("ldaxrb %w0, [%1]" + : "=&r" (old) + : "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm__ volatile("ldxrb %w0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + else + ODP_ABORT(); + return old; +} + +static inline uint32_t ll32(uint32_t *var, int mm) +{ + uint32_t old; + + if (mm == __ATOMIC_ACQUIRE) + __asm__ volatile("ldaxr %w0, [%1]" + : "=&r" (old) + : "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm__ volatile("ldxr %w0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + else + ODP_ABORT(); + return old; +} + +/* Return 0 on success, 1 on failure */ +static inline uint32_t sc32(uint32_t *var, uint32_t neu, int mm) +{ + uint32_t ret; + + if (mm == __ATOMIC_RELEASE) + __asm__ volatile("stlxr %w0, %w1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm__ volatile("stxr %w0, %w1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : ); + else + ODP_ABORT(); + return ret; +} + +static inline uint64_t ll(uint64_t *var, int mm) +{ + uint64_t old; + + if (mm == __ATOMIC_ACQUIRE) + __asm__ volatile("ldaxr %0, [%1]" + : "=&r" (old) + : "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm__ volatile("ldxr %0, [%1]" + : "=&r" (old) + : "r" (var) + : ); + else + ODP_ABORT(); + return old; +} + +#define ll64(a, b) ll((a), (b)) + +/* Return 0 on success, 1 on failure */ +static inline uint32_t sc(uint64_t *var, uint64_t neu, int mm) +{ + uint32_t ret; + + if (mm == __ATOMIC_RELEASE) + __asm__ volatile("stlxr %w0, %1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm__ volatile("stxr %w0, %1, [%2]" + : "=&r" (ret) + : "r" (neu), "r" (var) + : ); + else + ODP_ABORT(); + return ret; +} + +#define sc64(a, b, c) sc((a), (b), (c)) + +union i128 { + __int128 i128; + int64_t i64[2]; +}; + +static inline __int128 lld(__int128 *var, int mm) +{ + union i128 old; + + if (mm == __ATOMIC_ACQUIRE) + __asm__ volatile("ldaxp %0, %1, [%2]" + : "=&r" (old.i64[0]), "=&r" (old.i64[1]) + : "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm__ volatile("ldxp %0, %1, [%2]" + : "=&r" (old.i64[0]), "=&r" (old.i64[1]) + : "r" (var) + : ); + else + ODP_ABORT(); + return old.i128; +} + +/* Return 0 on success, 1 on failure */ +static inline uint32_t scd(__int128 *var, __int128 neu, int mm) +{ + uint32_t ret; + + if (mm == __ATOMIC_RELEASE) + __asm__ volatile("stlxp %w0, %1, %2, [%3]" + : "=&r" (ret) + : "r" (((union i128)neu).i64[0]), + "r" (((union i128)neu).i64[1]), + "r" (var) + : "memory"); + else if (mm == __ATOMIC_RELAXED) + __asm__ volatile("stxp %w0, %1, %2, [%3]" + : "=&r" (ret) + : "r" (((union i128)neu).i64[0]), + "r" (((union i128)neu).i64[1]), + "r" (var) + : ); + else + ODP_ABORT(); + return ret; +} + +#endif /* __aarch64__ */ + +#endif /* PLATFORM_LINUXGENERIC_ARCH_ARM_LLSC_H */ diff --git a/platform/linux-generic/arch/default/odp_cpu.h b/platform/linux-generic/arch/default/odp_cpu.h new file mode 100644 index 00000000..12a44b93 --- /dev/null +++ b/platform/linux-generic/arch/default/odp_cpu.h @@ -0,0 +1,43 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_DEFAULT_CPU_H_ +#define ODP_DEFAULT_CPU_H_ + +/****************************************************************************** + * Atomics + *****************************************************************************/ + +#define atomic_store_release(loc, val, ro) \ + __atomic_store_n(loc, val, __ATOMIC_RELEASE) + +/****************************************************************************** + * Idle mgmt + *****************************************************************************/ + +static inline void sevl(void) +{ + /* empty */ +} + +static inline int wfe(void) +{ + return 1; +} + +#define monitor128(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor64(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor32(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor8(addr, mo) __atomic_load_n((addr), (mo)) + +static inline void doze(void) +{ + odp_cpu_pause(); +} + +#endif diff --git a/platform/linux-generic/arch/mips64/odp_cpu.h b/platform/linux-generic/arch/mips64/odp_cpu.h new file mode 100644 index 00000000..1097f7bc --- /dev/null +++ b/platform/linux-generic/arch/mips64/odp_cpu.h @@ -0,0 +1,43 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_MIPS64_CPU_H_ +#define ODP_MIPS64_CPU_H_ + +/****************************************************************************** + * Atomics + *****************************************************************************/ + +#define atomic_store_release(loc, val, ro) \ + __atomic_store_n(loc, val, __ATOMIC_RELEASE) + +/****************************************************************************** + * Idle mgmt + *****************************************************************************/ + +static inline void sevl(void) +{ + /* empty */ +} + +static inline int wfe(void) +{ + return 1; +} + +#define monitor128(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor64(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor32(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor8(addr, mo) __atomic_load_n((addr), (mo)) + +static inline void doze(void) +{ + odp_cpu_pause(); +} + +#endif diff --git a/platform/linux-generic/arch/powerpc/odp_cpu.h b/platform/linux-generic/arch/powerpc/odp_cpu.h new file mode 100644 index 00000000..cd27fe24 --- /dev/null +++ b/platform/linux-generic/arch/powerpc/odp_cpu.h @@ -0,0 +1,43 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_POWERPC_CPU_H_ +#define ODP_POWERPC_CPU_H_ + +/****************************************************************************** + * Atomics + *****************************************************************************/ + +#define atomic_store_release(loc, val, ro) \ + __atomic_store_n(loc, val, __ATOMIC_RELEASE) + +/****************************************************************************** + * Idle mgmt + *****************************************************************************/ + +static inline void sevl(void) +{ + /* empty */ +} + +static inline int wfe(void) +{ + return 1; +} + +#define monitor128(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor64(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor32(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor8(addr, mo) __atomic_load_n((addr), (mo)) + +static inline void doze(void) +{ + odp_cpu_pause(); +} + +#endif diff --git a/platform/linux-generic/arch/x86/odp_cpu.h b/platform/linux-generic/arch/x86/odp_cpu.h new file mode 100644 index 00000000..b5c89409 --- /dev/null +++ b/platform/linux-generic/arch/x86/odp_cpu.h @@ -0,0 +1,43 @@ +/* Copyright (c) 2017, ARM Limited. All rights reserved. + * + * Copyright (c) 2017, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_X86_CPU_H_ +#define ODP_X86_CPU_H_ + +/****************************************************************************** + * Atomics + *****************************************************************************/ + +#define atomic_store_release(loc, val, ro) \ + __atomic_store_n(loc, val, __ATOMIC_RELEASE) + +/****************************************************************************** + * Idle mgmt + *****************************************************************************/ + +static inline void sevl(void) +{ + /* empty */ +} + +static inline int wfe(void) +{ + return 1; +} + +#define monitor128(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor64(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor32(addr, mo) __atomic_load_n((addr), (mo)) +#define monitor8(addr, mo) __atomic_load_n((addr), (mo)) + +static inline void doze(void) +{ + odp_cpu_pause(); +} + +#endif
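The sevl()/wfe()/monitor*/doze() primitives above are designed to be combined into a low-power wait loop of the following shape (a sketch; the flag variable and function names are assumptions, not code from the patch). Under CONFIG_WFE on ARM, monitor32() performs a load-exclusive that arms the monitor, so the next wfe() sleeps until the flag's cache line is written; on the default, mips64, powerpc and x86 variants the same loop falls back to a pause-based poll.

#include <stdint.h>
#include <odp_cpu.h>

static uint32_t flag;   /* written by another thread */

static void flag_wait(void)
{
        sevl();         /* prime the event register so the first wfe() falls through */
        while (wfe() && monitor32(&flag, __ATOMIC_ACQUIRE) == 0)
                doze(); /* no-op under CONFIG_WFE, cpu pause otherwise */
}

static void flag_set(void)
{
        /* ro == 0: order both prior loads and stores before the store;
         * with CONFIG_DMBSTR this expands to DMB ISH plus a relaxed store,
         * otherwise to a plain store-release.
         */
        atomic_store_release(&flag, 1, 0);
}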
commit 5b08cc290a4a84ed0ffbc4a591427f1891b9ace3 Author: Brian Brooks brian.brooks@arm.com Date: Fri Jun 23 16:04:36 2017 -0500
test: odp_pktio_ordered: add queue size
Signed-off-by: Brian Brooks brian.brooks@arm.com Reviewed-and-tested-by: Yi He yi.he@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/test/common_plat/performance/odp_pktio_ordered.c b/test/common_plat/performance/odp_pktio_ordered.c index 4bb0bef9..50bfef51 100644 --- a/test/common_plat/performance/odp_pktio_ordered.c +++ b/test/common_plat/performance/odp_pktio_ordered.c @@ -91,6 +91,9 @@ /** Maximum number of pktio queues per interface */ #define MAX_QUEUES 32
+/** This test appears to require at least 8192 elements per queue */ +#define QUEUE_SIZE 8192 + /** Maximum number of pktio interfaces */ #define MAX_PKTIOS 8
@@ -1232,6 +1235,7 @@ int main(int argc, char *argv[]) qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; qparam.sched.group = ODP_SCHED_GROUP_ALL; + qparam.size = QUEUE_SIZE;
gbl_args->flow_qcontext[i][j].idx = i; gbl_args->flow_qcontext[i][j].input_queue = 0;
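For context, the new size field slots into the test's queue setup roughly as below. This is a condensed sketch of the code around the hunk above, not a verbatim excerpt; the helper name and queue name are made up, while QUEUE_SIZE is the constant added by the patch.

#include <odp_api.h>

#define QUEUE_SIZE 8192

static odp_queue_t create_flow_queue(void)
{
        odp_queue_param_t qparam;

        odp_queue_param_init(&qparam);
        qparam.type        = ODP_QUEUE_TYPE_SCHED;
        qparam.sched.prio  = ODP_SCHED_PRIO_DEFAULT;
        qparam.sched.sync  = ODP_SCHED_SYNC_ATOMIC;
        qparam.sched.group = ODP_SCHED_GROUP_ALL;
        qparam.size        = QUEUE_SIZE; /* request room for at least 8192 events */

        /* Returns ODP_QUEUE_INVALID on failure */
        return odp_queue_create("flow_queue", &qparam);
}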
-----------------------------------------------------------------------
Summary of changes: .travis.yml | 1 + platform/linux-generic/Makefile.am | 26 + platform/linux-generic/arch/arm/odp_atomic.h | 212 +++ platform/linux-generic/arch/arm/odp_cpu.h | 71 + platform/linux-generic/arch/arm/odp_cpu_idling.h | 53 + platform/linux-generic/arch/arm/odp_llsc.h | 253 +++ platform/linux-generic/arch/default/odp_cpu.h | 43 + platform/linux-generic/arch/mips64/odp_cpu.h | 43 + platform/linux-generic/arch/powerpc/odp_cpu.h | 43 + platform/linux-generic/arch/x86/odp_cpu.h | 43 + .../include/odp/api/plat/schedule_types.h | 4 +- platform/linux-generic/include/odp_bitset.h | 212 +++ .../linux-generic/include/odp_config_internal.h | 15 +- platform/linux-generic/include/odp_llqueue.h | 311 +++ .../include/odp_queue_scalable_internal.h | 104 + platform/linux-generic/include/odp_schedule_if.h | 2 +- .../linux-generic/include/odp_schedule_scalable.h | 139 ++ .../include/odp_schedule_scalable_config.h | 52 + .../include/odp_schedule_scalable_ordered.h | 132 ++ platform/linux-generic/m4/odp_schedule.m4 | 55 +- platform/linux-generic/odp_queue_if.c | 8 + platform/linux-generic/odp_queue_scalable.c | 1022 ++++++++++ platform/linux-generic/odp_schedule_if.c | 6 + platform/linux-generic/odp_schedule_scalable.c | 1980 ++++++++++++++++++++ .../linux-generic/odp_schedule_scalable_ordered.c | 345 ++++ test/common_plat/performance/odp_pktio_ordered.c | 4 + 26 files changed, 5157 insertions(+), 22 deletions(-) create mode 100644 platform/linux-generic/arch/arm/odp_atomic.h create mode 100644 platform/linux-generic/arch/arm/odp_cpu.h create mode 100644 platform/linux-generic/arch/arm/odp_cpu_idling.h create mode 100644 platform/linux-generic/arch/arm/odp_llsc.h create mode 100644 platform/linux-generic/arch/default/odp_cpu.h create mode 100644 platform/linux-generic/arch/mips64/odp_cpu.h create mode 100644 platform/linux-generic/arch/powerpc/odp_cpu.h create mode 100644 platform/linux-generic/arch/x86/odp_cpu.h create mode 100644 platform/linux-generic/include/odp_bitset.h create mode 100644 platform/linux-generic/include/odp_llqueue.h create mode 100644 platform/linux-generic/include/odp_queue_scalable_internal.h create mode 100644 platform/linux-generic/include/odp_schedule_scalable.h create mode 100644 platform/linux-generic/include/odp_schedule_scalable_config.h create mode 100644 platform/linux-generic/include/odp_schedule_scalable_ordered.h create mode 100644 platform/linux-generic/odp_queue_scalable.c create mode 100644 platform/linux-generic/odp_schedule_scalable.c create mode 100644 platform/linux-generic/odp_schedule_scalable_ordered.c
hooks/post-receive