[PATCH] Adding multiple workers to the loop device.

From: muraliraja.muniraju
Date: Tue Jan 21 2020 - 14:25:57 EST


Current loop device implementation has a single kthread worker and
drains one request at a time to completion. If the underneath device is
slow then this reduces the concurrency significantly. To help in these
cases, adding multiple loop workers increases the concurrency. Also to
retain the old behaviour the default number of loop workers is 1 and can
be tuned via the ioctl.
---
drivers/block/loop.c | 68 +++++++++++++++++++++++++++++++++------
drivers/block/loop.h | 9 ++++--
include/uapi/linux/loop.h | 1 +
3 files changed, 67 insertions(+), 11 deletions(-)

diff --git a/drivers/block/loop.c b/drivers/block/loop.c
index 739b372a5112..97ec9485140c 100644
--- a/drivers/block/loop.c
+++ b/drivers/block/loop.c
@@ -793,6 +793,13 @@ static ssize_t loop_attr_backing_file_show(struct loop_device *lo, char *buf)
return ret;
}

+static ssize_t loop_attr_num_loop_workers_show(struct loop_device *lo,
+ char *buf)
+{
+ return sprintf(buf, "%llu\n",
+ (unsigned long long)lo->num_loop_workers);
+}
+
static ssize_t loop_attr_offset_show(struct loop_device *lo, char *buf)
{
return sprintf(buf, "%llu\n", (unsigned long long)lo->lo_offset);
@@ -830,6 +837,7 @@ LOOP_ATTR_RO(sizelimit);
LOOP_ATTR_RO(autoclear);
LOOP_ATTR_RO(partscan);
LOOP_ATTR_RO(dio);
+LOOP_ATTR_RO(num_loop_workers);

static struct attribute *loop_attrs[] = {
&loop_attr_backing_file.attr,
@@ -838,6 +846,7 @@ static struct attribute *loop_attrs[] = {
&loop_attr_autoclear.attr,
&loop_attr_partscan.attr,
&loop_attr_dio.attr,
+ &loop_attr_num_loop_workers.attr,
NULL,
};

@@ -889,10 +898,19 @@ static void loop_config_discard(struct loop_device *lo)
blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
}

+static void loop_queue_cleanup(struct loop_device *lo, int num_workers)
+{
+ int i;
+
+ for (i = 0; i < num_workers; i++)
+ kthread_flush_worker(&(lo->workers[i]));
+ for (i = 0; i < num_workers; i++)
+ kthread_stop(lo->worker_tasks[i]);
+}
+
static void loop_unprepare_queue(struct loop_device *lo)
{
- kthread_flush_worker(&lo->worker);
- kthread_stop(lo->worker_task);
+ loop_queue_cleanup(lo, lo->num_loop_workers);
}

static int loop_kthread_worker_fn(void *worker_ptr)
@@ -903,13 +921,22 @@ static int loop_kthread_worker_fn(void *worker_ptr)

static int loop_prepare_queue(struct loop_device *lo)
{
- kthread_init_worker(&lo->worker);
- lo->worker_task = kthread_run(loop_kthread_worker_fn,
- &lo->worker, "loop%d", lo->lo_number);
- if (IS_ERR(lo->worker_task))
- return -ENOMEM;
- set_user_nice(lo->worker_task, MIN_NICE);
+ int i = 0;
+
+ for (i = 0; i < lo->num_loop_workers; i++) {
+ kthread_init_worker(&(lo->workers[i]));
+ lo->worker_tasks[i] = kthread_run(
+ loop_kthread_worker_fn, &(lo->workers[i]),
+ "loop%d(%d)", lo->lo_number, i);
+ if (IS_ERR((lo->worker_tasks[i])))
+ goto err;
+ set_user_nice(lo->worker_tasks[i], MIN_NICE);
+ }
return 0;
+err:
+ // Cleanup the previous indices, 0 to i-1
+ loop_queue_cleanup(lo, i);
+ return -ENOMEM;
}

static void loop_update_rotational(struct loop_device *lo)
@@ -1529,6 +1556,16 @@ static int loop_set_dio(struct loop_device *lo, unsigned long arg)
return error;
}

