[patch] epoll: reduce (to 1) the number of passes over the ready set...

From: Davide Libenzi
Date: Mon Feb 26 2007 - 21:32:34 EST



Epoll is doing multiple passes over the ready set at the moment, because
of the constraints over the f_op->poll() call. Looking at the code again,
I noticed that we already hold the epoll semaphore in read, and this
(together with other locking conditions that hold while doing an
epoll_wait()) can lead to a smarter way to "ship" events to userspace (in
a single pass). I added even more comments to the code to explain
why certain operations are safe.
Below is a stress application that can be used to test the new code. It
spawns multiple threads and calls epoll_wait() and epoll_ctl() from many
of them. Stress tested on my dual Opteron 254 without any problems.

http://www.xmailserver.org/totalmess.c

This is not a benchmark, just something that tries to stress and exploit
possible problems with the new code.



- Davide



eventpoll.c | 174 ++++++++++++++++++++++--------------------------------------
1 file changed, 66 insertions(+), 108 deletions(-)




diff -Nru linux-2.6.20/fs/eventpoll.c linux-2.6.20.mod/fs/eventpoll.c
--- linux-2.6.20/fs/eventpoll.c 2007-02-04 10:44:54.000000000 -0800
+++ linux-2.6.20.mod/fs/eventpoll.c 2007-02-26 17:32:25.000000000 -0800
@@ -218,9 +218,6 @@
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;

- /* List header used to link the item to the transfer list */
- struct list_head txlink;
-
/*
* This is used during the collection/transfer of events to userspace
* to pin items empty events set.
@@ -259,10 +256,9 @@
static int ep_eventpoll_close(struct inode *inode, struct file *file);
static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait);
static int ep_collect_ready_items(struct eventpoll *ep,
- struct list_head *txlist, int maxevents);
+ struct list_head *txlist);
static int ep_send_events(struct eventpoll *ep, struct list_head *txlist,
- struct epoll_event __user *events);
-static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist);
+ struct epoll_event __user *events, int maxevents);
static int ep_events_transfer(struct eventpoll *ep,
struct epoll_event __user *events,
int maxevents);
@@ -355,17 +351,6 @@
return rb_parent(n) != n;
}

-/*
- * Remove the item from the list and perform its initialization.
- * This is useful for us because we can test if the item is linked
- * using "ep_is_linked(p)".
- */
-static inline void ep_list_del(struct list_head *p)
-{
- list_del(p);
- INIT_LIST_HEAD(p);
-}
-
/* Tells us if the item is currently linked */
static inline int ep_is_linked(struct list_head *p)
{
@@ -480,7 +465,7 @@
epi = list_entry(lsthead->next, struct epitem, fllink);

ep = epi->ep;
- ep_list_del(&epi->fllink);
+ list_del_init(&epi->fllink);
down_write(&ep->sem);
ep_remove(ep, epi);
up_write(&ep->sem);
@@ -1011,7 +996,6 @@
ep_rb_initnode(&epi->rbn);
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
- INIT_LIST_HEAD(&epi->txlink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
@@ -1080,7 +1064,7 @@
*/
write_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink))
- ep_list_del(&epi->rdllink);
+ list_del_init(&epi->rdllink);
write_unlock_irqrestore(&ep->lock, flags);

kmem_cache_free(epi_cache, epi);
@@ -1170,7 +1154,7 @@
while (!list_empty(lsthead)) {
pwq = list_entry(lsthead->next, struct eppoll_entry, llink);

- ep_list_del(&pwq->llink);
+ list_del_init(&pwq->llink);
remove_wait_queue(pwq->whead, &pwq->wait);
kmem_cache_free(pwq_cache, pwq);
}
@@ -1213,7 +1197,7 @@
* we want to remove it from this list to avoid stale events.
*/
if (ep_is_linked(&epi->rdllink))
- ep_list_del(&epi->rdllink);
+ list_del_init(&epi->rdllink);

error = 0;
eexit_1:
@@ -1248,7 +1232,7 @@
/* Remove the current item from the list of epoll hooks */
spin_lock(&file->f_ep_lock);
if (ep_is_linked(&epi->fllink))
- ep_list_del(&epi->fllink);
+ list_del_init(&epi->fllink);
spin_unlock(&file->f_ep_lock);

/* We need to acquire the write IRQ lock before calling ep_unlink() */
@@ -1363,46 +1347,19 @@

/*
* Since we have to release the lock during the __copy_to_user() operation and
- * during the f_op->poll() call, we try to collect the maximum number of items
- * by reducing the irqlock/irqunlock switching rate.
+ * during the f_op->poll() call, we acquire the ready list into our temporary
+ * transmission list. List splice is O(1).
*/
-static int ep_collect_ready_items(struct eventpoll *ep, struct list_head *txlist, int maxevents)
+static int ep_collect_ready_items(struct eventpoll *ep, struct list_head *txlist)
{
- int nepi;
unsigned long flags;
- struct list_head *lsthead = &ep->rdllist, *lnk;
- struct epitem *epi;

write_lock_irqsave(&ep->lock, flags);
-
- for (nepi = 0, lnk = lsthead->next; lnk != lsthead && nepi < maxevents;) {
- epi = list_entry(lnk, struct epitem, rdllink);
-
- lnk = lnk->next;
-
- /* If this file is already in the ready list we exit soon */
- if (!ep_is_linked(&epi->txlink)) {
- /*
- * This is initialized in this way so that the default
- * behaviour of the reinjecting code will be to push back
- * the item inside the ready list.
- */
- epi->revents = epi->event.events;
-
- /* Link the ready item into the transfer list */
- list_add(&epi->txlink, txlist);
- nepi++;
-
- /*
- * Unlink the item from the ready list.
- */
- ep_list_del(&epi->rdllink);
- }
- }
-
+ list_splice(&ep->rdllist, txlist);
+ INIT_LIST_HEAD(&ep->rdllist);
write_unlock_irqrestore(&ep->lock, flags);

- return nepi;
+ return 0;
}


