This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, master has been updated via e858d688c3ad3ce0d0b3ea22539cac8e6ec844b7 (commit) via 93718d4ead55bfbaa8e564c24d1f3b76e60235ce (commit) via 582065e74e2375b5c81ac8fcec9eb02f541f42ff (commit) via 5f4f2e0da6e04637b6b4bd7aa6bb4d4d32680525 (commit) via 92336dd2808af4826371d467588dcb81daafe4cf (commit) via 7bb62b522a5f89d6d19a4c77254222b1c07ab44b (commit) from 566492d067083e870548c78a89f8c65b02ecde89 (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log ----------------------------------------------------------------- commit e858d688c3ad3ce0d0b3ea22539cac8e6ec844b7 Author: Petri Savolainen petri.savolainen@nokia.com Date: Thu Sep 15 16:39:24 2016 +0300
linux-gen: config: increase burst sizes
Added main burst size configuration option (CONFIG_BURST_SIZE), which helps to keep various burst sizes in sync. Increased common burst size from 8 to 16. This increases scheduled queue throughput about 30-40%.
Signed-off-by: Petri Savolainen petri.savolainen@nokia.com Reviewed-and-tested-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index 69daf94..1c09cd3 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -103,7 +103,7 @@ typedef union odp_buffer_bits_t { }; } odp_buffer_bits_t;
-#define BUFFER_BURST_SIZE 8 +#define BUFFER_BURST_SIZE CONFIG_BURST_SIZE
/* Common buffer header */ struct odp_buffer_hdr_t { diff --git a/platform/linux-generic/include/odp_config_internal.h b/platform/linux-generic/include/odp_config_internal.h index 989ea08..b7ff610 100644 --- a/platform/linux-generic/include/odp_config_internal.h +++ b/platform/linux-generic/include/odp_config_internal.h @@ -110,6 +110,14 @@ extern "C" { */ #define ODP_CONFIG_SHM_BLOCKS (ODP_CONFIG_POOLS + 48)
+/* + * Maximum event burst size + * + * This controls the burst size on various enqueue, dequeue, etc calls. Large + * burst size improves throughput, but may degrade QoS (increase latency). + */ +#define CONFIG_BURST_SIZE 16 + #ifdef __cplusplus } #endif diff --git a/platform/linux-generic/include/odp_packet_io_queue.h b/platform/linux-generic/include/odp_packet_io_queue.h index 5843f6a..13b79f3 100644 --- a/platform/linux-generic/include/odp_packet_io_queue.h +++ b/platform/linux-generic/include/odp_packet_io_queue.h @@ -20,9 +20,10 @@ extern "C" {
#include <odp_queue_internal.h> #include <odp_buffer_internal.h> +#include <odp_config_internal.h>
/** Max nbr of pkts to receive in one burst (keep same as QUEUE_MULTI_MAX) */ -#define ODP_PKTIN_QUEUE_MAX_BURST 16 +#define ODP_PKTIN_QUEUE_MAX_BURST CONFIG_BURST_SIZE /* pktin_deq_multi() depends on the condition: */ ODP_STATIC_ASSERT(ODP_PKTIN_QUEUE_MAX_BURST >= QUEUE_MULTI_MAX, "ODP_PKTIN_DEQ_MULTI_MAX_ERROR"); diff --git a/platform/linux-generic/include/odp_pool_internal.h b/platform/linux-generic/include/odp_pool_internal.h index 1b95627..ca59ade 100644 --- a/platform/linux-generic/include/odp_pool_internal.h +++ b/platform/linux-generic/include/odp_pool_internal.h @@ -52,7 +52,7 @@ typedef struct _odp_buffer_pool_init_t { } _odp_buffer_pool_init_t; /**< Type of buffer initialization struct */
#define POOL_MAX_LOCAL_CHUNKS 4 -#define POOL_CHUNK_SIZE 32 +#define POOL_CHUNK_SIZE (4 * CONFIG_BURST_SIZE) #define POOL_MAX_LOCAL_BUFS (POOL_MAX_LOCAL_CHUNKS * POOL_CHUNK_SIZE)
struct local_cache_s { diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index cb7e3f3..e223d9f 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -27,8 +27,9 @@ extern "C" { #include <odp/api/align.h> #include <odp/api/hints.h> #include <odp/api/ticketlock.h> +#include <odp_config_internal.h>
-#define QUEUE_MULTI_MAX 8 +#define QUEUE_MULTI_MAX CONFIG_BURST_SIZE
#define QUEUE_STATUS_FREE 0 #define QUEUE_STATUS_DESTROYED 1 diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h index d6cfdb4..02637c2 100644 --- a/platform/linux-generic/include/odp_schedule_internal.h +++ b/platform/linux-generic/include/odp_schedule_internal.h @@ -12,7 +12,7 @@ extern "C" { #endif
/* Maximum number of dequeues */ -#define MAX_DEQ 8 +#define MAX_DEQ CONFIG_BURST_SIZE
typedef struct { int thr;
commit 93718d4ead55bfbaa8e564c24d1f3b76e60235ce Author: Petri Savolainen petri.savolainen@nokia.com Date: Thu Sep 15 16:39:23 2016 +0300
linux-gen: pool: use inlined ticketlock
Use inlined ticketlock calls instead of API calls.
Signed-off-by: Petri Savolainen petri.savolainen@nokia.com Reviewed-and-tested-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/include/odp_pool_internal.h b/platform/linux-generic/include/odp_pool_internal.h index d6717ff..1b95627 100644 --- a/platform/linux-generic/include/odp_pool_internal.h +++ b/platform/linux-generic/include/odp_pool_internal.h @@ -73,20 +73,10 @@ typedef struct local_cache_t { }; } local_cache_t;
-/* Use ticketlock instead of spinlock */ -#define POOL_USE_TICKETLOCK - -#ifdef POOL_USE_TICKETLOCK -#include <odp/api/ticketlock.h> -#define POOL_LOCK(a) odp_ticketlock_lock(a) -#define POOL_UNLOCK(a) odp_ticketlock_unlock(a) +#include <odp/api/plat/ticketlock_inlines.h> +#define POOL_LOCK(a) _odp_ticketlock_lock(a) +#define POOL_UNLOCK(a) _odp_ticketlock_unlock(a) #define POOL_LOCK_INIT(a) odp_ticketlock_init(a) -#else -#include <odp/api/spinlock.h> -#define POOL_LOCK(a) odp_spinlock_lock(a) -#define POOL_UNLOCK(a) odp_spinlock_unlock(a) -#define POOL_LOCK_INIT(a) odp_spinlock_init(a) -#endif
/** * ODP Pool stats - Maintain some useful stats regarding pool utilization @@ -105,15 +95,9 @@ typedef struct { } _odp_pool_stats_t;
struct pool_entry_s { -#ifdef POOL_USE_TICKETLOCK odp_ticketlock_t lock ODP_ALIGNED_CACHE; odp_ticketlock_t buf_lock; odp_ticketlock_t blk_lock; -#else - odp_spinlock_t lock ODP_ALIGNED_CACHE; - odp_spinlock_t buf_lock; - odp_spinlock_t blk_lock; -#endif
char name[ODP_POOL_NAME_LEN]; odp_pool_param_t params;
commit 582065e74e2375b5c81ac8fcec9eb02f541f42ff Author: Petri Savolainen petri.savolainen@nokia.com Date: Thu Sep 15 16:39:22 2016 +0300
linux-gen: queue: use inlined ticketlock
Use inlined ticketlock calls instead of API calls. Inlining improves performance and makes it easier to profile functions which use ticketlocks. Cycle consumption of caller functions are more interesting than combined cycle consumption of all ticketlocks.
Signed-off-by: Petri Savolainen petri.savolainen@nokia.com Reviewed-and-tested-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 830798b..cb7e3f3 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -26,15 +26,7 @@ extern "C" { #include <odp/api/packet_io.h> #include <odp/api/align.h> #include <odp/api/hints.h> - - -#define USE_TICKETLOCK - -#ifdef USE_TICKETLOCK #include <odp/api/ticketlock.h> -#else -#include <odp/api/spinlock.h> -#endif
#define QUEUE_MULTI_MAX 8
@@ -57,11 +49,7 @@ typedef int (*deq_multi_func_t)(union queue_entry_u *, odp_buffer_hdr_t **, int);
struct queue_entry_s { -#ifdef USE_TICKETLOCK odp_ticketlock_t lock ODP_ALIGNED_CACHE; -#else - odp_spinlock_t lock ODP_ALIGNED_CACHE; -#endif
odp_buffer_hdr_t *head; odp_buffer_hdr_t *tail; diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 5b962e9..8667076 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -27,19 +27,10 @@
#define NUM_INTERNAL_QUEUES 64
-#ifdef USE_TICKETLOCK -#include <odp/api/ticketlock.h> -#define LOCK(a) odp_ticketlock_lock(a) -#define UNLOCK(a) odp_ticketlock_unlock(a) +#include <odp/api/plat/ticketlock_inlines.h> +#define LOCK(a) _odp_ticketlock_lock(a) +#define UNLOCK(a) _odp_ticketlock_unlock(a) #define LOCK_INIT(a) odp_ticketlock_init(a) -#define LOCK_TRY(a) odp_ticketlock_trylock(a) -#else -#include <odp/api/spinlock.h> -#define LOCK(a) odp_spinlock_lock(a) -#define UNLOCK(a) odp_spinlock_unlock(a) -#define LOCK_INIT(a) odp_spinlock_init(a) -#define LOCK_TRY(a) odp_spinlock_trylock(a) -#endif
#include <string.h> #include <inttypes.h>
commit 5f4f2e0da6e04637b6b4bd7aa6bb4d4d32680525 Author: Petri Savolainen petri.savolainen@nokia.com Date: Thu Sep 15 16:39:21 2016 +0300
linux-gen: ticketlock: inline ticketlock implementation
Moved ticketlock implementation into ticketlock_inlines.h header file, which enables inlined version to be used inside implementation and through API.
Signed-off-by: Petri Savolainen petri.savolainen@nokia.com Reviewed-and-tested-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index e3c0f56..900ac08 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -88,6 +88,7 @@ odpapiplatinclude_HEADERS = \ $(srcdir)/include/odp/api/plat/sync_inlines.h \ $(srcdir)/include/odp/api/plat/thread_types.h \ $(srcdir)/include/odp/api/plat/thrmask_types.h \ + $(srcdir)/include/odp/api/plat/ticketlock_inlines.h \ $(srcdir)/include/odp/api/plat/ticketlock_types.h \ $(srcdir)/include/odp/api/plat/time_types.h \ $(srcdir)/include/odp/api/plat/timer_types.h \ diff --git a/platform/linux-generic/odp_ticketlock.c b/platform/linux-generic/include/odp/api/plat/ticketlock_inlines.h similarity index 83% copy from platform/linux-generic/odp_ticketlock.c copy to platform/linux-generic/include/odp/api/plat/ticketlock_inlines.h index 353af9a..957d22e 100644 --- a/platform/linux-generic/odp_ticketlock.c +++ b/platform/linux-generic/include/odp/api/plat/ticketlock_inlines.h @@ -1,21 +1,24 @@ -/* Copyright (c) 2013, Linaro Limited +/* Copyright (c) 2016, Linaro Limited * All rights reserved. * - * SPDX-License-Identifier: BSD-3-Clause + * SPDX-License-Identifier: BSD-3-Clause */
+/** + * @file + * + * Ticketlock inline functions + */ + +#ifndef _ODP_PLAT_TICKETLOCK_INLINES_H_ +#define _ODP_PLAT_TICKETLOCK_INLINES_H_ + #include <odp/api/ticketlock.h> #include <odp/api/atomic.h> #include <odp/api/sync.h> #include <odp/api/cpu.h>
-void odp_ticketlock_init(odp_ticketlock_t *ticketlock) -{ - odp_atomic_init_u32(&ticketlock->next_ticket, 0); - odp_atomic_init_u32(&ticketlock->cur_ticket, 0); -} - -void odp_ticketlock_lock(odp_ticketlock_t *ticketlock) +static inline void _odp_ticketlock_lock(odp_ticketlock_t *ticketlock) { uint32_t ticket;
@@ -30,7 +33,7 @@ void odp_ticketlock_lock(odp_ticketlock_t *ticketlock) odp_cpu_pause(); }
-int odp_ticketlock_trylock(odp_ticketlock_t *tklock) +static inline int _odp_ticketlock_trylock(odp_ticketlock_t *tklock) { /* We read 'next_ticket' and 'cur_ticket' non-atomically which should * not be a problem as they are not independent of each other. @@ -58,7 +61,7 @@ int odp_ticketlock_trylock(odp_ticketlock_t *tklock) return 0; }
-void odp_ticketlock_unlock(odp_ticketlock_t *ticketlock) +static inline void _odp_ticketlock_unlock(odp_ticketlock_t *ticketlock) { /* Release the lock by incrementing 'cur_ticket'. As we are the * lock owner and thus the only thread that is allowed to write @@ -68,10 +71,9 @@ void odp_ticketlock_unlock(odp_ticketlock_t *ticketlock) uint32_t cur = odp_atomic_load_u32(&ticketlock->cur_ticket);
odp_atomic_store_rel_u32(&ticketlock->cur_ticket, cur + 1); - }
-int odp_ticketlock_is_locked(odp_ticketlock_t *ticketlock) +static inline int _odp_ticketlock_is_locked(odp_ticketlock_t *ticketlock) { /* Compare 'cur_ticket' with 'next_ticket'. Ideally we should read * both variables atomically but the information can become stale @@ -81,3 +83,5 @@ int odp_ticketlock_is_locked(odp_ticketlock_t *ticketlock) return odp_atomic_load_u32(&ticketlock->cur_ticket) != odp_atomic_load_u32(&ticketlock->next_ticket); } + +#endif diff --git a/platform/linux-generic/odp_ticketlock.c b/platform/linux-generic/odp_ticketlock.c index 353af9a..f18d78f 100644 --- a/platform/linux-generic/odp_ticketlock.c +++ b/platform/linux-generic/odp_ticketlock.c @@ -4,10 +4,7 @@ * SPDX-License-Identifier: BSD-3-Clause */
-#include <odp/api/ticketlock.h> -#include <odp/api/atomic.h> -#include <odp/api/sync.h> -#include <odp/api/cpu.h> +#include <odp/api/plat/ticketlock_inlines.h>
void odp_ticketlock_init(odp_ticketlock_t *ticketlock) { @@ -15,69 +12,22 @@ void odp_ticketlock_init(odp_ticketlock_t *ticketlock) odp_atomic_init_u32(&ticketlock->cur_ticket, 0); }
-void odp_ticketlock_lock(odp_ticketlock_t *ticketlock) +void odp_ticketlock_lock(odp_ticketlock_t *lock) { - uint32_t ticket; - - /* Take a ticket using an atomic increment of 'next_ticket'. - * This can be a relaxed operation but it cannot have the - * acquire semantics since we haven't acquired the lock yet */ - ticket = odp_atomic_fetch_inc_u32(&ticketlock->next_ticket); - - /* Spin waiting for our turn. Use load-acquire so that we acquire - * all stores from the previous lock owner */ - while (ticket != odp_atomic_load_acq_u32(&ticketlock->cur_ticket)) - odp_cpu_pause(); + return _odp_ticketlock_lock(lock); }
-int odp_ticketlock_trylock(odp_ticketlock_t *tklock) +int odp_ticketlock_trylock(odp_ticketlock_t *lock) { - /* We read 'next_ticket' and 'cur_ticket' non-atomically which should - * not be a problem as they are not independent of each other. - * 'cur_ticket' is always <= to 'next_ticket' and if we see an - * older value of 'cur_ticket', this only means the lock will - * look busy and trylock will fail. */ - uint32_t next = odp_atomic_load_u32(&tklock->next_ticket); - uint32_t cur = odp_atomic_load_u32(&tklock->cur_ticket); - /* First check that lock is available and possible to take without - * spinning. */ - if (next == cur) { - /* Then try to take the lock by incrementing 'next_ticket' - * but only if it still has the original value which is - * equal to 'cur_ticket'. - * We don't have to include 'cur_ticket' in the comparison - * because it cannot be larger than 'next_ticket' (only - * smaller if the lock is busy). - * If CAS fails, it means some other thread intercepted and - * took a ticket which means the lock is not available - * anymore */ - if (odp_atomic_cas_acq_u32(&tklock->next_ticket, - &next, next + 1)) - return 1; - } - return 0; + return _odp_ticketlock_trylock(lock); }
-void odp_ticketlock_unlock(odp_ticketlock_t *ticketlock) +void odp_ticketlock_unlock(odp_ticketlock_t *lock) { - /* Release the lock by incrementing 'cur_ticket'. As we are the - * lock owner and thus the only thread that is allowed to write - * 'cur_ticket', we don't need to do this with an (expensive) - * atomic RMW operation. Instead load-relaxed the current value - * and a store-release of the incremented value */ - uint32_t cur = odp_atomic_load_u32(&ticketlock->cur_ticket); - - odp_atomic_store_rel_u32(&ticketlock->cur_ticket, cur + 1); - + _odp_ticketlock_unlock(lock); }
-int odp_ticketlock_is_locked(odp_ticketlock_t *ticketlock) +int odp_ticketlock_is_locked(odp_ticketlock_t *lock) { - /* Compare 'cur_ticket' with 'next_ticket'. Ideally we should read - * both variables atomically but the information can become stale - * immediately anyway so the function can only be used reliably in - * a quiescent system where non-atomic loads should not pose a - * problem */ - return odp_atomic_load_u32(&ticketlock->cur_ticket) != - odp_atomic_load_u32(&ticketlock->next_ticket); + return _odp_ticketlock_is_locked(lock); }
commit 92336dd2808af4826371d467588dcb81daafe4cf Author: Petri Savolainen petri.savolainen@nokia.com Date: Thu Sep 15 16:39:20 2016 +0300
linux-gen: queue: burst enq and deq
Added support for a buffer header to carry a burst of buffer pointers. The buffer itself is the last one of a burst. Bursts are built with an enq_multi call, so single enq operations do not benefit from it.
Signed-off-by: Petri Savolainen petri.savolainen@nokia.com Reviewed-and-tested-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index 7b0ef8b..69daf94 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -103,6 +103,8 @@ typedef union odp_buffer_bits_t { }; } odp_buffer_bits_t;
+#define BUFFER_BURST_SIZE 8 + /* Common buffer header */ struct odp_buffer_hdr_t { struct odp_buffer_hdr_t *next; /* next buf in a list--keep 1st */ @@ -111,6 +113,11 @@ struct odp_buffer_hdr_t { struct odp_buffer_hdr_t *link; }; odp_buffer_bits_t handle; /* handle */ + + int burst_num; + int burst_first; + struct odp_buffer_hdr_t *burst[BUFFER_BURST_SIZE]; + union { uint32_t all; struct { diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 80d99e8..5b962e9 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -388,20 +388,48 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], { int sched = 0; int i, ret; - odp_buffer_hdr_t *tail; - - /* Chain input buffers together */ - for (i = 0; i < num - 1; i++) - buf_hdr[i]->next = buf_hdr[i + 1]; - - tail = buf_hdr[num - 1]; - buf_hdr[num - 1]->next = NULL; + odp_buffer_hdr_t *hdr, *tail, *next_hdr;
+ /* Ordered queues do not use bursts */ if (sched_fn->ord_enq_multi(queue->s.index, (void **)buf_hdr, num, sustain, &ret)) return ret;
- /* Handle unordered enqueues */ + /* Optimize the common case of single enqueue */ + if (num == 1) { + tail = buf_hdr[0]; + hdr = tail; + hdr->burst_num = 0; + } else { + int next; + + /* Start from the last buffer header */ + tail = buf_hdr[num - 1]; + hdr = tail; + next = num - 2; + + while (1) { + /* Build a burst. The buffer header carrying + * a burst is the last buffer of the burst. */ + for (i = 0; next >= 0 && i < BUFFER_BURST_SIZE; + i++, next--) + hdr->burst[BUFFER_BURST_SIZE - 1 - i] = + buf_hdr[next]; + + hdr->burst_num = i; + hdr->burst_first = BUFFER_BURST_SIZE - i; + + if (odp_likely(next < 0)) + break; + + /* Get another header and link it */ + next_hdr = hdr; + hdr = buf_hdr[next]; + hdr->next = next_hdr; + next--; + } + } + LOCK(&queue->s.lock); if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { UNLOCK(&queue->s.lock); @@ -411,9 +439,9 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
/* Empty queue */ if (queue->s.head == NULL) - queue->s.head = buf_hdr[0]; + queue->s.head = hdr; else - queue->s.tail->next = buf_hdr[0]; + queue->s.tail->next = hdr;
queue->s.tail = tail;
@@ -483,9 +511,9 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev) static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { - odp_buffer_hdr_t *hdr; - int i; - uint32_t j; + odp_buffer_hdr_t *hdr, *next; + int i, j; + int updated = 0;
LOCK(&queue->s.lock); if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { @@ -506,33 +534,65 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], return 0; }
- for (i = 0; i < num && hdr; i++) { - buf_hdr[i] = hdr; - hdr = hdr->next; - buf_hdr[i]->next = NULL; - if (queue_is_ordered(queue)) { - buf_hdr[i]->origin_qe = queue; - buf_hdr[i]->order = queue->s.order_in++; - for (j = 0; j < queue->s.param.sched.lock_count; j++) { - buf_hdr[i]->sync[j] = + for (i = 0; i < num && hdr; ) { + int burst_num = hdr->burst_num; + int first = hdr->burst_first; + + /* First, get bursted buffers */ + for (j = 0; j < burst_num && i < num; j++, i++) { + buf_hdr[i] = hdr->burst[first + j]; + odp_prefetch(buf_hdr[i]); + } + + if (burst_num) { + hdr->burst_num = burst_num - j; + hdr->burst_first = first + j; + } + + if (i == num) + break; + + /* When burst is empty, consume the current buffer header and + * move to the next header */ + buf_hdr[i] = hdr; + next = hdr->next; + hdr->next = NULL; + hdr = next; + updated++; + i++; + } + + /* Ordered queue book keeping inside the lock */ + if (queue_is_ordered(queue)) { + for (j = 0; j < i; j++) { + uint32_t k; + + buf_hdr[j]->origin_qe = queue; + buf_hdr[j]->order = queue->s.order_in++; + for (k = 0; k < queue->s.param.sched.lock_count; k++) { + buf_hdr[j]->sync[k] = odp_atomic_fetch_inc_u64 - (&queue->s.sync_in[j]); + (&queue->s.sync_in[k]); } - buf_hdr[i]->flags.sustain = SUSTAIN_ORDER; - } else { - buf_hdr[i]->origin_qe = NULL; + buf_hdr[j]->flags.sustain = SUSTAIN_ORDER; } }
- queue->s.head = hdr; + /* Write head only if updated */ + if (updated) + queue->s.head = hdr;
- if (hdr == NULL) { - /* Queue is now empty */ + /* Queue is empty */ + if (hdr == NULL) queue->s.tail = NULL; - }
UNLOCK(&queue->s.lock);
+ /* Init origin_qe for non-ordered queues */ + if (!queue_is_ordered(queue)) + for (j = 0; j < i; j++) + buf_hdr[j]->origin_qe = NULL; + return i; }
@@ -543,7 +603,7 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) { - odp_buffer_hdr_t *buf_hdr; + odp_buffer_hdr_t *buf_hdr = NULL; int ret;
ret = deq_multi(queue, &buf_hdr, 1); diff --git a/platform/linux-generic/odp_schedule_ordered.c b/platform/linux-generic/odp_schedule_ordered.c index 92a1cc8..8412183 100644 --- a/platform/linux-generic/odp_schedule_ordered.c +++ b/platform/linux-generic/odp_schedule_ordered.c @@ -457,15 +457,24 @@ int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[], { queue_entry_t *origin_qe; uint64_t order; - int rc; + int i, rc; queue_entry_t *qe = get_qentry(queue_index); - odp_buffer_hdr_t *buf_hdr = p_buf_hdr[0]; + odp_buffer_hdr_t *first_hdr = p_buf_hdr[0]; + odp_buffer_hdr_t **buf_hdr = (odp_buffer_hdr_t **)p_buf_hdr; + + /* Chain input buffers together */ + for (i = 0; i < num - 1; i++) { + buf_hdr[i]->next = buf_hdr[i + 1]; + buf_hdr[i]->burst_num = 0; + } + + buf_hdr[num - 1]->next = NULL;
/* Handle ordered enqueues commonly via links */ - get_queue_order(&origin_qe, &order, buf_hdr); + get_queue_order(&origin_qe, &order, first_hdr); if (origin_qe) { - buf_hdr->link = buf_hdr->next; - rc = ordered_queue_enq(qe, buf_hdr, sustain, + first_hdr->link = first_hdr->next; + rc = ordered_queue_enq(qe, first_hdr, sustain, origin_qe, order); *ret = rc == 0 ? num : rc; return 1;
commit 7bb62b522a5f89d6d19a4c77254222b1c07ab44b Author: Petri Savolainen petri.savolainen@nokia.com Date: Thu Sep 15 16:39:19 2016 +0300
linux-gen: queue: reuse enq_ and deq_multi
Reuse multi enqueue and dequeue implementations for single enq/deq operations. This enables implementation to concentrate on optimizing the multi operations. Single operations do not suffer a major performance decrease since compiler likely optimizes the inlined code for single operations (num is fixed to 1).
Signed-off-by: Petri Savolainen petri.savolainen@nokia.com Reviewed-and-tested-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index 13cdfb3..df73e70 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -30,8 +30,6 @@ typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index, ); typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index); typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index); -typedef int (*schedule_ord_enq_fn_t)(uint32_t queue_index, void *buf_hdr, - int sustain, int *ret); typedef int (*schedule_ord_enq_multi_fn_t)(uint32_t queue_index, void *buf_hdr[], int num, int sustain, int *ret); @@ -48,7 +46,6 @@ typedef struct schedule_fn_t { schedule_init_queue_fn_t init_queue; schedule_destroy_queue_fn_t destroy_queue; schedule_sched_queue_fn_t sched_queue; - schedule_ord_enq_fn_t ord_enq; schedule_ord_enq_multi_fn_t ord_enq_multi; schedule_init_global_fn_t init_global; schedule_term_global_fn_t term_global; diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index bec1e51..80d99e8 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -65,19 +65,6 @@ static inline int queue_is_ordered(queue_entry_t *qe) return qe->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED; }
-static inline void queue_add(queue_entry_t *queue, - odp_buffer_hdr_t *buf_hdr) -{ - buf_hdr->next = NULL; - - if (queue->s.head) - queue->s.tail->next = buf_hdr; - else - queue->s.head = buf_hdr; - - queue->s.tail = buf_hdr; -} - queue_entry_t *get_qentry(uint32_t queue_id) { return &queue_tbl->queue[queue_id]; @@ -396,37 +383,8 @@ odp_queue_t odp_queue_lookup(const char *name) return ODP_QUEUE_INVALID; }
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain) -{ - int ret; - - if (sched_fn->ord_enq(queue->s.index, buf_hdr, sustain, &ret)) - return ret; - - LOCK(&queue->s.lock); - - if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { - UNLOCK(&queue->s.lock); - ODP_ERR("Bad queue status\n"); - return -1; - } - - queue_add(queue, buf_hdr); - - if (queue->s.status == QUEUE_STATUS_NOTSCHED) { - queue->s.status = QUEUE_STATUS_SCHED; - UNLOCK(&queue->s.lock); - if (sched_fn->sched_queue(queue->s.index)) - ODP_ABORT("schedule_queue failed\n"); - return 0; - } - - UNLOCK(&queue->s.lock); - return 0; -} - -int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], - int num, int sustain) +static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], + int num, int sustain) { int sched = 0; int i, ret; @@ -472,6 +430,24 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], return num; /* All events enqueued */ }
+int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num, + int sustain) +{ + return enq_multi(queue, buf_hdr, num, sustain); +} + +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain) +{ + int ret; + + ret = enq_multi(queue, &buf_hdr, 1, sustain); + + if (ret == 1) + return 0; + else + return -1; +} + int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num) { odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; @@ -504,54 +480,8 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev) return queue->s.enqueue(queue, buf_hdr, SUSTAIN_ORDER); }
-odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) -{ - odp_buffer_hdr_t *buf_hdr; - uint32_t i; - - LOCK(&queue->s.lock); - - if (queue->s.head == NULL) { - /* Already empty queue */ - if (queue->s.status == QUEUE_STATUS_SCHED) - queue->s.status = QUEUE_STATUS_NOTSCHED; - - UNLOCK(&queue->s.lock); - return NULL; - } - - buf_hdr = queue->s.head; - queue->s.head = buf_hdr->next; - buf_hdr->next = NULL; - - /* Note that order should really be assigned on enq to an - * ordered queue rather than deq, however the logic is simpler - * to do it here and has the same effect. - */ - if (queue_is_ordered(queue)) { - buf_hdr->origin_qe = queue; - buf_hdr->order = queue->s.order_in++; - for (i = 0; i < queue->s.param.sched.lock_count; i++) { - buf_hdr->sync[i] = - odp_atomic_fetch_inc_u64(&queue->s.sync_in[i]); - } - buf_hdr->flags.sustain = SUSTAIN_ORDER; - } else { - buf_hdr->origin_qe = NULL; - } - - if (queue->s.head == NULL) { - /* Queue is now empty */ - queue->s.tail = NULL; - } - - UNLOCK(&queue->s.lock); - - return buf_hdr; -} - - -int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) +static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], + int num) { odp_buffer_hdr_t *hdr; int i; @@ -606,6 +536,24 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) return i; }
+int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) +{ + return deq_multi(queue, buf_hdr, num); +} + +odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) +{ + odp_buffer_hdr_t *buf_hdr; + int ret; + + ret = deq_multi(queue, &buf_hdr, 1); + + if (ret == 1) + return buf_hdr; + else + return NULL; +} + int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num) { queue_entry_t *queue; @@ -740,7 +688,7 @@ int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num) queue_entry_t *qe = get_qentry(queue_index); odp_buffer_hdr_t *buf_hdr[num];
- ret = queue_deq_multi(qe, buf_hdr, num); + ret = deq_multi(qe, buf_hdr, num);
if (ret > 0) for (i = 0; i < ret; i++) diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index 78982d9..81e79c9 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -1063,7 +1063,6 @@ const schedule_fn_t schedule_default_fn = { .init_queue = schedule_init_queue, .destroy_queue = schedule_destroy_queue, .sched_queue = schedule_sched_queue, - .ord_enq = schedule_ordered_queue_enq, .ord_enq_multi = schedule_ordered_queue_enq_multi, .init_global = schedule_init_global, .term_global = schedule_term_global, diff --git a/platform/linux-generic/odp_schedule_ordered.c b/platform/linux-generic/odp_schedule_ordered.c index 8c1dd7e..92a1cc8 100644 --- a/platform/linux-generic/odp_schedule_ordered.c +++ b/platform/linux-generic/odp_schedule_ordered.c @@ -452,26 +452,6 @@ static int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, return 0; }
-int schedule_ordered_queue_enq(uint32_t queue_index, void *p_buf_hdr, - int sustain, int *ret) -{ - queue_entry_t *origin_qe; - uint64_t order; - queue_entry_t *qe = get_qentry(queue_index); - odp_buffer_hdr_t *buf_hdr = p_buf_hdr; - - get_queue_order(&origin_qe, &order, buf_hdr); - - /* Handle enqueues from ordered queues separately */ - if (origin_qe) { - *ret = ordered_queue_enq(qe, buf_hdr, sustain, - origin_qe, order); - return 1; - } - - return 0; -} - int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[], int num, int sustain, int *ret) { diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index 2e28aa4..879eb5c 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -298,17 +298,6 @@ static int sched_queue(uint32_t qi) return 0; }
-static int ord_enq(uint32_t queue_index, void *buf_hdr, int sustain, int *ret) -{ - (void)queue_index; - (void)buf_hdr; - (void)sustain; - (void)ret; - - /* didn't consume the events */ - return 0; -} - static int ord_enq_multi(uint32_t queue_index, void *buf_hdr[], int num, int sustain, int *ret) { @@ -673,7 +662,6 @@ const schedule_fn_t schedule_sp_fn = { .init_queue = init_queue, .destroy_queue = destroy_queue, .sched_queue = sched_queue, - .ord_enq = ord_enq, .ord_enq_multi = ord_enq_multi, .init_global = init_global, .term_global = term_global,
-----------------------------------------------------------------------
Summary of changes: platform/linux-generic/Makefile.am | 1 + .../odp/api/plat/ticketlock_inlines.h} | 30 ++- .../linux-generic/include/odp_buffer_internal.h | 7 + .../linux-generic/include/odp_config_internal.h | 8 + .../linux-generic/include/odp_packet_io_queue.h | 3 +- platform/linux-generic/include/odp_pool_internal.h | 24 +- .../linux-generic/include/odp_queue_internal.h | 15 +- platform/linux-generic/include/odp_schedule_if.h | 3 - .../linux-generic/include/odp_schedule_internal.h | 2 +- platform/linux-generic/odp_queue.c | 271 ++++++++++----------- platform/linux-generic/odp_schedule.c | 1 - platform/linux-generic/odp_schedule_ordered.c | 37 +-- platform/linux-generic/odp_schedule_sp.c | 12 - platform/linux-generic/odp_ticketlock.c | 68 +----- 14 files changed, 199 insertions(+), 283 deletions(-) copy platform/linux-generic/{odp_ticketlock.c => include/odp/api/plat/ticketlock_inlines.h} (83%)
hooks/post-receive