[RFC] locking/mutex: Fix starvation of sleeping waiters

From: Imre Deak
Date: Mon Jul 18 2016 - 12:16:59 EST


Currently a thread sleeping on a mutex wait queue can be delayed
indefinitely by other threads managing to steal the lock, that is,
acquiring the lock out-of-order before the sleepers. I noticed this via
a testcase (see the Reference: below) where one CPU was unlocking /
relocking a mutex in a tight loop while another CPU was delayed
indefinitely trying to wake up and take the lock, but kept losing out
to the first CPU and going back to sleep:

CPU0:                           CPU1:
mutex_lock->acquire
                                mutex_lock->sleep
mutex_unlock->wake CPU1
                                wakeup
mutex_lock->acquire
                                trylock fail->sleep
mutex_unlock->wake CPU1
                                wakeup
mutex_lock->acquire
                                trylock fail->sleep
...                             ...
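
For illustration only, a rough userspace analogue of this pattern (not
the actual testcase from the bug report referenced below) could look
like the sketch here; whether the second thread's delay is bounded
depends entirely on the fairness of the mutex implementation at hand:

#include <pthread.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

/* Plays the role of CPU0: relock/unlock in a tight loop. */
static void *hog(void *arg)
{
	(void)arg;
	for (;;) {
		pthread_mutex_lock(&m);
		pthread_mutex_unlock(&m);
	}
	return NULL;
}

int main(void)
{
	pthread_t t;
	struct timespec t0, t1;

	pthread_create(&t, NULL, hog, NULL);
	sleep(1);	/* let the hog saturate the lock */

	/* Plays the role of CPU1: a single acquisition, timed. */
	clock_gettime(CLOCK_MONOTONIC, &t0);
	pthread_mutex_lock(&m);
	clock_gettime(CLOCK_MONOTONIC, &t1);
	pthread_mutex_unlock(&m);

	printf("acquired after %lld ns\n",
	       (long long)(t1.tv_sec - t0.tv_sec) * 1000000000LL +
	       (t1.tv_nsec - t0.tv_nsec));
	return 0;
}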

To fix this, make sure that CPU1 makes progress by avoiding the locking
fastpath, optimistic spinning and trylocking whenever there is any
waiter on the wait list. The corresponding check can be done without
holding wait_lock, since the goal is only to make sure that sleepers
make progress, not to guarantee that the lock is taken in FIFO order.
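
Condensed from the diff below, the core of the change is a lockless
peek at the wait list, which every opportunistic path (fastpath,
optimistic spin, trylock) consults before attempting to take the lock:

static inline int mutex_has_waiters(struct mutex *lock)
{
	/*
	 * Only a hint, read without wait_lock: the goal is forward
	 * progress for the sleepers, not strict FIFO ordering, so a
	 * racy answer is acceptable here.
	 */
	return !list_empty(&lock->wait_list);
}

static inline bool mutex_try_to_acquire(struct mutex *lock)
{
	/* Back off as soon as anybody is queued on the wait list. */
	return !mutex_is_locked(lock) && !mutex_has_waiters(lock) &&
		(atomic_cmpxchg_acquire(&lock->count, 1, 0) == 1);
}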

CC: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
CC: Ingo Molnar <mingo@xxxxxxxxxx>
CC: Chris Wilson <chris@xxxxxxxxxxxxxxxxxx>
CC: Daniel Vetter <daniel.vetter@xxxxxxxxx>
CC: Ville Syrjälä <ville.syrjala@xxxxxxxxxxxxxxx>
Reference: https://bugs.freedesktop.org/show_bug.cgi?id=96701
Signed-off-by: Imre Deak <imre.deak@xxxxxxxxx>
---
 include/linux/mutex.h  |  5 +++++
 kernel/locking/mutex.c | 33 +++++++++++++++++++++------------
2 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/include/linux/mutex.h b/include/linux/mutex.h
index 2cb7531..562dfa8 100644
--- a/include/linux/mutex.h
+++ b/include/linux/mutex.h
@@ -130,6 +130,11 @@ static inline int mutex_is_locked(struct mutex *lock)
 	return atomic_read(&lock->count) != 1;
 }
 
+static inline int mutex_has_waiters(struct mutex *lock)
+{
+	return !list_empty(&lock->wait_list);
+}
+
 /*
  * See kernel/locking/mutex.c for detailed documentation of these APIs.
  * Also see Documentation/locking/mutex-design.txt.
diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
index a70b90d..d18b531 100644
--- a/kernel/locking/mutex.c
+++ b/kernel/locking/mutex.c
@@ -276,7 +276,7 @@ static inline int mutex_can_spin_on_owner(struct mutex *lock)
  */
 static inline bool mutex_try_to_acquire(struct mutex *lock)
 {
-	return !mutex_is_locked(lock) &&
+	return !mutex_is_locked(lock) && !mutex_has_waiters(lock) &&
 		(atomic_cmpxchg_acquire(&lock->count, 1, 0) == 1);
 }
 
@@ -520,7 +520,8 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
 	preempt_disable();
 	mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
 
-	if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
+	if (!mutex_has_waiters(lock) &&
+	    mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
 		/* got the lock, yay! */
 		preempt_enable();
 		return 0;
@@ -532,7 +533,7 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
 	 * Once more, try to acquire the lock. Only try-lock the mutex if
 	 * it is unlocked to reduce unnecessary xchg() operations.
 	 */
-	if (!mutex_is_locked(lock) &&
+	if (!mutex_is_locked(lock) && !mutex_has_waiters(lock) &&
 	    (atomic_xchg_acquire(&lock->count, 0) == 1))
 		goto skip_wait;
 
@@ -556,7 +557,8 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
 		 * other waiters. We only attempt the xchg if the count is
 		 * non-negative in order to avoid unnecessary xchg operations:
 		 */
-		if (atomic_read(&lock->count) >= 0 &&
+		if (lock->wait_list.next == &waiter.list &&
+		    atomic_read(&lock->count) >= 0 &&
 		    (atomic_xchg_acquire(&lock->count, -1) == 1))
 			break;
 
@@ -789,10 +791,11 @@ __mutex_lock_interruptible_slowpath(struct mutex *lock);
  */
 int __sched mutex_lock_interruptible(struct mutex *lock)
 {
-	int ret;
+	int ret = -1;
 
 	might_sleep();
-	ret = __mutex_fastpath_lock_retval(&lock->count);
+	if (!mutex_has_waiters(lock))
+		ret = __mutex_fastpath_lock_retval(&lock->count);
 	if (likely(!ret)) {
 		mutex_set_owner(lock);
 		return 0;
@@ -804,10 +807,11 @@ EXPORT_SYMBOL(mutex_lock_interruptible);

 int __sched mutex_lock_killable(struct mutex *lock)
 {
-	int ret;
+	int ret = -1;
 
 	might_sleep();
-	ret = __mutex_fastpath_lock_retval(&lock->count);
+	if (!mutex_has_waiters(lock))
+		ret = __mutex_fastpath_lock_retval(&lock->count);
 	if (likely(!ret)) {
 		mutex_set_owner(lock);
 		return 0;
@@ -905,6 +909,9 @@ int __sched mutex_trylock(struct mutex *lock)
 {
 	int ret;
 
+	if (mutex_has_waiters(lock))
+		return 0;
+
 	ret = __mutex_fastpath_trylock(&lock->count, __mutex_trylock_slowpath);
 	if (ret)
 		mutex_set_owner(lock);
@@ -917,11 +924,12 @@ EXPORT_SYMBOL(mutex_trylock);
 int __sched
 __ww_mutex_lock(struct ww_mutex *lock, struct ww_acquire_ctx *ctx)
 {
-	int ret;
+	int ret = -1;
 
 	might_sleep();
 
-	ret = __mutex_fastpath_lock_retval(&lock->base.count);
+	if (!mutex_has_waiters(&lock->base))
+		ret = __mutex_fastpath_lock_retval(&lock->base.count);
 
 	if (likely(!ret)) {
 		ww_mutex_set_context_fastpath(lock, ctx);
@@ -935,11 +943,12 @@ EXPORT_SYMBOL(__ww_mutex_lock);
 int __sched
 __ww_mutex_lock_interruptible(struct ww_mutex *lock, struct ww_acquire_ctx *ctx)
 {
-	int ret;
+	int ret = -1;
 
 	might_sleep();
 
-	ret = __mutex_fastpath_lock_retval(&lock->base.count);
+	if (!mutex_has_waiters(&lock->base))
+		ret = __mutex_fastpath_lock_retval(&lock->base.count);
 
 	if (likely(!ret)) {
 		ww_mutex_set_context_fastpath(lock, ctx);
--
2.5.0