This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, master has been updated
       via  a7f5589f81864c71faf18c3d2c3636320cecb1c6 (commit)
       via  aab7083f4ab4fa2ddd8d1a41a554cead3a0055bb (commit)
       via  66d26268065eaae370b402dc5b006121c6931c5c (commit)
       via  d6260eeec46915b9d90453cc5add6ac0cac74e52 (commit)
       via  ef8b7b691d4902153a71087ed0f4297cdd8809ff (commit)
       via  de19d9847b76c90473c474a831800d383865a4ba (commit)
      from  2d63311886330e6acddb55674ff7a7b62d711ef5 (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log -----------------------------------------------------------------
commit a7f5589f81864c71faf18c3d2c3636320cecb1c6
Author: Petri Savolainen <petri.savolainen@nokia.com>
Date:   Mon Sep 5 13:09:30 2016 +0300
linux-gen: sched: call schedule_release_atomic directly
Use the local schedule_release_atomic call instead of the API call.
Signed-off-by: Petri Savolainen <petri.savolainen@nokia.com>
Reviewed-and-tested-by: Yi He <yi.he@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
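The change keeps the public API entry point but has module-internal callers invoke the local function directly. A minimal standalone sketch of that pattern, with illustrative names rather than the ODP sources:

/* Sketch: internal callers use the local static function directly
 * instead of going through the public API entry point. */

static void release_atomic(void)
{
	/* ... release the current atomic scheduling context ... */
}

static void release_context(void)
{
	/* Direct local call, no indirection through the API symbol */
	release_atomic();
}

/* Public API entry point, kept for external callers */
void api_schedule_release_atomic(void)
{
	release_atomic();
}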
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index d4eb13f..78982d9 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -201,6 +201,9 @@ static sched_global_t *sched;
 /* Thread local scheduler context */
 __thread sched_local_t sched_local;
 
+/* Function prototypes */
+static inline void schedule_release_context(void);
+
 static void ring_init(sched_ring_t *ring)
 {
 	odp_atomic_init_u32(&ring->w_head, 0);
@@ -377,16 +380,6 @@ static int schedule_init_local(void)
 	return 0;
 }
 
-static inline void schedule_release_context(void)
-{
-	if (sched_local.origin_qe != NULL) {
-		release_order(sched_local.origin_qe, sched_local.order,
-			      sched_local.pool, sched_local.enq_called);
-		sched_local.origin_qe = NULL;
-	} else
-		odp_schedule_release_atomic();
-}
-
 static int schedule_term_local(void)
 {
 	if (sched_local.num) {
@@ -567,6 +560,16 @@ static void schedule_release_ordered(void)
 	}
 }
 
+static inline void schedule_release_context(void)
+{
+	if (sched_local.origin_qe != NULL) {
+		release_order(sched_local.origin_qe, sched_local.order,
+			      sched_local.pool, sched_local.enq_called);
+		sched_local.origin_qe = NULL;
+	} else
+		schedule_release_atomic();
+}
+
 static inline int copy_events(odp_event_t out_ev[], unsigned int max)
 {
 	int i = 0;
commit aab7083f4ab4fa2ddd8d1a41a554cead3a0055bb
Author: Petri Savolainen <petri.savolainen@nokia.com>
Date:   Mon Sep 5 13:09:29 2016 +0300
linux-gen: sched: increase max dequeue batch size
Increased the queue dequeue batch size to improve throughput. Low priority queues have a smaller batch size than high/default priority queues, so that head-of-line blocking latency is limited.
Signed-off-by: Petri Savolainen <petri.savolainen@nokia.com>
Reviewed-and-tested-by: Yi He <yi.he@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
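The batching rule, as a small self-contained sketch; MAX_DEQ matches the commit, while PRIO_DEFAULT and the assumption that a numerically larger value means lower priority are illustrative:

#include <stdio.h>

#define MAX_DEQ 8	/* batch size after this commit */
#define PRIO_DEFAULT 4	/* assumed default priority; larger value = lower priority */

static unsigned int batch_size(int prio)
{
	/* Low priorities get half a batch to limit head-of-line blocking */
	if (prio > PRIO_DEFAULT)
		return MAX_DEQ / 2;

	return MAX_DEQ;
}

int main(void)
{
	int prio;

	for (prio = 0; prio < 8; prio++)
		printf("prio %d: batch size %u\n", prio, batch_size(prio));

	return 0;
}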
diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h
index 1b6ae93..d6cfdb4 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -12,7 +12,7 @@ extern "C" {
 #endif
 
 /* Maximum number of dequeues */
-#define MAX_DEQ 4
+#define MAX_DEQ 8
 
 typedef struct {
 	int thr;
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 95ba13f..d4eb13f 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -586,12 +586,13 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max)
  * Schedule queues
  */
 static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
-		       unsigned int max_num, unsigned int max_deq)
+		       unsigned int max_num)
 {
-	int i, j;
+	int prio, i;
 	int ret;
 	int id;
 	int offset = 0;
+	unsigned int max_deq = MAX_DEQ;
 	uint32_t qi;
 
 	if (sched_local.num) {
@@ -619,14 +620,14 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 	sched_local.round++;
 
 	/* Schedule events */
-	for (i = 0; i < NUM_PRIO; i++) {
+	for (prio = 0; prio < NUM_PRIO; prio++) {
 
-		if (sched->pri_mask[i] == 0)
+		if (sched->pri_mask[prio] == 0)
 			continue;
 
 		id = (sched_local.thr + offset) & (QUEUES_PER_PRIO - 1);
 
-		for (j = 0; j < QUEUES_PER_PRIO;) {
+		for (i = 0; i < QUEUES_PER_PRIO;) {
 			int num;
 			int grp;
 			int ordered;
@@ -637,20 +638,20 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 				id = 0;
 
 			/* No queues created for this priority queue */
-			if (odp_unlikely((sched->pri_mask[i] & (1 << id))
+			if (odp_unlikely((sched->pri_mask[prio] & (1 << id))
 			    == 0)) {
-				j++;
+				i++;
 				id++;
 				continue;
 			}
 
 			/* Get queue index from the priority queue */
-			ring = &sched->prio_q[i][id].ring;
+			ring = &sched->prio_q[prio][id].ring;
 			qi = ring_deq(ring, PRIO_QUEUE_MASK);
 
 			/* Priority queue empty */
 			if (qi == RING_EMPTY) {
-				j++;
+				i++;
 				id++;
 				continue;
 			}
@@ -665,11 +666,16 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 				 */
 				ring_enq(ring, PRIO_QUEUE_MASK, qi);
 
-				j++;
+				i++;
 				id++;
 				continue;
 			}
 
+			/* Low priorities have smaller batch size to limit
+			 * head of line blocking latency. */
+			if (odp_unlikely(prio > ODP_SCHED_PRIO_DEFAULT))
+				max_deq = MAX_DEQ / 2;
+
 			ordered = sched_cb_queue_is_ordered(qi);
 
 			/* For ordered queues we want consecutive events to
@@ -782,14 +788,14 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 
 static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
 			 odp_event_t out_ev[],
-			 unsigned int max_num, unsigned int max_deq)
+			 unsigned int max_num)
 {
 	odp_time_t next, wtime;
 	int first = 1;
 	int ret;
 
 	while (1) {
-		ret = do_schedule(out_queue, out_ev, max_num, max_deq);
+		ret = do_schedule(out_queue, out_ev, max_num);
 
 		if (ret)
 			break;
@@ -820,7 +826,7 @@ static odp_event_t schedule(odp_queue_t *out_queue, uint64_t wait)
 
 	ev = ODP_EVENT_INVALID;
 
-	schedule_loop(out_queue, wait, &ev, 1, MAX_DEQ);
+	schedule_loop(out_queue, wait, &ev, 1);
 
 	return ev;
 }
@@ -828,7 +834,7 @@ static odp_event_t schedule(odp_queue_t *out_queue, uint64_t wait)
 static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
 			  odp_event_t events[], int num)
 {
-	return schedule_loop(out_queue, wait, events, num, MAX_DEQ);
+	return schedule_loop(out_queue, wait, events, num);
 }
 
 static void schedule_pause(void)
commit 66d26268065eaae370b402dc5b006121c6931c5c
Author: Petri Savolainen <petri.savolainen@nokia.com>
Date:   Mon Sep 5 13:09:28 2016 +0300
linux-gen: sched: pktio poll loop optimization
Optimized pktio poll loop if-branches with likely/unlikely hints.
Signed-off-by: Petri Savolainen <petri.savolainen@nokia.com>
Reviewed-and-tested-by: Yi He <yi.he@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
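odp_likely()/odp_unlikely() wrap compiler branch-prediction builtins. A generic sketch of the idea, using the common GCC/clang macro definitions rather than the ODP ones:

#include <stdio.h>

/* Common GCC/clang hint macros, similar in spirit to odp_likely() and
 * odp_unlikely(); generic definitions, not copied from ODP. */
#define likely(x)   __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

static int poll_cmd_queue(int num_cmd)
{
	/* An empty command queue is assumed rare on a busy system, so
	 * the compiler lays out the non-empty path as the fall-through */
	if (unlikely(num_cmd == 0))
		return 0;

	/* ... poll packet input here ... */
	return 1;
}

int main(void)
{
	printf("polled: %d\n", poll_cmd_queue(3));
	return 0;
}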
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 3763a41..95ba13f 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -43,6 +43,9 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 /* Packet input poll cmd queues */
 #define PKTIO_CMD_QUEUES 4
 
+/* Mask for wrapping command queues */
+#define PKTIO_CMD_QUEUE_MASK (PKTIO_CMD_QUEUES - 1)
+
 /* Maximum number of packet input queues per command */
 #define MAX_PKTIN 16
 
@@ -94,6 +97,11 @@ ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(PRIO_QUEUE_RING_SIZE),
 ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(PKTIO_RING_SIZE),
 		  "pktio_ring_size_is_not_power_of_two");
 
+/* Number of commands queues must be power of two, so that PKTIO_CMD_QUEUE_MASK
+ * can be used. */
+ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(PKTIO_CMD_QUEUES),
+		  "pktio_cmd_queues_is_not_power_of_two");
+
 /* Mask of queues per priority */
 typedef uint8_t pri_mask_t;
 
@@ -452,7 +460,7 @@ static void schedule_destroy_queue(uint32_t queue_index)
 
 static int poll_cmd_queue_idx(int pktio_index, int pktin_idx)
 {
-	return (PKTIO_CMD_QUEUES - 1) & (pktio_index ^ pktin_idx);
+	return PKTIO_CMD_QUEUE_MASK & (pktio_index ^ pktin_idx);
 }
 
 static inline pktio_cmd_t *alloc_pktio_cmd(void)
@@ -725,28 +733,29 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 	 * have to do full iteration to avoid packet input starvation when
 	 * there are less threads than command queues. */
-	id = sched_local.thr & (PKTIO_CMD_QUEUES - 1);
+	id = sched_local.thr & PKTIO_CMD_QUEUE_MASK;
 
-	for (i = 0; i < PKTIO_CMD_QUEUES; i++, id++) {
+	for (i = 0; i < PKTIO_CMD_QUEUES; i++, id = ((id + 1) &
+	     PKTIO_CMD_QUEUE_MASK)) {
+		sched_ring_t *ring;
 		uint32_t cmd_index;
 		pktio_cmd_t *cmd;
 
-		if (id == PKTIO_CMD_QUEUES)
-			id = 0;
-
-		if (sched->num_pktio_cmd[id] == 0)
+		if (odp_unlikely(sched->num_pktio_cmd[id] == 0))
 			continue;
 
-		cmd_index = ring_deq(&sched->pktio_q[id].ring, PKTIO_RING_MASK);
+		ring = &sched->pktio_q[id].ring;
+		cmd_index = ring_deq(ring, PKTIO_RING_MASK);
 
-		if (cmd_index == RING_EMPTY)
+		if (odp_unlikely(cmd_index == RING_EMPTY))
 			continue;
 
 		cmd = &sched->pktio_cmd[cmd_index];
 
 		/* Poll packet input */
-		if (sched_cb_pktin_poll(cmd->pktio_index,
-					cmd->num_pktin, cmd->pktin)){
+		if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio_index,
+						     cmd->num_pktin,
+						     cmd->pktin))){
 			/* Pktio stopped or closed. Remove poll command and call
 			 * stop_finalize when all commands of the pktio has
 			 * been removed. */
@@ -757,8 +766,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 				free_pktio_cmd(cmd);
 		} else {
 			/* Continue scheduling the pktio */
-			ring_enq(&sched->pktio_q[id].ring, PKTIO_RING_MASK,
-				 cmd_index);
+			ring_enq(ring, PKTIO_RING_MASK, cmd_index);
 
 			/* Do not iterate through all pktin poll command queues
 			 * every time. */
commit d6260eeec46915b9d90453cc5add6ac0cac74e52
Author: Petri Savolainen <petri.savolainen@nokia.com>
Date:   Mon Sep 5 13:09:27 2016 +0300
linux-gen: sched: increase number of pktin
Increased maximum number of pktin queues supported by the scheduler.
Signed-off-by: Petri Savolainen <petri.savolainen@nokia.com>
Reviewed-and-tested-by: Yi He <yi.he@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 07c21d5..3763a41 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -44,7 +44,7 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 #define PKTIO_CMD_QUEUES 4
 
 /* Maximum number of packet input queues per command */
-#define MAX_PKTIN 8
+#define MAX_PKTIN 16
 
 /* Maximum number of packet IO interfaces */
 #define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
commit ef8b7b691d4902153a71087ed0f4297cdd8809ff
Author: Petri Savolainen <petri.savolainen@nokia.com>
Date:   Mon Sep 5 13:09:26 2016 +0300
linux-gen: sched: ring based pktio command queues
Implemented scheduler pktio poll command queues with a ring-based data structure instead of odp_queue_t queues.
Scheduler initialization and termination are also simplified, as there is no longer a dependency on the pool, buffer and queue APIs.
Signed-off-by: Petri Savolainen <petri.savolainen@nokia.com>
Reviewed-and-tested-by: Yi He <yi.he@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
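The core of the change, sketched as a minimal single-threaded program with illustrative names and sizes: poll commands live in a fixed, preallocated table, and command queues carry only 32-bit indexes into that table, so no pool, buffer or queue objects are needed. Allocation is a linear scan for a free slot, as in the commit's alloc_pktio_cmd(), minus the spinlock:

#include <stdint.h>
#include <stdio.h>

#define NUM_CMD  16
#define CMD_FREE ((uint32_t)-1)

typedef struct {
	int pktio_index;
	uint32_t cmd_index;	/* CMD_FREE when the slot is unused */
} cmd_t;

static cmd_t cmd_tbl[NUM_CMD];

static cmd_t *alloc_cmd(void)
{
	uint32_t i;

	/* Find next free command slot */
	for (i = 0; i < NUM_CMD; i++) {
		if (cmd_tbl[i].cmd_index == CMD_FREE) {
			cmd_tbl[i].cmd_index = i;
			return &cmd_tbl[i];
		}
	}
	return NULL;	/* out of commands */
}

int main(void)
{
	uint32_t i;
	cmd_t *cmd;

	for (i = 0; i < NUM_CMD; i++)
		cmd_tbl[i].cmd_index = CMD_FREE;

	cmd = alloc_cmd();
	if (cmd != NULL)
		printf("allocated command index %u\n", cmd->cmd_index);

	return 0;
}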
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 83c81e4..07c21d5 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -8,10 +8,7 @@
 #include <odp/api/schedule.h>
 #include <odp_schedule_if.h>
 #include <odp/api/align.h>
-#include <odp/api/queue.h>
 #include <odp/api/shared_memory.h>
-#include <odp/api/buffer.h>
-#include <odp/api/pool.h>
 #include <odp_internal.h>
 #include <odp_debug_internal.h>
 #include <odp/api/thread.h>
@@ -27,10 +24,6 @@
 #include <odp_schedule_internal.h>
 #include <odp_schedule_ordered_internal.h>
 #include <odp/api/sync.h>
 
-#ifdef _ODP_PKTIO_IPC
-#include <odp_pool_internal.h>
-#endif
-
 /* Number of priority levels */
 #define NUM_PRIO 8
 
@@ -48,7 +41,7 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 #define QUEUES_PER_PRIO 4
 
 /* Packet input poll cmd queues */
-#define POLL_CMD_QUEUES 4
+#define PKTIO_CMD_QUEUES 4
 
 /* Maximum number of packet input queues per command */
 #define MAX_PKTIN 8
@@ -56,6 +49,24 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 /* Maximum number of packet IO interfaces */
 #define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
 
+/* Maximum number of pktio poll commands */
+#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO)
+
+/* Not a valid poll command */
+#define PKTIO_CMD_INVALID ((uint32_t)-1)
+
+/* Pktio command is free */
+#define PKTIO_CMD_FREE PKTIO_CMD_INVALID
+
+/* Packet IO poll queue ring size. In worst case, all pktios have all pktins
+ * enabled and one poll command is created per pktin queue. The ring size must
+ * be larger than or equal to NUM_PKTIO_CMD / PKTIO_CMD_QUEUES, so that it can
+ * hold all poll commands in the worst case. */
+#define PKTIO_RING_SIZE (NUM_PKTIO_CMD / PKTIO_CMD_QUEUES)
+
+/* Mask for wrapping around pktio poll command index */
+#define PKTIO_RING_MASK (PKTIO_RING_SIZE - 1)
+
 /* Priority queue ring size. In worst case, all event queues are scheduled
  * queues and have the same priority. The ring size must be larger than or
  * equal to ODP_CONFIG_QUEUES / QUEUES_PER_PRIO, so that it can hold all
@@ -79,6 +90,10 @@ ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(ODP_CONFIG_QUEUES),
 ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(PRIO_QUEUE_RING_SIZE),
 		  "Ring_size_is_not_power_of_two");
 
+/* Ring size must be power of two, so that PKTIO_RING_MASK can be used. */
+ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(PKTIO_RING_SIZE),
+		  "pktio_ring_size_is_not_power_of_two");
+
 /* Mask of queues per priority */
 typedef uint8_t pri_mask_t;
 
@@ -116,6 +131,24 @@ typedef struct {
 
 } prio_queue_t ODP_ALIGNED_CACHE;
 
+/* Packet IO queue */
+typedef struct {
+	/* Ring header */
+	sched_ring_t ring;
+
+	/* Ring data: pktio poll command indexes */
+	uint32_t cmd_index[PKTIO_RING_SIZE];
+
+} pktio_queue_t ODP_ALIGNED_CACHE;
+
+/* Packet IO poll command */
+typedef struct {
+	int pktio_index;
+	int num_pktin;
+	int pktin[MAX_PKTIN];
+	uint32_t cmd_index;
+} pktio_cmd_t;
+
 typedef struct {
 	pri_mask_t pri_mask[NUM_PRIO];
 	odp_spinlock_t mask_lock;
@@ -123,12 +156,15 @@ typedef struct {
 	prio_queue_t prio_q[NUM_PRIO][QUEUES_PER_PRIO];
 
 	odp_spinlock_t poll_cmd_lock;
-	struct {
-		odp_queue_t queue;
-		uint16_t num;
-	} poll_cmd[POLL_CMD_QUEUES];
+	/* Number of commands in a command queue */
+	uint16_t num_pktio_cmd[PKTIO_CMD_QUEUES];
+
+	/* Packet IO command queues */
+	pktio_queue_t pktio_q[PKTIO_CMD_QUEUES];
+
+	/* Packet IO poll commands */
+	pktio_cmd_t pktio_cmd[NUM_PKTIO_CMD];
 
-	odp_pool_t pool;
 	odp_shm_t shm;
 	uint32_t pri_count[NUM_PRIO][QUEUES_PER_PRIO];
 
@@ -145,31 +181,12 @@ typedef struct {
 	} queue[ODP_CONFIG_QUEUES];
 
 	struct {
-		int num;
+		/* Number of active commands for a pktio interface */
+		int num_cmd;
 	} pktio[NUM_PKTIO];
 
 } sched_global_t;
 
-/* Schedule command */
-typedef struct {
-	int cmd;
-
-	union {
-		struct {
-			uint32_t queue_index;
-		};
-
-		struct {
-			int pktio_index;
-			int num;
-			int index[MAX_PKTIN];
-		};
-	};
-} sched_cmd_t;
-
-#define SCHED_CMD_DEQUEUE 0
-#define SCHED_CMD_POLL_PKTIN 1
-
 /* Global scheduler context */
 static sched_global_t *sched;
 
@@ -253,10 +270,7 @@ static void sched_local_init(void)
 static int schedule_init_global(void)
 {
 	odp_shm_t shm;
-	odp_pool_t pool;
 	int i, j;
-	odp_pool_param_t params;
-	int num_cmd;
 
 	ODP_DBG("Schedule init ... ");
 
@@ -273,27 +287,6 @@ static int schedule_init_global(void)
 
 	memset(sched, 0, sizeof(sched_global_t));
 
-	/* Number of schedule commands.
-	 * One per scheduled queue and packet interface */
-	num_cmd = sched_cb_num_queues() + sched_cb_num_pktio();
-
-	odp_pool_param_init(&params);
-	params.buf.size = sizeof(sched_cmd_t);
-	params.buf.align = 0;
-	params.buf.num = num_cmd;
-	params.type = ODP_POOL_BUFFER;
-
-#ifdef _ODP_PKTIO_IPC
-	pool = _pool_create("odp_sched_pool", &params, 0);
-#else
-	pool = odp_pool_create("odp_sched_pool", &params);
-#endif
-	if (pool == ODP_POOL_INVALID) {
-		ODP_ERR("Schedule init: Pool create failed.\n");
-		return -1;
-	}
-
-	sched->pool = pool;
 	sched->shm = shm;
 	odp_spinlock_init(&sched->mask_lock);
 
@@ -310,23 +303,16 @@ static int schedule_init_global(void)
 	}
 
 	odp_spinlock_init(&sched->poll_cmd_lock);
-	for (i = 0; i < POLL_CMD_QUEUES; i++) {
-		odp_queue_t queue;
-		char name[] = "odp_poll_cmd_YY";
-
-		name[13] = '0' + i / 10;
-		name[14] = '0' + i - 10 * (i / 10);
-
-		queue = odp_queue_create(name, NULL);
+	for (i = 0; i < PKTIO_CMD_QUEUES; i++) {
+		ring_init(&sched->pktio_q[i].ring);
 
-		if (queue == ODP_QUEUE_INVALID) {
-			ODP_ERR("Sched init: Queue create failed.\n");
-			return -1;
-		}
-
-		sched->poll_cmd[i].queue = queue;
+		for (j = 0; j < PKTIO_RING_SIZE; j++)
+			sched->pktio_q[i].cmd_index[j] = PKTIO_CMD_INVALID;
 	}
 
+	for (i = 0; i < NUM_PKTIO_CMD; i++)
+		sched->pktio_cmd[i].cmd_index = PKTIO_CMD_FREE;
+
 	odp_spinlock_init(&sched->grp_lock);
 
 	for (i = 0; i < NUM_SCHED_GRPS; i++) {
@@ -346,7 +332,6 @@ static int schedule_term_global(void)
 	int ret = 0;
 	int rc = 0;
 	int i, j;
-	odp_event_t ev;
 
 	for (i = 0; i < NUM_PRIO; i++) {
 		for (j = 0; j < QUEUES_PER_PRIO; j++) {
@@ -369,23 +354,6 @@ static int schedule_term_global(void)
 		}
 	}
 
-	for (i = 0; i < POLL_CMD_QUEUES; i++) {
-		odp_queue_t queue = sched->poll_cmd[i].queue;
-
-		while ((ev = odp_queue_deq(queue)) != ODP_EVENT_INVALID)
-			odp_event_free(ev);
-
-		if (odp_queue_destroy(queue)) {
-			ODP_ERR("Poll cmd queue destroy failed\n");
-			rc = -1;
-		}
-	}
-
-	if (odp_pool_destroy(sched->pool) != 0) {
-		ODP_ERR("Pool destroy fail.\n");
-		rc = -1;
-	}
-
 	ret = odp_shm_free(sched->shm);
 	if (ret < 0) {
 		ODP_ERR("Shm free failed for odp_scheduler");
@@ -482,60 +450,83 @@ static void schedule_destroy_queue(uint32_t queue_index)
 	sched->queue[queue_index].queue_per_prio = 0;
 }
 
-static int poll_cmd_queue_idx(int pktio_index, int in_queue_idx)
+static int poll_cmd_queue_idx(int pktio_index, int pktin_idx)
 {
-	return (POLL_CMD_QUEUES - 1) & (pktio_index ^ in_queue_idx);
+	return (PKTIO_CMD_QUEUES - 1) & (pktio_index ^ pktin_idx);
 }
 
-static void schedule_pktio_start(int pktio_index, int num_in_queue,
-				 int in_queue_idx[])
+static inline pktio_cmd_t *alloc_pktio_cmd(void)
+{
+	int i;
+	pktio_cmd_t *cmd = NULL;
+
+	odp_spinlock_lock(&sched->poll_cmd_lock);
+
+	/* Find next free command */
+	for (i = 0; i < NUM_PKTIO_CMD; i++) {
+		if (sched->pktio_cmd[i].cmd_index == PKTIO_CMD_FREE) {
+			cmd = &sched->pktio_cmd[i];
+			cmd->cmd_index = i;
+			break;
+		}
+	}
+
+	odp_spinlock_unlock(&sched->poll_cmd_lock);
+
+	return cmd;
+}
+
+static inline void free_pktio_cmd(pktio_cmd_t *cmd)
+{
+	odp_spinlock_lock(&sched->poll_cmd_lock);
+
+	cmd->cmd_index = PKTIO_CMD_FREE;
+
+	odp_spinlock_unlock(&sched->poll_cmd_lock);
+}
+
+static void schedule_pktio_start(int pktio_index, int num_pktin,
+				 int pktin_idx[])
 {
-	odp_buffer_t buf;
-	sched_cmd_t *sched_cmd;
-	odp_queue_t queue;
 	int i, idx;
+	pktio_cmd_t *cmd;
 
-	if (num_in_queue > MAX_PKTIN)
+	if (num_pktin > MAX_PKTIN)
 		ODP_ABORT("Too many input queues for scheduler\n");
 
-	sched->pktio[pktio_index].num = num_in_queue;
+	sched->pktio[pktio_index].num_cmd = num_pktin;
 
 	/* Create a pktio poll command per queue */
-	for (i = 0; i < num_in_queue; i++) {
-		buf = odp_buffer_alloc(sched->pool);
+	for (i = 0; i < num_pktin; i++) {
 
-		if (buf == ODP_BUFFER_INVALID)
-			ODP_ABORT("Sched pool empty\n");
+		cmd = alloc_pktio_cmd();
 
-		sched_cmd = odp_buffer_addr(buf);
-		sched_cmd->cmd = SCHED_CMD_POLL_PKTIN;
-		sched_cmd->pktio_index = pktio_index;
-		sched_cmd->num = 1;
-		sched_cmd->index[0] = in_queue_idx[i];
+		if (cmd == NULL)
+			ODP_ABORT("Scheduler out of pktio commands\n");
 
-		idx = poll_cmd_queue_idx(pktio_index, in_queue_idx[i]);
+		idx = poll_cmd_queue_idx(pktio_index, pktin_idx[i]);
 
 		odp_spinlock_lock(&sched->poll_cmd_lock);
-		sched->poll_cmd[idx].num++;
+		sched->num_pktio_cmd[idx]++;
		odp_spinlock_unlock(&sched->poll_cmd_lock);
 
-		queue = sched->poll_cmd[idx].queue;
-
-		if (odp_queue_enq(queue, odp_buffer_to_event(buf)))
-			ODP_ABORT("schedule_pktio_start failed\n");
+		cmd->pktio_index = pktio_index;
+		cmd->num_pktin = 1;
+		cmd->pktin[0] = pktin_idx[i];
+		ring_enq(&sched->pktio_q[idx].ring, PKTIO_RING_MASK,
+			 cmd->cmd_index);
 	}
 }
 
-static int schedule_pktio_stop(sched_cmd_t *sched_cmd)
+static int schedule_pktio_stop(int pktio_index, int first_pktin)
 {
 	int num;
-	int pktio_index = sched_cmd->pktio_index;
-	int idx = poll_cmd_queue_idx(pktio_index, sched_cmd->index[0]);
+	int idx = poll_cmd_queue_idx(pktio_index, first_pktin);
 
 	odp_spinlock_lock(&sched->poll_cmd_lock);
-	sched->poll_cmd[idx].num--;
-	sched->pktio[pktio_index].num--;
-	num = sched->pktio[pktio_index].num;
+	sched->num_pktio_cmd[idx]--;
+	sched->pktio[pktio_index].num_cmd--;
+	num = sched->pktio[pktio_index].num_cmd;
 	odp_spinlock_unlock(&sched->poll_cmd_lock);
 
 	return num;
@@ -592,9 +583,6 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 	int i, j;
 	int ret;
 	int id;
-	odp_event_t ev;
-	odp_buffer_t buf;
-	sched_cmd_t *sched_cmd;
 	int offset = 0;
 	uint32_t qi;
 
@@ -737,47 +725,40 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 	 * have to do full iteration to avoid packet input starvation when
 	 * there are less threads than command queues. */
-	id = sched_local.thr & (POLL_CMD_QUEUES - 1);
+	id = sched_local.thr & (PKTIO_CMD_QUEUES - 1);
 
-	for (i = 0; i < POLL_CMD_QUEUES; i++, id++) {
-		odp_queue_t cmd_queue;
-		int pktio_index;
+	for (i = 0; i < PKTIO_CMD_QUEUES; i++, id++) {
+		uint32_t cmd_index;
+		pktio_cmd_t *cmd;
 
-		if (id == POLL_CMD_QUEUES)
+		if (id == PKTIO_CMD_QUEUES)
 			id = 0;
 
-		if (sched->poll_cmd[id].num == 0)
+		if (sched->num_pktio_cmd[id] == 0)
 			continue;
 
-		cmd_queue = sched->poll_cmd[id].queue;
-		ev = odp_queue_deq(cmd_queue);
+		cmd_index = ring_deq(&sched->pktio_q[id].ring, PKTIO_RING_MASK);
 
-		if (ev == ODP_EVENT_INVALID)
+		if (cmd_index == RING_EMPTY)
 			continue;
 
-		buf = odp_buffer_from_event(ev);
-		sched_cmd = odp_buffer_addr(buf);
-
-		if (sched_cmd->cmd != SCHED_CMD_POLL_PKTIN)
-			ODP_ABORT("Bad poll command\n");
-
-		pktio_index = sched_cmd->pktio_index;
+		cmd = &sched->pktio_cmd[cmd_index];
 
 		/* Poll packet input */
-		if (sched_cb_pktin_poll(pktio_index,
-					sched_cmd->num,
-					sched_cmd->index)) {
+		if (sched_cb_pktin_poll(cmd->pktio_index,
+					cmd->num_pktin, cmd->pktin)){
 			/* Pktio stopped or closed. Remove poll command and call
 			 * stop_finalize when all commands of the pktio has
 			 * been removed. */
-			if (schedule_pktio_stop(sched_cmd) == 0)
-				sched_cb_pktio_stop_finalize(pktio_index);
+			if (schedule_pktio_stop(cmd->pktio_index,
+						cmd->pktin[0]) == 0)
+				sched_cb_pktio_stop_finalize(cmd->pktio_index);
 
-			odp_buffer_free(buf);
+			free_pktio_cmd(cmd);
 		} else {
 			/* Continue scheduling the pktio */
-			if (odp_queue_enq(cmd_queue, ev))
-				ODP_ABORT("Poll command enqueue failed\n");
+			ring_enq(&sched->pktio_q[id].ring, PKTIO_RING_MASK,
+				 cmd_index);
 
 			/* Do not iterate through all pktin poll command queues
 			 * every time. */
commit de19d9847b76c90473c474a831800d383865a4ba
Author: Petri Savolainen <petri.savolainen@nokia.com>
Date:   Mon Sep 5 13:09:25 2016 +0300
linux-gen: sched: ring based priority queues
Implemented scheduler priority queues with a ring-based data structure instead of odp_queue_t queues. This enables performance optimization, since a ring of indexes is more efficient to access than pointers in a linked list. Ring operations may be further optimized with another lock-free algorithm.
Signed-off-by: Petri Savolainen <petri.savolainen@nokia.com>
Reviewed-and-tested-by: Yi He <yi.he@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
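A single-threaded sketch of the ring idea, leaving out the atomics and acquire/release ordering that the real sched_ring_t in the diff below adds: head and tail are free-running counters, and the slot index is counter & mask, which is why the ring size must be a power of two:

#include <stdint.h>
#include <stdio.h>

#define RING_SIZE  8	/* must be a power of two */
#define RING_MASK  (RING_SIZE - 1)
#define RING_EMPTY ((uint32_t)-1)

typedef struct {
	uint32_t head;	/* last written slot */
	uint32_t tail;	/* last read slot */
	uint32_t data[RING_SIZE];
} ring_t;

static void ring_enq(ring_t *r, uint32_t data)
{
	/* Free-running counter; the mask wraps it into the ring */
	r->head++;
	r->data[r->head & RING_MASK] = data;
}

static uint32_t ring_deq(ring_t *r)
{
	if (r->tail == r->head)
		return RING_EMPTY;

	r->tail++;
	return r->data[r->tail & RING_MASK];
}

int main(void)
{
	ring_t r = { 0, 0, { 0 } };
	uint32_t qi;

	ring_enq(&r, 42);
	ring_enq(&r, 7);

	while ((qi = ring_deq(&r)) != RING_EMPTY)
		printf("dequeued queue index %u\n", qi);

	return 0;
}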
diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h
index 4a04b15..1b6ae93 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -22,8 +22,7 @@ typedef struct {
 	uint16_t round;
 	uint16_t prefer_offset;
 	uint16_t pktin_polls;
-	odp_queue_t pri_queue;
-	odp_event_t cmd_ev;
+	uint32_t queue_index;
 	odp_queue_t queue;
 	odp_event_t ev_stash[MAX_DEQ];
 	void *origin_qe;
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index e08de54..83c81e4 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -20,9 +20,13 @@
 #include <odp/api/hints.h>
 #include <odp/api/cpu.h>
 #include <odp/api/thrmask.h>
+#include <odp/api/atomic.h>
 #include <odp_config_internal.h>
+#include <odp_align_internal.h>
 #include <odp_schedule_internal.h>
 #include <odp_schedule_ordered_internal.h>
+#include <odp/api/sync.h>
+
 #ifdef _ODP_PKTIO_IPC
 #include <odp_pool_internal.h>
 #endif
@@ -52,6 +56,29 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 /* Maximum number of packet IO interfaces */
 #define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
 
+/* Priority queue ring size. In worst case, all event queues are scheduled
+ * queues and have the same priority. The ring size must be larger than or
+ * equal to ODP_CONFIG_QUEUES / QUEUES_PER_PRIO, so that it can hold all
+ * queues in the worst case. */
+#define PRIO_QUEUE_RING_SIZE (ODP_CONFIG_QUEUES / QUEUES_PER_PRIO)
+
+/* Mask for wrapping around priority queue index */
+#define PRIO_QUEUE_MASK (PRIO_QUEUE_RING_SIZE - 1)
+
+/* Priority queue empty, not a valid queue index. */
+#define PRIO_QUEUE_EMPTY ((uint32_t)-1)
+
+/* Ring empty, not a valid index. */
+#define RING_EMPTY ((uint32_t)-1)
+
+/* For best performance, the number of queues should be a power of two. */
+ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(ODP_CONFIG_QUEUES),
+		  "Number_of_queues_is_not_power_of_two");
+
+/* Ring size must be power of two, so that MAX_QUEUE_IDX_MASK can be used. */
+ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(PRIO_QUEUE_RING_SIZE),
+		  "Ring_size_is_not_power_of_two");
+
 /* Mask of queues per priority */
 typedef uint8_t pri_mask_t;
 
@@ -61,11 +88,40 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
 /* Start of named groups in group mask arrays */
 #define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)
 
+/* Scheduler ring
+ *
+ * Ring stores head and tail counters. Ring indexes are formed from these
+ * counters with a mask (mask = ring_size - 1), which requires that ring size
+ * must be a power of two. */
+typedef struct {
+	/* Writer head and tail */
+	odp_atomic_u32_t w_head;
+	odp_atomic_u32_t w_tail;
+	uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))];
+
+	/* Reader head and tail */
+	odp_atomic_u32_t r_head;
+	odp_atomic_u32_t r_tail;
+
+	uint32_t data[0];
+} sched_ring_t ODP_ALIGNED_CACHE;
+
+/* Priority queue */
+typedef struct {
+	/* Ring header */
+	sched_ring_t ring;
+
+	/* Ring data: queue indexes */
+	uint32_t queue_index[PRIO_QUEUE_RING_SIZE];
+
+} prio_queue_t ODP_ALIGNED_CACHE;
+
 typedef struct {
-	odp_queue_t pri_queue[NUM_PRIO][QUEUES_PER_PRIO];
 	pri_mask_t pri_mask[NUM_PRIO];
 	odp_spinlock_t mask_lock;
 
+	prio_queue_t prio_q[NUM_PRIO][QUEUES_PER_PRIO];
+
 	odp_spinlock_t poll_cmd_lock;
 	struct {
 		odp_queue_t queue;
@@ -84,9 +140,8 @@ typedef struct {
 	} sched_grp[NUM_SCHED_GRPS];
 
 	struct {
-		odp_event_t cmd_ev;
-		odp_queue_t pri_queue;
 		int prio;
+		int queue_per_prio;
 	} queue[ODP_CONFIG_QUEUES];
 
 	struct {
@@ -121,14 +176,78 @@ static sched_global_t *sched;
 /* Thread local scheduler context */
 __thread sched_local_t sched_local;
 
+static void ring_init(sched_ring_t *ring)
+{
+	odp_atomic_init_u32(&ring->w_head, 0);
+	odp_atomic_init_u32(&ring->w_tail, 0);
+	odp_atomic_init_u32(&ring->r_head, 0);
+	odp_atomic_init_u32(&ring->r_tail, 0);
+}
+
+/* Dequeue data from the ring head */
+static inline uint32_t ring_deq(sched_ring_t *ring, uint32_t mask)
+{
+	uint32_t head, tail, new_head;
+	uint32_t data;
+
+	head = odp_atomic_load_u32(&ring->r_head);
+
+	/* Move reader head. This thread owns data at the new head. */
+	do {
+		tail = odp_atomic_load_u32(&ring->w_tail);
+
+		if (head == tail)
+			return RING_EMPTY;
+
+		new_head = head + 1;
+
+	} while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
+			      new_head) == 0));
+
+	/* Read queue index */
+	data = ring->data[new_head & mask];
+
+	/* Wait until other readers have updated the tail */
+	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
+		odp_cpu_pause();
+
+	/* Now update the reader tail */
+	odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
+	return data;
+}
+
+/* Enqueue data into the ring tail */
+static inline void ring_enq(sched_ring_t *ring, uint32_t mask, uint32_t data)
+{
+	uint32_t old_head, new_head;
+
+	/* Reserve a slot in the ring for writing */
+	old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
+	new_head = old_head + 1;
+
+	/* Ring is full. Wait for the last reader to finish. */
+	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
+		odp_cpu_pause();
+
+	/* Write data */
+	ring->data[new_head & mask] = data;
+
+	/* Wait until other writers have updated the tail */
+	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+		odp_cpu_pause();
+
+	/* Now update the writer tail */
+	odp_atomic_store_rel_u32(&ring->w_tail, new_head);
+}
+
 static void sched_local_init(void)
 {
 	memset(&sched_local, 0, sizeof(sched_local_t));
 
 	sched_local.thr = odp_thread_id();
-	sched_local.pri_queue = ODP_QUEUE_INVALID;
 	sched_local.queue = ODP_QUEUE_INVALID;
-	sched_local.cmd_ev = ODP_EVENT_INVALID;
+	sched_local.queue_index = PRIO_QUEUE_EMPTY;
 }
 
 static int schedule_init_global(void)
@@ -179,25 +298,14 @@ static int schedule_init_global(void)
 	odp_spinlock_init(&sched->mask_lock);
 
 	for (i = 0; i < NUM_PRIO; i++) {
-		odp_queue_t queue;
-		char name[] = "odp_priXX_YY";
-
-		name[7] = '0' + i / 10;
-		name[8] = '0' + i - 10*(i / 10);
-
 		for (j = 0; j < QUEUES_PER_PRIO; j++) {
-			name[10] = '0' + j / 10;
-			name[11] = '0' + j - 10*(j / 10);
-
-			queue = odp_queue_create(name, NULL);
+			int k;
 
-			if (queue == ODP_QUEUE_INVALID) {
-				ODP_ERR("Sched init: Queue create failed.\n");
-				return -1;
-			}
+			ring_init(&sched->prio_q[i][j].ring);
 
-			sched->pri_queue[i][j] = queue;
-			sched->pri_mask[i] = 0;
+			for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++)
+				sched->prio_q[i][j].queue_index[k] =
+				PRIO_QUEUE_EMPTY;
 		}
 	}
 
@@ -228,11 +336,6 @@ static int schedule_init_global(void)
 
 	odp_thrmask_setall(&sched->mask_all);
 
-	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
-		sched->queue[i].cmd_ev = ODP_EVENT_INVALID;
-		sched->queue[i].pri_queue = ODP_QUEUE_INVALID;
-	}
-
 	ODP_DBG("done\n");
 
 	return 0;
@@ -247,21 +350,14 @@ static int schedule_term_global(void)
 
 	for (i = 0; i < NUM_PRIO; i++) {
 		for (j = 0; j < QUEUES_PER_PRIO; j++) {
-			odp_queue_t pri_q;
-
-			pri_q = sched->pri_queue[i][j];
+			sched_ring_t *ring = &sched->prio_q[i][j].ring;
+			uint32_t qi;
 
-			while ((ev = odp_queue_deq(pri_q)) !=
-			       ODP_EVENT_INVALID) {
-				odp_buffer_t buf;
-				sched_cmd_t *sched_cmd;
-				uint32_t qi;
+			while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
+			       RING_EMPTY) {
 				odp_event_t events[1];
 				int num;
 
-				buf = odp_buffer_from_event(ev);
-				sched_cmd = odp_buffer_addr(buf);
-				qi = sched_cmd->queue_index;
 				num = sched_cb_queue_deq_multi(qi, events, 1);
 
 				if (num < 0)
@@ -270,11 +366,6 @@ static int schedule_term_global(void)
 				if (num > 0)
 					ODP_ERR("Queue not empty\n");
 			}
-
-			if (odp_queue_destroy(pri_q)) {
-				ODP_ERR("Pri queue destroy fail.\n");
-				rc = -1;
-			}
 		}
 	}
 
@@ -331,19 +422,17 @@ static int schedule_term_local(void)
 	return 0;
 }
 
-static inline int pri_id_queue(uint32_t queue_index)
+static inline int queue_per_prio(uint32_t queue_index)
 {
 	return ((QUEUES_PER_PRIO - 1) & queue_index);
 }
 
-static odp_queue_t pri_set(int id, int prio)
+static void pri_set(int id, int prio)
 {
 	odp_spinlock_lock(&sched->mask_lock);
 	sched->pri_mask[prio] |= 1 << id;
 	sched->pri_count[prio][id]++;
 	odp_spinlock_unlock(&sched->mask_lock);
-
-	return sched->pri_queue[prio][id];
 }
 
 static void pri_clr(int id, int prio)
@@ -359,38 +448,27 @@ static void pri_clr(int id, int prio)
 	odp_spinlock_unlock(&sched->mask_lock);
 }
 
-static odp_queue_t pri_set_queue(uint32_t queue_index, int prio)
+static void pri_set_queue(uint32_t queue_index, int prio)
 {
-	int id = pri_id_queue(queue_index);
+	int id = queue_per_prio(queue_index);
 
 	return pri_set(id, prio);
 }
 
 static void pri_clr_queue(uint32_t queue_index, int prio)
 {
-	int id = pri_id_queue(queue_index);
+	int id = queue_per_prio(queue_index);
 
 	pri_clr(id, prio);
 }
 
 static int schedule_init_queue(uint32_t queue_index,
			       const odp_schedule_param_t *sched_param)
 {
-	odp_buffer_t buf;
-	sched_cmd_t *sched_cmd;
 	int prio = sched_param->prio;
 
-	buf = odp_buffer_alloc(sched->pool);
-
-	if (buf == ODP_BUFFER_INVALID)
-		return -1;
-
-	sched_cmd = odp_buffer_addr(buf);
-	sched_cmd->cmd = SCHED_CMD_DEQUEUE;
-	sched_cmd->queue_index = queue_index;
-
-	sched->queue[queue_index].cmd_ev = odp_buffer_to_event(buf);
-	sched->queue[queue_index].pri_queue = pri_set_queue(queue_index, prio);
+	pri_set_queue(queue_index, prio);
 	sched->queue[queue_index].prio = prio;
+	sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index);
 
 	return 0;
 }
@@ -399,13 +477,9 @@ static void schedule_destroy_queue(uint32_t queue_index)
 {
 	int prio = sched->queue[queue_index].prio;
 
-	odp_event_free(sched->queue[queue_index].cmd_ev);
-
 	pri_clr_queue(queue_index, prio);
-
-	sched->queue[queue_index].cmd_ev = ODP_EVENT_INVALID;
-	sched->queue[queue_index].pri_queue = ODP_QUEUE_INVALID;
-	sched->queue[queue_index].prio = 0;
+	sched->queue[queue_index].prio = 0;
+	sched->queue[queue_index].queue_per_prio = 0;
 }
 
 static int poll_cmd_queue_idx(int pktio_index, int in_queue_idx)
@@ -469,12 +543,16 @@ static int schedule_pktio_stop(sched_cmd_t *sched_cmd)
 
 static void schedule_release_atomic(void)
 {
-	if (sched_local.pri_queue != ODP_QUEUE_INVALID &&
-	    sched_local.num == 0) {
+	uint32_t qi = sched_local.queue_index;
+
+	if (qi != PRIO_QUEUE_EMPTY && sched_local.num == 0) {
+		int prio = sched->queue[qi].prio;
+		int queue_per_prio = sched->queue[qi].queue_per_prio;
+		sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+
 		/* Release current atomic queue */
-		if (odp_queue_enq(sched_local.pri_queue, sched_local.cmd_ev))
-			ODP_ABORT("odp_schedule_release_atomic failed\n");
-		sched_local.pri_queue = ODP_QUEUE_INVALID;
+		ring_enq(ring, PRIO_QUEUE_MASK, qi);
+
+		sched_local.queue_index = PRIO_QUEUE_EMPTY;
 	}
 }
 
@@ -553,11 +631,11 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 		id = (sched_local.thr + offset) & (QUEUES_PER_PRIO - 1);
 
 		for (j = 0; j < QUEUES_PER_PRIO;) {
-			odp_queue_t pri_q;
 			int num;
 			int grp;
 			int ordered;
 			odp_queue_t handle;
+			sched_ring_t *ring;
 
 			if (id >= QUEUES_PER_PRIO)
 				id = 0;
@@ -570,20 +648,18 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 				continue;
 			}
 
-			pri_q = sched->pri_queue[i][id];
-			ev = odp_queue_deq(pri_q);
+			/* Get queue index from the priority queue */
+			ring = &sched->prio_q[i][id].ring;
+			qi = ring_deq(ring, PRIO_QUEUE_MASK);
 
 			/* Priority queue empty */
-			if (ev == ODP_EVENT_INVALID) {
+			if (qi == RING_EMPTY) {
 				j++;
 				id++;
 				continue;
 			}
 
-			buf = odp_buffer_from_event(ev);
-			sched_cmd = odp_buffer_addr(buf);
-			qi = sched_cmd->queue_index;
-			grp = sched_cb_queue_grp(qi);
+			grp = sched_cb_queue_grp(qi);
 
 			if (grp > ODP_SCHED_GROUP_ALL &&
 			    !odp_thrmask_isset(&sched->sched_grp[grp].mask,
@@ -591,8 +667,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 				/* This thread is not eligible for work from
 				 * this queue, so continue scheduling it.
 				 */
-				if (odp_queue_enq(pri_q, ev))
-					ODP_ABORT("schedule failed\n");
+				ring_enq(ring, PRIO_QUEUE_MASK, qi);
 
 				j++;
 				id++;
@@ -632,19 +707,16 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 
 			if (ordered) {
 				/* Continue scheduling ordered queues */
-				if (odp_queue_enq(pri_q, ev))
-					ODP_ABORT("schedule failed\n");
+				ring_enq(ring, PRIO_QUEUE_MASK, qi);
 
 				/* Cache order info about this event */
 				cache_order_info(qi);
 			} else if (sched_cb_queue_is_atomic(qi)) {
 				/* Hold queue during atomic access */
-				sched_local.pri_queue = pri_q;
-				sched_local.cmd_ev = ev;
+				sched_local.queue_index = qi;
 			} else {
 				/* Continue scheduling the queue */
-				if (odp_queue_enq(pri_q, ev))
-					ODP_ABORT("schedule failed\n");
+				ring_enq(ring, PRIO_QUEUE_MASK, qi);
 			}
 
 			/* Output the source queue handle */
@@ -969,11 +1041,14 @@ static void schedule_prefetch(int num ODP_UNUSED)
 
 static int schedule_sched_queue(uint32_t queue_index)
 {
-	odp_queue_t pri_queue = sched->queue[queue_index].pri_queue;
-	odp_event_t cmd_ev = sched->queue[queue_index].cmd_ev;
+	int prio = sched->queue[queue_index].prio;
+	int queue_per_prio = sched->queue[queue_index].queue_per_prio;
+	sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
 
 	sched_local.ignore_ordered_context = 1;
-	return odp_queue_enq(pri_queue, cmd_ev);
+
+	ring_enq(ring, PRIO_QUEUE_MASK, queue_index);
+	return 0;
 }
 
 static int schedule_num_grps(void)
-----------------------------------------------------------------------
Summary of changes:
 .../linux-generic/include/odp_schedule_internal.h  |   5 +-
 platform/linux-generic/odp_schedule.c              | 599 ++++++++++++---------
 2 files changed, 338 insertions(+), 266 deletions(-)
hooks/post-receive