[PATCH 2/2] ipc/msg.c: Handle case of senders not enqueuing the message

From: Manfred Spraul
Date: Sun May 24 2020 - 08:47:31 EST


The patch "ipc/msg.c: wake up senders until there is a queue empty
capacity" avoids the thundering herd problem by waking up
only as many potential senders as there is free space in the queue.

This patch is a fix: If one of the senders doesn't enqueue its message,
then a search for further potential senders must be performed.

Signed-off-by: Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
---
ipc/msg.c | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)

diff --git a/ipc/msg.c b/ipc/msg.c
index 52d634b0a65a..f6d5188db38a 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -208,6 +208,12 @@ static inline void ss_del(struct msg_sender *mss)
list_del(&mss->list);
}

+/*
+ * ss_wakeup() assumes that the stored senders will enqueue the pending message.
+ * Thus: If a woken up task doesn't enqueue its pending message for whatever
+ * reason, then that task must call ss_wakeup() again, to ensure that no
+ * wakeup is lost.
+ */
static void ss_wakeup(struct msg_queue *msq,
struct wake_q_head *wake_q, bool kill)
{
@@ -843,6 +849,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
struct msg_queue *msq;
struct msg_msg *msg;
int err;
+ bool need_wakeup;
struct ipc_namespace *ns;
DEFINE_WAKE_Q(wake_q);

@@ -869,6 +876,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,

ipc_lock_object(&msq->q_perm);

+ need_wakeup = false;
for (;;) {
struct msg_sender s;

@@ -898,6 +906,13 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
/* enqueue the sender and prepare to block */
ss_add(msq, &s, msgsz);

+ /* Enqueuing a sender is actually an obligation:
+ * The sender must either enqueue the message, or call
+ * ss_wakeup(). Thus track that we have added our message
+ * to the candidates for the message queue.
+ */
+ need_wakeup = true;
+
if (!ipc_rcu_getref(&msq->q_perm)) {
err = -EIDRM;
goto out_unlock0;
@@ -935,12 +950,18 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
msq->q_qnum++;
atomic_add(msgsz, &ns->msg_bytes);
atomic_inc(&ns->msg_hdrs);
+
+ /* we have fulfilled our obligation, no need for wakeup */
+ need_wakeup = false;
}

err = 0;
msg = NULL;

out_unlock0:
+ if (need_wakeup)
+ ss_wakeup(msq, &wake_q, false);
+
ipc_unlock_object(&msq->q_perm);
wake_up_q(&wake_q);
out_unlock1:
--
2.26.2


--------------32DD537C888A21B4AD1ED564--