This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, master has been updated via 7e0c523ad6f209f238277e4116c21ba696edb7f4 (commit) via 12d238d96318cc9d46dd5a2893894824a633805b (commit) via bcd246e3a9948a631eb8eb77886ac1b369b13dba (commit) via c57da8ddc80e0e314957c6d11db288ef11fc1805 (commit) via 535ad269bfc6581a048d3e844576063be5d19501 (commit) from d8dd54f2325cf859703721e7a17b6cbe734d857a (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log ----------------------------------------------------------------- commit 7e0c523ad6f209f238277e4116c21ba696edb7f4 Author: Petri Savolainen petri.savolainen@linaro.org Date: Mon Mar 12 17:05:32 2018 +0200
linux-gen: sched: increase max spread
Allow the user to configure a spread value larger than the default.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Balasubramanian Manoharan bala.manoharan@linaro.org Reviewed-by: Dmitry Eremin-Solenikov dmitry.ereminsolenikov@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index 3bcafdb4..b50462a5 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -46,7 +46,7 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && #define GRP_WEIGHT_TBL_SIZE NUM_SCHED_GRPS
/* Maximum priority queue spread */ -#define MAX_SPREAD 4 +#define MAX_SPREAD 8
/* Minimum priority queue spread */ #define MIN_SPREAD 1
commit 12d238d96318cc9d46dd5a2893894824a633805b Author: Petri Savolainen petri.savolainen@linaro.org Date: Mon Mar 12 16:54:41 2018 +0200
linux-gen: sched: decouple spread and group table sizes
The only thing these tables had in common was the shared round counter. With separate counters, the tables can have different sizes. In particular, the spread table size needs to be increased and must also support sizes that are not a power of two (configurable spread values).
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Balasubramanian Manoharan bala.manoharan@linaro.org Reviewed-by: Dmitry Eremin-Solenikov dmitry.ereminsolenikov@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index e6d28c6d..3bcafdb4 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -42,6 +42,9 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && /* Number of scheduling groups */ #define NUM_SCHED_GRPS 32
+/* Group weight table size */ +#define GRP_WEIGHT_TBL_SIZE NUM_SCHED_GRPS + /* Maximum priority queue spread */ #define MAX_SPREAD 4
@@ -52,8 +55,8 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && * of the prefer queue. */ #define PREFER_RATIO 64
-/* Size of poll weight table */ -#define WEIGHT_TBL_SIZE ((MAX_SPREAD - 1) * PREFER_RATIO) +/* Spread weight table */ +#define SPREAD_TBL_SIZE ((MAX_SPREAD - 1) * PREFER_RATIO)
/* Maximum number of packet IO interfaces */ #define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES @@ -116,17 +119,18 @@ typedef struct { int thr; uint16_t stash_num; uint16_t stash_index; - uint16_t pause; - uint16_t round; + uint16_t grp_round; + uint16_t spread_round; uint32_t stash_qi; odp_queue_t stash_queue; odp_event_t stash_ev[MAX_DEQ];
uint32_t grp_epoch; - int num_grp; + uint16_t num_grp; + uint16_t pause; uint8_t grp[NUM_SCHED_GRPS]; - uint8_t weight_tbl[WEIGHT_TBL_SIZE]; - uint8_t grp_weight[WEIGHT_TBL_SIZE]; + uint8_t spread_tbl[SPREAD_TBL_SIZE]; + uint8_t grp_weight[GRP_WEIGHT_TBL_SIZE];
struct { /* Source queue index */ @@ -182,6 +186,7 @@ typedef struct { odp_spinlock_t grp_lock; odp_atomic_u32_t grp_epoch; uint32_t ring_mask; + uint16_t max_spread;
struct { char name[ODP_SCHED_GROUP_NAME_LEN]; @@ -216,6 +221,7 @@ ODP_STATIC_ASSERT(MAX_SPREAD <= 256, "Spread_does_not_fit_8_bits"); ODP_STATIC_ASSERT(CONFIG_QUEUE_MAX_ORD_LOCKS <= 256, "Ordered_lock_count_does_not_fit_8_bits"); ODP_STATIC_ASSERT(NUM_PKTIO <= 256, "Pktio_index_does_not_fit_8_bits"); +ODP_STATIC_ASSERT(CHECK_IS_POWER2(GRP_WEIGHT_TBL_SIZE), "Not_power_of_2");
/* Global scheduler context */ static sched_global_t *sched; @@ -271,11 +277,11 @@ static void sched_local_init(void)
spread = prio_spread_index(sched_local.thr);
- for (i = 0; i < WEIGHT_TBL_SIZE; i++) { - sched_local.weight_tbl[i] = spread; + for (i = 0; i < SPREAD_TBL_SIZE; i++) { + sched_local.spread_tbl[i] = spread;
if (num_spread > 1 && (i % PREFER_RATIO) == 0) { - sched_local.weight_tbl[i] = prio_spread_index(spread + + sched_local.spread_tbl[i] = prio_spread_index(spread + offset); offset++; if (offset == num_spread) @@ -309,6 +315,8 @@ static int schedule_init_global(void) return -1; }
+ /* When num_spread == 1, only spread_tbl[0] is used. */ + sched->max_spread = (sched->config.num_spread - 1) * PREFER_RATIO; sched->shm = shm; odp_spinlock_init(&sched->mask_lock);
@@ -442,7 +450,7 @@ static inline int grp_update_tbl(void) odp_spinlock_unlock(&sched->grp_lock);
/* Update group weights. Round robin over all thread's groups. */ - for (i = 0; i < WEIGHT_TBL_SIZE; i++) + for (i = 0; i < GRP_WEIGHT_TBL_SIZE; i++) sched_local.grp_weight[i] = i % num;
sched_local.num_grp = num; @@ -961,7 +969,7 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], int i, num_grp; int ret; int first, grp_id; - uint16_t round; + uint16_t spread_round, grp_round; uint32_t epoch;
if (sched_local.stash_num) { @@ -978,15 +986,17 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], if (odp_unlikely(sched_local.pause)) return 0;
- /* Each thread prefers a priority queue. Poll weight table avoids + /* Each thread prefers a priority queue. Spread weight table avoids * starvation of other priority queues on low thread counts. */ - round = sched_local.round + 1; + spread_round = sched_local.spread_round; + grp_round = (sched_local.grp_round++) & (GRP_WEIGHT_TBL_SIZE - 1);
- if (odp_unlikely(round == WEIGHT_TBL_SIZE)) - round = 0; + if (odp_unlikely(spread_round + 1 >= sched->max_spread)) + sched_local.spread_round = 0; + else + sched_local.spread_round = spread_round + 1;
- sched_local.round = round; - first = sched_local.weight_tbl[round]; + first = sched_local.spread_tbl[spread_round];
epoch = odp_atomic_load_acq_u32(&sched->grp_epoch); num_grp = sched_local.num_grp; @@ -996,7 +1006,7 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], sched_local.grp_epoch = epoch; }
- grp_id = sched_local.grp_weight[round]; + grp_id = sched_local.grp_weight[grp_round];
/* Schedule queues per group and priority */ for (i = 0; i < num_grp; i++) {
commit bcd246e3a9948a631eb8eb77886ac1b369b13dba Author: Petri Savolainen petri.savolainen@linaro.org Date: Fri Mar 9 16:49:48 2018 +0200
linux-gen: sched: configurable priority spread
Use the configuration file to let the user change priority queue spreading.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Balasubramanian Manoharan bala.manoharan@linaro.org Reviewed-by: Dmitry Eremin-Solenikov dmitry.ereminsolenikov@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/config/odp-linux-generic.conf b/config/odp-linux-generic.conf index 91605886..0034c64b 100644 --- a/config/odp-linux-generic.conf +++ b/config/odp-linux-generic.conf @@ -38,3 +38,12 @@ queue_basic: { # Default queue size. Value must be a power of two. default_queue_size = 4096 } + +sched_basic: { + # Priority level spread. Each priority level is spread into multiple + # scheduler internal queues. A higher spread value typically improves + # parallelism and thus is better for high thread counts, but causes + # uneven service level for low thread counts. Typically, optimal + # value is the number of threads using the scheduler. + prio_spread = 4 +} diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index cd20b39d..e6d28c6d 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -27,6 +27,7 @@ #include <odp_timer_internal.h> #include <odp_queue_internal.h> #include <odp_buffer_inlines.h> +#include <odp_libconfig_internal.h>
/* Number of priority levels */ #define NUM_PRIO 8 @@ -41,15 +42,18 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && /* Number of scheduling groups */ #define NUM_SCHED_GRPS 32
-/* Priority queues per priority */ -#define QUEUES_PER_PRIO 4 +/* Maximum priority queue spread */ +#define MAX_SPREAD 4 + +/* Minimum priority queue spread */ +#define MIN_SPREAD 1
/* A thread polls a non preferred sched queue every this many polls * of the prefer queue. */ #define PREFER_RATIO 64
/* Size of poll weight table */ -#define WEIGHT_TBL_SIZE ((QUEUES_PER_PRIO - 1) * PREFER_RATIO) +#define WEIGHT_TBL_SIZE ((MAX_SPREAD - 1) * PREFER_RATIO)
/* Maximum number of packet IO interfaces */ #define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES @@ -60,14 +64,10 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && /* Not a valid index */ #define NULL_INDEX ((uint32_t)-1)
-/* Priority queue ring size. In worst case, all event queues are scheduled - * queues and have the same priority. The ring size must be larger than or - * equal to ODP_CONFIG_QUEUES / QUEUES_PER_PRIO, so that it can hold all - * queues in the worst case. */ -#define PRIO_QUEUE_RING_SIZE (ODP_CONFIG_QUEUES / QUEUES_PER_PRIO) - -/* Mask for wrapping around priority queue index */ -#define RING_MASK (PRIO_QUEUE_RING_SIZE - 1) +/* Maximum priority queue ring size. A ring must be large enough to store all + * queues in the worst case (all queues are scheduled, have the same priority + * and no spreading). */ +#define MAX_RING_SIZE ODP_CONFIG_QUEUES
/* Priority queue empty, not a valid queue index. */ #define PRIO_QUEUE_EMPTY NULL_INDEX @@ -76,14 +76,14 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES), "Number_of_queues_is_not_power_of_two");
-/* Ring size must be power of two, so that MAX_QUEUE_IDX_MASK can be used. */ -ODP_STATIC_ASSERT(CHECK_IS_POWER2(PRIO_QUEUE_RING_SIZE), +/* Ring size must be power of two, so that mask can be used. */ +ODP_STATIC_ASSERT(CHECK_IS_POWER2(MAX_RING_SIZE), "Ring_size_is_not_power_of_two");
/* Mask of queues per priority */ typedef uint8_t pri_mask_t;
-ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO, +ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= MAX_SPREAD, "pri_mask_t_is_too_small");
/* Start of named groups in group mask arrays */ @@ -147,7 +147,7 @@ typedef struct ODP_ALIGNED_CACHE { ring_t ring;
/* Ring data: queue indexes */ - uint32_t queue_index[PRIO_QUEUE_RING_SIZE]; + uint32_t queue_index[MAX_RING_SIZE];
} prio_queue_t;
@@ -168,14 +168,20 @@ typedef struct { pri_mask_t pri_mask[NUM_PRIO]; odp_spinlock_t mask_lock;
- prio_queue_t prio_q[NUM_SCHED_GRPS][NUM_PRIO][QUEUES_PER_PRIO]; + prio_queue_t prio_q[NUM_SCHED_GRPS][NUM_PRIO][MAX_SPREAD];
odp_shm_t shm; - uint32_t pri_count[NUM_PRIO][QUEUES_PER_PRIO]; + + struct { + uint8_t num_spread; + } config; + + uint32_t pri_count[NUM_PRIO][MAX_SPREAD];
odp_thrmask_t mask_all; odp_spinlock_t grp_lock; odp_atomic_u32_t grp_epoch; + uint32_t ring_mask;
struct { char name[ODP_SCHED_GROUP_NAME_LEN]; @@ -186,7 +192,7 @@ typedef struct { struct { uint8_t grp; uint8_t prio; - uint8_t queue_per_prio; + uint8_t spread; uint8_t sync; uint8_t order_lock_count; uint8_t poll_pktin; @@ -206,8 +212,7 @@ typedef struct { /* Check that queue[] variables are large enough */ ODP_STATIC_ASSERT(NUM_SCHED_GRPS <= 256, "Group_does_not_fit_8_bits"); ODP_STATIC_ASSERT(NUM_PRIO <= 256, "Prio_does_not_fit_8_bits"); -ODP_STATIC_ASSERT(QUEUES_PER_PRIO <= 256, - "Queues_per_prio_does_not_fit_8_bits"); +ODP_STATIC_ASSERT(MAX_SPREAD <= 256, "Spread_does_not_fit_8_bits"); ODP_STATIC_ASSERT(CONFIG_QUEUE_MAX_ORD_LOCKS <= 256, "Ordered_lock_count_does_not_fit_8_bits"); ODP_STATIC_ASSERT(NUM_PKTIO <= 256, "Pktio_index_does_not_fit_8_bits"); @@ -221,11 +226,41 @@ static __thread sched_local_t sched_local; /* Function prototypes */ static inline void schedule_release_context(void);
+static int read_config_file(sched_global_t *sched) +{ + const char *str; + int val = 0; + + ODP_PRINT("Scheduler config:\n"); + + str = "sched_basic.prio_spread"; + if (!_odp_libconfig_lookup_int(str, &val)) { + ODP_ERR("Config option '%s' not found.\n", str); + return -1; + } + + if (val > MAX_SPREAD || val < MIN_SPREAD) { + ODP_ERR("Bad value %s = %u\n", str, val); + return -1; + } + + sched->config.num_spread = val; + ODP_PRINT(" %s: %i\n\n", str, val); + + return 0; +} + +static inline uint8_t prio_spread_index(uint32_t index) +{ + return index % sched->config.num_spread; +} + static void sched_local_init(void) { int i; - uint8_t id; - uint8_t offset = 0; + uint8_t spread; + uint8_t num_spread = sched->config.num_spread; + uint8_t offset = 1;
memset(&sched_local, 0, sizeof(sched_local_t));
@@ -234,17 +269,17 @@ static void sched_local_init(void) sched_local.stash_qi = PRIO_QUEUE_EMPTY; sched_local.ordered.src_queue = NULL_INDEX;
- id = sched_local.thr & (QUEUES_PER_PRIO - 1); + spread = prio_spread_index(sched_local.thr);
for (i = 0; i < WEIGHT_TBL_SIZE; i++) { - sched_local.weight_tbl[i] = id; + sched_local.weight_tbl[i] = spread;
- if (i % PREFER_RATIO == 0) { + if (num_spread > 1 && (i % PREFER_RATIO) == 0) { + sched_local.weight_tbl[i] = prio_spread_index(spread + + offset); offset++; - sched_local.weight_tbl[i] = (id + offset) & - (QUEUES_PER_PRIO - 1); - if (offset == QUEUES_PER_PRIO - 1) - offset = 0; + if (offset == num_spread) + offset = 1; } } } @@ -269,19 +304,24 @@ static int schedule_init_global(void)
memset(sched, 0, sizeof(sched_global_t));
+ if (read_config_file(sched)) { + odp_shm_free(shm); + return -1; + } + sched->shm = shm; odp_spinlock_init(&sched->mask_lock);
for (grp = 0; grp < NUM_SCHED_GRPS; grp++) { for (i = 0; i < NUM_PRIO; i++) { - for (j = 0; j < QUEUES_PER_PRIO; j++) { + for (j = 0; j < MAX_SPREAD; j++) { prio_queue_t *prio_q; int k;
prio_q = &sched->prio_q[grp][i][j]; ring_init(&prio_q->ring);
- for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++) { + for (k = 0; k < MAX_RING_SIZE; k++) { prio_q->queue_index[k] = PRIO_QUEUE_EMPTY; } @@ -322,14 +362,15 @@ static int schedule_term_global(void) int ret = 0; int rc = 0; int i, j, grp; + uint32_t ring_mask = sched->ring_mask;
for (grp = 0; grp < NUM_SCHED_GRPS; grp++) { for (i = 0; i < NUM_PRIO; i++) { - for (j = 0; j < QUEUES_PER_PRIO; j++) { + for (j = 0; j < MAX_SPREAD; j++) { ring_t *ring = &sched->prio_q[grp][i][j].ring; uint32_t qi;
- while ((qi = ring_deq(ring, RING_MASK)) != + while ((qi = ring_deq(ring, ring_mask)) != RING_EMPTY) { odp_event_t events[1]; int num; @@ -413,11 +454,6 @@ static uint32_t schedule_max_ordered_locks(void) return CONFIG_QUEUE_MAX_ORD_LOCKS; }
-static inline int queue_per_prio(uint32_t queue_index) -{ - return ((QUEUES_PER_PRIO - 1) & queue_index); -} - static void pri_set(int id, int prio) { odp_spinlock_lock(&sched->mask_lock); @@ -441,33 +477,39 @@ static void pri_clr(int id, int prio)
static void pri_set_queue(uint32_t queue_index, int prio) { - int id = queue_per_prio(queue_index); + uint8_t id = prio_spread_index(queue_index);
return pri_set(id, prio); }
static void pri_clr_queue(uint32_t queue_index, int prio) { - int id = queue_per_prio(queue_index); + uint8_t id = prio_spread_index(queue_index); pri_clr(id, prio); }
static int schedule_init_queue(uint32_t queue_index, const odp_schedule_param_t *sched_param) { + uint32_t ring_size; int i; int prio = sched_param->prio;
pri_set_queue(queue_index, prio); sched->queue[queue_index].grp = sched_param->group; sched->queue[queue_index].prio = prio; - sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index); + sched->queue[queue_index].spread = prio_spread_index(queue_index); sched->queue[queue_index].sync = sched_param->sync; sched->queue[queue_index].order_lock_count = sched_param->lock_count; sched->queue[queue_index].poll_pktin = 0; sched->queue[queue_index].pktio_index = 0; sched->queue[queue_index].pktin_index = 0;
+ ring_size = MAX_RING_SIZE / sched->config.num_spread; + ring_size = ROUNDUP_POWER2_U32(ring_size); + ODP_ASSERT(ring_size <= MAX_RING_SIZE); + sched->ring_mask = ring_size - 1; + odp_atomic_init_u64(&sched->order[queue_index].ctx, 0); odp_atomic_init_u64(&sched->order[queue_index].next_ctx, 0);
@@ -492,9 +534,9 @@ static void schedule_destroy_queue(uint32_t queue_index) int prio = sched->queue[queue_index].prio;
pri_clr_queue(queue_index, prio); - sched->queue[queue_index].grp = 0; - sched->queue[queue_index].prio = 0; - sched->queue[queue_index].queue_per_prio = 0; + sched->queue[queue_index].grp = 0; + sched->queue[queue_index].prio = 0; + sched->queue[queue_index].spread = 0;
if (queue_is_ordered(queue_index) && odp_atomic_load_u64(&sched->order[queue_index].ctx) != @@ -504,12 +546,12 @@ static void schedule_destroy_queue(uint32_t queue_index)
static int schedule_sched_queue(uint32_t queue_index) { - int grp = sched->queue[queue_index].grp; - int prio = sched->queue[queue_index].prio; - int queue_per_prio = sched->queue[queue_index].queue_per_prio; - ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring; + int grp = sched->queue[queue_index].grp; + int prio = sched->queue[queue_index].prio; + int spread = sched->queue[queue_index].spread; + ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
- ring_enq(ring, RING_MASK, queue_index); + ring_enq(ring, sched->ring_mask, queue_index); return 0; }
@@ -540,13 +582,14 @@ static void schedule_release_atomic(void) uint32_t qi = sched_local.stash_qi;
if (qi != PRIO_QUEUE_EMPTY && sched_local.stash_num == 0) { - int grp = sched->queue[qi].grp; - int prio = sched->queue[qi].prio; - int queue_per_prio = sched->queue[qi].queue_per_prio; - ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring; + int grp = sched->queue[qi].grp; + int prio = sched->queue[qi].prio; + int spread = sched->queue[qi].spread; + ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
/* Release current atomic queue */ - ring_enq(ring, RING_MASK, qi); + ring_enq(ring, sched->ring_mask, qi); + sched_local.stash_qi = PRIO_QUEUE_EMPTY; } } @@ -773,8 +816,10 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], int prio, i; int ret; int id; - unsigned int max_deq = MAX_DEQ; uint32_t qi; + unsigned int max_deq = MAX_DEQ; + int num_spread = sched->config.num_spread; + uint32_t ring_mask = sched->ring_mask;
/* Schedule events */ for (prio = 0; prio < NUM_PRIO; prio++) { @@ -785,14 +830,14 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], /* Select the first ring based on weights */ id = first;
- for (i = 0; i < QUEUES_PER_PRIO;) { + for (i = 0; i < num_spread;) { int num; int ordered; odp_queue_t handle; ring_t *ring; int pktin;
- if (id >= QUEUES_PER_PRIO) + if (id >= num_spread) id = 0;
/* No queues created for this priority queue */ @@ -805,7 +850,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
/* Get queue index from the priority queue */ ring = &sched->prio_q[grp][prio][id].ring; - qi = ring_deq(ring, RING_MASK); + qi = ring_deq(ring, ring_mask);
/* Priority queue empty */ if (qi == RING_EMPTY) { @@ -854,7 +899,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], continue;
if (num_pkt == 0 || !stash) { - ring_enq(ring, RING_MASK, qi); + ring_enq(ring, ring_mask, qi); break; }
@@ -880,14 +925,14 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], sched_local.ordered.src_queue = qi;
/* Continue scheduling ordered queues */ - ring_enq(ring, RING_MASK, qi); + ring_enq(ring, ring_mask, qi);
} else if (queue_is_atomic(qi)) { /* Hold queue during atomic access */ sched_local.stash_qi = qi; } else { /* Continue scheduling the queue */ - ring_enq(ring, RING_MASK, qi); + ring_enq(ring, ring_mask, qi); }
handle = queue_from_index(qi);
commit c57da8ddc80e0e314957c6d11db288ef11fc1805 Author: Petri Savolainen petri.savolainen@linaro.org Date: Thu Mar 8 14:55:14 2018 +0200
linux-gen: queue: configurable max size
Use the configuration file to let the user change the maximum queue size. Ring memory for all queues is reserved based on the maximum size.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Balasubramanian Manoharan bala.manoharan@linaro.org Reviewed-by: Dmitry Eremin-Solenikov dmitry.ereminsolenikov@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/config/odp-linux-generic.conf b/config/odp-linux-generic.conf index 306ee197..91605886 100644 --- a/config/odp-linux-generic.conf +++ b/config/odp-linux-generic.conf @@ -32,6 +32,9 @@ pktio_dpdk: { }
queue_basic: { + # Maximum queue size. Value must be a power of two. + max_queue_size = 8192 + # Default queue size. Value must be a power of two. default_queue_size = 4096 } diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 386d804e..98e86fa0 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -63,19 +63,17 @@ union queue_entry_u { uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))]; };
-typedef struct ODP_ALIGNED_CACHE { - /* Storage space for ring data */ - uint32_t data[CONFIG_QUEUE_SIZE]; -} queue_ring_data_t; - typedef struct queue_global_t { - queue_entry_t queue[ODP_CONFIG_QUEUES]; - queue_ring_data_t ring_data[ODP_CONFIG_QUEUES]; - uint32_t queue_lf_num; - uint32_t queue_lf_size; - queue_lf_func_t queue_lf_func; + queue_entry_t queue[ODP_CONFIG_QUEUES]; + uint32_t *ring_data; + uint32_t queue_lf_num; + uint32_t queue_lf_size; + queue_lf_func_t queue_lf_func; + odp_shm_t queue_gbl_shm; + odp_shm_t queue_ring_shm;
struct { + uint32_t max_queue_size; uint32_t default_queue_size; } config;
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c index bbd12fe8..89a0cd90 100644 --- a/platform/linux-generic/odp_queue_basic.c +++ b/platform/linux-generic/odp_queue_basic.c @@ -38,7 +38,7 @@ #include <inttypes.h>
#define MIN_QUEUE_SIZE 8 -#define MAX_QUEUE_SIZE CONFIG_QUEUE_SIZE +#define MAX_QUEUE_SIZE (1 * 1024 * 1024)
static int queue_init(queue_entry_t *queue, const char *name, const odp_queue_param_t *param); @@ -64,11 +64,11 @@ static int queue_capa(odp_queue_capability_t *capa, int sched) /* Reserve some queues for internal use */ capa->max_queues = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES; capa->plain.max_num = capa->max_queues; - capa->plain.max_size = MAX_QUEUE_SIZE; + capa->plain.max_size = queue_glb->config.max_queue_size; capa->plain.lockfree.max_num = queue_glb->queue_lf_num; capa->plain.lockfree.max_size = queue_glb->queue_lf_size; capa->sched.max_num = capa->max_queues; - capa->sched.max_size = MAX_QUEUE_SIZE; + capa->sched.max_size = queue_glb->config.max_queue_size;
if (sched) { capa->max_ordered_locks = sched_fn->max_ordered_locks(); @@ -87,7 +87,7 @@ static int read_config_file(queue_global_t *queue_glb)
ODP_PRINT("Queue config:\n");
- str = "queue_basic.default_queue_size"; + str = "queue_basic.max_queue_size"; if (!_odp_libconfig_lookup_int(str, &val)) { ODP_ERR("Config option '%s' not found.\n", str); return -1; @@ -101,6 +101,24 @@ static int read_config_file(queue_global_t *queue_glb) return -1; }
+ queue_glb->config.max_queue_size = val_u32; + ODP_PRINT(" %s: %u\n", str, val_u32); + + str = "queue_basic.default_queue_size"; + if (!_odp_libconfig_lookup_int(str, &val)) { + ODP_ERR("Config option '%s' not found.\n", str); + return -1; + } + + val_u32 = val; + + if (val_u32 > queue_glb->config.max_queue_size || + val_u32 < MIN_QUEUE_SIZE || + !CHECK_IS_POWER2(val_u32)) { + ODP_ERR("Bad value %s = %u\n", str, val_u32); + return -1; + } + queue_glb->config.default_queue_size = val_u32; ODP_PRINT(" %s: %u\n\n", str, val_u32);
@@ -114,10 +132,11 @@ static int queue_init_global(void) uint32_t lf_size = 0; queue_lf_func_t *lf_func; odp_queue_capability_t capa; + uint64_t mem_size;
ODP_DBG("Starts...\n");
- shm = odp_shm_reserve("odp_queues", + shm = odp_shm_reserve("_odp_queue_gbl", sizeof(queue_global_t), sizeof(queue_entry_t), 0);
@@ -141,6 +160,21 @@ static int queue_init_global(void) return -1; }
+ queue_glb->queue_gbl_shm = shm; + mem_size = sizeof(uint32_t) * ODP_CONFIG_QUEUES * + (uint64_t)queue_glb->config.max_queue_size; + + shm = odp_shm_reserve("_odp_queue_rings", mem_size, + ODP_CACHE_LINE_SIZE, 0); + + if (shm == ODP_SHM_INVALID) { + odp_shm_free(queue_glb->queue_gbl_shm); + return -1; + } + + queue_glb->queue_ring_shm = shm; + queue_glb->ring_data = odp_shm_addr(shm); + lf_func = &queue_glb->queue_lf_func; queue_glb->queue_lf_num = queue_lf_init_global(&lf_size, lf_func); queue_glb->queue_lf_size = lf_size; @@ -170,7 +204,6 @@ static int queue_term_local(void) static int queue_term_global(void) { int ret = 0; - int rc = 0; queue_entry_t *queue; int i;
@@ -179,20 +212,24 @@ static int queue_term_global(void) LOCK(queue); if (queue->s.status != QUEUE_STATUS_FREE) { ODP_ERR("Not destroyed queue: %s\n", queue->s.name); - rc = -1; + ret = -1; } UNLOCK(queue); }
queue_lf_term_global();
- ret = odp_shm_free(odp_shm_lookup("odp_queues")); - if (ret < 0) { - ODP_ERR("shm free failed for odp_queues"); - rc = -1; + if (odp_shm_free(queue_glb->queue_ring_shm)) { + ODP_ERR("shm free failed"); + ret = -1; + } + + if (odp_shm_free(queue_glb->queue_gbl_shm)) { + ODP_ERR("shm free failed"); + ret = -1; }
- return rc; + return ret; }
static int queue_capability(odp_queue_capability_t *capa) @@ -244,7 +281,7 @@ static odp_queue_t queue_create(const char *name, }
if (param->nonblocking == ODP_BLOCKING) { - if (param->size > MAX_QUEUE_SIZE) + if (param->size > queue_glb->config.max_queue_size) return ODP_QUEUE_INVALID; } else if (param->nonblocking == ODP_NONBLOCKING_LF) { /* Only plain type lock-free queues supported */ @@ -623,6 +660,7 @@ static odp_event_t queue_deq(odp_queue_t handle) static int queue_init(queue_entry_t *queue, const char *name, const odp_queue_param_t *param) { + uint64_t offset; uint32_t queue_size;
if (name == NULL) { @@ -657,13 +695,15 @@ static int queue_init(queue_entry_t *queue, const char *name, /* Round up if not already a power of two */ queue_size = ROUNDUP_POWER2_U32(queue_size);
- if (queue_size > MAX_QUEUE_SIZE) { + if (queue_size > queue_glb->config.max_queue_size) { ODP_ERR("Too large queue size %u\n", queue_size); return -1; }
- ring_st_init(&queue->s.ring_st, - queue_glb->ring_data[queue->s.index].data, queue_size); + offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size; + + ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset], + queue_size);
return 0; }
commit 535ad269bfc6581a048d3e844576063be5d19501 Author: Petri Savolainen petri.savolainen@linaro.org Date: Wed Mar 7 15:33:55 2018 +0200
linux-gen: queue: configurable default size
Use the configuration file to let the user change the default queue size. The queue size parameter from the application is used only when it is larger than the default size.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Balasubramanian Manoharan bala.manoharan@linaro.org Reviewed-by: Dmitry Eremin-Solenikov dmitry.ereminsolenikov@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/config/odp-linux-generic.conf b/config/odp-linux-generic.conf index 15e65d00..306ee197 100644 --- a/config/odp-linux-generic.conf +++ b/config/odp-linux-generic.conf @@ -30,3 +30,8 @@ pktio_dpdk: { rx_drop_en = 1 } } + +queue_basic: { + # Default queue size. Value must be a power of two. + default_queue_size = 4096 +} diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 3aec3fe9..386d804e 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -75,6 +75,10 @@ typedef struct queue_global_t { uint32_t queue_lf_size; queue_lf_func_t queue_lf_func;
+ struct { + uint32_t default_queue_size; + } config; + } queue_global_t;
extern queue_global_t *queue_glb; diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c index 1218987b..bbd12fe8 100644 --- a/platform/linux-generic/odp_queue_basic.c +++ b/platform/linux-generic/odp_queue_basic.c @@ -25,6 +25,7 @@ #include <odp/api/hints.h> #include <odp/api/sync.h> #include <odp/api/traffic_mngr.h> +#include <odp_libconfig_internal.h>
#define NUM_INTERNAL_QUEUES 64
@@ -36,6 +37,9 @@ #include <string.h> #include <inttypes.h>
+#define MIN_QUEUE_SIZE 8 +#define MAX_QUEUE_SIZE CONFIG_QUEUE_SIZE + static int queue_init(queue_entry_t *queue, const char *name, const odp_queue_param_t *param);
@@ -60,11 +64,11 @@ static int queue_capa(odp_queue_capability_t *capa, int sched) /* Reserve some queues for internal use */ capa->max_queues = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES; capa->plain.max_num = capa->max_queues; - capa->plain.max_size = CONFIG_QUEUE_SIZE; + capa->plain.max_size = MAX_QUEUE_SIZE; capa->plain.lockfree.max_num = queue_glb->queue_lf_num; capa->plain.lockfree.max_size = queue_glb->queue_lf_size; capa->sched.max_num = capa->max_queues; - capa->sched.max_size = CONFIG_QUEUE_SIZE; + capa->sched.max_size = MAX_QUEUE_SIZE;
if (sched) { capa->max_ordered_locks = sched_fn->max_ordered_locks(); @@ -75,6 +79,34 @@ static int queue_capa(odp_queue_capability_t *capa, int sched) return 0; }
+static int read_config_file(queue_global_t *queue_glb) +{ + const char *str; + uint32_t val_u32; + int val = 0; + + ODP_PRINT("Queue config:\n"); + + str = "queue_basic.default_queue_size"; + if (!_odp_libconfig_lookup_int(str, &val)) { + ODP_ERR("Config option '%s' not found.\n", str); + return -1; + } + + val_u32 = val; + + if (val_u32 > MAX_QUEUE_SIZE || val_u32 < MIN_QUEUE_SIZE || + !CHECK_IS_POWER2(val_u32)) { + ODP_ERR("Bad value %s = %u\n", str, val_u32); + return -1; + } + + queue_glb->config.default_queue_size = val_u32; + ODP_PRINT(" %s: %u\n\n", str, val_u32); + + return 0; +} + static int queue_init_global(void) { uint32_t i; @@ -104,6 +136,11 @@ static int queue_init_global(void) queue->s.handle = queue_from_index(i); }
+ if (read_config_file(queue_glb)) { + odp_shm_free(shm); + return -1; + } + lf_func = &queue_glb->queue_lf_func; queue_glb->queue_lf_num = queue_lf_init_global(&lf_size, lf_func); queue_glb->queue_lf_size = lf_size; @@ -207,7 +244,7 @@ static odp_queue_t queue_create(const char *name, }
if (param->nonblocking == ODP_BLOCKING) { - if (param->size > CONFIG_QUEUE_SIZE) + if (param->size > MAX_QUEUE_SIZE) return ODP_QUEUE_INVALID; } else if (param->nonblocking == ODP_NONBLOCKING_LF) { /* Only plain type lock-free queues supported */ @@ -586,6 +623,8 @@ static odp_event_t queue_deq(odp_queue_t handle) static int queue_init(queue_entry_t *queue, const char *name, const odp_queue_param_t *param) { + uint32_t queue_size; + if (name == NULL) { queue->s.name[0] = 0; } else { @@ -609,9 +648,22 @@ static int queue_init(queue_entry_t *queue, const char *name, queue->s.pktin = PKTIN_INVALID; queue->s.pktout = PKTOUT_INVALID;
+ /* Use default size for all small queues to guarantee performance + * level. */ + queue_size = queue_glb->config.default_queue_size; + if (param->size > queue_glb->config.default_queue_size) + queue_size = param->size; + + /* Round up if not already a power of two */ + queue_size = ROUNDUP_POWER2_U32(queue_size); + + if (queue_size > MAX_QUEUE_SIZE) { + ODP_ERR("Too large queue size %u\n", queue_size); + return -1; + } + ring_st_init(&queue->s.ring_st, - queue_glb->ring_data[queue->s.index].data, - CONFIG_QUEUE_SIZE); + queue_glb->ring_data[queue->s.index].data, queue_size);
return 0; }
-----------------------------------------------------------------------
Summary of changes: config/odp-linux-generic.conf | 17 ++ .../linux-generic/include/odp_queue_internal.h | 22 ++- platform/linux-generic/odp_queue_basic.c | 120 ++++++++++-- platform/linux-generic/odp_schedule_basic.c | 209 +++++++++++++-------- 4 files changed, 267 insertions(+), 101 deletions(-)
hooks/post-receive