[GIT PULL] aio: fix sleeping while TASK_INTERRUPTIBLE

From: Benjamin LaHaise
Date: Sun Feb 01 2015 - 09:41:10 EST


Hello Linus,

The following changes since commit 5f785de588735306ec4d7c875caf9d28481c8b21:

aio: Skip timer for io_getevents if timeout=0 (2014-12-13 17:50:20 -0500)

are available in the git repository at:

git://git.kvack.org/~bcrl/aio-fixes.git master

for you to fetch changes up to f84249f4cfef5ffa8a3e6d588a1d185f3f1fef45:

fs/aio: fix sleeping while TASK_INTERRUPTIBLE (2015-01-30 11:18:36 -0500)

----------------------------------------------------------------
Chris Mason (1):
fs/aio: fix sleeping while TASK_INTERRUPTIBLE

fs/aio.c | 117 ++++++++++++++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 93 insertions(+), 24 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 1b7893e..71b192a 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1131,6 +1131,8 @@ EXPORT_SYMBOL(aio_complete);
/* aio_read_events_ring
* Pull an event off of the ioctx's event ring. Returns the number of
* events fetched
+ *
+ * must be called with ctx->ring_lock held
*/
static long aio_read_events_ring(struct kioctx *ctx,
struct io_event __user *event, long nr)
@@ -1139,8 +1141,7 @@ static long aio_read_events_ring(struct kioctx *ctx,
unsigned head, tail, pos;
long ret = 0;
int copy_ret;
-
- mutex_lock(&ctx->ring_lock);
+ int running = current->state == TASK_RUNNING;

/* Access to ->ring_pages here is protected by ctx->ring_lock. */
ring = kmap_atomic(ctx->ring_pages[0]);
@@ -1179,6 +1180,17 @@ static long aio_read_events_ring(struct kioctx *ctx,
page = ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE];
pos %= AIO_EVENTS_PER_PAGE;

+ /* we're called after calling prepare_to_wait, which means
+ * our current state might not be TASK_RUNNING. Set it
+ * to running here so sleeps in kmap or copy_to_user don't
+ * trigger warnings. If we don't copy enough events out, we'll
+ * loop through schedule() one time before sleeping.
+ */
+ if (!running) {
+ __set_current_state(TASK_RUNNING);
+ running = 1;
+ }
+
ev = kmap(page);
copy_ret = copy_to_user(event + ret, ev + pos,
sizeof(*ev) * avail);
@@ -1201,11 +1213,10 @@ static long aio_read_events_ring(struct kioctx *ctx,

pr_debug("%li h%u t%u\n", ret, head, tail);
out:
- mutex_unlock(&ctx->ring_lock);
-
return ret;
}

+/* must be called with ctx->ring_lock held */
static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
struct io_event __user *event, long *i)
{
@@ -1223,6 +1234,73 @@ static bool aio_read_events(struct kioctx *ctx, long min_nr, long nr,
return ret < 0 || *i >= min_nr;
}

+/*
+ * called without ctx->ring_lock held
+ */
+static long aio_wait_events(struct kioctx *ctx, long min_nr, long nr,
+ struct io_event __user *event,
+ long *i_ret, ktime_t until)
+{
+ struct hrtimer_sleeper t;
+ long ret;
+ DEFINE_WAIT(wait);
+
+ mutex_lock(&ctx->ring_lock);
+
+ /*
+ * this is an open coding of wait_event_interruptible_hrtimeout
+ * so we can deal with ctx->ring_lock nicely. It starts with the
+ * fast path for existing events:
+ */
+ ret = aio_read_events(ctx, min_nr, nr, event, i_ret);
+ if (ret) {
+ mutex_unlock(&ctx->ring_lock);
+ return ret;
+ }
+
+ hrtimer_init_on_stack(&t.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+ hrtimer_init_sleeper(&t, current);
+ if (until.tv64 != KTIME_MAX) {
+ hrtimer_start_range_ns(&t.timer, until, current->timer_slack_ns,
+ HRTIMER_MODE_REL);
+ }
+
+ while (1) {
+ ret = prepare_to_wait_event(&ctx->wait, &wait,
+ TASK_INTERRUPTIBLE);
+ if (ret)
+ break;
+
+ ret = aio_read_events(ctx, min_nr, nr, event, i_ret);
+ if (ret)
+ break;
+
+ /* make sure we didn't time out while taking the mutex */
+ if (!t.task) {
+ ret = -ETIME;
+ break;
+ }
+
+ mutex_unlock(&ctx->ring_lock);
+ schedule();
+
+ if (!t.task) {
+ ret = -ETIME;
+ goto out;
+ }
+ mutex_lock(&ctx->ring_lock);
+
+ }
+ mutex_unlock(&ctx->ring_lock);
+
+out:
+ finish_wait(&ctx->wait, &wait);
+ hrtimer_cancel(&t.timer);
+ destroy_hrtimer_on_stack(&t.timer);
+ return ret;
+
+}
+
static long read_events(struct kioctx *ctx, long min_nr, long nr,
struct io_event __user *event,
struct timespec __user *timeout)
@@ -1239,27 +1317,18 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
until = timespec_to_ktime(ts);
}

- /*
- * Note that aio_read_events() is being called as the conditional - i.e.
- * we're calling it after prepare_to_wait() has set task state to
- * TASK_INTERRUPTIBLE.
- *
- * But aio_read_events() can block, and if it blocks it's going to flip
- * the task state back to TASK_RUNNING.
- *
- * This should be ok, provided it doesn't flip the state back to
- * TASK_RUNNING and return 0 too much - that causes us to spin. That
- * will only happen if the mutex_lock() call blocks, and we then find
- * the ringbuffer empty. So in practice we should be ok, but it's
- * something to be aware of when touching this code.
- */
- if (until.tv64 == 0)
- aio_read_events(ctx, min_nr, nr, event, &ret);
- else
- wait_event_interruptible_hrtimeout(ctx->wait,
- aio_read_events(ctx, min_nr, nr, event, &ret),
- until);

+ if (until.tv64 == 0) {
+ mutex_lock(&ctx->ring_lock);
+ aio_read_events(ctx, min_nr, nr, event, &ret);
+ mutex_unlock(&ctx->ring_lock);
+ } else {
+ /*
+ * we're discarding the -ETIME and -ERESTARTSYS return values
+ * here. A pending signal turns into -EINTR below. ick
+ */
+ aio_wait_events(ctx, min_nr, nr, event, &ret, until);
+ }
if (!ret && signal_pending(current))
ret = -EINTR;

--
"Thought is the essence of where you are now."
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/