[RFC][PATCH] mailbox: per-channel arrays with msg data and completion structures

From: Alexey Klimov
Date: Thu Apr 06 2017 - 08:57:02 EST


When a mailbox client doesn't serialize the sending of messages itself
and asks the mailbox framework to block in mbox_send_message(), one
completion structure per channel is not enough. The client can make
several mbox_send_message() calls at the same time, and there is no
guaranteed order in which the callers go to sleep on the completion.

If the mailbox controller acks a message transfer, tx_tick() wakes up
the first thread waiting on the completion, which is not necessarily
the thread that sent that message.
If the mailbox controller doesn't ack the transfer and a timeout happens,
tx_tick() still calls complete(), and the next caller that tries to sleep
on the completion wakes up immediately.

This patch fixes this by giving every queued message its own completion
structure: each element of the per-channel array now contains a) a
pointer to the data provided by the client and b) a completion structure.
The active_req field now tracks the index of the currently running
request that was submitted to the mailbox controller.

Signed-off-by: Alexey Klimov <alexey.klimov@xxxxxxx>
---
drivers/mailbox/mailbox.c | 40 +++++++++++++++++++++++---------------
drivers/mailbox/pcc.c | 10 +++++++---
include/linux/mailbox_controller.h | 24 +++++++++++++++++------
3 files changed, 49 insertions(+), 25 deletions(-)

diff --git a/drivers/mailbox/mailbox.c b/drivers/mailbox/mailbox.c
index 59b7221..d90e855 100644
--- a/drivers/mailbox/mailbox.c
+++ b/drivers/mailbox/mailbox.c
@@ -40,7 +40,7 @@ static int add_to_rbuf(struct mbox_chan *chan, void *mssg)
}

idx = chan->msg_free;
- chan->msg_data[idx] = mssg;
+ chan->msg_data[idx].msg_data = mssg;
chan->msg_count++;

if (idx == MBOX_TX_QUEUE_LEN - 1)
@@ -62,24 +62,25 @@ static void msg_submit(struct mbox_chan *chan)

spin_lock_irqsave(&chan->lock, flags);

- if (!chan->msg_count || chan->active_req)
+ if (!chan->msg_count || chan->active_req >= 0)
goto exit;

count = chan->msg_count;
idx = chan->msg_free;
+
if (idx >= count)
idx -= count;
else
idx += MBOX_TX_QUEUE_LEN - count;

- data = chan->msg_data[idx];
+ data = chan->msg_data[idx].msg_data;

if (chan->cl->tx_prepare)
chan->cl->tx_prepare(chan->cl, data);
/* Try to submit a message to the MBOX controller */
err = chan->mbox->ops->send_data(chan, data);
if (!err) {
- chan->active_req = data;
+ chan->active_req = idx;
chan->msg_count--;
}
exit:
@@ -93,11 +94,15 @@ static void msg_submit(struct mbox_chan *chan)
static void tx_tick(struct mbox_chan *chan, int r)
{
unsigned long flags;
- void *mssg;
+ void *mssg = NULL;
+ int idx = -1;

spin_lock_irqsave(&chan->lock, flags);
- mssg = chan->active_req;
- chan->active_req = NULL;
+ if (chan->active_req >= 0) {
+ mssg = chan->msg_data[chan->active_req].msg_data;
+ idx = chan->active_req;
+ chan->active_req = -1;
+ }
spin_unlock_irqrestore(&chan->lock, flags);

/* Submit next message */
@@ -107,8 +112,9 @@ static void tx_tick(struct mbox_chan *chan, int r)
if (mssg && chan->cl->tx_done)
chan->cl->tx_done(chan->cl, mssg, r);

- if (chan->cl->tx_block)
- complete(&chan->tx_complete);
+ if (chan->cl->tx_block && idx >= 0
+ && !completion_done(&chan->msg_data[idx].tx_complete))
+ complete(&chan->msg_data[idx].tx_complete);
}

