Re: [PATCH] improve rwsem scalability (was Re: [CFT][RFC] HT scheduler)

From: Nick Piggin
Date: Fri Dec 19 2003 - 19:09:42 EST

Martin J. Bligh wrote:
>>>> What do you think? Are there any other sorts of benchmarks I should try?
>>>> The improvement is significant, I think, although volanomark is quite
>>>> erratic and doesn't show it well.
>>>
>>> I don't see any problem with moving the wakeups out of the rwsem's spinlock.
>>
>> Actually, my implementation does have a race because list_del_init isn't
>> atomic. It shouldn't be difficult to fix if anyone cares about it...
>> otherwise I won't bother.
>
> If you can fix it up, I'll get people here to do some more testing on the
> patch with other benchmarks, etc.


OK, this one should work. There isn't much that uses rwsems, but mmap_sem
is the obvious one.
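
(A note on the race, for anyone following along: list_del_init() in
include/linux/list.h is a sequence of plain pointer stores, not one atomic
operation, so a waiter polling its own list node without holding wait_lock
can observe a half-updated node and return while the waker is still touching
it. Paraphrased from 2.6 include/linux/list.h:

static inline void __list_del(struct list_head *prev, struct list_head *next)
{
        next->prev = prev;
        prev->next = next;
}

static inline void list_del_init(struct list_head *entry)
{
        __list_del(entry->prev, entry->next);
        INIT_LIST_HEAD(entry);  /* entry->next = entry; entry->prev = entry */
}

The version below avoids making the waiter watch a list node at all: the
waker collects waiters on a local wake_list while still holding wait_lock,
and the waiter keeps sleeping until waiter->flags goes to zero.)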

So if the patch helps anywhere, it will be on something heavily threaded
that takes mmap_sem a lot (mostly for reading, sometimes for writing), on a
machine with a lot of CPUs and a lot of runqueue activity (i.e. tasks waking
up, sleeping, and running).
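
If someone wants a targeted test, a throwaway pthread stress along these
lines (hypothetical, thread count and sizes picked arbitrarily) should
generate exactly that mix: the page faults read-acquire mmap_sem, the
mmap/munmap calls write-acquire it, and the constant blocking keeps the
runqueues busy:

/* Hypothetical mmap_sem stress -- a sketch, not a benchmark I've run. */
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>

#define NTHREADS 64
#define LEN      (16 * 4096)

static void *worker(void *arg)
{
        for (;;) {
                char *p = mmap(NULL, LEN, PROT_READ | PROT_WRITE,
                               MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
                if (p == MAP_FAILED)
                        abort();
                memset(p, 1, LEN);      /* fault pages in: down_read(mmap_sem) */
                munmap(p, LEN);         /* down_write(mmap_sem) */
        }
        return NULL;
}

int main(void)
{
        pthread_t threads[NTHREADS];
        int i;

        for (i = 0; i < NTHREADS; i++)
                pthread_create(&threads[i], NULL, worker, NULL);
        for (i = 0; i < NTHREADS; i++)
                pthread_join(threads[i], NULL);
        return 0;
}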


Move rwsem's up_read wakeups out of the semaphore's wait_lock


linux-2.6-npiggin/lib/rwsem.c | 42 ++++++++++++++++++++++++++++--------------
1 files changed, 28 insertions(+), 14 deletions(-)

diff -puN lib/rwsem.c~rwsem-scale lib/rwsem.c
--- linux-2.6/lib/rwsem.c~rwsem-scale 2003-12-20 02:17:36.000000000 +1100
+++ linux-2.6-npiggin/lib/rwsem.c 2003-12-20 02:23:59.000000000 +1100
@@ -8,7 +8,7 @@
#include <linux/module.h>

struct rwsem_waiter {
- struct list_head list;
+ struct list_head list, wake_list;
struct task_struct *task;
unsigned int flags;
#define RWSEM_WAITING_FOR_READ 0x00000001
@@ -35,9 +35,12 @@ void rwsemtrace(struct rw_semaphore *sem
* - the spinlock must be held by the caller
* - woken process blocks are discarded from the list after having flags zeroised
* - writers are only woken if wakewrite is non-zero
+ *
+ * The spinlock will be dropped by this function
*/
static inline struct rw_semaphore *__rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
{
+ LIST_HEAD(wake_list);
struct rwsem_waiter *waiter;
struct list_head *next;
signed long oldcount;
@@ -65,7 +68,8 @@ static inline struct rw_semaphore *__rws

list_del(&waiter->list);
waiter->flags = 0;
- wake_up_process(waiter->task);
+ if (list_empty(&waiter->wake_list))
+ list_add_tail(&waiter->wake_list, &wake_list);
goto out;

/* don't want to wake any writers */
@@ -74,9 +78,10 @@ static inline struct rw_semaphore *__rws
if (waiter->flags & RWSEM_WAITING_FOR_WRITE)
goto out;

- /* grant an infinite number of read locks to the readers at the front of the queue
- * - note we increment the 'active part' of the count by the number of readers (less one
- * for the activity decrement we've already done) before waking any processes up
+ /* grant an infinite number of read locks to the readers at the front
+ * of the queue - note we increment the 'active part' of the count by
+ * the number of readers (less one for the activity decrement we've
+ * already done) before waking any processes up
*/
readers_only:
woken = 0;
@@ -100,13 +105,22 @@ static inline struct rw_semaphore *__rws
waiter = list_entry(next,struct rwsem_waiter,list);
next = waiter->list.next;
waiter->flags = 0;
- wake_up_process(waiter->task);
+ if (list_empty(&waiter->wake_list))
+ list_add_tail(&waiter->wake_list, &wake_list);
}

sem->wait_list.next = next;
next->prev = &sem->wait_list;

out:
+ spin_unlock(&sem->wait_lock);
+ while (!list_empty(&wake_list)) {
+ waiter = list_entry(wake_list.next,struct rwsem_waiter,wake_list);
+ list_del(&waiter->wake_list);
+ waiter->wake_list.next = &waiter->wake_list;
+ wmb(); /* Mustn't lose wakeups */
+ wake_up_process(waiter->task);
+ }
rwsemtrace(sem,"Leaving __rwsem_do_wake");
return sem;

@@ -130,9 +144,9 @@ static inline struct rw_semaphore *rwsem
set_task_state(tsk,TASK_UNINTERRUPTIBLE);

/* set up my own style of waitqueue */
- spin_lock(&sem->wait_lock);
waiter->task = tsk;
-
+ INIT_LIST_HEAD(&waiter->wake_list);
+ spin_lock(&sem->wait_lock);
list_add_tail(&waiter->list,&sem->wait_list);

/* note that we're now waiting on the lock, but no longer actively read-locking */
@@ -143,8 +157,8 @@ static inline struct rw_semaphore *rwsem
*/
if (!(count & RWSEM_ACTIVE_MASK))
sem = __rwsem_do_wake(sem,1);
-
- spin_unlock(&sem->wait_lock);
+ else
+ spin_unlock(&sem->wait_lock);

/* wait to be given the lock */
for (;;) {
@@ -204,8 +218,8 @@ struct rw_semaphore *rwsem_wake(struct r
/* do nothing if list empty */
if (!list_empty(&sem->wait_list))
sem = __rwsem_do_wake(sem,1);
-
- spin_unlock(&sem->wait_lock);
+ else
+ spin_unlock(&sem->wait_lock);

rwsemtrace(sem,"Leaving rwsem_wake");

@@ -226,8 +240,8 @@ struct rw_semaphore *rwsem_downgrade_wak
/* do nothing if list empty */
if (!list_empty(&sem->wait_list))
sem = __rwsem_do_wake(sem,0);
-
- spin_unlock(&sem->wait_lock);
+ else
+ spin_unlock(&sem->wait_lock);

rwsemtrace(sem,"Leaving rwsem_downgrade_wake");
return sem;

_
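
For reference, the wait side that the wmb() pairs with is untouched by this
patch; from memory, the loop in rwsem_down_failed_common() looks roughly
like this in 2.6:

        /* wait to be given the lock */
        for (;;) {
                if (!waiter->flags)
                        break;
                schedule();
                set_task_state(tsk, TASK_UNINTERRUPTIBLE);
        }
        tsk->state = TASK_RUNNING;

As I read it, the re-initialisation of waiter->wake_list is the waker's last
store into the waiter's stack-based rwsem_waiter, and the wmb() orders it
before wake_up_process(), since once woken the waiter may return and free
that stack frame.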