[PATCH 5/5] aio: Refactor aio_read_evt, use cmpxchg(), fix bug

From: Kent Overstreet
Date: Tue Oct 09 2012 - 02:39:45 EST


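A bunch of cleanup, and make reading events lockless: the ring_lock
spinlock is removed and the ring head is now advanced with cmpxchg(), so
that userspace can safely pull events off the ringbuffer without racing
with io_getevents(). The on-stack timer used for the io_getevents()
timeout is replaced with schedule_timeout()/io_schedule_timeout().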

Signed-off-by: Kent Overstreet <koverstreet@xxxxxxxxxx>
---
fs/aio.c | 220 +++++++++++++++++++++-----------------------------------------
1 file changed, 73 insertions(+), 147 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index c3d97d1..1efc8a3 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -259,7 +259,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
atomic_set(&ctx->reqs_active, 1);

spin_lock_init(&ctx->ctx_lock);
- spin_lock_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait);

INIT_LIST_HEAD(&ctx->active_reqs);
@@ -941,194 +940,121 @@ put_rq:
}
EXPORT_SYMBOL(aio_complete);

-/* aio_read_evt
- * Pull an event off of the ioctx's event ring. Returns the number of
- * events fetched (0 or 1 ;-)
- * FIXME: make this use cmpxchg.
- * TODO: make the ringbuffer user mmap()able (requires FIXME).
+/* aio_read_events
+ * Copy up to nr events from the ioctx's event ring to userspace. Returns
+ * the number of events copied, or -EFAULT if the copy to userspace fails.
*/
-static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
+static int aio_read_events(struct kioctx *ctx, struct io_event __user *event,
+ long nr)
{
- struct aio_ring_info *info = &ioctx->ring_info;
+ struct aio_ring_info *info = &ctx->ring_info;
struct aio_ring *ring = info->ring;
- unsigned long head;
- int ret = 0;
-
- dprintk("in aio_read_evt h%lu t%lu m%lu\n",
- (unsigned long)ring->head, (unsigned long)ring->tail,
- (unsigned long)ring->nr);
-
- if (ring->head == ring->tail)
- goto out;
-
- spin_lock(&info->ring_lock);
+ unsigned old, new;
+ int ret;

- head = ring->head % info->nr;
- if (head != ring->tail) {
- *ent = ring->io_events[head];
- head = (head + 1) % info->nr;
- smp_mb(); /* finish reading the event before updatng the head */
- ring->head = head;
- ret = 1;
- }
- spin_unlock(&info->ring_lock);
+ pr_debug("h%lu t%lu m%lu\n",
+ (unsigned long) ring->head,
+ (unsigned long) ring->tail,
+ (unsigned long) ring->nr);
+retry:
+ ret = 0;
+ old = new = ring->head;

-out:
- dprintk("leaving aio_read_evt: %d h%lu t%lu\n", ret,
- (unsigned long)ring->head, (unsigned long)ring->tail);
- return ret;
-}
+ while (ret < nr && new != ring->tail) {
+ struct io_event *ev = &ring->io_events[new];
+ unsigned i = (new < ring->tail ? ring->tail : info->nr) - new;

-struct aio_timeout {
- struct timer_list timer;
- int timed_out;
- struct task_struct *p;
-};
+ i = min_t(int, i, nr - ret);

-static void timeout_func(unsigned long data)
-{
- struct aio_timeout *to = (struct aio_timeout *)data;
+ if (unlikely(copy_to_user(event + ret, ev, sizeof(*ev) * i)))
+ return -EFAULT;

- to->timed_out = 1;
- wake_up_process(to->p);
-}
+ ret += i;
+ new += i;
+ new %= info->nr;
+ }

-static inline void init_timeout(struct aio_timeout *to)
-{
- setup_timer_on_stack(&to->timer, timeout_func, (unsigned long) to);
- to->timed_out = 0;
- to->p = current;
-}
+ if (new != old) {
+ smp_mb(); /* finish reading the event before updatng the head */

-static inline void set_timeout(long start_jiffies, struct aio_timeout *to,
- const struct timespec *ts)
-{
- to->timer.expires = start_jiffies + timespec_to_jiffies(ts);
- if (time_after(to->timer.expires, jiffies))
- add_timer(&to->timer);
- else
- to->timed_out = 1;
-}
+ if (cmpxchg(&ring->head, old, new) != old)
+ goto retry;
+ }

-static inline void clear_timeout(struct aio_timeout *to)
-{
- del_singleshot_timer_sync(&to->timer);
+ pr_debug("ret %d h%lu t%lu\n", ret,
+ (unsigned long)ring->head,
+ (unsigned long)ring->tail);
+ return ret;
}

-static int read_events(struct kioctx *ctx,
- long min_nr, long nr,
- struct io_event __user *event,
- struct timespec __user *timeout)
+static int read_events(struct kioctx *ctx, long min_nr, long nr,
+ struct io_event __user *event,
+ struct timespec __user *timeout)
{
- long start_jiffies = jiffies;
- struct task_struct *tsk = current;
- DECLARE_WAITQUEUE(wait, tsk);
- int ret;
- int i = 0;
- struct io_event ent;
- struct aio_timeout to;
- int retry = 0;
-
- /* needed to zero any padding within an entry (there shouldn't be
- * any, but C is fun!
- */
- memset(&ent, 0, sizeof(ent));
+ DEFINE_WAIT(wait);
+ long until = MAX_SCHEDULE_TIMEOUT;
+ size_t i = 0;
+ int ret, retry = 0;
retry:
- ret = 0;
- while (likely(i < nr)) {
- ret = aio_read_evt(ctx, &ent);
- if (unlikely(ret <= 0))
- break;
-
- dprintk("read event: %Lx %Lx %Lx %Lx\n",
- ent.data, ent.obj, ent.res, ent.res2);
-
- /* Could we split the check in two? */
- ret = -EFAULT;
- if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
- dprintk("aio: lost an event due to EFAULT.\n");
- break;
- }
- ret = 0;
-
- /* Good, event copied to userland, update counts. */
- event ++;
- i ++;
- }
+ ret = aio_read_events(ctx, event + i, nr - i);
+ if (ret < 0)
+ return ret;

- if (min_nr <= i)
+ i += ret;
+ if (i >= min_nr)
return i;
- if (ret)
- return ret;

/* End fast path */

/* racey check, but it gets redone */
+ /* XXX: why do we need to run the pending iocbs and retry here? */
if (!retry && unlikely(!list_empty(&ctx->run_list))) {
retry = 1;
aio_run_all_iocbs(ctx);
goto retry;
}

- init_timeout(&to);
if (timeout) {
struct timespec ts;
- ret = -EFAULT;
if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
- goto out;
+ return -EFAULT;

- set_timeout(start_jiffies, &to, &ts);
+ until = timespec_to_jiffies(&ts);
}

- while (likely(i < nr)) {
- add_wait_queue_exclusive(&ctx->wait, &wait);
- do {
- set_task_state(tsk, TASK_INTERRUPTIBLE);
- ret = aio_read_evt(ctx, &ent);
- if (ret)
- break;
- if (min_nr <= i)
- break;
- if (unlikely(ctx->dead)) {
- ret = -EINVAL;
- break;
- }
- if (to.timed_out) /* Only check after read evt */
- break;
- /* Try to only show up in io wait if there are ops
- * in flight */
- if (atomic_read(&ctx->reqs_active) > 1)
- io_schedule();
- else
- schedule();
- if (signal_pending(tsk)) {
- ret = -EINTR;
- break;
- }
- /*ret = aio_read_evt(ctx, &ent);*/
- } while (1) ;
+ while (i < nr) {
+ prepare_to_wait_exclusive(&ctx->wait, &wait,
+ TASK_INTERRUPTIBLE);

- set_task_state(tsk, TASK_RUNNING);
- remove_wait_queue(&ctx->wait, &wait);
+ ret = aio_read_events(ctx, event + i, nr - i);
+ if (ret < 0)
+ break;

- if (unlikely(ret <= 0))
+ i += ret;
+ if (i >= min_nr)
break;

- ret = -EFAULT;
- if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
- dprintk("aio: lost an event due to EFAULT.\n");
+ if (unlikely(ctx->dead)) {
+ ret = -EINVAL;
break;
}

- /* Good, event copied to userland, update counts. */
- event ++;
- i ++;
+ if (!until) /* Only check after read evt */
+ break;
+
+ /* Try to only show up in io wait if there are ops in flight */
+ if (atomic_read(&ctx->reqs_active) > 1)
+ until = io_schedule_timeout(until);
+ else
+ until = schedule_timeout(until);
+
+ if (signal_pending(current)) {
+ ret = -EINTR;
+ break;
+ }
}

- if (timeout)
- clear_timeout(&to);
-out:
- destroy_timer_on_stack(&to.timer);
+ finish_wait(&ctx->wait, &wait);
return i ? i : ret;
}

--
1.7.10.4

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/