static enum hrtimer_restart txdone_hrtimer(struct hrtimer *hrtimer)
@@ -121,7 +127,7 @@ static enum hrtimer_restart txdone_hrtimer(struct hrtimer *hrtimer)
for (i = 0; i < mbox->num_chans; i++) {
struct mbox_chan *chan = &mbox->chans[i];

- if (chan->active_req && chan->cl) {
+ if ((chan->active_req >= 0) && chan->cl) {
txdone = chan->mbox->ops->last_tx_done(chan);
if (txdone)
tx_tick(chan, 0);
@@ -260,7 +266,7 @@ int mbox_send_message(struct mbox_chan *chan, void *mssg)

msg_submit(chan);

- if (chan->cl->tx_block && chan->active_req) {
+ if (chan->cl->tx_block) {
unsigned long wait;
int ret;

@@ -269,7 +275,8 @@ int mbox_send_message(struct mbox_chan *chan, void *mssg)
else
wait = msecs_to_jiffies(chan->cl->tx_tout);

- ret = wait_for_completion_timeout(&chan->tx_complete, wait);
+ reinit_completion(&chan->msg_data[t].tx_complete);
+ ret = wait_for_completion_timeout(&chan->msg_data[t].tx_complete, wait);
if (ret == 0) {
t = -EIO;
tx_tick(chan, -EIO);
@@ -304,7 +311,7 @@ struct mbox_chan *mbox_request_channel(struct mbox_client *cl, int index)
struct of_phandle_args spec;
struct mbox_chan *chan;
unsigned long flags;
- int ret;
+ int ret, i;

if (!dev || !dev->of_node) {
pr_debug("%s: No owner device node\n", __func__);
@@ -343,9 +350,10 @@ struct mbox_chan *mbox_request_channel(struct mbox_client *cl, int index)
spin_lock_irqsave(&chan->lock, flags);
chan->msg_free = 0;
chan->msg_count = 0;
- chan->active_req = NULL;
+ chan->active_req = -1;
chan->cl = cl;
- init_completion(&chan->tx_complete);
+ for (i = 0; i < MBOX_TX_QUEUE_LEN; i++)
+ init_completion(&chan->msg_data[i].tx_complete);

if (chan->txdone_method == TXDONE_BY_POLL && cl->knows_txdone)
chan->txdone_method |= TXDONE_BY_ACK;
@@ -410,7 +418,7 @@ void mbox_free_channel(struct mbox_chan *chan)
/* The queued TX requests are simply aborted, no callbacks are made */
spin_lock_irqsave(&chan->lock, flags);
chan->cl = NULL;
- chan->active_req = NULL;
+ chan->active_req = -1;
if (chan->txdone_method == (TXDONE_BY_POLL | TXDONE_BY_ACK))
chan->txdone_method = TXDONE_BY_POLL;

diff --git a/drivers/mailbox/pcc.c b/drivers/mailbox/pcc.c
index dd9ecd35..39481ec 100644
--- a/drivers/mailbox/pcc.c
+++ b/drivers/mailbox/pcc.c
@@ -243,6 +243,7 @@ struct mbox_chan *pcc_mbox_request_channel(struct mbox_client *cl,
struct device *dev = pcc_mbox_ctrl.dev;
struct mbox_chan *chan;
unsigned long flags;
+ int i;

/*
* Each PCC Subspace is a Mailbox Channel.
@@ -261,9 +262,11 @@ struct mbox_chan *pcc_mbox_request_channel(struct mbox_client *cl,
spin_lock_irqsave(&chan->lock, flags);
chan->msg_free = 0;
chan->msg_count = 0;
- chan->active_req = NULL;
+ chan->active_req = -1;
chan->cl = cl;
- init_completion(&chan->tx_complete);
+
+ for (i = 0; i < MBOX_TX_QUEUE_LEN; i++)
+ init_completion(&chan->msg_data[i].tx_complete);

if (chan->txdone_method == TXDONE_BY_POLL && cl->knows_txdone)
chan->txdone_method |= TXDONE_BY_ACK;
@@ -311,7 +314,8 @@ void pcc_mbox_free_channel(struct mbox_chan *chan)

spin_lock_irqsave(&chan->lock, flags);
chan->cl = NULL;
- chan->active_req = NULL;
+ chan->active_req = -1;
+
if (chan->txdone_method == (TXDONE_BY_POLL | TXDONE_BY_ACK))
chan->txdone_method = TXDONE_BY_POLL;

diff --git a/include/linux/mailbox_controller.h b/include/linux/mailbox_controller.h
index 74deadb..52dd45e 100644
--- a/include/linux/mailbox_controller.h
+++ b/include/linux/mailbox_controller.h
@@ -101,29 +101,41 @@ struct mbox_controller {
*/
#define MBOX_TX_QUEUE_LEN 20

+
+/**
+ * struct msg_data - elements of array associated with queued requests
+ * @msg_data: Hook for data packet
+ * @tx_complete: Transmission completion
+ */
+
+struct msg_data {
+ void *msg_data;
+ struct completion tx_complete;
+};
+
/**
* struct mbox_chan - s/w representation of a communication chan
* @mbox: Pointer to the parent/provider of this channel
* @txdone_method: Way to detect TXDone chosen by the API
* @cl: Pointer to the current owner of this channel
- * @tx_complete: Transmission completion
- * @active_req: Currently active request hook
+ * @active_req: Index of currently active slot
* @msg_count: No. of mssg currently queued
* @msg_free: Index of next available mssg slot
- * @msg_data: Hook for data packet
* @lock: Serialise access to the channel
* @con_priv: Hook for controller driver to attach private data
+ * @msg_data: Array containing indexed data associated with queued
+ * requests
*/
+
struct mbox_chan {
struct mbox_controller *mbox;
unsigned txdone_method;
struct mbox_client *cl;
- struct completion tx_complete;
- void *active_req;
+ int active_req;
unsigned msg_count, msg_free;
- void *msg_data[MBOX_TX_QUEUE_LEN];
spinlock_t lock; /* Serialise access to the channel */
void *con_priv;
+ struct msg_data msg_data[MBOX_TX_QUEUE_LEN];
};

int mbox_controller_register(struct mbox_controller *mbox); /* can sleep */
--
1.9.1