[PATCH 2/2] locking/rwsem: Wake up all readers for wait queue waker

From: Waiman Long
Date: Mon Feb 13 2023 - 14:50:01 EST


As noted in commit 54c1ee4d614d ("locking/rwsem: Conditionally wake
waiters in reader/writer slowpaths"), it was possible for a rwsem to get
into a state where a reader-owned rwsem could have many readers waiting
in the wait queue but no writer.

Recently, it was found that one way to cause this condition is to have a
highly contended rwsem with many readers, like a mmap_sem. There can be
hundreds of readers waiting in the wait queue of a writer-owned mmap_sem.
The rwsem_wake() call by the up_write() call of the rwsem owning writer
can hit the 256 reader wakeup limit and leave the rest of the readers
remaining in the wait queue. The reason for the limit is to avoid
excessive delay in doing other useful work.

With commit 54c1ee4d614d ("locking/rwsem: Conditionally wake waiters in
reader/writer slowpaths"), a new incoming reader should wake up another
batch of up to 256 readers. However, these incoming readers or writers
will have to wait in the wait queue and there is nothing else they can
do until it is their turn to be woken up. This patch adds an additional
in_waitq argument to rwsem_mark_wake() to indicate that the waker is
in the wait queue and can ignore the limit.

Signed-off-by: Waiman Long <longman@xxxxxxxxxx>
---
kernel/locking/rwsem.c | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 3936a5fe1229..723a8824b967 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -410,7 +410,7 @@ rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
*/
static void rwsem_mark_wake(struct rw_semaphore *sem,
enum rwsem_wake_type wake_type,
- struct wake_q_head *wake_q)
+ struct wake_q_head *wake_q, bool in_waitq)
{
struct rwsem_waiter *waiter, *tmp;
long oldcount, woken = 0, adjustment = 0;
@@ -524,9 +524,10 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
list_move_tail(&waiter->list, &wlist);

/*
- * Limit # of readers that can be woken up per wakeup call.
+ * Limit # of readers that can be woken up per wakeup call
+ * unless the waker is waiting in the wait queue.
*/
- if (unlikely(woken >= MAX_READERS_WAKEUP))
+ if (unlikely(!in_waitq && (woken >= MAX_READERS_WAKEUP)))
break;
}

@@ -597,7 +598,7 @@ rwsem_del_wake_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter,
* be eligible to acquire or spin on the lock.
*/
if (rwsem_del_waiter(sem, waiter) && first)
- rwsem_mark_wake(sem, RWSEM_WAKE_ANY, wake_q);
+ rwsem_mark_wake(sem, RWSEM_WAKE_ANY, wake_q, false);
raw_spin_unlock_irq(&sem->wait_lock);
if (!wake_q_empty(wake_q))
wake_up_q(wake_q);
@@ -1004,7 +1005,7 @@ static inline void rwsem_cond_wake_waiter(struct rw_semaphore *sem, long count,
wake_type = RWSEM_WAKE_ANY;
clear_nonspinnable(sem);
}
- rwsem_mark_wake(sem, wake_type, wake_q);
+ rwsem_mark_wake(sem, wake_type, wake_q, true);
}

/*
@@ -1042,7 +1043,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
raw_spin_lock_irq(&sem->wait_lock);
if (!list_empty(&sem->wait_list))
rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED,
- &wake_q);
+ &wake_q, false);
raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
}
@@ -1259,7 +1260,7 @@ static struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
goto unlock_out;
}
}
- rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
+ rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q, false);

unlock_out:
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
@@ -1281,7 +1282,7 @@ static struct rw_semaphore *rwsem_downgrade_wake(struct rw_semaphore *sem)
raw_spin_lock_irqsave(&sem->wait_lock, flags);

if (!list_empty(&sem->wait_list))
- rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED, &wake_q);
+ rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED, &wake_q, false);

raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
wake_up_q(&wake_q);
--
2.31.1