This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, next has been updated
      via  d56c7755dce115f3c1692fa358e58b1b46cf2234 (commit)
      via  724db2bdabcafb5438fe02d01e8f4b60a1ab37e1 (commit)
      via  ca58e1a62911ed25abddf2a899a1bee678f6d305 (commit)
      via  a19875674436996ec38e83371fb0a91427dfe3e0 (commit)
      via  3fee187e801a1a6ac0d7699486451f2bfe30ecd1 (commit)
      via  6ec758bd738b86c48b5ae7ba01759aeb95c18e01 (commit)
      via  3e399ed202522790efa552666aa04dbfd9e06da4 (commit)
      via  31498a7a8bd19de593f6b862cb39d484b99c9bf8 (commit)
     from  53d71e1a9e543e8da738fbf4b9d028e750939147 (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log -----------------------------------------------------------------
commit d56c7755dce115f3c1692fa358e58b1b46cf2234
Author: Yi He <yi.he@linaro.org>
Date:   Tue Feb 7 02:35:17 2017 +0000
linux-gen: fix odp_schedule.m4
Fixed the configure script, which failed with "schedule-iquery=yes: command not found" or "schedule-sp=yes: command not found" when either scheduler option was enabled.
Signed-off-by: Yi He <yi.he@linaro.org>
Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
diff --git a/platform/linux-generic/m4/odp_schedule.m4 b/platform/linux-generic/m4/odp_schedule.m4 index 2dcc9a7..91c19f2 100644 --- a/platform/linux-generic/m4/odp_schedule.m4 +++ b/platform/linux-generic/m4/odp_schedule.m4 @@ -1,13 +1,13 @@ AC_ARG_ENABLE([schedule-sp], [ --enable-schedule-sp enable strict priority scheduler], [if test x$enableval = xyes; then - schedule-sp=yes + schedule_sp_enabled=yes ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP" fi])
AC_ARG_ENABLE([schedule-iquery], [ --enable-schedule-iquery enable interests query (sparse bitmap) scheduler], [if test x$enableval = xyes; then - schedule-iquery=yes + schedule_iquery_enabled=yes ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY" fi])
commit 724db2bdabcafb5438fe02d01e8f4b60a1ab37e1
Author: Christophe Milard <christophe.milard@linaro.org>
Date:   Thu Jul 21 14:06:11 2016 +0200
linux-generic: cosmetic changes on byteorder files
To please checkpatch before the files are copied to the drv interface.
Signed-off-by: Christophe Milard <christophe.milard@linaro.org>
Reviewed-and-tested-by: Mike Holmes <mike.holmes@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
diff --git a/include/odp/api/spec/byteorder.h b/include/odp/api/spec/byteorder.h index 2899adb..38c0bdb 100644 --- a/include/odp/api/spec/byteorder.h +++ b/include/odp/api/spec/byteorder.h @@ -4,7 +4,6 @@ * SPDX-License-Identifier: BSD-3-Clause */
- /** * @file * @@ -95,7 +94,6 @@ uint32_t odp_be_to_cpu_32(odp_u32be_t be32); */ uint64_t odp_be_to_cpu_64(odp_u64be_t be64);
- /* * CPU byte order -> Big Endian: */ @@ -121,7 +119,6 @@ odp_u32be_t odp_cpu_to_be_32(uint32_t cpu32); */ odp_u64be_t odp_cpu_to_be_64(uint64_t cpu64);
- /* * Little Endian -> CPU byte order: */ @@ -147,7 +144,6 @@ uint32_t odp_le_to_cpu_32(odp_u32le_t le32); */ uint64_t odp_le_to_cpu_64(odp_u64le_t le64);
- /* * CPU byte order -> Little Endian: */ diff --git a/platform/linux-generic/include/odp/api/plat/byteorder_types.h b/platform/linux-generic/include/odp/api/plat/byteorder_types.h index 09235b5..20d52bf 100644 --- a/platform/linux-generic/include/odp/api/plat/byteorder_types.h +++ b/platform/linux-generic/include/odp/api/plat/byteorder_types.h @@ -4,7 +4,6 @@ * SPDX-License-Identifier: BSD-3-Clause */
- /** * @file * @@ -30,7 +29,6 @@ extern "C" { #error __LITTLE_ENDIAN not defined! #endif
- /* for use with type checkers such as sparse */ #ifdef __CHECKER__ /** @internal bitwise attribute */ @@ -44,7 +42,6 @@ extern "C" { #define __odp_force #endif
- /** @addtogroup odp_compiler_optim * @{ */
commit ca58e1a62911ed25abddf2a899a1bee678f6d305
Author: Bill Fischofer <bill.fischofer@linaro.org>
Date:   Mon Jan 30 14:38:14 2017 -0600
doc: userguide: add section on application portability
Add a section to the ODP User Guide discussing application portability considerations and the use of source and binary portability options provided by ODP.
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
Reviewed-by: Mike Holmes <mike.holmes@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
diff --git a/doc/users-guide/users-guide.adoc b/doc/users-guide/users-guide.adoc index 41c57d1..ead8da5 100755 --- a/doc/users-guide/users-guide.adoc +++ b/doc/users-guide/users-guide.adoc @@ -589,6 +589,126 @@ lookup. The lookup function is particularly useful to allow an ODP application that is divided into multiple processes to obtain the handle for the common resource.
+== Application Portability Considerations +ODP is designed to support the creation of portable data plane applications +that can easily be run on multiple target platforms while at the same time +fully exploiting hardware acceleration capabilities native to whatever platform +they are running on. This section discusses tradeoffs that application writers +should consider when using ODP. + +First, it should be noted that portability is not an absolute good nor is it a +single-valued attribute (an application is either portable or it is not). While +any application can be ported from one platform to another, the real question +is: _at what cost?_ Costs can be measured in two dimensions: the level of +effort needed to port, and the resulting performance differences seen due to +the port. Ideally an application should be portable between platforms with +minimal effort and equally minimal performance impact. While ODP is designed +to support this ideal, each application must assess what its goals are in this +area and how best to use ODP to achieve these goals. + +=== Portability and Coexistence +Because ODP offers a programming _framework_ rather than a programming +_environment_, it is designed to be able to work alongside APIs offered by +other frameworks with minimal interference. Therefore, when we speak of +portability in an ODP context, we of necessity speak of portability of those +portions of the application that make use of ODP APIs. If an application uses +non-ODP APIs, then those must also be taken into consideration when +assessing the portability of the entire application. For many applications, it +suffices to isolate certain non-portable code to a few areas of the application, +with the result that the application is significantly more portable than it +would be without using ODP. Especially when dealing with existing applications +that run in production environments, ODP may well be introduced in an +incremental manner, with the result that the application becomes more +portable over time. + +=== Source vs. Binary Portability +ODP has been designed to support both source and binary portability. Source +portability is intrinsic to the ODP API specification itself. Any application +written to the ODP API specification will be source portable to any +conforming ODP implementation with at most a recompile. This is because ODP +APIs do not expose implementation details or internal structures that may vary +from platform to platform. + +For platforms that share a common Instruction Set Architecture (ISA), ODP can +also offer binary portability via the specification of an Application Binary +Interface (ABI). This is especially useful in a Network Function +Virtualization (NFV) environment where a data plane application may be +developed and compiled on one platform for distribution and then deployed on +many different platforms by an NFV Orchestrator function. + +=== ODP Application Profiles +To assist in meeting these needs, ODP offers two distinct _application +profiles_ that are designed to characterize the needs of different types of +data plane applications: the _Embedded Profile_ and the _Cloud Profile_. + +==== Embedded Profile +The ODP Embedded Profile is designed to support applications that wish to +target a specific platform and achieve optimal performance on that platform, +and for which source code portability is sufficient. If such applications need to +support more than one platform, then they simply need to be recompiled against +the ODP implementation for that platform. + +Embedded applications will typically work with a copy of ODP downloaded from +a git repository so that it can be configured for the application's precise +needs. To specify that the application wishes to use the embedded profile: + +`./configure --enable-abi-compat=no ...` + +should be used as part of the ODP configuration options. This allows +applications to use inline forms of ODP APIs to give optimal performance +on that platform, and may include additional optimizations that preclude +binary portability to other platforms. The result is a binary that will achieve +maximum performance on a given target platform and that can be ported to +other platforms with a recompile. + +==== Cloud Profile +By contrast, the ODP Cloud Profile is designed to support applications that +wish to be platform-agnostic and be binary compatible across all platforms +sharing this ABI. Any ODP implementation included in a Linux distribution will +be configured for the cloud profile, so no additional action is required on +the part of applications when compiling against a distributed copy of ODP (one +that is installed via `sudo apt-get install` or equivalent command). + +When using a copy of ODP downloaded from a repository, the cloud profile is +selected at configure time: + +`./configure --enable-abi-compat=yes ...` + +Note that `--enable-abi-compat=yes` is the default, so this need not be +specified. Unless `no` is specified for this option, the result will be +applications designed to run in the cloud profile. + +=== ABI Characteristics +An ABI consists of several conventions that ensure that a program compiled +against one ODP implementation can run unchanged on another platform that +has a possibly very different ODP implementation without requiring +recompilation. These include: + +* A set of function calling conventions that define how functions call other +functions, pass parameters, and receive returned results. These are typically +specified by the Operating System (_e.g.,_ Linux) and are independent of ODP. + +* Avoiding the use of inline expansions for any ODP API. This ensures that +differing ODP implementations can maintain their different internals without +these differences being visible to the application. + +* Agreement as to the size and alignment of ODP abstract datatypes used by all +ODP implementations sharing this ABI definition. This means that, for example, +the size of an `odp_packet_t` handle is the same across all members of the +ABI. Since these handles are opaque, it doesn't matter if their structure +differs between ODP implementations, because applications never reference these +possibly different internals. + +Note that an ABI definition exists within a specific Instruction Set +Architecture (ISA), such as x86-64 or AArch64. Binaries cannot be directly ported +between ISAs; that requires recompilation. + +Each ODP implementation will identify which ABI definition it supports, if any. +When compiling against an ODP implementation in ABI compatibility mode, the +resulting binary is automatically binary compatible with all other ODP +implementations that share this ABI. For example, for the x86-64 ISA, both +the `odp-linux` and `odp-dpdk` implementations share a common ABI. + == Shared memory === Allocating shared memory Blocks of shared memory can be created using the `odp_shm_reserve()` API
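A minimal sketch (not part of the patch) of what these ABI properties mean for application code, assuming the standard `odp_api.h` umbrella header: the fragment below touches only opaque handles and spec-level calls such as `odp_packet_len()`, never implementation internals, which is what allows the same binary to be loaded against any ODP implementation sharing the ABI.

[source,c]
-----
#include <odp_api.h>

/* Uses only opaque odp_packet_t handles and ODP spec APIs; no
 * implementation-specific packet layout is referenced, so the object
 * code has no dependency on a particular implementation's internals.
 */
static uint32_t total_len(const odp_packet_t pkts[], int num)
{
	uint32_t len = 0;
	int i;

	for (i = 0; i < num; i++)
		len += odp_packet_len(pkts[i]);

	return len;
}
-----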
commit a19875674436996ec38e83371fb0a91427dfe3e0
Author: Bill Fischofer <bill.fischofer@linaro.org>
Date:   Tue Jan 10 20:35:49 2017 -0600
doc: userguide: add user documentation for packet references
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
Reviewed-by: Balasubramanian Manoharan <bala.manoharan@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
diff --git a/doc/users-guide/users-guide-packet.adoc b/doc/users-guide/users-guide-packet.adoc index e3be23c..d5f2ff1 100644 --- a/doc/users-guide/users-guide-packet.adoc +++ b/doc/users-guide/users-guide-packet.adoc @@ -246,7 +246,7 @@ packet pool as the original packet. The opposite operation is performed by the `odp_packet_concat()` API. This API takes a destination and source packet as arguments and the result is that the source packet is concatenated to the destination packet and ceases to -have any separete identity. Note that it is legal to concatenate a packet to +have any separate identity. Note that it is legal to concatenate a packet to itself, in which case the result is a packet with double the length of the original packet.
@@ -282,3 +282,240 @@ larger than the underlying segment size. The call may also fail if the requested alignment is too high. Alignment limits will vary among different ODP implementations, however ODP requires that all implementations support requested alignments of at least 32 bytes. + +=== Packet References +To support efficient multicast, retransmit, and related processing, ODP +supports two additional types of packet manipulation: static and dynamic +_references_. A reference is a lightweight mechanism for +creating aliases to packets as well as for creating packets that share data bytes +with other packets, avoiding unnecessary data copying. + +==== Static References +The simplest type of reference is the _static reference_. A static reference is +created by the call: + +[source,c] +----- +ref_pkt = odp_packet_ref_static(pkt); +----- + +If the reference fails, `ODP_PACKET_INVALID` is returned and `pkt` +remains unchanged. + +The effect of this call is shown below: + +.Static Packet Reference +image::refstatic.svg[align="center"] + +A static reference provides a simple and efficient means of creating an alias +for a packet handle that prevents the packet itself from being freed until all +references to it have been released via `odp_packet_free()` calls. This is +useful, for example, to support retransmission processing, since as part of +packet TX processing, `odp_pktout_send()` or `odp_tm_enq()` will free +the packet after it has been transmitted. + +`odp_packet_ref_static()` might be used in a transmit routine wrapper +function like: + +[source,c] +----- +int xmit_pkt(odp_pktout_queue_t queue, odp_packet_t pkt) +{ + odp_packet_t ref = odp_packet_ref_static(pkt); + return ref == ODP_PACKET_INVALID ? -1 : odp_pktout_send(queue, ref, 1); +} +----- + +This transmits a reference to `pkt` so that `pkt` is retained by the caller, +which means that the caller is free to retransmit it if needed at a later +time. When a higher level protocol (_e.g.,_ receipt of a TCP ACK packet) +confirms that the transmission was successful, `pkt` can then be discarded via +an `odp_packet_free()` call. + +The key characteristic of a static reference is that because there are +multiple independent handles that refer to the same packet, the caller should +treat the packet as read only following the creation of a static reference +until all other references to it are freed. This is because all static +references are simply aliases of the same packet, so if multiple threads were +independently manipulating the packet, this would lead to unpredictable race +conditions. + +To assist in determining whether there are other references to a packet, ODP +provides the API: + +[source,c] +----- +int odp_packet_has_ref(odp_packet_t pkt); +----- + +that indicates whether other packets exist that share bytes with this +packet. If this routine returns 0, then the caller can be assured that it is +safe to modify the packet, as this handle is the only reference to it. + +==== Dynamic References +While static references are convenient and efficient, they are limited by the +need to be treated as read only. For example, consider an application that +needs to _multicast_ a packet. Here the same packet needs to be sent to two or +more different destinations. While the packet payload may be the same, each +sent copy of the packet requires its own unique header to specify the +destination that is to receive the packet. + +To address this need, ODP provides _dynamic references_. These are created +by the call: + +[source,c] +----- +ref_pkt = odp_packet_ref(pkt, offset); +----- + +The `offset` parameter specifies the byte offset into `pkt` at which the +reference is to begin. This must be in the range +0..`odp_packet_len(pkt)`-1. As before, if the reference cannot be +created, `ODP_PACKET_INVALID` is returned and `pkt` is unchanged; otherwise the +result is as shown below: + +.Dynamic Packet Reference +image::ref.svg[align="center"] + +Following a successful reference creation, the bytes of `pkt` beginning at +offset `offset` are shared with the created reference. These bytes should be +treated as read only since multiple references point to them. Each reference, +however, still retains its own individual headroom and metadata that is not +shared with any other reference. This allows unique headers to be created by +calling `odp_packet_push_head()` or `odp_packet_extend_head()` on either +handle. This allows multiple references to the same packet to prefix unique +headers onto common shared data so that they can be properly multicast +using code such as: + +[source,c] +----- +int pkt_fanout(odp_packet_t payload, odp_queue_t fanout_queue[], int num_queues) +{ + int i; + + for (i = 0; i < num_queues; i++) + odp_queue_enq(fanout_queue[i], odp_packet_ref(payload, 0)); +} +----- + +Receiver worker threads can then operate on each reference to the packet in +parallel to prefix a unique transmit header onto it and send it out. + +==== Dynamic References with Headers +The dynamic references discussed so far have one drawback in that the headers +needed to make each reference unique must be constructed individually after +the reference is created. To address this problem, ODP allows these headers +to be created in advance and then simply prefixed to a base packet as part +of reference creation: + +[source,c] +----- +ref_pkt = odp_packet_ref_pkt(pkt, offset, hdr_pkt); +----- + +Here, rather than creating a reference with a null header, a _header packet_ +is supplied that is prefixed onto the reference. The result looks like this: + +.Packet Reference using a Header Packet +image::refpktsingle.svg[align="center"] + +So now multicasting can be more efficient using code such as: + +[source,c] +----- +int pkt_fanout_hdr(odp_packet_t payload, odp_queue_t fanout_queue[], + odp_packet_t hdr[], int num_queues) +{ + int i; + + for (i = 0; i < num_queues; i++) + odp_queue_enq(fanout_queue[i], + odp_packet_ref_pkt(payload, 0, hdr[i])); +} +----- + +Now each individual reference has its own header already prefixed to +it, ready for transmission. + +Note that when multiple references like this are made, they can each have +their own offset. So if the following code is executed: + +[source,c] +----- +ref_pkt1 = odp_packet_ref_pkt(pkt, offset1, hdr_pkt1); +ref_pkt2 = odp_packet_ref_pkt(pkt, offset2, hdr_pkt2); +----- + +the result will look like: + +image::refpkt1.svg[align="center"] +image::refpktmulti.svg[align="center"] +.Multiple Packet References with Different Offsets +image::refpkt2.svg[align="center"] + +Here two separate header packets are prefixed onto the same shared packet, each +at their own specified offset, which may or may not be the same. The result is +three packets visible to the application: + +* The original `pkt`, which can still be accessed and manipulated directly. +* The first reference, which consists of `hdr_pkt1` followed by bytes +contained in `pkt` starting at `offset1`. +* The second reference, which consists of `hdr_pkt2` followed by bytes +contained in `pkt` starting at `offset2`. + +Only a single copy of the bytes in `pkt` that are common to the +references exists. + +===== Data Sharing with References +Because a `pkt` becomes a shared object once it is referenced, applications must observe +certain disciplines when working with references. For best portability and +reliability, the shared data contained in any packet referred to by references +should be treated as read only once it has been successfully referenced until +it is known that all references to it have been freed. + +To assist applications in working with references, ODP provides two additional +APIs: + +[source,c] +----- +int odp_packet_has_ref(odp_packet_t pkt); + +uint32_t odp_packet_unshared_len(odp_packet_t pkt); +----- +The `odp_packet_has_ref()` API indicates whether any other packets +exist that share any bytes with this packet. + +Because references and referenced packets consist of an unshared +prefix, which is modifiable, followed by a shared body that should not be +modified, the `odp_packet_unshared_len()` API is available; it operates as +shown here: + +.Packet Reference Lengths +image::reflen.svg[align="center"] + +`odp_packet_unshared_len()` returns the same value as `odp_packet_len()` when +`odp_packet_has_ref()` returns 0, but for packets for which +`odp_packet_has_ref()` returns 1, it returns only the number of unshared bytes +prefixed to them. To ensure portability and reliability, only offsets +0..`odp_packet_unshared_len()`-1 should be modified by the caller. + +===== Compound References +Note that architecturally ODP does not limit referencing, so it is possible +that a reference may be used as a basis for creating another reference. The +result is a _compound reference_ that should still behave as any other +reference. + +As noted earlier, the intent behind references is that they are lightweight +objects that can be implemented without requiring data copies. The existence +of compound references may complicate this goal for some implementations. As a +result, implementations are always free to perform partial or full copies of +packets as part of any reference creation call. The +`odp_packet_unshared_len()` API will always provide an authoritative answer to +the question of how many bytes of a packet may safely be modified in any +context, so whether or not copies have been performed, applications can be +assured of portability across all conforming ODP implementations. + +Note also that a packet may not reference itself, nor may circular reference +relationships be formed, _e.g.,_ packet A is used as a header for a reference +to packet B and B is used as a header for a reference to packet A. Results +are undefined if such circular references are attempted.
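A minimal sketch (not part of the patch) of this discipline, using the two APIs above together with `odp_packet_seg_len()` and `odp_packet_data()` from the packet API: bytes are written in place only when they fall entirely within the unshared, contiguous prefix of the packet.

[source,c]
-----
#include <string.h>
#include <odp_api.h>

/* Zero the first hdr_len bytes of a packet, but only when those bytes are
 * neither shared with a reference nor split across segments; otherwise the
 * caller should work on a copy of the packet instead.
 */
static int clear_header(odp_packet_t pkt, uint32_t hdr_len)
{
	if (odp_packet_unshared_len(pkt) < hdr_len ||
	    odp_packet_seg_len(pkt) < hdr_len)
		return -1; /* modifying in place would touch shared bytes */

	memset(odp_packet_data(pkt), 0, hdr_len);
	return 0;
}
-----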
commit 3fee187e801a1a6ac0d7699486451f2bfe30ecd1
Author: Yi He <yi.he@linaro.org>
Date:   Wed Jan 11 07:50:33 2017 +0000
linux-gen: add interests query (iquery) scheduler
Add the interests query (iquery) scheduler as an alternate choice of ODP-linux scheduler component, optimized for performance especially in use cases with lower queue counts.

It includes a new core algorithm, but adopts the ring-based pktio poll algorithm from the default scheduler and still uses the old ordered queue implementation.
Signed-off-by: Yi He <yi.he@linaro.org>
Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index ba47d0b..78a0cf3 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -192,6 +192,7 @@ __LIB__libodp_linux_la_SOURCES = \ odp_schedule.c \ odp_schedule_if.c \ odp_schedule_sp.c \ + odp_schedule_iquery.c \ odp_shared_memory.c \ odp_sorted_list.c \ odp_spinlock.c \ diff --git a/platform/linux-generic/m4/odp_schedule.m4 b/platform/linux-generic/m4/odp_schedule.m4 index bc70c1f..2dcc9a7 100644 --- a/platform/linux-generic/m4/odp_schedule.m4 +++ b/platform/linux-generic/m4/odp_schedule.m4 @@ -4,3 +4,10 @@ AC_ARG_ENABLE([schedule-sp], schedule-sp=yes ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP" fi]) + +AC_ARG_ENABLE([schedule-iquery], + [ --enable-schedule-iquery enable interests query (sparse bitmap) scheduler], + [if test x$enableval = xyes; then + schedule-iquery=yes + ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY" + fi]) diff --git a/platform/linux-generic/odp_schedule_if.c b/platform/linux-generic/odp_schedule_if.c index daf6c98..a9ede98 100644 --- a/platform/linux-generic/odp_schedule_if.c +++ b/platform/linux-generic/odp_schedule_if.c @@ -12,9 +12,15 @@ extern const schedule_api_t schedule_sp_api; extern const schedule_fn_t schedule_default_fn; extern const schedule_api_t schedule_default_api;
+extern const schedule_fn_t schedule_iquery_fn; +extern const schedule_api_t schedule_iquery_api; + #ifdef ODP_SCHEDULE_SP const schedule_fn_t *sched_fn = &schedule_sp_fn; const schedule_api_t *sched_api = &schedule_sp_api; +#elif defined(ODP_SCHEDULE_IQUERY) +const schedule_fn_t *sched_fn = &schedule_iquery_fn; +const schedule_api_t *sched_api = &schedule_iquery_api; #else const schedule_fn_t *sched_fn = &schedule_default_fn; const schedule_api_t *sched_api = &schedule_default_api; diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c new file mode 100644 index 0000000..b692457 --- /dev/null +++ b/platform/linux-generic/odp_schedule_iquery.c @@ -0,0 +1,1521 @@ +/* Copyright (c) 2016, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include <odp/api/schedule.h> +#include <odp_schedule_if.h> +#include <odp/api/align.h> +#include <odp/api/queue.h> +#include <odp/api/shared_memory.h> +#include <odp_internal.h> +#include <odp_debug_internal.h> +#include <odp_ring_internal.h> +#include <odp_queue_internal.h> +#include <odp_buffer_internal.h> +#include <odp_bitmap_internal.h> +#include <odp/api/thread.h> +#include <odp/api/time.h> +#include <odp/api/rwlock.h> +#include <odp/api/hints.h> +#include <odp/api/cpu.h> +#include <odp/api/thrmask.h> +#include <odp_config_internal.h> + +/* Number of priority levels */ +#define NUM_SCHED_PRIO 8 + +ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (NUM_SCHED_PRIO - 1), + "lowest_prio_does_not_match_with_num_prios"); + +ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && + (ODP_SCHED_PRIO_NORMAL < (NUM_SCHED_PRIO - 1)), + "normal_prio_is_not_between_highest_and_lowest"); + +/* Number of scheduling groups */ +#define NUM_SCHED_GRPS 256 + +/* Start of named groups in group mask arrays */ +#define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1) + +/* Instantiate a WAPL bitmap to be used as queue index bitmap */ +typedef WAPL_BITMAP(ODP_CONFIG_QUEUES) queue_index_bitmap_t; + +typedef struct { + odp_rwlock_t lock; + queue_index_bitmap_t queues; /* queues in this priority level */ +} sched_prio_t; + +typedef struct { + odp_rwlock_t lock; + bool allocated; + odp_thrmask_t threads; /* threads subscribe to this group */ + queue_index_bitmap_t queues; /* queues in this group */ + char name[ODP_SCHED_GROUP_NAME_LEN]; +} sched_group_t; + +/* Packet input poll command queues */ +#define PKTIO_CMD_QUEUES 4 + +/* Maximum number of packet input queues per command */ +#define MAX_PKTIN 16 + +/* Maximum number of packet IO interfaces */ +#define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES + +/* Maximum number of pktio poll commands */ +#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO) + +/* Pktio command is free */ +#define PKTIO_CMD_FREE ((uint32_t)-1) + +/* Packet IO poll queue ring size. In worst case, all pktios + * have all pktins enabled and one poll command is created per + * pktin queue. The ring size must be larger than or equal to + * NUM_PKTIO_CMD / PKTIO_CMD_QUEUES, so that it can hold all + * poll commands in the worst case. 
+ */ +#define PKTIO_RING_SIZE (NUM_PKTIO_CMD / PKTIO_CMD_QUEUES) + +/* Mask for wrapping around pktio poll command index */ +#define PKTIO_RING_MASK (PKTIO_RING_SIZE - 1) + +/* Maximum number of dequeues */ +#define MAX_DEQ CONFIG_BURST_SIZE + +/* Instantiate a RING data structure as pktio command queue */ +typedef struct { + /* Ring header */ + ring_t ring; + + /* Ring data: pktio poll command indexes */ + uint32_t cmd_index[PKTIO_RING_SIZE]; +} pktio_cmd_queue_t ODP_ALIGNED_CACHE; + +/* Packet IO poll command */ +typedef struct { + int pktio; + int count; + int pktin[MAX_PKTIN]; + uint32_t index; +} pktio_cmd_t; + +/* Collect the pktio poll resources */ +typedef struct { + odp_rwlock_t lock; + /* count active commands per pktio interface */ + int actives[NUM_PKTIO]; + pktio_cmd_t commands[NUM_PKTIO_CMD]; + pktio_cmd_queue_t queues[PKTIO_CMD_QUEUES]; +} pktio_poll_t; + +/* Forward declaration */ +typedef struct sched_thread_local sched_thread_local_t; + +typedef struct { + odp_shm_t selfie; + + /* Schedule priorities */ + sched_prio_t prios[NUM_SCHED_PRIO]; + + /* Schedule groups */ + sched_group_t groups[NUM_SCHED_GRPS]; + + /* Cache queue parameters for easy reference */ + odp_schedule_param_t queues[ODP_CONFIG_QUEUES]; + + /* Poll pktio inputs in spare time */ + pktio_poll_t pktio_poll; + + /* Queues send or unwind their availability indications + * for scheduling, the bool value also serves as a focal + * point for atomic competition. */ + bool availables[ODP_CONFIG_QUEUES]; + + /* Quick reference to per thread context */ + sched_thread_local_t *threads[ODP_THREAD_COUNT_MAX]; +} sched_global_t; + +/* Per thread events cache */ +typedef struct { + int count; + odp_queue_t queue; + odp_event_t stash[MAX_DEQ], *top; +} event_cache_t; + +/* Maximum number of ordered locks per queue */ +#define MAX_ORDERED_LOCKS_PER_QUEUE 2 + +ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <= CONFIG_QUEUE_MAX_ORD_LOCKS, + "Too_many_ordered_locks"); + +/* Ordered stash size */ +#define MAX_ORDERED_STASH 512 + +/* Storage for stashed enqueue operation arguments */ +typedef struct { + odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; + queue_entry_t *queue; + int num; +} ordered_stash_t; + +/* Ordered lock states */ +typedef union { + uint8_t u8[CONFIG_QUEUE_MAX_ORD_LOCKS]; + uint32_t all; +} lock_called_t; + +ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t), + "Lock_called_values_do_not_fit_in_uint32"); + +/* Instantiate a sparse bitmap to store thread's interested + * queue indexes per priority. + */ +typedef SPARSE_BITMAP(ODP_CONFIG_QUEUES) queue_index_sparse_t; + +struct sched_thread_local { + int thread; + bool pause; + + /* Cache events only for atomic queue */ + event_cache_t cache; + + /* Saved atomic context */ + bool *atomic; + + /* Record the pktio polls have done */ + uint16_t pktin_polls; + + /* Interested queue indexes to be checked by thread + * at each priority level for scheduling, and a round + * robin iterator to improve fairness between queues + * in the same priority level. 
+ */ + odp_rwlock_t lock; + queue_index_sparse_t indexes[NUM_SCHED_PRIO]; + sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO]; + + struct { + queue_entry_t *src_queue; /**< Source queue entry */ + uint64_t ctx; /**< Ordered context id */ + int stash_num; /**< Number of stashed enqueue operations */ + uint8_t in_order; /**< Order status */ + lock_called_t lock_called; /**< States of ordered locks */ + /** Storage for stashed enqueue operations */ + ordered_stash_t stash[MAX_ORDERED_STASH]; + } ordered; +}; + +/* Global scheduler context */ +static sched_global_t *sched; + +/* Thread local scheduler context */ +__thread sched_thread_local_t thread_local; + +static int schedule_init_global(void) +{ + odp_shm_t shm; + int i, k, prio, group; + + ODP_DBG("Schedule[iquery] init ... "); + + shm = odp_shm_reserve("odp_scheduler_iquery", + sizeof(sched_global_t), + ODP_CACHE_LINE_SIZE, 0); + + sched = odp_shm_addr(shm); + + if (sched == NULL) { + ODP_ERR("Schedule[iquery] " + "init: shm reserve.\n"); + return -1; + } + + memset(sched, 0, sizeof(sched_global_t)); + + sched->selfie = shm; + + for (prio = 0; prio < NUM_SCHED_PRIO; prio++) + odp_rwlock_init(&sched->prios[prio].lock); + + for (group = 0; group < NUM_SCHED_GRPS; group++) { + sched->groups[group].allocated = false; + odp_rwlock_init(&sched->groups[group].lock); + } + + odp_rwlock_init(&sched->pktio_poll.lock); + + for (i = 0; i < PKTIO_CMD_QUEUES; i++) { + pktio_cmd_queue_t *queue = + &sched->pktio_poll.queues[i]; + + ring_init(&queue->ring); + + for (k = 0; k < PKTIO_RING_SIZE; k++) + queue->cmd_index[k] = RING_EMPTY; + } + + for (i = 0; i < NUM_PKTIO_CMD; i++) + sched->pktio_poll.commands[i].index = PKTIO_CMD_FREE; + + ODP_DBG("done\n"); + return 0; +} + +static int schedule_term_global(void) +{ + uint32_t i; + odp_shm_t shm = sched->selfie; + + for (i = 0; i < ODP_CONFIG_QUEUES; i++) { + int count = 0; + odp_event_t events[1]; + + if (sched->availables[i]) + count = sched_cb_queue_deq_multi(i, events, 1); + + if (count < 0) + sched_cb_queue_destroy_finalize(i); + else if (count > 0) + ODP_ERR("Queue (%d) not empty\n", i); + } + + memset(sched, 0, sizeof(sched_global_t)); + + if (odp_shm_free(shm) < 0) { + ODP_ERR("Schedule[iquery] " + "term: shm release.\n"); + return -1; + } + return 0; +} + +/* + * These APIs are used to manipulate thread's interests. 
+ */ +static void thread_set_interest(sched_thread_local_t *thread, + unsigned int queue_index, int prio); + +static void thread_clear_interest(sched_thread_local_t *thread, + unsigned int queue_index, int prio); + +static void thread_set_interests(sched_thread_local_t *thread, + queue_index_bitmap_t *set); + +static void thread_clear_interests(sched_thread_local_t *thread, + queue_index_bitmap_t *clear); + +static void sched_thread_local_reset(void) +{ + int prio; + queue_index_sparse_t *index; + sparse_bitmap_iterator_t *iterator; + + memset(&thread_local, 0, sizeof(sched_thread_local_t)); + + thread_local.thread = odp_thread_id(); + thread_local.cache.queue = ODP_QUEUE_INVALID; + + odp_rwlock_init(&thread_local.lock); + + for (prio = 0; prio < NUM_SCHED_PRIO; prio++) { + index = &thread_local.indexes[prio]; + iterator = &thread_local.iterators[prio]; + + sparse_bitmap_zero(index); + sparse_bitmap_iterator(iterator, index); + } +} + +static int schedule_init_local(void) +{ + int group; + sched_group_t *G; + queue_index_bitmap_t collect; + + wapl_bitmap_zero(&collect); + sched_thread_local_reset(); + + /* Collect all queue indexes of the schedule groups + * which this thread has subscribed + */ + for (group = 0; group < NUM_SCHED_GRPS; group++) { + G = &sched->groups[group]; + odp_rwlock_read_lock(&G->lock); + + if ((group < SCHED_GROUP_NAMED || G->allocated) && + odp_thrmask_isset(&G->threads, thread_local.thread)) + wapl_bitmap_or(&collect, &collect, &G->queues); + + odp_rwlock_read_unlock(&G->lock); + } + + /* Distribute the above collected queue indexes into + * thread local interests per priority level. + */ + thread_set_interests(&thread_local, &collect); + + /* "Night gathers, and now my watch begins..." */ + sched->threads[thread_local.thread] = &thread_local; + return 0; +} + +static inline void schedule_release_context(void); + +static int schedule_term_local(void) +{ + int group; + sched_group_t *G; + + if (thread_local.cache.count) { + ODP_ERR("Locally pre-scheduled events exist.\n"); + return -1; + } + + schedule_release_context(); + + /* Unsubscribe all named schedule groups */ + for (group = SCHED_GROUP_NAMED; + group < NUM_SCHED_GRPS; group++) { + G = &sched->groups[group]; + odp_rwlock_write_lock(&G->lock); + + if (G->allocated && odp_thrmask_isset( + &G->threads, thread_local.thread)) + odp_thrmask_clr(&G->threads, thread_local.thread); + + odp_rwlock_write_unlock(&G->lock); + } + + /* "...for this night and all the nights to come." */ + sched->threads[thread_local.thread] = NULL; + sched_thread_local_reset(); + return 0; +} + +static int init_sched_queue(uint32_t queue_index, + const odp_schedule_param_t *sched_param) +{ + int prio, group, thread; + sched_prio_t *P; + sched_group_t *G; + sched_thread_local_t *local; + + prio = sched_param->prio; + group = sched_param->group; + + G = &sched->groups[group]; + odp_rwlock_write_lock(&G->lock); + + /* Named schedule group must be created prior + * to queue creation to this group. 
+ */ + if (group >= SCHED_GROUP_NAMED && !G->allocated) { + odp_rwlock_write_unlock(&G->lock); + return -1; + } + + /* Record the queue in its priority level globally */ + P = &sched->prios[prio]; + + odp_rwlock_write_lock(&P->lock); + wapl_bitmap_set(&P->queues, queue_index); + odp_rwlock_write_unlock(&P->lock); + + /* Record the queue in its schedule group */ + wapl_bitmap_set(&G->queues, queue_index); + + /* Cache queue parameters for easy reference */ + memcpy(&sched->queues[queue_index], + sched_param, sizeof(odp_schedule_param_t)); + + /* Update all threads in this schedule group to + * start check this queue index upon scheduling. + */ + thread = odp_thrmask_first(&G->threads); + while (thread >= 0) { + local = sched->threads[thread]; + thread_set_interest(local, queue_index, prio); + thread = odp_thrmask_next(&G->threads, thread); + } + + odp_rwlock_write_unlock(&G->lock); + return 0; +} + +/* + * Must be called with schedule group's rwlock held. + * This is also being used in destroy_schedule_group() + * to destroy all orphan queues while destroying a whole + * schedule group. + */ +static void __destroy_sched_queue( + sched_group_t *G, uint32_t queue_index) +{ + int prio, thread; + sched_prio_t *P; + sched_thread_local_t *local; + + prio = sched->queues[queue_index].prio; + + /* Forget the queue in its schedule group */ + wapl_bitmap_clear(&G->queues, queue_index); + + /* Forget queue schedule parameters */ + memset(&sched->queues[queue_index], + 0, sizeof(odp_schedule_param_t)); + + /* Update all threads in this schedule group to + * stop check this queue index upon scheduling. + */ + thread = odp_thrmask_first(&G->threads); + while (thread >= 0) { + local = sched->threads[thread]; + thread_clear_interest(local, queue_index, prio); + thread = odp_thrmask_next(&G->threads, thread); + } + + /* Forget the queue in its priority level globally */ + P = &sched->prios[prio]; + + odp_rwlock_write_lock(&P->lock); + wapl_bitmap_clear(&P->queues, queue_index); + odp_rwlock_write_unlock(&P->lock); +} + +static void destroy_sched_queue(uint32_t queue_index) +{ + int group; + sched_group_t *G; + + group = sched->queues[queue_index].group; + + G = &sched->groups[group]; + odp_rwlock_write_lock(&G->lock); + + /* Named schedule group could have been destroyed + * earlier and left these orphan queues. 
+ */ + if (group >= SCHED_GROUP_NAMED && !G->allocated) { + odp_rwlock_write_unlock(&G->lock); + return; + } + + __destroy_sched_queue(G, queue_index); + odp_rwlock_write_unlock(&G->lock); +} + +static int pktio_cmd_queue_hash(int pktio, int pktin) +{ + return (pktio ^ pktin) % PKTIO_CMD_QUEUES; +} + +static inline pktio_cmd_t *alloc_pktio_cmd(void) +{ + int i; + pktio_cmd_t *cmd = NULL; + + odp_rwlock_write_lock(&sched->pktio_poll.lock); + + /* Find next free command */ + for (i = 0; i < NUM_PKTIO_CMD; i++) { + if (sched->pktio_poll.commands[i].index + == PKTIO_CMD_FREE) { + cmd = &sched->pktio_poll.commands[i]; + cmd->index = i; + break; + } + } + + odp_rwlock_write_unlock(&sched->pktio_poll.lock); + return cmd; +} + +static inline void free_pktio_cmd(pktio_cmd_t *cmd) +{ + odp_rwlock_write_lock(&sched->pktio_poll.lock); + + cmd->index = PKTIO_CMD_FREE; + + odp_rwlock_write_unlock(&sched->pktio_poll.lock); +} + +static void schedule_pktio_start(int pktio, int count, int pktin[]) +{ + int i, index; + pktio_cmd_t *cmd; + + if (count > MAX_PKTIN) + ODP_ABORT("Too many input queues for scheduler\n"); + + /* Record the active commands count per pktio interface */ + sched->pktio_poll.actives[pktio] = count; + + /* Create a pktio poll command per pktin */ + for (i = 0; i < count; i++) { + cmd = alloc_pktio_cmd(); + + if (cmd == NULL) + ODP_ABORT("Scheduler out of pktio commands\n"); + + index = pktio_cmd_queue_hash(pktio, pktin[i]); + + cmd->pktio = pktio; + cmd->count = 1; + cmd->pktin[0] = pktin[i]; + ring_enq(&sched->pktio_poll.queues[index].ring, + PKTIO_RING_MASK, cmd->index); + } +} + +static int schedule_pktio_stop(int pktio, int pktin ODP_UNUSED) +{ + int remains; + + odp_rwlock_write_lock(&sched->pktio_poll.lock); + + sched->pktio_poll.actives[pktio]--; + remains = sched->pktio_poll.actives[pktio]; + + odp_rwlock_write_unlock(&sched->pktio_poll.lock); + return remains; +} + +#define DO_SCHED_LOCK() odp_rwlock_read_lock(&thread_local.lock) +#define DO_SCHED_UNLOCK() odp_rwlock_read_unlock(&thread_local.lock) + +static inline bool do_schedule_prio(int prio); + +static inline int pop_cache_events(odp_event_t ev[], unsigned int max) +{ + int k = 0; + event_cache_t *cache; + + cache = &thread_local.cache; + while (cache->count && max) { + ev[k] = *cache->top++; + k++; + max--; + cache->count--; + } + + return k; +} + +static inline void assign_queue_handle(odp_queue_t *handle) +{ + if (handle) + *handle = thread_local.cache.queue; +} + +static inline void pktio_poll_input(void) +{ + int i, hash; + uint32_t index; + + ring_t *ring; + pktio_cmd_t *cmd; + + /* + * Each thread starts the search for a poll command + * from the hash(threadID) queue to mitigate contentions. + * If the queue is empty, it moves to other queues. + * + * Most of the times, the search stops on the first + * command found to optimize multi-threaded performance. + * A small portion of polls have to do full iteration to + * avoid packet input starvation when there are less + * threads than command queues. + */ + hash = thread_local.thread % PKTIO_CMD_QUEUES; + + for (i = 0; i < PKTIO_CMD_QUEUES; i++, + hash = (hash + 1) % PKTIO_CMD_QUEUES) { + ring = &sched->pktio_poll.queues[hash].ring; + index = ring_deq(ring, PKTIO_RING_MASK); + + if (odp_unlikely(index == RING_EMPTY)) + continue; + + cmd = &sched->pktio_poll.commands[index]; + + /* Poll packet input */ + if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio, + cmd->count, + cmd->pktin))) { + /* Pktio stopped or closed. 
Remove poll + * command and call stop_finalize when all + * commands of the pktio has been removed. + */ + if (schedule_pktio_stop(cmd->pktio, + cmd->pktin[0]) == 0) + sched_cb_pktio_stop_finalize(cmd->pktio); + + free_pktio_cmd(cmd); + } else { + /* Continue scheduling the pktio */ + ring_enq(ring, PKTIO_RING_MASK, index); + + /* Do not iterate through all pktin poll + * command queues every time. + */ + if (odp_likely(thread_local.pktin_polls & 0xF)) + break; + } + } + + thread_local.pktin_polls++; +} + +/* + * Schedule queues + */ +static int do_schedule(odp_queue_t *out_queue, + odp_event_t out_ev[], unsigned int max_num) +{ + int prio, count; + + /* Consume locally cached events */ + count = pop_cache_events(out_ev, max_num); + if (count > 0) { + assign_queue_handle(out_queue); + return count; + } + + schedule_release_context(); + + if (odp_unlikely(thread_local.pause)) + return count; + + DO_SCHED_LOCK(); + /* Schedule events */ + for (prio = 0; prio < NUM_SCHED_PRIO; prio++) { + /* Round robin iterate the interested queue + * indexes in this priority level to compete + * and consume available queues + */ + if (!do_schedule_prio(prio)) + continue; + + count = pop_cache_events(out_ev, max_num); + assign_queue_handle(out_queue); + DO_SCHED_UNLOCK(); + return count; + } + + DO_SCHED_UNLOCK(); + + /* Poll packet input when there are no events */ + pktio_poll_input(); + return 0; +} + +static int schedule_loop(odp_queue_t *out_queue, uint64_t wait, + odp_event_t out_ev[], unsigned int max_num) +{ + int count, first = 1; + odp_time_t next, wtime; + + while (1) { + count = do_schedule(out_queue, out_ev, max_num); + + if (count) + break; + + if (wait == ODP_SCHED_WAIT) + continue; + + if (wait == ODP_SCHED_NO_WAIT) + break; + + if (first) { + wtime = odp_time_local_from_ns(wait); + next = odp_time_sum(odp_time_local(), wtime); + first = 0; + continue; + } + + if (odp_time_cmp(next, odp_time_local()) < 0) + break; + } + + return count; +} + +static odp_event_t schedule(odp_queue_t *out_queue, uint64_t wait) +{ + odp_event_t ev; + + ev = ODP_EVENT_INVALID; + + schedule_loop(out_queue, wait, &ev, 1); + + return ev; +} + +static int schedule_multi(odp_queue_t *out_queue, uint64_t wait, + odp_event_t events[], int num) +{ + return schedule_loop(out_queue, wait, events, num); +} + +static void schedule_pause(void) +{ + thread_local.pause = 1; +} + +static void schedule_resume(void) +{ + thread_local.pause = 0; +} + +static uint64_t schedule_wait_time(uint64_t ns) +{ + return ns; +} + +static int number_of_priorites(void) +{ + return NUM_SCHED_PRIO; +} + +/* + * Create a named schedule group with pre-defined + * set of subscription threads. + * + * Sched queues belonging to this group must be + * created after the group creation. Upon creation + * the group holds 0 sched queues. + */ +static odp_schedule_group_t schedule_group_create( + const char *name, const odp_thrmask_t *mask) +{ + int group; + sched_group_t *G; + + for (group = SCHED_GROUP_NAMED; + group < NUM_SCHED_GRPS; group++) { + G = &sched->groups[group]; + + odp_rwlock_write_lock(&G->lock); + if (!G->allocated) { + strncpy(G->name, name ? 
name : "", + ODP_SCHED_GROUP_NAME_LEN - 1); + odp_thrmask_copy(&G->threads, mask); + wapl_bitmap_zero(&G->queues); + + G->allocated = true; + odp_rwlock_write_unlock(&G->lock); + return (odp_schedule_group_t)group; + } + odp_rwlock_write_unlock(&G->lock); + } + + return ODP_SCHED_GROUP_INVALID; +} + +static inline void __destroy_group_queues(sched_group_t *group) +{ + unsigned int index; + queue_index_bitmap_t queues; + wapl_bitmap_iterator_t it; + + /* Constructor */ + wapl_bitmap_zero(&queues); + wapl_bitmap_copy(&queues, &group->queues); + wapl_bitmap_iterator(&it, &queues); + + /* Walk through the queue index bitmap */ + for (it.start(&it); it.has_next(&it);) { + index = it.next(&it); + __destroy_sched_queue(group, index); + } +} + +/* + * Destroy a named schedule group. + */ +static int schedule_group_destroy(odp_schedule_group_t group) +{ + int done = -1; + sched_group_t *G; + + if (group < SCHED_GROUP_NAMED || + group >= NUM_SCHED_GRPS) + return -1; + + G = &sched->groups[group]; + odp_rwlock_write_lock(&G->lock); + + if (G->allocated) { + /* Destroy all queues in this schedule group + * and leave no orphan queues. + */ + __destroy_group_queues(G); + + done = 0; + G->allocated = false; + wapl_bitmap_zero(&G->queues); + odp_thrmask_zero(&G->threads); + memset(G->name, 0, ODP_SCHED_GROUP_NAME_LEN); + } + + odp_rwlock_write_unlock(&G->lock); + return done; +} + +static odp_schedule_group_t schedule_group_lookup(const char *name) +{ + int group; + sched_group_t *G; + + for (group = SCHED_GROUP_NAMED; + group < NUM_SCHED_GRPS; group++) { + G = &sched->groups[group]; + + odp_rwlock_read_lock(&G->lock); + if (strcmp(name, G->name) == 0) { + odp_rwlock_read_unlock(&G->lock); + return (odp_schedule_group_t)group; + } + odp_rwlock_read_unlock(&G->lock); + } + + return ODP_SCHED_GROUP_INVALID; +} + +static int schedule_group_join(odp_schedule_group_t group, + const odp_thrmask_t *mask) +{ + int done = -1, thread; + sched_group_t *G; + sched_thread_local_t *local; + + /* Named schedule group only */ + if (group < SCHED_GROUP_NAMED || + group >= NUM_SCHED_GRPS) + return done; + + G = &sched->groups[group]; + odp_rwlock_write_lock(&G->lock); + + if (G->allocated) { + /* Make new joined threads to start check + * queue indexes in this schedule group + */ + thread = odp_thrmask_first(mask); + while (thread >= 0) { + local = sched->threads[thread]; + thread_set_interests(local, &G->queues); + + odp_thrmask_set(&G->threads, thread); + thread = odp_thrmask_next(mask, thread); + } + done = 0; + } + + odp_rwlock_write_unlock(&G->lock); + return done; +} + +static int schedule_group_leave(odp_schedule_group_t group, + const odp_thrmask_t *mask) +{ + int done = -1, thread; + sched_group_t *G; + sched_thread_local_t *local; + + /* Named schedule group only */ + if (group < SCHED_GROUP_NAMED || + group >= NUM_SCHED_GRPS) + return done; + + G = &sched->groups[group]; + odp_rwlock_write_lock(&G->lock); + + if (G->allocated) { + /* Make leaving threads to stop check + * queue indexes in this schedule group + */ + thread = odp_thrmask_first(mask); + while (thread >= 0) { + local = sched->threads[thread]; + thread_clear_interests(local, &G->queues); + + odp_thrmask_clr(&G->threads, thread); + thread = odp_thrmask_next(mask, thread); + } + done = 0; + } + + odp_rwlock_write_unlock(&G->lock); + return done; +} + +static int schedule_group_thrmask(odp_schedule_group_t group, + odp_thrmask_t *thrmask) +{ + int done = -1; + sched_group_t *G; + + /* Named schedule group only */ + if (group < SCHED_GROUP_NAMED || + 
group >= NUM_SCHED_GRPS) + return done; + + G = &sched->groups[group]; + odp_rwlock_read_lock(&G->lock); + + if (G->allocated && thrmask != NULL) { + done = 0; + odp_thrmask_copy(thrmask, &G->threads); + } + + odp_rwlock_read_unlock(&G->lock); + return done; +} + +static int schedule_group_info(odp_schedule_group_t group, + odp_schedule_group_info_t *info) +{ + int done = -1; + sched_group_t *G; + + /* Named schedule group only */ + if (group < SCHED_GROUP_NAMED || + group >= NUM_SCHED_GRPS) + return done; + + G = &sched->groups[group]; + odp_rwlock_read_lock(&G->lock); + + if (G->allocated && info != NULL) { + done = 0; + info->name = G->name; + odp_thrmask_copy(&info->thrmask, &G->threads); + } + + odp_rwlock_read_unlock(&G->lock); + return done; +} + +/* This function is a no-op */ +static void schedule_prefetch(int num ODP_UNUSED) +{ +} + +/* + * Limited to join and leave pre-defined schedule groups + * before and after thread local initialization or termination. + */ +static int group_add_thread(odp_schedule_group_t group, int thread) +{ + sched_group_t *G; + + if (group < 0 || group >= SCHED_GROUP_NAMED) + return -1; + + G = &sched->groups[group]; + + odp_rwlock_write_lock(&G->lock); + odp_thrmask_set(&G->threads, thread); + odp_rwlock_write_unlock(&G->lock); + return 0; +} + +static int group_remove_thread(odp_schedule_group_t group, int thread) +{ + sched_group_t *G; + + if (group < 0 || group >= SCHED_GROUP_NAMED) + return -1; + + G = &sched->groups[group]; + + odp_rwlock_write_lock(&G->lock); + odp_thrmask_clr(&G->threads, thread); + odp_rwlock_write_unlock(&G->lock); + return 0; +} + +static int number_of_groups(void) +{ + return NUM_SCHED_GRPS; +} + +static int schedule_sched_queue(uint32_t queue_index) +{ + /* Set available indications globally */ + sched->availables[queue_index] = true; + return 0; +} + +static int schedule_unsched_queue(uint32_t queue_index) +{ + /* Clear available indications globally */ + sched->availables[queue_index] = false; + return 0; +} + +static void schedule_release_atomic(void) +{ + unsigned int queue_index; + + if ((thread_local.atomic != NULL) && + (thread_local.cache.count == 0)) { + queue_index = thread_local.atomic - sched->availables; + thread_local.atomic = NULL; + sched->availables[queue_index] = true; + } +} + +static inline int ordered_own_turn(queue_entry_t *queue) +{ + uint64_t ctx; + + ctx = odp_atomic_load_acq_u64(&queue->s.ordered.ctx); + + return ctx == thread_local.ordered.ctx; +} + +static inline void wait_for_order(queue_entry_t *queue) +{ + /* Busy loop to synchronize ordered processing */ + while (1) { + if (ordered_own_turn(queue)) + break; + odp_cpu_pause(); + } +} + +/** + * Perform stashed enqueue operations + * + * Should be called only when already in order. 
+ */ +static inline void ordered_stash_release(void) +{ + int i; + + for (i = 0; i < thread_local.ordered.stash_num; i++) { + queue_entry_t *queue; + odp_buffer_hdr_t **buf_hdr; + int num; + + queue = thread_local.ordered.stash[i].queue; + buf_hdr = thread_local.ordered.stash[i].buf_hdr; + num = thread_local.ordered.stash[i].num; + + queue_enq_multi(queue, buf_hdr, num); + } + thread_local.ordered.stash_num = 0; +} + +static inline void release_ordered(void) +{ + unsigned i; + queue_entry_t *queue; + + queue = thread_local.ordered.src_queue; + + wait_for_order(queue); + + /* Release all ordered locks */ + for (i = 0; i < queue->s.param.sched.lock_count; i++) { + if (!thread_local.ordered.lock_called.u8[i]) + odp_atomic_store_rel_u64(&queue->s.ordered.lock[i], + thread_local.ordered.ctx + 1); + } + + thread_local.ordered.lock_called.all = 0; + thread_local.ordered.src_queue = NULL; + thread_local.ordered.in_order = 0; + + ordered_stash_release(); + + /* Next thread can continue processing */ + odp_atomic_add_rel_u64(&queue->s.ordered.ctx, 1); +} + +static void schedule_release_ordered(void) +{ + queue_entry_t *queue; + + queue = thread_local.ordered.src_queue; + + if (odp_unlikely(!queue || thread_local.cache.count)) + return; + + release_ordered(); +} + +static inline void schedule_release_context(void) +{ + if (thread_local.ordered.src_queue != NULL) + release_ordered(); + else + schedule_release_atomic(); +} + +static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[], + int num, int *ret) +{ + int i; + uint32_t stash_num = thread_local.ordered.stash_num; + queue_entry_t *dst_queue = get_qentry(queue_index); + queue_entry_t *src_queue = thread_local.ordered.src_queue; + + if (!thread_local.ordered.src_queue || thread_local.ordered.in_order) + return 0; + + if (ordered_own_turn(src_queue)) { + /* Own turn, so can do enqueue directly. */ + thread_local.ordered.in_order = 1; + ordered_stash_release(); + return 0; + } + + if (odp_unlikely(stash_num >= MAX_ORDERED_STASH)) { + /* If the local stash is full, wait until it is our turn and + * then release the stash and do enqueue directly. 
*/ + wait_for_order(src_queue); + + thread_local.ordered.in_order = 1; + + ordered_stash_release(); + return 0; + } + + thread_local.ordered.stash[stash_num].queue = dst_queue; + thread_local.ordered.stash[stash_num].num = num; + for (i = 0; i < num; i++) + thread_local.ordered.stash[stash_num].buf_hdr[i] = buf_hdr[i]; + + thread_local.ordered.stash_num++; + + *ret = num; + return 1; +} + +static void order_lock(void) +{ + queue_entry_t *queue; + + queue = thread_local.ordered.src_queue; + + if (!queue) + return; + + wait_for_order(queue); +} + +static void order_unlock(void) +{ +} + +static void schedule_order_lock(unsigned lock_index) +{ + odp_atomic_u64_t *ord_lock; + queue_entry_t *queue; + + queue = thread_local.ordered.src_queue; + + ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count && + !thread_local.ordered.lock_called.u8[lock_index]); + + ord_lock = &queue->s.ordered.lock[lock_index]; + + /* Busy loop to synchronize ordered processing */ + while (1) { + uint64_t lock_seq; + + lock_seq = odp_atomic_load_acq_u64(ord_lock); + + if (lock_seq == thread_local.ordered.ctx) { + thread_local.ordered.lock_called.u8[lock_index] = 1; + return; + } + odp_cpu_pause(); + } +} + +static void schedule_order_unlock(unsigned lock_index) +{ + odp_atomic_u64_t *ord_lock; + queue_entry_t *queue; + + queue = thread_local.ordered.src_queue; + + ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count); + + ord_lock = &queue->s.ordered.lock[lock_index]; + + ODP_ASSERT(thread_local.ordered.ctx == odp_atomic_load_u64(ord_lock)); + + odp_atomic_store_rel_u64(ord_lock, thread_local.ordered.ctx + 1); +} + +static unsigned schedule_max_ordered_locks(void) +{ + return MAX_ORDERED_LOCKS_PER_QUEUE; +} + +static void schedule_save_context(queue_entry_t *queue) +{ + if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC) { + thread_local.atomic = &sched->availables[queue->s.index]; + } else if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) { + uint64_t ctx; + odp_atomic_u64_t *next_ctx; + + next_ctx = &queue->s.ordered.next_ctx; + ctx = odp_atomic_fetch_inc_u64(next_ctx); + + thread_local.ordered.ctx = ctx; + thread_local.ordered.src_queue = queue; + } +} + +/* Fill in scheduler interface */ +const schedule_fn_t schedule_iquery_fn = { + .pktio_start = schedule_pktio_start, + .thr_add = group_add_thread, + .thr_rem = group_remove_thread, + .num_grps = number_of_groups, + .init_queue = init_sched_queue, + .destroy_queue = destroy_sched_queue, + .sched_queue = schedule_sched_queue, + .unsched_queue = schedule_unsched_queue, + .ord_enq_multi = schedule_ord_enq_multi, + .init_global = schedule_init_global, + .term_global = schedule_term_global, + .init_local = schedule_init_local, + .term_local = schedule_term_local, + .order_lock = order_lock, + .order_unlock = order_unlock, + .max_ordered_locks = schedule_max_ordered_locks, + .save_context = schedule_save_context, +}; + +/* Fill in scheduler API calls */ +const schedule_api_t schedule_iquery_api = { + .schedule_wait_time = schedule_wait_time, + .schedule = schedule, + .schedule_multi = schedule_multi, + .schedule_pause = schedule_pause, + .schedule_resume = schedule_resume, + .schedule_release_atomic = schedule_release_atomic, + .schedule_release_ordered = schedule_release_ordered, + .schedule_prefetch = schedule_prefetch, + .schedule_num_prio = number_of_priorites, + .schedule_group_create = schedule_group_create, + .schedule_group_destroy = schedule_group_destroy, + .schedule_group_lookup = schedule_group_lookup, + .schedule_group_join 
= schedule_group_join, + .schedule_group_leave = schedule_group_leave, + .schedule_group_thrmask = schedule_group_thrmask, + .schedule_group_info = schedule_group_info, + .schedule_order_lock = schedule_order_lock, + .schedule_order_unlock = schedule_order_unlock +}; + +static void thread_set_interest(sched_thread_local_t *thread, + unsigned int queue_index, int prio) +{ + queue_index_sparse_t *index; + + if (thread == NULL) + return; + + if (prio >= NUM_SCHED_PRIO) + return; + + index = &thread->indexes[prio]; + + odp_rwlock_write_lock(&thread->lock); + sparse_bitmap_set(index, queue_index); + odp_rwlock_write_unlock(&thread->lock); +} + +static void thread_clear_interest(sched_thread_local_t *thread, + unsigned int queue_index, int prio) +{ + queue_index_sparse_t *index; + + if (thread == NULL) + return; + + if (prio >= NUM_SCHED_PRIO) + return; + + index = &thread->indexes[prio]; + + odp_rwlock_write_lock(&thread->lock); + sparse_bitmap_clear(index, queue_index); + odp_rwlock_write_unlock(&thread->lock); +} + +static void thread_set_interests(sched_thread_local_t *thread, + queue_index_bitmap_t *set) +{ + int prio; + sched_prio_t *P; + unsigned int queue_index; + queue_index_bitmap_t subset; + wapl_bitmap_iterator_t it; + + if (thread == NULL || set == NULL) + return; + + for (prio = 0; prio < NUM_SCHED_PRIO; prio++) { + P = &sched->prios[prio]; + odp_rwlock_read_lock(&P->lock); + + /* The collection of queue indexes in 'set' + * may belong to several priority levels. + */ + wapl_bitmap_zero(&subset); + wapl_bitmap_and(&subset, &P->queues, set); + + odp_rwlock_read_unlock(&P->lock); + + /* Add the subset to local indexes */ + wapl_bitmap_iterator(&it, &subset); + for (it.start(&it); it.has_next(&it);) { + queue_index = it.next(&it); + thread_set_interest(thread, queue_index, prio); + } + } +} + +static void thread_clear_interests(sched_thread_local_t *thread, + queue_index_bitmap_t *clear) +{ + int prio; + sched_prio_t *P; + unsigned int queue_index; + queue_index_bitmap_t subset; + wapl_bitmap_iterator_t it; + + if (thread == NULL || clear == NULL) + return; + + for (prio = 0; prio < NUM_SCHED_PRIO; prio++) { + P = &sched->prios[prio]; + odp_rwlock_read_lock(&P->lock); + + /* The collection of queue indexes in 'clear' + * may belong to several priority levels. + */ + wapl_bitmap_zero(&subset); + wapl_bitmap_and(&subset, &P->queues, clear); + + odp_rwlock_read_unlock(&P->lock); + + /* Remove the subset from local indexes */ + wapl_bitmap_iterator(&it, &subset); + for (it.start(&it); it.has_next(&it);) { + queue_index = it.next(&it); + thread_clear_interest(thread, queue_index, prio); + } + } +} + +static inline bool is_atomic_queue(unsigned int queue_index) +{ + return (sched->queues[queue_index].sync + == ODP_SCHED_SYNC_ATOMIC); +} + +static inline bool is_ordered_queue(unsigned int queue_index) +{ + return (sched->queues[queue_index].sync + == ODP_SCHED_SYNC_ORDERED); +} + +static inline bool compete_atomic_queue(unsigned int queue_index) +{ + bool expected = sched->availables[queue_index]; + + if (expected && is_atomic_queue(queue_index)) { + expected = __atomic_compare_exchange_n( + &sched->availables[queue_index], + &expected, false, 0, + __ATOMIC_RELEASE, __ATOMIC_RELAXED); + } + + return expected; +} + +static inline int consume_queue(int prio, unsigned int queue_index) +{ + int count; + unsigned int max = MAX_DEQ; + event_cache_t *cache = &thread_local.cache; + + /* Low priorities have smaller batch size to limit + * head of line blocking latency. 
+ */ + if (odp_unlikely(prio > ODP_SCHED_PRIO_DEFAULT)) + max = MAX_DEQ / 2; + + /* For ordered queues we want consecutive events to + * be dispatched to separate threads, so do not cache + * them locally. + */ + if (is_ordered_queue(queue_index)) + max = 1; + + count = sched_cb_queue_deq_multi( + queue_index, cache->stash, max); + + if (count < 0) { + DO_SCHED_UNLOCK(); + sched_cb_queue_destroy_finalize(queue_index); + DO_SCHED_LOCK(); + return 0; + } + + if (count == 0) + return 0; + + cache->top = &cache->stash[0]; + cache->count = count; + cache->queue = sched_cb_queue_handle(queue_index); + return count; +} + +static inline bool do_schedule_prio(int prio) +{ + int nbits, next, end; + unsigned int queue_index; + sparse_bitmap_iterator_t *it; + + it = &thread_local.iterators[prio]; + nbits = (int)*it->_base.last; + + /* No interests at all! */ + if (nbits <= 0) + return false; + + /* In critical path, cannot afford iterator calls, + * do it manually with internal knowledge + */ + it->_start = (it->_start + 1) % nbits; + end = it->_start + nbits; + + for (next = it->_start; next < end; next++) { + queue_index = it->_base.il[next % nbits]; + + if (!compete_atomic_queue(queue_index)) + continue; + + if (!consume_queue(prio, queue_index)) + continue; + + return true; + } + + return false; +}
commit 6ec758bd738b86c48b5ae7ba01759aeb95c18e01 Author: Yi He yi.he@linaro.org Date: Wed Jan 11 07:50:32 2017 +0000
linux-gen: add generic bitmaps and iterators
Add a C++-template-like bitmap<size> construct that allows instantiating bitmap data structures of any size, and provide iterators to help walk through the bitmap objects.
Signed-off-by: Yi He yi.he@linaro.org Reviewed-and-tested-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
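The header added below instantiates bitmaps through macros and exposes round-robin iterators. A minimal usage sketch, assuming odp_bitmap_internal.h is on the include path; the typedef name mirrors the one used by the iquery scheduler above, and the bit values are illustrative only:

/* Instantiate a 128-bit sparse bitmap, set two queue indexes and walk
 * them with the round-robin iterator (same calling convention as in
 * odp_schedule_iquery.c).
 */
#include <odp_bitmap_internal.h>

typedef SPARSE_BITMAP(128) queue_index_sparse_t;

static void sparse_bitmap_example(void)
{
	queue_index_sparse_t indexes;
	sparse_bitmap_iterator_t it;
	unsigned int queue_index;

	sparse_bitmap_zero(&indexes);
	sparse_bitmap_set(&indexes, 3);
	sparse_bitmap_set(&indexes, 57);

	/* The constructor upcasts the derived bitmap to its base class */
	sparse_bitmap_iterator(&it, &indexes);
	for (it.start(&it); it.has_next(&it);) {
		queue_index = it.next(&it);
		/* ...operate on queue_index... */
		(void)queue_index;
	}
}

The WAPL variant is used the same way through WAPL_BITMAP(nbits), wapl_bitmap_set() and wapl_bitmap_iterator().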
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index bfcbb5a..ba47d0b 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -111,6 +111,7 @@ noinst_HEADERS = \ ${srcdir}/include/odp_align_internal.h \ ${srcdir}/include/odp_atomic_internal.h \ ${srcdir}/include/odp_buffer_inlines.h \ + ${srcdir}/include/odp_bitmap_internal.h \ ${srcdir}/include/odp_buffer_internal.h \ ${srcdir}/include/odp_classification_datamodel.h \ ${srcdir}/include/odp_classification_inlines.h \ @@ -154,6 +155,7 @@ __LIB__libodp_linux_la_SOURCES = \ _ishmphy.c \ odp_atomic.c \ odp_barrier.c \ + odp_bitmap.c \ odp_buffer.c \ odp_byteorder.c \ odp_classification.c \ diff --git a/platform/linux-generic/include/odp_bitmap_internal.h b/platform/linux-generic/include/odp_bitmap_internal.h new file mode 100644 index 0000000..1be4d02 --- /dev/null +++ b/platform/linux-generic/include/odp_bitmap_internal.h @@ -0,0 +1,317 @@ +/* Copyright (c) 2016, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +/** + * @file + * + * ODP generic bitmap types and operations. + */ + +#ifndef ODP_BITMAP_INTERNAL_H_ +#define ODP_BITMAP_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <stdint.h> +#include <stdbool.h> +#include <string.h> +#include <odp/api/hints.h> + +/* Generate unique identifier for instantiated class */ +#define TOKENIZE(template, line) \ + template ## _ ## line ## _ ## __COUNTER__ + +/* Array size in general */ +#define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0])) + +#define BITS_PER_BYTE (8) +#define BITS_PER_LONG __WORDSIZE +#define BYTES_PER_LONG (BITS_PER_LONG / BITS_PER_BYTE) + +#define BIT_WORD(nr) ((nr) / BITS_PER_LONG) +#define BITS_TO_LONGS(nr) BIT_WORD(nr + BITS_PER_LONG - 1) + +#define BITMAP_FIRST_WORD_MASK(start) \ + (~0UL << ((start) & (BITS_PER_LONG - 1))) +#define BITMAP_LAST_WORD_MASK(nbits) \ + (~0UL >> (-(nbits) & (BITS_PER_LONG - 1))) + +/* WAPL bitmap base class */ +typedef struct { + unsigned int nwords; + unsigned int *pl; + unsigned long *ul; +} wapl_bitmap_t; + +/* + * Word-Aligned Position List (WAPL) bitmap, which actually + * is not a compression, but with an extra list of non-empty + * word positions. + * + * WAPL accelerates bitwise operations and iterations by + * applying only to non-empty positions instead of walking + * through the whole bitmap. + * + * WAPL uses [1 ~ N] instead of [0 ~ N - 1] as position + * values and an extra 0 as end indicator for position list. + * This is the reason to allocate one extra room below. 
+ */ +#define instantiate_wapl_bitmap(line, nbits) \ + struct TOKENIZE(wapl_bitmap, line) { \ + unsigned int pl[BITS_TO_LONGS(nbits) + 1]; \ + unsigned long ul[BITS_TO_LONGS(nbits) + 1]; \ + } + +#define WAPL_BITMAP(nbits) instantiate_wapl_bitmap(__LINE__, nbits) + +/* + * Upcast any derived WAPL bitmap class to its base class + */ +#define __wapl_upcast(base, derived) \ + do { \ + __typeof__(derived) p = derived; \ + base.pl = p->pl; \ + base.ul = p->ul; \ + base.nwords = ARRAY_SIZE(p->ul) - 1; \ + } while (0) + +/* + * WAPL base class bitmap operations + */ +void __wapl_bitmap_and(wapl_bitmap_t *dst, + wapl_bitmap_t *src, wapl_bitmap_t *and); + +void __wapl_bitmap_or(wapl_bitmap_t *dst, wapl_bitmap_t *or); + +void __wapl_bitmap_set(wapl_bitmap_t *map, unsigned int bit); + +void __wapl_bitmap_clear(wapl_bitmap_t *map, unsigned int bit); + +/* + * Generic WAPL bitmap operations + */ +#define wapl_bitmap_zero(map) \ + ({ \ + __typeof__(map) p = map; \ + memset((void *)p, 0, sizeof(__typeof__(*p))); \ + }) + +#define wapl_bitmap_copy(dst, src) \ + ({ \ + __typeof__(dst) d = dst; \ + __typeof__(src) s = src; \ + if (d != s) \ + memcpy((void *)d, (void *)s, \ + sizeof(__typeof__(*d))); \ + }) + +#define wapl_bitmap_and(dst, src, and) \ + ({ \ + wapl_bitmap_t d, s, a; \ + __wapl_upcast(d, dst); \ + __wapl_upcast(s, src); \ + __wapl_upcast(a, and); \ + __wapl_bitmap_and(&d, &s, &a); \ + }) + +#define wapl_bitmap_or(dst, src, or) \ + ({ \ + wapl_bitmap_t d, o; \ + wapl_bitmap_copy(dst, src); \ + __wapl_upcast(d, dst); \ + __wapl_upcast(o, or); \ + __wapl_bitmap_or(&d, &o); \ + }) + +#define wapl_bitmap_set(map, bit) \ + ({ \ + wapl_bitmap_t b; \ + __wapl_upcast(b, map); \ + __wapl_bitmap_set(&b, bit); \ + }) + +#define wapl_bitmap_clear(map, bit) \ + ({ \ + wapl_bitmap_t b; \ + __wapl_upcast(b, map); \ + __wapl_bitmap_clear(&b, bit); \ + }) + +/* + * Round robin iterator runs upon a WAPL bitmap: + * + * wapl_bitmap_iterator(iterator, WAPL bitmap); + * for (iterator->start(); iterator->has_next(); ) { + * unsigned int bit_index = iterator->next(); + * ...operations on this bit index... + * } + */ +typedef struct wapl_bitmap_iterator { + int _start, _next, _nbits; + wapl_bitmap_t _base; + + void (*start)(struct wapl_bitmap_iterator *this); + bool (*has_next)(struct wapl_bitmap_iterator *this); + unsigned int (*next)(struct wapl_bitmap_iterator *this); +} wapl_bitmap_iterator_t; + +/* + * WAPL bitmap iterator constructor + */ +void __wapl_bitmap_iterator(wapl_bitmap_iterator_t *this); + +/* + * Generic constructor accepts any derived WAPL bitmap class + */ +#define wapl_bitmap_iterator(iterator, map) \ + ({ \ + __typeof__(iterator) __it = iterator; \ + __wapl_upcast(__it->_base, map); \ + __wapl_bitmap_iterator(__it); \ + }) + +/* Sparse bitmap base class */ +typedef struct { + unsigned int nbits; + unsigned int *last, *pl, *il; +} sparse_bitmap_t; + +/* + * Sparse bitmap, lists all bit indexes directly as an array. + * Expected to be significantly straightforward iteration. 
+ */ +#define instantiate_sparse_bitmap(line, nbits) \ + struct TOKENIZE(sparse_bitmap, line) { \ + unsigned int last; \ + unsigned int pl[nbits]; \ + unsigned int il[nbits]; \ + } + +#define SPARSE_BITMAP(nbits) instantiate_sparse_bitmap(__LINE__, nbits) + +/* + * Upcast any derived sparse bitmap class to its base class + */ +#define __sparse_upcast(base, derived) \ + do { \ + __typeof__(derived) p = derived; \ + base.pl = p->pl; \ + base.il = p->il; \ + base.last = &p->last; \ + base.nbits = ARRAY_SIZE(p->il); \ + } while (0) + +/* + * Sparse base class bitmap operations + */ +void __sparse_bitmap_set(sparse_bitmap_t *map, unsigned int bit); + +void __sparse_bitmap_clear(sparse_bitmap_t *map, unsigned int bit); + +/* + * Generic sparse bitmap operations + */ +#define sparse_bitmap_zero(map) \ + ({ \ + __typeof__(map) p = map; \ + memset((void *)p, 0, sizeof(__typeof__(*p))); \ + }) + +#define sparse_bitmap_set(map, bit) \ + ({ \ + sparse_bitmap_t b; \ + __sparse_upcast(b, map); \ + __sparse_bitmap_set(&b, bit); \ + }) + +#define sparse_bitmap_clear(map, bit) \ + ({ \ + sparse_bitmap_t b; \ + __sparse_upcast(b, map); \ + __sparse_bitmap_clear(&b, bit); \ + }) + +/* + * Round robin iterator runs upon a sparse bitmap: + * + * sparse_bitmap_iterator(iterator, SPARSE bitmap); + * for (iterator->start(); iterator->has_next(); ) { + * unsigned int bit_index = iterator->next(); + * ...operations on this bit index... + * } + */ +typedef struct sparse_bitmap_iterator { + int _start, _next, _nbits; + sparse_bitmap_t _base; + + void (*start)(struct sparse_bitmap_iterator *this); + bool (*has_next)(struct sparse_bitmap_iterator *this); + unsigned int (*next)(struct sparse_bitmap_iterator *this); +} sparse_bitmap_iterator_t; + +/* + * Sparse bitmap iterator constructor + */ +void __sparse_bitmap_iterator(sparse_bitmap_iterator_t *this); + +/* + * Generic constructor accepts any derived sparse bitmap class. + */ +#define sparse_bitmap_iterator(iterator, map) \ + ({ \ + __typeof__(iterator) __it = iterator; \ + __sparse_upcast(__it->_base, map); \ + __sparse_bitmap_iterator(__it); \ + }) + +/* + * Raw bitmap atomic set and clear. + */ +void raw_bitmap_set(unsigned long *map, unsigned int bit); + +void raw_bitmap_clear(unsigned long *map, unsigned int bit); + +/* + * It will enter infinite loop incase that all bits are zero, + * so please make sure the bitmap at least has one set. + */ +static inline int __bitmap_wraparound_next( + unsigned long *addr, unsigned int nbits, int start) +{ + unsigned long tmp; + + if (start >= (int)nbits) + start = 0; + + tmp = addr[BIT_WORD(start)]; + + /* Handle 1st word. */ + tmp &= BITMAP_FIRST_WORD_MASK(start); + start = start & ~(BITS_PER_LONG - 1); + + while (!tmp) { + start += BITS_PER_LONG; + if (start >= (int)nbits) + start = 0; + + tmp = addr[BIT_WORD(start)]; + } + + start += __builtin_ffsl(tmp) - 1; + return start; +} + +/** + * @} + */ + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/odp_bitmap.c b/platform/linux-generic/odp_bitmap.c new file mode 100644 index 0000000..a29b9ef --- /dev/null +++ b/platform/linux-generic/odp_bitmap.c @@ -0,0 +1,315 @@ +/* Copyright (c) 2016, Linaro Limited + * All rights reserved. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include <string.h> +#include <unistd.h> +#include <odp/api/std_types.h> +#include <odp/api/byteorder.h> +#include <odp_bitmap_internal.h> + +/* + * WAPL base class bitmap operations + */ +static inline void __wapl_add_pos( + wapl_bitmap_t *map, unsigned int p) +{ + unsigned int s, k = 0; + unsigned int *pl = map->pl; + + while (pl[k] && p > pl[k]) + k++; + + if (p == pl[k]) + return; + + /* sorted insertion */ + for (; pl[k] && p < pl[k]; k++) { + s = pl[k]; + pl[k] = p; + p = s; + } + + if (k < map->nwords) + pl[k++] = p; + + pl[k] = 0; +} + +static inline void __wapl_remove_pos( + wapl_bitmap_t *map, unsigned int p) +{ + unsigned int k = 0; + unsigned int *pl = map->pl; + + while (pl[k] && p != pl[k]) + k++; + + for (; pl[k]; k++) + pl[k] = pl[k + 1]; +} + +void __wapl_bitmap_and(wapl_bitmap_t *dst, + wapl_bitmap_t *src, wapl_bitmap_t *and) +{ + unsigned int k = 0, p; + unsigned int *pl = src->pl; + + while ((p = *pl++) != 0) { + dst->ul[p] = src->ul[p] & and->ul[p]; + if (dst->ul[p]) + dst->pl[k++] = p; + } + + dst->pl[k] = 0; +} + +void __wapl_bitmap_or(wapl_bitmap_t *dst, wapl_bitmap_t *or) +{ + unsigned int p; + unsigned int *pl = or->pl; + + while ((p = *pl++) != 0) { + if (dst->ul[p] == 0) + __wapl_add_pos(dst, p); + + dst->ul[p] |= or->ul[p]; + } +} + +void __wapl_bitmap_set(wapl_bitmap_t *map, unsigned int bit) +{ + unsigned int p = BIT_WORD(bit) + 1; + unsigned long set = 1UL << (bit & (BITS_PER_LONG - 1)); + + if (p > map->nwords) + return; + + if (map->ul[p] == 0) + __wapl_add_pos(map, p); + + map->ul[p] |= set; +} + +void __wapl_bitmap_clear(wapl_bitmap_t *map, unsigned int bit) +{ + unsigned int p = BIT_WORD(bit) + 1; + unsigned long clear = 1UL << (bit & (BITS_PER_LONG - 1)); + + if (p > map->nwords) + return; + + map->ul[p] &= ~clear; + + if (map->ul[p] == 0) + __wapl_remove_pos(map, p); +} + +/* + * WAPL bitmap iterator implementation + */ +static void __wapl_iterator_start(wapl_bitmap_iterator_t *this) +{ + this->_nbits = this->_base.nwords * BITS_PER_LONG; + + /* Advance to next queue index to start this + * new round iteration. 
+ */ + if (this->_base.pl[0] == 0) + this->_start = -1; + else + this->_start = __bitmap_wraparound_next( + &this->_base.ul[1], this->_nbits, this->_start + 1); + + this->_next = this->_start; +} + +static bool __wapl_iterator_has_next(wapl_bitmap_iterator_t *this) +{ + return (this->_next != -1); +} + +static unsigned int __wapl_iterator_next(wapl_bitmap_iterator_t *this) +{ + int next = this->_next; + + this->_next = __bitmap_wraparound_next( + &this->_base.ul[1], this->_nbits, this->_next + 1); + + if (this->_next == this->_start) + this->_next = -1; + + return next; +} + +void __wapl_bitmap_iterator(wapl_bitmap_iterator_t *this) +{ + this->start = __wapl_iterator_start; + this->has_next = __wapl_iterator_has_next; + this->next = __wapl_iterator_next; + + this->_start = -1; + this->_next = this->_start; +} + +/* + * Sparse base class bitmap operations + */ +void __sparse_bitmap_set(sparse_bitmap_t *map, unsigned int bit) +{ + unsigned int last = *map->last; + + /* Index exceeds */ + if (bit >= map->nbits) + return; + + /* Full bitmap */ + if (last >= map->nbits) + return; + + /* Bit was not set previously, + * also record where we set the bit + */ + if (!map->pl[bit]) { + map->il[last++] = bit; + map->pl[bit] = last; + + *map->last = last; + } +} + +void __sparse_bitmap_clear(sparse_bitmap_t *map, unsigned int bit) +{ + unsigned int p, i; + unsigned int last = *map->last; + + /* Index exceeds */ + if (bit >= map->nbits) + return; + + /* Empty bitmap */ + if (last == 0) + return; + + /* Bit was set previously */ + if (map->pl[bit]) { + p = map->pl[bit] - 1; + map->pl[bit] = 0; + + last--; + *map->last = last; + + /* Fill the hole with the latest index */ + if (p < last) { + i = map->il[last]; + map->pl[i] = p + 1; + map->il[p] = i; + } + } +} + +/* + * Sparse bitmap iterator implementation + */ +static void __sparse_iterator_start(sparse_bitmap_iterator_t *this) +{ + this->_nbits = (int)*this->_base.last; + + /* Advance to next queue index to start this + * new round iteration. 
+ */ + if (this->_nbits == 0) + this->_start = -1; + else + this->_start = (this->_start + 1) & (this->_nbits - 1); + + this->_next = this->_start; +} + +static bool __sparse_iterator_has_next(sparse_bitmap_iterator_t *this) +{ + return (this->_next != -1); +} + +static unsigned int __sparse_iterator_next(sparse_bitmap_iterator_t *this) +{ + int next = this->_next; + + this->_next = (this->_next + 1) & (this->_nbits - 1); + if (this->_next == this->_start) + this->_next = -1; + + return this->_base.il[next]; +} + +void __sparse_bitmap_iterator(sparse_bitmap_iterator_t *this) +{ + this->start = __sparse_iterator_start; + this->has_next = __sparse_iterator_has_next; + this->next = __sparse_iterator_next; + + this->_start = -1; + this->_next = this->_start; +} + +/* + * Generic byte-width atomic set/clear + */ +static inline void atomic_byte_set( + unsigned char *addr, unsigned int bit) +{ + unsigned char load, store; + unsigned char set = 1 << (bit & (BITS_PER_BYTE - 1)); + + do { + load = *addr; + store = load | set; + } while (!__atomic_compare_exchange_n(addr, &load, store, + 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); +} + +static inline void atomic_byte_clear( + unsigned char *addr, unsigned int bit) +{ + unsigned char load, store; + unsigned char clear = 1 << (bit & (BITS_PER_BYTE - 1)); + + do { + load = *addr; + store = load & ~clear; + } while (!__atomic_compare_exchange_n(addr, &load, store, + 0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); +} + +static inline unsigned char *__bit_byte( + unsigned long *word, unsigned int bit) +{ + unsigned int i; + unsigned char *b; + + b = (unsigned char *)word; + + i = bit & (BITS_PER_LONG - 1); + i = i / BITS_PER_BYTE; + +#if (ODP_BYTE_ORDER == ODP_BIG_ENDIAN) + i = BYTES_PER_LONG - 1 - i; +#endif + return &b[i]; +} + +void raw_bitmap_set(unsigned long *map, unsigned int bit) +{ + unsigned long *p = map + BIT_WORD(bit); + + atomic_byte_set(__bit_byte(p, bit), bit); +} + +void raw_bitmap_clear(unsigned long *map, unsigned int bit) +{ + unsigned long *p = map + BIT_WORD(bit); + + atomic_byte_clear(__bit_byte(p, bit), bit); +}
commit 3e399ed202522790efa552666aa04dbfd9e06da4 Author: Yi He yi.he@linaro.org Date: Wed Jan 11 07:50:31 2017 +0000
linux-gen: sched: add unsched_queue callback
Add an unsched_queue callback to indicate that a queue has become ineligible for scheduling.
Signed-off-by: Yi He yi.he@linaro.org Reviewed-and-tested-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
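As a rough illustration of the callback's role (not part of the patch; the default and sp schedulers in the diff simply stub it out), a scheduler that tracks per-queue availability, as the iquery scheduler does with its availables[] flags, could wire the hook like this. All names and the fixed capacity are hypothetical, and a real implementation would update the flag atomically:

#include <stdint.h>
#include <stdbool.h>

#define MY_NUM_QUEUES 1024              /* illustrative capacity */

static bool my_available[MY_NUM_QUEUES];

/* .sched_queue: queue has events again, make it a candidate */
static int my_sched_queue(uint32_t queue_index)
{
	my_available[queue_index] = true;
	return 0;
}

/* .unsched_queue: queue ran empty, stop offering it to schedule() */
static int my_unsched_queue(uint32_t queue_index)
{
	my_available[queue_index] = false;
	return 0;
}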
diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index c0aee42..530d157 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -25,6 +25,7 @@ typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index, ); typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index); typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index); +typedef int (*schedule_unsched_queue_fn_t)(uint32_t queue_index); typedef int (*schedule_ord_enq_multi_fn_t)(uint32_t queue_index, void *buf_hdr[], int num, int *ret); typedef int (*schedule_init_global_fn_t)(void); @@ -44,6 +45,7 @@ typedef struct schedule_fn_t { schedule_init_queue_fn_t init_queue; schedule_destroy_queue_fn_t destroy_queue; schedule_sched_queue_fn_t sched_queue; + schedule_unsched_queue_fn_t unsched_queue; schedule_ord_enq_multi_fn_t ord_enq_multi; schedule_init_global_fn_t init_global; schedule_term_global_fn_t term_global; diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 5efdbde..fcf4bf5 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -522,8 +522,10 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
if (hdr == NULL) { /* Already empty queue */ - if (queue->s.status == QUEUE_STATUS_SCHED) + if (queue->s.status == QUEUE_STATUS_SCHED) { queue->s.status = QUEUE_STATUS_NOTSCHED; + sched_fn->unsched_queue(queue->s.index); + }
UNLOCK(&queue->s.lock); return 0; diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index c878d8b..cd5bf21 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -1200,6 +1200,11 @@ static int schedule_sched_queue(uint32_t queue_index) return 0; }
+static int schedule_unsched_queue(uint32_t queue_index ODP_UNUSED) +{ + return 0; +} + static int schedule_num_grps(void) { return NUM_SCHED_GRPS; @@ -1218,6 +1223,7 @@ const schedule_fn_t schedule_default_fn = { .init_queue = schedule_init_queue, .destroy_queue = schedule_destroy_queue, .sched_queue = schedule_sched_queue, + .unsched_queue = schedule_unsched_queue, .ord_enq_multi = schedule_ord_enq_multi, .init_global = schedule_init_global, .term_global = schedule_term_global, diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index 2c1adf6..0fd4d87 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -409,6 +409,11 @@ static int sched_queue(uint32_t qi) return 0; }
+static int unsched_queue(uint32_t qi ODP_UNUSED) +{ + return 0; +} + static int ord_enq_multi(uint32_t queue_index, void *buf_hdr[], int num, int *ret) { @@ -835,6 +840,7 @@ const schedule_fn_t schedule_sp_fn = { .init_queue = init_queue, .destroy_queue = destroy_queue, .sched_queue = sched_queue, + .unsched_queue = unsched_queue, .ord_enq_multi = ord_enq_multi, .init_global = init_global, .term_global = term_global,
commit 31498a7a8bd19de593f6b862cb39d484b99c9bf8 Author: Yi He yi.he@linaro.org Date: Wed Jan 11 07:50:30 2017 +0000
linux-gen: sched: solve ordered context inversion
For an ordered queue, a thread consumes events (dequeues) and acquires its unique sequential context in two separate steps, which are not atomic and can be preempted.
This can lead to ordered context inversion: the thread that consumed the earlier events may acquire a later context, while the thread that consumed the later events acquires an earlier context.
This patch inserts the ordered context acquisition into the event dequeue operation, making these two steps atomic.
Signed-off-by: Yi He yi.he@linaro.org Reviewed-and-tested-by: Bill Fischofer bill.fischofer@linaro.org Signed-off-by: Maxim Uvarov maxim.uvarov@linaro.org
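The diffs below add a save_context hook that deq_multi() invokes while still holding the queue lock; the iquery scheduler's schedule_save_context() shown earlier then performs the fetch-and-increment of the queue's next_ctx inside that same critical section. A simplified, self-contained sketch of the idea, with illustrative names rather than ODP types:

#include <pthread.h>
#include <stdint.h>

#define TOY_RING_SIZE 64

typedef struct {
	pthread_mutex_t lock;
	uint64_t next_ctx;              /* next ordered context to hand out */
	int head, tail;                 /* toy event ring indexes */
	void *ring[TOY_RING_SIZE];
} toy_ordered_queue_t;

/* Dequeue events and acquire the ordered context in one critical
 * section, so event order and context order cannot diverge even if
 * the thread is preempted right after dequeueing.
 */
static int toy_deq_with_ctx(toy_ordered_queue_t *q, void *ev[], int max,
			    uint64_t *ctx)
{
	int num = 0;

	pthread_mutex_lock(&q->lock);

	while (num < max && q->head != q->tail) {       /* step 1: dequeue */
		ev[num++] = q->ring[q->head];
		q->head = (q->head + 1) % TOY_RING_SIZE;
	}

	if (num)
		*ctx = q->next_ctx++;                   /* step 2: acquire ctx */

	pthread_mutex_unlock(&q->lock);
	return num;
}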
diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index 6c2b050..c0aee42 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -12,6 +12,7 @@ extern "C" { #endif
#include <odp/api/queue.h> +#include <odp_queue_internal.h> #include <odp/api/schedule.h>
typedef void (*schedule_pktio_start_fn_t)(int pktio_index, int num_in_queue, @@ -33,6 +34,7 @@ typedef int (*schedule_term_local_fn_t)(void); typedef void (*schedule_order_lock_fn_t)(void); typedef void (*schedule_order_unlock_fn_t)(void); typedef unsigned (*schedule_max_ordered_locks_fn_t)(void); +typedef void (*schedule_save_context_fn_t)(queue_entry_t *queue);
typedef struct schedule_fn_t { schedule_pktio_start_fn_t pktio_start; @@ -50,6 +52,7 @@ typedef struct schedule_fn_t { schedule_order_lock_fn_t order_lock; schedule_order_unlock_fn_t order_unlock; schedule_max_ordered_locks_fn_t max_ordered_locks; + schedule_save_context_fn_t save_context; } schedule_fn_t;
/* Interface towards the scheduler */ diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 3975405..5efdbde 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -565,6 +565,9 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], if (hdr == NULL) queue->s.tail = NULL;
+ if (queue->s.type == ODP_QUEUE_TYPE_SCHED) + sched_fn->save_context(queue); + UNLOCK(&queue->s.lock);
return i; diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index e14d145..c878d8b 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -1205,6 +1205,10 @@ static int schedule_num_grps(void) return NUM_SCHED_GRPS; }
+static void schedule_save_context(queue_entry_t *queue ODP_UNUSED) +{ +} + /* Fill in scheduler interface */ const schedule_fn_t schedule_default_fn = { .pktio_start = schedule_pktio_start, @@ -1221,7 +1225,8 @@ const schedule_fn_t schedule_default_fn = { .term_local = schedule_term_local, .order_lock = order_lock, .order_unlock = order_unlock, - .max_ordered_locks = schedule_max_ordered_locks + .max_ordered_locks = schedule_max_ordered_locks, + .save_context = schedule_save_context };
/* Fill in scheduler API calls */ diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index bdfb6af..2c1adf6 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -822,6 +822,10 @@ static void order_unlock(void) { }
+static void save_context(queue_entry_t *queue ODP_UNUSED) +{ +} + /* Fill in scheduler interface */ const schedule_fn_t schedule_sp_fn = { .pktio_start = pktio_start, @@ -838,7 +842,8 @@ const schedule_fn_t schedule_sp_fn = { .term_local = term_local, .order_lock = order_lock, .order_unlock = order_unlock, - .max_ordered_locks = max_ordered_locks + .max_ordered_locks = max_ordered_locks, + .save_context = save_context };
/* Fill in scheduler API calls */
-----------------------------------------------------------------------
Summary of changes:
 doc/users-guide/users-guide-packet.adoc            |  239 ++-
 doc/users-guide/users-guide.adoc                   |  120 ++
 include/odp/api/spec/byteorder.h                   |    4 -
 platform/linux-generic/Makefile.am                 |    3 +
 .../include/odp/api/plat/byteorder_types.h         |    3 -
 .../linux-generic/include/odp_bitmap_internal.h    |  317 ++++
 platform/linux-generic/include/odp_schedule_if.h   |    5 +
 platform/linux-generic/m4/odp_schedule.m4          |    9 +-
 platform/linux-generic/odp_bitmap.c                |  315 ++++
 platform/linux-generic/odp_queue.c                 |    7 +-
 platform/linux-generic/odp_schedule.c              |   13 +-
 platform/linux-generic/odp_schedule_if.c           |    6 +
 platform/linux-generic/odp_schedule_iquery.c       | 1521 ++++++++++++++++++++
 platform/linux-generic/odp_schedule_sp.c           |   13 +-
 14 files changed, 2563 insertions(+), 12 deletions(-)
 create mode 100644 platform/linux-generic/include/odp_bitmap_internal.h
 create mode 100644 platform/linux-generic/odp_bitmap.c
 create mode 100644 platform/linux-generic/odp_schedule_iquery.c
hooks/post-receive