[PATCH v2 3/3] locking/rwsem: Wake up all readers for wait queue waker

From: Waiman Long
Date: Thu Feb 16 2023 - 16:10:49 EST


As noted in commit 54c1ee4d614d ("locking/rwsem: Conditionally wake
waiters in reader/writer slowpaths"), it is possible for a reader-owned
rwsem to have many readers waiting in the wait queue but no writer.

Recently, it was found that one way to reach this condition is to have a
highly contended rwsem with many readers, such as mmap_sem. There can be
hundreds of readers waiting in the wait queue of a writer-owned mmap_sem.
The rwsem_wake() call from the owning writer's up_write() can hit the
256-reader wakeup limit and leave the rest of the readers in the wait
queue. The reason for the limit is to avoid excessive delay in doing
other useful work.

With commit 54c1ee4d614d ("locking/rwsem: Conditionally wake waiters
in reader/writer slowpaths"), a new incoming reader will wake up
another batch of up to 256 readers. However, these incoming readers
or writers have to wait in the wait queue anyway and have nothing
else to do until it is their turn to be woken up. This patch
renames rwsem_mark_wake() to __rwsem_mark_wake() and adds an additional
in_waitq argument to indicate that the waker is itself in the wait queue
and can ignore the limit. A rwsem_mark_wake() wrapper that keeps the
original semantics is added.

Signed-off-by: Waiman Long <longman@xxxxxxxxxx>
---
 kernel/locking/rwsem.c | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index fc3961ceabe8..35b4adf8ea55 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -408,9 +408,9 @@ rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
  *
  * Implies rwsem_del_waiter() for all woken readers.
  */
-static void rwsem_mark_wake(struct rw_semaphore *sem,
-			    enum rwsem_wake_type wake_type,
-			    struct wake_q_head *wake_q)
+static void __rwsem_mark_wake(struct rw_semaphore *sem,
+			      enum rwsem_wake_type wake_type,
+			      struct wake_q_head *wake_q, bool in_waitq)
 {
 	long count = atomic_long_read(&sem->count);
 	struct rwsem_waiter *waiter, *tmp;
@@ -542,9 +542,10 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
 		list_move_tail(&waiter->list, &wlist);
 
 		/*
-		 * Limit # of readers that can be woken up per wakeup call.
+		 * Limit # of readers that can be woken up per wakeup call
+		 * unless the waker is waiting in the wait queue.
 		 */
-		if (unlikely(woken >= MAX_READERS_WAKEUP))
+		if (unlikely(!in_waitq && (woken >= MAX_READERS_WAKEUP)))
 			break;
 	}

@@ -594,6 +595,13 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
 	}
 }

+static inline void rwsem_mark_wake(struct rw_semaphore *sem,
+				   enum rwsem_wake_type wake_type,
+				   struct wake_q_head *wake_q)
+{
+	__rwsem_mark_wake(sem, wake_type, wake_q, false);
+}
+
 /*
  * Remove a waiter and try to wake up other waiters in the wait queue
  * This function is called from the out_nolock path of both the reader and
@@ -1022,7 +1030,7 @@ static inline void rwsem_cond_wake_waiter(struct rw_semaphore *sem, long count,
 		wake_type = RWSEM_WAKE_ANY;
 		clear_nonspinnable(sem);
 	}
-	rwsem_mark_wake(sem, wake_type, wake_q);
+	__rwsem_mark_wake(sem, wake_type, wake_q, true);
 }
 
 /*
--
2.31.1