This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, api-next has been updated
       via  10f2f857f6a38c89afff6bdfcc932e18e49ca9a2 (commit)
       via  d16d1783b8b01c5a7bbd256567fa226775a4ba93 (commit)
       via  d762feebade0ac71bbfbfeb31a9ed7c181ff1d91 (commit)
       via  2796eef79f73c47e29c0d40d411a5547277836ee (commit)
       via  d235cfacc743bf0524fbc6c7f0f5810fa85ecd93 (commit)
      from  a70fa333ebb25a8331bd203440fbe81bf84946b1 (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log -----------------------------------------------------------------
commit 10f2f857f6a38c89afff6bdfcc932e18e49ca9a2
Author: Matias Elo <matias.elo@nokia.com>
Date:   Fri Dec 2 12:56:28 2016 +0200
linux-gen: sched: new ordered lock implementation
Implement ordered locks using per-lock atomic counters. The counter values are compared to the queue's atomic context to guarantee ordered locking. Compared to the previous implementation, this enables parallel processing of ordered events outside of the lock context.
Signed-off-by: Matias Elo <matias.elo@nokia.com>
Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
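For orientation before the diff: the scheme amounts to one atomic counter per ordered lock, released by writing the next context id into it. Below is a minimal standalone sketch of that idea using C11 atomics; struct ord_queue, my_ctx and NUM_ORD_LOCKS are illustrative stand-ins for this sketch only, not the ODP-internal types touched by the patch.

/*
 * Minimal standalone sketch (not the ODP code): one atomic counter per
 * ordered lock, advanced to the next ordered context id on unlock.
 */
#include <stdatomic.h>
#include <stdint.h>

#define NUM_ORD_LOCKS 4		/* illustrative limit */

struct ord_queue {
	atomic_uint_fast64_t ctx;	/* current ordered context id */
	atomic_uint_fast64_t next_ctx;	/* next unallocated context id */
	atomic_uint_fast64_t lock[NUM_ORD_LOCKS];
};

/* Ordered context id that this thread was scheduled with */
static _Thread_local uint64_t my_ctx;

static void ord_lock(struct ord_queue *q, unsigned i)
{
	/* Spin until lock i reaches this thread's context id */
	while (atomic_load_explicit(&q->lock[i], memory_order_acquire) != my_ctx)
		;	/* cpu pause / relax here */
}

static void ord_unlock(struct ord_queue *q, unsigned i)
{
	/* Hand lock i over to the next ordered context */
	atomic_store_explicit(&q->lock[i], my_ctx + 1, memory_order_release);
}

Note that any lock a context never took still has to be advanced the same way when the context is released, so later contexts are not blocked; the patch does exactly that in release_ordered().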
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index b905bd8..8b55de1 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -59,6 +59,8 @@ struct queue_entry_s { struct { odp_atomic_u64_t ctx; /**< Current ordered context id */ odp_atomic_u64_t next_ctx; /**< Next unallocated context id */ + /** Array of ordered locks */ + odp_atomic_u64_t lock[CONFIG_QUEUE_MAX_ORD_LOCKS]; } ordered ODP_ALIGNED_CACHE;
enq_func_t enqueue ODP_ALIGNED_CACHE; diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 4c7f497..d9cb9f3 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -77,8 +77,14 @@ static int queue_init(queue_entry_t *queue, const char *name, queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
if (param->sched.sync == ODP_SCHED_SYNC_ORDERED) { + unsigned i; + odp_atomic_init_u64(&queue->s.ordered.ctx, 0); odp_atomic_init_u64(&queue->s.ordered.next_ctx, 0); + + for (i = 0; i < queue->s.param.sched.lock_count; i++) + odp_atomic_init_u64(&queue->s.ordered.lock[i], + 0); } } queue->s.type = queue->s.param.type; diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index 2ce90aa..645630a 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -126,6 +126,15 @@ typedef struct { int num; } ordered_stash_t;
+/* Ordered lock states */ +typedef union { + uint8_t u8[CONFIG_QUEUE_MAX_ORD_LOCKS]; + uint32_t all; +} lock_called_t; + +ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t), + "Lock_called_values_do_not_fit_in_uint32"); + /* Scheduler local data */ typedef struct { int thr; @@ -143,6 +152,7 @@ typedef struct { uint64_t ctx; /**< Ordered context id */ int stash_num; /**< Number of stashed enqueue operations */ uint8_t in_order; /**< Order status */ + lock_called_t lock_called; /**< States of ordered locks */ /** Storage for stashed enqueue operations */ ordered_stash_t stash[MAX_ORDERED_STASH]; } ordered; @@ -553,12 +563,21 @@ static inline void ordered_stash_release(void)
static inline void release_ordered(void) { + unsigned i; queue_entry_t *queue;
queue = sched_local.ordered.src_queue;
wait_for_order(queue);
+ /* Release all ordered locks */ + for (i = 0; i < queue->s.param.sched.lock_count; i++) { + if (!sched_local.ordered.lock_called.u8[i]) + odp_atomic_store_rel_u64(&queue->s.ordered.lock[i], + sched_local.ordered.ctx + 1); + } + + sched_local.ordered.lock_called.all = 0; sched_local.ordered.src_queue = NULL; sched_local.ordered.in_order = 0;
@@ -923,19 +942,46 @@ static void order_unlock(void) { }
-static void schedule_order_lock(unsigned lock_index ODP_UNUSED) +static void schedule_order_lock(unsigned lock_index) { + odp_atomic_u64_t *ord_lock; queue_entry_t *queue;
queue = sched_local.ordered.src_queue;
- ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count); + ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count && + !sched_local.ordered.lock_called.u8[lock_index]);
- wait_for_order(queue); + ord_lock = &queue->s.ordered.lock[lock_index]; + + /* Busy loop to synchronize ordered processing */ + while (1) { + uint64_t lock_seq; + + lock_seq = odp_atomic_load_acq_u64(ord_lock); + + if (lock_seq == sched_local.ordered.ctx) { + sched_local.ordered.lock_called.u8[lock_index] = 1; + return; + } + odp_cpu_pause(); + } }
-static void schedule_order_unlock(unsigned lock_index ODP_UNUSED) +static void schedule_order_unlock(unsigned lock_index) { + odp_atomic_u64_t *ord_lock; + queue_entry_t *queue; + + queue = sched_local.ordered.src_queue; + + ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count); + + ord_lock = &queue->s.ordered.lock[lock_index]; + + ODP_ASSERT(sched_local.ordered.ctx == odp_atomic_load_u64(ord_lock)); + + odp_atomic_store_rel_u64(ord_lock, sched_local.ordered.ctx + 1); }
static void schedule_pause(void)
commit d16d1783b8b01c5a7bbd256567fa226775a4ba93
Author: Matias Elo <matias.elo@nokia.com>
Date:   Fri Dec 2 12:56:27 2016 +0200
linux-gen: sched: new ordered queue implementation
Add a new implementation for ordered queues. Compared to the old implementation, this is much simpler and improves performance by roughly 1-4x depending on the test case.
The implementation is based on an atomic ordered context, which only a single thread may possess at a time. Only the thread owning the atomic context may enqueue events from the ordered queue. All other threads put their enqueued events into a thread-local enqueue stash (ordered_stash_t). All stashed enqueue operations are performed in the original order when the thread acquires the ordered context. If the ordered stash becomes full, the enqueue blocks. At the latest, a thread blocks when its ev_stash is empty and it tries to release the ordered context.
Signed-off-by: Matias Elo <matias.elo@nokia.com>
Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
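To make the diff below easier to follow, here is a self-contained sketch of the ordered-context and stash scheme in plain C11 atomics. The names (ord_queue, thread_state, MAX_STASH, enq_now) are illustrative for this sketch only and are simplified compared to the real sched_local_t and ordered_stash_t structures in the patch.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_STASH 512	/* mirrors MAX_ORDERED_STASH in the patch */

struct ord_queue {
	atomic_uint_fast64_t ctx;	/* context currently allowed to enqueue */
	atomic_uint_fast64_t next_ctx;	/* next context id to hand out */
};

struct thread_state {
	struct ord_queue *src;		/* ordered queue the event came from */
	uint64_t ctx;			/* this thread's ordered context id */
	int stash_num;
	void *stash[MAX_STASH];		/* stashed enqueue operations */
};

/* On dequeue from an ordered queue: allocate the next context id */
static void acquire_ctx(struct thread_state *t, struct ord_queue *q)
{
	t->src = q;
	t->ctx = atomic_fetch_add(&q->next_ctx, 1);
	t->stash_num = 0;
}

static bool own_turn(struct thread_state *t)
{
	return atomic_load_explicit(&t->src->ctx, memory_order_acquire) == t->ctx;
}

static void flush_stash(struct thread_state *t, void (*enq_now)(void *ev))
{
	for (int i = 0; i < t->stash_num; i++)
		enq_now(t->stash[i]);	/* original order preserved */
	t->stash_num = 0;
}

/* Enqueue directly when in order, otherwise stash (block only if full) */
static void ordered_enq(struct thread_state *t, void *ev,
			void (*enq_now)(void *ev))
{
	if (!own_turn(t) && t->stash_num < MAX_STASH) {
		t->stash[t->stash_num++] = ev;
		return;
	}
	while (!own_turn(t))
		;	/* wait for our turn */
	flush_stash(t, enq_now);
	enq_now(ev);
}

/* Release: wait for turn, flush the stash, pass the context on */
static void release_ctx(struct thread_state *t, void (*enq_now)(void *ev))
{
	while (!own_turn(t))
		;
	flush_stash(t, enq_now);
	atomic_fetch_add_explicit(&t->src->ctx, 1, memory_order_release);
	t->src = NULL;
}

Usage follows the commit message: acquire_ctx() at dequeue time, ordered_enq() for any enqueue done while holding the context, and release_ctx() when the thread moves on to the next event.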
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index df36b76..b905bd8 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -56,6 +56,11 @@ struct queue_entry_s { odp_buffer_hdr_t *tail; int status;
+ struct { + odp_atomic_u64_t ctx; /**< Current ordered context id */ + odp_atomic_u64_t next_ctx; /**< Next unallocated context id */ + } ordered ODP_ALIGNED_CACHE; + enq_func_t enqueue ODP_ALIGNED_CACHE; deq_func_t dequeue; enq_multi_func_t enqueue_multi; diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 99c91e7..4c7f497 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -73,9 +73,14 @@ static int queue_init(queue_entry_t *queue, const char *name, if (queue->s.param.sched.lock_count > sched_fn->max_ordered_locks()) return -1;
- if (param->type == ODP_QUEUE_TYPE_SCHED) + if (param->type == ODP_QUEUE_TYPE_SCHED) { queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
+ if (param->sched.sync == ODP_SCHED_SYNC_ORDERED) { + odp_atomic_init_u64(&queue->s.ordered.ctx, 0); + odp_atomic_init_u64(&queue->s.ordered.next_ctx, 0); + } + } queue->s.type = queue->s.param.type;
queue->s.enqueue = queue_enq; @@ -301,6 +306,13 @@ int odp_queue_destroy(odp_queue_t handle) ODP_ERR("queue \"%s\" not empty\n", queue->s.name); return -1; } + if (queue_is_ordered(queue) && + odp_atomic_load_u64(&queue->s.ordered.ctx) != + odp_atomic_load_u64(&queue->s.ordered.next_ctx)) { + UNLOCK(&queue->s.lock); + ODP_ERR("queue \"%s\" reorder incomplete\n", queue->s.name); + return -1; + }
switch (queue->s.status) { case QUEUE_STATUS_READY: diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index 5bc274f..2ce90aa 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -111,11 +111,21 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO, #define MAX_DEQ CONFIG_BURST_SIZE
/* Maximum number of ordered locks per queue */ -#define MAX_ORDERED_LOCKS_PER_QUEUE 1 +#define MAX_ORDERED_LOCKS_PER_QUEUE 2
ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <= CONFIG_QUEUE_MAX_ORD_LOCKS, "Too_many_ordered_locks");
+/* Ordered stash size */ +#define MAX_ORDERED_STASH 512 + +/* Storage for stashed enqueue operation arguments */ +typedef struct { + odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; + queue_entry_t *queue; + int num; +} ordered_stash_t; + /* Scheduler local data */ typedef struct { int thr; @@ -128,7 +138,15 @@ typedef struct { uint32_t queue_index; odp_queue_t queue; odp_event_t ev_stash[MAX_DEQ]; - void *queue_entry; + struct { + queue_entry_t *src_queue; /**< Source queue entry */ + uint64_t ctx; /**< Ordered context id */ + int stash_num; /**< Number of stashed enqueue operations */ + uint8_t in_order; /**< Order status */ + /** Storage for stashed enqueue operations */ + ordered_stash_t stash[MAX_ORDERED_STASH]; + } ordered; + } sched_local_t;
/* Priority queue */ @@ -491,17 +509,81 @@ static void schedule_release_atomic(void) } }
+static inline int ordered_own_turn(queue_entry_t *queue) +{ + uint64_t ctx; + + ctx = odp_atomic_load_acq_u64(&queue->s.ordered.ctx); + + return ctx == sched_local.ordered.ctx; +} + +static inline void wait_for_order(queue_entry_t *queue) +{ + /* Busy loop to synchronize ordered processing */ + while (1) { + if (ordered_own_turn(queue)) + break; + odp_cpu_pause(); + } +} + +/** + * Perform stashed enqueue operations + * + * Should be called only when already in order. + */ +static inline void ordered_stash_release(void) +{ + int i; + + for (i = 0; i < sched_local.ordered.stash_num; i++) { + queue_entry_t *queue; + odp_buffer_hdr_t **buf_hdr; + int num; + + queue = sched_local.ordered.stash[i].queue; + buf_hdr = sched_local.ordered.stash[i].buf_hdr; + num = sched_local.ordered.stash[i].num; + + queue_enq_multi(queue, buf_hdr, num); + } + sched_local.ordered.stash_num = 0; +} + +static inline void release_ordered(void) +{ + queue_entry_t *queue; + + queue = sched_local.ordered.src_queue; + + wait_for_order(queue); + + sched_local.ordered.src_queue = NULL; + sched_local.ordered.in_order = 0; + + ordered_stash_release(); + + /* Next thread can continue processing */ + odp_atomic_add_rel_u64(&queue->s.ordered.ctx, 1); +} + static void schedule_release_ordered(void) { - /* Process ordered queue as atomic */ - schedule_release_atomic(); - sched_local.queue_entry = NULL; + queue_entry_t *queue; + + queue = sched_local.ordered.src_queue; + + if (odp_unlikely(!queue || sched_local.num)) + return; + + release_ordered(); }
static inline void schedule_release_context(void) { - if (sched_local.queue_entry != NULL) - schedule_release_ordered(); + if (sched_local.ordered.src_queue != NULL) + release_ordered(); else schedule_release_atomic(); } @@ -524,13 +606,41 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max) static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[], int num, int *ret) { - (void)queue_index; - (void)buf_hdr; - (void)num; - (void)ret; + int i; + uint32_t stash_num = sched_local.ordered.stash_num; + queue_entry_t *dst_queue = get_qentry(queue_index); + queue_entry_t *src_queue = sched_local.ordered.src_queue;
- /* didn't consume the events */ - return 0; + if (!sched_local.ordered.src_queue || sched_local.ordered.in_order) + return 0; + + if (ordered_own_turn(src_queue)) { + /* Own turn, so can do enqueue directly. */ + sched_local.ordered.in_order = 1; + ordered_stash_release(); + return 0; + } + + if (odp_unlikely(stash_num >= MAX_ORDERED_STASH)) { + /* If the local stash is full, wait until it is our turn and + * then release the stash and do enqueue directly. */ + wait_for_order(src_queue); + + sched_local.ordered.in_order = 1; + + ordered_stash_release(); + return 0; + } + + sched_local.ordered.stash[stash_num].queue = dst_queue; + sched_local.ordered.stash[stash_num].num = num; + for (i = 0; i < num; i++) + sched_local.ordered.stash[stash_num].buf_hdr[i] = buf_hdr[i]; + + sched_local.ordered.stash_num++; + + *ret = num; + return 1; }
/* @@ -658,9 +768,21 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], ret = copy_events(out_ev, max_num);
if (ordered) { - /* Operate as atomic */ - sched_local.queue_index = qi; - sched_local.queue_entry = get_qentry(qi); + uint64_t ctx; + queue_entry_t *queue; + odp_atomic_u64_t *next_ctx; + + queue = get_qentry(qi); + next_ctx = &queue->s.ordered.next_ctx; + + ctx = odp_atomic_fetch_inc_u64(next_ctx); + + sched_local.ordered.ctx = ctx; + sched_local.ordered.src_queue = queue; + + /* Continue scheduling ordered queues */ + ring_enq(ring, PRIO_QUEUE_MASK, qi); + } else if (sched_cb_queue_is_atomic(qi)) { /* Hold queue during atomic access */ sched_local.queue_index = qi; @@ -785,8 +907,16 @@ static int schedule_multi(odp_queue_t *out_queue, uint64_t wait, return schedule_loop(out_queue, wait, events, num); }
-static void order_lock(void) +static inline void order_lock(void) { + queue_entry_t *queue; + + queue = sched_local.ordered.src_queue; + + if (!queue) + return; + + wait_for_order(queue); }
static void order_unlock(void) @@ -795,6 +925,13 @@ static void order_unlock(void)
static void schedule_order_lock(unsigned lock_index ODP_UNUSED) { + queue_entry_t *queue; + + queue = sched_local.ordered.src_queue; + + ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count); + + wait_for_order(queue); }
static void schedule_order_unlock(unsigned lock_index ODP_UNUSED)
commit d762feebade0ac71bbfbfeb31a9ed7c181ff1d91
Author: Matias Elo <matias.elo@nokia.com>
Date:   Fri Dec 2 12:56:26 2016 +0200
linux-gen: sched: add internal API for max number of ordered locks per queue
The number of supported ordered locks may vary between scheduler implementations. Add an internal scheduler API call for fetching the maximum value from the currently active scheduler.
Add an internal definition, CONFIG_QUEUE_MAX_ORD_LOCKS, for the scheduler-independent maximum value.
Signed-off-by: Matias Elo <matias.elo@nokia.com>
Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
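The mechanism is the existing schedule_fn_t callback table; a stripped-down sketch of the pattern follows (types simplified and other callback members elided for illustration, not the full ODP headers).

typedef unsigned (*schedule_max_ordered_locks_fn_t)(void);

typedef struct {
	/* ... other scheduler callbacks elided ... */
	schedule_max_ordered_locks_fn_t max_ordered_locks;
} schedule_fn_t;

#define MAX_ORDERED_LOCKS_PER_QUEUE 1

static unsigned schedule_max_ordered_locks(void)
{
	return MAX_ORDERED_LOCKS_PER_QUEUE;
}

static const schedule_fn_t schedule_default_fn = {
	.max_ordered_locks = schedule_max_ordered_locks,
};

/*
 * Queue code then validates a creation request against whichever
 * scheduler is active, e.g.:
 *
 *	if (lock_count > sched_fn->max_ordered_locks())
 *		return -1;
 */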
diff --git a/platform/linux-generic/include/odp_config_internal.h b/platform/linux-generic/include/odp_config_internal.h index 8818cda..c494660 100644 --- a/platform/linux-generic/include/odp_config_internal.h +++ b/platform/linux-generic/include/odp_config_internal.h @@ -22,6 +22,11 @@ extern "C" { #define ODP_CONFIG_QUEUES 1024
/* + * Maximum number of ordered locks per queue + */ +#define CONFIG_QUEUE_MAX_ORD_LOCKS 4 + +/* * Maximum number of packet IO resources */ #define ODP_CONFIG_PKTIO_ENTRIES 64 diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index 72af01e..6c2b050 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -14,12 +14,6 @@ extern "C" { #include <odp/api/queue.h> #include <odp/api/schedule.h>
-/* Constants defined by the scheduler. These should be converted into interface - * functions. */ - -/* Number of ordered locks per queue */ -#define SCHEDULE_ORDERED_LOCKS_PER_QUEUE 2 - typedef void (*schedule_pktio_start_fn_t)(int pktio_index, int num_in_queue, int in_queue_idx[]); typedef int (*schedule_thr_add_fn_t)(odp_schedule_group_t group, int thr); @@ -38,6 +32,7 @@ typedef int (*schedule_init_local_fn_t)(void); typedef int (*schedule_term_local_fn_t)(void); typedef void (*schedule_order_lock_fn_t)(void); typedef void (*schedule_order_unlock_fn_t)(void); +typedef unsigned (*schedule_max_ordered_locks_fn_t)(void);
typedef struct schedule_fn_t { schedule_pktio_start_fn_t pktio_start; @@ -54,6 +49,7 @@ typedef struct schedule_fn_t { schedule_term_local_fn_t term_local; schedule_order_lock_fn_t order_lock; schedule_order_unlock_fn_t order_unlock; + schedule_max_ordered_locks_fn_t max_ordered_locks; } schedule_fn_t;
/* Interface towards the scheduler */ diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 74f384d..99c91e7 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -70,8 +70,7 @@ static int queue_init(queue_entry_t *queue, const char *name, queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0; } memcpy(&queue->s.param, param, sizeof(odp_queue_param_t)); - if (queue->s.param.sched.lock_count > - SCHEDULE_ORDERED_LOCKS_PER_QUEUE) + if (queue->s.param.sched.lock_count > sched_fn->max_ordered_locks()) return -1;
if (param->type == ODP_QUEUE_TYPE_SCHED) @@ -162,7 +161,7 @@ int odp_queue_capability(odp_queue_capability_t *capa)
/* Reserve some queues for internal use */ capa->max_queues = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES; - capa->max_ordered_locks = SCHEDULE_ORDERED_LOCKS_PER_QUEUE; + capa->max_ordered_locks = sched_fn->max_ordered_locks(); capa->max_sched_groups = sched_fn->num_grps(); capa->sched_prios = odp_schedule_num_prio();
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index 50639ff..5bc274f 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -110,6 +110,12 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO, /* Maximum number of dequeues */ #define MAX_DEQ CONFIG_BURST_SIZE
+/* Maximum number of ordered locks per queue */ +#define MAX_ORDERED_LOCKS_PER_QUEUE 1 + +ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <= CONFIG_QUEUE_MAX_ORD_LOCKS, + "Too_many_ordered_locks"); + /* Scheduler local data */ typedef struct { int thr; @@ -323,6 +329,11 @@ static int schedule_term_local(void) return 0; }
+static unsigned schedule_max_ordered_locks(void) +{ + return MAX_ORDERED_LOCKS_PER_QUEUE; +} + static inline int queue_per_prio(uint32_t queue_index) { return ((QUEUES_PER_PRIO - 1) & queue_index); @@ -1026,7 +1037,8 @@ const schedule_fn_t schedule_default_fn = { .init_local = schedule_init_local, .term_local = schedule_term_local, .order_lock = order_lock, - .order_unlock = order_unlock + .order_unlock = order_unlock, + .max_ordered_locks = schedule_max_ordered_locks };
/* Fill in scheduler API calls */ diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index 069b8bf..76d1357 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -28,6 +28,10 @@ #define GROUP_ALL ODP_SCHED_GROUP_ALL #define GROUP_WORKER ODP_SCHED_GROUP_WORKER #define GROUP_CONTROL ODP_SCHED_GROUP_CONTROL +#define MAX_ORDERED_LOCKS_PER_QUEUE 1 + +ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <= CONFIG_QUEUE_MAX_ORD_LOCKS, + "Too_many_ordered_locks");
struct sched_cmd_t;
@@ -162,6 +166,11 @@ static int term_local(void) return 0; }
+static unsigned max_ordered_locks(void) +{ + return MAX_ORDERED_LOCKS_PER_QUEUE; +} + static int thr_add(odp_schedule_group_t group, int thr) { sched_group_t *sched_group = &sched_global.sched_group; @@ -682,7 +691,8 @@ const schedule_fn_t schedule_sp_fn = { .init_local = init_local, .term_local = term_local, .order_lock = order_lock, - .order_unlock = order_unlock + .order_unlock = order_unlock, + .max_ordered_locks = max_ordered_locks };
/* Fill in scheduler API calls */
commit 2796eef79f73c47e29c0d40d411a5547277836ee
Author: Matias Elo <matias.elo@nokia.com>
Date:   Fri Dec 2 12:56:25 2016 +0200
linux-gen: sched: remove old ordered queue implementation
Remove the old ordered queue code. It is temporarily replaced by atomic handling.
Signed-off-by: Matias Elo <matias.elo@nokia.com>
Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index b60eacb..adbe24d 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -153,8 +153,6 @@ noinst_HEADERS = \ ${srcdir}/include/odp_queue_internal.h \ ${srcdir}/include/odp_ring_internal.h \ ${srcdir}/include/odp_schedule_if.h \ - ${srcdir}/include/odp_schedule_internal.h \ - ${srcdir}/include/odp_schedule_ordered_internal.h \ ${srcdir}/include/odp_sorted_list_internal.h \ ${srcdir}/include/odp_shm_internal.h \ ${srcdir}/include/odp_timer_internal.h \ @@ -208,7 +206,6 @@ __LIB__libodp_linux_la_SOURCES = \ odp_rwlock_recursive.c \ odp_schedule.c \ odp_schedule_if.c \ - odp_schedule_ordered.c \ odp_schedule_sp.c \ odp_shared_memory.c \ odp_sorted_list.c \ diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index 4e75908..2064f7c 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -79,7 +79,6 @@ struct odp_buffer_hdr_t { uint32_t all; struct { uint32_t hdrdata:1; /* Data is in buffer hdr */ - uint32_t sustain:1; /* Sustain order */ }; } flags;
@@ -95,12 +94,6 @@ struct odp_buffer_hdr_t { uint32_t uarea_size; /* size of user area */ uint32_t segcount; /* segment count */ uint32_t segsize; /* segment size */ - uint64_t order; /* sequence for ordered queues */ - queue_entry_t *origin_qe; /* ordered queue origin */ - union { - queue_entry_t *target_qe; /* ordered queue target */ - uint64_t sync[SCHEDULE_ORDERED_LOCKS_PER_QUEUE]; - }; #ifdef _ODP_PKTIO_IPC /* ipc mapped process can not walk over pointers, * offset has to be used */ diff --git a/platform/linux-generic/include/odp_packet_io_queue.h b/platform/linux-generic/include/odp_packet_io_queue.h index 13b79f3..d1d4b22 100644 --- a/platform/linux-generic/include/odp_packet_io_queue.h +++ b/platform/linux-generic/include/odp_packet_io_queue.h @@ -28,11 +28,10 @@ extern "C" { ODP_STATIC_ASSERT(ODP_PKTIN_QUEUE_MAX_BURST >= QUEUE_MULTI_MAX, "ODP_PKTIN_DEQ_MULTI_MAX_ERROR");
-int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain); +int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue);
-int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num, - int sustain); +int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); int pktin_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index e223d9f..df36b76 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -41,11 +41,11 @@ extern "C" { /* forward declaration */ union queue_entry_u;
-typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *, int); +typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *); typedef odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *);
typedef int (*enq_multi_func_t)(union queue_entry_u *, - odp_buffer_hdr_t **, int, int); + odp_buffer_hdr_t **, int); typedef int (*deq_multi_func_t)(union queue_entry_u *, odp_buffer_hdr_t **, int);
@@ -68,12 +68,6 @@ struct queue_entry_s { odp_pktin_queue_t pktin; odp_pktout_queue_t pktout; char name[ODP_QUEUE_NAME_LEN]; - uint64_t order_in; - uint64_t order_out; - odp_buffer_hdr_t *reorder_head; - odp_buffer_hdr_t *reorder_tail; - odp_atomic_u64_t sync_in[SCHEDULE_ORDERED_LOCKS_PER_QUEUE]; - odp_atomic_u64_t sync_out[SCHEDULE_ORDERED_LOCKS_PER_QUEUE]; };
union queue_entry_u { @@ -84,24 +78,12 @@ union queue_entry_u {
queue_entry_t *get_qentry(uint32_t queue_id);
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain); +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
-int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num, - int sustain); +int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
-int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, - int sustain); -int queue_pktout_enq_multi(queue_entry_t *queue, - odp_buffer_hdr_t *buf_hdr[], int num, int sustain); - -int queue_tm_reenq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, - int sustain); -int queue_tm_reenq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], - int num, int sustain); -int queue_tm_reorder(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); - void queue_lock(queue_entry_t *queue); void queue_unlock(queue_entry_t *queue);
diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index 37f88a4..72af01e 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -31,8 +31,7 @@ typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index, typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index); typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index); typedef int (*schedule_ord_enq_multi_fn_t)(uint32_t queue_index, - void *buf_hdr[], int num, - int sustain, int *ret); + void *buf_hdr[], int num, int *ret); typedef int (*schedule_init_global_fn_t)(void); typedef int (*schedule_term_global_fn_t)(void); typedef int (*schedule_init_local_fn_t)(void); diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h deleted file mode 100644 index 02637c2..0000000 --- a/platform/linux-generic/include/odp_schedule_internal.h +++ /dev/null @@ -1,50 +0,0 @@ -/* Copyright (c) 2016, Linaro Limited - * All rights reserved. - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#ifndef ODP_SCHEDULE_INTERNAL_H_ -#define ODP_SCHEDULE_INTERNAL_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -/* Maximum number of dequeues */ -#define MAX_DEQ CONFIG_BURST_SIZE - -typedef struct { - int thr; - int num; - int index; - int pause; - uint16_t round; - uint16_t prefer_offset; - uint16_t pktin_polls; - uint32_t queue_index; - odp_queue_t queue; - odp_event_t ev_stash[MAX_DEQ]; - void *origin_qe; - uint64_t order; - uint64_t sync[SCHEDULE_ORDERED_LOCKS_PER_QUEUE]; - odp_pool_t pool; - int enq_called; - int ignore_ordered_context; -} sched_local_t; - -extern __thread sched_local_t sched_local; - -void cache_order_info(uint32_t queue_index); -int release_order(void *origin_qe, uint64_t order, - odp_pool_t pool, int enq_called); - -/* API functions implemented in odp_schedule_ordered.c */ -void schedule_order_lock(unsigned lock_index); -void schedule_order_unlock(unsigned lock_index); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/platform/linux-generic/include/odp_schedule_ordered_internal.h b/platform/linux-generic/include/odp_schedule_ordered_internal.h deleted file mode 100644 index 0ffbe3a..0000000 --- a/platform/linux-generic/include/odp_schedule_ordered_internal.h +++ /dev/null @@ -1,25 +0,0 @@ -/* Copyright (c) 2016, Linaro Limited - * All rights reserved. - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#ifndef ODP_SCHEDULE_ORDERED_INTERNAL_H_ -#define ODP_SCHEDULE_ORDERED_INTERNAL_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#define SUSTAIN_ORDER 1 - -int schedule_ordered_queue_enq(uint32_t queue_index, void *p_buf_hdr, - int sustain, int *ret); -int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[], - int num, int sustain, int *ret); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c index 7566789..98460a5 100644 --- a/platform/linux-generic/odp_packet_io.c +++ b/platform/linux-generic/odp_packet_io.c @@ -570,7 +570,7 @@ static inline int pktin_recv_buf(odp_pktin_queue_t queue, int ret;
dst_queue = queue_to_qentry(pkt_hdr->dst_queue); - ret = queue_enq(dst_queue, buf_hdr, 0); + ret = queue_enq(dst_queue, buf_hdr); if (ret < 0) odp_packet_free(pkt); continue; @@ -619,7 +619,7 @@ int pktout_deq_multi(queue_entry_t *qentry ODP_UNUSED, }
int pktin_enqueue(queue_entry_t *qentry ODP_UNUSED, - odp_buffer_hdr_t *buf_hdr ODP_UNUSED, int sustain ODP_UNUSED) + odp_buffer_hdr_t *buf_hdr ODP_UNUSED) { ODP_ABORT("attempted enqueue to a pktin queue"); return -1; @@ -641,14 +641,13 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry) return NULL;
if (pkts > 1) - queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1, 0); + queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1); buf_hdr = hdr_tbl[0]; return buf_hdr; }
int pktin_enq_multi(queue_entry_t *qentry ODP_UNUSED, - odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED, - int num ODP_UNUSED, int sustain ODP_UNUSED) + odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED, int num ODP_UNUSED) { ODP_ABORT("attempted enqueue to a pktin queue"); return 0; @@ -682,7 +681,7 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num) hdr_tbl[j] = hdr_tbl[i];
if (j) - queue_enq_multi(qentry, hdr_tbl, j, 0); + queue_enq_multi(qentry, hdr_tbl, j); return nbr; }
@@ -720,7 +719,7 @@ int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
queue = entry->s.in_queue[index[idx]].queue; qentry = queue_to_qentry(queue); - queue_enq_multi(qentry, hdr_tbl, num, 0); + queue_enq_multi(qentry, hdr_tbl, num); }
return 0; @@ -1386,9 +1385,9 @@ int odp_pktout_queue_config(odp_pktio_t pktio, qentry->s.pktout.pktio = pktio;
/* Override default enqueue / dequeue functions */ - qentry->s.enqueue = queue_pktout_enq; + qentry->s.enqueue = pktout_enqueue; qentry->s.dequeue = pktout_dequeue; - qentry->s.enqueue_multi = queue_pktout_enq_multi; + qentry->s.enqueue_multi = pktout_enq_multi; qentry->s.dequeue_multi = pktout_deq_multi;
entry->s.out_queue[i].queue = queue; diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c index 8c38c93..4be3827 100644 --- a/platform/linux-generic/odp_pool.c +++ b/platform/linux-generic/odp_pool.c @@ -588,7 +588,6 @@ int buffer_alloc_multi(pool_t *pool, odp_buffer_t buf[], uint32_t mask, i; pool_cache_t *cache; uint32_t cache_num, num_ch, num_deq, burst; - odp_buffer_hdr_t *hdr;
ring = &pool->ring.hdr; mask = pool->ring_mask; @@ -609,13 +608,8 @@ int buffer_alloc_multi(pool_t *pool, odp_buffer_t buf[], }
/* Get buffers from the cache */ - for (i = 0; i < num_ch; i++) { + for (i = 0; i < num_ch; i++) buf[i] = cache->buf[cache_num - num_ch + i]; - hdr = buf_hdl_to_hdr(buf[i]); - hdr->origin_qe = NULL; - if (buf_hdr) - buf_hdr[i] = hdr; - }
/* If needed, get more from the global pool */ if (odp_unlikely(num_deq)) { @@ -635,11 +629,9 @@ int buffer_alloc_multi(pool_t *pool, odp_buffer_t buf[], uint32_t idx = num_ch + i;
buf[idx] = (odp_buffer_t)(uintptr_t)data[i]; - hdr = buf_hdl_to_hdr(buf[idx]); - hdr->origin_qe = NULL;
if (buf_hdr) { - buf_hdr[idx] = hdr; + buf_hdr[idx] = buf_hdl_to_hdr(buf[idx]); /* Prefetch newly allocated and soon to be used * buffer headers. */ odp_prefetch(buf_hdr[idx]); @@ -656,6 +648,11 @@ int buffer_alloc_multi(pool_t *pool, odp_buffer_t buf[], cache->num = cache_num - num_ch; }
+ if (buf_hdr) { + for (i = 0; i < num_ch; i++) + buf_hdr[i] = buf_hdl_to_hdr(buf[i]); + } + return num_ch + num_deq; }
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 43e212a..74f384d 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -23,7 +23,6 @@ #include <odp/api/hints.h> #include <odp/api/sync.h> #include <odp/api/traffic_mngr.h> -#include <odp_schedule_ordered_internal.h>
#define NUM_INTERNAL_QUEUES 64
@@ -90,16 +89,13 @@ static int queue_init(queue_entry_t *queue, const char *name, queue->s.head = NULL; queue->s.tail = NULL;
- queue->s.reorder_head = NULL; - queue->s.reorder_tail = NULL; - return 0; }
int odp_queue_init_global(void) { - uint32_t i, j; + uint32_t i; odp_shm_t shm;
ODP_DBG("Queue init ... "); @@ -119,10 +115,6 @@ int odp_queue_init_global(void) /* init locks */ queue_entry_t *queue = get_qentry(i); LOCK_INIT(&queue->s.lock); - for (j = 0; j < SCHEDULE_ORDERED_LOCKS_PER_QUEUE; j++) { - odp_atomic_init_u64(&queue->s.sync_in[j], 0); - odp_atomic_init_u64(&queue->s.sync_out[j], 0); - } queue->s.index = i; queue->s.handle = queue_from_id(i); } @@ -310,12 +302,6 @@ int odp_queue_destroy(odp_queue_t handle) ODP_ERR("queue \"%s\" not empty\n", queue->s.name); return -1; } - if (queue_is_ordered(queue) && queue->s.reorder_head) { - UNLOCK(&queue->s.lock); - ODP_ERR("queue \"%s\" reorder queue not empty\n", - queue->s.name); - return -1; - }
switch (queue->s.status) { case QUEUE_STATUS_READY: @@ -379,15 +365,14 @@ odp_queue_t odp_queue_lookup(const char *name) }
static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], - int num, int sustain) + int num) { int sched = 0; int i, ret; odp_buffer_hdr_t *hdr, *tail, *next_hdr;
- /* Ordered queues do not use bursts */ if (sched_fn->ord_enq_multi(queue->s.index, (void **)buf_hdr, num, - sustain, &ret)) + &ret)) return ret;
/* Optimize the common case of single enqueue */ @@ -395,12 +380,14 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], tail = buf_hdr[0]; hdr = tail; hdr->burst_num = 0; + hdr->next = NULL; } else { int next;
/* Start from the last buffer header */ tail = buf_hdr[num - 1]; hdr = tail; + hdr->next = NULL; next = num - 2;
while (1) { @@ -453,17 +440,16 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], return num; /* All events enqueued */ }
-int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num, - int sustain) +int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { - return enq_multi(queue, buf_hdr, num, sustain); + return enq_multi(queue, buf_hdr, num); }
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain) +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) { int ret;
- ret = enq_multi(queue, &buf_hdr, 1, sustain); + ret = enq_multi(queue, &buf_hdr, 1);
if (ret == 1) return 0; @@ -486,7 +472,7 @@ int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num) buf_hdr[i] = buf_hdl_to_hdr(odp_buffer_from_event(ev[i]));
return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr, - num, SUSTAIN_ORDER); + num); }
int odp_queue_enq(odp_queue_t handle, odp_event_t ev) @@ -500,7 +486,7 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev) /* No chains via this entry */ buf_hdr->link = NULL;
- return queue->s.enqueue(queue, buf_hdr, SUSTAIN_ORDER); + return queue->s.enqueue(queue, buf_hdr); }
static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], @@ -557,22 +543,6 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], i++; }
- /* Ordered queue book keeping inside the lock */ - if (queue_is_ordered(queue)) { - for (j = 0; j < i; j++) { - uint32_t k; - - buf_hdr[j]->origin_qe = queue; - buf_hdr[j]->order = queue->s.order_in++; - for (k = 0; k < queue->s.param.sched.lock_count; k++) { - buf_hdr[j]->sync[k] = - odp_atomic_fetch_inc_u64 - (&queue->s.sync_in[k]); - } - buf_hdr[j]->flags.sustain = SUSTAIN_ORDER; - } - } - /* Write head only if updated */ if (updated) queue->s.head = hdr; @@ -583,11 +553,6 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
UNLOCK(&queue->s.lock);
- /* Init origin_qe for non-ordered queues */ - if (!queue_is_ordered(queue)) - for (j = 0; j < i; j++) - buf_hdr[j]->origin_qe = NULL; - return i; }
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index cab68a3..50639ff 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -19,10 +19,9 @@ #include <odp/api/thrmask.h> #include <odp_config_internal.h> #include <odp_align_internal.h> -#include <odp_schedule_internal.h> -#include <odp_schedule_ordered_internal.h> #include <odp/api/sync.h> #include <odp_ring_internal.h> +#include <odp_queue_internal.h>
/* Number of priority levels */ #define NUM_PRIO 8 @@ -108,6 +107,24 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO, /* Start of named groups in group mask arrays */ #define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)
+/* Maximum number of dequeues */ +#define MAX_DEQ CONFIG_BURST_SIZE + +/* Scheduler local data */ +typedef struct { + int thr; + int num; + int index; + int pause; + uint16_t round; + uint16_t prefer_offset; + uint16_t pktin_polls; + uint32_t queue_index; + odp_queue_t queue; + odp_event_t ev_stash[MAX_DEQ]; + void *queue_entry; +} sched_local_t; + /* Priority queue */ typedef struct { /* Ring header */ @@ -465,23 +482,16 @@ static void schedule_release_atomic(void)
static void schedule_release_ordered(void) { - if (sched_local.origin_qe) { - int rc = release_order(sched_local.origin_qe, - sched_local.order, - sched_local.pool, - sched_local.enq_called); - if (rc == 0) - sched_local.origin_qe = NULL; - } + /* Process ordered queue as atomic */ + schedule_release_atomic(); + sched_local.queue_entry = NULL; }
static inline void schedule_release_context(void) { - if (sched_local.origin_qe != NULL) { - release_order(sched_local.origin_qe, sched_local.order, - sched_local.pool, sched_local.enq_called); - sched_local.origin_qe = NULL; - } else + if (sched_local.queue_entry != NULL) + schedule_release_ordered(); + else schedule_release_atomic(); }
@@ -500,6 +510,18 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max) return i; }
+static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[], + int num, int *ret) +{ + (void)queue_index; + (void)buf_hdr; + (void)num; + (void)ret; + + /* didn't consume the events */ + return 0; +} + /* * Schedule queues */ @@ -596,12 +618,11 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
ordered = sched_cb_queue_is_ordered(qi);
- /* For ordered queues we want consecutive events to - * be dispatched to separate threads, so do not cache - * them locally. - */ - if (ordered) - max_deq = 1; + /* Do not cache ordered events locally to improve + * parallelism. Ordered context can only be released + * when the local cache is empty. */ + if (ordered && max_num < MAX_DEQ) + max_deq = max_num;
num = sched_cb_queue_deq_multi(qi, sched_local.ev_stash, max_deq); @@ -626,11 +647,9 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], ret = copy_events(out_ev, max_num);
if (ordered) { - /* Continue scheduling ordered queues */ - ring_enq(ring, PRIO_QUEUE_MASK, qi); - - /* Cache order info about this event */ - cache_order_info(qi); + /* Operate as atomic */ + sched_local.queue_index = qi; + sched_local.queue_entry = get_qentry(qi); } else if (sched_cb_queue_is_atomic(qi)) { /* Hold queue during atomic access */ sched_local.queue_index = qi; @@ -763,6 +782,14 @@ static void order_unlock(void) { }
+static void schedule_order_lock(unsigned lock_index ODP_UNUSED) +{ +} + +static void schedule_order_unlock(unsigned lock_index ODP_UNUSED) +{ +} + static void schedule_pause(void) { sched_local.pause = 1; @@ -975,8 +1002,6 @@ static int schedule_sched_queue(uint32_t queue_index) int queue_per_prio = sched->queue[queue_index].queue_per_prio; ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
- sched_local.ignore_ordered_context = 1; - ring_enq(ring, PRIO_QUEUE_MASK, queue_index); return 0; } @@ -995,7 +1020,7 @@ const schedule_fn_t schedule_default_fn = { .init_queue = schedule_init_queue, .destroy_queue = schedule_destroy_queue, .sched_queue = schedule_sched_queue, - .ord_enq_multi = schedule_ordered_queue_enq_multi, + .ord_enq_multi = schedule_ord_enq_multi, .init_global = schedule_init_global, .term_global = schedule_term_global, .init_local = schedule_init_local, diff --git a/platform/linux-generic/odp_schedule_ordered.c b/platform/linux-generic/odp_schedule_ordered.c deleted file mode 100644 index 5574faf..0000000 --- a/platform/linux-generic/odp_schedule_ordered.c +++ /dev/null @@ -1,818 +0,0 @@ -/* Copyright (c) 2016, Linaro Limited - * All rights reserved. - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include <odp_packet_io_queue.h> -#include <odp_queue_internal.h> -#include <odp_schedule_if.h> -#include <odp_schedule_ordered_internal.h> -#include <odp_traffic_mngr_internal.h> -#include <odp_schedule_internal.h> - -#define RESOLVE_ORDER 0 -#define NOAPPEND 0 -#define APPEND 1 - -static inline void sched_enq_called(void) -{ - sched_local.enq_called = 1; -} - -static inline void get_sched_order(queue_entry_t **origin_qe, uint64_t *order) -{ - if (sched_local.ignore_ordered_context) { - sched_local.ignore_ordered_context = 0; - *origin_qe = NULL; - } else { - *origin_qe = sched_local.origin_qe; - *order = sched_local.order; - } -} - -static inline void sched_order_resolved(odp_buffer_hdr_t *buf_hdr) -{ - if (buf_hdr) - buf_hdr->origin_qe = NULL; - sched_local.origin_qe = NULL; -} - -static inline void get_qe_locks(queue_entry_t *qe1, queue_entry_t *qe2) -{ - /* Special case: enq to self */ - if (qe1 == qe2) { - queue_lock(qe1); - return; - } - - /* Since any queue can be either a source or target, queues do not have - * a natural locking hierarchy. Create one by using the qentry address - * as the ordering mechanism. - */ - - if (qe1 < qe2) { - queue_lock(qe1); - queue_lock(qe2); - } else { - queue_lock(qe2); - queue_lock(qe1); - } -} - -static inline void free_qe_locks(queue_entry_t *qe1, queue_entry_t *qe2) -{ - queue_unlock(qe1); - if (qe1 != qe2) - queue_unlock(qe2); -} - -static inline odp_buffer_hdr_t *get_buf_tail(odp_buffer_hdr_t *buf_hdr) -{ - odp_buffer_hdr_t *buf_tail = buf_hdr->link ? 
buf_hdr->link : buf_hdr; - - buf_hdr->next = buf_hdr->link; - buf_hdr->link = NULL; - - while (buf_tail->next) - buf_tail = buf_tail->next; - - return buf_tail; -} - -static inline void queue_add_list(queue_entry_t *queue, - odp_buffer_hdr_t *buf_head, - odp_buffer_hdr_t *buf_tail) -{ - if (queue->s.head) - queue->s.tail->next = buf_head; - else - queue->s.head = buf_head; - - queue->s.tail = buf_tail; -} - -static inline void queue_add_chain(queue_entry_t *queue, - odp_buffer_hdr_t *buf_hdr) -{ - queue_add_list(queue, buf_hdr, get_buf_tail(buf_hdr)); -} - -static inline void reorder_enq(queue_entry_t *queue, - uint64_t order, - queue_entry_t *origin_qe, - odp_buffer_hdr_t *buf_hdr, - int sustain) -{ - odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head; - odp_buffer_hdr_t *reorder_prev = NULL; - - while (reorder_buf && order >= reorder_buf->order) { - reorder_prev = reorder_buf; - reorder_buf = reorder_buf->next; - } - - buf_hdr->next = reorder_buf; - - if (reorder_prev) - reorder_prev->next = buf_hdr; - else - origin_qe->s.reorder_head = buf_hdr; - - if (!reorder_buf) - origin_qe->s.reorder_tail = buf_hdr; - - buf_hdr->origin_qe = origin_qe; - buf_hdr->target_qe = queue; - buf_hdr->order = order; - buf_hdr->flags.sustain = sustain; -} - -static inline void order_release(queue_entry_t *origin_qe, int count) -{ - uint64_t sync; - uint32_t i; - - origin_qe->s.order_out += count; - - for (i = 0; i < origin_qe->s.param.sched.lock_count; i++) { - sync = odp_atomic_load_u64(&origin_qe->s.sync_out[i]); - if (sync < origin_qe->s.order_out) - odp_atomic_fetch_add_u64(&origin_qe->s.sync_out[i], - origin_qe->s.order_out - sync); - } -} - -static inline int reorder_deq(queue_entry_t *queue, - queue_entry_t *origin_qe, - odp_buffer_hdr_t **reorder_tail_return, - odp_buffer_hdr_t **placeholder_buf_return, - int *release_count_return, - int *placeholder_count_return) -{ - odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head; - odp_buffer_hdr_t *reorder_tail = NULL; - odp_buffer_hdr_t *placeholder_buf = NULL; - odp_buffer_hdr_t *next_buf; - int deq_count = 0; - int release_count = 0; - int placeholder_count = 0; - - while (reorder_buf && - reorder_buf->order <= origin_qe->s.order_out + - release_count + placeholder_count) { - /* - * Elements on the reorder list fall into one of - * three categories: - * - * 1. Those destined for the same queue. These - * can be enq'd now if they were waiting to - * be unblocked by this enq. - * - * 2. Those representing placeholders for events - * whose ordering was released by a prior - * odp_schedule_release_ordered() call. These - * can now just be freed. - * - * 3. Those representing events destined for another - * queue. These cannot be consolidated with this - * enq since they have a different target. - * - * Detecting an element with an order sequence gap, an - * element in category 3, or running out of elements - * stops the scan. 
- */ - next_buf = reorder_buf->next; - - if (odp_likely(reorder_buf->target_qe == queue)) { - /* promote any chain */ - odp_buffer_hdr_t *reorder_link = - reorder_buf->link; - - if (reorder_link) { - reorder_buf->next = reorder_link; - reorder_buf->link = NULL; - while (reorder_link->next) - reorder_link = reorder_link->next; - reorder_link->next = next_buf; - reorder_tail = reorder_link; - } else { - reorder_tail = reorder_buf; - } - - deq_count++; - if (!reorder_buf->flags.sustain) - release_count++; - reorder_buf = next_buf; - } else if (!reorder_buf->target_qe) { - if (reorder_tail) - reorder_tail->next = next_buf; - else - origin_qe->s.reorder_head = next_buf; - - reorder_buf->next = placeholder_buf; - placeholder_buf = reorder_buf; - - reorder_buf = next_buf; - placeholder_count++; - } else { - break; - } - } - - *reorder_tail_return = reorder_tail; - *placeholder_buf_return = placeholder_buf; - *release_count_return = release_count; - *placeholder_count_return = placeholder_count; - - return deq_count; -} - -static inline void reorder_complete(queue_entry_t *origin_qe, - odp_buffer_hdr_t **reorder_buf_return, - odp_buffer_hdr_t **placeholder_buf, - int placeholder_append) -{ - odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head; - odp_buffer_hdr_t *next_buf; - - *reorder_buf_return = NULL; - if (!placeholder_append) - *placeholder_buf = NULL; - - while (reorder_buf && - reorder_buf->order <= origin_qe->s.order_out) { - next_buf = reorder_buf->next; - - if (!reorder_buf->target_qe) { - origin_qe->s.reorder_head = next_buf; - reorder_buf->next = *placeholder_buf; - *placeholder_buf = reorder_buf; - - reorder_buf = next_buf; - order_release(origin_qe, 1); - } else if (reorder_buf->flags.sustain) { - reorder_buf = next_buf; - } else { - *reorder_buf_return = origin_qe->s.reorder_head; - origin_qe->s.reorder_head = - origin_qe->s.reorder_head->next; - break; - } - } -} - -static inline void get_queue_order(queue_entry_t **origin_qe, uint64_t *order, - odp_buffer_hdr_t *buf_hdr) -{ - if (buf_hdr && buf_hdr->origin_qe) { - *origin_qe = buf_hdr->origin_qe; - *order = buf_hdr->order; - } else { - get_sched_order(origin_qe, order); - } -} - -int queue_tm_reenq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, - int sustain ODP_UNUSED) -{ - odp_tm_queue_t tm_queue = MAKE_ODP_TM_QUEUE((uint8_t *)queue - - offsetof(tm_queue_obj_t, - tm_qentry)); - odp_packet_t pkt = (odp_packet_t)buf_hdr->handle.handle; - - return odp_tm_enq(tm_queue, pkt); -} - -int queue_tm_reenq_multi(queue_entry_t *queue ODP_UNUSED, - odp_buffer_hdr_t *buf[] ODP_UNUSED, - int num ODP_UNUSED, - int sustain ODP_UNUSED) -{ - ODP_ABORT("Invalid call to queue_tm_reenq_multi()\n"); - return 0; -} - -int queue_tm_reorder(queue_entry_t *queue, - odp_buffer_hdr_t *buf_hdr) -{ - queue_entry_t *origin_qe; - uint64_t order; - - get_queue_order(&origin_qe, &order, buf_hdr); - - if (!origin_qe) - return 0; - - /* Check if we're in order */ - queue_lock(origin_qe); - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { - queue_unlock(origin_qe); - ODP_ERR("Bad origin queue status\n"); - return 0; - } - - sched_enq_called(); - - /* Wait if it's not our turn */ - if (order > origin_qe->s.order_out) { - reorder_enq(queue, order, origin_qe, buf_hdr, SUSTAIN_ORDER); - queue_unlock(origin_qe); - return 1; - } - - /* Back to TM to handle enqueue - * - * Note: Order will be resolved by a subsequent call to - * odp_schedule_release_ordered() or odp_schedule() as odp_tm_enq() - * calls never resolve order by themselves. 
- */ - queue_unlock(origin_qe); - return 0; -} - -static int queue_enq_internal(odp_buffer_hdr_t *buf_hdr) -{ - return buf_hdr->target_qe->s.enqueue(buf_hdr->target_qe, buf_hdr, - buf_hdr->flags.sustain); -} - -static int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, - int sustain, queue_entry_t *origin_qe, - uint64_t order) -{ - odp_buffer_hdr_t *reorder_buf; - odp_buffer_hdr_t *next_buf; - odp_buffer_hdr_t *reorder_tail; - odp_buffer_hdr_t *placeholder_buf = NULL; - int release_count, placeholder_count; - int sched = 0; - - /* Need two locks for enq operations from ordered queues */ - get_qe_locks(origin_qe, queue); - - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY || - queue->s.status < QUEUE_STATUS_READY)) { - free_qe_locks(queue, origin_qe); - ODP_ERR("Bad queue status\n"); - ODP_ERR("queue = %s, origin q = %s, buf = %p\n", - queue->s.name, origin_qe->s.name, buf_hdr); - return -1; - } - - /* Remember that enq was called for this order */ - sched_enq_called(); - - /* We can only complete this enq if we're in order */ - if (order > origin_qe->s.order_out) { - reorder_enq(queue, order, origin_qe, buf_hdr, sustain); - - /* This enq can't complete until order is restored, so - * we're done here. - */ - free_qe_locks(queue, origin_qe); - return 0; - } - - /* Resolve order if requested */ - if (!sustain) { - order_release(origin_qe, 1); - sched_order_resolved(buf_hdr); - } - - /* Update queue status */ - if (queue->s.status == QUEUE_STATUS_NOTSCHED) { - queue->s.status = QUEUE_STATUS_SCHED; - sched = 1; - } - - /* We're in order, however the reorder queue may have other buffers - * sharing this order on it and this buffer must not be enqueued ahead - * of them. If the reorder queue is empty we can short-cut and - * simply add to the target queue directly. - */ - - if (!origin_qe->s.reorder_head) { - queue_add_chain(queue, buf_hdr); - free_qe_locks(queue, origin_qe); - - /* Add queue to scheduling */ - if (sched && sched_fn->sched_queue(queue->s.index)) - ODP_ABORT("schedule_queue failed\n"); - return 0; - } - - /* The reorder_queue is non-empty, so sort this buffer into it. Note - * that we force the sustain bit on here because we'll be removing - * this immediately and we already accounted for this order earlier. - */ - reorder_enq(queue, order, origin_qe, buf_hdr, 1); - - /* Pick up this element, and all others resolved by this enq, - * and add them to the target queue. - */ - reorder_deq(queue, origin_qe, &reorder_tail, &placeholder_buf, - &release_count, &placeholder_count); - - /* Move the list from the reorder queue to the target queue */ - if (queue->s.head) - queue->s.tail->next = origin_qe->s.reorder_head; - else - queue->s.head = origin_qe->s.reorder_head; - queue->s.tail = reorder_tail; - origin_qe->s.reorder_head = reorder_tail->next; - reorder_tail->next = NULL; - - /* Reflect resolved orders in the output sequence */ - order_release(origin_qe, release_count + placeholder_count); - - /* Now handle any resolved orders for events destined for other - * queues, appending placeholder bufs as needed. 
- */ - if (origin_qe != queue) - queue_unlock(queue); - - /* Add queue to scheduling */ - if (sched && sched_fn->sched_queue(queue->s.index)) - ODP_ABORT("schedule_queue failed\n"); - - reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, APPEND); - queue_unlock(origin_qe); - - if (reorder_buf) - queue_enq_internal(reorder_buf); - - /* Free all placeholder bufs that are now released */ - while (placeholder_buf) { - next_buf = placeholder_buf->next; - odp_buffer_free(placeholder_buf->handle.handle); - placeholder_buf = next_buf; - } - - return 0; -} - -int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[], - int num, int sustain, int *ret) -{ - queue_entry_t *origin_qe; - uint64_t order; - int i, rc; - queue_entry_t *qe = get_qentry(queue_index); - odp_buffer_hdr_t *first_hdr = p_buf_hdr[0]; - odp_buffer_hdr_t **buf_hdr = (odp_buffer_hdr_t **)p_buf_hdr; - - /* Chain input buffers together */ - for (i = 0; i < num - 1; i++) { - buf_hdr[i]->next = buf_hdr[i + 1]; - buf_hdr[i]->burst_num = 0; - } - - buf_hdr[num - 1]->next = NULL; - - /* Handle ordered enqueues commonly via links */ - get_queue_order(&origin_qe, &order, first_hdr); - if (origin_qe) { - first_hdr->link = first_hdr->next; - rc = ordered_queue_enq(qe, first_hdr, sustain, - origin_qe, order); - *ret = rc == 0 ? num : rc; - return 1; - } - - return 0; -} - -int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, - int sustain) -{ - queue_entry_t *origin_qe; - uint64_t order; - int rc; - - /* Special processing needed only if we came from an ordered queue */ - get_queue_order(&origin_qe, &order, buf_hdr); - if (!origin_qe) - return pktout_enqueue(queue, buf_hdr); - - /* Must lock origin_qe for ordered processing */ - queue_lock(origin_qe); - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { - queue_unlock(origin_qe); - ODP_ERR("Bad origin queue status\n"); - return -1; - } - - /* We can only complete the enq if we're in order */ - sched_enq_called(); - if (order > origin_qe->s.order_out) { - reorder_enq(queue, order, origin_qe, buf_hdr, sustain); - - /* This enq can't complete until order is restored, so - * we're done here. - */ - queue_unlock(origin_qe); - return 0; - } - - /* Perform our enq since we're in order. - * Note: Don't hold the origin_qe lock across an I/O operation! - */ - queue_unlock(origin_qe); - - /* Handle any chained buffers (internal calls) */ - if (buf_hdr->link) { - odp_buffer_hdr_t *buf_hdrs[QUEUE_MULTI_MAX]; - odp_buffer_hdr_t *next_buf; - int num = 0; - - next_buf = buf_hdr->link; - buf_hdr->link = NULL; - - while (next_buf) { - buf_hdrs[num++] = next_buf; - next_buf = next_buf->next; - } - - rc = pktout_enq_multi(queue, buf_hdrs, num); - if (rc < num) - return -1; - } else { - rc = pktout_enqueue(queue, buf_hdr); - if (rc) - return rc; - } - - /* Reacquire the lock following the I/O send. Note that we're still - * guaranteed to be in order here since we haven't released - * order yet. - */ - queue_lock(origin_qe); - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { - queue_unlock(origin_qe); - ODP_ERR("Bad origin queue status\n"); - return -1; - } - - /* Account for this ordered enq */ - if (!sustain) { - order_release(origin_qe, 1); - sched_order_resolved(NULL); - } - - /* Now check to see if our successful enq has unblocked other buffers - * in the origin's reorder queue. 
- */ - odp_buffer_hdr_t *reorder_buf; - odp_buffer_hdr_t *next_buf; - odp_buffer_hdr_t *reorder_tail; - odp_buffer_hdr_t *xmit_buf; - odp_buffer_hdr_t *placeholder_buf; - int release_count, placeholder_count; - - /* Send released buffers as well */ - if (reorder_deq(queue, origin_qe, &reorder_tail, &placeholder_buf, - &release_count, &placeholder_count)) { - xmit_buf = origin_qe->s.reorder_head; - origin_qe->s.reorder_head = reorder_tail->next; - reorder_tail->next = NULL; - queue_unlock(origin_qe); - - do { - next_buf = xmit_buf->next; - pktout_enqueue(queue, xmit_buf); - xmit_buf = next_buf; - } while (xmit_buf); - - /* Reacquire the origin_qe lock to continue */ - queue_lock(origin_qe); - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { - queue_unlock(origin_qe); - ODP_ERR("Bad origin queue status\n"); - return -1; - } - } - - /* Update the order sequence to reflect the deq'd elements */ - order_release(origin_qe, release_count + placeholder_count); - - /* Now handle sends to other queues that are ready to go */ - reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, APPEND); - - /* We're fully done with the origin_qe at last */ - queue_unlock(origin_qe); - - /* Now send the next buffer to its target queue */ - if (reorder_buf) - queue_enq_internal(reorder_buf); - - /* Free all placeholder bufs that are now released */ - while (placeholder_buf) { - next_buf = placeholder_buf->next; - odp_buffer_free(placeholder_buf->handle.handle); - placeholder_buf = next_buf; - } - - return 0; -} - -int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], - int num, int sustain) -{ - int i, rc; - queue_entry_t *origin_qe; - uint64_t order; - - /* If we're not ordered, handle directly */ - get_queue_order(&origin_qe, &order, buf_hdr[0]); - if (!origin_qe) - return pktout_enq_multi(queue, buf_hdr, num); - - /* Chain input buffers together */ - for (i = 0; i < num - 1; i++) - buf_hdr[i]->next = buf_hdr[i + 1]; - - buf_hdr[num - 1]->next = NULL; - - /* Handle commonly via links */ - buf_hdr[0]->link = buf_hdr[0]->next; - rc = queue_pktout_enq(queue, buf_hdr[0], sustain); - return rc == 0 ? num : rc; -} - -/* These routines exists here rather than in odp_schedule - * because they operate on queue interenal structures - */ -int release_order(void *origin_qe_ptr, uint64_t order, - odp_pool_t pool, int enq_called) -{ - odp_buffer_t placeholder_buf; - odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *next_buf; - queue_entry_t *origin_qe = origin_qe_ptr; - - /* Must lock the origin queue to process the release */ - queue_lock(origin_qe); - - /* If we are in order we can release immediately since there can be no - * confusion about intermediate elements - */ - if (order <= origin_qe->s.order_out) { - reorder_buf = origin_qe->s.reorder_head; - - /* We're in order, however there may be one or more events on - * the reorder queue that are part of this order. If that is - * the case, remove them and let ordered_queue_enq() handle - * them and resolve the order for us. 
- */ - if (reorder_buf && reorder_buf->order == order) { - odp_buffer_hdr_t *reorder_head = reorder_buf; - - next_buf = reorder_buf->next; - - while (next_buf && next_buf->order == order) { - reorder_buf = next_buf; - next_buf = next_buf->next; - } - - origin_qe->s.reorder_head = reorder_buf->next; - reorder_buf->next = NULL; - - queue_unlock(origin_qe); - reorder_head->link = reorder_buf->next; - return ordered_queue_enq(reorder_head->target_qe, - reorder_head, RESOLVE_ORDER, - origin_qe, order); - } - - /* Reorder queue has no elements for this order, so it's safe - * to resolve order here - */ - order_release(origin_qe, 1); - - /* Check if this release allows us to unblock waiters. At the - * point of this call, the reorder list may contain zero or - * more placeholders that need to be freed, followed by zero - * or one complete reorder buffer chain. Note that since we - * are releasing order, we know no further enqs for this order - * can occur, so ignore the sustain bit to clear out our - * element(s) on the reorder queue - */ - reorder_complete(origin_qe, &reorder_buf, - &placeholder_buf_hdr, NOAPPEND); - - /* Now safe to unlock */ - queue_unlock(origin_qe); - - /* If reorder_buf has a target, do the enq now */ - if (reorder_buf) - queue_enq_internal(reorder_buf); - - while (placeholder_buf_hdr) { - odp_buffer_hdr_t *placeholder_next = - placeholder_buf_hdr->next; - - odp_buffer_free(placeholder_buf_hdr->handle.handle); - placeholder_buf_hdr = placeholder_next; - } - - return 0; - } - - /* If we are not in order we need a placeholder to represent our - * "place in line" unless we have issued enqs, in which case we - * already have a place in the reorder queue. If we need a - * placeholder, use an element from the same pool we were scheduled - * with is from, otherwise just ensure that the final element for our - * order is not marked sustain. - */ - if (enq_called) { - reorder_buf = NULL; - next_buf = origin_qe->s.reorder_head; - - while (next_buf && next_buf->order <= order) { - reorder_buf = next_buf; - next_buf = next_buf->next; - } - - if (reorder_buf && reorder_buf->order == order) { - reorder_buf->flags.sustain = 0; - queue_unlock(origin_qe); - return 0; - } - } - - placeholder_buf = odp_buffer_alloc(pool); - - /* Can't release if no placeholder is available */ - if (odp_unlikely(placeholder_buf == ODP_BUFFER_INVALID)) { - queue_unlock(origin_qe); - return -1; - } - - placeholder_buf_hdr = buf_hdl_to_hdr(placeholder_buf); - - /* Copy info to placeholder and add it to the reorder queue */ - placeholder_buf_hdr->origin_qe = origin_qe; - placeholder_buf_hdr->order = order; - placeholder_buf_hdr->flags.sustain = 0; - - reorder_enq(NULL, order, origin_qe, placeholder_buf_hdr, 0); - - queue_unlock(origin_qe); - return 0; -} - -void schedule_order_lock(unsigned lock_index) -{ - queue_entry_t *origin_qe; - uint64_t sync, sync_out; - - origin_qe = sched_local.origin_qe; - if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count) - return; - - sync = sched_local.sync[lock_index]; - sync_out = odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index]); - ODP_ASSERT(sync >= sync_out); - - /* Wait until we are in order. Note that sync_out will be incremented - * both by unlocks as well as order resolution, so we're OK if only - * some events in the ordered flow need to lock. 
- */ - while (sync != sync_out) { - odp_cpu_pause(); - sync_out = - odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index]); - } -} - -void schedule_order_unlock(unsigned lock_index) -{ - queue_entry_t *origin_qe; - - origin_qe = sched_local.origin_qe; - if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count) - return; - ODP_ASSERT(sched_local.sync[lock_index] == - odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index])); - - /* Release the ordered lock */ - odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out[lock_index]); -} - -void cache_order_info(uint32_t queue_index) -{ - uint32_t i; - queue_entry_t *qe = get_qentry(queue_index); - odp_event_t ev = sched_local.ev_stash[0]; - odp_buffer_hdr_t *buf_hdr = buf_hdl_to_hdr(odp_buffer_from_event(ev)); - - sched_local.origin_qe = qe; - sched_local.order = buf_hdr->order; - sched_local.pool = buf_hdr->pool_hdl; - - for (i = 0; i < qe->s.param.sched.lock_count; i++) - sched_local.sync[i] = buf_hdr->sync[i]; - - sched_local.enq_called = 0; -} diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index 5090a5c..069b8bf 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -299,12 +299,11 @@ static int sched_queue(uint32_t qi) }
static int ord_enq_multi(uint32_t queue_index, void *buf_hdr[], int num, - int sustain, int *ret) + int *ret) { (void)queue_index; (void)buf_hdr; (void)num; - (void)sustain; (void)ret;
/* didn't consume the events */ diff --git a/platform/linux-generic/odp_traffic_mngr.c b/platform/linux-generic/odp_traffic_mngr.c index ffb149b..6a660c5 100644 --- a/platform/linux-generic/odp_traffic_mngr.c +++ b/platform/linux-generic/odp_traffic_mngr.c @@ -99,6 +99,24 @@ static odp_bool_t tm_demote_pkt_desc(tm_system_t *tm_system, tm_shaper_obj_t *timer_shaper, pkt_desc_t *demoted_pkt_desc);
+static int queue_tm_reenq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) +{ + odp_tm_queue_t tm_queue = MAKE_ODP_TM_QUEUE((uint8_t *)queue - + offsetof(tm_queue_obj_t, + tm_qentry)); + odp_packet_t pkt = (odp_packet_t)buf_hdr->handle.handle; + + return odp_tm_enq(tm_queue, pkt); +} + +static int queue_tm_reenq_multi(queue_entry_t *queue ODP_UNUSED, + odp_buffer_hdr_t *buf[] ODP_UNUSED, + int num ODP_UNUSED) +{ + ODP_ABORT("Invalid call to queue_tm_reenq_multi()\n"); + return 0; +} + static tm_queue_obj_t *get_tm_queue_obj(tm_system_t *tm_system, pkt_desc_t *pkt_desc) { @@ -1860,13 +1878,6 @@ static int tm_enqueue(tm_system_t *tm_system, odp_bool_t drop_eligible, drop; uint32_t frame_len, pkt_depth; int rc; - odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt); - - /* If we're from an ordered queue and not in order - * record the event and wait until order is resolved - */ - if (queue_tm_reorder(&tm_queue_obj->tm_qentry, &pkt_hdr->buf_hdr)) - return 0;
if (tm_system->first_enq == 0) { odp_barrier_wait(&tm_system->tm_system_barrier); @@ -1886,7 +1897,10 @@ static int tm_enqueue(tm_system_t *tm_system,
work_item.queue_num = tm_queue_obj->queue_num; work_item.pkt = pkt; + sched_fn->order_lock(); rc = input_work_queue_append(tm_system, &work_item); + sched_fn->order_unlock(); + if (rc < 0) { ODP_DBG("%s work queue full\n", __func__); return rc; diff --git a/platform/linux-generic/pktio/loop.c b/platform/linux-generic/pktio/loop.c index 28dd404..7096283 100644 --- a/platform/linux-generic/pktio/loop.c +++ b/platform/linux-generic/pktio/loop.c @@ -169,7 +169,7 @@ static int loopback_send(pktio_entry_t *pktio_entry, int index ODP_UNUSED, odp_ticketlock_lock(&pktio_entry->s.txl);
qentry = queue_to_qentry(pktio_entry->s.pkt_loop.loopq); - ret = queue_enq_multi(qentry, hdr_tbl, len, 0); + ret = queue_enq_multi(qentry, hdr_tbl, len);
if (ret > 0) { pktio_entry->s.stats.out_ucast_pkts += ret;
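The odp_traffic_mngr.c hunk above shows the pattern this series moves to: instead of parking out-of-order packets on a per-queue reorder list, the order-sensitive step is simply bracketed by the scheduler's internal ordered-lock hooks. A minimal sketch of that pattern follows, assuming the internal odp_schedule_if.h interface (sched_fn) is in scope; the queue type and append helper names are placeholders for illustration, not the actual traffic manager code:

    /* Sketch only: serialize one order-sensitive step under the caller's
     * current ordered context via the scheduler function table.
     */
    static int enqueue_in_order(work_queue_t *wq, work_item_t item)
    {
            int rc;

            sched_fn->order_lock();            /* wait for our turn in ingress order */
            rc = work_queue_append(wq, item);  /* step that must observe event order */
            sched_fn->order_unlock();          /* hand the context to the next event */

            return rc;
    }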
commit d235cfacc743bf0524fbc6c7f0f5810fa85ecd93
Author: Matias Elo <matias.elo@nokia.com>
Date:   Fri Dec 2 12:56:24 2016 +0200
linux-gen: sched: add internal APIs for locking/unlocking ordered processing
The internal ordered processing locking functions can be more streamlined than the public API functions.
Signed-off-by: Matias Elo <matias.elo@nokia.com>
Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
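The leaner shape is visible in the signatures: the public API identifies one of the scheduled queue's ordered locks by index, while the internal hooks added below take no arguments and act on whatever ordered context the calling thread currently holds. A rough comparison, with the public calls as defined by the ODP API and the hook calls as introduced by this commit (placement here is illustrative only):

    /* Public API: the application names one of the queue's ordered locks. */
    odp_schedule_order_lock(0);
    /* ... work that must run in ingress order ... */
    odp_schedule_order_unlock(0);

    /* Internal hooks: no lock index and no queue lookup; each scheduler
     * implementation decides how to enforce ordering for internal callers.
     */
    sched_fn->order_lock();
    /* ... work that must run in ingress order ... */
    sched_fn->order_unlock();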
diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index df73e70..37f88a4 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -37,6 +37,8 @@ typedef int (*schedule_init_global_fn_t)(void); typedef int (*schedule_term_global_fn_t)(void); typedef int (*schedule_init_local_fn_t)(void); typedef int (*schedule_term_local_fn_t)(void); +typedef void (*schedule_order_lock_fn_t)(void); +typedef void (*schedule_order_unlock_fn_t)(void);
typedef struct schedule_fn_t { schedule_pktio_start_fn_t pktio_start; @@ -51,6 +53,8 @@ typedef struct schedule_fn_t { schedule_term_global_fn_t term_global; schedule_init_local_fn_t init_local; schedule_term_local_fn_t term_local; + schedule_order_lock_fn_t order_lock; + schedule_order_unlock_fn_t order_unlock; } schedule_fn_t;
/* Interface towards the scheduler */ diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index dfc9555..cab68a3 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -755,6 +755,14 @@ static int schedule_multi(odp_queue_t *out_queue, uint64_t wait, return schedule_loop(out_queue, wait, events, num); }
+static void order_lock(void) +{ +} + +static void order_unlock(void) +{ +} + static void schedule_pause(void) { sched_local.pause = 1; @@ -991,7 +999,9 @@ const schedule_fn_t schedule_default_fn = { .init_global = schedule_init_global, .term_global = schedule_term_global, .init_local = schedule_init_local, - .term_local = schedule_term_local + .term_local = schedule_term_local, + .order_lock = order_lock, + .order_unlock = order_unlock };
/* Fill in scheduler API calls */ diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index 8b355da..5090a5c 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -660,6 +660,14 @@ static void schedule_order_unlock(unsigned lock_index) (void)lock_index; }
+static void order_lock(void) +{ +} + +static void order_unlock(void) +{ +} + /* Fill in scheduler interface */ const schedule_fn_t schedule_sp_fn = { .pktio_start = pktio_start, @@ -673,7 +681,9 @@ const schedule_fn_t schedule_sp_fn = { .init_global = init_global, .term_global = term_global, .init_local = init_local, - .term_local = term_local + .term_local = term_local, + .order_lock = order_lock, + .order_unlock = order_unlock };
/* Fill in scheduler API calls */
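Both schedulers register no-op hooks at this point. An implementation that needs to enforce ordering could do so in the same spirit as the schedule_order_lock() code removed earlier in this push, i.e. spin until the caller's ordered context becomes current. A toy sketch, not code from this patch set; it assumes cur_ctx is initialized with odp_atomic_init_u64() at startup and my_ctx is assigned when the event is scheduled:

    #include <odp_api.h>

    static odp_atomic_u64_t cur_ctx;   /* ordered context allowed to proceed */
    static __thread uint64_t my_ctx;   /* caller's context id, set at schedule time */

    static void order_lock(void)
    {
            /* Busy-wait until this thread's ordered context is current */
            while (odp_atomic_load_u64(&cur_ctx) != my_ctx)
                    odp_cpu_pause();
    }

    static void order_unlock(void)
    {
            /* Pass the ordered context on to the next event in order */
            odp_atomic_inc_u64(&cur_ctx);
    }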
-----------------------------------------------------------------------
Summary of changes:
 platform/linux-generic/Makefile.am                 |   3 -
 .../linux-generic/include/odp_buffer_internal.h    |   7 -
 .../linux-generic/include/odp_config_internal.h    |   5 +
 .../linux-generic/include/odp_packet_io_queue.h    |   5 +-
 .../linux-generic/include/odp_queue_internal.h     |  33 +-
 platform/linux-generic/include/odp_schedule_if.h   |  15 +-
 .../linux-generic/include/odp_schedule_internal.h  |  50 --
 .../include/odp_schedule_ordered_internal.h        |  25 -
 platform/linux-generic/odp_packet_io.c             |  17 +-
 platform/linux-generic/odp_pool.c                  |  17 +-
 platform/linux-generic/odp_queue.c                 |  76 +-
 platform/linux-generic/odp_schedule.c              | 284 ++++++-
 platform/linux-generic/odp_schedule_ordered.c      | 818 ---------------------
 platform/linux-generic/odp_schedule_sp.c           |  25 +-
 platform/linux-generic/odp_traffic_mngr.c          |  28 +-
 platform/linux-generic/pktio/loop.c                |   2 +-
 16 files changed, 370 insertions(+), 1040 deletions(-)
 delete mode 100644 platform/linux-generic/include/odp_schedule_internal.h
 delete mode 100644 platform/linux-generic/include/odp_schedule_ordered_internal.h
 delete mode 100644 platform/linux-generic/odp_schedule_ordered.c
hooks/post-receive