This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "".
The branch, master has been updated via b921639a9df6a0300ca1c50b5853a37241806366 (commit) via 6f367faae189144beeaafcd9850065b3f1b1098e (commit) via 4afa7da09edf07126573940fffed0784ef3080ff (commit) via 5db9a7a1c59d4eda436d896dce41a2c7aa8f79c4 (commit) via b89a5b77e92060c9343ab162526d05a8ee97f8b4 (commit) via fc07575511c3a5b79ccae80ca278b3daad4f101d (commit) via ff99ba52f99a608dc1a41f4d8a86ab5c2f8491ca (commit) via 426ee8c61835c8faa5df0873ecc4ec7c8667c691 (commit) via 559aec0db739c624652facd3b337d0430cba8c61 (commit) via 720d655c9161007b561bf033a4fbe0b31ab7fb6a (commit) from 1a8b32c758894b0d0a71ebbd86efb77fd9eb334c (commit)
Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below.
- Log ----------------------------------------------------------------- commit b921639a9df6a0300ca1c50b5853a37241806366 Author: Matias Elo matias.elo@nokia.com Date: Mon Oct 28 17:39:52 2019 +0200
linux-gen: ipc: remove ipc_data_offset from buffer header
Instead of storing IPC packet data offset in odp_buffer_hdr_t, calculate the offset on receive using odp_packet_hdr_t.seg_data pointer and remote pool base address.
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index 1c52b028f..6c6ec970d 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -68,10 +68,6 @@ struct ODP_ALIGNED_CACHE odp_buffer_hdr_t { /* User area pointer */ void *uarea_addr;
- /* ipc mapped process can not walk over pointers, - * offset has to be used */ - uint64_t ipc_data_offset; - /* Combined pool and buffer index */ buffer_index_t index;
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c index 7e3f95fb6..7575eb052 100644 --- a/platform/linux-generic/pktio/ipc.c +++ b/platform/linux-generic/pktio/ipc.c @@ -35,8 +35,9 @@ */ struct pktio_info { struct { + /* Pool base address */ + void *base_addr; /* number of buffer*/ - int num; char pool_name[ODP_POOL_NAME_LEN]; /* 1 if master finished creation of all shared objects */ int init_done; @@ -46,6 +47,7 @@ struct pktio_info { uint32_t ring_mask; } master; struct { + /* Pool base address */ void *base_addr; char pool_name[ODP_POOL_NAME_LEN]; /* pid of the slave process written to shm and @@ -78,8 +80,10 @@ typedef struct { /* local cache to keep packet order right */ ring_ptr_t *cache; } rx; /* slave */ - void *pool_base; /**< Remote pool base addr */ - void *pool_mdata_base; /**< Remote pool mdata base addr */ + /* Remote pool mdata base addr */ + void *pool_mdata_base; + /* Remote pool base address for offset calculation */ + void *remote_base_addr; odp_pool_t pool; /**< Pool of main process */ enum { PKTIO_TYPE_IPC_MASTER = 0, /**< Master is the process which @@ -212,7 +216,7 @@ static int _ipc_master_start(pktio_entry_t *pktio_entry) }
pktio_ipc->remote_pool_shm = shm; - pktio_ipc->pool_base = odp_shm_addr(shm); + pktio_ipc->remote_base_addr = pinfo->slave.base_addr; pktio_ipc->pool_mdata_base = (char *)odp_shm_addr(shm);
odp_atomic_store_u32(&pktio_ipc->ready, 1); @@ -328,6 +332,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, /* Export ring info for the slave process to use */ pinfo->master.ring_size = ring_size; pinfo->master.ring_mask = ring_mask; + pinfo->master.base_addr = odp_shm_addr(pool->shm);
pinfo->slave.base_addr = 0; pinfo->slave.pid = 0; @@ -362,7 +367,7 @@ static void _ipc_export_pool(struct pktio_info *pinfo, snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s", _ipc_odp_buffer_pool_shm_name(pool_hdl)); pinfo->slave.pid = odp_global_ro.main_pid; - pinfo->slave.base_addr = pool->base_addr; + pinfo->slave.base_addr = odp_shm_addr(pool->shm); }
static odp_shm_t _ipc_map_remote_pool(const char *name, int pid) @@ -494,6 +499,7 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) pid); pktio_ipc->remote_pool_shm = shm; pktio_ipc->pool_mdata_base = (char *)odp_shm_addr(shm); + pktio_ipc->remote_base_addr = pinfo->master.base_addr;
_ipc_export_pool(pinfo, pktio_ipc->pool);
@@ -674,7 +680,8 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, if (odp_unlikely(pool == ODP_POOL_INVALID)) ODP_ABORT("invalid pool");
- data_pool_off = phdr->buf_hdr.ipc_data_offset; + data_pool_off = (uint8_t *)phdr->seg_data - + (uint8_t *)pktio_ipc->remote_base_addr;
pkt = odp_packet_alloc(pool, phdr->frame_len); if (odp_unlikely(pkt == ODP_PACKET_INVALID)) { @@ -803,7 +810,6 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
/* Set offset to phdr for outgoing packets */ for (i = 0; i < num; i++) { - uint64_t data_pool_off; odp_packet_t pkt = pkt_table_mapped[i]; odp_packet_hdr_t *pkt_hdr = packet_hdr(pkt); odp_pool_t pool_hdl = odp_packet_pool(pkt); @@ -811,17 +817,15 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry,
offsets[i] = (uint8_t *)pkt_hdr - (uint8_t *)odp_shm_addr(pool->shm); - data_pool_off = (uint8_t *)pkt_hdr->seg_data - - (uint8_t *)odp_shm_addr(pool->shm);
/* compile all function code even if ipc disabled with config */ - pkt_hdr->buf_hdr.ipc_data_offset = data_pool_off; IPC_ODP_DBG("%d/%d send packet %llx, pool %llx," - "phdr = %p, offset %x sendoff %x, addr %llx iaddr %llx\n", + "phdr = %p, offset %x, sendoff %x, addr %llx iaddr %llx\n", i, num, odp_packet_to_u64(pkt), odp_pool_to_u64(pool_hdl), - pkt_hdr, pkt_hdr->buf_hdr.ipc_data_offset, - offsets[i], odp_shm_addr(pool->shm), + pkt_hdr, (uint8_t *)pkt_hdr->seg_data - + (uint8_t *)odp_shm_addr(pool->shm), offsets[i], + odp_shm_addr(pool->shm), odp_shm_addr(ipc_pool->shm)); }
commit 6f367faae189144beeaafcd9850065b3f1b1098e Author: Matias Elo matias.elo@nokia.com Date: Mon Oct 28 15:01:47 2019 +0200
linux-gen: ipc: add IPC_BURST_SIZE define for IPC internal burst size
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c index 572d02e74..7e3f95fb6 100644 --- a/platform/linux-generic/pktio/ipc.c +++ b/platform/linux-generic/pktio/ipc.c @@ -27,6 +27,9 @@ ODP_DBG(fmt, ##__VA_ARGS__);\ } while (0)
+/* Burst size for IPC free operations */ +#define IPC_BURST_SIZE 32 + /* that struct is exported to shared memory, so that processes can find * each other. */ @@ -237,7 +240,12 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry,
/* Ring must be able to store all packets in the pool */ ring_size = ROUNDUP_POWER2_U32(pool->num + 1); + + /* Ring size has to be larger than burst size */ + if (ring_size <= IPC_BURST_SIZE) + ring_size = ROUNDUP_POWER2_U32(IPC_BURST_SIZE + 1); ring_mask = ring_size - 1; + pktio_ipc->ring_size = ring_size; pktio_ipc->ring_mask = ring_mask;
@@ -579,9 +587,7 @@ static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, ring_ptr_t *r, uint32_t r_mask) { pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); - uint32_t ring_size = pktio_ipc->ring_size; - uint32_t ring_mask = pktio_ipc->ring_mask; - uintptr_t offsets[ring_size]; + uintptr_t offsets[IPC_BURST_SIZE]; int ret; void **rbuf_p; int i; @@ -597,7 +603,7 @@ static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, ring_ptr_t *r, rbuf_p = (void *)&offsets;
while (1) { - ret = ring_ptr_deq_multi(r, r_mask, rbuf_p, ring_mask); + ret = ring_ptr_deq_multi(r, r_mask, rbuf_p, IPC_BURST_SIZE); if (ret <= 0) break; for (i = 0; i < ret; i++) { @@ -616,13 +622,12 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[], int len) { pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); - uint32_t ring_size = pktio_ipc->ring_size; uint32_t ring_mask = pktio_ipc->ring_mask; int pkts = 0; int i; ring_ptr_t *r; ring_ptr_t *r_p; - uintptr_t offsets[ring_size]; + uintptr_t offsets[len]; void **ipcbufs_p = (void *)&offsets[0]; uint32_t ready;
commit 4afa7da09edf07126573940fffed0784ef3080ff Author: Matias Elo matias.elo@nokia.com Date: Mon Oct 28 10:52:38 2019 +0200
linux-gen: ipc: clean excessive pkt_priv() usage
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c index 065f70cc7..572d02e74 100644 --- a/platform/linux-generic/pktio/ipc.c +++ b/platform/linux-generic/pktio/ipc.c @@ -193,7 +193,8 @@ static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl)
static int _ipc_master_start(pktio_entry_t *pktio_entry) { - struct pktio_info *pinfo = pkt_priv(pktio_entry)->pinfo; + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); + struct pktio_info *pinfo = pktio_ipc->pinfo; odp_shm_t shm;
if (pinfo->slave.init_done == 0) @@ -207,11 +208,11 @@ static int _ipc_master_start(pktio_entry_t *pktio_entry) return -1; }
- pkt_priv(pktio_entry)->remote_pool_shm = shm; - pkt_priv(pktio_entry)->pool_base = odp_shm_addr(shm); - pkt_priv(pktio_entry)->pool_mdata_base = (char *)odp_shm_addr(shm); + pktio_ipc->remote_pool_shm = shm; + pktio_ipc->pool_base = odp_shm_addr(shm); + pktio_ipc->pool_mdata_base = (char *)odp_shm_addr(shm);
- odp_atomic_store_u32(&pkt_priv(pktio_entry)->ready, 1); + odp_atomic_store_u32(&pktio_ipc->ready, 1);
IPC_ODP_DBG("%s started.\n", pktio_entry->s.name); return 0; @@ -221,6 +222,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, const char *dev, odp_pool_t pool_hdl) { + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")]; struct pktio_info *pinfo; const char *pool_name; @@ -236,17 +238,16 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, /* Ring must be able to store all packets in the pool */ ring_size = ROUNDUP_POWER2_U32(pool->num + 1); ring_mask = ring_size - 1; - pkt_priv(pktio_entry)->ring_size = ring_size; - pkt_priv(pktio_entry)->ring_mask = ring_mask; + pktio_ipc->ring_size = ring_size; + pktio_ipc->ring_mask = ring_mask;
if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_m_prod"))) { ODP_ERR("too big ipc name\n"); return -1; }
- pkt_priv(pktio_entry)->rx.cache = _ring_create("ipc_rx_cache", - ring_size, 0); - if (!pkt_priv(pktio_entry)->rx.cache) { + pktio_ipc->rx.cache = _ring_create("ipc_rx_cache", ring_size, 0); + if (!pktio_ipc->rx.cache) { ODP_ERR("pid %d unable to create ipc rx cache\n", getpid()); return -1; } @@ -255,66 +256,58 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, * to be processed packets ring. */ snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev); - pkt_priv(pktio_entry)->tx.send = _ring_create(ipc_shm_name, ring_size, - ODP_SHM_PROC | - ODP_SHM_EXPORT); - if (!pkt_priv(pktio_entry)->tx.send) { + pktio_ipc->tx.send = _ring_create(ipc_shm_name, ring_size, + ODP_SHM_PROC | ODP_SHM_EXPORT); + if (!pktio_ipc->tx.send) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); return -1; } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.send, - ring_mask), - _ring_free_count(pkt_priv(pktio_entry)->tx.send, ring_mask)); + ipc_shm_name, _ring_count(pktio_ipc->tx.send, ring_mask), + _ring_free_count(pktio_ipc->tx.send, ring_mask));
/* generate name in shm like ipc_pktio_p for * already processed packets */ snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev); - pkt_priv(pktio_entry)->tx.free = _ring_create(ipc_shm_name, ring_size, - ODP_SHM_PROC | - ODP_SHM_EXPORT); - if (!pkt_priv(pktio_entry)->tx.free) { + pktio_ipc->tx.free = _ring_create(ipc_shm_name, ring_size, + ODP_SHM_PROC | ODP_SHM_EXPORT); + if (!pktio_ipc->tx.free) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); goto free_m_prod; } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.free, - ring_mask), - _ring_free_count(pkt_priv(pktio_entry)->tx.free, ring_mask)); + ipc_shm_name, _ring_count(pktio_ipc->tx.free, ring_mask), + _ring_free_count(pktio_ipc->tx.free, ring_mask));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev); - pkt_priv(pktio_entry)->rx.recv = _ring_create(ipc_shm_name, ring_size, - ODP_SHM_PROC | - ODP_SHM_EXPORT); - if (!pkt_priv(pktio_entry)->rx.recv) { + pktio_ipc->rx.recv = _ring_create(ipc_shm_name, ring_size, + ODP_SHM_PROC | ODP_SHM_EXPORT); + if (!pktio_ipc->rx.recv) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); goto free_m_cons; } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.recv, - ring_mask), - _ring_free_count(pkt_priv(pktio_entry)->rx.recv, ring_mask)); + ipc_shm_name, _ring_count(pktio_ipc->rx.recv, ring_mask), + _ring_free_count(pktio_ipc->rx.recv, ring_mask));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev); - pkt_priv(pktio_entry)->rx.free = _ring_create(ipc_shm_name, ring_size, - ODP_SHM_PROC | - ODP_SHM_EXPORT); - if (!pkt_priv(pktio_entry)->rx.free) { + pktio_ipc->rx.free = _ring_create(ipc_shm_name, ring_size, + ODP_SHM_PROC | ODP_SHM_EXPORT); + if (!pktio_ipc->rx.free) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); goto free_s_prod; } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.free, - ring_mask), - _ring_free_count(pkt_priv(pktio_entry)->rx.free, ring_mask)); + ipc_shm_name, _ring_count(pktio_ipc->rx.free, ring_mask), + _ring_free_count(pktio_ipc->rx.free, ring_mask));
/* Set up pool name for remote info */ - pinfo = pkt_priv(pktio_entry)->pinfo; + pinfo = pktio_ipc->pinfo; pool_name = _ipc_odp_buffer_pool_shm_name(pool_hdl); if (strlen(pool_name) > ODP_POOL_NAME_LEN) { ODP_ERR("pid %d ipc pool name %s is too big %d\n", @@ -332,7 +325,7 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, pinfo->slave.pid = 0; pinfo->slave.init_done = 0;
- pkt_priv(pktio_entry)->pool = pool_hdl; + pktio_ipc->pool = pool_hdl;
ODP_DBG("Pre init... DONE.\n"); pinfo->master.init_done = 1; @@ -426,13 +419,14 @@ static int _ipc_init_slave(const char *dev, pktio_entry_t *pktio_entry,
static int _ipc_slave_start(pktio_entry_t *pktio_entry) { + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_slave_r")]; struct pktio_info *pinfo; odp_shm_t shm; char tail[ODP_POOL_NAME_LEN]; char dev[ODP_POOL_NAME_LEN]; int pid; - uint32_t ring_mask = pkt_priv(pktio_entry)->ring_mask; + uint32_t ring_mask = pktio_ipc->ring_mask;
if (sscanf(pktio_entry->s.name, "ipc:%d:%s", &pid, tail) != 2) { ODP_ERR("wrong pktio name\n"); @@ -442,68 +436,60 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) sprintf(dev, "ipc:%s", tail);
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev); - pkt_priv(pktio_entry)->rx.recv = _ipc_shm_map(ipc_shm_name, pid); - if (!pkt_priv(pktio_entry)->rx.recv) { + pktio_ipc->rx.recv = _ipc_shm_map(ipc_shm_name, pid); + if (!pktio_ipc->rx.recv) { ODP_DBG("pid %d unable to find ipc ring %s name\n", getpid(), dev); sleep(1); return -1; } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.recv, - ring_mask), - _ring_free_count(pkt_priv(pktio_entry)->rx.recv, - ring_mask)); + ipc_shm_name, _ring_count(pktio_ipc->rx.recv, ring_mask), + _ring_free_count(pktio_ipc->rx.recv, ring_mask));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev); - pkt_priv(pktio_entry)->rx.free = _ipc_shm_map(ipc_shm_name, pid); - if (!pkt_priv(pktio_entry)->rx.free) { + pktio_ipc->rx.free = _ipc_shm_map(ipc_shm_name, pid); + if (!pktio_ipc->rx.free) { ODP_ERR("pid %d unable to find ipc ring %s name\n", getpid(), dev); goto free_m_prod; } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.free, - ring_mask), - _ring_free_count(pkt_priv(pktio_entry)->rx.free, - ring_mask)); + ipc_shm_name, _ring_count(pktio_ipc->rx.free, ring_mask), + _ring_free_count(pktio_ipc->rx.free, ring_mask));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev); - pkt_priv(pktio_entry)->tx.send = _ipc_shm_map(ipc_shm_name, pid); - if (!pkt_priv(pktio_entry)->tx.send) { + pktio_ipc->tx.send = _ipc_shm_map(ipc_shm_name, pid); + if (!pktio_ipc->tx.send) { ODP_ERR("pid %d unable to find ipc ring %s name\n", getpid(), dev); goto free_m_cons; } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.send, - ring_mask), - _ring_free_count(pkt_priv(pktio_entry)->tx.send, - ring_mask)); + ipc_shm_name, _ring_count(pktio_ipc->tx.send, ring_mask), + _ring_free_count(pktio_ipc->tx.send, ring_mask));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev); - pkt_priv(pktio_entry)->tx.free = _ipc_shm_map(ipc_shm_name, pid); - if (!pkt_priv(pktio_entry)->tx.free) { + pktio_ipc->tx.free = _ipc_shm_map(ipc_shm_name, pid); + if (!pktio_ipc->tx.free) { ODP_ERR("pid %d unable to find ipc ring %s name\n", getpid(), dev); goto free_s_prod; } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.free, - ring_mask), - _ring_free_count(pkt_priv(pktio_entry)->tx.free, - ring_mask)); + ipc_shm_name, _ring_count(pktio_ipc->tx.free, ring_mask), + _ring_free_count(pktio_ipc->tx.free, ring_mask));
/* Get info about remote pool */ - pinfo = pkt_priv(pktio_entry)->pinfo; + pinfo = pktio_ipc->pinfo; shm = _ipc_map_remote_pool(pinfo->master.pool_name, pid); - pkt_priv(pktio_entry)->remote_pool_shm = shm; - pkt_priv(pktio_entry)->pool_mdata_base = (char *)odp_shm_addr(shm); + pktio_ipc->remote_pool_shm = shm; + pktio_ipc->pool_mdata_base = (char *)odp_shm_addr(shm);
- _ipc_export_pool(pinfo, pkt_priv(pktio_entry)->pool); + _ipc_export_pool(pinfo, pktio_ipc->pool);
- odp_atomic_store_u32(&pkt_priv(pktio_entry)->ready, 1); + odp_atomic_store_u32(&pktio_ipc->ready, 1); pinfo->slave.init_done = 1;
ODP_DBG("%s started.\n", pktio_entry->s.name); @@ -871,14 +857,15 @@ static int ipc_mac_addr_get(pktio_entry_t *pktio_entry ODP_UNUSED,
static int ipc_start(pktio_entry_t *pktio_entry) { - uint32_t ready = odp_atomic_load_u32(&pkt_priv(pktio_entry)->ready); + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); + uint32_t ready = odp_atomic_load_u32(&pktio_ipc->ready);
if (ready) { ODP_ABORT("%s Already started\n", pktio_entry->s.name); return -1; }
- if (pkt_priv(pktio_entry)->type == PKTIO_TYPE_IPC_MASTER) + if (pktio_ipc->type == PKTIO_TYPE_IPC_MASTER) return _ipc_master_start(pktio_entry); else return _ipc_slave_start(pktio_entry); @@ -906,6 +893,7 @@ static int ipc_stop(pktio_entry_t *pktio_entry)
static int ipc_close(pktio_entry_t *pktio_entry) { + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")]; char *dev = pktio_entry->s.name; char name[ODP_POOL_NAME_LEN]; @@ -914,7 +902,7 @@ static int ipc_close(pktio_entry_t *pktio_entry)
ipc_stop(pktio_entry);
- odp_shm_free(pkt_priv(pktio_entry)->remote_pool_shm); + odp_shm_free(pktio_ipc->remote_pool_shm);
if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) snprintf(name, sizeof(name), "ipc:%s", tail); @@ -922,7 +910,7 @@ static int ipc_close(pktio_entry_t *pktio_entry) snprintf(name, sizeof(name), "%s", dev);
/* unlink this pktio info for both master and slave */ - odp_shm_free(pkt_priv(pktio_entry)->pinfo_shm); + odp_shm_free(pktio_ipc->pinfo_shm);
/* destroy rings */ snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", name);
commit 5db9a7a1c59d4eda436d896dce41a2c7aa8f79c4 Author: Matias Elo matias.elo@nokia.com Date: Mon Oct 28 10:40:42 2019 +0200
linux-gen: ipc: match ring size to used packet pools
The ring_ptr_t implementation requires that ring size must be larger than the maximum number of data items that will be stored on it. Adjust ring size according to the used packet pools to guarantee this.
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c index dec918e95..065f70cc7 100644 --- a/platform/linux-generic/pktio/ipc.c +++ b/platform/linux-generic/pktio/ipc.c @@ -27,11 +27,6 @@ ODP_DBG(fmt, ##__VA_ARGS__);\ } while (0)
-/* number of odp buffers in odp ring queue */ -#define PKTIO_IPC_ENTRIES 4096 - -#define PKTIO_IPC_ENTRY_MASK (PKTIO_IPC_ENTRIES - 1) - /* that struct is exported to shared memory, so that processes can find * each other. */ @@ -42,6 +37,10 @@ struct pktio_info { char pool_name[ODP_POOL_NAME_LEN]; /* 1 if master finished creation of all shared objects */ int init_done; + /* IPC ring size */ + uint32_t ring_size; + /* IPC ring mask */ + uint32_t ring_mask; } master; struct { void *base_addr; @@ -87,7 +86,11 @@ typedef struct { } type; /**< define if it's master or slave process */ odp_atomic_u32_t ready; /**< 1 - pktio is ready and can recv/send packet, 0 - not yet ready */ - void *pinfo; + /* Local copy of IPC ring size */ + uint32_t ring_size; + /* Local copy IPC ring mask */ + uint32_t ring_mask; + struct pktio_info *pinfo; odp_shm_t pinfo_shm; odp_shm_t remote_pool_shm; /**< shm of remote pool get with _ipc_map_remote_pool() */ @@ -221,18 +224,38 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")]; struct pktio_info *pinfo; const char *pool_name; + pool_t *pool = pool_entry_from_hdl(pool_hdl); + uint32_t ring_size; + uint32_t ring_mask; + + if ((uint64_t)ROUNDUP_POWER2_U32(pool->num + 1) > UINT32_MAX) { + ODP_ERR("Too large packet pool\n"); + return -1; + } + + /* Ring must be able to store all packets in the pool */ + ring_size = ROUNDUP_POWER2_U32(pool->num + 1); + ring_mask = ring_size - 1; + pkt_priv(pktio_entry)->ring_size = ring_size; + pkt_priv(pktio_entry)->ring_mask = ring_mask;
if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_m_prod"))) { ODP_ERR("too big ipc name\n"); return -1; }
+ pkt_priv(pktio_entry)->rx.cache = _ring_create("ipc_rx_cache", + ring_size, 0); + if (!pkt_priv(pktio_entry)->rx.cache) { + ODP_ERR("pid %d unable to create ipc rx cache\n", getpid()); + return -1; + } + /* generate name in shm like ipc_pktio_r for * to be processed packets ring. */ snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev); - pkt_priv(pktio_entry)->tx.send = _ring_create(ipc_shm_name, - PKTIO_IPC_ENTRIES, + pkt_priv(pktio_entry)->tx.send = _ring_create(ipc_shm_name, ring_size, ODP_SHM_PROC | ODP_SHM_EXPORT); if (!pkt_priv(pktio_entry)->tx.send) { @@ -242,16 +265,14 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.send, - PKTIO_IPC_ENTRY_MASK), - _ring_free_count(pkt_priv(pktio_entry)->tx.send, - PKTIO_IPC_ENTRY_MASK)); + ring_mask), + _ring_free_count(pkt_priv(pktio_entry)->tx.send, ring_mask));
/* generate name in shm like ipc_pktio_p for * already processed packets */ snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev); - pkt_priv(pktio_entry)->tx.free = _ring_create(ipc_shm_name, - PKTIO_IPC_ENTRIES, + pkt_priv(pktio_entry)->tx.free = _ring_create(ipc_shm_name, ring_size, ODP_SHM_PROC | ODP_SHM_EXPORT); if (!pkt_priv(pktio_entry)->tx.free) { @@ -261,13 +282,11 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.free, - PKTIO_IPC_ENTRY_MASK), - _ring_free_count(pkt_priv(pktio_entry)->tx.free, - PKTIO_IPC_ENTRY_MASK)); + ring_mask), + _ring_free_count(pkt_priv(pktio_entry)->tx.free, ring_mask));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev); - pkt_priv(pktio_entry)->rx.recv = _ring_create(ipc_shm_name, - PKTIO_IPC_ENTRIES, + pkt_priv(pktio_entry)->rx.recv = _ring_create(ipc_shm_name, ring_size, ODP_SHM_PROC | ODP_SHM_EXPORT); if (!pkt_priv(pktio_entry)->rx.recv) { @@ -277,13 +296,11 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.recv, - PKTIO_IPC_ENTRY_MASK), - _ring_free_count(pkt_priv(pktio_entry)->rx.recv, - PKTIO_IPC_ENTRY_MASK)); + ring_mask), + _ring_free_count(pkt_priv(pktio_entry)->rx.recv, ring_mask));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev); - pkt_priv(pktio_entry)->rx.free = _ring_create(ipc_shm_name, - PKTIO_IPC_ENTRIES, + pkt_priv(pktio_entry)->rx.free = _ring_create(ipc_shm_name, ring_size, ODP_SHM_PROC | ODP_SHM_EXPORT); if (!pkt_priv(pktio_entry)->rx.free) { @@ -293,9 +310,8 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.free, - PKTIO_IPC_ENTRY_MASK), - _ring_free_count(pkt_priv(pktio_entry)->rx.free, - PKTIO_IPC_ENTRY_MASK)); + ring_mask), + _ring_free_count(pkt_priv(pktio_entry)->rx.free, ring_mask));
/* Set up pool name for remote info */ pinfo = pkt_priv(pktio_entry)->pinfo; @@ -307,6 +323,11 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, }
memcpy(pinfo->master.pool_name, pool_name, strlen(pool_name)); + + /* Export ring info for the slave process to use */ + pinfo->master.ring_size = ring_size; + pinfo->master.ring_mask = ring_mask; + pinfo->slave.base_addr = 0; pinfo->slave.pid = 0; pinfo->slave.init_done = 0; @@ -372,14 +393,34 @@ static void *_ipc_shm_map(char *name, int pid) return odp_shm_addr(shm); }
-static int _ipc_init_slave(const char *dev, - pktio_entry_t *pktio_entry, - odp_pool_t pool) +static int _ipc_init_slave(const char *dev, pktio_entry_t *pktio_entry, + odp_pool_t pool_hdl) { - if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r"))) - ODP_ABORT("too big ipc name\n"); + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); + pool_t *pool = pool_entry_from_hdl(pool_hdl); + uint32_t ring_size = pktio_ipc->pinfo->master.ring_size; + + if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_slave_r"))) { + ODP_ERR("Too big ipc name\n"); + return -1; + } + + /* Check that IPC rings are able to store all packets */ + if (pool->num >= ring_size) { + ODP_ERR("Slave process packet pool too large. Master process " + "packet pool has to be larger than slave pool.\n"); + return -1; + } + + pktio_ipc->rx.cache = _ring_create("ipc_rx_cache", ring_size, 0); + if (!pktio_ipc->rx.cache) { + ODP_ERR("Pid %d unable to create ipc rx cache\n", getpid()); + return -1; + } + pktio_ipc->ring_size = ring_size; + pktio_ipc->ring_mask = pktio_ipc->pinfo->master.ring_mask; + pktio_ipc->pool = pool_hdl;
- pkt_priv(pktio_entry)->pool = pool; return 0; }
@@ -391,6 +432,7 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) char tail[ODP_POOL_NAME_LEN]; char dev[ODP_POOL_NAME_LEN]; int pid; + uint32_t ring_mask = pkt_priv(pktio_entry)->ring_mask;
if (sscanf(pktio_entry->s.name, "ipc:%d:%s", &pid, tail) != 2) { ODP_ERR("wrong pktio name\n"); @@ -409,9 +451,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.recv, - PKTIO_IPC_ENTRY_MASK), + ring_mask), _ring_free_count(pkt_priv(pktio_entry)->rx.recv, - PKTIO_IPC_ENTRY_MASK)); + ring_mask));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev); pkt_priv(pktio_entry)->rx.free = _ipc_shm_map(ipc_shm_name, pid); @@ -422,9 +464,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.free, - PKTIO_IPC_ENTRY_MASK), + ring_mask), _ring_free_count(pkt_priv(pktio_entry)->rx.free, - PKTIO_IPC_ENTRY_MASK)); + ring_mask));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev); pkt_priv(pktio_entry)->tx.send = _ipc_shm_map(ipc_shm_name, pid); @@ -435,9 +477,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.send, - PKTIO_IPC_ENTRY_MASK), + ring_mask), _ring_free_count(pkt_priv(pktio_entry)->tx.send, - PKTIO_IPC_ENTRY_MASK)); + ring_mask));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev); pkt_priv(pktio_entry)->tx.free = _ipc_shm_map(ipc_shm_name, pid); @@ -448,9 +490,9 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.free, - PKTIO_IPC_ENTRY_MASK), + ring_mask), _ring_free_count(pkt_priv(pktio_entry)->tx.free, - PKTIO_IPC_ENTRY_MASK)); + ring_mask));
/* Get info about remote pool */ pinfo = pkt_priv(pktio_entry)->pinfo; @@ -487,7 +529,8 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED, const char *dev, odp_pool_t pool) { - int ret = -1; + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); + int ret = 0; int pid; struct pktio_info *pinfo; char name[ODP_POOL_NAME_LEN + sizeof("_info")]; @@ -497,43 +540,35 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED, if (strncmp(dev, "ipc", 3)) return -1;
- odp_atomic_init_u32(&pkt_priv(pktio_entry)->ready, 0); - - pkt_priv(pktio_entry)->rx.cache = _ring_create("ipc_rx_cache", - PKTIO_IPC_ENTRIES, 0); - if (!pkt_priv(pktio_entry)->rx.cache) - return -1; + odp_atomic_init_u32(&pktio_ipc->ready, 0);
/* Shared info about remote pktio */ if (sscanf(dev, "ipc:%d:%s", &pid, tail) == 2) { - pkt_priv(pktio_entry)->type = PKTIO_TYPE_IPC_SLAVE; + pktio_ipc->type = PKTIO_TYPE_IPC_SLAVE;
snprintf(name, sizeof(name), "ipc:%s_info", tail); IPC_ODP_DBG("lookup for name %s for pid %d\n", name, pid); shm = odp_shm_import(name, pid, name); - if (ODP_SHM_INVALID == shm) { - _ring_destroy("ipc_rx_cache"); + if (ODP_SHM_INVALID == shm) return -1; - } + pinfo = odp_shm_addr(shm);
if (!pinfo->master.init_done) { odp_shm_free(shm); - _ring_destroy("ipc_rx_cache"); return -1; } - pkt_priv(pktio_entry)->pinfo = pinfo; - pkt_priv(pktio_entry)->pinfo_shm = shm; + pktio_ipc->pinfo = pinfo; + pktio_ipc->pinfo_shm = shm; ODP_DBG("process %d is slave\n", getpid()); ret = _ipc_init_slave(name, pktio_entry, pool); } else { - pkt_priv(pktio_entry)->type = PKTIO_TYPE_IPC_MASTER; + pktio_ipc->type = PKTIO_TYPE_IPC_MASTER; snprintf(name, sizeof(name), "%s_info", dev); shm = odp_shm_reserve(name, sizeof(struct pktio_info), ODP_CACHE_LINE_SIZE, ODP_SHM_EXPORT | ODP_SHM_SINGLE_VA); if (ODP_SHM_INVALID == shm) { - _ring_destroy("ipc_rx_cache"); ODP_ERR("can not create shm %s\n", name); return -1; } @@ -541,19 +576,26 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED, pinfo = odp_shm_addr(shm); pinfo->master.init_done = 0; pinfo->master.pool_name[0] = 0; - pkt_priv(pktio_entry)->pinfo = pinfo; - pkt_priv(pktio_entry)->pinfo_shm = shm; + + pktio_ipc->pinfo = pinfo; + pktio_ipc->pinfo_shm = shm; ODP_DBG("process %d is master\n", getpid()); ret = _ipc_init_master(pktio_entry, dev, pool); }
+ if (ret) + odp_shm_free(shm); + return ret; }
static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, ring_ptr_t *r, uint32_t r_mask) { - uintptr_t offsets[PKTIO_IPC_ENTRIES]; + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); + uint32_t ring_size = pktio_ipc->ring_size; + uint32_t ring_mask = pktio_ipc->ring_mask; + uintptr_t offsets[ring_size]; int ret; void **rbuf_p; int i; @@ -563,14 +605,13 @@ static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, ring_ptr_t *r, if (!r) return;
- pool = pool_entry_from_hdl(pkt_priv(pktio_entry)->pool); + pool = pool_entry_from_hdl(pktio_ipc->pool); addr = odp_shm_addr(pool->shm);
rbuf_p = (void *)&offsets;
while (1) { - ret = ring_ptr_deq_multi(r, r_mask, rbuf_p, - PKTIO_IPC_ENTRIES - 1); + ret = ring_ptr_deq_multi(r, r_mask, rbuf_p, ring_mask); if (ret <= 0) break; for (i = 0; i < ret; i++) { @@ -588,34 +629,36 @@ static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, ring_ptr_t *r, static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, odp_packet_t pkt_table[], int len) { + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); + uint32_t ring_size = pktio_ipc->ring_size; + uint32_t ring_mask = pktio_ipc->ring_mask; int pkts = 0; int i; ring_ptr_t *r; ring_ptr_t *r_p; - uintptr_t offsets[PKTIO_IPC_ENTRIES]; + uintptr_t offsets[ring_size]; void **ipcbufs_p = (void *)&offsets[0]; uint32_t ready;
- ready = odp_atomic_load_u32(&pkt_priv(pktio_entry)->ready); + ready = odp_atomic_load_u32(&pktio_ipc->ready); if (odp_unlikely(!ready)) { IPC_ODP_DBG("start pktio is missing before usage?\n"); return 0; }
- _ipc_free_ring_packets(pktio_entry, pkt_priv(pktio_entry)->tx.free, - PKTIO_IPC_ENTRY_MASK); + _ipc_free_ring_packets(pktio_entry, pktio_ipc->tx.free, ring_mask);
/* rx from cache */ - r = pkt_priv(pktio_entry)->rx.cache; - pkts = ring_ptr_deq_multi(r, PKTIO_IPC_ENTRY_MASK, ipcbufs_p, len); + r = pktio_ipc->rx.cache; + pkts = ring_ptr_deq_multi(r, ring_mask, ipcbufs_p, len); if (odp_unlikely(pkts < 0)) ODP_ABORT("internal error dequeue\n");
/* rx from other app */ if (pkts == 0) { ipcbufs_p = (void *)&offsets[0]; - r = pkt_priv(pktio_entry)->rx.recv; - pkts = ring_ptr_deq_multi(r, PKTIO_IPC_ENTRY_MASK, ipcbufs_p, + r = pktio_ipc->rx.recv; + pkts = ring_ptr_deq_multi(r, ring_mask, ipcbufs_p, len); if (odp_unlikely(pkts < 0)) ODP_ABORT("internal error dequeue\n"); @@ -633,11 +676,10 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, uint64_t data_pool_off; void *rmt_data_ptr;
- phdr = (void *)((uint8_t *)pkt_priv(pktio_entry)-> - pool_mdata_base + - offsets[i]); + phdr = (void *)((uint8_t *)pktio_ipc->pool_mdata_base + + offsets[i]);
- pool = pkt_priv(pktio_entry)->pool; + pool = pktio_ipc->pool; if (odp_unlikely(pool == ODP_POOL_INVALID)) ODP_ABORT("invalid pool");
@@ -646,7 +688,7 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, pkt = odp_packet_alloc(pool, phdr->frame_len); if (odp_unlikely(pkt == ODP_PACKET_INVALID)) { /* Original pool might be smaller then - * PKTIO_IPC_ENTRIES. If packet can not be + * ring size. If packet can not be * allocated from pool at this time, * simple get in on next recv() call. To keep * packet ordering store such packets in local @@ -661,13 +703,11 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, pkt_data = odp_packet_data(pkt); if (odp_unlikely(!pkt_data)) ODP_ABORT("unable to map pkt_data ipc_slave %d\n", - (PKTIO_TYPE_IPC_SLAVE == - pkt_priv(pktio_entry)->type)); + (PKTIO_TYPE_IPC_SLAVE == pktio_ipc->type));
/* Copy packet data from shared pool to local pool. */ - rmt_data_ptr = (uint8_t *)pkt_priv(pktio_entry)-> - pool_mdata_base + - data_pool_off; + rmt_data_ptr = (uint8_t *)pktio_ipc->pool_mdata_base + + data_pool_off; memcpy(pkt_data, rmt_data_ptr, phdr->frame_len);
/* Copy packets L2, L3 parsed offsets and size */ @@ -686,8 +726,8 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, /* put back to rx ring dequed but not processed packets*/ if (pkts != i) { ipcbufs_p = (void *)&offsets[i]; - r_p = pkt_priv(pktio_entry)->rx.cache; - ring_ptr_enq_multi(r_p, PKTIO_IPC_ENTRY_MASK, ipcbufs_p, + r_p = pktio_ipc->rx.cache; + ring_ptr_enq_multi(r_p, ring_mask, ipcbufs_p, pkts - i);
if (i == 0) @@ -698,10 +738,10 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, pkts = i;
/* Now tell other process that we no longer need that buffers.*/ - r_p = pkt_priv(pktio_entry)->rx.free; + r_p = pktio_ipc->rx.free;
ipcbufs_p = (void *)&offsets[0]; - ring_ptr_enq_multi(r_p, PKTIO_IPC_ENTRY_MASK, ipcbufs_p, pkts); + ring_ptr_enq_multi(r_p, ring_mask, ipcbufs_p, pkts);
for (i = 0; i < pkts; i++) { IPC_ODP_DBG("%d/%d send to be free packet offset %x\n", @@ -728,11 +768,13 @@ static int ipc_pktio_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED, static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry, const odp_packet_t pkt_table[], int num) { + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); + uint32_t ring_mask = pktio_ipc->ring_mask; ring_ptr_t *r; void **rbuf_p; int i; - uint32_t ready = odp_atomic_load_u32(&pkt_priv(pktio_entry)->ready); - pool_t *ipc_pool = pool_entry_from_hdl(pkt_priv(pktio_entry)->pool); + uint32_t ready = odp_atomic_load_u32(&pktio_ipc->ready); + pool_t *ipc_pool = pool_entry_from_hdl(pktio_ipc->pool); odp_packet_t pkt_table_mapped[num]; /**< Ready to send packet has to be * in memory mapped pool. */ uintptr_t offsets[num]; @@ -740,8 +782,7 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry, if (odp_unlikely(!ready)) return 0;
- _ipc_free_ring_packets(pktio_entry, pkt_priv(pktio_entry)->tx.free, - PKTIO_IPC_ENTRY_MASK); + _ipc_free_ring_packets(pktio_entry, pktio_ipc->tx.free, ring_mask);
/* Copy packets to shm shared pool if they are in different * pool, or if they are references (we can't share across IPC). @@ -758,8 +799,7 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry, odp_packet_has_ref(pkt)) { odp_packet_t newpkt;
- newpkt = odp_packet_copy(pkt, - pkt_priv(pktio_entry)->pool); + newpkt = odp_packet_copy(pkt, pktio_ipc->pool); if (newpkt == ODP_PACKET_INVALID) ODP_ABORT("Unable to copy packet\n");
@@ -791,14 +831,13 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry, odp_packet_to_u64(pkt), odp_pool_to_u64(pool_hdl), pkt_hdr, pkt_hdr->buf_hdr.ipc_data_offset, offsets[i], odp_shm_addr(pool->shm), - odp_shm_addr(pool_entry_from_hdl( - pkt_priv(pktio_entry)->pool)->shm)); + odp_shm_addr(ipc_pool->shm)); }
/* Put packets to ring to be processed by other process. */ rbuf_p = (void *)&offsets[0]; - r = pkt_priv(pktio_entry)->tx.send; - ring_ptr_enq_multi(r, PKTIO_IPC_ENTRY_MASK, rbuf_p, num); + r = pktio_ipc->tx.send; + ring_ptr_enq_multi(r, ring_mask, rbuf_p, num);
return num; } @@ -847,19 +886,20 @@ static int ipc_start(pktio_entry_t *pktio_entry)
static int ipc_stop(pktio_entry_t *pktio_entry) { - odp_atomic_store_u32(&pkt_priv(pktio_entry)->ready, 0); + pkt_ipc_t *pktio_ipc = pkt_priv(pktio_entry); + uint32_t ring_mask = pktio_ipc->ring_mask; + + odp_atomic_store_u32(&pktio_ipc->ready, 0);
- if (pkt_priv(pktio_entry)->tx.send) - _ipc_free_ring_packets(pktio_entry, - pkt_priv(pktio_entry)->tx.send, - PKTIO_IPC_ENTRY_MASK); + if (pktio_ipc->tx.send) + _ipc_free_ring_packets(pktio_entry, pktio_ipc->tx.send, + ring_mask); /* other process can transfer packets from one ring to * other, use delay here to free that packets. */ sleep(1); - if (pkt_priv(pktio_entry)->tx.free) - _ipc_free_ring_packets(pktio_entry, - pkt_priv(pktio_entry)->tx.free, - PKTIO_IPC_ENTRY_MASK); + if (pktio_ipc->tx.free) + _ipc_free_ring_packets(pktio_entry, pktio_ipc->tx.free, + ring_mask);
return 0; }
commit b89a5b77e92060c9343ab162526d05a8ee97f8b4 Author: Matias Elo matias.elo@nokia.com Date: Fri Oct 25 13:59:58 2019 +0300
linux-gen: ipc: clean up code
Reorder code and remove unused variables and defines.
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c index 00edcdb7e..dec918e95 100644 --- a/platform/linux-generic/pktio/ipc.c +++ b/platform/linux-generic/pktio/ipc.c @@ -11,6 +11,7 @@ #include <odp/api/system_info.h> #include <odp_shm_internal.h> #include <odp_ring_ptr_internal.h> +#include <odp_global_data.h>
#include <errno.h> #include <fcntl.h> @@ -38,15 +39,12 @@ struct pktio_info { struct { /* number of buffer*/ int num; - /* size of packet/segment in remote pool */ - uint32_t block_size; char pool_name[ODP_POOL_NAME_LEN]; /* 1 if master finished creation of all shared objects */ int init_done; } master; struct { void *base_addr; - uint32_t block_size; char pool_name[ODP_POOL_NAME_LEN]; /* pid of the slave process written to shm and * used by master to look up memory created by @@ -57,46 +55,79 @@ struct pktio_info { } slave; } ODP_PACKED;
-/* The maximum length of a ring name. */ -#define _RING_NAMESIZE 32 -/* If set - ring is visible from different processes. - * Default is thread visible.*/ -#define _RING_SHM_PROC (1 << 2) -/* Ring size mask */ -#define _RING_SZ_MASK (unsigned)(0x0fffffff) +typedef struct { + /* TX */ + struct { + /* ODP ring for IPC msg packets indexes transmitted to shared + * memory */ + ring_ptr_t *send; + /* ODP ring for IPC msg packets indexes already processed by + * remote process */ + ring_ptr_t *free; + } tx; + /* RX */ + struct { + /* ODP ring for IPC msg packets indexes received from shared + * memory (from remote process) */ + ring_ptr_t *recv; + /* odp ring for ipc msg packets indexes already processed by + * current process */ + ring_ptr_t *free; + /* local cache to keep packet order right */ + ring_ptr_t *cache; + } rx; /* slave */ + void *pool_base; /**< Remote pool base addr */ + void *pool_mdata_base; /**< Remote pool mdata base addr */ + odp_pool_t pool; /**< Pool of main process */ + enum { + PKTIO_TYPE_IPC_MASTER = 0, /**< Master is the process which + creates shm */ + PKTIO_TYPE_IPC_SLAVE /**< Slave is the process which + connects to shm */ + } type; /**< define if it's master or slave process */ + odp_atomic_u32_t ready; /**< 1 - pktio is ready and can recv/send + packet, 0 - not yet ready */ + void *pinfo; + odp_shm_t pinfo_shm; + odp_shm_t remote_pool_shm; /**< shm of remote pool get with + _ipc_map_remote_pool() */ +} pkt_ipc_t; + +ODP_STATIC_ASSERT(PKTIO_PRIVATE_SIZE >= sizeof(pkt_ipc_t), + "PKTIO_PRIVATE_SIZE too small"); + +static inline pkt_ipc_t *pkt_priv(pktio_entry_t *pktio_entry) +{ + return (pkt_ipc_t *)(uintptr_t)(pktio_entry->s.pkt_priv); +} + +/* MAC address for the "ipc" interface */ +static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12}; + +static odp_shm_t _ipc_map_remote_pool(const char *name, int pid);
/* create the ring */ -static ring_ptr_t * -_ring_create(const char *name, unsigned count, unsigned flags) +static ring_ptr_t *_ring_create(const char *name, uint32_t count, + uint32_t shm_flags) { - char ring_name[_RING_NAMESIZE]; ring_ptr_t *r; size_t ring_size; - uint32_t shm_flag; odp_shm_t shm;
- if (flags & _RING_SHM_PROC) - shm_flag = ODP_SHM_PROC | ODP_SHM_EXPORT; - else - shm_flag = 0; if (odp_global_ro.shm_single_va) - shm_flag |= ODP_SHM_SINGLE_VA; + shm_flags |= ODP_SHM_SINGLE_VA;
/* count must be a power of 2 */ if (!CHECK_IS_POWER2(count)) { - ODP_ERR("Requested size is invalid, must be power of 2," - "and do not exceed the size limit %u\n", - _RING_SZ_MASK); + ODP_ERR("Requested size is invalid, must be a power of 2\n"); __odp_errno = EINVAL; return NULL; }
- snprintf(ring_name, sizeof(ring_name), "%s", name); ring_size = sizeof(ring_ptr_t) + count * sizeof(void *);
/* reserve a memory zone for this ring.*/ - shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, - shm_flag); + shm = odp_shm_reserve(name, ring_size, ODP_CACHE_LINE_SIZE, shm_flags);
r = odp_shm_addr(shm); if (r != NULL) { @@ -124,7 +155,7 @@ static int _ring_destroy(const char *name) /** * Return the number of entries in a ring. */ -static unsigned _ring_count(ring_ptr_t *r, uint32_t mask) +static uint32_t _ring_count(ring_ptr_t *r, uint32_t mask) { uint32_t prod_tail = odp_atomic_load_u32(&r->r.w_tail); uint32_t cons_tail = odp_atomic_load_u32(&r->r.r_tail); @@ -135,7 +166,7 @@ static unsigned _ring_count(ring_ptr_t *r, uint32_t mask) /** * Return the number of free entries in a ring. */ -static unsigned _ring_free_count(ring_ptr_t *r, uint32_t mask) +static uint32_t _ring_free_count(ring_ptr_t *r, uint32_t mask) { uint32_t prod_tail = odp_atomic_load_u32(&r->r.w_tail); uint32_t cons_tail = odp_atomic_load_u32(&r->r.r_tail); @@ -143,58 +174,6 @@ static unsigned _ring_free_count(ring_ptr_t *r, uint32_t mask) return (cons_tail - prod_tail - 1) & mask; }
-typedef struct { - /* TX */ - struct { - /* ODP ring for IPC msg packets indexes transmitted to shared - * memory */ - ring_ptr_t *send; - /* ODP ring for IPC msg packets indexes already processed by - * remote process */ - ring_ptr_t *free; - } tx; - /* RX */ - struct { - /* ODP ring for IPC msg packets indexes received from shared - * memory (from remote process) */ - ring_ptr_t *recv; - /* odp ring for ipc msg packets indexes already processed by - * current process */ - ring_ptr_t *free; - /* local cache to keep packet order right */ - ring_ptr_t *cache; - } rx; /* slave */ - void *pool_base; /**< Remote pool base addr */ - void *pool_mdata_base; /**< Remote pool mdata base addr */ - uint64_t pkt_size; /**< Packet size in remote pool */ - odp_pool_t pool; /**< Pool of main process */ - enum { - PKTIO_TYPE_IPC_MASTER = 0, /**< Master is the process which - creates shm */ - PKTIO_TYPE_IPC_SLAVE /**< Slave is the process which - connects to shm */ - } type; /**< define if it's master or slave process */ - odp_atomic_u32_t ready; /**< 1 - pktio is ready and can recv/send - packet, 0 - not yet ready */ - void *pinfo; - odp_shm_t pinfo_shm; - odp_shm_t remote_pool_shm; /**< shm of remote pool get with - _ipc_map_remote_pool() */ -} pkt_ipc_t; - -ODP_STATIC_ASSERT(PKTIO_PRIVATE_SIZE >= sizeof(pkt_ipc_t), - "PKTIO_PRIVATE_SIZE too small"); - -static inline pkt_ipc_t *pkt_priv(pktio_entry_t *pktio_entry) -{ - return (pkt_ipc_t *)(uintptr_t)(pktio_entry->s.pkt_priv); -} - -/* MAC address for the "ipc" interface */ -static const char pktio_ipc_mac[] = {0x12, 0x12, 0x12, 0x12, 0x12, 0x12}; - -static odp_shm_t _ipc_map_remote_pool(const char *name, int pid); - static const char *_ipc_odp_buffer_pool_shm_name(odp_pool_t pool_hdl) { pool_t *pool; @@ -240,13 +219,9 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, odp_pool_t pool_hdl) { char ipc_shm_name[ODP_POOL_NAME_LEN + sizeof("_m_prod")]; - pool_t *pool; struct pktio_info *pinfo; const char *pool_name;
- pool = pool_entry_from_hdl(pool_hdl); - (void)pool; - if (strlen(dev) > (ODP_POOL_NAME_LEN - sizeof("_m_prod"))) { ODP_ERR("too big ipc name\n"); return -1; @@ -258,7 +233,8 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev); pkt_priv(pktio_entry)->tx.send = _ring_create(ipc_shm_name, PKTIO_IPC_ENTRIES, - _RING_SHM_PROC); + ODP_SHM_PROC | + ODP_SHM_EXPORT); if (!pkt_priv(pktio_entry)->tx.send) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); @@ -276,7 +252,8 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev); pkt_priv(pktio_entry)->tx.free = _ring_create(ipc_shm_name, PKTIO_IPC_ENTRIES, - _RING_SHM_PROC); + ODP_SHM_PROC | + ODP_SHM_EXPORT); if (!pkt_priv(pktio_entry)->tx.free) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); @@ -291,7 +268,8 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev); pkt_priv(pktio_entry)->rx.recv = _ring_create(ipc_shm_name, PKTIO_IPC_ENTRIES, - _RING_SHM_PROC); + ODP_SHM_PROC | + ODP_SHM_EXPORT); if (!pkt_priv(pktio_entry)->rx.recv) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); @@ -306,7 +284,8 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev); pkt_priv(pktio_entry)->rx.free = _ring_create(ipc_shm_name, PKTIO_IPC_ENTRIES, - _RING_SHM_PROC); + ODP_SHM_PROC | + ODP_SHM_EXPORT); if (!pkt_priv(pktio_entry)->rx.free) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); @@ -361,7 +340,6 @@ static void _ipc_export_pool(struct pktio_info *pinfo, snprintf(pinfo->slave.pool_name, ODP_POOL_NAME_LEN, "%s", _ipc_odp_buffer_pool_shm_name(pool_hdl)); pinfo->slave.pid = odp_global_ro.main_pid; - pinfo->slave.block_size = pool->block_size; 
pinfo->slave.base_addr = pool->base_addr; }
@@ -480,7 +458,6 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) pid); pkt_priv(pktio_entry)->remote_pool_shm = shm; pkt_priv(pktio_entry)->pool_mdata_base = (char *)odp_shm_addr(shm); - pkt_priv(pktio_entry)->pkt_size = pinfo->master.block_size;
_ipc_export_pool(pinfo, pkt_priv(pktio_entry)->pool);
@@ -517,9 +494,6 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED, char tail[ODP_POOL_NAME_LEN]; odp_shm_t shm;
- ODP_STATIC_ASSERT(ODP_POOL_NAME_LEN == _RING_NAMESIZE, - "mismatch pool and ring name arrays"); - if (strncmp(dev, "ipc", 3)) return -1;
commit fc07575511c3a5b79ccae80ca278b3daad4f101d Author: Matias Elo matias.elo@nokia.com Date: Fri Oct 25 13:01:00 2019 +0300
linux-gen: ipc: remove now unnecessary init/term functions
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c index faabe8a51..00edcdb7e 100644 --- a/platform/linux-generic/pktio/ipc.c +++ b/platform/linux-generic/pktio/ipc.c @@ -65,43 +65,6 @@ struct pktio_info { /* Ring size mask */ #define _RING_SZ_MASK (unsigned)(0x0fffffff)
-typedef struct { - /* Rings tailq lock */ - odp_rwlock_t qlock; - odp_shm_t shm; -} global_data_t; - -static global_data_t *global; - -/* Initialize tailq_ring */ -static int _ring_global_init(void) -{ - odp_shm_t shm; - - /* Allocate globally shared memory */ - shm = odp_shm_reserve("_odp_ring_global", sizeof(global_data_t), - ODP_CACHE_LINE_SIZE, 0); - if (ODP_SHM_INVALID == shm) { - ODP_ERR("Shm reserve failed for pktio ring\n"); - return -1; - } - - global = odp_shm_addr(shm); - memset(global, 0, sizeof(global_data_t)); - global->shm = shm; - - return 0; -} - -static int _ring_global_term(void) -{ - if (odp_shm_free(global->shm)) { - ODP_ERR("Shm free failed for pktio ring\n"); - return -1; - } - return 0; -} - /* create the ring */ static ring_ptr_t * _ring_create(const char *name, unsigned count, unsigned flags) @@ -961,23 +924,12 @@ static int ipc_close(pktio_entry_t *pktio_entry) return 0; }
-static int ipc_pktio_init_global(void) -{ - ODP_DBG("PKTIO: initializing ipc interface.\n"); - return _ring_global_init(); -} - -static int ipc_pktio_term_global(void) -{ - return _ring_global_term(); -} - const pktio_if_ops_t ipc_pktio_ops = { .name = "ipc", .print = NULL, - .init_global = ipc_pktio_init_global, + .init_global = NULL, .init_local = NULL, - .term = ipc_pktio_term_global, + .term = NULL, .open = ipc_pktio_open, .close = ipc_close, .recv = ipc_pktio_recv,
commit ff99ba52f99a608dc1a41f4d8a86ab5c2f8491ca Author: Matias Elo matias.elo@nokia.com Date: Fri Oct 25 11:13:59 2019 +0300
linux-gen: ipc: remove unnecessary odp_packet_io_ipc_internal.h header
Copy the remaining necessary code from odp_packet_io_ipc_internal.h to ipc.c.
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index a3ec92af6..0d15e53d9 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -108,7 +108,6 @@ noinst_HEADERS = \ include/odp_packet_dpdk.h \ include/odp_packet_internal.h \ include/odp_packet_io_internal.h \ - include/odp_packet_io_ipc_internal.h \ include/odp_socket_common.h \ include/odp_packet_io_stats_common.h \ include/odp_packet_io_stats.h \ diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h deleted file mode 100644 index 8962f8936..000000000 --- a/platform/linux-generic/include/odp_packet_io_ipc_internal.h +++ /dev/null @@ -1,46 +0,0 @@ -/* Copyright (c) 2015-2018, Linaro Limited - * All rights reserved. - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include <odp/api/packet_io.h> -#include <odp_packet_io_internal.h> -#include <odp/api/packet.h> -#include <odp_packet_internal.h> -#include <odp/api/shared_memory.h> - -#include <string.h> -#include <unistd.h> -#include <stdlib.h> - -/* number of odp buffers in odp ring queue */ -#define PKTIO_IPC_ENTRIES 4096 - -#define PKTIO_IPC_ENTRY_MASK (PKTIO_IPC_ENTRIES - 1) - -/* that struct is exported to shared memory, so that processes can find - * each other. 
- */ -struct pktio_info { - struct { - /* number of buffer*/ - int num; - /* size of packet/segment in remote pool */ - uint32_t block_size; - char pool_name[ODP_POOL_NAME_LEN]; - /* 1 if master finished creation of all shared objects */ - int init_done; - } master; - struct { - void *base_addr; - uint32_t block_size; - char pool_name[ODP_POOL_NAME_LEN]; - /* pid of the slave process written to shm and - * used by master to look up memory created by - * slave - */ - int pid; - int init_done; - } slave; -} ODP_PACKED; diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c index 6e4f36569..33ba8fdb4 100644 --- a/platform/linux-generic/odp_packet_io.c +++ b/platform/linux-generic/odp_packet_io.c @@ -24,7 +24,6 @@ #include <odp_schedule_if.h> #include <odp_classification_internal.h> #include <odp_debug_internal.h> -#include <odp_packet_io_ipc_internal.h> #include <odp/api/time.h> #include <odp/api/plat/time_inlines.h> #include <odp_pcapng.h> diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c index 680bfa392..faabe8a51 100644 --- a/platform/linux-generic/pktio/ipc.c +++ b/platform/linux-generic/pktio/ipc.c @@ -5,7 +5,6 @@ * SPDX-License-Identifier: BSD-3-Clause */
-#include <odp_packet_io_ipc_internal.h> #include <odp_debug_internal.h> #include <odp_packet_io_internal.h> #include <odp_errno_define.h> @@ -17,6 +16,7 @@ #include <fcntl.h> #include <sys/mman.h> #include <sys/stat.h> +#include <unistd.h>
#define IPC_ODP_DEBUG_PRINT 0
@@ -26,6 +26,37 @@ ODP_DBG(fmt, ##__VA_ARGS__);\ } while (0)
+/* number of odp buffers in odp ring queue */ +#define PKTIO_IPC_ENTRIES 4096 + +#define PKTIO_IPC_ENTRY_MASK (PKTIO_IPC_ENTRIES - 1) + +/* that struct is exported to shared memory, so that processes can find + * each other. + */ +struct pktio_info { + struct { + /* number of buffer*/ + int num; + /* size of packet/segment in remote pool */ + uint32_t block_size; + char pool_name[ODP_POOL_NAME_LEN]; + /* 1 if master finished creation of all shared objects */ + int init_done; + } master; + struct { + void *base_addr; + uint32_t block_size; + char pool_name[ODP_POOL_NAME_LEN]; + /* pid of the slave process written to shm and + * used by master to look up memory created by + * slave + */ + int pid; + int init_done; + } slave; +} ODP_PACKED; + /* The maximum length of a ring name. */ #define _RING_NAMESIZE 32 /* If set - ring is visible from different processes.
commit 426ee8c61835c8faa5df0873ecc4ec7c8667c691 Author: Matias Elo matias.elo@nokia.com Date: Fri Oct 25 11:08:52 2019 +0300
linux-gen: ipc: move remaining ring helper functions to ipc source file
The remaining ring helper functions are only used by the IPC pktio, so move them to ipc.c.
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 0d4d905f9..a3ec92af6 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -109,7 +109,6 @@ noinst_HEADERS = \ include/odp_packet_internal.h \ include/odp_packet_io_internal.h \ include/odp_packet_io_ipc_internal.h \ - include/odp_packet_io_ring_internal.h \ include/odp_socket_common.h \ include/odp_packet_io_stats_common.h \ include/odp_packet_io_stats.h \ @@ -224,7 +223,6 @@ __LIB__libodp_linux_la_SOURCES = \ pktio/netmap.c \ pktio/null.c \ pktio/pktio_common.c \ - pktio/ring.c \ pktio/socket.c \ pktio/socket_mmap.c \ pktio/tap.c diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h index d12db8339..ba2e7c6bc 100644 --- a/platform/linux-generic/include/odp_packet_io_internal.h +++ b/platform/linux-generic/include/odp_packet_io_internal.h @@ -27,7 +27,6 @@ extern "C" { #include <odp_classification_datamodel.h> #include <odp_align_internal.h> #include <odp_debug_internal.h> -#include <odp_packet_io_ring_internal.h> #include <odp_packet_io_stats_common.h> #include <odp_queue_if.h>
diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h index 59c268cb9..8962f8936 100644 --- a/platform/linux-generic/include/odp_packet_io_ipc_internal.h +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h @@ -14,9 +14,6 @@ #include <unistd.h> #include <stdlib.h>
-/* IPC packet I/O over shared memory ring */ -#include <odp_packet_io_ring_internal.h> - /* number of odp buffers in odp ring queue */ #define PKTIO_IPC_ENTRIES 4096
diff --git a/platform/linux-generic/include/odp_packet_io_ring_internal.h b/platform/linux-generic/include/odp_packet_io_ring_internal.h deleted file mode 100644 index 22f1ae017..000000000 --- a/platform/linux-generic/include/odp_packet_io_ring_internal.h +++ /dev/null @@ -1,413 +0,0 @@ -/* Copyright (c) 2014-2018, Linaro Limited - * All rights reserved. - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -/*- - * BSD LICENSE - * - * Copyright(c) 2010-2013 Intel Corporation. All rights reserved. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Intel Corporation nor the names of its - * contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -/* - * Derived from FreeBSD's bufring.c - * - ************************************************************************** - * - * Copyright (c) 2007,2008 Kip Macy kmacy@freebsd.org - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. The name of Kip Macy nor the names of other - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - * - ***************************************************************************/ - -/** - * ODP Ring - * - * The Ring Manager is a fixed-size queue, implemented as a table of - * pointers. Head and tail pointers are modified atomically, allowing - * concurrent access to it. It has the following features: - * - * - FIFO (First In First Out) - * - Maximum size is fixed; the pointers are stored in a table. - * - Lockless implementation. - * - Multi- or single-consumer dequeue. - * - Multi- or single-producer enqueue. - * - Bulk dequeue. - * - Bulk enqueue. - * - * Note: the ring implementation is not preemptable. A lcore must not - * be interrupted by another task that uses the same ring. - * - */ - -#ifndef _RING_H_ -#define _RING_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#include <odp/api/std_types.h> -#include <odp/api/hints.h> -#include <odp/api/atomic.h> -#include <errno.h> -#include <sys/queue.h> -#include <odp_debug_internal.h> - -#include <odp_ring_ptr_internal.h> - -enum _ring_queue_behavior { - _RING_QUEUE_FIXED = 0, /**< Enq/Deq a fixed number - of items from a ring */ - _RING_QUEUE_VARIABLE /**< Enq/Deq as many items - a possible from ring */ -}; - -#define _RING_NAMESIZE 32 /**< The maximum length of a ring name. */ - -/** - * An ODP ring structure. - * - * The producer and the consumer have a head and a tail index. The particularity - * of these index is that they are not between 0 and size(ring). 
These indexes - * are between 0 and 2^32, and we mask their value when we access the ring[] - * field. Thanks to this assumption, we can do subtractions between 2 index - * values in a modulo-32bit base: that's why the overflow of the indexes is not - * a problem. - */ -typedef struct _ring { - /** @private Next in list. */ - TAILQ_ENTRY(_ring) next; - - /** @private Name of the ring. */ - char name[_RING_NAMESIZE]; - /** @private Flags supplied at creation. */ - int flags; - - /** @private Producer */ - struct ODP_ALIGNED_CACHE _prod { - uint32_t size; /* Size of ring. */ - uint32_t mask; /* Mask (size-1) of ring. */ - volatile uint32_t head; /* Producer head. */ - volatile uint32_t tail; /* Producer tail. */ - } prod; - - /** @private Consumer */ - struct ODP_ALIGNED_CACHE _cons { - uint32_t size; /* Size of the ring. */ - uint32_t mask; /* Mask (size-1) of ring. */ - volatile uint32_t head; /* Consumer head. */ - volatile uint32_t tail; /* Consumer tail. */ - } cons; - - /** @private Memory space of ring starts here. */ - void ODP_ALIGNED_CACHE *ring[0]; -} _ring_t; - -/* The default enqueue is "single-producer".*/ -#define _RING_F_SP_ENQ (1 << 0) -/* The default dequeue is "single-consumer".*/ -#define _RING_F_SC_DEQ (1 << 1) -/* If set - ring is visible from different processes. - * Default is thread visible.*/ -#define _RING_SHM_PROC (1 << 2) -/* Ring size mask */ -#define _RING_SZ_MASK (unsigned)(0x0fffffff) - -/** - * Create a new ring named *name* in memory. - * - * This function uses odp_shm_reserve() to allocate memory. Its size is - * set to *count*, which must be a power of two. Note that the real usable - * ring size is count-1 instead of count. - * - * @param name - * The name of the ring. - * @param count - * The size of the ring (must be a power of 2). 
- * @param flags - * An OR of the following: - * - RING_F_SP_ENQ: If this flag is set, the default behavior when - * using ``odph_ring_enqueue()`` or ``odph_ring_enqueue_bulk()`` - * is "single-producer". Otherwise, it is "multi-producers". - * - RING_F_SC_DEQ: If this flag is set, the default behavior when - * using ``odph_ring_dequeue()`` or ``odph_ring_dequeue_bulk()`` - * is "single-consumer". Otherwise, it is "multi-consumers". - * @return - * On success, the pointer to the new allocated ring. NULL on error with - * odp_errno set appropriately. Possible errno values include: - * - EINVAL - count provided is not a power of 2 - * - ENOSPC - the maximum number of memzones has already been allocated - * - EEXIST - a memzone with the same name already exists - * - ENOMEM - no appropriate memory area found in which to create memzone - */ -ring_ptr_t *_ring_create(const char *name, unsigned count, unsigned flags); - -/** - * Destroy the ring created with *name*. - * - * @param name name of the ring to be destroyed. - * @return 0 on success and negative value on error. - */ -int _ring_destroy(const char *name); - -/** - * Dump the status of the ring to the console. - * - * @param r A pointer to the ring structure. - */ -void _ring_dump(ring_ptr_t *r, uint32_t mask); - -/** - * Enqueue several objects on the ring (multi-producers safe). - * - * This function uses a "compare and set" instruction to move the - * producer index atomically. - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of void * pointers (objects). - * @param n - * The number of objects to add in the ring from the obj_table. - * @param behavior - * ODPH_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring - * ODPH_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring - * @return - * Depend on the behavior value - * if behavior = ODPH_RING_QUEUE_FIXED - * - 0: Success; objects enqueue. 
- * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. - * if behavior = ODPH_RING_QUEUE_VARIABLE - * - n: Actual number of objects enqueued. - */ -int ___ring_mp_do_enqueue(ring_ptr_t *r, void * const *obj_table, - unsigned n, - enum _ring_queue_behavior behavior); - -/** - * Dequeue several objects from a ring (multi-consumers safe). When - * the request objects are more than the available objects, only dequeue the - * actual number of objects - * - * This function uses a "compare and set" instruction to move the - * consumer index atomically. - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. - * @param n - * The number of objects to dequeue from the ring to the obj_table. - * @param behavior - * ODPH_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring - * ODPH_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring - * @return - * Depend on the behavior value - * if behavior = ODPH_RING_QUEUE_FIXED - * - 0: Success; objects dequeued. - * - -ENOENT: Not enough entries in the ring to dequeue; no object is - * dequeued. - * if behavior = ODPH_RING_QUEUE_VARIABLE - * - n: Actual number of objects dequeued. - */ - -int ___ring_mc_do_dequeue(ring_ptr_t *r, void **obj_table, - unsigned n, - enum _ring_queue_behavior behavior); - -/** - * Enqueue several objects on the ring (multi-producers safe). - * - * This function uses a "compare and set" instruction to move the - * producer index atomically. - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of void * pointers (objects). - * @param n - * The number of objects to add in the ring from the obj_table. - * @return - * - 0: Success; objects enqueue. - * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the - * high water mark is exceeded. - * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. 
- */ -int _ring_mp_enqueue_bulk(ring_ptr_t *r, void * const *obj_table, - unsigned n); - -/** - * Dequeue several objects from a ring (multi-consumers safe). - * - * This function uses a "compare and set" instruction to move the - * consumer index atomically. - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. - * @param n - * The number of objects to dequeue from the ring to the obj_table. - * @return - * - 0: Success; objects dequeued. - * - -ENOENT: Not enough entries in the ring to dequeue; no object is - * dequeued. - */ -int _ring_mc_dequeue_bulk(ring_ptr_t *r, void **obj_table, unsigned n); - -/** - * Test if a ring is full. - * - * @param r - * A pointer to the ring structure. - * @return - * - 1: The ring is full. - * - 0: The ring is not full. - */ -int _ring_full(ring_ptr_t *r, uint32_t mask); - -/** - * Test if a ring is empty. - * - * @param r - * A pointer to the ring structure. - * @return - * - 1: The ring is empty. - * - 0: The ring is not empty. - */ -int _ring_empty(ring_ptr_t *r, uint32_t mask); - -/** - * Return the number of entries in a ring. - * - * @param r - * A pointer to the ring structure. - * @return - * The number of entries in the ring. - */ -unsigned _ring_count(ring_ptr_t *r, uint32_t mask); - -/** - * Return the number of free entries in a ring. - * - * @param r - * A pointer to the ring structure. - * @return - * The number of free entries in the ring. - */ -unsigned _ring_free_count(ring_ptr_t *r, uint32_t mask); - -/** - * search ring by name - * @param name ring name to search - * @return pointer to ring otherwise NULL - */ -ring_ptr_t *_ring_lookup(const char *name); - -/** - * Enqueue several objects on the ring (multi-producers safe). - * - * This function uses a "compare and set" instruction to move the - * producer index atomically. - * - * @param r - * A pointer to the ring structure. 
- * @param obj_table - * A pointer to a table of void * pointers (objects). - * @param n - * The number of objects to add in the ring from the obj_table. - * @return - * - n: Actual number of objects enqueued. - */ -int _ring_mp_enqueue_burst(ring_ptr_t *r, void * const *obj_table, - unsigned n); - -/** - * Dequeue several objects from a ring (multi-consumers safe). When the request - * objects are more than the available objects, only dequeue the actual number - * of objects - * - * This function uses a "compare and set" instruction to move the - * consumer index atomically. - * - * @param r - * A pointer to the ring structure. - * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. - * @param n - * The number of objects to dequeue from the ring to the obj_table. - * @return - * - n: Actual number of objects dequeued, 0 if ring is empty - */ -int _ring_mc_dequeue_burst(ring_ptr_t *r, void **obj_table, unsigned n); - -/** - * dump the status of all rings on the console - */ -void _ring_list_dump(void); - -/** - * Initialize ring tailq - */ -int _ring_global_init(void); - -/** - * Terminate ring tailq - */ -int _ring_global_term(void); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c index a7678f261..680bfa392 100644 --- a/platform/linux-generic/pktio/ipc.c +++ b/platform/linux-generic/pktio/ipc.c @@ -1,4 +1,5 @@ /* Copyright (c) 2015-2018, Linaro Limited + * Copyright (c) 2019, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause @@ -7,13 +8,15 @@ #include <odp_packet_io_ipc_internal.h> #include <odp_debug_internal.h> #include <odp_packet_io_internal.h> +#include <odp_errno_define.h> #include <odp/api/system_info.h> #include <odp_shm_internal.h> #include <odp_ring_ptr_internal.h>
+#include <errno.h> +#include <fcntl.h> #include <sys/mman.h> #include <sys/stat.h> -#include <fcntl.h>
#define IPC_ODP_DEBUG_PRINT 0
@@ -23,6 +26,129 @@ ODP_DBG(fmt, ##__VA_ARGS__);\ } while (0)
+/* The maximum length of a ring name. */ +#define _RING_NAMESIZE 32 +/* If set - ring is visible from different processes. + * Default is thread visible.*/ +#define _RING_SHM_PROC (1 << 2) +/* Ring size mask */ +#define _RING_SZ_MASK (unsigned)(0x0fffffff) + +typedef struct { + /* Rings tailq lock */ + odp_rwlock_t qlock; + odp_shm_t shm; +} global_data_t; + +static global_data_t *global; + +/* Initialize tailq_ring */ +static int _ring_global_init(void) +{ + odp_shm_t shm; + + /* Allocate globally shared memory */ + shm = odp_shm_reserve("_odp_ring_global", sizeof(global_data_t), + ODP_CACHE_LINE_SIZE, 0); + if (ODP_SHM_INVALID == shm) { + ODP_ERR("Shm reserve failed for pktio ring\n"); + return -1; + } + + global = odp_shm_addr(shm); + memset(global, 0, sizeof(global_data_t)); + global->shm = shm; + + return 0; +} + +static int _ring_global_term(void) +{ + if (odp_shm_free(global->shm)) { + ODP_ERR("Shm free failed for pktio ring\n"); + return -1; + } + return 0; +} + +/* create the ring */ +static ring_ptr_t * +_ring_create(const char *name, unsigned count, unsigned flags) +{ + char ring_name[_RING_NAMESIZE]; + ring_ptr_t *r; + size_t ring_size; + uint32_t shm_flag; + odp_shm_t shm; + + if (flags & _RING_SHM_PROC) + shm_flag = ODP_SHM_PROC | ODP_SHM_EXPORT; + else + shm_flag = 0; + if (odp_global_ro.shm_single_va) + shm_flag |= ODP_SHM_SINGLE_VA; + + /* count must be a power of 2 */ + if (!CHECK_IS_POWER2(count)) { + ODP_ERR("Requested size is invalid, must be power of 2," + "and do not exceed the size limit %u\n", + _RING_SZ_MASK); + __odp_errno = EINVAL; + return NULL; + } + + snprintf(ring_name, sizeof(ring_name), "%s", name); + ring_size = sizeof(ring_ptr_t) + count * sizeof(void *); + + /* reserve a memory zone for this ring.*/ + shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, + shm_flag); + + r = odp_shm_addr(shm); + if (r != NULL) { + /* init the ring structure */ + ring_ptr_init(r); + + } else { + __odp_errno = ENOMEM; + ODP_ERR("Cannot 
reserve memory\n"); + } + + return r; +} + +static int _ring_destroy(const char *name) +{ + odp_shm_t shm = odp_shm_lookup(name); + + if (shm != ODP_SHM_INVALID) + return odp_shm_free(shm); + + return 0; +} + +/** + * Return the number of entries in a ring. + */ +static unsigned _ring_count(ring_ptr_t *r, uint32_t mask) +{ + uint32_t prod_tail = odp_atomic_load_u32(&r->r.w_tail); + uint32_t cons_tail = odp_atomic_load_u32(&r->r.r_tail); + + return (prod_tail - cons_tail) & mask; +} + +/** + * Return the number of free entries in a ring. + */ +static unsigned _ring_free_count(ring_ptr_t *r, uint32_t mask) +{ + uint32_t prod_tail = odp_atomic_load_u32(&r->r.w_tail); + uint32_t cons_tail = odp_atomic_load_u32(&r->r.r_tail); + + return (cons_tail - prod_tail - 1) & mask; +} + typedef struct { /* TX */ struct { diff --git a/platform/linux-generic/pktio/ring.c b/platform/linux-generic/pktio/ring.c deleted file mode 100644 index 8b3d2b26e..000000000 --- a/platform/linux-generic/pktio/ring.c +++ /dev/null @@ -1,199 +0,0 @@ -/* Copyright (c) 2014-2018, Linaro Limited - * All rights reserved. - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -/*- - * BSD LICENSE - * - * Copyright(c) 2010-2013 Intel Corporation. All rights reserved. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Intel Corporation nor the names of its - * contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. 
- * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -/* - * Derived from FreeBSD's bufring.c - * - ************************************************************************** - * - * Copyright (c) 2007,2008 Kip Macy kmacy@freebsd.org - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. The name of Kip Macy nor the names of other - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - * - ***************************************************************************/ - -#include <odp_api.h> -#include <fcntl.h> -#include <stdio.h> -#include <string.h> -#include <stdbool.h> -#include <inttypes.h> -#include <odp_packet_io_ring_internal.h> -#include <odp_errno_define.h> -#include <odp_global_data.h> -#include <odp_align_internal.h> - -#include <odp_ring_ptr_internal.h> - -#include <odp/api/plat/cpu_inlines.h> - -typedef struct { - /* Rings tailq lock */ - odp_rwlock_t qlock; - odp_shm_t shm; -} global_data_t; - -static global_data_t *global; - -/* Initialize tailq_ring */ -int _ring_global_init(void) -{ odp_shm_t shm; - - /* Allocate globally shared memory */ - shm = odp_shm_reserve("_odp_ring_global", sizeof(global_data_t), - ODP_CACHE_LINE_SIZE, 0); - if (ODP_SHM_INVALID == shm) { - ODP_ERR("Shm reserve failed for pktio ring\n"); - return -1; - } - - global = odp_shm_addr(shm); - memset(global, 0, sizeof(global_data_t)); - global->shm = shm; - - return 0; -} - -int _ring_global_term(void) -{ - if (odp_shm_free(global->shm)) { - ODP_ERR("Shm free failed for pktio ring\n"); - return -1; - } - return 0; -} - -/* create the ring */ -ring_ptr_t * -_ring_create(const char *name, unsigned count, unsigned flags) -{ - char ring_name[_RING_NAMESIZE]; - ring_ptr_t *r; - size_t ring_size; - uint32_t shm_flag; - odp_shm_t shm; - - if (flags & _RING_SHM_PROC) - shm_flag = ODP_SHM_PROC | ODP_SHM_EXPORT; - else - shm_flag = 0; - if 
(odp_global_ro.shm_single_va) - shm_flag |= ODP_SHM_SINGLE_VA; - - /* count must be a power of 2 */ - if (!CHECK_IS_POWER2(count)) { - ODP_ERR("Requested size is invalid, must be power of 2," - "and do not exceed the size limit %u\n", - _RING_SZ_MASK); - __odp_errno = EINVAL; - return NULL; - } - - snprintf(ring_name, sizeof(ring_name), "%s", name); - ring_size = sizeof(ring_ptr_t) + count * sizeof(void *); - - /* reserve a memory zone for this ring.*/ - shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, - shm_flag); - - r = odp_shm_addr(shm); - if (r != NULL) { - /* init the ring structure */ - ring_ptr_init(r); - - } else { - __odp_errno = ENOMEM; - ODP_ERR("Cannot reserve memory\n"); - } - - return r; -} - -int _ring_destroy(const char *name) -{ - odp_shm_t shm = odp_shm_lookup(name); - - if (shm != ODP_SHM_INVALID) - return odp_shm_free(shm); - - return 0; -} - -/** - * Return the number of entries in a ring. - */ -unsigned _ring_count(ring_ptr_t *r, uint32_t mask) -{ - uint32_t prod_tail = odp_atomic_load_u32(&r->r.w_tail); - uint32_t cons_tail = odp_atomic_load_u32(&r->r.r_tail); - - return (prod_tail - cons_tail) & mask; -} - -/** - * Return the number of free entries in a ring. - */ -unsigned _ring_free_count(ring_ptr_t *r, uint32_t mask) -{ - uint32_t prod_tail = odp_atomic_load_u32(&r->r.w_tail); - uint32_t cons_tail = odp_atomic_load_u32(&r->r.r_tail); - - return (cons_tail - prod_tail - 1) & mask; -}
commit 559aec0db739c624652facd3b337d0430cba8c61 Author: Matias Elo matias.elo@nokia.com Date: Thu Oct 24 15:51:47 2019 +0300
linux-gen: ipc: use standard ring_ptr_t implementation
Use standard ODP internal ring_ptr_t rings as IPC rings. This enables removing the duplicate _ring_t implementation.
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/include/odp_packet_io_ipc_internal.h b/platform/linux-generic/include/odp_packet_io_ipc_internal.h index 01410dab1..59c268cb9 100644 --- a/platform/linux-generic/include/odp_packet_io_ipc_internal.h +++ b/platform/linux-generic/include/odp_packet_io_ipc_internal.h @@ -20,6 +20,8 @@ /* number of odp buffers in odp ring queue */ #define PKTIO_IPC_ENTRIES 4096
+#define PKTIO_IPC_ENTRY_MASK (PKTIO_IPC_ENTRIES - 1) + /* that struct is exported to shared memory, so that processes can find * each other. */ diff --git a/platform/linux-generic/include/odp_packet_io_ring_internal.h b/platform/linux-generic/include/odp_packet_io_ring_internal.h index 6b4e06a42..22f1ae017 100644 --- a/platform/linux-generic/include/odp_packet_io_ring_internal.h +++ b/platform/linux-generic/include/odp_packet_io_ring_internal.h @@ -103,6 +103,8 @@ extern "C" { #include <sys/queue.h> #include <odp_debug_internal.h>
+#include <odp_ring_ptr_internal.h> + enum _ring_queue_behavior { _RING_QUEUE_FIXED = 0, /**< Enq/Deq a fixed number of items from a ring */ @@ -158,8 +160,6 @@ typedef struct _ring { /* If set - ring is visible from different processes. * Default is thread visible.*/ #define _RING_SHM_PROC (1 << 2) - /* Do not link ring to linked list. */ -#define _RING_NO_LIST (1 << 3) /* Ring size mask */ #define _RING_SZ_MASK (unsigned)(0x0fffffff)
@@ -190,8 +190,7 @@ typedef struct _ring { * - EEXIST - a memzone with the same name already exists * - ENOMEM - no appropriate memory area found in which to create memzone */ -_ring_t *_ring_create(const char *name, unsigned count, - unsigned flags); +ring_ptr_t *_ring_create(const char *name, unsigned count, unsigned flags);
/** * Destroy the ring created with *name*. @@ -206,7 +205,7 @@ int _ring_destroy(const char *name); * * @param r A pointer to the ring structure. */ -void _ring_dump(const _ring_t *r); +void _ring_dump(ring_ptr_t *r, uint32_t mask);
/** * Enqueue several objects on the ring (multi-producers safe). @@ -231,7 +230,7 @@ void _ring_dump(const _ring_t *r); * if behavior = ODPH_RING_QUEUE_VARIABLE * - n: Actual number of objects enqueued. */ -int ___ring_mp_do_enqueue(_ring_t *r, void * const *obj_table, +int ___ring_mp_do_enqueue(ring_ptr_t *r, void * const *obj_table, unsigned n, enum _ring_queue_behavior behavior);
@@ -262,7 +261,7 @@ int ___ring_mp_do_enqueue(_ring_t *r, void * const *obj_table, * - n: Actual number of objects dequeued. */
-int ___ring_mc_do_dequeue(_ring_t *r, void **obj_table, +int ___ring_mc_do_dequeue(ring_ptr_t *r, void **obj_table, unsigned n, enum _ring_queue_behavior behavior);
@@ -284,7 +283,7 @@ int ___ring_mc_do_dequeue(_ring_t *r, void **obj_table, * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. */ -int _ring_mp_enqueue_bulk(_ring_t *r, void * const *obj_table, +int _ring_mp_enqueue_bulk(ring_ptr_t *r, void * const *obj_table, unsigned n);
/** @@ -304,7 +303,7 @@ int _ring_mp_enqueue_bulk(_ring_t *r, void * const *obj_table, * - -ENOENT: Not enough entries in the ring to dequeue; no object is * dequeued. */ -int _ring_mc_dequeue_bulk(_ring_t *r, void **obj_table, unsigned n); +int _ring_mc_dequeue_bulk(ring_ptr_t *r, void **obj_table, unsigned n);
/** * Test if a ring is full. @@ -315,7 +314,7 @@ int _ring_mc_dequeue_bulk(_ring_t *r, void **obj_table, unsigned n); * - 1: The ring is full. * - 0: The ring is not full. */ -int _ring_full(const _ring_t *r); +int _ring_full(ring_ptr_t *r, uint32_t mask);
/** * Test if a ring is empty. @@ -326,7 +325,7 @@ int _ring_full(const _ring_t *r); * - 1: The ring is empty. * - 0: The ring is not empty. */ -int _ring_empty(const _ring_t *r); +int _ring_empty(ring_ptr_t *r, uint32_t mask);
/** * Return the number of entries in a ring. @@ -336,7 +335,7 @@ int _ring_empty(const _ring_t *r); * @return * The number of entries in the ring. */ -unsigned _ring_count(const _ring_t *r); +unsigned _ring_count(ring_ptr_t *r, uint32_t mask);
/** * Return the number of free entries in a ring. @@ -346,14 +345,14 @@ unsigned _ring_count(const _ring_t *r); * @return * The number of free entries in the ring. */ -unsigned _ring_free_count(const _ring_t *r); +unsigned _ring_free_count(ring_ptr_t *r, uint32_t mask);
/** * search ring by name * @param name ring name to search * @return pointer to ring otherwise NULL */ -_ring_t *_ring_lookup(const char *name); +ring_ptr_t *_ring_lookup(const char *name);
/** * Enqueue several objects on the ring (multi-producers safe). @@ -370,7 +369,7 @@ _ring_t *_ring_lookup(const char *name); * @return * - n: Actual number of objects enqueued. */ -int _ring_mp_enqueue_burst(_ring_t *r, void * const *obj_table, +int _ring_mp_enqueue_burst(ring_ptr_t *r, void * const *obj_table, unsigned n);
/** @@ -390,7 +389,7 @@ int _ring_mp_enqueue_burst(_ring_t *r, void * const *obj_table, * @return * - n: Actual number of objects dequeued, 0 if ring is empty */ -int _ring_mc_dequeue_burst(_ring_t *r, void **obj_table, unsigned n); +int _ring_mc_dequeue_burst(ring_ptr_t *r, void **obj_table, unsigned n);
/** * dump the status of all rings on the console @@ -400,12 +399,12 @@ void _ring_list_dump(void); /** * Initialize ring tailq */ -int _ring_tailq_init(void); +int _ring_global_init(void);
/** * Terminate ring tailq */ -int _ring_tailq_term(void); +int _ring_global_term(void);
#ifdef __cplusplus } diff --git a/platform/linux-generic/pktio/ipc.c b/platform/linux-generic/pktio/ipc.c index 1090d760c..a7678f261 100644 --- a/platform/linux-generic/pktio/ipc.c +++ b/platform/linux-generic/pktio/ipc.c @@ -9,7 +9,7 @@ #include <odp_packet_io_internal.h> #include <odp/api/system_info.h> #include <odp_shm_internal.h> -#include <odp_shm_internal.h> +#include <odp_ring_ptr_internal.h>
#include <sys/mman.h> #include <sys/stat.h> @@ -26,22 +26,23 @@ typedef struct { /* TX */ struct { - _ring_t *send; /**< ODP ring for IPC msg packets - indexes transmitted to shared - memory */ - _ring_t *free; /**< ODP ring for IPC msg packets - indexes already processed by remote - process */ + /* ODP ring for IPC msg packets indexes transmitted to shared + * memory */ + ring_ptr_t *send; + /* ODP ring for IPC msg packets indexes already processed by + * remote process */ + ring_ptr_t *free; } tx; /* RX */ struct { - _ring_t *recv; /**< ODP ring for IPC msg packets - indexes received from shared - memory (from remote process) */ - _ring_t *free; /**< odp ring for ipc msg packets - indexes already processed by - current process */ - _ring_t *cache; /**< local cache to keep packet order right */ + /* ODP ring for IPC msg packets indexes received from shared + * memory (from remote process) */ + ring_ptr_t *recv; + /* odp ring for ipc msg packets indexes already processed by + * current process */ + ring_ptr_t *free; + /* local cache to keep packet order right */ + ring_ptr_t *cache; } rx; /* slave */ void *pool_base; /**< Remote pool base addr */ void *pool_mdata_base; /**< Remote pool mdata base addr */ @@ -136,58 +137,66 @@ static int _ipc_init_master(pktio_entry_t *pktio_entry, */ snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_prod", dev); pkt_priv(pktio_entry)->tx.send = _ring_create(ipc_shm_name, - PKTIO_IPC_ENTRIES, - _RING_SHM_PROC | _RING_NO_LIST); + PKTIO_IPC_ENTRIES, + _RING_SHM_PROC); if (!pkt_priv(pktio_entry)->tx.send) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); return -1; } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.send), - _ring_free_count(pkt_priv(pktio_entry)->tx.send)); + ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.send, + PKTIO_IPC_ENTRY_MASK), + _ring_free_count(pkt_priv(pktio_entry)->tx.send, + PKTIO_IPC_ENTRY_MASK));
/* generate name in shm like ipc_pktio_p for * already processed packets */ snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev); pkt_priv(pktio_entry)->tx.free = _ring_create(ipc_shm_name, - PKTIO_IPC_ENTRIES, - _RING_SHM_PROC | _RING_NO_LIST); + PKTIO_IPC_ENTRIES, + _RING_SHM_PROC); if (!pkt_priv(pktio_entry)->tx.free) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); goto free_m_prod; } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.free), - _ring_free_count(pkt_priv(pktio_entry)->tx.free)); + ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.free, + PKTIO_IPC_ENTRY_MASK), + _ring_free_count(pkt_priv(pktio_entry)->tx.free, + PKTIO_IPC_ENTRY_MASK));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev); pkt_priv(pktio_entry)->rx.recv = _ring_create(ipc_shm_name, - PKTIO_IPC_ENTRIES, - _RING_SHM_PROC | _RING_NO_LIST); + PKTIO_IPC_ENTRIES, + _RING_SHM_PROC); if (!pkt_priv(pktio_entry)->rx.recv) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); goto free_m_cons; } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.recv), - _ring_free_count(pkt_priv(pktio_entry)->rx.recv)); + ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.recv, + PKTIO_IPC_ENTRY_MASK), + _ring_free_count(pkt_priv(pktio_entry)->rx.recv, + PKTIO_IPC_ENTRY_MASK));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev); pkt_priv(pktio_entry)->rx.free = _ring_create(ipc_shm_name, - PKTIO_IPC_ENTRIES, - _RING_SHM_PROC | _RING_NO_LIST); + PKTIO_IPC_ENTRIES, + _RING_SHM_PROC); if (!pkt_priv(pktio_entry)->rx.free) { ODP_ERR("pid %d unable to create ipc ring %s name\n", getpid(), ipc_shm_name); goto free_s_prod; } ODP_DBG("Created IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.free), - _ring_free_count(pkt_priv(pktio_entry)->rx.free)); + ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.free, + PKTIO_IPC_ENTRY_MASK), + _ring_free_count(pkt_priv(pktio_entry)->rx.free, + PKTIO_IPC_ENTRY_MASK));
/* Set up pool name for remote info */ pinfo = pkt_priv(pktio_entry)->pinfo; @@ -301,8 +310,10 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) return -1; } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.recv), - _ring_free_count(pkt_priv(pktio_entry)->rx.recv)); + ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.recv, + PKTIO_IPC_ENTRY_MASK), + _ring_free_count(pkt_priv(pktio_entry)->rx.recv, + PKTIO_IPC_ENTRY_MASK));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_m_cons", dev); pkt_priv(pktio_entry)->rx.free = _ipc_shm_map(ipc_shm_name, pid); @@ -312,8 +323,10 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) goto free_m_prod; } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.free), - _ring_free_count(pkt_priv(pktio_entry)->rx.free)); + ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->rx.free, + PKTIO_IPC_ENTRY_MASK), + _ring_free_count(pkt_priv(pktio_entry)->rx.free, + PKTIO_IPC_ENTRY_MASK));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_prod", dev); pkt_priv(pktio_entry)->tx.send = _ipc_shm_map(ipc_shm_name, pid); @@ -323,8 +336,10 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) goto free_m_cons; } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.send), - _ring_free_count(pkt_priv(pktio_entry)->tx.send)); + ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.send, + PKTIO_IPC_ENTRY_MASK), + _ring_free_count(pkt_priv(pktio_entry)->tx.send, + PKTIO_IPC_ENTRY_MASK));
snprintf(ipc_shm_name, sizeof(ipc_shm_name), "%s_s_cons", dev); pkt_priv(pktio_entry)->tx.free = _ipc_shm_map(ipc_shm_name, pid); @@ -334,8 +349,10 @@ static int _ipc_slave_start(pktio_entry_t *pktio_entry) goto free_s_prod; } ODP_DBG("Connected IPC ring: %s, count %d, free %d\n", - ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.free), - _ring_free_count(pkt_priv(pktio_entry)->tx.free)); + ipc_shm_name, _ring_count(pkt_priv(pktio_entry)->tx.free, + PKTIO_IPC_ENTRY_MASK), + _ring_free_count(pkt_priv(pktio_entry)->tx.free, + PKTIO_IPC_ENTRY_MASK));
/* Get info about remote pool */ pinfo = pkt_priv(pktio_entry)->pinfo; @@ -374,7 +391,7 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED, odp_pool_t pool) { int ret = -1; - int pid ODP_UNUSED; + int pid; struct pktio_info *pinfo; char name[ODP_POOL_NAME_LEN + sizeof("_info")]; char tail[ODP_POOL_NAME_LEN]; @@ -389,8 +406,7 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED, odp_atomic_init_u32(&pkt_priv(pktio_entry)->ready, 0);
pkt_priv(pktio_entry)->rx.cache = _ring_create("ipc_rx_cache", - PKTIO_IPC_ENTRIES, - _RING_NO_LIST); + PKTIO_IPC_ENTRIES, 0); if (!pkt_priv(pktio_entry)->rx.cache) return -1;
@@ -440,7 +456,8 @@ static int ipc_pktio_open(odp_pktio_t id ODP_UNUSED, return ret; }
-static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, _ring_t *r) +static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, ring_ptr_t *r, + uint32_t r_mask) { uintptr_t offsets[PKTIO_IPC_ENTRIES]; int ret; @@ -458,8 +475,8 @@ static void _ipc_free_ring_packets(pktio_entry_t *pktio_entry, _ring_t *r) rbuf_p = (void *)&offsets;
while (1) { - ret = _ring_mc_dequeue_burst(r, rbuf_p, - PKTIO_IPC_ENTRIES); + ret = ring_ptr_deq_multi(r, r_mask, rbuf_p, + PKTIO_IPC_ENTRIES - 1); if (ret <= 0) break; for (i = 0; i < ret; i++) { @@ -479,12 +496,11 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, { int pkts = 0; int i; - _ring_t *r; - _ring_t *r_p; + ring_ptr_t *r; + ring_ptr_t *r_p; uintptr_t offsets[PKTIO_IPC_ENTRIES]; void **ipcbufs_p = (void *)&offsets[0]; uint32_t ready; - int pkts_ring;
ready = odp_atomic_load_u32(&pkt_priv(pktio_entry)->ready); if (odp_unlikely(!ready)) { @@ -492,11 +508,12 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, return 0; }
- _ipc_free_ring_packets(pktio_entry, pkt_priv(pktio_entry)->tx.free); + _ipc_free_ring_packets(pktio_entry, pkt_priv(pktio_entry)->tx.free, + PKTIO_IPC_ENTRY_MASK);
/* rx from cache */ r = pkt_priv(pktio_entry)->rx.cache; - pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len); + pkts = ring_ptr_deq_multi(r, PKTIO_IPC_ENTRY_MASK, ipcbufs_p, len); if (odp_unlikely(pkts < 0)) ODP_ABORT("internal error dequeue\n");
@@ -504,7 +521,8 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, if (pkts == 0) { ipcbufs_p = (void *)&offsets[0]; r = pkt_priv(pktio_entry)->rx.recv; - pkts = _ring_mc_dequeue_burst(r, ipcbufs_p, len); + pkts = ring_ptr_deq_multi(r, PKTIO_IPC_ENTRY_MASK, ipcbufs_p, + len); if (odp_unlikely(pkts < 0)) ODP_ABORT("internal error dequeue\n"); } @@ -575,14 +593,11 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, if (pkts != i) { ipcbufs_p = (void *)&offsets[i]; r_p = pkt_priv(pktio_entry)->rx.cache; - pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts - i); - - if (pkts_ring != (pkts - i)) - ODP_ABORT("bug to enqueue packets\n"); + ring_ptr_enq_multi(r_p, PKTIO_IPC_ENTRY_MASK, ipcbufs_p, + pkts - i);
if (i == 0) return 0; - }
/*num of actually received packets*/ @@ -591,28 +606,14 @@ static int ipc_pktio_recv_lockless(pktio_entry_t *pktio_entry, /* Now tell other process that we no longer need that buffers.*/ r_p = pkt_priv(pktio_entry)->rx.free;
-repeat: - ipcbufs_p = (void *)&offsets[0]; - pkts_ring = _ring_mp_enqueue_burst(r_p, ipcbufs_p, pkts); - if (odp_unlikely(pkts_ring < 0)) - ODP_ABORT("ipc: odp_ring_mp_enqueue_bulk r_p fail\n"); + ring_ptr_enq_multi(r_p, PKTIO_IPC_ENTRY_MASK, ipcbufs_p, pkts);
for (i = 0; i < pkts; i++) { IPC_ODP_DBG("%d/%d send to be free packet offset %x\n", i, pkts, offsets[i]); }
- if (odp_unlikely(pkts != pkts_ring)) { - IPC_ODP_DBG("odp_ring_full: %d, odp_ring_count %d," - " _ring_free_count %d\n", - _ring_full(r_p), _ring_count(r_p), - _ring_free_count(r_p)); - ipcbufs_p = (void *)&offsets[pkts_ring - 1]; - pkts = pkts - pkts_ring; - goto repeat; - } - return pkts; }
@@ -633,9 +634,8 @@ static int ipc_pktio_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED, static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry, const odp_packet_t pkt_table[], int num) { - _ring_t *r; + ring_ptr_t *r; void **rbuf_p; - int ret; int i; uint32_t ready = odp_atomic_load_u32(&pkt_priv(pktio_entry)->ready); pool_t *ipc_pool = pool_entry_from_hdl(pkt_priv(pktio_entry)->pool); @@ -646,7 +646,8 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry, if (odp_unlikely(!ready)) return 0;
- _ipc_free_ring_packets(pktio_entry, pkt_priv(pktio_entry)->tx.free); + _ipc_free_ring_packets(pktio_entry, pkt_priv(pktio_entry)->tx.free, + PKTIO_IPC_ENTRY_MASK);
/* Copy packets to shm shared pool if they are in different * pool, or if they are references (we can't share across IPC). @@ -703,17 +704,7 @@ static int ipc_pktio_send_lockless(pktio_entry_t *pktio_entry, /* Put packets to ring to be processed by other process. */ rbuf_p = (void *)&offsets[0]; r = pkt_priv(pktio_entry)->tx.send; - ret = _ring_mp_enqueue_burst(r, rbuf_p, num); - if (odp_unlikely(ret < 0)) { - ODP_ERR("pid %d odp_ring_mp_enqueue_bulk fail, ipc_slave %d, ret %d\n", - getpid(), - (PKTIO_TYPE_IPC_SLAVE == pkt_priv(pktio_entry)->type), - ret); - ODP_ERR("odp_ring_full: %d, odp_ring_count %d, _ring_free_count %d\n", - _ring_full(r), _ring_count(r), - _ring_free_count(r)); - ODP_ABORT("Unexpected!\n"); - } + ring_ptr_enq_multi(r, PKTIO_IPC_ENTRY_MASK, rbuf_p, num);
return num; } @@ -762,28 +753,19 @@ static int ipc_start(pktio_entry_t *pktio_entry)
static int ipc_stop(pktio_entry_t *pktio_entry) { - unsigned tx_send = 0, tx_free = 0; - odp_atomic_store_u32(&pkt_priv(pktio_entry)->ready, 0);
if (pkt_priv(pktio_entry)->tx.send) _ipc_free_ring_packets(pktio_entry, - pkt_priv(pktio_entry)->tx.send); + pkt_priv(pktio_entry)->tx.send, + PKTIO_IPC_ENTRY_MASK); /* other process can transfer packets from one ring to * other, use delay here to free that packets. */ sleep(1); if (pkt_priv(pktio_entry)->tx.free) _ipc_free_ring_packets(pktio_entry, - pkt_priv(pktio_entry)->tx.free); - - if (pkt_priv(pktio_entry)->tx.send) - tx_send = _ring_count(pkt_priv(pktio_entry)->tx.send); - if (pkt_priv(pktio_entry)->tx.free) - tx_free = _ring_count(pkt_priv(pktio_entry)->tx.free); - if (tx_send | tx_free) { - ODP_DBG("IPC rings: tx send %d tx free %d\n", - tx_send, tx_free); - } + pkt_priv(pktio_entry)->tx.free, + PKTIO_IPC_ENTRY_MASK);
return 0; } @@ -825,12 +807,12 @@ static int ipc_close(pktio_entry_t *pktio_entry) static int ipc_pktio_init_global(void) { ODP_DBG("PKTIO: initializing ipc interface.\n"); - return _ring_tailq_init(); + return _ring_global_init(); }
static int ipc_pktio_term_global(void) { - return _ring_tailq_term(); + return _ring_global_term(); }
const pktio_if_ops_t ipc_pktio_ops = { diff --git a/platform/linux-generic/pktio/ring.c b/platform/linux-generic/pktio/ring.c index 903e2c9f1..8b3d2b26e 100644 --- a/platform/linux-generic/pktio/ring.c +++ b/platform/linux-generic/pktio/ring.c @@ -78,13 +78,13 @@ #include <odp_packet_io_ring_internal.h> #include <odp_errno_define.h> #include <odp_global_data.h> +#include <odp_align_internal.h>
-#include <odp/api/plat/cpu_inlines.h> +#include <odp_ring_ptr_internal.h>
-#define RING_VAL_IS_POWER_2(x) ((((x) - 1) & (x)) == 0) +#include <odp/api/plat/cpu_inlines.h>
typedef struct { - TAILQ_HEAD(, _ring) ring_list; /* Rings tailq lock */ odp_rwlock_t qlock; odp_shm_t shm; @@ -92,71 +92,8 @@ typedef struct {
static global_data_t *global;
-/* - * the enqueue of pointers on the ring. - */ -#define ENQUEUE_PTRS() do { \ - const uint32_t size = r->prod.size; \ - uint32_t idx = prod_head & mask; \ - if (odp_likely(idx + n < size)) { \ - for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \ - r->ring[idx] = obj_table[i]; \ - r->ring[idx + 1] = obj_table[i + 1]; \ - r->ring[idx + 2] = obj_table[i + 2]; \ - r->ring[idx + 3] = obj_table[i + 3]; \ - } \ - switch (n & 0x3) { \ - case 3: \ - r->ring[idx++] = obj_table[i++]; \ - /* fallthrough */ \ - case 2: \ - r->ring[idx++] = obj_table[i++]; \ - /* fallthrough */ \ - case 1: \ - r->ring[idx++] = obj_table[i++]; \ - } \ - } else { \ - for (i = 0; idx < size; i++, idx++)\ - r->ring[idx] = obj_table[i]; \ - for (idx = 0; i < n; i++, idx++) \ - r->ring[idx] = obj_table[i]; \ - } \ -} while (0) - -/* - * the actual copy of pointers on the ring to obj_table. - */ -#define DEQUEUE_PTRS() do { \ - uint32_t idx = cons_head & mask; \ - const uint32_t size = r->cons.size; \ - if (odp_likely(idx + n < size)) { \ - for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\ - obj_table[i] = r->ring[idx]; \ - obj_table[i + 1] = r->ring[idx + 1]; \ - obj_table[i + 2] = r->ring[idx + 2]; \ - obj_table[i + 3] = r->ring[idx + 3]; \ - } \ - switch (n & 0x3) { \ - case 3: \ - obj_table[i++] = r->ring[idx++]; \ - /* fallthrough */ \ - case 2: \ - obj_table[i++] = r->ring[idx++]; \ - /* fallthrough */ \ - case 1: \ - obj_table[i++] = r->ring[idx++]; \ - } \ - } else { \ - for (i = 0; idx < size; i++, idx++) \ - obj_table[i] = r->ring[idx]; \ - for (idx = 0; i < n; i++, idx++) \ - obj_table[i] = r->ring[idx]; \ - } \ -} while (0) - - /* Initialize tailq_ring */ -int _ring_tailq_init(void) +int _ring_global_init(void) { odp_shm_t shm;
/* Allocate globally shared memory */ @@ -171,14 +108,10 @@ int _ring_tailq_init(void) memset(global, 0, sizeof(global_data_t)); global->shm = shm;
- TAILQ_INIT(&global->ring_list); - odp_rwlock_init(&global->qlock); - return 0; }
-/* Terminate tailq_ring */ -int _ring_tailq_term(void) +int _ring_global_term(void) { if (odp_shm_free(global->shm)) { ODP_ERR("Shm free failed for pktio ring\n"); @@ -188,11 +121,11 @@ int _ring_tailq_term(void) }
/* create the ring */ -_ring_t * +ring_ptr_t * _ring_create(const char *name, unsigned count, unsigned flags) { char ring_name[_RING_NAMESIZE]; - _ring_t *r; + ring_ptr_t *r; size_t ring_size; uint32_t shm_flag; odp_shm_t shm; @@ -205,7 +138,7 @@ _ring_create(const char *name, unsigned count, unsigned flags) shm_flag |= ODP_SHM_SINGLE_VA;
/* count must be a power of 2 */ - if (!RING_VAL_IS_POWER_2(count) || (count > _RING_SZ_MASK)) { + if (!CHECK_IS_POWER2(count)) { ODP_ERR("Requested size is invalid, must be power of 2," "and do not exceed the size limit %u\n", _RING_SZ_MASK); @@ -214,36 +147,22 @@ _ring_create(const char *name, unsigned count, unsigned flags) }
snprintf(ring_name, sizeof(ring_name), "%s", name); - ring_size = count * sizeof(void *) + sizeof(_ring_t); + ring_size = sizeof(ring_ptr_t) + count * sizeof(void *);
- odp_rwlock_write_lock(&global->qlock); /* reserve a memory zone for this ring.*/ shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, shm_flag);
r = odp_shm_addr(shm); - if (r != NULL) { /* init the ring structure */ - snprintf(r->name, sizeof(r->name), "%s", name); - r->flags = flags; - r->prod.size = count; - r->cons.size = count; - r->prod.mask = count - 1; - r->cons.mask = count - 1; - r->prod.head = 0; - r->cons.head = 0; - r->prod.tail = 0; - r->cons.tail = 0; + ring_ptr_init(r);
- if (!(flags & _RING_NO_LIST)) - TAILQ_INSERT_TAIL(&global->ring_list, r, next); } else { __odp_errno = ENOMEM; ODP_ERR("Cannot reserve memory\n"); }
- odp_rwlock_write_unlock(&global->qlock); return r; }
@@ -251,265 +170,30 @@ int _ring_destroy(const char *name) { odp_shm_t shm = odp_shm_lookup(name);
- if (shm != ODP_SHM_INVALID) { - _ring_t *r = odp_shm_addr(shm); - - odp_rwlock_write_lock(&global->qlock); - if (!(r->flags & _RING_NO_LIST)) - TAILQ_REMOVE(&global->ring_list, r, next); - odp_rwlock_write_unlock(&global->qlock); - + if (shm != ODP_SHM_INVALID) return odp_shm_free(shm); - } - return 0; -} - -/** - * Enqueue several objects on the ring (multi-producers safe). - */ -int ___ring_mp_do_enqueue(_ring_t *r, void * const *obj_table, - unsigned n, enum _ring_queue_behavior behavior) -{ - uint32_t prod_head, prod_next; - uint32_t cons_tail, free_entries; - const unsigned max = n; - int success; - unsigned i; - uint32_t mask = r->prod.mask; - - /* move prod.head atomically */ - do { - /* Reset n to the initial burst count */ - n = max; - - prod_head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE); - cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE); - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * prod_head > cons_tail). So 'free_entries' is always between 0 - * and size(ring)-1. 
*/ - free_entries = (mask + cons_tail - prod_head); - - /* check that we have enough room in ring */ - if (odp_unlikely(n > free_entries)) { - if (behavior == _RING_QUEUE_FIXED) - return -ENOBUFS; - /* No free entry available */ - if (odp_unlikely(free_entries == 0)) - return 0; - - n = free_entries; - } - - prod_next = prod_head + n; - success = __atomic_compare_exchange_n(&r->prod.head, - &prod_head, - prod_next, - false/*strong*/, - __ATOMIC_ACQUIRE, - __ATOMIC_RELAXED); - } while (odp_unlikely(success == 0)); - - /* write entries in ring */ - ENQUEUE_PTRS(); - - /* - * If there are other enqueues in progress that preceded us, - * we need to wait for them to complete - */ - while (odp_unlikely(__atomic_load_n(&r->prod.tail, __ATOMIC_RELAXED) != - prod_head)) - odp_cpu_pause(); - - /* Release our entries and the memory they refer to */ - __atomic_store_n(&r->prod.tail, prod_next, __ATOMIC_RELEASE); - return (behavior == _RING_QUEUE_FIXED) ? 0 : n; -} - -/** - * Dequeue several objects from a ring (multi-consumers safe). - */ - -int ___ring_mc_do_dequeue(_ring_t *r, void **obj_table, - unsigned n, enum _ring_queue_behavior behavior) -{ - uint32_t cons_head, prod_tail; - uint32_t cons_next, entries; - const unsigned max = n; - int success; - unsigned i; - uint32_t mask = r->prod.mask; - - /* move cons.head atomically */ - do { - /* Restore n as it may change every loop */ - n = max; - - cons_head = __atomic_load_n(&r->cons.head, __ATOMIC_ACQUIRE); - prod_tail = __atomic_load_n(&r->prod.tail, __ATOMIC_ACQUIRE); - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. 
*/ - entries = (prod_tail - cons_head); - - /* Set the actual entries for dequeue */ - if (n > entries) { - if (behavior == _RING_QUEUE_FIXED) - return -ENOENT; - if (odp_unlikely(entries == 0)) - return 0; - - n = entries; - } - - cons_next = cons_head + n; - success = __atomic_compare_exchange_n(&r->cons.head, - &cons_head, - cons_next, - false/*strong*/, - __ATOMIC_ACQUIRE, - __ATOMIC_RELAXED); - } while (odp_unlikely(success == 0)); - - /* copy in table */ - DEQUEUE_PTRS(); - - /* - * If there are other dequeues in progress that preceded us, - * we need to wait for them to complete - */ - while (odp_unlikely(__atomic_load_n(&r->cons.tail, __ATOMIC_RELAXED) != - cons_head)) - odp_cpu_pause(); - - /* Release our entries and the memory they refer to */ - __atomic_store_n(&r->cons.tail, cons_next, __ATOMIC_RELEASE); - - return behavior == _RING_QUEUE_FIXED ? 0 : n; -} - -/** - * Enqueue several objects on the ring (multi-producers safe). - */ -int _ring_mp_enqueue_bulk(_ring_t *r, void * const *obj_table, - unsigned n) -{ - return ___ring_mp_do_enqueue(r, obj_table, n, - _RING_QUEUE_FIXED); -} - -/** - * Dequeue several objects from a ring (multi-consumers safe). - */ -int _ring_mc_dequeue_bulk(_ring_t *r, void **obj_table, unsigned n) -{ - return ___ring_mc_do_dequeue(r, obj_table, n, - _RING_QUEUE_FIXED); -} - -/** - * Test if a ring is full. - */ -int _ring_full(const _ring_t *r) -{ - uint32_t prod_tail = r->prod.tail; - uint32_t cons_tail = r->cons.tail;
- return (((cons_tail - prod_tail - 1) & r->prod.mask) == 0); -} - -/** - * Test if a ring is empty. - */ -int _ring_empty(const _ring_t *r) -{ - uint32_t prod_tail = r->prod.tail; - uint32_t cons_tail = r->cons.tail; - - return !!(cons_tail == prod_tail); + return 0; }
/** * Return the number of entries in a ring. */ -unsigned _ring_count(const _ring_t *r) +unsigned _ring_count(ring_ptr_t *r, uint32_t mask) { - uint32_t prod_tail = r->prod.tail; - uint32_t cons_tail = r->cons.tail; + uint32_t prod_tail = odp_atomic_load_u32(&r->r.w_tail); + uint32_t cons_tail = odp_atomic_load_u32(&r->r.r_tail);
- return (prod_tail - cons_tail) & r->prod.mask; + return (prod_tail - cons_tail) & mask; }
/** * Return the number of free entries in a ring. */ -unsigned _ring_free_count(const _ring_t *r) -{ - uint32_t prod_tail = r->prod.tail; - uint32_t cons_tail = r->cons.tail; - - return (cons_tail - prod_tail - 1) & r->prod.mask; -} - -/* dump the status of the ring on the console */ -void _ring_dump(const _ring_t *r) -{ - ODP_DBG("ring <%s>@%p\n", r->name, r); - ODP_DBG(" flags=%x\n", r->flags); - ODP_DBG(" size=%" PRIu32 "\n", r->prod.size); - ODP_DBG(" ct=%" PRIu32 "\n", r->cons.tail); - ODP_DBG(" ch=%" PRIu32 "\n", r->cons.head); - ODP_DBG(" pt=%" PRIu32 "\n", r->prod.tail); - ODP_DBG(" ph=%" PRIu32 "\n", r->prod.head); - ODP_DBG(" used=%u\n", _ring_count(r)); - ODP_DBG(" avail=%u\n", _ring_free_count(r)); -} - -/* dump the status of all rings on the console */ -void _ring_list_dump(void) -{ - const _ring_t *mp = NULL; - - odp_rwlock_read_lock(&global->qlock); - - TAILQ_FOREACH(mp, &global->ring_list, next) { - _ring_dump(mp); - } - - odp_rwlock_read_unlock(&global->qlock); -} - -/* search a ring from its name */ -_ring_t *_ring_lookup(const char *name) -{ - _ring_t *r; - - odp_rwlock_read_lock(&global->qlock); - TAILQ_FOREACH(r, &global->ring_list, next) { - if (strncmp(name, r->name, _RING_NAMESIZE) == 0) - break; - } - odp_rwlock_read_unlock(&global->qlock); - - return r; -} - -/** - * Enqueue several objects on the ring (multi-producers safe). - */ -int _ring_mp_enqueue_burst(_ring_t *r, void * const *obj_table, - unsigned n) +unsigned _ring_free_count(ring_ptr_t *r, uint32_t mask) { - return ___ring_mp_do_enqueue(r, obj_table, n, - _RING_QUEUE_VARIABLE); -} + uint32_t prod_tail = odp_atomic_load_u32(&r->r.w_tail); + uint32_t cons_tail = odp_atomic_load_u32(&r->r.r_tail);
-/** - * Dequeue several objects from a ring (multi-consumers safe). - */ -int _ring_mc_dequeue_burst(_ring_t *r, void **obj_table, unsigned n) -{ - return ___ring_mc_do_dequeue(r, obj_table, n, - _RING_QUEUE_VARIABLE); + return (cons_tail - prod_tail - 1) & mask; }
commit 720d655c9161007b561bf033a4fbe0b31ab7fb6a Author: Matias Elo matias.elo@nokia.com Date: Thu Oct 24 15:58:33 2019 +0300
linux-gen: ring: remove internal ring test
Signed-off-by: Matias Elo matias.elo@nokia.com Reviewed-by: Petri Savolainen petri.savolainen@nokia.com
diff --git a/platform/linux-generic/m4/configure.m4 b/platform/linux-generic/m4/configure.m4 index 461dfc0fb..82fe6fe9c 100644 --- a/platform/linux-generic/m4/configure.m4 +++ b/platform/linux-generic/m4/configure.m4 @@ -38,6 +38,5 @@ AC_CONFIG_FILES([platform/linux-generic/Makefile platform/linux-generic/test/Makefile platform/linux-generic/test/validation/api/shmem/Makefile platform/linux-generic/test/validation/api/pktio/Makefile - platform/linux-generic/test/pktio_ipc/Makefile - platform/linux-generic/test/ring/Makefile]) + platform/linux-generic/test/pktio_ipc/Makefile]) ]) diff --git a/platform/linux-generic/test/Makefile.am b/platform/linux-generic/test/Makefile.am index d3193b5e9..870336fd2 100644 --- a/platform/linux-generic/test/Makefile.am +++ b/platform/linux-generic/test/Makefile.am @@ -23,8 +23,7 @@ test_SCRIPTS = $(dist_check_SCRIPTS)
SUBDIRS += validation/api/pktio\ validation/api/shmem\ - pktio_ipc\ - ring + pktio_ipc
if HAVE_PCAP TESTS += validation/api/pktio/pktio_run_pcap.sh diff --git a/platform/linux-generic/test/ring/.gitignore b/platform/linux-generic/test/ring/.gitignore deleted file mode 100644 index 7341a340c..000000000 --- a/platform/linux-generic/test/ring/.gitignore +++ /dev/null @@ -1 +0,0 @@ -ring_main diff --git a/platform/linux-generic/test/ring/Makefile.am b/platform/linux-generic/test/ring/Makefile.am deleted file mode 100644 index 999beed4f..000000000 --- a/platform/linux-generic/test/ring/Makefile.am +++ /dev/null @@ -1,38 +0,0 @@ -# ring test uses internal symbols from libodp-linux which are not available -# when linking test with libodp-linux.so -if STATIC_APPS - -include $(top_srcdir)/test/Makefile.inc - -test_PROGRAMS = ring_main -ring_main_SOURCES = \ - ring_main.c \ - ring_suites.c ring_suites.h \ - ring_basic.c ring_stress.c - -TESTS = ring_main$(EXEEXT) - -PRELDADD += $(LIBCUNIT_COMMON) - -AM_CPPFLAGS += -I$(top_srcdir)/platform/linux-generic/include - -AM_CFLAGS += $(LIBCONFIG_CFLAGS) - -TESTNAME = linux-generic-ring - -TESTENV = tests-$(TESTNAME).env - -test_DATA = $(TESTENV) - -DISTCLEANFILES = $(TESTENV) -.PHONY: $(TESTENV) -$(TESTENV): - echo "TESTS="$(TESTS)"" > $@ - echo "$(TESTS_ENVIRONMENT)" >> $@ - echo "$(LOG_COMPILER)" >> $@ - -if test_installdir -installcheck-local: - $(DESTDIR)/$(testdir)/run-test.sh $(TESTNAME) -endif -endif diff --git a/platform/linux-generic/test/ring/ring_basic.c b/platform/linux-generic/test/ring/ring_basic.c deleted file mode 100644 index dc76fd6b3..000000000 --- a/platform/linux-generic/test/ring/ring_basic.c +++ /dev/null @@ -1,280 +0,0 @@ -/* Copyright (c) 2016-2018, Linaro Limited - * All rights reserved. 
- * - * SPDX-License-Identifier: BSD-3-Clause - */ - -/** - * @file - * - * ODP ring basic test - */ - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> - -#include <odp_cunit_common.h> -#include <odp_packet_io_ring_internal.h> -#include <odp_errno_define.h> - -#include <odp/helper/odph_api.h> - -#include "ring_suites.h" - -/* labor functions declaration */ -static void __do_basic_burst(_ring_t *r); -static void __do_basic_bulk(_ring_t *r); - -/* dummy object pointers for enqueue and dequeue testing */ -static void **test_enq_data; -static void **test_deq_data; - -/* create multiple thread test ring */ -static const char *mt_ring_name = "MT basic ring"; -static _ring_t *mt_ring; - -int ring_test_basic_start(void) -{ - int i = 0; - - /* alloc dummy object pointers for enqueue testing */ - test_enq_data = malloc(RING_SIZE * 2 * sizeof(void *)); - if (NULL == test_enq_data) { - ODPH_ERR("failed to allocate basic test enqeue data\n"); - return -1; - } - - for (i = 0; i < RING_SIZE * 2; i++) - test_enq_data[i] = (void *)(unsigned long)i; - - /* alloc dummy object pointers for dequeue testing */ - test_deq_data = malloc(RING_SIZE * 2 * sizeof(void *)); - if (NULL == test_deq_data) { - ODPH_ERR("failed to allocate basic test dequeue data\n"); - free(test_enq_data); test_enq_data = NULL; - return -1; - } - - memset(test_deq_data, 0, RING_SIZE * 2 * sizeof(void *)); - return 0; -} - -int ring_test_basic_end(void) -{ - _ring_destroy(mt_ring_name); - - free(test_enq_data); - free(test_deq_data); - return 0; -} - -/* basic test cases */ -void ring_test_basic_create(void) -{ - /* prove illegal size shall fail */ - mt_ring = _ring_create(mt_ring_name, ILLEGAL_SIZE, 0); - CU_ASSERT(NULL == mt_ring); - CU_ASSERT(EINVAL == __odp_errno); - - /* create ring for multiple thread usage scenario */ - mt_ring = _ring_create(mt_ring_name, RING_SIZE, - _RING_SHM_PROC); - - CU_ASSERT(NULL != mt_ring); - CU_ASSERT(_ring_lookup(mt_ring_name) == mt_ring); -} - -void 
ring_test_basic_burst(void) -{ - __do_basic_burst(mt_ring); -} - -void ring_test_basic_bulk(void) -{ - __do_basic_bulk(mt_ring); -} - -/* labor functions definition */ -static void __do_basic_burst(_ring_t *r) -{ - int result = 0; - unsigned int count = 0; - void * const *source = test_enq_data; - void * const *dest = test_deq_data; - void **enq = NULL, **deq = NULL; - - enq = test_enq_data; deq = test_deq_data; - - /* ring is empty */ - CU_ASSERT(1 == _ring_empty(r)); - - /* enqueue 1 object */ - result = _ring_mp_enqueue_burst(r, enq, 1); - enq += 1; - CU_ASSERT(1 == (result & _RING_SZ_MASK)); - - /* enqueue 2 objects */ - result = _ring_mp_enqueue_burst(r, enq, 2); - enq += 2; - CU_ASSERT(2 == (result & _RING_SZ_MASK)); - - /* enqueue HALF_BULK objects */ - result = _ring_mp_enqueue_burst(r, enq, HALF_BULK); - enq += HALF_BULK; - CU_ASSERT(HALF_BULK == (result & _RING_SZ_MASK)); - - /* ring is neither empty nor full */ - CU_ASSERT(0 == _ring_full(r)); - CU_ASSERT(0 == _ring_empty(r)); - - /* _ring_count() equals enqueued */ - count = (1 + 2 + HALF_BULK); - CU_ASSERT(count == _ring_count(r)); - /* _ring_free_count() equals rooms left */ - count = (RING_SIZE - 1) - count; - CU_ASSERT(count == _ring_free_count(r)); - - /* exceed the size, enquene as many as possible */ - result = _ring_mp_enqueue_burst(r, enq, HALF_BULK); - enq += count; - CU_ASSERT(count == (result & _RING_SZ_MASK)); - CU_ASSERT(1 == _ring_full(r)); - - /* dequeue 1 object */ - result = _ring_mc_dequeue_burst(r, deq, 1); - deq += 1; - CU_ASSERT(1 == (result & _RING_SZ_MASK)); - - /* dequeue 2 objects */ - result = _ring_mc_dequeue_burst(r, deq, 2); - deq += 2; - CU_ASSERT(2 == (result & _RING_SZ_MASK)); - - /* dequeue HALF_BULK objects */ - result = _ring_mc_dequeue_burst(r, deq, HALF_BULK); - deq += HALF_BULK; - CU_ASSERT(HALF_BULK == (result & _RING_SZ_MASK)); - - /* _ring_free_count() equals dequeued */ - count = (1 + 2 + HALF_BULK); - CU_ASSERT(count == _ring_free_count(r)); - /* _ring_count() 
equals remained left */ - count = (RING_SIZE - 1) - count; - CU_ASSERT(count == _ring_count(r)); - - /* underrun the size, dequeue as many as possible */ - result = _ring_mc_dequeue_burst(r, deq, HALF_BULK); - deq += count; - CU_ASSERT(count == (result & _RING_SZ_MASK)); - CU_ASSERT(1 == _ring_empty(r)); - - /* check data */ - CU_ASSERT(0 == memcmp(source, dest, deq - dest)); - - /* reset dequeue data */ - memset(test_deq_data, 0, RING_SIZE * 2 * sizeof(void *)); -} - -/* incomplete ring API set: strange! - * complement _ring_enqueue/dequeue_bulk to improve coverage - */ -static inline int __ring_enqueue_bulk( - _ring_t *r, void * const *objects, unsigned bulk) -{ - return _ring_mp_enqueue_bulk(r, objects, bulk); -} - -static inline int __ring_dequeue_bulk( - _ring_t *r, void **objects, unsigned bulk) -{ - return _ring_mc_dequeue_bulk(r, objects, bulk); -} - -static void __do_basic_bulk(_ring_t *r) -{ - int result = 0; - unsigned int count = 0; - void * const *source = test_enq_data; - void * const *dest = test_deq_data; - void **enq = NULL, **deq = NULL; - - enq = test_enq_data; deq = test_deq_data; - - /* ring is empty */ - CU_ASSERT(1 == _ring_empty(r)); - - /* enqueue 1 object */ - result = __ring_enqueue_bulk(r, enq, 1); - enq += 1; - CU_ASSERT(0 == result); - - /* enqueue 2 objects */ - result = __ring_enqueue_bulk(r, enq, 2); - enq += 2; - CU_ASSERT(0 == result); - - /* enqueue HALF_BULK objects */ - result = __ring_enqueue_bulk(r, enq, HALF_BULK); - enq += HALF_BULK; - CU_ASSERT(0 == result); - - /* ring is neither empty nor full */ - CU_ASSERT(0 == _ring_full(r)); - CU_ASSERT(0 == _ring_empty(r)); - - /* _ring_count() equals enqueued */ - count = (1 + 2 + HALF_BULK); - CU_ASSERT(count == _ring_count(r)); - /* _ring_free_count() equals rooms left */ - count = (RING_SIZE - 1) - count; - CU_ASSERT(count == _ring_free_count(r)); - - /* exceed the size, enquene shall fail with -ENOBUFS */ - result = __ring_enqueue_bulk(r, enq, HALF_BULK); - CU_ASSERT(-ENOBUFS 
== result); - - /* fullful the ring */ - result = __ring_enqueue_bulk(r, enq, count); - enq += count; - CU_ASSERT(0 == result); - CU_ASSERT(1 == _ring_full(r)); - - /* dequeue 1 object */ - result = __ring_dequeue_bulk(r, deq, 1); - deq += 1; - CU_ASSERT(0 == result); - - /* dequeue 2 objects */ - result = __ring_dequeue_bulk(r, deq, 2); - deq += 2; - CU_ASSERT(0 == result); - - /* dequeue HALF_BULK objects */ - result = __ring_dequeue_bulk(r, deq, HALF_BULK); - deq += HALF_BULK; - CU_ASSERT(0 == result); - - /* _ring_free_count() equals dequeued */ - count = (1 + 2 + HALF_BULK); - CU_ASSERT(count == _ring_free_count(r)); - /* _ring_count() equals remained left */ - count = (RING_SIZE - 1) - count; - CU_ASSERT(count == _ring_count(r)); - - /* underrun the size, dequeue shall fail with -ENOENT */ - result = __ring_dequeue_bulk(r, deq, HALF_BULK); - CU_ASSERT(-ENOENT == result); - - /* empty the queue */ - result = __ring_dequeue_bulk(r, deq, count); - deq += count; - CU_ASSERT(0 == result); - CU_ASSERT(1 == _ring_empty(r)); - - /* check data */ - CU_ASSERT(0 == memcmp(source, dest, deq - dest)); - - /* reset dequeue data */ - memset(test_deq_data, 0, RING_SIZE * 2 * sizeof(void *)); -} diff --git a/platform/linux-generic/test/ring/ring_main.c b/platform/linux-generic/test/ring/ring_main.c deleted file mode 100644 index 8d0f07527..000000000 --- a/platform/linux-generic/test/ring/ring_main.c +++ /dev/null @@ -1,12 +0,0 @@ -/* Copyright (c) 2016-2018, Linaro Limited - * All rights reserved. - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "ring_suites.h" - -int main(int argc, char *argv[]) -{ - return ring_suites_main(argc, argv); -} diff --git a/platform/linux-generic/test/ring/ring_stress.c b/platform/linux-generic/test/ring/ring_stress.c deleted file mode 100644 index b0b996789..000000000 --- a/platform/linux-generic/test/ring/ring_stress.c +++ /dev/null @@ -1,270 +0,0 @@ -/* Copyright (c) 2016-2018, Linaro Limited - * All rights reserved. 
- * - * SPDX-License-Identifier: BSD-3-Clause - */ - -/** - * @file - * - * ODP ring stress test - */ - -#ifndef _GNU_SOURCE -#define _GNU_SOURCE -#endif - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <unistd.h> - -#include <odp_api.h> -#include <odp/helper/odph_api.h> -#include <odp_packet_io_ring_internal.h> -#include <odp_cunit_common.h> - -#include "ring_suites.h" - -/* There's even number of producer and consumer threads and each thread does - * this many successful enq or deq operations */ -#define NUM_BULK_OP ((RING_SIZE / PIECE_BULK) * 100) - -/* - * Note : make sure that both enqueue and dequeue - * operation starts at same time so to avoid data corruption - * Its because atomic lock will protect only indexes, but if order of - * read or write operation incorrect then data mismatch will happen - * So its resposibility of application develop to take care of order of - * data read or write. - */ -typedef enum { - STRESS_1_1_PRODUCER_CONSUMER, - STRESS_1_N_PRODUCER_CONSUMER, - STRESS_N_1_PRODUCER_CONSUMER, - STRESS_N_M_PRODUCER_CONSUMER -} stress_case_t; - -#define GLOBAL_SHM_NAME "RingGlobalShm" - -/* worker function declarations */ -static int stress_worker(void *_data); - -/* global name for later look up in workers' context */ -static const char *ring_name = "stress_ring"; - -typedef struct { - odp_shm_t shm; - /* Barrier to run threads at the same time */ - odp_barrier_t barrier; - /* - * Since cunit framework cannot work with multi-threading, ask workers - * to save their results for delayed assertion after thread collection. 
- */ - int worker_results[MAX_WORKERS]; -} global_shared_mem_t; - -static global_shared_mem_t *global_mem; - -int ring_test_stress_start(void) -{ - _ring_t *r_stress = NULL; - odp_shm_t shm; - - shm = odp_shm_reserve(GLOBAL_SHM_NAME, sizeof(global_shared_mem_t), 64, - ODP_SHM_SW_ONLY); - if (shm == ODP_SHM_INVALID) { - fprintf(stderr, "Unable reserve memory for global_shm\n"); - return -1; - } - - global_mem = odp_shm_addr(shm); - memset(global_mem, 0, sizeof(global_shared_mem_t)); - global_mem->shm = shm; - - /* multiple thread usage scenario, thread or process sharable */ - r_stress = _ring_create(ring_name, RING_SIZE, _RING_SHM_PROC); - if (r_stress == NULL) { - ODPH_ERR("create ring failed for stress.\n"); - return -1; - } - - return 0; -} - -int ring_test_stress_end(void) -{ - if (odp_shm_free(global_mem->shm)) { - fprintf(stderr, "error: odp_shm_free() failed.\n"); - return -1; - } - - return 0; -} - -void ring_test_stress_1_1_producer_consumer(void) -{ - int i = 0; - odp_cpumask_t cpus; - pthrd_arg worker_param; - - /* reset results for delayed assertion */ - memset(global_mem->worker_results, 0, - sizeof(global_mem->worker_results)); - - /* request 2 threads to run 1:1 stress */ - worker_param.numthrds = odp_cpumask_default_worker(&cpus, 2); - worker_param.testcase = STRESS_1_1_PRODUCER_CONSUMER; - - /* not failure, insufficient resource */ - if (worker_param.numthrds < 2) { - ODPH_ERR("insufficient cpu for 1:1 " - "producer/consumer stress.\n"); - return; - } - - odp_barrier_init(&global_mem->barrier, 2); - - /* kick the workers */ - odp_cunit_thread_create(stress_worker, &worker_param); - - /* collect the results */ - odp_cunit_thread_exit(&worker_param); - - /* delayed assertion due to cunit limitation */ - for (i = 0; i < worker_param.numthrds; i++) - CU_ASSERT(0 == global_mem->worker_results[i]); -} - -void ring_test_stress_N_M_producer_consumer(void) -{ - int i = 0; - odp_cpumask_t cpus; - pthrd_arg worker_param; - - /* reset results for delayed 
assertion */ - memset(global_mem->worker_results, 0, - sizeof(global_mem->worker_results)); - - /* request MAX_WORKERS threads to run N:M stress */ - worker_param.numthrds = - odp_cpumask_default_worker(&cpus, MAX_WORKERS); - worker_param.testcase = STRESS_N_M_PRODUCER_CONSUMER; - - /* not failure, insufficient resource */ - if (worker_param.numthrds < 3) { - ODPH_ERR("insufficient cpu for N:M " - "producer/consumer stress.\n"); - return; - } - - /* force even number of threads */ - if (worker_param.numthrds & 0x1) - worker_param.numthrds -= 1; - - odp_barrier_init(&global_mem->barrier, worker_param.numthrds); - - /* kick the workers */ - odp_cunit_thread_create(stress_worker, &worker_param); - - /* collect the results */ - odp_cunit_thread_exit(&worker_param); - - /* delayed assertion due to cunit limitation */ - for (i = 0; i < worker_param.numthrds; i++) - CU_ASSERT(0 == global_mem->worker_results[i]); -} - -void ring_test_stress_1_N_producer_consumer(void) -{ -} - -void ring_test_stress_N_1_producer_consumer(void) -{ -} - -void ring_test_stress_ring_list_dump(void) -{ - /* improve code coverage */ - _ring_list_dump(); -} - -/* worker function for multiple producer instances */ -static int do_producer(_ring_t *r) -{ - void *enq[PIECE_BULK]; - int i; - int num = NUM_BULK_OP; - - /* data pattern to be evaluated later in consumer */ - for (i = 0; i < PIECE_BULK; i++) - enq[i] = (void *)(uintptr_t)i; - - while (num) - if (_ring_mp_enqueue_bulk(r, enq, PIECE_BULK) == 0) - num--; - - return 0; -} - -/* worker function for multiple consumer instances */ -static int do_consumer(_ring_t *r) -{ - void *deq[PIECE_BULK]; - int i; - int num = NUM_BULK_OP; - - while (num) { - if (_ring_mc_dequeue_bulk(r, deq, PIECE_BULK) == 0) { - num--; - - /* evaluate the data pattern */ - for (i = 0; i < PIECE_BULK; i++) - CU_ASSERT(deq[i] == (void *)(uintptr_t)i); - } - } - - return 0; -} - -static int stress_worker(void *_data) -{ - pthrd_arg *worker_param = (pthrd_arg *)_data; - _ring_t 
*r_stress = NULL; - int *result = NULL; - int worker_id = odp_thread_id(); - - /* save the worker result for delayed assertion */ - result = &global_mem->worker_results[(worker_id % - worker_param->numthrds)]; - - /* verify ring lookup in worker context */ - r_stress = _ring_lookup(ring_name); - if (NULL == r_stress) { - ODPH_ERR("ring lookup %s not found\n", ring_name); - return (*result = -1); - } - - odp_barrier_wait(&global_mem->barrier); - - switch (worker_param->testcase) { - case STRESS_1_1_PRODUCER_CONSUMER: - case STRESS_N_M_PRODUCER_CONSUMER: - /* interleaved producer/consumer */ - if (0 == (worker_id % 2)) - *result = do_producer(r_stress); - else if (1 == (worker_id % 2)) - *result = do_consumer(r_stress); - break; - case STRESS_1_N_PRODUCER_CONSUMER: - case STRESS_N_1_PRODUCER_CONSUMER: - default: - ODPH_ERR("invalid or not-implemented stress type (%d)\n", - worker_param->testcase); - break; - } - - odp_barrier_wait(&global_mem->barrier); - - return 0; -} diff --git a/platform/linux-generic/test/ring/ring_suites.c b/platform/linux-generic/test/ring/ring_suites.c deleted file mode 100644 index a6b3376ce..000000000 --- a/platform/linux-generic/test/ring/ring_suites.c +++ /dev/null @@ -1,74 +0,0 @@ -/* Copyright (c) 2016-2018, Linaro Limited - * All rights reserved. 
- * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> - -#include <odp_api.h> -#include <odp/helper/odph_api.h> - -#include <odp_cunit_common.h> -#include <odp_packet_io_ring_internal.h> - -#include "ring_suites.h" - -static int ring_suites_init(odp_instance_t *inst) -{ - if (0 != odp_init_global(inst, NULL, NULL)) { - ODPH_ERR("error: odp_init_global() failed.\n"); - return -1; - } - if (0 != odp_init_local(*inst, ODP_THREAD_CONTROL)) { - ODPH_ERR("error: odp_init_local() failed.\n"); - return -1; - } - - _ring_tailq_init(); - return 0; -} - -static odp_testinfo_t ring_suite_basic[] = { - ODP_TEST_INFO(ring_test_basic_create), - ODP_TEST_INFO(ring_test_basic_burst), - ODP_TEST_INFO(ring_test_basic_bulk), - ODP_TEST_INFO_NULL, -}; - -static odp_testinfo_t ring_suite_stress[] = { - ODP_TEST_INFO(ring_test_stress_1_1_producer_consumer), - ODP_TEST_INFO(ring_test_stress_1_N_producer_consumer), - ODP_TEST_INFO(ring_test_stress_N_1_producer_consumer), - ODP_TEST_INFO(ring_test_stress_N_M_producer_consumer), - ODP_TEST_INFO(ring_test_stress_ring_list_dump), - ODP_TEST_INFO_NULL, -}; - -static odp_suiteinfo_t ring_suites[] = { - {"ring basic", ring_test_basic_start, - ring_test_basic_end, ring_suite_basic}, - {"ring stress", ring_test_stress_start, - ring_test_stress_end, ring_suite_stress}, - ODP_SUITE_INFO_NULL -}; - -int ring_suites_main(int argc, char *argv[]) -{ - int ret; - - /* let helper collect its own arguments (e.g. 
--odph_proc) */ - if (odp_cunit_parse_options(argc, argv)) - return -1; - - odp_cunit_register_global_init(ring_suites_init); - - ret = odp_cunit_register(ring_suites); - - if (ret == 0) - ret = odp_cunit_run(); - - return ret; -} diff --git a/platform/linux-generic/test/ring/ring_suites.h b/platform/linux-generic/test/ring/ring_suites.h deleted file mode 100644 index d56ab7845..000000000 --- a/platform/linux-generic/test/ring/ring_suites.h +++ /dev/null @@ -1,33 +0,0 @@ -/* Copyright (c) 2016-2018, Linaro Limited - * All rights reserved. - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#define RING_SIZE 4096 -#define PIECE_BULK 32 - -#define HALF_BULK (RING_SIZE >> 1) -#define ILLEGAL_SIZE (RING_SIZE | 0x3) - -/* test suite start and stop */ -int ring_test_basic_start(void); -int ring_test_basic_end(void); - -/* basic test cases */ -void ring_test_basic_create(void); -void ring_test_basic_burst(void); -void ring_test_basic_bulk(void); - -/* test suite start and stop */ -int ring_test_stress_start(void); -int ring_test_stress_end(void); - -/* stress test cases */ -void ring_test_stress_1_1_producer_consumer(void); -void ring_test_stress_1_N_producer_consumer(void); -void ring_test_stress_N_1_producer_consumer(void); -void ring_test_stress_N_M_producer_consumer(void); -void ring_test_stress_ring_list_dump(void); - -int ring_suites_main(int argc, char *argv[]);
-----------------------------------------------------------------------
Summary of changes: platform/linux-generic/Makefile.am | 3 - .../linux-generic/include/odp_buffer_internal.h | 4 - .../linux-generic/include/odp_packet_io_internal.h | 1 - .../include/odp_packet_io_ipc_internal.h | 47 -- .../include/odp_packet_io_ring_internal.h | 414 ---------------- platform/linux-generic/m4/configure.m4 | 3 +- platform/linux-generic/odp_packet_io.c | 1 - platform/linux-generic/pktio/ipc.c | 536 ++++++++++++--------- platform/linux-generic/pktio/ring.c | 515 -------------------- platform/linux-generic/test/Makefile.am | 3 +- platform/linux-generic/test/ring/.gitignore | 1 - platform/linux-generic/test/ring/Makefile.am | 38 -- platform/linux-generic/test/ring/ring_basic.c | 280 ----------- platform/linux-generic/test/ring/ring_main.c | 12 - platform/linux-generic/test/ring/ring_stress.c | 270 ----------- platform/linux-generic/test/ring/ring_suites.c | 74 --- platform/linux-generic/test/ring/ring_suites.h | 33 -- 17 files changed, 321 insertions(+), 1914 deletions(-) delete mode 100644 platform/linux-generic/include/odp_packet_io_ipc_internal.h delete mode 100644 platform/linux-generic/include/odp_packet_io_ring_internal.h delete mode 100644 platform/linux-generic/pktio/ring.c delete mode 100644 platform/linux-generic/test/ring/.gitignore delete mode 100644 platform/linux-generic/test/ring/Makefile.am delete mode 100644 platform/linux-generic/test/ring/ring_basic.c delete mode 100644 platform/linux-generic/test/ring/ring_main.c delete mode 100644 platform/linux-generic/test/ring/ring_stress.c delete mode 100644 platform/linux-generic/test/ring/ring_suites.c delete mode 100644 platform/linux-generic/test/ring/ring_suites.h
hooks/post-receive