This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, master has been updated via 36f589b5a84804ce246bcd9a3eb9352801aaf41d (commit) via 2bbd7ace9e236d27dcfb52acfef991bd5ce39354 (commit) via 196d8f518071cc8c37dde0835cb7559d6cdd55da (commit) via 42fc5ebdcd5a1e5fbb7a19a2756a310f23a0cdd5 (commit) via bb8015d15ca4b0e2033eeca09906032a88e74bff (commit) via 3e61be54a77ed8ccc2030e88b9a26372d3f76e2c (commit) from c0da87688187c81039cf81790fed3d6ed00a956e (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log ----------------------------------------------------------------- commit 36f589b5a84804ce246bcd9a3eb9352801aaf41d Author: Petri Savolainen petri.savolainen@linaro.org Date: Fri Mar 2 16:34:07 2018 +0200
linux-gen: sched: optimize local variable layout
Pack the local variables struct and move commonly used variables to the head of the struct.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index b3c913bb..cd20b39d 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -114,14 +114,20 @@ ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t), /* Scheduler local data */ typedef struct { int thr; - int stash_num; - int stash_index; - int pause; + uint16_t stash_num; + uint16_t stash_index; + uint16_t pause; uint16_t round; uint32_t stash_qi; odp_queue_t stash_queue; odp_event_t stash_ev[MAX_DEQ];
+ uint32_t grp_epoch; + int num_grp; + uint8_t grp[NUM_SCHED_GRPS]; + uint8_t weight_tbl[WEIGHT_TBL_SIZE]; + uint8_t grp_weight[WEIGHT_TBL_SIZE]; + struct { /* Source queue index */ uint32_t src_queue; @@ -133,12 +139,6 @@ typedef struct { ordered_stash_t stash[MAX_ORDERED_STASH]; } ordered;
- uint32_t grp_epoch; - int num_grp; - uint8_t grp[NUM_SCHED_GRPS]; - uint8_t weight_tbl[WEIGHT_TBL_SIZE]; - uint8_t grp_weight[WEIGHT_TBL_SIZE]; - } sched_local_t;
/* Priority queue */
commit 2bbd7ace9e236d27dcfb52acfef991bd5ce39354 Author: Petri Savolainen petri.savolainen@linaro.org Date: Fri Mar 2 15:28:12 2018 +0200
linux-gen: sched: use stash prefix
Consistently use the stash_ prefix for thread local variables related to event stashing.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index d662bd6a..b3c913bb 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -114,13 +114,14 @@ ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t), /* Scheduler local data */ typedef struct { int thr; - int num; - int index; + int stash_num; + int stash_index; int pause; uint16_t round; - uint32_t queue_index; - odp_queue_t queue; - odp_event_t ev_stash[MAX_DEQ]; + uint32_t stash_qi; + odp_queue_t stash_queue; + odp_event_t stash_ev[MAX_DEQ]; + struct { /* Source queue index */ uint32_t src_queue; @@ -228,9 +229,9 @@ static void sched_local_init(void)
memset(&sched_local, 0, sizeof(sched_local_t));
- sched_local.thr = odp_thread_id(); - sched_local.queue = ODP_QUEUE_INVALID; - sched_local.queue_index = PRIO_QUEUE_EMPTY; + sched_local.thr = odp_thread_id(); + sched_local.stash_queue = ODP_QUEUE_INVALID; + sched_local.stash_qi = PRIO_QUEUE_EMPTY; sched_local.ordered.src_queue = NULL_INDEX;
id = sched_local.thr & (QUEUES_PER_PRIO - 1); @@ -364,7 +365,7 @@ static int schedule_init_local(void)
static int schedule_term_local(void) { - if (sched_local.num) { + if (sched_local.stash_num) { ODP_ERR("Locally pre-scheduled events exist.\n"); return -1; } @@ -536,9 +537,9 @@ static void schedule_pktio_start(int pktio_index, int num_pktin,
static void schedule_release_atomic(void) { - uint32_t qi = sched_local.queue_index; + uint32_t qi = sched_local.stash_qi;
- if (qi != PRIO_QUEUE_EMPTY && sched_local.num == 0) { + if (qi != PRIO_QUEUE_EMPTY && sched_local.stash_num == 0) { int grp = sched->queue[qi].grp; int prio = sched->queue[qi].prio; int queue_per_prio = sched->queue[qi].queue_per_prio; @@ -546,7 +547,7 @@ static void schedule_release_atomic(void)
/* Release current atomic queue */ ring_enq(ring, RING_MASK, qi); - sched_local.queue_index = PRIO_QUEUE_EMPTY; + sched_local.stash_qi = PRIO_QUEUE_EMPTY; } }
@@ -634,7 +635,7 @@ static void schedule_release_ordered(void)
queue_index = sched_local.ordered.src_queue;
- if (odp_unlikely((queue_index == NULL_INDEX) || sched_local.num)) + if (odp_unlikely((queue_index == NULL_INDEX) || sched_local.stash_num)) return;
release_ordered(); @@ -648,14 +649,14 @@ static inline void schedule_release_context(void) schedule_release_atomic(); }
-static inline int copy_events(odp_event_t out_ev[], unsigned int max) +static inline int copy_from_stash(odp_event_t out_ev[], unsigned int max) { int i = 0;
- while (sched_local.num && max) { - out_ev[i] = sched_local.ev_stash[sched_local.index]; - sched_local.index++; - sched_local.num--; + while (sched_local.stash_num && max) { + out_ev[i] = sched_local.stash_ev[sched_local.stash_index]; + sched_local.stash_index++; + sched_local.stash_num--; max--; i++; } @@ -743,7 +744,7 @@ static inline int poll_pktin(uint32_t qi, int stash)
if (stash) { for (i = 0; i < num; i++) - sched_local.ev_stash[i] = event_from_buf_hdr(b_hdr[i]); + sched_local.stash_ev[i] = event_from_buf_hdr(b_hdr[i]);
return num; } @@ -829,7 +830,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
pktin = queue_is_pktin(qi);
- num = sched_cb_queue_deq_multi(qi, sched_local.ev_stash, + num = sched_cb_queue_deq_multi(qi, sched_local.stash_ev, max_deq, !pktin);
if (num < 0) { @@ -883,17 +884,17 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
} else if (queue_is_atomic(qi)) { /* Hold queue during atomic access */ - sched_local.queue_index = qi; + sched_local.stash_qi = qi; } else { /* Continue scheduling the queue */ ring_enq(ring, RING_MASK, qi); }
- handle = queue_from_index(qi); - sched_local.num = num; - sched_local.index = 0; - sched_local.queue = handle; - ret = copy_events(out_ev, max_num); + handle = queue_from_index(qi); + sched_local.stash_num = num; + sched_local.stash_index = 0; + sched_local.stash_queue = handle; + ret = copy_from_stash(out_ev, max_num);
/* Output the source queue handle */ if (out_queue) @@ -918,11 +919,11 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], uint16_t round; uint32_t epoch;
- if (sched_local.num) { - ret = copy_events(out_ev, max_num); + if (sched_local.stash_num) { + ret = copy_from_stash(out_ev, max_num);
if (out_queue) - *out_queue = sched_local.queue; + *out_queue = sched_local.stash_queue;
return ret; }
commit 196d8f518071cc8c37dde0835cb7559d6cdd55da Author: Petri Savolainen petri.savolainen@linaro.org Date: Fri Mar 2 14:27:22 2018 +0200
linux-gen: sched: optimize parallel packet input queue throughput
Extend direct packet input processing to parallel queues. Parallel queues do not guarantee ordering, so they can also pass packets directly to the application and (potentially) stash some per thread.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index 3ea261fa..d662bd6a 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -846,19 +846,19 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], * priorities. Stop scheduling queue when pktio * has been stopped. */ if (pktin) { - int atomic = queue_is_atomic(qi); - int num_pkt = poll_pktin(qi, atomic); + int stash = !ordered; + int num_pkt = poll_pktin(qi, stash);
if (odp_unlikely(num_pkt < 0)) continue;
- if (num_pkt == 0 || !atomic) { + if (num_pkt == 0 || !stash) { ring_enq(ring, RING_MASK, qi); break; }
- /* Process packets from an atomic queue - * right away */ + /* Process packets from an atomic or + * parallel queue right away. */ num = num_pkt; } else { /* Remove empty queue from scheduling. @@ -868,12 +868,6 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], } }
- handle = queue_from_index(qi); - sched_local.num = num; - sched_local.index = 0; - sched_local.queue = handle; - ret = copy_events(out_ev, max_num); - if (ordered) { uint64_t ctx; odp_atomic_u64_t *next_ctx; @@ -895,6 +889,12 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], ring_enq(ring, RING_MASK, qi); }
+ handle = queue_from_index(qi); + sched_local.num = num; + sched_local.index = 0; + sched_local.queue = handle; + ret = copy_events(out_ev, max_num); + /* Output the source queue handle */ if (out_queue) *out_queue = handle;
commit 42fc5ebdcd5a1e5fbb7a19a2756a310f23a0cdd5 Author: Petri Savolainen petri.savolainen@linaro.org Date: Thu Mar 1 17:22:21 2018 +0200
linux-gen: queue: enqueue may fail
Drop events when a queue enqueue fails. Enqueue failure is more likely now that queues have a limited size.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c index 8b599e96..56e9f686 100644 --- a/platform/linux-generic/odp_packet_io.c +++ b/platform/linux-generic/odp_packet_io.c @@ -657,8 +657,22 @@ static odp_buffer_hdr_t *pktin_dequeue(queue_t q_int) if (pkts <= 0) return NULL;
- if (pkts > 1) - queue_fn->enq_multi(q_int, &hdr_tbl[1], pkts - 1); + if (pkts > 1) { + int num_enq; + int num = pkts - 1; + + num_enq = queue_fn->enq_multi(q_int, &hdr_tbl[1], num); + + if (odp_unlikely(num_enq < num)) { + if (odp_unlikely(num_enq < 0)) + num_enq = 0; + + ODP_DBG("Interface %s dropped %i packets\n", + entry->s.name, num - num_enq); + buffer_free_multi(&hdr_tbl[num_enq + 1], num - num_enq); + } + } + buf_hdr = hdr_tbl[0]; return buf_hdr; } @@ -695,8 +709,21 @@ static int pktin_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[], int num) for (j = 0; i < pkts; i++, j++) hdr_tbl[j] = hdr_tbl[i];
- if (j) - queue_fn->enq_multi(q_int, hdr_tbl, j); + if (j) { + int num_enq; + + num_enq = queue_fn->enq_multi(q_int, hdr_tbl, j); + + if (odp_unlikely(num_enq < j)) { + if (odp_unlikely(num_enq < 0)) + num_enq = 0; + + ODP_DBG("Interface %s dropped %i packets\n", + entry->s.name, j - num_enq); + buffer_free_multi(&buf_hdr[num_enq], j - num_enq); + } + } + return nbr; }
@@ -782,6 +809,7 @@ int sched_cb_pktin_poll_old(int pktio_index, int num_queue, int index[])
for (idx = 0; idx < num_queue; idx++) { queue_t q_int; + int num_enq;
num = pktin_recv_buf(entry, index[idx], hdr_tbl, QUEUE_MULTI_MAX); @@ -795,7 +823,16 @@ int sched_cb_pktin_poll_old(int pktio_index, int num_queue, int index[]) }
q_int = entry->s.in_queue[index[idx]].queue_int; - queue_fn->enq_multi(q_int, hdr_tbl, num); + num_enq = queue_fn->enq_multi(q_int, hdr_tbl, num); + + if (odp_unlikely(num_enq < num)) { + if (odp_unlikely(num_enq < 0)) + num_enq = 0; + + ODP_DBG("Interface %s dropped %i packets\n", + entry->s.name, num - num_enq); + buffer_free_multi(&hdr_tbl[num_enq], num - num_enq); + } }
return 0; diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index b251bdee..3ea261fa 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -581,13 +581,23 @@ static inline void ordered_stash_release(void) for (i = 0; i < sched_local.ordered.stash_num; i++) { queue_entry_t *queue_entry; odp_buffer_hdr_t **buf_hdr; - int num; + int num, num_enq;
queue_entry = sched_local.ordered.stash[i].queue_entry; buf_hdr = sched_local.ordered.stash[i].buf_hdr; num = sched_local.ordered.stash[i].num;
- queue_fn->enq_multi(qentry_to_int(queue_entry), buf_hdr, num); + num_enq = queue_fn->enq_multi(qentry_to_int(queue_entry), + buf_hdr, num); + + /* Drop packets that were not enqueued */ + if (odp_unlikely(num_enq < num)) { + if (odp_unlikely(num_enq < 0)) + num_enq = 0; + + ODP_DBG("Dropped %i packets\n", num - num_enq); + buffer_free_multi(&buf_hdr[num_enq], num - num_enq); + } } sched_local.ordered.stash_num = 0; } diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c index ddd97bea..40f2e9fc 100644 --- a/platform/linux-generic/odp_schedule_iquery.c +++ b/platform/linux-generic/odp_schedule_iquery.c @@ -1136,13 +1136,22 @@ static inline void ordered_stash_release(void) for (i = 0; i < thread_local.ordered.stash_num; i++) { queue_entry_t *queue_entry; odp_buffer_hdr_t **buf_hdr; - int num; + int num, num_enq;
queue_entry = thread_local.ordered.stash[i].queue_entry; buf_hdr = thread_local.ordered.stash[i].buf_hdr; num = thread_local.ordered.stash[i].num;
- queue_fn->enq_multi(qentry_to_int(queue_entry), buf_hdr, num); + num_enq = queue_fn->enq_multi(qentry_to_int(queue_entry), + buf_hdr, num); + + if (odp_unlikely(num_enq < num)) { + if (odp_unlikely(num_enq < 0)) + num_enq = 0; + + ODP_DBG("Dropped %i packets\n", num - num_enq); + buffer_free_multi(&buf_hdr[num_enq], num - num_enq); + } } thread_local.ordered.stash_num = 0; }
commit bb8015d15ca4b0e2033eeca09906032a88e74bff Author: Petri Savolainen petri.savolainen@linaro.org Date: Thu Mar 1 16:03:39 2018 +0200
linux-gen: sched: optimize atomic packet input queue throughput
When a packet input queue is atomic, packets received from packet input are passed directly to the application. Other queue types may have events stashed on other threads, so for those queue types incoming packets are always enqueued (to maintain packet order).
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/include/odp_buffer_inlines.h b/platform/linux-generic/include/odp_buffer_inlines.h index 9abe79fc..9e3b6ef7 100644 --- a/platform/linux-generic/include/odp_buffer_inlines.h +++ b/platform/linux-generic/include/odp_buffer_inlines.h @@ -28,6 +28,11 @@ static inline odp_buffer_t buf_from_buf_hdr(odp_buffer_hdr_t *hdr) return (odp_buffer_t)hdr; }
+static inline odp_event_t event_from_buf_hdr(odp_buffer_hdr_t *hdr) +{ + return (odp_event_t)hdr; +} + #ifdef __cplusplus } #endif diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 0540bf72..3aec3fe9 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -30,6 +30,7 @@ extern "C" { #include <odp/api/ticketlock.h> #include <odp_config_internal.h> #include <odp_ring_st_internal.h> +#include <odp_queue_lf.h>
#define QUEUE_STATUS_FREE 0 #define QUEUE_STATUS_DESTROYED 1 @@ -62,6 +63,27 @@ union queue_entry_u { uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))]; };
+typedef struct ODP_ALIGNED_CACHE { + /* Storage space for ring data */ + uint32_t data[CONFIG_QUEUE_SIZE]; +} queue_ring_data_t; + +typedef struct queue_global_t { + queue_entry_t queue[ODP_CONFIG_QUEUES]; + queue_ring_data_t ring_data[ODP_CONFIG_QUEUES]; + uint32_t queue_lf_num; + uint32_t queue_lf_size; + queue_lf_func_t queue_lf_func; + +} queue_global_t; + +extern queue_global_t *queue_glb; + +static inline queue_t queue_index_to_qint(uint32_t queue_id) +{ + return (queue_t)&queue_glb->queue[queue_id]; +} + static inline uint32_t queue_to_index(odp_queue_t handle) { return _odp_typeval(handle) - 1; diff --git a/platform/linux-generic/include/odp_queue_lf.h b/platform/linux-generic/include/odp_queue_lf.h index ef178e07..8a973a85 100644 --- a/platform/linux-generic/include/odp_queue_lf.h +++ b/platform/linux-generic/include/odp_queue_lf.h @@ -12,7 +12,6 @@ extern "C" { #endif
#include <odp_queue_if.h> -#include <odp_queue_internal.h>
/* Lock-free queue functions */ typedef struct { diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index dd2097bf..4bd127a4 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -77,7 +77,9 @@ typedef struct schedule_fn_t { extern const schedule_fn_t *sched_fn;
/* Interface for the scheduler */ -int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[]); +int sched_cb_pktin_poll(int pktio_index, int pktin_index, + odp_buffer_hdr_t *hdr_tbl[], int num); +int sched_cb_pktin_poll_old(int pktio_index, int num_queue, int index[]); int sched_cb_pktin_poll_one(int pktio_index, int rx_queue, odp_event_t evts[]); void sched_cb_pktio_stop_finalize(int pktio_index); odp_queue_t sched_cb_queue_handle(uint32_t queue_index); diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c index f8792b66..8b599e96 100644 --- a/platform/linux-generic/odp_packet_io.c +++ b/platform/linux-generic/odp_packet_io.c @@ -578,7 +578,7 @@ odp_pktio_t odp_pktio_lookup(const char *name) return hdl; }
-static inline int pktin_recv_buf(odp_pktin_queue_t queue, +static inline int pktin_recv_buf(pktio_entry_t *entry, int pktin_index, odp_buffer_hdr_t *buffer_hdrs[], int num) { odp_packet_t pkt; @@ -589,7 +589,7 @@ static inline int pktin_recv_buf(odp_pktin_queue_t queue, int pkts; int num_rx = 0;
- pkts = odp_pktin_recv(queue, packets, num); + pkts = entry->s.ops->recv(entry, pktin_index, packets, num);
for (i = 0; i < pkts; i++) { pkt = packets[i]; @@ -643,13 +643,16 @@ static odp_buffer_hdr_t *pktin_dequeue(queue_t q_int) odp_buffer_hdr_t *buf_hdr; odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX]; int pkts; + odp_pktin_queue_t pktin_queue = queue_fn->get_pktin(q_int); + odp_pktio_t pktio = pktin_queue.pktio; + int pktin_index = pktin_queue.index; + pktio_entry_t *entry = get_pktio_entry(pktio);
buf_hdr = queue_fn->deq(q_int); if (buf_hdr != NULL) return buf_hdr;
- pkts = pktin_recv_buf(queue_fn->get_pktin(q_int), - hdr_tbl, QUEUE_MULTI_MAX); + pkts = pktin_recv_buf(entry, pktin_index, hdr_tbl, QUEUE_MULTI_MAX);
if (pkts <= 0) return NULL; @@ -665,6 +668,10 @@ static int pktin_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[], int num) int nbr; odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX]; int pkts, i, j; + odp_pktin_queue_t pktin_queue = queue_fn->get_pktin(q_int); + odp_pktio_t pktio = pktin_queue.pktio; + int pktin_index = pktin_queue.index; + pktio_entry_t *entry = get_pktio_entry(pktio);
nbr = queue_fn->deq_multi(q_int, buf_hdr, num); if (odp_unlikely(nbr > num)) @@ -676,8 +683,8 @@ static int pktin_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[], int num) if (nbr == num) return nbr;
- pkts = pktin_recv_buf(queue_fn->get_pktin(q_int), - hdr_tbl, QUEUE_MULTI_MAX); + pkts = pktin_recv_buf(entry, pktin_index, hdr_tbl, QUEUE_MULTI_MAX); + if (pkts <= 0) return nbr;
@@ -739,12 +746,29 @@ int sched_cb_pktin_poll_one(int pktio_index, return num_rx; }
-int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[]) +int sched_cb_pktin_poll(int pktio_index, int pktin_index, + odp_buffer_hdr_t *hdr_tbl[], int num) +{ + pktio_entry_t *entry = pktio_entry_by_index(pktio_index); + int state = entry->s.state; + + if (odp_unlikely(state != PKTIO_STATE_STARTED)) { + if (state < PKTIO_STATE_ACTIVE || + state == PKTIO_STATE_STOP_PENDING) + return -1; + + ODP_DBG("Interface %s not started\n", entry->s.name); + return 0; + } + + return pktin_recv_buf(entry, pktin_index, hdr_tbl, num); +} + +int sched_cb_pktin_poll_old(int pktio_index, int num_queue, int index[]) { odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX]; int num, idx; - pktio_entry_t *entry; - entry = pktio_entry_by_index(pktio_index); + pktio_entry_t *entry = pktio_entry_by_index(pktio_index); int state = entry->s.state;
if (odp_unlikely(state != PKTIO_STATE_STARTED)) { @@ -758,9 +782,9 @@ int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
for (idx = 0; idx < num_queue; idx++) { queue_t q_int; - odp_pktin_queue_t pktin = entry->s.in_queue[index[idx]].pktin;
- num = pktin_recv_buf(pktin, hdr_tbl, QUEUE_MULTI_MAX); + num = pktin_recv_buf(entry, index[idx], hdr_tbl, + QUEUE_MULTI_MAX);
if (num == 0) continue; diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c index 4371fd90..1218987b 100644 --- a/platform/linux-generic/odp_queue_basic.c +++ b/platform/linux-generic/odp_queue_basic.c @@ -8,7 +8,6 @@
#include <odp/api/queue.h> #include <odp_queue_internal.h> -#include <odp_queue_lf.h> #include <odp_queue_if.h> #include <odp/api/std_types.h> #include <odp/api/align.h> @@ -40,21 +39,7 @@ static int queue_init(queue_entry_t *queue, const char *name, const odp_queue_param_t *param);
-typedef struct ODP_ALIGNED_CACHE { - /* Storage space for ring data */ - uint32_t data[CONFIG_QUEUE_SIZE]; -} ring_data_t; - -typedef struct queue_global_t { - queue_entry_t queue[ODP_CONFIG_QUEUES]; - ring_data_t ring_data[ODP_CONFIG_QUEUES]; - uint32_t queue_lf_num; - uint32_t queue_lf_size; - queue_lf_func_t queue_lf_func; - -} queue_global_t; - -static queue_global_t *queue_glb; +queue_global_t *queue_glb;
static inline queue_entry_t *get_qentry(uint32_t queue_id) { diff --git a/platform/linux-generic/odp_queue_lf.c b/platform/linux-generic/odp_queue_lf.c index 74f52946..066e6a67 100644 --- a/platform/linux-generic/odp_queue_lf.c +++ b/platform/linux-generic/odp_queue_lf.c @@ -7,7 +7,7 @@ #include <odp/api/queue.h> #include <odp/api/atomic.h> #include <odp/api/shared_memory.h> -#include <odp_queue_lf.h> +#include <odp_queue_internal.h> #include <string.h> #include <stdio.h>
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index b3847ab9..b251bdee 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -26,6 +26,7 @@ #include <odp_ring_internal.h> #include <odp_timer_internal.h> #include <odp_queue_internal.h> +#include <odp_buffer_inlines.h>
/* Number of priority levels */ #define NUM_PRIO 8 @@ -699,14 +700,20 @@ static inline int queue_is_pktin(uint32_t queue_index) return sched->queue[queue_index].poll_pktin; }
-static inline int poll_pktin(uint32_t qi) +static inline int poll_pktin(uint32_t qi, int stash) { - int pktio_index, pktin_index, num, num_pktin; + odp_buffer_hdr_t *b_hdr[MAX_DEQ]; + int pktio_index, pktin_index, num, num_pktin, i; + int ret; + queue_t qint;
pktio_index = sched->queue[qi].pktio_index; pktin_index = sched->queue[qi].pktin_index;
- num = sched_cb_pktin_poll(pktio_index, 1, &pktin_index); + num = sched_cb_pktin_poll(pktio_index, pktin_index, b_hdr, MAX_DEQ); + + if (num == 0) + return 0;
/* Pktio stopped or closed. Call stop_finalize when we have stopped * polling all pktin queues of the pktio. */ @@ -720,9 +727,33 @@ static inline int poll_pktin(uint32_t qi)
if (num_pktin == 0) sched_cb_pktio_stop_finalize(pktio_index); + + return num; }
- return num; + if (stash) { + for (i = 0; i < num; i++) + sched_local.ev_stash[i] = event_from_buf_hdr(b_hdr[i]); + + return num; + } + + qint = queue_index_to_qint(qi); + + ret = queue_fn->enq_multi(qint, b_hdr, num); + + /* Drop packets that were not enqueued */ + if (odp_unlikely(ret < num)) { + int num_enq = ret; + + if (odp_unlikely(ret < 0)) + num_enq = 0; + + ODP_DBG("Dropped %i packets\n", num - num_enq); + buffer_free_multi(&b_hdr[num_enq], num - num_enq); + } + + return ret; }
static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], @@ -805,17 +836,26 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], * priorities. Stop scheduling queue when pktio * has been stopped. */ if (pktin) { - int num_pkt = poll_pktin(qi); + int atomic = queue_is_atomic(qi); + int num_pkt = poll_pktin(qi, atomic);
- if (odp_likely(num_pkt >= 0)) { + if (odp_unlikely(num_pkt < 0)) + continue; + + if (num_pkt == 0 || !atomic) { ring_enq(ring, RING_MASK, qi); break; } - }
- /* Remove empty queue from scheduling. Continue - * scheduling the same priority queue. */ - continue; + /* Process packets from an atomic queue + * right away */ + num = num_pkt; + } else { + /* Remove empty queue from scheduling. + * Continue scheduling the same priority + * queue. */ + continue; + } }
handle = queue_from_index(qi); diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c index 1a82f48d..ddd97bea 100644 --- a/platform/linux-generic/odp_schedule_iquery.c +++ b/platform/linux-generic/odp_schedule_iquery.c @@ -674,9 +674,9 @@ static inline void pktio_poll_input(void) cmd = &sched->pktio_poll.commands[index];
/* Poll packet input */ - if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio, - cmd->count, - cmd->pktin))) { + if (odp_unlikely(sched_cb_pktin_poll_old(cmd->pktio, + cmd->count, + cmd->pktin))) { /* Pktio stopped or closed. Remove poll * command and call stop_finalize when all * commands of the pktio has been removed. diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index 50390274..84d16d3c 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -524,8 +524,9 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait, cmd = sched_cmd();
if (cmd && cmd->s.type == CMD_PKTIO) { - if (sched_cb_pktin_poll(cmd->s.index, cmd->s.num_pktin, - cmd->s.pktin_idx)) { + if (sched_cb_pktin_poll_old(cmd->s.index, + cmd->s.num_pktin, + cmd->s.pktin_idx)) { /* Pktio stopped or closed. */ sched_cb_pktio_stop_finalize(cmd->s.index); } else {
commit 3e61be54a77ed8ccc2030e88b9a26372d3f76e2c Author: Petri Savolainen petri.savolainen@linaro.org Date: Tue Feb 27 16:27:39 2018 +0200
linux-gen: sched: optimize packet input polling
Optimize scheduler throughput with packet IO interfaces. Special pktio poll commands are removed and event queue is used instead to trigger packet input polling. Packet input is polled when those queues are empty. Thus, these queues connected to packet input are not removed from scheduling when empty.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index 66e05043..dd2097bf 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -82,7 +82,9 @@ int sched_cb_pktin_poll_one(int pktio_index, int rx_queue, odp_event_t evts[]); void sched_cb_pktio_stop_finalize(int pktio_index); odp_queue_t sched_cb_queue_handle(uint32_t queue_index); void sched_cb_queue_destroy_finalize(uint32_t queue_index); -int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num); +void sched_cb_queue_set_status(uint32_t queue_index, int status); +int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num, + int update_status); int sched_cb_queue_empty(uint32_t queue_index);
/* API functions */ diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c index f87c2876..4371fd90 100644 --- a/platform/linux-generic/odp_queue_basic.c +++ b/platform/linux-generic/odp_queue_basic.c @@ -309,6 +309,17 @@ void sched_cb_queue_destroy_finalize(uint32_t queue_index) UNLOCK(queue); }
+void sched_cb_queue_set_status(uint32_t queue_index, int status) +{ + queue_entry_t *queue = get_qentry(queue_index); + + LOCK(queue); + + queue->s.status = status; + + UNLOCK(queue); +} + static int queue_destroy(odp_queue_t handle) { queue_entry_t *queue; @@ -503,7 +514,7 @@ static int queue_enq(odp_queue_t handle, odp_event_t ev) }
static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], - int num) + int num, int update_status) { int status_sync = sched_fn->status_sync; int num_deq; @@ -525,7 +536,7 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
if (num_deq == 0) { /* Already empty queue */ - if (queue->s.status == QUEUE_STATUS_SCHED) { + if (update_status && queue->s.status == QUEUE_STATUS_SCHED) { queue->s.status = QUEUE_STATUS_NOTSCHED;
if (status_sync) @@ -552,7 +563,7 @@ static int queue_int_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[], { queue_entry_t *queue = qentry_from_int(q_int);
- return deq_multi(queue, buf_hdr, num); + return deq_multi(queue, buf_hdr, num, 0); }
static odp_buffer_hdr_t *queue_int_deq(queue_t q_int) @@ -561,7 +572,7 @@ static odp_buffer_hdr_t *queue_int_deq(queue_t q_int) odp_buffer_hdr_t *buf_hdr = NULL; int ret;
- ret = deq_multi(queue, &buf_hdr, 1); + ret = deq_multi(queue, &buf_hdr, 1, 0);
if (ret == 1) return buf_hdr; @@ -671,11 +682,12 @@ static int queue_info(odp_queue_t handle, odp_queue_info_t *info) return 0; }
-int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num) +int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num, + int update_status) { queue_entry_t *qe = get_qentry(queue_index);
- return deq_multi(qe, (odp_buffer_hdr_t **)ev, num); + return deq_multi(qe, (odp_buffer_hdr_t **)ev, num, update_status); }
int sched_cb_queue_empty(uint32_t queue_index) diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index e5e1fe60..b3847ab9 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -50,39 +50,15 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && /* Size of poll weight table */ #define WEIGHT_TBL_SIZE ((QUEUES_PER_PRIO - 1) * PREFER_RATIO)
-/* Packet input poll cmd queues */ -#define PKTIO_CMD_QUEUES 4 - -/* Mask for wrapping command queues */ -#define PKTIO_CMD_QUEUE_MASK (PKTIO_CMD_QUEUES - 1) - -/* Maximum number of packet input queues per command */ -#define MAX_PKTIN 16 - /* Maximum number of packet IO interfaces */ #define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
-/* Maximum number of pktio poll commands */ -#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO) +/* Maximum pktin index. Needs to fit into 8 bits. */ +#define MAX_PKTIN_INDEX 255
/* Not a valid index */ #define NULL_INDEX ((uint32_t)-1)
-/* Not a valid poll command */ -#define PKTIO_CMD_INVALID NULL_INDEX - -/* Pktio command is free */ -#define PKTIO_CMD_FREE PKTIO_CMD_INVALID - -/* Packet IO poll queue ring size. In worst case, all pktios have all pktins - * enabled and one poll command is created per pktin queue. The ring size must - * be larger than or equal to NUM_PKTIO_CMD / PKTIO_CMD_QUEUES, so that it can - * hold all poll commands in the worst case. */ -#define PKTIO_RING_SIZE (NUM_PKTIO_CMD / PKTIO_CMD_QUEUES) - -/* Mask for wrapping around pktio poll command index */ -#define PKTIO_RING_MASK (PKTIO_RING_SIZE - 1) - /* Priority queue ring size. In worst case, all event queues are scheduled * queues and have the same priority. The ring size must be larger than or * equal to ODP_CONFIG_QUEUES / QUEUES_PER_PRIO, so that it can hold all @@ -90,7 +66,7 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && #define PRIO_QUEUE_RING_SIZE (ODP_CONFIG_QUEUES / QUEUES_PER_PRIO)
/* Mask for wrapping around priority queue index */ -#define PRIO_QUEUE_MASK (PRIO_QUEUE_RING_SIZE - 1) +#define RING_MASK (PRIO_QUEUE_RING_SIZE - 1)
/* Priority queue empty, not a valid queue index. */ #define PRIO_QUEUE_EMPTY NULL_INDEX @@ -103,15 +79,6 @@ ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES), ODP_STATIC_ASSERT(CHECK_IS_POWER2(PRIO_QUEUE_RING_SIZE), "Ring_size_is_not_power_of_two");
-/* Ring size must be power of two, so that PKTIO_RING_MASK can be used. */ -ODP_STATIC_ASSERT(CHECK_IS_POWER2(PKTIO_RING_SIZE), - "pktio_ring_size_is_not_power_of_two"); - -/* Number of commands queues must be power of two, so that PKTIO_CMD_QUEUE_MASK - * can be used. */ -ODP_STATIC_ASSERT(CHECK_IS_POWER2(PKTIO_CMD_QUEUES), - "pktio_cmd_queues_is_not_power_of_two"); - /* Mask of queues per priority */ typedef uint8_t pri_mask_t;
@@ -150,7 +117,6 @@ typedef struct { int index; int pause; uint16_t round; - uint16_t pktin_polls; uint32_t queue_index; odp_queue_t queue; odp_event_t ev_stash[MAX_DEQ]; @@ -183,24 +149,6 @@ typedef struct ODP_ALIGNED_CACHE {
} prio_queue_t;
-/* Packet IO queue */ -typedef struct ODP_ALIGNED_CACHE { - /* Ring header */ - ring_t ring; - - /* Ring data: pktio poll command indexes */ - uint32_t cmd_index[PKTIO_RING_SIZE]; - -} pktio_queue_t; - -/* Packet IO poll command */ -typedef struct { - int pktio_index; - int num_pktin; - int pktin[MAX_PKTIN]; - uint32_t cmd_index; -} pktio_cmd_t; - /* Order context of a queue */ typedef struct ODP_ALIGNED_CACHE { /* Current ordered context id */ @@ -220,16 +168,6 @@ typedef struct {
prio_queue_t prio_q[NUM_SCHED_GRPS][NUM_PRIO][QUEUES_PER_PRIO];
- odp_spinlock_t poll_cmd_lock; - /* Number of commands in a command queue */ - uint16_t num_pktio_cmd[PKTIO_CMD_QUEUES]; - - /* Packet IO command queues */ - pktio_queue_t pktio_q[PKTIO_CMD_QUEUES]; - - /* Packet IO poll commands */ - pktio_cmd_t pktio_cmd[NUM_PKTIO_CMD]; - odp_shm_t shm; uint32_t pri_count[NUM_PRIO][QUEUES_PER_PRIO];
@@ -244,22 +182,34 @@ typedef struct { } sched_grp[NUM_SCHED_GRPS];
struct { - int grp; - int prio; - int queue_per_prio; - int sync; - uint32_t order_lock_count; + uint8_t grp; + uint8_t prio; + uint8_t queue_per_prio; + uint8_t sync; + uint8_t order_lock_count; + uint8_t poll_pktin; + uint8_t pktio_index; + uint8_t pktin_index; } queue[ODP_CONFIG_QUEUES];
struct { - /* Number of active commands for a pktio interface */ - int num_cmd; + int num_pktin; } pktio[NUM_PKTIO]; + odp_spinlock_t pktio_lock;
order_context_t order[ODP_CONFIG_QUEUES];
} sched_global_t;
+/* Check that queue[] variables are large enough */ +ODP_STATIC_ASSERT(NUM_SCHED_GRPS <= 256, "Group_does_not_fit_8_bits"); +ODP_STATIC_ASSERT(NUM_PRIO <= 256, "Prio_does_not_fit_8_bits"); +ODP_STATIC_ASSERT(QUEUES_PER_PRIO <= 256, + "Queues_per_prio_does_not_fit_8_bits"); +ODP_STATIC_ASSERT(CONFIG_QUEUE_MAX_ORD_LOCKS <= 256, + "Ordered_lock_count_does_not_fit_8_bits"); +ODP_STATIC_ASSERT(NUM_PKTIO <= 256, "Pktio_index_does_not_fit_8_bits"); + /* Global scheduler context */ static sched_global_t *sched;
@@ -337,16 +287,9 @@ static int schedule_init_global(void) } }
- odp_spinlock_init(&sched->poll_cmd_lock); - for (i = 0; i < PKTIO_CMD_QUEUES; i++) { - ring_init(&sched->pktio_q[i].ring); - - for (j = 0; j < PKTIO_RING_SIZE; j++) - sched->pktio_q[i].cmd_index[j] = PKTIO_CMD_INVALID; - } - - for (i = 0; i < NUM_PKTIO_CMD; i++) - sched->pktio_cmd[i].cmd_index = PKTIO_CMD_FREE; + odp_spinlock_init(&sched->pktio_lock); + for (i = 0; i < NUM_PKTIO; i++) + sched->pktio[i].num_pktin = 0;
odp_spinlock_init(&sched->grp_lock); odp_atomic_init_u32(&sched->grp_epoch, 0); @@ -384,14 +327,14 @@ static int schedule_term_global(void) ring_t *ring = &sched->prio_q[grp][i][j].ring; uint32_t qi;
- while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) != + while ((qi = ring_deq(ring, RING_MASK)) != RING_EMPTY) { odp_event_t events[1]; int num;
num = sched_cb_queue_deq_multi(qi, events, - 1); + 1, 1);
if (num < 0) queue_destroy_finalize(qi); @@ -519,6 +462,9 @@ static int schedule_init_queue(uint32_t queue_index, sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index); sched->queue[queue_index].sync = sched_param->sync; sched->queue[queue_index].order_lock_count = sched_param->lock_count; + sched->queue[queue_index].poll_pktin = 0; + sched->queue[queue_index].pktio_index = 0; + sched->queue[queue_index].pktin_index = 0;
odp_atomic_init_u64(&sched->order[queue_index].ctx, 0); odp_atomic_init_u64(&sched->order[queue_index].next_ctx, 0); @@ -554,88 +500,39 @@ static void schedule_destroy_queue(uint32_t queue_index) ODP_ERR("queue reorder incomplete\n"); }
-static int poll_cmd_queue_idx(int pktio_index, int pktin_idx) -{ - return PKTIO_CMD_QUEUE_MASK & (pktio_index ^ pktin_idx); -} - -static inline pktio_cmd_t *alloc_pktio_cmd(void) -{ - int i; - pktio_cmd_t *cmd = NULL; - - odp_spinlock_lock(&sched->poll_cmd_lock); - - /* Find next free command */ - for (i = 0; i < NUM_PKTIO_CMD; i++) { - if (sched->pktio_cmd[i].cmd_index == PKTIO_CMD_FREE) { - cmd = &sched->pktio_cmd[i]; - cmd->cmd_index = i; - break; - } - } - - odp_spinlock_unlock(&sched->poll_cmd_lock); - - return cmd; -} - -static inline void free_pktio_cmd(pktio_cmd_t *cmd) +static int schedule_sched_queue(uint32_t queue_index) { - odp_spinlock_lock(&sched->poll_cmd_lock); - - cmd->cmd_index = PKTIO_CMD_FREE; + int grp = sched->queue[queue_index].grp; + int prio = sched->queue[queue_index].prio; + int queue_per_prio = sched->queue[queue_index].queue_per_prio; + ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
- odp_spinlock_unlock(&sched->poll_cmd_lock); + ring_enq(ring, RING_MASK, queue_index); + return 0; }
static void schedule_pktio_start(int pktio_index, int num_pktin, - int pktin_idx[], odp_queue_t odpq[] ODP_UNUSED) + int pktin_idx[], odp_queue_t queue[]) { - int i, idx; - pktio_cmd_t *cmd; - - if (num_pktin > MAX_PKTIN) - ODP_ABORT("Too many input queues for scheduler\n"); + int i; + uint32_t qi;
- sched->pktio[pktio_index].num_cmd = num_pktin; + sched->pktio[pktio_index].num_pktin = num_pktin;
- /* Create a pktio poll command per queue */ for (i = 0; i < num_pktin; i++) { + qi = queue_to_index(queue[i]); + sched->queue[qi].poll_pktin = 1; + sched->queue[qi].pktio_index = pktio_index; + sched->queue[qi].pktin_index = pktin_idx[i];
- cmd = alloc_pktio_cmd(); - - if (cmd == NULL) - ODP_ABORT("Scheduler out of pktio commands\n"); - - idx = poll_cmd_queue_idx(pktio_index, pktin_idx[i]); + ODP_ASSERT(pktin_idx[i] <= MAX_PKTIN_INDEX);
- odp_spinlock_lock(&sched->poll_cmd_lock); - sched->num_pktio_cmd[idx]++; - odp_spinlock_unlock(&sched->poll_cmd_lock); - - cmd->pktio_index = pktio_index; - cmd->num_pktin = 1; - cmd->pktin[0] = pktin_idx[i]; - ring_enq(&sched->pktio_q[idx].ring, PKTIO_RING_MASK, - cmd->cmd_index); + /* Start polling */ + sched_cb_queue_set_status(qi, QUEUE_STATUS_SCHED); + schedule_sched_queue(qi); } }
-static int schedule_pktio_stop(int pktio_index, int first_pktin) -{ - int num; - int idx = poll_cmd_queue_idx(pktio_index, first_pktin); - - odp_spinlock_lock(&sched->poll_cmd_lock); - sched->num_pktio_cmd[idx]--; - sched->pktio[pktio_index].num_cmd--; - num = sched->pktio[pktio_index].num_cmd; - odp_spinlock_unlock(&sched->poll_cmd_lock); - - return num; -} - static void schedule_release_atomic(void) { uint32_t qi = sched_local.queue_index; @@ -647,7 +544,7 @@ static void schedule_release_atomic(void) ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
/* Release current atomic queue */ - ring_enq(ring, PRIO_QUEUE_MASK, qi); + ring_enq(ring, RING_MASK, qi); sched_local.queue_index = PRIO_QUEUE_EMPTY; } } @@ -797,6 +694,37 @@ static int schedule_ord_enq_multi(queue_t q_int, void *buf_hdr[], return 1; }
+static inline int queue_is_pktin(uint32_t queue_index) +{ + return sched->queue[queue_index].poll_pktin; +} + +static inline int poll_pktin(uint32_t qi) +{ + int pktio_index, pktin_index, num, num_pktin; + + pktio_index = sched->queue[qi].pktio_index; + pktin_index = sched->queue[qi].pktin_index; + + num = sched_cb_pktin_poll(pktio_index, 1, &pktin_index); + + /* Pktio stopped or closed. Call stop_finalize when we have stopped + * polling all pktin queues of the pktio. */ + if (odp_unlikely(num < 0)) { + odp_spinlock_lock(&sched->pktio_lock); + sched->pktio[pktio_index].num_pktin--; + num_pktin = sched->pktio[pktio_index].num_pktin; + odp_spinlock_unlock(&sched->pktio_lock); + + sched_cb_queue_set_status(qi, QUEUE_STATUS_NOTSCHED); + + if (num_pktin == 0) + sched_cb_pktio_stop_finalize(pktio_index); + } + + return num; +} + static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], unsigned int max_num, int grp, int first) { @@ -820,6 +748,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], int ordered; odp_queue_t handle; ring_t *ring; + int pktin;
if (id >= QUEUES_PER_PRIO) id = 0; @@ -834,7 +763,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
/* Get queue index from the priority queue */ ring = &sched->prio_q[grp][prio][id].ring; - qi = ring_deq(ring, PRIO_QUEUE_MASK); + qi = ring_deq(ring, RING_MASK);
/* Priority queue empty */ if (qi == RING_EMPTY) { @@ -857,8 +786,10 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], if (ordered && max_num < MAX_DEQ) max_deq = max_num;
+ pktin = queue_is_pktin(qi); + num = sched_cb_queue_deq_multi(qi, sched_local.ev_stash, - max_deq); + max_deq, !pktin);
if (num < 0) { /* Destroyed queue. Continue scheduling the same @@ -868,6 +799,20 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], }
if (num == 0) { + /* Poll packet input. Continue scheduling queue + * connected to a packet input. Move to the next + * priority to avoid starvation of other + * priorities. Stop scheduling queue when pktio + * has been stopped. */ + if (pktin) { + int num_pkt = poll_pktin(qi); + + if (odp_likely(num_pkt >= 0)) { + ring_enq(ring, RING_MASK, qi); + break; + } + } + /* Remove empty queue from scheduling. Continue * scheduling the same priority queue. */ continue; @@ -890,14 +835,14 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], sched_local.ordered.src_queue = qi;
/* Continue scheduling ordered queues */ - ring_enq(ring, PRIO_QUEUE_MASK, qi); + ring_enq(ring, RING_MASK, qi);
} else if (queue_is_atomic(qi)) { /* Hold queue during atomic access */ sched_local.queue_index = qi; } else { /* Continue scheduling the queue */ - ring_enq(ring, PRIO_QUEUE_MASK, qi); + ring_enq(ring, RING_MASK, qi); }
/* Output the source queue handle */ @@ -919,7 +864,7 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], { int i, num_grp; int ret; - int id, first, grp_id; + int first, grp_id; uint16_t round; uint32_t epoch;
@@ -972,66 +917,11 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], grp_id = 0; }
- /* - * Poll packet input when there are no events - * * Each thread starts the search for a poll command from its - * preferred command queue. If the queue is empty, it moves to other - * queues. - * * Most of the times, the search stops on the first command found to - * optimize multi-threaded performance. A small portion of polls - * have to do full iteration to avoid packet input starvation when - * there are less threads than command queues. - */ - id = sched_local.thr & PKTIO_CMD_QUEUE_MASK; - - for (i = 0; i < PKTIO_CMD_QUEUES; i++, id = ((id + 1) & - PKTIO_CMD_QUEUE_MASK)) { - ring_t *ring; - uint32_t cmd_index; - pktio_cmd_t *cmd; - - if (odp_unlikely(sched->num_pktio_cmd[id] == 0)) - continue; - - ring = &sched->pktio_q[id].ring; - cmd_index = ring_deq(ring, PKTIO_RING_MASK); - - if (odp_unlikely(cmd_index == RING_EMPTY)) - continue; - - cmd = &sched->pktio_cmd[cmd_index]; - - /* Poll packet input */ - if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio_index, - cmd->num_pktin, - cmd->pktin))){ - /* Pktio stopped or closed. Remove poll command and call - * stop_finalize when all commands of the pktio has - * been removed. */ - if (schedule_pktio_stop(cmd->pktio_index, - cmd->pktin[0]) == 0) - sched_cb_pktio_stop_finalize(cmd->pktio_index); - - free_pktio_cmd(cmd); - } else { - /* Continue scheduling the pktio */ - ring_enq(ring, PKTIO_RING_MASK, cmd_index); - - /* Do not iterate through all pktin poll command queues - * every time. */ - if (odp_likely(sched_local.pktin_polls & 0xf)) - break; - } - } - - sched_local.pktin_polls++; return 0; }
- -static int schedule_loop(odp_queue_t *out_queue, uint64_t wait, - odp_event_t out_ev[], - unsigned int max_num) +static inline int schedule_loop(odp_queue_t *out_queue, uint64_t wait, + odp_event_t out_ev[], unsigned int max_num) { odp_time_t next, wtime; int first = 1; @@ -1387,17 +1277,6 @@ static void schedule_prefetch(int num ODP_UNUSED) { }
-static int schedule_sched_queue(uint32_t queue_index) -{ - int grp = sched->queue[queue_index].grp; - int prio = sched->queue[queue_index].prio; - int queue_per_prio = sched->queue[queue_index].queue_per_prio; - ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring; - - ring_enq(ring, PRIO_QUEUE_MASK, queue_index); - return 0; -} - static int schedule_num_grps(void) { return NUM_SCHED_GRPS; diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c index ea62c364..1a82f48d 100644 --- a/platform/linux-generic/odp_schedule_iquery.c +++ b/platform/linux-generic/odp_schedule_iquery.c @@ -291,7 +291,7 @@ static int schedule_term_global(void) odp_event_t events[1];
if (sched->availables[i]) - count = sched_cb_queue_deq_multi(i, events, 1); + count = sched_cb_queue_deq_multi(i, events, 1, 1);
if (count < 0) sched_cb_queue_destroy_finalize(i); @@ -1527,7 +1527,7 @@ static inline int consume_queue(int prio, unsigned int queue_index) max = 1;
count = sched_cb_queue_deq_multi( - queue_index, cache->stash, max); + queue_index, cache->stash, max, 1);
if (count < 0) { DO_SCHED_UNLOCK(); diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index 007d673f..50390274 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -559,7 +559,7 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait, }
qi = cmd->s.index; - num = sched_cb_queue_deq_multi(qi, events, 1); + num = sched_cb_queue_deq_multi(qi, events, 1, 1);
if (num > 0) { sched_local.cmd = cmd;
-----------------------------------------------------------------------
Summary of changes:
 .../linux-generic/include/odp_buffer_inlines.h     |   5 +
 .../linux-generic/include/odp_queue_internal.h     |  22 +
 platform/linux-generic/include/odp_queue_lf.h      |   1 -
 platform/linux-generic/include/odp_schedule_if.h   |   8 +-
 platform/linux-generic/odp_packet_io.c             |  93 ++++-
 platform/linux-generic/odp_queue_basic.c           |  41 +-
 platform/linux-generic/odp_queue_lf.c              |   2 +-
 platform/linux-generic/odp_schedule_basic.c        | 464 +++++++++------------
 platform/linux-generic/odp_schedule_iquery.c       |  23 +-
 platform/linux-generic/odp_schedule_sp.c           |   7 +-
 10 files changed, 347 insertions(+), 319 deletions(-)
hooks/post-receive