[patch] eventfd - remove fput() call from possible IRQ context (3rdrev)

From: Davide Libenzi
Date: Wed Mar 18 2009 - 13:25:58 EST


The following patch removes a possible source of fput() calls from inside
IRQ context. Like Eric, I wasn't able to reproduce an fput() call
from IRQ context, but Jeff said he was able to, with the attached test
program. Independently from this, the bug is conceptually there, so we
might be better off fixing it.
This patch adds an optimization similar to the one we already do on
->ki_filp, on ->ki_eventfd. Playing with ->f_count directly is not pretty
in general, but the alternative here would be to add a brand new delayed
fput() infrastructure, that I'm not sure is worth it.


On Sun, 15 Mar 2009, Benjamin LaHaise wrote:

> This looks reasonably sane, the only concern I have with it is that I think
> it logically makes more sense to use the same convention for ki_filp and
> ki_eventfd, as the difference in IS_ERR vs checking for NULL is a bit
> confusing. Aside from that, it looks like it should fix the problem
> correctly.

Makes sense.



Signed-off-by: Davide Libenzi <davidel@xxxxxxxxxxxxxxx>


- Davide


---
fs/aio.c | 37 +++++++++++++++++++++++++++----------
1 file changed, 27 insertions(+), 10 deletions(-)

Index: linux-2.6.mod/fs/aio.c
===================================================================
--- linux-2.6.mod.orig/fs/aio.c 2009-03-15 13:11:56.000000000 -0700
+++ linux-2.6.mod/fs/aio.c 2009-03-18 10:18:21.000000000 -0700
@@ -443,7 +443,7 @@ static struct kiocb *__aio_get_req(struc
req->private = NULL;
req->ki_iovec = NULL;
INIT_LIST_HEAD(&req->ki_run_list);
- req->ki_eventfd = ERR_PTR(-EINVAL);
+ req->ki_eventfd = NULL;

/* Check if the completion queue has enough free space to
* accept an event from this io.
@@ -485,8 +485,6 @@ static inline void really_put_req(struct
{
assert_spin_locked(&ctx->ctx_lock);

- if (!IS_ERR(req->ki_eventfd))
- fput(req->ki_eventfd);
if (req->ki_dtor)
req->ki_dtor(req);
if (req->ki_iovec != &req->ki_inline_vec)
@@ -508,8 +506,11 @@ static void aio_fput_routine(struct work
list_del(&req->ki_list);
spin_unlock_irq(&fput_lock);

- /* Complete the fput */
- __fput(req->ki_filp);
+ /* Complete the fput(s) */
+ if (req->ki_filp != NULL)
+ __fput(req->ki_filp);
+ if (req->ki_eventfd != NULL)
+ __fput(req->ki_eventfd);

/* Link the iocb into the context's free list */
spin_lock_irq(&ctx->ctx_lock);
@@ -527,12 +528,14 @@ static void aio_fput_routine(struct work
*/
static int __aio_put_req(struct kioctx *ctx, struct kiocb *req)
{
+ int schedule_putreq = 0;
+
dprintk(KERN_DEBUG "aio_put(%p): f_count=%ld\n",
req, atomic_long_read(&req->ki_filp->f_count));

assert_spin_locked(&ctx->ctx_lock);

- req->ki_users --;
+ req->ki_users--;
BUG_ON(req->ki_users < 0);
if (likely(req->ki_users))
return 0;
@@ -540,10 +543,23 @@ static int __aio_put_req(struct kioctx *
req->ki_cancel = NULL;
req->ki_retry = NULL;

- /* Must be done under the lock to serialise against cancellation.
- * Call this aio_fput as it duplicates fput via the fput_work.
+ /*
+ * Try to optimize the aio and eventfd file* puts, by avoiding to
+ * schedule work in case it is not __fput() time. In normal cases,
+ * we would not be holding the last reference to the file*, so
+ * this function will be executed w/out any aio kthread wakeup.
*/
- if (unlikely(atomic_long_dec_and_test(&req->ki_filp->f_count))) {
+ if (unlikely(atomic_long_dec_and_test(&req->ki_filp->f_count)))
+ schedule_putreq++;
+ else
+ req->ki_filp = NULL;
+ if (req->ki_eventfd != NULL) {
+ if (unlikely(atomic_long_dec_and_test(&req->ki_eventfd->f_count)))
+ schedule_putreq++;
+ else
+ req->ki_eventfd = NULL;
+ }
+ if (unlikely(schedule_putreq)) {
get_ioctx(ctx);
spin_lock(&fput_lock);
list_add(&req->ki_list, &fput_head);
@@ -1009,7 +1025,7 @@ int aio_complete(struct kiocb *iocb, lon
* eventfd. The eventfd_signal() function is safe to be called
* from IRQ context.
*/
- if (!IS_ERR(iocb->ki_eventfd))
+ if (iocb->ki_eventfd != NULL)
eventfd_signal(iocb->ki_eventfd, 1);

put_rq:
@@ -1608,6 +1624,7 @@ static int io_submit_one(struct kioctx *
req->ki_eventfd = eventfd_fget((int) iocb->aio_resfd);
if (IS_ERR(req->ki_eventfd)) {
ret = PTR_ERR(req->ki_eventfd);
+ req->ki_eventfd = NULL;
goto out_put_req;
}
}
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <sys/types.h>
#include <errno.h>
#include <assert.h>
#include <sys/eventfd.h>
#include <libaio.h>

int
main(int argc, char **argv)
{
#define SIZE (256*1024*1024)
	char *buf;
	struct io_event io_event;
	struct iocb iocb;
	struct iocb *iocbs[] = { &iocb };
	int rwfd, efd;
	int res;
	io_context_t io_ctx;

	(void) argc;
	(void) argv;

	/* Create the eventfd that AIO completion will signal. */
	efd = eventfd(0, 0);
	if (efd < 0) {
		perror("eventfd");
		exit(1);
	}

	/* O_DIRECT requires a page-aligned buffer, hence posix_memalign. */
	rwfd = open("rwfile", O_RDWR|O_DIRECT); assert(rwfd != -1);
	/*
	 * posix_memalign() returns 0 on success or a positive error number
	 * on failure; it does NOT set errno, so the original "< 0" check
	 * could never fire and perror() would report a stale errno.
	 */
	res = posix_memalign((void **)&buf, getpagesize(), SIZE);
	if (res != 0) {
		fprintf(stderr, "posix_memalign: %s\n", strerror(res));
		exit(1);
	}
	memset(buf, 0x42, SIZE);

	/* Write test. */
	res = io_queue_init(1024, &io_ctx); assert(res == 0);
	io_prep_pwrite(&iocb, rwfd, buf, SIZE, 0);
	io_set_eventfd(&iocb, efd);
	res = io_submit(io_ctx, 1, iocbs); assert(res == 1);

	/* Now close the eventfd so that AIO has the last reference */
	close(efd);

	/* Keep this process around so that the aio subsystem does not hold
	 * the last reference on the rwfd, otherwise the really_put_req will
	 * be called from process context */
	res = io_getevents(io_ctx, 1, 1, &io_event, NULL);
	if (res != 1) {
		if (res < 0) {
			/* libaio returns negated errno values. */
			errno = -res;
			perror("io_getevents");
		} else
			printf("io_getevents did not return 1 event after "
			       "closing eventfd\n");
		exit(1);
	}
	assert(io_event.res == SIZE);
	printf("eventfd write test [SUCCESS]\n");

	return 0;
}