+static int loop_set_num_workers(struct loop_device *lo, unsigned long arg)
+{
+ if (lo->lo_state != Lo_unbound)
+ return -ENXIO;
+ if (arg < 1 || arg > MAX_LOOP_WORKER_THREADS)
+ return -EINVAL;
+ lo->num_loop_workers = arg;
+ return 0;
+}
+
static int loop_set_block_size(struct loop_device *lo, unsigned long arg)
{
int err = 0;
@@ -1584,6 +1621,9 @@ static int lo_simple_ioctl(struct loop_device *lo, unsigned int cmd,
case LOOP_SET_BLOCK_SIZE:
err = loop_set_block_size(lo, arg);
break;
+ case LOOP_SET_WORKERS:
+ err = loop_set_num_workers(lo, arg);
+ break;
default:
err = lo->ioctl ? lo->ioctl(lo, cmd, arg) : -EINVAL;
}
@@ -1907,6 +1947,7 @@ static blk_status_t loop_queue_rq(struct blk_mq_hw_ctx *hctx,
struct request *rq = bd->rq;
struct loop_cmd *cmd = blk_mq_rq_to_pdu(rq);
struct loop_device *lo = rq->q->queuedata;
+ unsigned int inx;

blk_mq_start_request(rq);

@@ -1932,7 +1973,14 @@ static blk_status_t loop_queue_rq(struct blk_mq_hw_ctx *hctx,
} else
#endif
cmd->css = NULL;
- kthread_queue_work(&lo->worker, &cmd->work);
+
+ // Round robin the incoming requests across multiple threads
+ // by having a monitorically increasing number indexing
+ // by modulo number of workers.
+ inx = lo->current_queue_inx;
+ inx = inx % lo->num_loop_workers;
+ kthread_queue_work(&lo->workers[inx], &cmd->work);
+ lo->current_queue_inx += 1;

return BLK_STS_OK;
}
@@ -2014,6 +2062,8 @@ static int loop_add(struct loop_device **l, int i)
lo->tag_set.cmd_size = sizeof(struct loop_cmd);
lo->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
lo->tag_set.driver_data = lo;
+ lo->num_loop_workers = DEFAULT_LOOP_WORKERS;
+ lo->current_queue_inx = 0;

err = blk_mq_alloc_tag_set(&lo->tag_set);
if (err)
diff --git a/drivers/block/loop.h b/drivers/block/loop.h
index af75a5ee4094..09d1cc4f9b6b 100644
--- a/drivers/block/loop.h
+++ b/drivers/block/loop.h
@@ -26,6 +26,9 @@ enum {

struct loop_func_table;

+#define MAX_LOOP_WORKER_THREADS 8
+#define DEFAULT_LOOP_WORKERS 1
+
struct loop_device {
int lo_number;
atomic_t lo_refcnt;
@@ -54,8 +57,10 @@ struct loop_device {

spinlock_t lo_lock;
int lo_state;
- struct kthread_worker worker;
- struct task_struct *worker_task;
+ struct kthread_worker workers[MAX_LOOP_WORKER_THREADS];
+ struct task_struct *worker_tasks[MAX_LOOP_WORKER_THREADS];
+ unsigned int current_queue_inx;
+ int num_loop_workers;
bool use_dio;
bool sysfs_inited;

diff --git a/include/uapi/linux/loop.h b/include/uapi/linux/loop.h
index 080a8df134ef..a1b689832bb4 100644
--- a/include/uapi/linux/loop.h
+++ b/include/uapi/linux/loop.h
@@ -90,6 +90,7 @@ struct loop_info64 {
#define LOOP_SET_CAPACITY 0x4C07
#define LOOP_SET_DIRECT_IO 0x4C08
#define LOOP_SET_BLOCK_SIZE 0x4C09
+#define LOOP_SET_WORKERS 0x4C0A

/* /dev/loop-control interface */
#define LOOP_CTL_ADD 0x4C80
--
2.17.1