[RFC] Races in the AIO callback path

From: Suzuki
Date: Wed Jul 27 2005 - 07:11:11 EST


Hi all,

I found some races in the AIO callback path which are the root cause of
a few problems in the AIO code. The races are between the AIO callback
path (executed from softirq context) and aio_run_iocb().

(1) Suppose a retry returned -EIOCBRETRY and a wait has been queued.

Now, whenever the wait event completes, aio_wake_function() is invoked
from softirq context. aio_wake_function() deletes the wait entry
(iocb->ki_wait.task_list) and invokes kick_iocb() to kick the iocb and
queue it back onto the run_list.
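
For reference, the callback as it currently stands (reconstructed from
the last hunk of the patch at the end of this mail) looks like:

	static int aio_wake_function(wait_queue_t *wait, unsigned mode,
				     int sync, void *key)
	{
		struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);

		/* Remove the wait entry... */
		list_del_init(&wait->task_list);
		/* ...then kick the iocb so the retry gets queued. */
		kick_iocb(iocb);
		return 1;
	}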

Consider a situation like:

SOFTIRQ: [in aio_wake_function()]
	list_del_init(&wait->task_list);
	## SOFTIRQ gets scheduled out here, or PROCESS executes on another CPU ##
	kick_iocb(iocb);

PROCESS: [in aio_run_iocb()]
	/*
	 * Issue an additional retry to avoid waiting forever if
	 * no waits were queued (e.g. in case of a short read).
	 */
	if (list_empty(&iocb->ki_wait.task_list))
		kiocbSetKicked(iocb);

So PROCESS might see the deleted (empty) wait list
(iocb->ki_wait.task_list), set the kicked state and queue the iocb, and
it will be retried again. If that next retry runs before SOFTIRQ gets a
chance to call kick_iocb(), we run into problems.

There are two possibilities:

(a) The iocb may get completed.
If this happens before SOFTIRQ reaches kick_iocb(iocb), we end up
kicking an iocb which is no longer valid. This can cause a kernel oops
when kick_iocb() tries
	spin_lock_irqsave(&ctx->ctx_lock, flags);
since iocb->ki_ctx may be invalid.

(b) The iocb might encounter another wait condition.
In this case the iocb would be queued again, waiting for some event.
This makes SOFTIRQ hit the
	WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
in queue_kicked_iocb().
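
For reference, here is the unpatched queue_kicked_iocb() where that
WARN_ON lives (as removed by the patch below):

	static void queue_kicked_iocb(struct kiocb *iocb)
	{
		struct kioctx *ctx = iocb->ki_ctx;
		unsigned long flags;
		int run = 0;

		/* Fires when a wait entry is still queued for this iocb */
		WARN_ON((!list_empty(&iocb->ki_wait.task_list)));

		spin_lock_irqsave(&ctx->ctx_lock, flags);
		run = __queue_kicked_iocb(iocb);
		spin_unlock_irqrestore(&ctx->ctx_lock, flags);
		if (run)
			aio_queue_work(ctx);
	}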

(2) A similar race occurs while kicking the iocb.

i.e.,

SOFTIRQ: [in kick_iocb()]
	if (!kiocbTryKick(iocb)) {
	## SOFTIRQ gets scheduled out here, and PROCESS reaches the code below ##
		queue_kicked_iocb(iocb);

PROCESS: [in aio_run_iocb()]
	/* will make __queue_kicked_iocb succeed from here on */
	INIT_LIST_HEAD(&iocb->ki_run_list);
	/* we must queue the next iteration ourselves, if it
	 * has already been kicked */
	if (kiocbIsKicked(iocb)) {	/* true: kiocbTryKick() above already set it */
		__queue_kicked_iocb(iocb);
	}

Thus the iocb gets queued and may be retried, which again leads to the
problems described above.
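
For reference, the unpatched kick_iocb() (again reconstructed from the
patch hunk below):

	void fastcall kick_iocb(struct kiocb *iocb)
	{
		/* sync iocbs are easy: they can only ever be executing from a
		 * single context. */
		if (is_sync_kiocb(iocb)) {
			kiocbSetKicked(iocb);
			wake_up_process(iocb->ki_obj.tsk);
			return;
		}

		/* If its already kicked we shouldn't queue it again */
		if (!kiocbTryKick(iocb)) {
			queue_kicked_iocb(iocb);
		}
	}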

Conclusion
===========

From this I conclude that the following operations in the callback path
for async iocbs should be performed atomically, as one unit:

(*) Deletion of the wait queue entry.
(*) Kicking the iocb.
(*) Queueing the iocb back onto the run_list (already done under ctx_lock).

Also, the check in aio_run_iocb() for whether any waits were queued
should be done with the ctx_lock held.
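
Taken together, the kick path should take roughly this shape (this is
what the patch below introduces as kick_async_iocb()):

	spin_lock_irqsave(&ctx->ctx_lock, flags);
	list_del_init(&iocb->ki_wait.task_list);	/* (1) delete the wait entry */
	if (!kiocbTryKick(iocb))			/* (2) kick the iocb */
		run = queue_kicked_iocb(iocb);		/* (3) queue onto run_list */
	spin_unlock_irqrestore(&ctx->ctx_lock, flags);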

Solution
========
I have created a patch which performs the above operations with the
iocb->ki_ctx->ctx_lock held. I have also moved the check:

> if (list_empty(&iocb->ki_wait.task_list))
> kiocbSetKicked(iocb);

so that it is done with the ctx_lock held.

Since we now always hold the ctx_lock before calling
__queue_kicked_iocb(), the queue_kicked_iocb() wrapper is no longer
needed, so I have renamed __queue_kicked_iocb() to queue_kicked_iocb().

Testing
========
I tested the proposed patch on 2.6.13-rc3-mm1 on a 2-way Intel Xeon
(with HT) with the aio-stress and fsx-linux test cases. The tests ran
fine.


Comments?

regards,

Suzuki K P
Linux Technology Centre
IBM India Software Labs
Bangalore.

"Sorrow keeps u Human ,Failure Keeps u Humble,Success keeps u Glowing.
But only God Keeps u Going..... "



--- linux-2.6.13-rc3/fs/aio.c.old 2005-07-27 10:19:21.000000000 -0700
+++ linux-2.6.13-rc3/fs/aio.c 2005-07-27 10:22:27.000000000 -0700
@@ -609,7 +609,7 @@ static void unuse_mm(struct mm_struct *m
* Should be called with the spin lock iocb->ki_ctx->ctx_lock
* held
*/
-static inline int __queue_kicked_iocb(struct kiocb *iocb)
+static inline int queue_kicked_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;

@@ -724,13 +724,6 @@ static ssize_t aio_run_iocb(struct kiocb
aio_complete(iocb, ret, 0);
/* must not access the iocb after this */
}
- } else {
- /*
- * Issue an additional retry to avoid waiting forever if
- * no waits were queued (e.g. in case of a short read).
- */
- if (list_empty(&iocb->ki_wait.task_list))
- kiocbSetKicked(iocb);
}
out:
spin_lock_irq(&ctx->ctx_lock);
@@ -741,17 +734,23 @@ out:
* and know that there is more left to go,
* this is where we let go so that a subsequent
* "kick" can start the next iteration
+ *
+ * Issue an additional retry to avoid waiting forever if
+ * no waits were queued (e.g. in case of a short read).
+ * (Should be done with ctx_lock held.)
*/

- /* will make __queue_kicked_iocb succeed from here on */
+ if (list_empty(&iocb->ki_wait.task_list))
+ kiocbSetKicked(iocb);
+ /* will make queue_kicked_iocb succeed from here on */
INIT_LIST_HEAD(&iocb->ki_run_list);
/* we must queue the next iteration ourselves, if it
* has already been kicked */
if (kiocbIsKicked(iocb)) {
- __queue_kicked_iocb(iocb);
+ queue_kicked_iocb(iocb);

/*
- * __queue_kicked_iocb will always return 1 here, because
+ * queue_kicked_iocb will always return 1 here, because
* iocb->ki_run_list is empty at this point so it should
* be safe to unconditionally queue the context into the
* work queue.
@@ -870,21 +869,31 @@ static void aio_kick_handler(void *data)


/*
- * Called by kick_iocb to queue the kiocb for retry
- * and if required activate the aio work queue to process
- * it
+ * Kicking an async iocb.
+ * The following operations for the async iocbs should be atomic
+ * to avoid races with the aio_run_iocb() code.
+ * (1) Deleting the wait queue entry.
+ * (2) Kicking the iocb.
+ * (3) Queue the iocb back to run_list.
+ * Holds the ctx->ctx_lock to avoid races.
*/
-static void queue_kicked_iocb(struct kiocb *iocb)
+static void kick_async_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
unsigned long flags;
int run = 0;
-
- WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
-
+
spin_lock_irqsave(&ctx->ctx_lock, flags);
- run = __queue_kicked_iocb(iocb);
+ list_del_init(&iocb->ki_wait.task_list);
+ /* If its already kicked we shouldn't queue it again */
+ if (!kiocbTryKick(iocb)) {
+ run = queue_kicked_iocb(iocb);
+ }
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+
+ /* Activate the aio worker queue if we have successfully queued
+ * the iocb, so that it can be processed
+ */
if (run)
aio_queue_work(ctx);
}
@@ -901,15 +910,13 @@ void fastcall kick_iocb(struct kiocb *io
/* sync iocbs are easy: they can only ever be executing from a
* single context. */
if (is_sync_kiocb(iocb)) {
+ list_del_init(&iocb->ki_wait.task_list);
kiocbSetKicked(iocb);
wake_up_process(iocb->ki_obj.tsk);
return;
- }
-
- /* If its already kicked we shouldn't queue it again */
- if (!kiocbTryKick(iocb)) {
- queue_kicked_iocb(iocb);
- }
+ } else
+ kick_async_iocb(iocb);
+
}
EXPORT_SYMBOL(kick_iocb);

@@ -1461,7 +1468,6 @@ static int aio_wake_function(wait_queue_
{
struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);

- list_del_init(&wait->task_list);
kick_iocb(iocb);
return 1;
}