This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, master has been updated
       via  42a46399f064212fc7a16f088779bee744d1241c (commit)
       via  8ae6e015e8d73092b5c6b728fa39ff8190b88015 (commit)
       via  47c4ecd4bfdeeca0fd507dbb4f9182b36e5829f9 (commit)
       via  2ba3fc5be07c77feaaffdaf7eb1bdd9f7c0f37c2 (commit)
       via  3331e3b51dfcbfe8be5bc8c118cb8a2561294d60 (commit)
       via  db8c7882c2fc8c849cebef08e505343ae1396b79 (commit)
      from  1adfa2e17b27032ff31bd8f361e05970ce186148 (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log -----------------------------------------------------------------
commit 42a46399f064212fc7a16f088779bee744d1241c
Author: Petri Savolainen <petri.savolainen@linaro.org>
Date:   Mon Sep 10 15:33:38 2018 +0300
linux-gen: ring: change ring_deq return value
Return the number of data values dequeued (0 or 1) instead of the data value itself. This improves error tolerance, as no data value needs to be reserved to indicate an empty ring. It also lets the CPU speculate further ahead before the data value is actually needed.
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
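For illustration, the calling convention changes roughly as follows (a sketch distilled from the hunks below, not code taken verbatim from the patch; ring and ring_mask stand for any ring and its index mask):

    uint32_t qi;

    /* Old: the return value was the data, with one value reserved:
     *   qi = ring_deq(ring, ring_mask);
     *   if (qi == RING_EMPTY) ...
     *
     * New: the return value is the dequeue count, so every uint32_t
     * value remains valid data: */
    while (ring_deq(ring, ring_mask, &qi)) {
            /* process queue index qi */
    }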
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h
index 97673bef..9a637afb 100644
--- a/platform/linux-generic/include/odp_ring_internal.h
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -18,9 +18,6 @@ extern "C" {
 #include <odp/api/plat/atomic_inlines.h>
 #include <odp/api/plat/cpu_inlines.h>
 
-/* Ring empty, not a valid data value. */
-#define RING_EMPTY ((uint32_t)-1)
-
 /* Ring of uint32_t data
  *
  * Ring stores head and tail counters. Ring indexes are formed from these
@@ -59,7 +56,7 @@ static inline void ring_init(ring_t *ring)
 }
 
 /* Dequeue data from the ring head */
-static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
+static inline uint32_t ring_deq(ring_t *ring, uint32_t mask, uint32_t *data)
 {
         uint32_t head, tail, new_head;
@@ -73,7 +70,7 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
                 tail = odp_atomic_load_acq_u32(&ring->w_tail);
 
                 if (head == tail)
-                        return RING_EMPTY;
+                        return 0;
 
                 new_head = head + 1;
@@ -83,7 +80,8 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
 
         /* Read data. CAS acquire-release ensures that data read
          * does not move above from here. */
-        return ring->data[new_head & mask];
+        *data = ring->data[new_head & mask];
+        return 1;
 }
 
 /* Dequeue multiple data from the ring head. Num is smaller than ring size. */
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index 77fee74d..a285edc3 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -416,8 +416,7 @@ static int schedule_term_global(void)
                                 ring_t *ring = &sched->prio_q[grp][i][j].ring;
                                 uint32_t qi;
 
-                                while ((qi = ring_deq(ring, ring_mask)) !=
-                                       RING_EMPTY) {
+                                while (ring_deq(ring, ring_mask, &qi)) {
                                         odp_event_t events[1];
                                         int num;
 
@@ -907,10 +906,9 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
                         /* Get queue index from the priority queue */
                         ring = &sched->prio_q[grp][prio][id].ring;
-                        qi   = ring_deq(ring, ring_mask);
 
-                        /* Priority queue empty */
-                        if (qi == RING_EMPTY) {
+                        if (ring_deq(ring, ring_mask, &qi) == 0) {
+                                /* Priority queue empty */
                                 i++;
                                 id++;
                                 continue;
diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c
index f76942ff..e1ef10c4 100644
--- a/platform/linux-generic/odp_schedule_iquery.c
+++ b/platform/linux-generic/odp_schedule_iquery.c
@@ -271,7 +271,7 @@ static int schedule_init_global(void)
                 ring_init(&queue->ring);
 
                 for (k = 0; k < PKTIO_RING_SIZE; k++)
-                        queue->cmd_index[k] = RING_EMPTY;
+                        queue->cmd_index[k] = -1;
         }
 
         for (i = 0; i < NUM_PKTIO_CMD; i++)
@@ -668,9 +668,8 @@ static inline void pktio_poll_input(void)
         for (i = 0; i < PKTIO_CMD_QUEUES; i++,
              hash = (hash + 1) % PKTIO_CMD_QUEUES) {
                 ring = &sched->pktio_poll.queues[hash].ring;
-                index = ring_deq(ring, PKTIO_RING_MASK);
 
-                if (odp_unlikely(index == RING_EMPTY))
+                if (odp_unlikely(ring_deq(ring, PKTIO_RING_MASK, &index) == 0))
                         continue;
 
                 cmd = &sched->pktio_poll.commands[index];
diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
index 8ddd1e94..6b9431b6 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -401,9 +401,8 @@ static inline sched_cmd_t *rem_head(int group, int prio)
         int pktio;
 
         prio_queue = &sched_global->prio_queue[group][prio];
-        ring_idx   = ring_deq(&prio_queue->ring, RING_MASK);
 
-        if (ring_idx == RING_EMPTY)
+        if (ring_deq(&prio_queue->ring, RING_MASK, &ring_idx) == 0)
                 return NULL;
 
         pktio = index_from_ring_idx(&index, ring_idx);
commit 8ae6e015e8d73092b5c6b728fa39ff8190b88015
Author: Petri Savolainen <petri.savolainen@linaro.org>
Date:   Wed Sep 5 16:48:53 2018 +0300
linux-gen: sched: stash ring pointer
Save the ring pointer into the stash to avoid table lookups when releasing the atomic context.
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
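The pattern, condensed from the hunks below: the ring pointer is stored once when the atomic context is taken and read back at release, replacing three sched->queue[qi] table lookups:

    /* When taking the atomic context */
    sched_local.stash.qi   = qi;
    sched_local.stash.ring = ring;

    /* When releasing it: no sched->queue[qi] lookups needed */
    ring_enq(sched_local.stash.ring, sched->ring_mask, sched_local.stash.qi);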
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index 6ed1f8b4..77fee74d 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -137,6 +137,7 @@ typedef struct ODP_ALIGNED_CACHE {
                 uint16_t    ev_index;
                 uint32_t    qi;
                 odp_queue_t queue;
+                ring_t      *ring;
                 odp_event_t ev[BURST_SIZE_MAX];
         } stash;
 
@@ -604,10 +605,7 @@ static void schedule_pktio_start(int pktio_index, int num_pktin,
 static inline void release_atomic(void)
 {
         uint32_t qi  = sched_local.stash.qi;
-        int grp      = sched->queue[qi].grp;
-        int prio     = sched->queue[qi].prio;
-        int spread   = sched->queue[qi].spread;
-        ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
+        ring_t *ring = sched_local.stash.ring;
 
         /* Release current atomic queue */
         ring_enq(ring, sched->ring_mask, qi);
@@ -990,8 +988,9 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
                 } else if (sync_ctx == ODP_SCHED_SYNC_ATOMIC) {
                         /* Hold queue during atomic access */
-                        sched_local.stash.qi = qi;
-                        sched_local.sync_ctx = sync_ctx;
+                        sched_local.stash.qi   = qi;
+                        sched_local.stash.ring = ring;
+                        sched_local.sync_ctx   = sync_ctx;
                 } else {
                         /* Continue scheduling the queue */
                         ring_enq(ring, ring_mask, qi);
commit 47c4ecd4bfdeeca0fd507dbb4f9182b36e5829f9
Author: Petri Savolainen <petri.savolainen@linaro.org>
Date:   Wed Sep 5 11:54:08 2018 +0300
linux-gen: sched: remove queue_destroy_finalize callback
The scheduled queue dequeue function now calls the scheduler queue destroy callback directly. sched_queue_deq() usage is simpler with the extra round of callbacks removed.
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
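In other words, the finalization moves into sched_queue_deq() itself, under the queue lock already held there (a sketch of the new flow, condensed from the odp_queue_basic.c hunk below):

    if (odp_unlikely(status < QUEUE_STATUS_READY)) {
            /* Destroyed queue: finalize here instead of via a
             * separate scheduler callback round */
            if (queue->s.status == QUEUE_STATUS_DESTROYED) {
                    queue->s.status = QUEUE_STATUS_FREE;
                    sched_fn->destroy_queue(queue_index);
            }

            UNLOCK(queue);
            return -1;
    }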
diff --git a/platform/linux-generic/include/odp_queue_basic_internal.h b/platform/linux-generic/include/odp_queue_basic_internal.h
index 46b74795..41ca424c 100644
--- a/platform/linux-generic/include/odp_queue_basic_internal.h
+++ b/platform/linux-generic/include/odp_queue_basic_internal.h
@@ -113,7 +113,6 @@ static inline queue_entry_t *qentry_from_handle(odp_queue_t handle)
 void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size);
 
 /* Functions for schedulers */
-void sched_queue_destroy_finalize(uint32_t queue_index);
 void sched_queue_set_status(uint32_t queue_index, int status);
 int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int num,
                     int update_status);
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index 61cf8a56..3f00cc11 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -353,19 +353,6 @@ static odp_queue_t queue_create(const char *name,
         return handle;
 }
 
-void sched_queue_destroy_finalize(uint32_t queue_index)
-{
-        queue_entry_t *queue = qentry_from_index(queue_index);
-
-        LOCK(queue);
-
-        if (queue->s.status == QUEUE_STATUS_DESTROYED) {
-                queue->s.status = QUEUE_STATUS_FREE;
-                sched_fn->destroy_queue(queue_index);
-        }
-        UNLOCK(queue);
-}
-
 void sched_queue_set_status(uint32_t queue_index, int status)
 {
         queue_entry_t *queue = qentry_from_index(queue_index);
@@ -720,7 +707,12 @@ int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
 
         if (odp_unlikely(status < QUEUE_STATUS_READY)) {
                 /* Bad queue, or queue has been destroyed.
-                 * Scheduler finalizes queue destroy after this. */
+                 * Inform scheduler about a destroyed queue. */
+                if (queue->s.status == QUEUE_STATUS_DESTROYED) {
+                        queue->s.status = QUEUE_STATUS_FREE;
+                        sched_fn->destroy_queue(queue_index);
+                }
+
                 UNLOCK(queue);
                 return -1;
         }
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index 46ae7f1c..6ed1f8b4 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -402,11 +402,6 @@ static int schedule_init_global(void)
         return 0;
 }
 
-static inline void queue_destroy_finalize(uint32_t qi)
-{
-        sched_queue_destroy_finalize(qi);
-}
-
 static int schedule_term_global(void)
 {
         int ret = 0;
@@ -427,9 +422,6 @@ static int schedule_term_global(void)
 
                                 num = sched_queue_deq(qi, events, 1, 1);
 
-                                if (num < 0)
-                                        queue_destroy_finalize(qi);
-
                                 if (num > 0)
                                         ODP_ERR("Queue not empty\n");
                         }
@@ -944,10 +936,9 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
                         num = sched_queue_deq(qi, ev_tbl, max_deq, !pktin);
 
-                        if (num < 0) {
+                        if (odp_unlikely(num < 0)) {
                                 /* Destroyed queue. Continue scheduling the same
                                  * priority queue. */
-                                sched_queue_destroy_finalize(qi);
                                 continue;
                         }
 
diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c
index 7dde7784..f76942ff 100644
--- a/platform/linux-generic/odp_schedule_iquery.c
+++ b/platform/linux-generic/odp_schedule_iquery.c
@@ -209,6 +209,7 @@ struct sched_thread_local {
          * in the same priority level.
          */
         odp_rwlock_t lock;
+        int r_locked;
         queue_index_sparse_t indexes[NUM_SCHED_PRIO];
         sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO];
 
@@ -292,9 +293,7 @@ static int schedule_term_global(void)
                 if (sched->availables[i])
                         count = sched_queue_deq(i, events, 1, 1);
 
-                if (count < 0)
-                        sched_queue_destroy_finalize(i);
-                else if (count > 0)
+                if (count > 0)
                         ODP_ERR("Queue (%d) not empty\n", i);
         }
 
@@ -526,7 +525,14 @@ static void destroy_sched_queue(uint32_t queue_index)
                 return;
         }
 
+        if (thread_local.r_locked)
+                odp_rwlock_read_unlock(&thread_local.lock);
+
         __destroy_sched_queue(G, queue_index);
+
+        if (thread_local.r_locked)
+                odp_rwlock_read_lock(&thread_local.lock);
+
         odp_rwlock_write_unlock(&G->lock);
 
         if (sched->queues[queue_index].sync == ODP_SCHED_SYNC_ORDERED &&
@@ -614,9 +620,6 @@ static int schedule_pktio_stop(int pktio, int pktin ODP_UNUSED)
         return remains;
 }
 
-#define DO_SCHED_LOCK() odp_rwlock_read_lock(&thread_local.lock)
-#define DO_SCHED_UNLOCK() odp_rwlock_read_unlock(&thread_local.lock)
-
 static inline bool do_schedule_prio(int prio);
 
 static inline int pop_cache_events(odp_event_t ev[], unsigned int max)
@@ -720,7 +723,9 @@ static int do_schedule(odp_queue_t *out_queue,
         if (odp_unlikely(thread_local.pause))
                 return count;
 
-        DO_SCHED_LOCK();
+        odp_rwlock_read_lock(&thread_local.lock);
+        thread_local.r_locked = 1;
+
         /* Schedule events */
         for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
                 /* Round robin iterate the interested queue
@@ -732,11 +737,14 @@ static int do_schedule(odp_queue_t *out_queue,
 
                         count = pop_cache_events(out_ev, max_num);
                         assign_queue_handle(out_queue);
-                        DO_SCHED_UNLOCK();
+
+                        odp_rwlock_read_unlock(&thread_local.lock);
+                        thread_local.r_locked = 0;
                         return count;
                 }
 
-        DO_SCHED_UNLOCK();
+        odp_rwlock_read_unlock(&thread_local.lock);
+        thread_local.r_locked = 0;
 
         /* Poll packet input when there are no events */
         pktio_poll_input();
@@ -1536,14 +1544,7 @@ static inline int consume_queue(int prio, unsigned int queue_index)
 
         count = sched_queue_deq(queue_index, cache->stash, max, 1);
 
-        if (count < 0) {
-                DO_SCHED_UNLOCK();
-                sched_queue_destroy_finalize(queue_index);
-                DO_SCHED_LOCK();
-                return 0;
-        }
-
-        if (count == 0)
+        if (count <= 0)
                 return 0;
 
         cache->top = &cache->stash[0];
diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
index 7932e186..8ddd1e94 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -223,12 +223,21 @@ static int init_local(void)
 
 static int term_global(void)
 {
+        odp_event_t event;
         int qi, ret = 0;
 
         for (qi = 0; qi < NUM_QUEUE; qi++) {
+                int report = 1;
+
                 if (sched_global->queue_cmd[qi].s.init) {
-                        /* todo: dequeue until empty ? */
-                        sched_queue_destroy_finalize(qi);
+                        while (sched_queue_deq(qi, &event, 1, 1) > 0) {
+                                if (report) {
+                                        ODP_ERR("Queue not empty\n");
+                                        report = 0;
+                                }
+
+                                odp_event_free(event);
+                        }
+                }
         }
 
@@ -564,28 +573,20 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait,
 
                 qi  = cmd->s.index;
                 num = sched_queue_deq(qi, events, 1, 1);
 
-                if (num > 0) {
-                        sched_local.cmd = cmd;
-
-                        if (from)
-                                *from = queue_from_index(qi);
-
-                        return num;
-                }
-
-                if (num < 0) {
-                        /* Destroyed queue */
-                        sched_queue_destroy_finalize(qi);
+                if (num <= 0) {
+                        /* Destroyed or empty queue. Remove empty queue from
+                         * scheduling. A dequeue operation to on an already
+                         * empty queue moves it to NOTSCHED state and
+                         * sched_queue() will be called on next enqueue. */
                         continue;
                 }
 
-                if (num == 0) {
-                        /* Remove empty queue from scheduling. A dequeue
-                         * operation to on an already empty queue moves
-                         * it to NOTSCHED state and sched_queue() will
-                         * be called on next enqueue. */
-                        continue;
-                }
+                sched_local.cmd = cmd;
+
+                if (from)
+                        *from = queue_from_index(qi);
+
+                return num;
         }
 }
commit 2ba3fc5be07c77feaaffdaf7eb1bdd9f7c0f37c2
Author: Petri Savolainen <petri.savolainen@linaro.org>
Date:   Fri Aug 31 16:17:49 2018 +0300
linux-gen: sched: single variable for sync context status
Use a single thread-local variable to track whether a synchronization context is held and the type of that context (atomic or ordered). Performance improves as the sync context status is located on a single (the first) cache line of sched_local_t.
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
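With this, every context check reduces to one byte-sized compare against sched_local.sync_ctx (pattern condensed from the hunks below; NO_SYNC_CONTEXT aliases ODP_SCHED_SYNC_PARALLEL, i.e. "no context held"):

    /* sync_ctx holds the type of the held context, or NO_SYNC_CONTEXT */
    if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC)
            release_atomic();
    else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED)
            release_ordered();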
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index 89c0a5c4..46ae7f1c 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -30,6 +30,9 @@
 #include <odp_libconfig_internal.h>
 #include <odp/api/plat/queue_inlines.h>
 
+/* No synchronization context */
+#define NO_SYNC_CONTEXT ODP_SCHED_SYNC_PARALLEL
+
 /* Number of priority levels */
 #define NUM_PRIO 8
 
@@ -124,7 +127,8 @@ ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t),
 /* Scheduler local data */
 typedef struct ODP_ALIGNED_CACHE {
         uint16_t thr;
-        uint16_t pause;
+        uint8_t  pause;
+        uint8_t  sync_ctx;
         uint16_t grp_round;
         uint16_t spread_round;
 
@@ -241,9 +245,6 @@ static sched_global_t *sched;
 /* Thread local scheduler context */
 static __thread sched_local_t sched_local;
 
-/* Function prototypes */
-static inline void schedule_release_context(void);
-
 static int read_config_file(sched_global_t *sched)
 {
         const char *str;
@@ -311,6 +312,7 @@ static void sched_local_init(void)
         memset(&sched_local, 0, sizeof(sched_local_t));
 
         sched_local.thr         = odp_thread_id();
+        sched_local.sync_ctx    = NO_SYNC_CONTEXT;
         sched_local.stash.queue = ODP_QUEUE_INVALID;
         sched_local.stash.qi    = PRIO_QUEUE_EMPTY;
         sched_local.ordered.src_queue = NULL_INDEX;
@@ -450,17 +452,6 @@ static int schedule_init_local(void)
         return 0;
 }
 
-static int schedule_term_local(void)
-{
-        if (sched_local.stash.num_ev) {
-                ODP_ERR("Locally pre-scheduled events exist.\n");
-                return -1;
-        }
-
-        schedule_release_context();
-        return 0;
-}
-
 static inline void grp_update_mask(int grp, const odp_thrmask_t *new_mask)
 {
         odp_thrmask_copy(&sched->sched_grp[grp].mask, new_mask);
@@ -565,14 +556,9 @@ static int schedule_init_queue(uint32_t queue_index,
         return 0;
 }
 
-static inline int queue_is_atomic(uint32_t queue_index)
+static inline uint8_t sched_sync_type(uint32_t queue_index)
 {
-        return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ATOMIC;
-}
-
-static inline int queue_is_ordered(uint32_t queue_index)
-{
-        return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ORDERED;
+        return sched->queue[queue_index].sync;
 }
 
 static void schedule_destroy_queue(uint32_t queue_index)
@@ -584,7 +570,7 @@ static void schedule_destroy_queue(uint32_t queue_index)
         sched->queue[queue_index].prio   = 0;
         sched->queue[queue_index].spread = 0;
 
-        if (queue_is_ordered(queue_index) &&
+        if ((sched_sync_type(queue_index) == ODP_SCHED_SYNC_ORDERED) &&
             odp_atomic_load_u64(&sched->order[queue_index].ctx) !=
             odp_atomic_load_u64(&sched->order[queue_index].next_ctx))
                 ODP_ERR("queue reorder incomplete\n");
@@ -623,21 +609,26 @@ static void schedule_pktio_start(int pktio_index, int num_pktin,
         }
 }
 
-static void schedule_release_atomic(void)
+static inline void release_atomic(void)
 {
-        uint32_t qi = sched_local.stash.qi;
+        uint32_t qi  = sched_local.stash.qi;
+        int grp      = sched->queue[qi].grp;
+        int prio     = sched->queue[qi].prio;
+        int spread   = sched->queue[qi].spread;
+        ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
 
-        if (qi != PRIO_QUEUE_EMPTY && sched_local.stash.num_ev == 0) {
-                int grp = sched->queue[qi].grp;
-                int prio = sched->queue[qi].prio;
-                int spread = sched->queue[qi].spread;
-                ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
+        /* Release current atomic queue */
+        ring_enq(ring, sched->ring_mask, qi);
 
-                /* Release current atomic queue */
-                ring_enq(ring, sched->ring_mask, qi);
+        /* We don't hold sync context anymore */
+        sched_local.sync_ctx = NO_SYNC_CONTEXT;
+}
 
-                sched_local.stash.qi = PRIO_QUEUE_EMPTY;
-        }
+static void schedule_release_atomic(void)
+{
+        if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC &&
+            sched_local.stash.num_ev == 0)
+                release_atomic();
 }
 
 static inline int ordered_own_turn(uint32_t queue_index)
@@ -709,9 +700,11 @@ static inline void release_ordered(void)
         }
 
         sched_local.ordered.lock_called.all = 0;
-        sched_local.ordered.src_queue = NULL_INDEX;
         sched_local.ordered.in_order = 0;
 
+        /* We don't hold sync context anymore */
+        sched_local.sync_ctx = NO_SYNC_CONTEXT;
+
         ordered_stash_release();
 
         /* Next thread can continue processing */
@@ -720,23 +713,26 @@ static inline void release_ordered(void)
 
 static void schedule_release_ordered(void)
 {
-        uint32_t queue_index;
-
-        queue_index = sched_local.ordered.src_queue;
-
-        if (odp_unlikely((queue_index == NULL_INDEX) ||
+        if (odp_unlikely((sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED) ||
                          sched_local.stash.num_ev))
                 return;
 
         release_ordered();
 }
 
-static inline void schedule_release_context(void)
+static int schedule_term_local(void)
 {
-        if (sched_local.ordered.src_queue != NULL_INDEX)
-                release_ordered();
-        else
+        if (sched_local.stash.num_ev) {
+                ODP_ERR("Locally pre-scheduled events exist.\n");
+                return -1;
+        }
+
+        if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC)
                 schedule_release_atomic();
+        else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED)
+                schedule_release_ordered();
+
+        return 0;
 }
 
 static inline int copy_from_stash(odp_event_t out_ev[], unsigned int max)
@@ -758,13 +754,22 @@ static int schedule_ord_enq_multi(odp_queue_t dst_queue, void *buf_hdr[],
                                   int num, int *ret)
 {
         int i;
-        uint32_t stash_num = sched_local.ordered.stash_num;
-        queue_entry_t *dst_qentry = qentry_from_handle(dst_queue);
-        uint32_t src_queue = sched_local.ordered.src_queue;
+        uint32_t stash_num;
+        queue_entry_t *dst_qentry;
+        uint32_t src_queue;
 
-        if ((src_queue == NULL_INDEX) || sched_local.ordered.in_order)
+        /* This check is done for every queue enqueue operation, also for plain
+         * queues. Return fast when not holding a scheduling context. */
+        if (odp_likely(sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED))
                 return 0;
 
+        if (sched_local.ordered.in_order)
+                return 0;
+
+        src_queue  = sched_local.ordered.src_queue;
+        stash_num  = sched_local.ordered.stash_num;
+        dst_qentry = qentry_from_handle(dst_queue);
+
         if (ordered_own_turn(src_queue)) {
                 /* Own turn, so can do enqueue directly. */
                 sched_local.ordered.in_order = 1;
@@ -891,7 +896,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
         for (i = 0; i < num_spread;) {
                 int num;
-                int ordered;
+                uint8_t sync_ctx, ordered;
                 odp_queue_t handle;
                 ring_t *ring;
                 int pktin;
@@ -921,7 +926,8 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
                         continue;
                 }
 
-                ordered = queue_is_ordered(qi);
+                sync_ctx = sched_sync_type(qi);
+                ordered  = (sync_ctx == ODP_SCHED_SYNC_ORDERED);
 
                 /* When application's array is larger than max burst
                  * size, output all events directly there. Also, ordered
@@ -989,10 +995,12 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
                         /* Continue scheduling ordered queues */
                         ring_enq(ring, ring_mask, qi);
+                        sched_local.sync_ctx = sync_ctx;
 
-                } else if (queue_is_atomic(qi)) {
+                } else if (sync_ctx == ODP_SCHED_SYNC_ATOMIC) {
                         /* Hold queue during atomic access */
                         sched_local.stash.qi = qi;
+                        sched_local.sync_ctx = sync_ctx;
                 } else {
                         /* Continue scheduling the queue */
                         ring_enq(ring, ring_mask, qi);
@@ -1042,7 +1050,11 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                 return ret;
         }
 
-        schedule_release_context();
+        /* Release schedule context */
+        if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC)
+                release_atomic();
+        else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED)
+                release_ordered();
 
         if (odp_unlikely(sched_local.pause))
                 return 0;
@@ -1141,14 +1153,10 @@ static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
 
 static inline void order_lock(void)
 {
-        uint32_t queue_index;
-
-        queue_index = sched_local.ordered.src_queue;
-
-        if (queue_index == NULL_INDEX)
+        if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
                 return;
 
-        wait_for_order(queue_index);
+        wait_for_order(sched_local.ordered.src_queue);
 }
 
 static void order_unlock(void)
@@ -1160,6 +1168,9 @@ static void schedule_order_lock(uint32_t lock_index)
         odp_atomic_u64_t *ord_lock;
         uint32_t queue_index;
 
+        if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
+                return;
+
         queue_index = sched_local.ordered.src_queue;
 
         ODP_ASSERT(queue_index != NULL_INDEX &&
@@ -1187,6 +1198,9 @@ static void schedule_order_unlock(uint32_t lock_index)
         odp_atomic_u64_t *ord_lock;
         uint32_t queue_index;
 
+        if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
+                return;
+
         queue_index = sched_local.ordered.src_queue;
 
         ODP_ASSERT(queue_index != NULL_INDEX &&
commit 3331e3b51dfcbfe8be5bc8c118cb8a2561294d60
Author: Petri Savolainen <petri.savolainen@linaro.org>
Date:   Fri Aug 31 12:08:05 2018 +0300
linux-gen: sched: clean up local data struct
Move the stash variables into a struct. Use only 16 bits for the thread ID, which is enough for 64k threads.
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
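The resulting layout, condensed from the hunk below: the stash becomes a nested struct and a static assert documents the 16-bit thread ID limit:

    ODP_STATIC_ASSERT(ODP_THREAD_COUNT_MAX < (64 * 1024),
                      "Max_64k_threads_supported");

    struct {
            uint16_t    num_ev;
            uint16_t    ev_index;
            uint32_t    qi;
            odp_queue_t queue;
            odp_event_t ev[BURST_SIZE_MAX];
    } stash;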
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index e329a8e8..89c0a5c4 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -84,6 +84,10 @@ ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES),
 ODP_STATIC_ASSERT(CHECK_IS_POWER2(MAX_RING_SIZE),
                   "Ring_size_is_not_power_of_two");
 
+/* Thread ID is saved into uint16_t variable */
+ODP_STATIC_ASSERT(ODP_THREAD_COUNT_MAX < (64 * 1024),
+                  "Max_64k_threads_supported");
+
 /* Mask of queues per priority */
 typedef uint8_t pri_mask_t;
 
@@ -118,19 +122,22 @@ ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t),
                   "Lock_called_values_do_not_fit_in_uint32");
 
 /* Scheduler local data */
-typedef struct {
-        int thr;
-        uint16_t stash_num;
-        uint16_t stash_index;
+typedef struct ODP_ALIGNED_CACHE {
+        uint16_t thr;
+        uint16_t pause;
         uint16_t grp_round;
         uint16_t spread_round;
-        uint32_t stash_qi;
-        odp_queue_t stash_queue;
-        odp_event_t stash_ev[BURST_SIZE_MAX];
+
+        struct {
+                uint16_t    num_ev;
+                uint16_t    ev_index;
+                uint32_t    qi;
+                odp_queue_t queue;
+                odp_event_t ev[BURST_SIZE_MAX];
+        } stash;
 
         uint32_t grp_epoch;
         uint16_t num_grp;
-        uint16_t pause;
         uint8_t grp[NUM_SCHED_GRPS];
         uint8_t spread_tbl[SPREAD_TBL_SIZE];
         uint8_t grp_weight[GRP_WEIGHT_TBL_SIZE];
@@ -304,8 +311,8 @@ static void sched_local_init(void)
         memset(&sched_local, 0, sizeof(sched_local_t));
 
         sched_local.thr         = odp_thread_id();
-        sched_local.stash_queue = ODP_QUEUE_INVALID;
-        sched_local.stash_qi    = PRIO_QUEUE_EMPTY;
+        sched_local.stash.queue = ODP_QUEUE_INVALID;
+        sched_local.stash.qi    = PRIO_QUEUE_EMPTY;
         sched_local.ordered.src_queue = NULL_INDEX;
 
         spread = prio_spread_index(sched_local.thr);
@@ -445,7 +452,7 @@ static int schedule_init_local(void)
 
 static int schedule_term_local(void)
 {
-        if (sched_local.stash_num) {
+        if (sched_local.stash.num_ev) {
                 ODP_ERR("Locally pre-scheduled events exist.\n");
                 return -1;
         }
@@ -618,9 +625,9 @@ static void schedule_pktio_start(int pktio_index, int num_pktin,
 
 static void schedule_release_atomic(void)
 {
-        uint32_t qi = sched_local.stash_qi;
+        uint32_t qi = sched_local.stash.qi;
 
-        if (qi != PRIO_QUEUE_EMPTY && sched_local.stash_num == 0) {
+        if (qi != PRIO_QUEUE_EMPTY && sched_local.stash.num_ev == 0) {
                 int grp = sched->queue[qi].grp;
                 int prio = sched->queue[qi].prio;
                 int spread = sched->queue[qi].spread;
@@ -629,7 +636,7 @@ static void schedule_release_atomic(void)
                 /* Release current atomic queue */
                 ring_enq(ring, sched->ring_mask, qi);
 
-                sched_local.stash_qi = PRIO_QUEUE_EMPTY;
+                sched_local.stash.qi = PRIO_QUEUE_EMPTY;
         }
 }
 
@@ -717,7 +724,8 @@ static void schedule_release_ordered(void)
 
         queue_index = sched_local.ordered.src_queue;
 
-        if (odp_unlikely((queue_index == NULL_INDEX) || sched_local.stash_num))
+        if (odp_unlikely((queue_index == NULL_INDEX) ||
+                         sched_local.stash.num_ev))
                 return;
 
         release_ordered();
@@ -735,10 +743,10 @@ static inline int copy_from_stash(odp_event_t out_ev[], unsigned int max)
 {
         int i = 0;
 
-        while (sched_local.stash_num && max) {
-                out_ev[i] = sched_local.stash_ev[sched_local.stash_index];
-                sched_local.stash_index++;
-                sched_local.stash_num--;
+        while (sched_local.stash.num_ev && max) {
+                out_ev[i] = sched_local.stash.ev[sched_local.stash.ev_index];
+                sched_local.stash.ev_index++;
+                sched_local.stash.num_ev--;
                 max--;
                 i++;
         }
@@ -889,7 +897,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
                 int pktin;
                 unsigned int max_deq = max_burst;
                 int stashed = 1;
-                odp_event_t *ev_tbl = sched_local.stash_ev;
+                odp_event_t *ev_tbl = sched_local.stash.ev;
 
                 if (id >= num_spread)
                         id = 0;
@@ -984,7 +992,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
                 } else if (queue_is_atomic(qi)) {
                         /* Hold queue during atomic access */
-                        sched_local.stash_qi = qi;
+                        sched_local.stash.qi = qi;
                 } else {
                         /* Continue scheduling the queue */
                         ring_enq(ring, ring_mask, qi);
@@ -993,12 +1001,12 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
                 handle = queue_from_index(qi);
 
                 if (stashed) {
-                        sched_local.stash_num   = num;
-                        sched_local.stash_index = 0;
-                        sched_local.stash_queue = handle;
+                        sched_local.stash.num_ev   = num;
+                        sched_local.stash.ev_index = 0;
+                        sched_local.stash.queue    = handle;
                         ret = copy_from_stash(out_ev, max_num);
                 } else {
-                        sched_local.stash_num = 0;
+                        sched_local.stash.num_ev = 0;
                         ret = num;
                 }
 
@@ -1025,11 +1033,11 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
         uint16_t spread_round, grp_round;
         uint32_t epoch;
 
-        if (sched_local.stash_num) {
+        if (sched_local.stash.num_ev) {
                 ret = copy_from_stash(out_ev, max_num);
 
                 if (out_queue)
-                        *out_queue = sched_local.stash_queue;
+                        *out_queue = sched_local.stash.queue;
 
                 return ret;
         }
commit db8c7882c2fc8c849cebef08e505343ae1396b79
Author: Petri Savolainen <petri.savolainen@linaro.org>
Date:   Thu Aug 30 17:07:25 2018 +0300
linux-gen: queue: remove extra checks
Remove unnecessary checks from the critical sections of scheduled queue enqueue and dequeue operations. Parallelism improves as the number of instructions and (potential) cache misses while holding the lock decreases.
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
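For example, the dequeue path now reads queue->s.status once into a local variable and branches on that, instead of re-reading it inside the critical section (condensed from the hunks below):

    LOCK(queue);

    status = queue->s.status;

    if (odp_unlikely(status < QUEUE_STATUS_READY)) {
            UNLOCK(queue);
            return -1;
    }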
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index 7e8b7e34..61cf8a56 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -681,12 +681,6 @@ static inline int _sched_queue_enq_multi(odp_queue_t handle,
 
         LOCK(queue);
 
-        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
-                UNLOCK(queue);
-                ODP_ERR("Bad queue status\n");
-                return -1;
-        }
-
         num_enq = ring_st_enq_multi(ring_st, queue->s.ring_data,
                                     queue->s.ring_mask, buf_idx, num);
 
@@ -712,7 +706,7 @@ static inline int _sched_queue_enq_multi(odp_queue_t handle,
 int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
                     int update_status)
 {
-        int num_deq;
+        int num_deq, status;
         ring_st_t *ring_st;
         queue_entry_t *queue = qentry_from_index(queue_index);
         int status_sync = sched_fn->status_sync;
@@ -722,7 +716,9 @@ int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
 
         LOCK(queue);
 
-        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+        status = queue->s.status;
+
+        if (odp_unlikely(status < QUEUE_STATUS_READY)) {
                 /* Bad queue, or queue has been destroyed.
                  * Scheduler finalizes queue destroy after this. */
                 UNLOCK(queue);
@@ -734,10 +730,10 @@ int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
 
         if (num_deq == 0) {
                 /* Already empty queue */
-                if (update_status && queue->s.status == QUEUE_STATUS_SCHED) {
+                if (update_status && status == QUEUE_STATUS_SCHED) {
                         queue->s.status = QUEUE_STATUS_NOTSCHED;
 
-                        if (status_sync)
+                        if (odp_unlikely(status_sync))
                                 sched_fn->unsched_queue(queue->s.index);
                 }
 
@@ -746,7 +742,7 @@ int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
                 return 0;
         }
 
-        if (status_sync && queue->s.type == ODP_QUEUE_TYPE_SCHED)
+        if (odp_unlikely(status_sync))
                 sched_fn->save_context(queue->s.index);
 
         UNLOCK(queue);
-----------------------------------------------------------------------
Summary of changes:
 .../include/odp_queue_basic_internal.h             |   1 -
 platform/linux-generic/include/odp_ring_internal.h |  10 +-
 platform/linux-generic/odp_queue_basic.c           |  38 ++--
 platform/linux-generic/odp_schedule_basic.c        | 198 +++++++++++----------
 platform/linux-generic/odp_schedule_iquery.c       |  40 ++---
 platform/linux-generic/odp_schedule_sp.c           |  46 ++---
 6 files changed, 164 insertions(+), 169 deletions(-)
hooks/post-receive