[tip:locking/core] locking/rwsem: Rework zeroing reader waiter->task

From: tip-bot for Davidlohr Bueso
Date: Fri Jun 03 2016 - 06:56:01 EST


Commit-ID: e38513905eeaae59056eac2c9ac55a43b1fc41b2
Gitweb: http://git.kernel.org/tip/e38513905eeaae59056eac2c9ac55a43b1fc41b2
Author: Davidlohr Bueso <dave@xxxxxxxxxxxx>
AuthorDate: Fri, 13 May 2016 11:56:27 -0700
Committer: Ingo Molnar <mingo@xxxxxxxxxx>
CommitDate: Fri, 3 Jun 2016 09:47:12 +0200

locking/rwsem: Rework zeroing reader waiter->task

Readers that are awoken will expect a nil ->task indicating
that a wakeup has occurred. Because of the way readers are
implemented, there's a small chance that the waiter will never
block in the slowpath (rwsem_down_read_failed), which therefore
requires some form of reference counting to avoid the following
scenario:

  rwsem_down_read_failed()              rwsem_wake()
    get_task_struct();
    spin_lock_irq(&wait_lock);
    list_add_tail(&waiter.list)
    spin_unlock_irq(&wait_lock);
                                          raw_spin_lock_irqsave(&wait_lock)
                                          __rwsem_do_wake()
    while (1) {
      set_task_state(TASK_UNINTERRUPTIBLE);
                                            waiter->task = NULL
      if (!waiter.task) // true
        break;
      schedule() // never reached

    __set_task_state(TASK_RUNNING);
    do_exit();
                                            wake_up_process(tsk); // boom

... and the wakeup therefore races with do_exit() when the caller returns.

There is also a mismatch between the smp_mb() and its documentation,
in that the serialization is done between reading the task and the
nil store. Furthermore, besides the overlapping loads and stores to
waiter->task being guaranteed to be ordered within that CPU, both
wake_up_process() originally and now wake_q_add() already imply
barriers upon successful calls, which provides the ordering the
comment asks for.

Now, as an alternative to perhaps inverting the checks in the blocker
side (which has its own penalty in that schedule is unavoidable),
with lockless wakeups this situation is naturally addressed and we
can just use the refcount held by wake_q_add(), instead of doing so
explicitly. Of course, we must guarantee that the nil store is done
as the _last_ operation in that the task must already be marked for
deletion to not fall into the race above. Spurious wakeups are also
handled transparently in that the task's reference is only removed
when wake_up_q() is actually called _after_ the nil store.
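
The ordering described above can be sketched in userspace with C11
atomics and pthreads. This is only an illustration under stated
assumptions, not the kernel code: struct task, task_get(), task_put()
and run_demo() are hypothetical stand-ins for task_struct,
get_task_struct(), put_task_struct() and a test driver, the reader
spins instead of calling schedule(), and "freeing" the task is modeled
by setting a flag. The point it shows is that the waker takes its
reference before the release store of NULL, so the task cannot go away
before the deferred wakeup touches it.

```c
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for task_struct: a refcount and a liveness flag. */
struct task {
	atomic_int usage;	/* models the task reference count */
	atomic_bool freed;	/* set when the last reference is dropped */
};

struct waiter {
	_Atomic(struct task *) task;	/* models rwsem_waiter->task */
};

struct demo {
	struct task task;
	struct waiter waiter;
	bool alive_at_wakeup;	/* written by the waker, read after join */
};

static void task_get(struct task *t)
{
	atomic_fetch_add_explicit(&t->usage, 1, memory_order_relaxed);
}

static void task_put(struct task *t)
{
	/* Dropping the last reference "frees" the task (modeled as a flag). */
	if (atomic_fetch_sub_explicit(&t->usage, 1, memory_order_acq_rel) == 1)
		atomic_store(&t->freed, true);
}

/* Models the reader slowpath: wait for the nil ->task, then "exit". */
static void *reader(void *arg)
{
	struct demo *d = arg;

	while (atomic_load_explicit(&d->waiter.task, memory_order_acquire))
		sched_yield();	/* the kernel would schedule() here */

	task_put(&d->task);	/* models exit dropping the task's own reference */
	return NULL;
}

/* Models the waker: take a reference first, store nil last. */
static void *waker(void *arg)
{
	struct demo *d = arg;
	struct task *tsk = atomic_load_explicit(&d->waiter.task,
						memory_order_relaxed);

	task_get(tsk);		/* models the reference taken by wake_q_add() */
	atomic_store_explicit(&d->waiter.task, NULL, memory_order_release);

	/* Models the deferred wake_up_q(): the task must still be alive. */
	d->alive_at_wakeup = !atomic_load(&tsk->freed);
	task_put(tsk);		/* reference dropped only after the wakeup */
	return NULL;
}

/* Returns 0 if the task was alive at wakeup and fully released afterwards. */
int run_demo(void)
{
	struct demo d;
	pthread_t r, w;

	atomic_init(&d.task.usage, 1);	/* the task's own reference */
	atomic_init(&d.task.freed, false);
	atomic_init(&d.waiter.task, &d.task);
	d.alive_at_wakeup = false;

	if (pthread_create(&r, NULL, reader, &d))
		return -1;
	if (pthread_create(&w, NULL, waker, &d)) {
		/* Let the reader finish before reporting the failure. */
		atomic_store_explicit(&d.waiter.task, NULL, memory_order_release);
		pthread_join(r, NULL);
		return -1;
	}
	pthread_join(r, NULL);
	pthread_join(w, NULL);

	return (d.alive_at_wakeup && atomic_load(&d.task.freed) &&
		atomic_load(&d.task.usage) == 0) ? 0 : 1;
}
```

Swapping the task_get() and the release store in waker() would reopen
the window from the scenario above: the reader could observe NULL and
drop the last reference before the waker holds one.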

Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
Signed-off-by: Peter Zijlstra (Intel) <peterz@xxxxxxxxxxxxx>
Cc: Andrew Morton <akpm@xxxxxxxxxxxxxxxxxxxx>
Cc: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
Cc: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
Cc: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
Cc: Thomas Gleixner <tglx@xxxxxxxxxxxxx>
Cc: Waiman.Long@xxxxxxx
Cc: dave@xxxxxxxxxxxx
Cc: jason.low2@xxxxxx
Cc: peter@xxxxxxxxxxxxxxxxxx
Link: http://lkml.kernel.org/r/1463165787-25937-3-git-send-email-dave@xxxxxxxxxxxx
Signed-off-by: Ingo Molnar <mingo@xxxxxxxxxx>
---
kernel/locking/rwsem-xadd.c | 17 +++++++----------
1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
index 80b05ac..fcbf75a 100644
--- a/kernel/locking/rwsem-xadd.c
+++ b/kernel/locking/rwsem-xadd.c
@@ -194,17 +194,15 @@ __rwsem_mark_wake(struct rw_semaphore *sem,
waiter = list_entry(next, struct rwsem_waiter, list);
next = waiter->list.next;
tsk = waiter->task;
+
+ wake_q_add(wake_q, tsk);
/*
- * Make sure we do not wakeup the next reader before
- * setting the nil condition to grant the next reader;
- * otherwise we could miss the wakeup on the other
- * side and end up sleeping again. See the pairing
- * in rwsem_down_read_failed().
+ * Ensure that the last operation is setting the reader
+ * waiter to nil such that rwsem_down_read_failed() cannot
+ * race with do_exit() by always holding a reference count
+ * to the task to wakeup.
*/
- smp_mb();
- waiter->task = NULL;
- wake_q_add(wake_q, tsk);
- put_task_struct(tsk);
+ smp_store_release(&waiter->task, NULL);
} while (--loop);

sem->wait_list.next = next;
@@ -228,7 +226,6 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
/* set up my own style of waitqueue */
waiter.task = tsk;
waiter.type = RWSEM_WAITING_FOR_READ;
- get_task_struct(tsk);

raw_spin_lock_irq(&sem->wait_lock);
if (list_empty(&sem->wait_list))