Re: [PATCH v4] ipc/msg: Implement lockless pipelined wakeups

From: Davidlohr Bueso
Date: Wed Jul 20 2016 - 20:16:45 EST


On Wed, 20 Jul 2016, Sebastian Andrzej Siewior wrote:

This patch moves the wake_up_process() invocation so it is not done under
the perm->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once the message has been assigned and it does not
need to loop on SMP if the message points to NULL. In the signal case we
still need to check the pointer under the lock to verify the state.

This change should also avoid the introduction of preempt_disable() in
-RT, which is otherwise needed to avoid a busy-loop that polls for the
NULL -> !NULL change when the waiter has a higher priority than the waker.
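
As a rough illustration of the pattern described above (assign the message and queue the wakeup under the lock, then issue the wakeup only after the lock is dropped), here is a minimal userspace sketch. It is not kernel code: `struct receiver`, `struct wake_list`, `wake_list_add()`, `wake_list_run()` and `send_msg()` are hypothetical stand-ins for the msg_receiver / wake_q machinery, a POSIX semaphore stands in for the sleeping task, and C11 release/acquire ordering is assumed in place of the kernel's barriers.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical userspace stand-in for a blocked receiver. */
struct receiver {
	_Atomic(const char *) r_msg;	/* NULL until a sender assigns a message */
	sem_t wake;			/* stands in for the sleeping task */
	struct receiver *wake_next;	/* link in the sender's private wake list */
};

/* Hypothetical analogue of a wake queue: private to one sender call. */
struct wake_list {
	struct receiver *head;
};

static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;

/* Record that r must be woken later; no wakeup happens here. */
static void wake_list_add(struct wake_list *wl, struct receiver *r)
{
	r->wake_next = wl->head;
	wl->head = r;
}

/* Issue the deferred wakeups. Called without queue_lock held. */
static void wake_list_run(struct wake_list *wl)
{
	while (wl->head) {
		struct receiver *r = wl->head;

		wl->head = r->wake_next;	/* read the link before waking */
		sem_post(&r->wake);
	}
}

static void send_msg(struct receiver *r, const char *msg)
{
	struct wake_list wl = { NULL };

	pthread_mutex_lock(&queue_lock);
	/* Publish the message first, then queue the wakeup. */
	atomic_store_explicit(&r->r_msg, msg, memory_order_release);
	wake_list_add(&wl, r);
	pthread_mutex_unlock(&queue_lock);

	wake_list_run(&wl);		/* wakeup happens outside the lock */
}

static void *receive(void *arg)
{
	struct receiver *r = arg;

	while (sem_wait(&r->wake) != 0)
		continue;		/* retry if interrupted by a signal */
	/* The message is already assigned once we are woken: no polling. */
	return (void *)atomic_load_explicit(&r->r_msg, memory_order_acquire);
}

/* Returns 1 if the receiver observed exactly the message that was sent. */
static int demo_deferred_wakeup(void)
{
	static const char msg[] = "hello";
	struct receiver r;
	pthread_t t;
	void *got = NULL;
	int err;

	atomic_init(&r.r_msg, NULL);
	r.wake_next = NULL;
	err = sem_init(&r.wake, 0, 0);
	assert(err == 0);
	err = pthread_create(&t, NULL, receive, &r);
	assert(err == 0);

	send_msg(&r, msg);
	pthread_join(t, &got);
	sem_destroy(&r.wake);
	return got == (const void *)msg;
}
```

Because the receiver is only woken after the message pointer has been published, it never has to spin waiting for the pointer to change, which is the property the patch relies on to drop the busy-loop.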

This has been tested with Manfred's pmsg-shared tool on an "AMD A10-7800
Radeon R7, 12 Compute Cores 4C+8G":

test             | before     | after      | diff
-----------------|------------|------------|----------
pmsg-shared 8 60 | 19,347,422 | 30,442,191 | + ~57.34 %
pmsg-shared 4 60 | 21,367,197 | 35,743,458 | + ~67.28 %
pmsg-shared 2 60 | 22,884,224 | 24,278,200 | + ~6.09 %

v3->v4: - drop smp_wmb in the error case as per Davidlohr
v2->v3: - add smp_[rw]mb back including the usage graphic of them
        - use READ_ONCE / WRITE_ONCE after the removal of the volatile
          attribute.
v1->v2: - msg_receiver.r_msg is no longer volatile. After all we no
          longer have that busy loop.
        - added a comment explaining why we do wake_q_add() followed by
          the assignment of ->r_msg and not the other way around.

Cc: Davidlohr Bueso <dave@xxxxxxxxxxxx>
Cc: Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
Cc: Andrew Morton <akpm@xxxxxxxxxxxxxxxxxxxx>
Signed-off-by: Sebastian Andrzej Siewior <bigeasy@xxxxxxxxxxxxx>

This looks good to me, and is now very similar to the posix flavor of
msg queues in this regard. fwiw I threw it under ltp msgqueue specific
tests without things breaking. Just a small comment below, otherwise
feel free to add my:

Reviewed-by: Davidlohr Bueso <dave@xxxxxxxxxxxx>

-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
+ struct wake_q_head *wake_q)
{
struct msg_receiver *msr, *t;

@@ -577,27 +571,23 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)

list_del(&msr->r_list);
if (msr->r_maxsize < msg->m_ts) {
- /* initialize pipelined send ordering */
- msr->r_msg = NULL;
- wake_up_process(msr->r_tsk);
- /* barrier (B) see barrier comment below */
- smp_wmb();
- msr->r_msg = ERR_PTR(-E2BIG);
+ WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG));
+ /*
+ * rely on wake_q_add() barrier instead of
+ * explicit smp_wmb
+ */
+ wake_q_add(wake_q, msr->r_tsk);
} else {
- msr->r_msg = NULL;
msq->q_lrpid = task_pid_vnr(msr->r_tsk);
msq->q_rtime = get_seconds();
- wake_up_process(msr->r_tsk);
/*
- * Ensure that the wakeup is visible before
- * setting r_msg, as the receiving can otherwise
- * exit - once r_msg is set, the receiver can
- * continue. See lockless receive part 1 and 2
- * in do_msgrcv(). Barrier (B).
+ * Ensure that we see the new r_msg after the
+ * wake up or the old value forcing to take the
+ * queue lock.
*/
- smp_wmb();
- msr->r_msg = msg;
-
+ WRITE_ONCE(msr->r_msg, msg);
+ smp_wmb(); /* barrier (B) */
+ wake_q_add(wake_q, msr->r_tsk);

Just as with expunge_all and the E2BIG case, could you remove that explicit
barrier (B) and just rely on wake_q_add?

Thanks,
Davidlohr