This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, master has been updated via 492390a79f20c7aaaf16f232fbd5ecf0d9b700e7 (commit) via 53f3baf58256c085fa230992eb03c896276fc874 (commit) via 7266ca2e44705e550ba7c8c1a71fa373eabd7b99 (commit) via eb021ca3cba9635b861205b2fc94da2a3cdf37bc (commit) via f8136babc5601068ac0f3ab30414a5cbd99388c3 (commit) from a01d17348d6c34457a415935e702a24257adcf8a (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log ----------------------------------------------------------------- commit 492390a79f20c7aaaf16f232fbd5ecf0d9b700e7 Author: Petri Savolainen petri.savolainen@linaro.org Date: Fri Jun 15 16:48:26 2018 +0300
validation: queue: pair test
Test a pair of queues with two threads. This simple multi-thread test can be executed in all enqueue/dequeue modes, including single-producer / single-consumer mode.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/test/validation/api/queue/queue.c b/test/validation/api/queue/queue.c index 20408da5..f009a24b 100644 --- a/test/validation/api/queue/queue.c +++ b/test/validation/api/queue/queue.c @@ -24,6 +24,17 @@ typedef struct { odp_queue_t queue; odp_atomic_u32_t num_event;
+ struct { + odp_queue_t queue_a; + odp_queue_t queue_b; + int passed_a; + int passed_b; + int burst; + odp_pool_t pool; + odp_barrier_t barrier; + odp_atomic_u32_t counter; + } pair; + struct { uint32_t num_event; } thread[ODP_THREAD_COUNT_MAX]; @@ -349,6 +360,208 @@ static void queue_test_burst_lf_spsc(void) ODP_QUEUE_OP_MT_UNSAFE); }
+static int queue_pair_work_loop(void *arg) +{ + uint32_t i, events, burst, retry, max_retry; + odp_buffer_t buf; + odp_event_t ev; + uint32_t *data; + odp_queue_t src_queue, dst_queue; + odp_pool_t pool; + int passed; + int thread_a; + test_globals_t *globals = arg; + + burst = globals->pair.burst; + pool = globals->pair.pool; + + /* Select which thread is A */ + thread_a = odp_atomic_fetch_inc_u32(&globals->pair.counter); + + if (thread_a) { + src_queue = globals->pair.queue_a; + dst_queue = globals->pair.queue_b; + } else { + src_queue = globals->pair.queue_b; + dst_queue = globals->pair.queue_a; + } + + for (i = 0; i < burst; i++) { + buf = odp_buffer_alloc(pool); + CU_ASSERT(buf != ODP_BUFFER_INVALID); + + if (buf == ODP_BUFFER_INVALID) + return -1; + + data = odp_buffer_addr(buf); + *data = i; + ev = odp_buffer_to_event(buf); + CU_ASSERT(odp_queue_enq(dst_queue, ev) == 0); + } + + /* Wait until both threads are ready */ + odp_barrier_wait(&globals->pair.barrier); + events = 0; + retry = 0; + max_retry = 0; + i = 0; + while (events < 10000 && retry < 300) { + ev = odp_queue_deq(src_queue); + if (ev == ODP_EVENT_INVALID) { + retry++; + /* Slow down polling period after 100 retries. This + * gives time for the other thread to answer, if it + * was e.g. interrupted by the OS. We give up if + * the source queue stays empty for about 100ms. 
*/ + if (retry > 200) + odp_time_wait_ns(ODP_TIME_MSEC_IN_NS); + else if (retry > 100) + odp_time_wait_ns(ODP_TIME_USEC_IN_NS); + + if (retry > max_retry) + max_retry = retry; + + continue; + } + + events++; + retry = 0; + buf = odp_buffer_from_event(ev); + data = odp_buffer_addr(buf); + CU_ASSERT(*data == i); + i++; + if (i == burst) + i = 0; + + CU_ASSERT(odp_queue_enq(dst_queue, ev) == 0); + } + + passed = (events == 10000); + + if (thread_a) { + globals->pair.passed_a = passed; + if (max_retry > 100) + printf("\n thread_a max_retry %u\n", max_retry); + } else { + globals->pair.passed_b = passed; + if (max_retry > 100) + printf("\n thread_b max_retry %u\n", max_retry); + } + + return 0; +} + +static void test_pair(odp_nonblocking_t nonblocking, + odp_queue_op_mode_t enq_mode, + odp_queue_op_mode_t deq_mode) +{ + odp_queue_param_t param; + odp_queue_t queue; + odp_queue_capability_t capa; + uint32_t max_burst; + odp_pool_t pool; + odp_event_t ev; + odp_shm_t shm; + test_globals_t *globals; + + shm = odp_shm_lookup(GLOBALS_NAME); + CU_ASSERT_FATAL(shm != ODP_SHM_INVALID); + globals = odp_shm_addr(shm); + + CU_ASSERT_FATAL(odp_queue_capability(&capa) == 0); + + max_burst = 2 * BURST_SIZE; + + if (nonblocking == ODP_NONBLOCKING_LF) { + if (capa.plain.lockfree.max_num == 0) { + printf(" NO LOCKFREE QUEUES. 
Test skipped.\n"); + return; + } + + if (capa.plain.lockfree.max_size < max_burst) + max_burst = capa.plain.lockfree.max_size; + } else { + if (capa.plain.max_size && capa.plain.max_size < max_burst) + max_burst = capa.plain.max_size; + } + + globals->pair.burst = max_burst / 2; + + pool = odp_pool_lookup("msg_pool"); + CU_ASSERT_FATAL(pool != ODP_POOL_INVALID); + globals->pair.pool = pool; + + odp_queue_param_init(¶m); + param.type = ODP_QUEUE_TYPE_PLAIN; + param.nonblocking = nonblocking; + param.size = max_burst; + param.enq_mode = enq_mode; + param.deq_mode = deq_mode; + + queue = odp_queue_create("queue_a", ¶m); + CU_ASSERT_FATAL(queue != ODP_QUEUE_INVALID); + globals->pair.queue_a = queue; + CU_ASSERT(odp_queue_deq(queue) == ODP_EVENT_INVALID); + + queue = odp_queue_create("queue_b", ¶m); + CU_ASSERT_FATAL(queue != ODP_QUEUE_INVALID); + globals->pair.queue_b = queue; + CU_ASSERT(odp_queue_deq(queue) == ODP_EVENT_INVALID); + + odp_barrier_init(&globals->pair.barrier, 2); + globals->pair.passed_a = 0; + globals->pair.passed_b = 0; + odp_atomic_init_u32(&globals->pair.counter, 0); + + /* Create one worker thread */ + globals->cu_thr.numthrds = 1; + odp_cunit_thread_create(queue_pair_work_loop, (pthrd_arg *)globals); + + /* Run this thread as the second thread */ + CU_ASSERT(queue_pair_work_loop(globals) == 0); + + /* Wait worker to terminate */ + odp_cunit_thread_exit((pthrd_arg *)globals); + + CU_ASSERT(globals->pair.passed_a); + CU_ASSERT(globals->pair.passed_b); + + while ((ev = dequeue_event(globals->pair.queue_a)) != ODP_EVENT_INVALID) + odp_event_free(ev); + + while ((ev = dequeue_event(globals->pair.queue_b)) != ODP_EVENT_INVALID) + odp_event_free(ev); + + CU_ASSERT(odp_queue_destroy(globals->pair.queue_a) == 0); + CU_ASSERT(odp_queue_destroy(globals->pair.queue_b) == 0); +} + +static void queue_test_pair(void) +{ + test_pair(ODP_BLOCKING, ODP_QUEUE_OP_MT, ODP_QUEUE_OP_MT); +} + +static void queue_test_pair_spmc(void) +{ + test_pair(ODP_BLOCKING, 
ODP_QUEUE_OP_MT_UNSAFE, ODP_QUEUE_OP_MT); +} + +static void queue_test_pair_mpsc(void) +{ + test_pair(ODP_BLOCKING, ODP_QUEUE_OP_MT, ODP_QUEUE_OP_MT_UNSAFE); +} + +static void queue_test_pair_spsc(void) +{ + test_pair(ODP_BLOCKING, ODP_QUEUE_OP_MT_UNSAFE, ODP_QUEUE_OP_MT_UNSAFE); +} + +static void queue_test_pair_lf_spsc(void) +{ + test_pair(ODP_NONBLOCKING_LF, ODP_QUEUE_OP_MT_UNSAFE, + ODP_QUEUE_OP_MT_UNSAFE); +} + static void queue_test_param(void) { odp_queue_t queue, null_queue; @@ -727,6 +940,11 @@ odp_testinfo_t queue_suite[] = { ODP_TEST_INFO(queue_test_burst_lf_spmc), ODP_TEST_INFO(queue_test_burst_lf_mpsc), ODP_TEST_INFO(queue_test_burst_lf_spsc), + ODP_TEST_INFO(queue_test_pair), + ODP_TEST_INFO(queue_test_pair_spmc), + ODP_TEST_INFO(queue_test_pair_mpsc), + ODP_TEST_INFO(queue_test_pair_spsc), + ODP_TEST_INFO(queue_test_pair_lf_spsc), ODP_TEST_INFO(queue_test_param), ODP_TEST_INFO(queue_test_info), ODP_TEST_INFO(queue_test_mt_plain_block),
commit 53f3baf58256c085fa230992eb03c896276fc874 Author: Petri Savolainen petri.savolainen@linaro.org Date: Fri Jun 15 13:22:01 2018 +0300
validation: queue: test enq/deq mode combinations
Changed the single thread, lock-free queue test to a generic single thread burst enq/deq test. Test all enq/deq mode combinations with blocking and lock-free queues.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/test/validation/api/queue/queue.c b/test/validation/api/queue/queue.c index 304de021..20408da5 100644 --- a/test/validation/api/queue/queue.c +++ b/test/validation/api/queue/queue.c @@ -224,7 +224,9 @@ static odp_event_t dequeue_event(odp_queue_t queue) return ev; }
-static void queue_test_lockfree(void) +static void test_burst(odp_nonblocking_t nonblocking, + odp_queue_op_mode_t enq_mode, + odp_queue_op_mode_t deq_mode) { odp_queue_param_t param; odp_queue_t queue; @@ -237,12 +239,16 @@ static void queue_test_lockfree(void)
CU_ASSERT_FATAL(odp_queue_capability(&capa) == 0);
- if (capa.plain.lockfree.max_num == 0) { - printf(" NO LOCKFREE QUEUES. Test skipped.\n"); - return; - } + max_burst = capa.plain.max_size; + + if (nonblocking == ODP_NONBLOCKING_LF) { + if (capa.plain.lockfree.max_num == 0) { + printf(" NO LOCKFREE QUEUES. Test skipped.\n"); + return; + }
- max_burst = capa.plain.lockfree.max_size; + max_burst = capa.plain.lockfree.max_size; + }
if (max_burst == 0 || max_burst > MAX_NUM_EVENT) max_burst = MAX_NUM_EVENT; @@ -252,10 +258,12 @@ static void queue_test_lockfree(void)
odp_queue_param_init(¶m); param.type = ODP_QUEUE_TYPE_PLAIN; - param.nonblocking = ODP_NONBLOCKING_LF; + param.nonblocking = nonblocking; param.size = max_burst; + param.enq_mode = enq_mode; + param.deq_mode = deq_mode;
- queue = odp_queue_create("lockfree_queue", ¶m); + queue = odp_queue_create("burst test", ¶m); CU_ASSERT_FATAL(queue != ODP_QUEUE_INVALID);
CU_ASSERT(odp_queue_deq(queue) == ODP_EVENT_INVALID); @@ -299,6 +307,48 @@ static void queue_test_lockfree(void) CU_ASSERT(odp_queue_destroy(queue) == 0); }
+static void queue_test_burst(void) +{ + test_burst(ODP_BLOCKING, ODP_QUEUE_OP_MT, ODP_QUEUE_OP_MT); +} + +static void queue_test_burst_spmc(void) +{ + test_burst(ODP_BLOCKING, ODP_QUEUE_OP_MT_UNSAFE, ODP_QUEUE_OP_MT); +} + +static void queue_test_burst_mpsc(void) +{ + test_burst(ODP_BLOCKING, ODP_QUEUE_OP_MT, ODP_QUEUE_OP_MT_UNSAFE); +} + +static void queue_test_burst_spsc(void) +{ + test_burst(ODP_BLOCKING, ODP_QUEUE_OP_MT_UNSAFE, + ODP_QUEUE_OP_MT_UNSAFE); +} + +static void queue_test_burst_lf(void) +{ + test_burst(ODP_NONBLOCKING_LF, ODP_QUEUE_OP_MT, ODP_QUEUE_OP_MT); +} + +static void queue_test_burst_lf_spmc(void) +{ + test_burst(ODP_NONBLOCKING_LF, ODP_QUEUE_OP_MT_UNSAFE, ODP_QUEUE_OP_MT); +} + +static void queue_test_burst_lf_mpsc(void) +{ + test_burst(ODP_NONBLOCKING_LF, ODP_QUEUE_OP_MT, ODP_QUEUE_OP_MT_UNSAFE); +} + +static void queue_test_burst_lf_spsc(void) +{ + test_burst(ODP_NONBLOCKING_LF, ODP_QUEUE_OP_MT_UNSAFE, + ODP_QUEUE_OP_MT_UNSAFE); +} + static void queue_test_param(void) { odp_queue_t queue, null_queue; @@ -669,7 +719,14 @@ static void queue_test_mt_plain_nonblock_lf(void) odp_testinfo_t queue_suite[] = { ODP_TEST_INFO(queue_test_capa), ODP_TEST_INFO(queue_test_mode), - ODP_TEST_INFO(queue_test_lockfree), + ODP_TEST_INFO(queue_test_burst), + ODP_TEST_INFO(queue_test_burst_spmc), + ODP_TEST_INFO(queue_test_burst_mpsc), + ODP_TEST_INFO(queue_test_burst_spsc), + ODP_TEST_INFO(queue_test_burst_lf), + ODP_TEST_INFO(queue_test_burst_lf_spmc), + ODP_TEST_INFO(queue_test_burst_lf_mpsc), + ODP_TEST_INFO(queue_test_burst_lf_spsc), ODP_TEST_INFO(queue_test_param), ODP_TEST_INFO(queue_test_info), ODP_TEST_INFO(queue_test_mt_plain_block),
commit 7266ca2e44705e550ba7c8c1a71fa373eabd7b99 Author: Petri Savolainen petri.savolainen@linaro.org Date: Fri Jun 15 13:19:49 2018 +0300
test: queue_perf: single producer/consumer option
Added option to test queues with single producer/consumer mode set.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/test/performance/odp_queue_perf.c b/test/performance/odp_queue_perf.c index 521b7271..d5ff254d 100644 --- a/test/performance/odp_queue_perf.c +++ b/test/performance/odp_queue_perf.c @@ -18,6 +18,7 @@ typedef struct test_options_t { uint32_t num_event; uint32_t num_round; odp_nonblocking_t nonblock; + int single;
} test_options_t;
@@ -33,6 +34,7 @@ static void print_usage(void) " -r, --num_round Number of rounds\n" " -l, --lockfree Lockfree queues\n" " -w, --waitfree Waitfree queues\n" + " -s, --single Single producer, single consumer\n" " -h, --help This help\n" "\n"); } @@ -49,16 +51,18 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options) {"num_round", required_argument, NULL, 'r'}, {"lockfree", no_argument, NULL, 'l'}, {"waitfree", no_argument, NULL, 'w'}, + {"single", no_argument, NULL, 's'}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0} };
- static const char *shortopts = "+q:e:r:lwh"; + static const char *shortopts = "+q:e:r:lwsh";
test_options->num_queue = 1; test_options->num_event = 1; test_options->num_round = 1000; test_options->nonblock = ODP_BLOCKING; + test_options->single = 0;
while (1) { opt = getopt_long(argc, argv, shortopts, longopts, &long_index); @@ -82,6 +86,9 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options) case 'w': test_options->nonblock = ODP_NONBLOCKING_WF; break; + case 's': + test_options->single = 1; + break; case 'h': /* fall through */ default: @@ -210,6 +217,11 @@ static int test_queue(test_options_t *test_options) queue_param.nonblocking = nonblock; queue_param.size = num_event;
+ if (test_options->single) { + queue_param.enq_mode = ODP_QUEUE_OP_MT_UNSAFE; + queue_param.deq_mode = ODP_QUEUE_OP_MT_UNSAFE; + } + for (i = 0; i < num_queue; i++) { queue[i] = odp_queue_create(NULL, &queue_param);
commit eb021ca3cba9635b861205b2fc94da2a3cdf37bc Author: Petri Savolainen petri.savolainen@linaro.org Date: Fri Jun 15 10:06:38 2018 +0300
linux-gen: queue_spsc: single-producer, single-consumer queue
Simple and lock-free implementation of plain queues when there is only a single producer and a single consumer.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index eca2ad17..1e617cc3 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -182,6 +182,7 @@ __LIB__libodp_linux_la_SOURCES = \ odp_queue_if.c \ odp_queue_lf.c \ odp_queue_scalable.c \ + odp_queue_spsc.c \ odp_random.c \ odp_rwlock.c \ odp_rwlock_recursive.c \ diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 6cc6ed28..bb74f729 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -23,6 +23,7 @@ extern "C" { #include <odp/api/ticketlock.h> #include <odp_config_internal.h> #include <odp_ring_st_internal.h> +#include <odp_ring_spsc_internal.h> #include <odp_queue_lf.h>
#define QUEUE_STATUS_FREE 0 @@ -33,7 +34,10 @@ extern "C" {
struct queue_entry_s { odp_ticketlock_t ODP_ALIGNED_CACHE lock; - ring_st_t ring_st; + union { + ring_st_t ring_st; + ring_spsc_t ring_spsc; + }; int status;
queue_enq_fn_t ODP_ALIGNED_CACHE enqueue; @@ -48,6 +52,7 @@ struct queue_entry_s { odp_pktin_queue_t pktin; odp_pktout_queue_t pktout; void *queue_lf; + int spsc; char name[ODP_QUEUE_NAME_LEN]; };
@@ -91,6 +96,8 @@ static inline odp_queue_t queue_from_index(uint32_t queue_id) return (odp_queue_t)qentry_from_index(queue_id); }
+void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size); + #ifdef __cplusplus } #endif diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c index f7715bc5..f592ef6c 100644 --- a/platform/linux-generic/odp_queue_basic.c +++ b/platform/linux-generic/odp_queue_basic.c @@ -308,7 +308,8 @@ static odp_queue_t queue_create(const char *name, return ODP_QUEUE_INVALID; }
- if (param->nonblocking == ODP_NONBLOCKING_LF) { + if (!queue->s.spsc && + param->nonblocking == ODP_NONBLOCKING_LF) { queue_lf_func_t *lf_func;
lf_func = &queue_glb->queue_lf_func; @@ -421,7 +422,7 @@ static int queue_destroy(odp_queue_t handle) ODP_ABORT("Unexpected queue status\n"); }
- if (queue->s.param.nonblocking == ODP_NONBLOCKING_LF) + if (queue->s.queue_lf) queue_lf_destroy(queue->s.queue_lf);
UNLOCK(queue); @@ -658,6 +659,7 @@ static int queue_init(queue_entry_t *queue, const char *name, { uint64_t offset; uint32_t queue_size; + int spsc;
if (name == NULL) { queue->s.name[0] = 0; @@ -674,11 +676,6 @@ static int queue_init(queue_entry_t *queue, const char *name,
queue->s.type = queue->s.param.type;
- queue->s.enqueue = queue_int_enq; - queue->s.dequeue = queue_int_deq; - queue->s.enqueue_multi = queue_int_enq_multi; - queue->s.dequeue_multi = queue_int_deq_multi; - queue->s.pktin = PKTIN_INVALID; queue->s.pktout = PKTOUT_INVALID;
@@ -698,8 +695,26 @@ static int queue_init(queue_entry_t *queue, const char *name,
offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size;
- ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset], - queue_size); + /* Single-producer / single-consumer plain queue has simple and + * lock-free implementation */ + spsc = (param->type == ODP_QUEUE_TYPE_PLAIN) && + (param->enq_mode == ODP_QUEUE_OP_MT_UNSAFE) && + (param->deq_mode == ODP_QUEUE_OP_MT_UNSAFE); + + queue->s.spsc = spsc; + queue->s.queue_lf = NULL; + + if (spsc) { + queue_spsc_init(queue, queue_size); + } else { + queue->s.enqueue = queue_int_enq; + queue->s.dequeue = queue_int_deq; + queue->s.enqueue_multi = queue_int_enq_multi; + queue->s.dequeue_multi = queue_int_deq_multi; + + ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset], + queue_size); + }
return 0; } diff --git a/platform/linux-generic/odp_queue_spsc.c b/platform/linux-generic/odp_queue_spsc.c new file mode 100644 index 00000000..f8b04ca2 --- /dev/null +++ b/platform/linux-generic/odp_queue_spsc.c @@ -0,0 +1,131 @@ +/* Copyright (c) 2018, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ +#include <string.h> +#include <stdio.h> + +#include <odp_queue_internal.h> +#include <odp_pool_internal.h> + +#include "config.h" +#include <odp_debug_internal.h> + +static inline void buffer_index_from_buf(uint32_t buffer_index[], + odp_buffer_hdr_t *buf_hdr[], int num) +{ + int i; + + for (i = 0; i < num; i++) + buffer_index[i] = buf_hdr[i]->index.u32; +} + +static inline void buffer_index_to_buf(odp_buffer_hdr_t *buf_hdr[], + uint32_t buffer_index[], int num) +{ + int i; + + for (i = 0; i < num; i++) { + buf_hdr[i] = buf_hdr_from_index_u32(buffer_index[i]); + odp_prefetch(buf_hdr[i]); + } +} + +static inline int spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + queue_entry_t *queue; + ring_spsc_t *ring_spsc; + uint32_t buf_idx[num]; + + queue = q_int; + ring_spsc = &queue->s.ring_spsc; + + buffer_index_from_buf(buf_idx, buf_hdr, num); + + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + ODP_ERR("Bad queue status\n"); + return -1; + } + + return ring_spsc_enq_multi(ring_spsc, buf_idx, num); +} + +static inline int spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + queue_entry_t *queue; + int num_deq; + ring_spsc_t *ring_spsc; + uint32_t buf_idx[num]; + + queue = q_int; + ring_spsc = &queue->s.ring_spsc; + + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + /* Bad queue, or queue has been destroyed. 
*/ + return -1; + } + + num_deq = ring_spsc_deq_multi(ring_spsc, buf_idx, num); + + if (num_deq == 0) + return 0; + + buffer_index_to_buf(buf_hdr, buf_idx, num_deq); + + return num_deq; +} + +static int queue_spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + return spsc_enq_multi(q_int, buf_hdr, num); +} + +static int queue_spsc_enq(void *q_int, odp_buffer_hdr_t *buf_hdr) +{ + int ret; + + ret = spsc_enq_multi(q_int, &buf_hdr, 1); + + if (ret == 1) + return 0; + else + return -1; +} + +static int queue_spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + return spsc_deq_multi(q_int, buf_hdr, num); +} + +static odp_buffer_hdr_t *queue_spsc_deq(void *q_int) +{ + odp_buffer_hdr_t *buf_hdr = NULL; + int ret; + + ret = spsc_deq_multi(q_int, &buf_hdr, 1); + + if (ret == 1) + return buf_hdr; + else + return NULL; +} + +void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size) +{ + uint64_t offset; + + queue->s.enqueue = queue_spsc_enq; + queue->s.dequeue = queue_spsc_deq; + queue->s.enqueue_multi = queue_spsc_enq_multi; + queue->s.dequeue_multi = queue_spsc_deq_multi; + + offset = queue->s.index * (uint64_t)queue_glb->config.max_queue_size; + + ring_spsc_init(&queue->s.ring_spsc, &queue_glb->ring_data[offset], + queue_size); +}
commit f8136babc5601068ac0f3ab30414a5cbd99388c3 Author: Petri Savolainen petri.savolainen@linaro.org Date: Thu Jun 14 13:51:39 2018 +0300
linux-gen: ring_spsc: single-producer, single-consumer ring
This ring can be used to implement (lock-free) queues when there is only a single (concurrent) producer and a single consumer. The SP/SC limitation enables very simple synchronization and thus good performance.
Signed-off-by: Petri Savolainen petri.savolainen@linaro.org Reviewed-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 6ccc284f..eca2ad17 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -127,6 +127,7 @@ noinst_HEADERS = \ include/odp_queue_lf.h \ include/odp_queue_scalable_internal.h \ include/odp_ring_internal.h \ + include/odp_ring_spsc_internal.h \ include/odp_ring_st_internal.h \ include/odp_schedule_if.h \ include/odp_schedule_scalable_config.h \ diff --git a/platform/linux-generic/include/odp_ring_spsc_internal.h b/platform/linux-generic/include/odp_ring_spsc_internal.h new file mode 100644 index 00000000..e38bda1d --- /dev/null +++ b/platform/linux-generic/include/odp_ring_spsc_internal.h @@ -0,0 +1,124 @@ +/* Copyright (c) 2018, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_SPSC_INTERNAL_H_ +#define ODP_RING_SPSC_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <stdint.h> + +#include <odp/api/atomic.h> +#include <odp/api/plat/atomic_inlines.h> + +/* Lock-free ring for single-producer / single-consumer usage. + * + * Thread doing an operation may be different each time, but the same operation + * (enq- or dequeue) must not be called concurrently. The next thread may call + * the same operation only when it's sure that the previous thread have returned + * from the call, or will never return back to finish the call when interrupted + * during the call. + * + * Enqueue and dequeue operations can be done concurrently. + */ +typedef struct { + odp_atomic_u32_t head; + odp_atomic_u32_t tail; + uint32_t mask; + uint32_t *data; + +} ring_spsc_t; + +/* Initialize ring. Ring size must be a power of two. */ +static inline void ring_spsc_init(ring_spsc_t *ring, uint32_t *data, + uint32_t size) +{ + odp_atomic_init_u32(&ring->head, 0); + odp_atomic_init_u32(&ring->tail, 0); + ring->mask = size - 1; + ring->data = data; +} + +/* Dequeue data from the ring head. 
Max_num is smaller than ring size.*/ +static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[], + uint32_t max_num) +{ + uint32_t head, tail, mask, idx; + uint32_t num, i; + + tail = odp_atomic_load_acq_u32(&ring->tail); + head = odp_atomic_load_u32(&ring->head); + mask = ring->mask; + num = tail - head; + + /* Empty */ + if (num == 0) + return 0; + + if (num > max_num) + num = max_num; + + idx = head & mask; + + for (i = 0; i < num; i++) { + data[i] = ring->data[idx]; + idx = (idx + 1) & mask; + } + + odp_atomic_store_rel_u32(&ring->head, head + num); + + return num; +} + +/* Enqueue data into the ring tail. Num_data is smaller than ring size. */ +static inline uint32_t ring_spsc_enq_multi(ring_spsc_t *ring, + const uint32_t data[], + uint32_t num_data) +{ + uint32_t head, tail, mask, size, idx; + uint32_t num, i; + + head = odp_atomic_load_acq_u32(&ring->head); + tail = odp_atomic_load_u32(&ring->tail); + mask = ring->mask; + size = mask + 1; + num = size - (tail - head); + + /* Full */ + if (num == 0) + return 0; + + if (num > num_data) + num = num_data; + + idx = tail & mask; + + for (i = 0; i < num; i++) { + ring->data[idx] = data[i]; + idx = (idx + 1) & mask; + } + + odp_atomic_store_rel_u32(&ring->tail, tail + num); + + return num; +} + +/* Check if ring is empty */ +static inline int ring_spsc_is_empty(ring_spsc_t *ring) +{ + uint32_t head = odp_atomic_load_u32(&ring->head); + uint32_t tail = odp_atomic_load_u32(&ring->tail); + + return head == tail; +} + +#ifdef __cplusplus +} +#endif + +#endif
-----------------------------------------------------------------------
Summary of changes: platform/linux-generic/Makefile.am | 2 + .../linux-generic/include/odp_queue_internal.h | 9 +- .../linux-generic/include/odp_ring_spsc_internal.h | 124 +++++++++ platform/linux-generic/odp_queue_basic.c | 33 ++- platform/linux-generic/odp_queue_spsc.c | 131 +++++++++ test/performance/odp_queue_perf.c | 14 +- test/validation/api/queue/queue.c | 293 ++++++++++++++++++++- 7 files changed, 586 insertions(+), 20 deletions(-) create mode 100644 platform/linux-generic/include/odp_ring_spsc_internal.h create mode 100644 platform/linux-generic/odp_queue_spsc.c
hooks/post-receive