[RFC PATCH-queue/locking/rfc 2/2] locking/mutex: Enable optimistic spinning of woken waiter

From: Waiman Long
Date: Fri Aug 26 2016 - 19:37:54 EST


This patch makes the waiter that sets the HANDOFF flag start spinning
instead of sleeping until the handoff is complete or the owner
sleeps. Otherwise, the handoff will cause the optimistic spinners to
abort spinning as the handed-off owner may not be running.

Signed-off-by: Waiman Long <Waiman.Long@xxxxxxx>
---
kernel/locking/mutex.c | 104 ++++++++++++++++++++++++++++++++----------------
1 files changed, 70 insertions(+), 34 deletions(-)

diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
index 4a0e16e..064dd61 100644
--- a/kernel/locking/mutex.c
+++ b/kernel/locking/mutex.c
@@ -380,22 +380,38 @@ static inline int mutex_can_spin_on_owner(struct mutex *lock)
*
* Returns true when the lock was taken, otherwise false, indicating
* that we need to jump to the slowpath and sleep.
+ *
+ * The waiter flag is set to true if the spinner is a waiter in the wait
+ * queue. The waiter-spinner will spin on the lock directly and concurrently
+ * with the spinner at the head of the OSQ, if present, until the owner is
+ * changed to itself.
*/
static bool mutex_optimistic_spin(struct mutex *lock,
- struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
+ struct ww_acquire_ctx *ww_ctx,
+ const bool use_ww_ctx, bool waiter)
{
struct task_struct *task = current;
+ bool acquired = false;

- if (!mutex_can_spin_on_owner(lock))
- goto done;
+ if (!waiter) {
+ /*
+ * The purpose of the mutex_can_spin_on_owner() function is
+ * to eliminate the overhead of osq_lock() and osq_unlock()
+ * in case spinning isn't possible. As a waiter-spinner
+ * is not going to take OSQ lock anyway, there is no need
+ * to call mutex_can_spin_on_owner().
+ */
+ if (!mutex_can_spin_on_owner(lock))
+ goto done;

- /*
- * In order to avoid a stampede of mutex spinners trying to
- * acquire the mutex all at once, the spinners need to take a
- * MCS (queued) lock first before spinning on the owner field.
- */
- if (!osq_lock(&lock->osq))
- goto done;
+ /*
+ * In order to avoid a stampede of mutex spinners trying to
+ * acquire the mutex all at once, the spinners need to take a
+ * MCS (queued) lock first before spinning on the owner field.
+ */
+ if (!osq_lock(&lock->osq))
+ goto done;
+ }

while (true) {
struct task_struct *owner;
@@ -421,23 +437,24 @@ static bool mutex_optimistic_spin(struct mutex *lock,
* release the lock or go to sleep.
*/
owner = __mutex_owner(lock);
- if (owner && !mutex_spin_on_owner(lock, owner))
- break;

- /* Try to acquire the mutex if it is unlocked. */
- if (__mutex_trylock(lock)) {
- osq_unlock(&lock->osq);
- return true;
+ if (owner == task)
+ goto gotlock;
+
+ if (owner) {
+ if (!mutex_spin_on_owner(lock, owner))
+ break;
+ /*
+ * For waiter-spinner, recheck the owner field
+ * as it may have been changed to itself.
+ */
+ if (waiter && (__mutex_owner(lock) == task))
+ goto gotlock;
}

- /*
- * When there's no owner, we might have preempted between the
- * owner acquiring the lock and setting the owner field. If
- * we're an RT task that will live-lock because we won't let
- * the owner complete.
- */
- if (!owner && (need_resched() || rt_task(task)))
- break;
+ /* Try to acquire the mutex if it is unlocked. */
+ if (__mutex_trylock(lock))
+ goto gotlock;

/*
* The cpu_relax() call is a compiler barrier which forces
@@ -446,16 +463,21 @@ static bool mutex_optimistic_spin(struct mutex *lock,
* values at the cost of a few extra spins.
*/
cpu_relax_lowlatency();
+ continue;
+gotlock:
+ acquired = true;
+ break;
}

- osq_unlock(&lock->osq);
+ if (!waiter)
+ osq_unlock(&lock->osq);
done:
/*
* If we fell out of the spin path because of need_resched(),
* reschedule now, before we try-lock the mutex. This avoids getting
* scheduled out right after we obtained the mutex.
*/
- if (need_resched()) {
+ if (!acquired && need_resched()) {
/*
* We _should_ have TASK_RUNNING here, but just in case
* we do not, make it so, otherwise we might get stuck.
@@ -464,11 +486,12 @@ done:
schedule_preempt_disabled();
}

- return false;
+ return acquired;
}
#else
static bool mutex_optimistic_spin(struct mutex *lock,
- struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
+ struct ww_acquire_ctx *ww_ctx,
+ const bool use_ww_ctx, bool waiter)
{
return false;
}
@@ -561,6 +584,7 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
unsigned long flags;
struct ww_mutex *ww;
int ret;
+ bool acquired;

if (use_ww_ctx) {
ww = container_of(lock, struct ww_mutex, base);
@@ -571,7 +595,8 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

- if (__mutex_trylock(lock) || mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
+ if (__mutex_trylock(lock) ||
+ mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, false)) {
/* got the lock, yay! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx)
@@ -603,7 +628,7 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,

lock_contended(&lock->dep_map, ip);

- for (;;) {
+ for (acquired = false; !acquired; ) {
/*
* got a signal? (This code gets eliminated in the
* TASK_UNINTERRUPTIBLE case.)
@@ -624,13 +649,24 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
/* didn't get the lock, go to sleep: */
spin_unlock_mutex(&lock->wait_lock, flags);
schedule_preempt_disabled();
- spin_lock_mutex(&lock->wait_lock, flags);

- if (__mutex_trylock(lock))
- break;
+ /*
+ * Both __mutex_trylock() and __mutex_waiter_is_first()
+ * can be done without the protection of wait_lock.
+ */
+ acquired = __mutex_trylock(lock);

- if (__mutex_waiter_is_first(lock, &waiter))
+ if (!acquired && __mutex_waiter_is_first(lock, &waiter)) {
__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
+ /*
+ * Wait until the lock is handed off or the owner
+ * sleeps.
+ */
+ acquired = mutex_optimistic_spin(lock, ww_ctx,
+ use_ww_ctx, true);
+ }
+
+ spin_lock_mutex(&lock->wait_lock, flags);
}
__set_task_state(task, TASK_RUNNING);

--
1.7.1