On Wed, May 07, 2025 at 03:49:40PM -0600, Uday Shankar wrote:
> Towards the goal of decoupling ublk_queues from ublk server threads,
> move resources/data that should be per-thread rather than per-queue out
> of ublk_queue and into a new struct ublk_thread.
> 
> Signed-off-by: Uday Shankar <ushankar@purestorage.com>
> ---
>  tools/testing/selftests/ublk/kublk.c | 225 ++++++++++++++++++-----------------
>  tools/testing/selftests/ublk/kublk.h |  38 ++--
>  2 files changed, 145 insertions(+), 118 deletions(-)
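
(For context: piecing together the hunks below, the per-thread state that
moves out of ublk_queue comes down to roughly the following -- field types
and order are guessed from usage in this patch, not copied from it:)

	struct ublk_thread {
		struct ublk_dev *dev;
		struct io_uring ring;		/* one ring per server thread */
		unsigned int cmd_inflight;	/* inflight ublk io_uring cmds */
		unsigned int io_inflight;	/* inflight target I/O */
		pthread_t thread;
		unsigned idx;			/* thread index within the device */
		unsigned state;			/* UBLKSRV_THREAD_STOPPING/IDLE */
	};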
> diff --git a/tools/testing/selftests/ublk/kublk.c b/tools/testing/selftests/ublk/kublk.c
> index 3ad9e162816c3a10e9928f9d530908cda7595530..313689f94cd6361a9a0f4b9257085b2a62bc8b8c 100644
> --- a/tools/testing/selftests/ublk/kublk.c
> +++ b/tools/testing/selftests/ublk/kublk.c
> @@ -324,8 +324,8 @@ static void ublk_ctrl_dump(struct ublk_dev *dev)
>  
>  	for (i = 0; i < info->nr_hw_queues; i++) {
>  		ublk_print_cpu_set(&affinity[i], buf, sizeof(buf));
> -		printf("\tqueue %u: tid %d affinity(%s)\n",
> -				i, dev->q[i].tid, buf);
> +		printf("\tqueue %u: affinity(%s)\n",
> +				i, buf);
>  	}
>  	free(affinity);
>  }
> @@ -395,18 +395,16 @@ static void ublk_queue_deinit(struct ublk_queue *q)
>  		free(q->ios[i].buf_addr);
>  }
>  
> -static void ublk_thread_deinit(struct ublk_queue *q)
> +static void ublk_thread_deinit(struct ublk_thread *t)
>  {
> -	q->tid = 0;
> +	io_uring_unregister_buffers(&t->ring);
>  
> -	io_uring_unregister_buffers(&q->ring);
> +	io_uring_unregister_ring_fd(&t->ring);
>  
> -	io_uring_unregister_ring_fd(&q->ring);
> -
> -	if (q->ring.ring_fd > 0) {
> -		io_uring_unregister_files(&q->ring);
> -		close(q->ring.ring_fd);
> -		q->ring.ring_fd = -1;
> +	if (t->ring.ring_fd > 0) {
> +		io_uring_unregister_files(&t->ring);
> +		close(t->ring.ring_fd);
> +		t->ring.ring_fd = -1;
>  	}
>  }
> @@ -421,7 +419,6 @@ static int ublk_queue_init(struct ublk_queue *q)
>  	q->tgt_ops = dev->tgt.ops;
>  	q->state = 0;
>  	q->q_depth = depth;
> -	q->cmd_inflight = 0;
>  
>  	if (dev->dev_info.flags & UBLK_F_SUPPORT_ZERO_COPY) {
>  		q->state |= UBLKSRV_NO_BUF;
> @@ -443,6 +440,7 @@ static int ublk_queue_init(struct ublk_queue *q)
>  		q->ios[i].buf_addr = NULL;
>  		q->ios[i].flags = UBLKSRV_NEED_FETCH_RQ | UBLKSRV_IO_FREE;
>  		q->ios[i].q = q;
> +		q->ios[i].tag = i;
>  
>  		if (q->state & UBLKSRV_NO_BUF)
>  			continue;
> @@ -463,47 +461,46 @@ static int ublk_queue_init(struct ublk_queue *q)
>  	return -ENOMEM;
>  }
>  
> -static int ublk_thread_init(struct ublk_queue *q)
> +static int ublk_thread_init(struct ublk_thread *t)
>  {
> -	struct ublk_dev *dev = q->dev;
> +	struct ublk_dev *dev = t->dev;
>  	int ring_depth = dev->tgt.sq_depth, cq_depth = dev->tgt.cq_depth;
>  	int ret;
>  
> -	q->tid = gettid();
> -
> -	ret = ublk_setup_ring(&q->ring, ring_depth, cq_depth,
> +	ret = ublk_setup_ring(&t->ring, ring_depth, cq_depth,
>  			IORING_SETUP_COOP_TASKRUN |
>  			IORING_SETUP_SINGLE_ISSUER |
>  			IORING_SETUP_DEFER_TASKRUN);
>  	if (ret < 0) {
> -		ublk_err("ublk dev %d queue %d setup io_uring failed %d\n",
> -				q->dev->dev_info.dev_id, q->q_id, ret);
> +		ublk_err("ublk dev %d thread %d setup io_uring failed %d\n",
> +				dev->dev_info.dev_id, t->idx, ret);
>  		goto fail;
>  	}
>  
>  	if (dev->dev_info.flags & UBLK_F_SUPPORT_ZERO_COPY) {
> -		ret = io_uring_register_buffers_sparse(&q->ring, q->q_depth);
> +		ret = io_uring_register_buffers_sparse(
> +			&t->ring, dev->dev_info.queue_depth);
>  		if (ret) {
> -			ublk_err("ublk dev %d queue %d register spare buffers failed %d",
> -					dev->dev_info.dev_id, q->q_id, ret);
> +			ublk_err("ublk dev %d thread %d register spare buffers failed %d",
> +					dev->dev_info.dev_id, t->idx, ret);
>  			goto fail;
>  		}
>  	}
>  
> -	io_uring_register_ring_fd(&q->ring);
> +	io_uring_register_ring_fd(&t->ring);
>  
> -	ret = io_uring_register_files(&q->ring, dev->fds, dev->nr_fds);
> +	ret = io_uring_register_files(&t->ring, dev->fds, dev->nr_fds);
>  	if (ret) {
> -		ublk_err("ublk dev %d queue %d register files failed %d\n",
> -				q->dev->dev_info.dev_id, q->q_id, ret);
> +		ublk_err("ublk dev %d thread %d register files failed %d\n",
> +				t->dev->dev_info.dev_id, t->idx, ret);
>  		goto fail;
>  	}
>  
>  	return 0;
>  fail:
> -	ublk_thread_deinit(q);
> -	ublk_err("ublk dev %d queue %d thread init failed\n",
> -		dev->dev_info.dev_id, q->q_id);
> +	ublk_thread_deinit(t);
> +	ublk_err("ublk dev %d thread %d init failed\n",
> +		dev->dev_info.dev_id, t->idx);
>  	return -ENOMEM;
>  }
> @@ -545,8 +542,9 @@ static void ublk_dev_unprep(struct ublk_dev *dev)
>  	close(dev->fds[0]);
>  }
>  
> -int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag)
> +int ublk_queue_io_cmd(struct ublk_io *io)
>  {
> +	struct ublk_thread *t = io->t;
>  	struct ublksrv_io_cmd *cmd;
>  	struct io_uring_sqe *sqe[1];
>  	unsigned int cmd_op = 0;
> @@ -571,13 +569,13 @@ int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag)
>  	else if (io->flags & UBLKSRV_NEED_FETCH_RQ)
>  		cmd_op = UBLK_U_IO_FETCH_REQ;
>  
> -	if (io_uring_sq_space_left(&q->ring) < 1)
> -		io_uring_submit(&q->ring);
> +	if (io_uring_sq_space_left(&t->ring) < 1)
> +		io_uring_submit(&t->ring);
>  
> -	ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 1);
> +	ublk_io_alloc_sqes(io, sqe, 1);
>  	if (!sqe[0]) {
> -		ublk_err("%s: run out of sqe %d, tag %d\n",
> -				__func__, q->q_id, tag);
> +		ublk_err("%s: run out of sqe. thread %u, tag %d\n",
> +				__func__, t->idx, io->tag);
>  		return -1;
>  	}
> @@ -592,42 +590,51 @@ int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag)
>  	sqe[0]->opcode	= IORING_OP_URING_CMD;
>  	sqe[0]->flags	= IOSQE_FIXED_FILE;
>  	sqe[0]->rw_flags	= 0;
> -	cmd->tag	= tag;
> -	cmd->q_id	= q->q_id;
> -	if (!(q->state & UBLKSRV_NO_BUF))
> +	cmd->tag	= io->tag;
> +	cmd->q_id	= io->q->q_id;
> +	if (!(io->q->state & UBLKSRV_NO_BUF))
>  		cmd->addr	= (__u64) (uintptr_t) io->buf_addr;
>  	else
>  		cmd->addr	= 0;
>  
> -	user_data = build_user_data(tag, _IOC_NR(cmd_op), 0, q->q_id, 0);
> +	user_data = build_user_data(io->tag, _IOC_NR(cmd_op), 0, io->q->q_id, 0);
>  	io_uring_sqe_set_data64(sqe[0], user_data);
>  
>  	io->flags = 0;
>  
> -	q->cmd_inflight += 1;
> +	t->cmd_inflight += 1;
>  
> -	ublk_dbg(UBLK_DBG_IO_CMD, "%s: (qid %d tag %u cmd_op %u) iof %x stopping %d\n",
> -			__func__, q->q_id, tag, cmd_op,
> -			io->flags, !!(q->state & UBLKSRV_QUEUE_STOPPING));
> +	ublk_dbg(UBLK_DBG_IO_CMD, "%s: (thread %u qid %d tag %u cmd_op %u) iof %x stopping %d\n",
> +			__func__, t->idx, io->q->q_id, io->tag, cmd_op,
> +			io->flags, !!(t->state & UBLKSRV_THREAD_STOPPING));
>  	return 1;
>  }
>  
> -static void ublk_submit_fetch_commands(struct ublk_queue *q)
> +static void ublk_submit_fetch_commands(struct ublk_thread *t)
>  {
> +	/*
> +	 * Service exclusively the queue whose q_id matches our thread
> +	 * index. This may change in the future.
> +	 */
> +	struct ublk_queue *q = &t->dev->q[t->idx];
> +	struct ublk_io *io;
>  	int i = 0;
>  
> -	for (i = 0; i < q->q_depth; i++)
> -		ublk_queue_io_cmd(q, &q->ios[i], i);
> +	for (i = 0; i < q->q_depth; i++) {
> +		io = &q->ios[i];
> +		io->t = t;
> +		ublk_queue_io_cmd(io);
> +	}
>  }
>  
> -static int ublk_queue_is_idle(struct ublk_queue *q)
> +static int ublk_thread_is_idle(struct ublk_thread *t)
>  {
> -	return !io_uring_sq_ready(&q->ring) && !q->io_inflight;
> +	return !io_uring_sq_ready(&t->ring) && !t->io_inflight;
>  }
>  
> -static int ublk_queue_is_done(struct ublk_queue *q)
> +static int ublk_thread_is_done(struct ublk_thread *t)
>  {
> -	return (q->state & UBLKSRV_QUEUE_STOPPING) && ublk_queue_is_idle(q);
> +	return (t->state & UBLKSRV_THREAD_STOPPING) && ublk_thread_is_idle(t);
>  }
>  
>  static inline void ublksrv_handle_tgt_cqe(struct ublk_queue *q,
> @@ -645,15 +652,16 @@ static inline void ublksrv_handle_tgt_cqe(struct ublk_queue *q,
>  	q->tgt_ops->tgt_io_done(q, tag, cqe);
>  }
>  
> -static void ublk_handle_cqe(struct ublk_dev *dev,
> +static void ublk_handle_cqe(struct ublk_thread *t,
>  		struct io_uring_cqe *cqe, void *data)
>  {
> +	struct ublk_dev *dev = t->dev;
>  	unsigned q_id = user_data_to_q_id(cqe->user_data);
>  	struct ublk_queue *q = &dev->q[q_id];
>  	unsigned tag = user_data_to_tag(cqe->user_data);
>  	unsigned cmd_op = user_data_to_op(cqe->user_data);
>  	int fetch = (cqe->res != UBLK_IO_RES_ABORT) &&
> -		!(q->state & UBLKSRV_QUEUE_STOPPING);
> +		!(t->state & UBLKSRV_THREAD_STOPPING);
>  	struct ublk_io *io;
>  
>  	if (cqe->res < 0 && cqe->res != -ENODEV)
> @@ -664,7 +672,7 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
>  			__func__, cqe->res, q->q_id, tag, cmd_op,
>  			is_target_io(cqe->user_data),
>  			user_data_to_tgt_data(cqe->user_data),
> -			(q->state & UBLKSRV_QUEUE_STOPPING));
> +			(t->state & UBLKSRV_THREAD_STOPPING));
>  
>  	/* Don't retrieve io in case of target io */
>  	if (is_target_io(cqe->user_data)) {
> @@ -673,10 +681,10 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
>  	}
>  
>  	io = &q->ios[tag];
> -	q->cmd_inflight--;
> +	t->cmd_inflight--;
>  
>  	if (!fetch) {
> -		q->state |= UBLKSRV_QUEUE_STOPPING;
> +		t->state |= UBLKSRV_THREAD_STOPPING;
>  		io->flags &= ~UBLKSRV_NEED_FETCH_RQ;
>  	}
> @@ -686,7 +694,7 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
>  		q->tgt_ops->queue_io(q, tag);
>  	} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
>  		io->flags |= UBLKSRV_NEED_GET_DATA | UBLKSRV_IO_FREE;
> -		ublk_queue_io_cmd(q, io, tag);
> +		ublk_queue_io_cmd(io);
>  	} else {
>  		/*
>  		 * COMMIT_REQ will be completed immediately since no fetching
> @@ -700,87 +708,92 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
>  	}
>  }
>  
> -static int ublk_reap_events_uring(struct ublk_queue *q)
> +static int ublk_reap_events_uring(struct ublk_thread *t)
>  {
>  	struct io_uring_cqe *cqe;
>  	unsigned head;
>  	int count = 0;
>  
> -	io_uring_for_each_cqe(&q->ring, head, cqe) {
> -		ublk_handle_cqe(q->dev, cqe, NULL);
> +	io_uring_for_each_cqe(&t->ring, head, cqe) {
> +		ublk_handle_cqe(t, cqe, NULL);
>  		count += 1;
>  	}
> -	io_uring_cq_advance(&q->ring, count);
> +	io_uring_cq_advance(&t->ring, count);
>  
>  	return count;
>  }
>  
> -static int ublk_process_io(struct ublk_queue *q)
> +static int ublk_process_io(struct ublk_thread *t)
>  {
>  	int ret, reapped;
>  
> -	ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: to_submit %d inflight cmd %u stopping %d\n",
> -				q->dev->dev_info.dev_id,
> -				q->q_id, io_uring_sq_ready(&q->ring),
> -				q->cmd_inflight,
> -				(q->state & UBLKSRV_QUEUE_STOPPING));
> +	ublk_dbg(UBLK_DBG_THREAD, "dev%d-t%u: to_submit %d inflight cmd %u stopping %d\n",
> +				t->dev->dev_info.dev_id,
> +				t->idx, io_uring_sq_ready(&t->ring),
> +				t->cmd_inflight,
> +				(t->state & UBLKSRV_THREAD_STOPPING));
>  
> -	if (ublk_queue_is_done(q))
> +	if (ublk_thread_is_done(t))
>  		return -ENODEV;
>  
> -	ret = io_uring_submit_and_wait(&q->ring, 1);
> -	reapped = ublk_reap_events_uring(q);
> +	ret = io_uring_submit_and_wait(&t->ring, 1);
> +	reapped = ublk_reap_events_uring(t);
>  
> -	ublk_dbg(UBLK_DBG_QUEUE, "submit result %d, reapped %d stop %d idle %d\n",
> -			ret, reapped, (q->state & UBLKSRV_QUEUE_STOPPING),
> -			(q->state & UBLKSRV_QUEUE_IDLE));
> +	ublk_dbg(UBLK_DBG_THREAD, "submit result %d, reapped %d stop %d idle %d\n",
> +			ret, reapped, (t->state & UBLKSRV_THREAD_STOPPING),
> +			(t->state & UBLKSRV_THREAD_IDLE));
>  
>  	return reapped;
>  }
>  
> -static void ublk_queue_set_sched_affinity(const struct ublk_queue *q,
> +static void ublk_thread_set_sched_affinity(const struct ublk_thread *t,
>  		cpu_set_t *cpuset)
>  {
>  	if (sched_setaffinity(0, sizeof(*cpuset), cpuset) < 0)
> -		ublk_err("ublk dev %u queue %u set affinity failed",
> -				q->dev->dev_info.dev_id, q->q_id);
> +		ublk_err("ublk dev %u thread %u set affinity failed",
> +				t->dev->dev_info.dev_id, t->idx);
>  }
>  
> -struct ublk_queue_info {
> -	struct ublk_queue	*q;
> -	sem_t			*queue_sem;
> +struct ublk_thread_info {
> +	struct ublk_dev		*dev;
> +	unsigned		idx;
> +	sem_t			*ready;
>  	cpu_set_t		*affinity;
>  };
>  
>  static void *ublk_io_handler_fn(void *data)
>  {
> -	struct ublk_queue_info *info = data;
> -	struct ublk_queue *q = info->q;
> -	int dev_id = q->dev->dev_info.dev_id;
> +	struct ublk_thread_info *info = data;
> +	struct ublk_thread *t = &info->dev->threads[info->idx];
> +	int dev_id = info->dev->dev_info.dev_id;
>  	int ret;
>  
> -	ret = ublk_thread_init(q);
> +	t->dev = info->dev;
> +	t->idx = info->idx;
> +
> +	ret = ublk_thread_init(t);
>  	if (ret) {
> -		ublk_err("ublk dev %d queue %d thread init failed\n",
> -				dev_id, q->q_id);
> +		ublk_err("ublk dev %d thread %u init failed\n",
> +				dev_id, t->idx);
>  		return NULL;
>  	}
>  	/* IO perf is sensitive with queue pthread affinity on NUMA machine*/
> -	ublk_queue_set_sched_affinity(q, info->affinity);
> -	sem_post(info->queue_sem);
> +	ublk_thread_set_sched_affinity(t, info->affinity);
> +	sem_post(info->ready);
>  
> -	ublk_dbg(UBLK_DBG_QUEUE, "tid %d: ublk dev %d queue %d started\n",
> -			q->tid, dev_id, q->q_id);
> +	ublk_dbg(UBLK_DBG_THREAD, "tid %d: ublk dev %d thread %u started\n",
> +			gettid(), dev_id, t->idx);
>  
>  	/* submit all io commands to ublk driver */
> -	ublk_submit_fetch_commands(q);
> +	ublk_submit_fetch_commands(t);
>  	do {
> -		if (ublk_process_io(q) < 0)
> +		if (ublk_process_io(t) < 0)
>  			break;
>  	} while (1);
>  
> -	ublk_dbg(UBLK_DBG_QUEUE, "ublk dev %d queue %d exited\n", dev_id, q->q_id);
> -	ublk_thread_deinit(q);
> +	ublk_dbg(UBLK_DBG_THREAD, "tid %d: ublk dev %d thread %d exiting\n",
> +			gettid(), dev_id, t->idx);
> +	ublk_thread_deinit(t);
>  	return NULL;
>  }
> @@ -823,20 +836,19 @@ static int ublk_send_dev_event(const struct dev_ctx *ctx, struct ublk_dev *dev,
>  static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
>  {
>  	const struct ublksrv_ctrl_dev_info *dinfo = &dev->dev_info;
> -	struct ublk_queue_info *qinfo;
> +	struct ublk_thread_info *tinfo;
>  	cpu_set_t *affinity_buf;
>  	void *thread_ret;
> -	sem_t queue_sem;
> +	sem_t ready;
>  	int ret, i;
>  
>  	ublk_dbg(UBLK_DBG_DEV, "%s enter\n", __func__);
>  
> -	qinfo = (struct ublk_queue_info *)calloc(sizeof(struct ublk_queue_info),
> -			dinfo->nr_hw_queues);
> -	if (!qinfo)
> +	tinfo = calloc(sizeof(struct ublk_thread_info), dinfo->nr_hw_queues);
> +	if (!tinfo)
>  		return -ENOMEM;
>  
> -	sem_init(&queue_sem, 0, 0);
> +	sem_init(&ready, 0, 0);
>  
>  	ret = ublk_dev_prep(ctx, dev);
>  	if (ret)
>  		return ret;
> @@ -856,17 +868,18 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
>  			goto fail;
>  		}
>  
> -		qinfo[i].q = &dev->q[i];
> -		qinfo[i].queue_sem = &queue_sem;
> -		qinfo[i].affinity = &affinity_buf[i];
> -		pthread_create(&dev->q[i].thread, NULL,
> -				ublk_io_handler_fn, &qinfo[i]);
> +		tinfo[i].dev = dev;
> +		tinfo[i].idx = i;
> +		tinfo[i].ready = &ready;
> +		tinfo[i].affinity = &affinity_buf[i];
> +		pthread_create(&dev->threads[i].thread, NULL, ublk_io_handler_fn,
> +				&tinfo[i]);
>  	}
>  
>  	for (i = 0; i < dinfo->nr_hw_queues; i++)
> -		sem_wait(&queue_sem);
> -	free(qinfo);
> +		sem_wait(&ready);
> +	free(tinfo);
>  	free(affinity_buf);
>  
>  	/* everything is fine now, start us */
> @@ -889,7 +902,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
>  	/* wait until we are terminated */
>  	for (i = 0; i < dinfo->nr_hw_queues; i++)
> -		pthread_join(dev->q[i].thread, &thread_ret);
> +		pthread_join(dev->threads[i].thread, &thread_ret);
>  fail:
>  	for (i = 0; i < dinfo->nr_hw_queues; i++)
>  		ublk_queue_deinit(&dev->q[i]);
> diff --git a/tools/testing/selftests/ublk/kublk.h b/tools/testing/selftests/ublk/kublk.h
> index 7c912116606429215af7dbc2a8ce6b40ef89bfbd..9eb2207fcebe96d34488d057c881db262b9767b3 100644
> --- a/tools/testing/selftests/ublk/kublk.h
> +++ b/tools/testing/selftests/ublk/kublk.h
> @@ -51,10 +51,12 @@
>  #define UBLK_IO_MAX_BYTES               (1 << 20)
>  #define UBLK_MAX_QUEUES_SHIFT		5
>  #define UBLK_MAX_QUEUES (1 << UBLK_MAX_QUEUES_SHIFT)
> +#define UBLK_MAX_THREADS_SHIFT		5
> +#define UBLK_MAX_THREADS (1 << UBLK_MAX_THREADS_SHIFT)
>  #define UBLK_QUEUE_DEPTH                1024
>  
>  #define UBLK_DBG_DEV            (1U << 0)
> -#define UBLK_DBG_QUEUE          (1U << 1)
> +#define UBLK_DBG_THREAD         (1U << 1)
>  #define UBLK_DBG_IO_CMD         (1U << 2)
>  #define UBLK_DBG_IO             (1U << 3)
>  #define UBLK_DBG_CTRL_CMD       (1U << 4)
> @@ -62,6 +64,7 @@
>  
>  struct ublk_dev;
>  struct ublk_queue;
> +struct ublk_thread;
>  
>  struct stripe_ctx {
>  	/* stripe */
> @@ -120,6 +123,8 @@ struct ublk_io {
>  	unsigned short refs;	/* used by target code only */
>  
>  	struct ublk_queue *q;
> +	struct ublk_thread *t;
Given that the mapping between queue/tag and thread is static, the serving
'struct ublk_thread' can easily be figured out at runtime, so we can save
8 bytes and avoid the indirect memory dereference: sizeof(struct ublk_io)
needs to be held in a single L1 cacheline.

But it can be one followup.
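
Something like the following (untested, and the helper name is made up)
should work, given that thread i exclusively services queue i for now:

	static inline struct ublk_thread *ublk_io_to_thread(struct ublk_io *io)
	{
		/* static queue -> thread mapping: thread index equals q_id */
		return &io->q->dev->threads[io->q->q_id];
	}

Then the new 'struct ublk_thread *t' member and the io->t assignment in
ublk_submit_fetch_commands() could be dropped, and a _Static_assert on
sizeof(struct ublk_io) would keep the cacheline requirement honest.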
Reviewed-by: Ming Lei <ming.lei@redhat.com>
Thanks,
Ming