Re: [RFC] locking/rwsem: Avoid issuing wakeup before setting the reader waiter to nil

From: Davidlohr Bueso
Date: Mon Dec 17 2018 - 15:53:26 EST


On Mon, 17 Dec 2018, Peter Zijlstra wrote:

I've put some patches here:

git://git.kernel.org/pub/scm/linux/kernel/git/peterz/queue.git locking/core

Could you have a look?

So how about the following to reduce some of the performance penalty (at
the cost of more complexity)?

Thanks,
Davidlohr

----------8<-----------------------------------------------------------
[PATCH] sched/wake_q: Reduce reference counting for special users

Some users, specifically futexes and rwsems, required fixes
that allowed the callers to be safe when wakeups occur before
they are expected by wake_up_q(). Such scenarios also play
games and rely on reference counting, and until now were
pivoting on wake_q doing it. With the wake_q_add() call being
moved down, this can no longer be the case. As such we end up
with a double task refcounting overhead; and these callers
care enough about this (being rather core-ish).

This patch introduces a wake_q_add_tasksafe() call that serves
for callers that have already done refcounting and therefore the
task is 'safe' from wake_q point of view (int that it requires
reference throughout the entire queue/wakeup cycle). These
users also need to check the return value of the operation and
do the put() if necessary when the cmpxchg() fails. Regular users
of wake_q_add() that don't care about when the wakeup actually
happens can just ignore the return value.

Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
---
include/linux/sched/wake_q.h | 7 ++++--
kernel/futex.c | 4 ++--
kernel/locking/rwsem-xadd.c | 7 +++---
kernel/sched/core.c | 53 +++++++++++++++++++++++++++++++-------------
4 files changed, 49 insertions(+), 22 deletions(-)

diff --git a/include/linux/sched/wake_q.h b/include/linux/sched/wake_q.h
index 545f37138057..8c1fc6434c6c 100644
--- a/include/linux/sched/wake_q.h
+++ b/include/linux/sched/wake_q.h
@@ -51,8 +51,11 @@ static inline void wake_q_init(struct wake_q_head *head)
head->lastp = &head->first;
}

-extern void wake_q_add(struct wake_q_head *head,
- struct task_struct *task);
+extern bool wake_q_add(struct wake_q_head *head,
+ struct task_struct *task);
+extern bool wake_q_add_tasksafe(struct wake_q_head *head,
+ struct task_struct *task);
+
extern void wake_up_q(struct wake_q_head *head);

#endif /* _LINUX_SCHED_WAKE_Q_H */
diff --git a/kernel/futex.c b/kernel/futex.c
index d14971f6ed3d..2ff7e811f13b 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -1402,8 +1402,8 @@ static void mark_wake_futex(struct wake_q_head *wake_q, struct futex_q *q)
* Queue the task for later wakeup for after we've released
* the hb->lock. wake_q_add() grabs reference to p.
*/
- wake_q_add(wake_q, p);
- put_task_struct(p);
+ if (!wake_q_add_tasksafe(wake_q, p))
+ put_task_struct(p);
}

/*
diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
index 50d9af615dc4..dea4dcf9d8f5 100644
--- a/kernel/locking/rwsem-xadd.c
+++ b/kernel/locking/rwsem-xadd.c
@@ -211,9 +211,10 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
* Ensure issuing the wakeup (either by us or someone else)
* after setting the reader waiter to nil.
*/
- wake_q_add(wake_q, tsk);
- /* wake_q_add() already take the task ref */
- put_task_struct(tsk);
+ if (!wake_q_add_tasksafe(wake_q, tsk)) {
+ /* wake_q_add() already take the task ref */
+ put_task_struct(tsk);
+ }
}

adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment;
diff --git a/kernel/sched/core.c b/kernel/sched/core.c
index d740d7a3608d..2c1825fe46e6 100644
--- a/kernel/sched/core.c
+++ b/kernel/sched/core.c
@@ -396,19 +396,8 @@ static bool set_nr_if_polling(struct task_struct *p)
#endif
#endif

-/**
- * wake_q_add() - queue a wakeup for 'later' waking.
- * @head: the wake_q_head to add @task to
- * @task: the task to queue for 'later' wakeup
- *
- * Queue a task for later wakeup, most likely by the wake_up_q() call in the
- * same context, _HOWEVER_ this is not guaranteed, the wakeup can come
- * instantly.
- *
- * This function must be used as-if it were wake_up_process(); IOW the task
- * must be ready to be woken at this location.
- */
-void wake_q_add(struct wake_q_head *head, struct task_struct *task)
+bool __wake_q_add(struct wake_q_head *head,
+ struct task_struct *task, bool tasksafe)
{
struct wake_q_node *node = &task->wake_q;

@@ -422,15 +411,49 @@ void wake_q_add(struct wake_q_head *head, struct task_struct *task)
*/
smp_mb__before_atomic();
if (unlikely(cmpxchg_relaxed(&node->next, NULL, WAKE_Q_TAIL)))
- return;
+ return false;

- get_task_struct(task);
+ if (!tasksafe)
+ get_task_struct(task);

/*
* The head is context local, there can be no concurrency.
*/
*head->lastp = node;
head->lastp = &node->next;
+ return true;
+}
+
+/**
+ * wake_q_add() - queue a wakeup for 'later' waking.
+ * @head: the wake_q_head to add @task to
+ * @task: the task to queue for 'later' wakeup
+ *
+ * Queue a task for later wakeup, most likely by the wake_up_q() call in the
+ * same context, _HOWEVER_ this is not guaranteed, the wakeup can come
+ * instantly.
+ *
+ * This function must be used as-if it were wake_up_process(); IOW the task
+ * must be ready to be woken at this location.
+ *
+ * Returns whether or not the task was successfully queued for wakeup.
+ * If false, the task is already queued and can happen at any time after
+ * this call.
+ */
+bool wake_q_add(struct wake_q_head *head, struct task_struct *task)
+{
+ return __wake_q_add(head, task, false);
+}
+
+/*
+ * wake_q_add_tasksafe() is the same as the above wake_q_add(), except that
+ * the caller has already done the task reference counting for us. Normally
+ * the 'tasksafe' caller will check the return value and cleanup refcounting
+ * accordingly.
+ */
+bool wake_q_add_tasksafe(struct wake_q_head *head, struct task_struct *task)
+{
+ return __wake_q_add(head, task, true);
}

void wake_up_q(struct wake_q_head *head)
--
2.16.4