Re: [PATCH v2] fs/aio: obey min_nr when doing wakeups

From: Jeff Moyer
Date: Fri Jan 20 2023 - 14:44:30 EST


Hi, Kent,

Kent Overstreet <kent.overstreet@xxxxxxxxx> writes:

> I've been observing workloads where IPIs due to wakeups in
> aio_complete() are ~15% of total CPU time in the profile. Most of those
> wakeups are unnecessary when completion batching is in use in
> io_getevents().
>
> This plumbs min_nr through via the wait entry, so that aio_complete()
> can avoid doing unnecessary wakeups.
>
> v2: This fixes a race in the first version of the patch. If we read some
> events out after adding to the waitlist, we need to update wait.min_nr
> and call prepare_to_wait_event() again before scheduling.

I like the idea of the patch, and I'll get some real world performance
numbers soon. But first, this version (and the previous version as
well) fails test case 23 in the libaio regression test suite:

Starting cases/23.p
FAIL: poll missed an event!
FAIL: poll missed an event!
test cases/23.t completed FAILED.

I started to look into it, but didn't see anything obvious yet. My test
kernel has the kmap_local patch applied as well, fyi.

Thanks!
Jeff

>
> Signed-off-by: Kent Overstreet <kent.overstreet@xxxxxxxxx>
> Cc: Benjamin LaHaise <bcrl@xxxxxxxxx>
> Cc: linux-aio@xxxxxxxxx
> Cc: linux-fsdevel@xxxxxxxxxxxxxxx
> ---
> fs/aio.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++---------
> 1 file changed, 56 insertions(+), 10 deletions(-)
>
> diff --git a/fs/aio.c b/fs/aio.c
> index 3f795ed2a2..5be35cb8ec 100644
> --- a/fs/aio.c
> +++ b/fs/aio.c
> @@ -1105,6 +1105,11 @@ static inline void iocb_destroy(struct aio_kiocb *iocb)
> kmem_cache_free(kiocb_cachep, iocb);
> }
>
> +struct aio_waiter {
> + struct wait_queue_entry w;
> + size_t min_nr;
> +};
> +
> /* aio_complete
> * Called when the io request on the given iocb is complete.
> */
> @@ -1113,7 +1118,7 @@ static void aio_complete(struct aio_kiocb *iocb)
> struct kioctx *ctx = iocb->ki_ctx;
> struct aio_ring *ring;
> struct io_event *ev_page, *event;
> - unsigned tail, pos, head;
> + unsigned tail, pos, head, avail;
> unsigned long flags;
>
> /*
> @@ -1157,6 +1162,10 @@ static void aio_complete(struct aio_kiocb *iocb)
> ctx->completed_events++;
> if (ctx->completed_events > 1)
> refill_reqs_available(ctx, head, tail);
> +
> + avail = tail > head
> + ? tail - head
> + : tail + ctx->nr_events - head;
> spin_unlock_irqrestore(&ctx->completion_lock, flags);
>
> pr_debug("added to ring %p at [%u]\n", iocb, tail);
> @@ -1177,8 +1186,18 @@ static void aio_complete(struct aio_kiocb *iocb)
> */
> smp_mb();
>
> - if (waitqueue_active(&ctx->wait))
> - wake_up(&ctx->wait);
> + if (waitqueue_active(&ctx->wait)) {
> + struct aio_waiter *curr, *next;
> + unsigned long flags;
> +
> + spin_lock_irqsave(&ctx->wait.lock, flags);
> + list_for_each_entry_safe(curr, next, &ctx->wait.head, w.entry)
> + if (avail >= curr->min_nr) {
> + list_del_init_careful(&curr->w.entry);
> + wake_up_process(curr->w.private);
> + }
> + spin_unlock_irqrestore(&ctx->wait.lock, flags);
> + }
> }
>
> static inline void iocb_put(struct aio_kiocb *iocb)
> @@ -1294,7 +1313,9 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
> struct io_event __user *event,
> ktime_t until)
> {
> - long ret = 0;
> + struct hrtimer_sleeper t;
> + struct aio_waiter w;
> + long ret = 0, ret2 = 0;
>
> /*
> * Note that aio_read_events() is being called as the conditional - i.e.
> @@ -1310,12 +1331,37 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
> * the ringbuffer empty. So in practice we should be ok, but it's
> * something to be aware of when touching this code.
> */
> - if (until == 0)
> - aio_read_events(ctx, min_nr, nr, event, &ret);
> - else
> - wait_event_interruptible_hrtimeout(ctx->wait,
> - aio_read_events(ctx, min_nr, nr, event, &ret),
> - until);
> + aio_read_events(ctx, min_nr, nr, event, &ret);
> + if (until == 0 || ret < 0 || ret >= min_nr)
> + return ret;
> +
> + hrtimer_init_sleeper_on_stack(&t, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
> + if (until != KTIME_MAX) {
> + hrtimer_set_expires_range_ns(&t.timer, until, current->timer_slack_ns);
> + hrtimer_sleeper_start_expires(&t, HRTIMER_MODE_REL);
> + }
> +
> + init_wait(&w.w);
> +
> + while (1) {
> + unsigned long nr_got = ret;
> +
> + w.min_nr = min_nr - ret;
> +
> + ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE) ?:
> + !t.task ? -ETIME : 0;
> +
> + if (aio_read_events(ctx, min_nr, nr, event, &ret) || ret2)
> + break;
> +
> + if (nr_got == ret)
> + schedule();
> + }
> +
> + finish_wait(&ctx->wait, &w.w);
> + hrtimer_cancel(&t.timer);
> + destroy_hrtimer_on_stack(&t.timer);
> +
> return ret;
> }