@@ -1412,21 +1369,24 @@
* because of the way poll() is traditionally implemented in Linux.
*/
static int ep_send_events(struct eventpoll *ep, struct list_head *txlist,
- struct epoll_event __user *events)
+ struct epoll_event __user *events, int maxevents)
{
- int eventcnt = 0;
+ int eventcnt, error = -EFAULT, pwake = 0;
unsigned int revents;
- struct list_head *lnk;
+ unsigned long flags;
struct epitem *epi;
+ struct list_head injlist;
+
+ INIT_LIST_HEAD(&injlist);

/*
* We can loop without lock because this is a task private list.
* The test done during the collection loop will guarantee us that
* another task will not try to collect this file. Also, items
- * cannot vanish during the loop because we are holding "sem".
+ * cannot vanish during the loop because we are holding "sem" in read.
*/
- list_for_each(lnk, txlist) {
- epi = list_entry(lnk, struct epitem, txlink);
+ for (eventcnt = 0; !list_empty(txlist) && eventcnt < maxevents;) {
+ epi = list_entry(txlist->next, struct epitem, rdllink);

/*
* Get the ready file event set. We can safely use the file
@@ -1442,56 +1402,56 @@
*/
epi->revents = revents & epi->event.events;

+ /*
+ * Is the event mask intersect the caller-requested one,
+ * deliver the event to userspace. Again, we are holding
+ * "sem" in read, so no operations coming from userspace,
+ * that change the item can happen.
+ */
if (epi->revents) {
if (__put_user(epi->revents,
&events[eventcnt].events) ||
__put_user(epi->event.data,
&events[eventcnt].data))
- return -EFAULT;
+ goto errxit;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
eventcnt++;
}
- }
- return eventcnt;
-}
-
-
-/*
- * Walk through the transfer list we collected with ep_collect_ready_items()
- * and, if 1) the item is still "alive" 2) its event set is not empty 3) it's
- * not already linked, links it to the ready list. Same as above, we are holding
- * "sem" so items cannot vanish underneath our nose.
- */
-static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
-{
- int ricnt = 0, pwake = 0;
- unsigned long flags;
- struct epitem *epi;
-
- write_lock_irqsave(&ep->lock, flags);
-
- while (!list_empty(txlist)) {
- epi = list_entry(txlist->next, struct epitem, txlink);
-
- /* Unlink the current item from the transfer list */
- ep_list_del(&epi->txlink);

/*
- * If the item is no more linked to the interest set, we don't
- * have to push it inside the ready list because the following
- * ep_release_epitem() is going to drop it. Also, if the current
- * item is set to have an Edge Triggered behaviour, we don't have
- * to push it back either.
+ * This is tricky. We are holding the "sem" in read, and this
+ * means that the operations that can change the "linked" status
+ * of the epoll item (epi->rbn and epi->rdllink) cannot touch them.
+ * Also, since we are "linked" from a epi->rdllink POV (the item
+ * is linked to our transmission list, we just splice'd), the
+ * ep_poll_callback() cannot touch us either, because of the check
+ * present in there. Another parallel epoll_wait() will not
+ * get the same result set, since we spliced the ready list before.
*/
- if (ep_rb_linked(&epi->rbn) && !(epi->event.events & EPOLLET) &&
- (epi->revents & epi->event.events) && !ep_is_linked(&epi->rdllink)) {
- list_add_tail(&epi->rdllink, &ep->rdllist);
- ricnt++;
- }
+ list_del(&epi->rdllink);
+ if (!(epi->event.events & EPOLLET) &&
+ (epi->revents & epi->event.events))
+ list_add_tail(&epi->rdllink, &injlist);
+ else {
+ /*
+ * Be sure the item is totally detached before re-init the
+ * list_head. After INIT_LIST_HEAD() is committed, the
+ * ep_poll_callback() can requeue the item again, but we
+ * don't care since we are already past it.
+ */
+ smp_mb();
+ INIT_LIST_HEAD(&epi->rdllink);
+ }
}
+ error = 0;

- if (ricnt) {
+ errxit:
+
+ write_lock_irqsave(&ep->lock, flags);
+ if (!list_empty(&injlist) || !list_empty(txlist)) {
+ list_splice(txlist, &ep->rdllist);
+ list_splice(&injlist, &ep->rdllist);
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
@@ -1500,14 +1460,15 @@
__wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE |
TASK_INTERRUPTIBLE);
if (waitqueue_active(&ep->poll_wait))
- pwake++;
+ pwake++;
}
-
write_unlock_irqrestore(&ep->lock, flags);

/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&psw, &ep->poll_wait);
+
+ return eventcnt == 0 ? error: eventcnt;
}


@@ -1517,7 +1478,7 @@
static int ep_events_transfer(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
- int eventcnt = 0;
+ int eventcnt;
struct list_head txlist;

INIT_LIST_HEAD(&txlist);
@@ -1529,13 +1490,10 @@
down_read(&ep->sem);

/* Collect/extract ready items */
- if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
- /* Build result set in userspace */
- eventcnt = ep_send_events(ep, &txlist, events);
-
- /* Reinject ready items into the ready list */
- ep_reinject_items(ep, &txlist);
- }
+ ep_collect_ready_items(ep, &txlist);
+
+ /* Build result set in userspace */
+ eventcnt = ep_send_events(ep, &txlist, events, maxevents);

up_read(&ep->sem);

-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/