Re: [PATCH] mailbox: forward the hrtimer if not queued and under a lock

From: Bjorn Ardo
Date: Mon May 23 2022 - 07:56:36 EST

Hi again,

On 4/20/22 10:28, Bjorn Ardo wrote:

Our current solution are using 4 different mailbox channels asynchronously. The code is part of a larger system, but I can put down some time and try and extract the relevant parts if you still think this is a client issue? But with my current understanding of the code, the race between msg_submit() and txdone_hrtimer() is quite clear, and with my proposed patch that removes this race we have be able to run for very long time without any problems (that is several days). Without the fix we get the race after 5-10 min.

I do not know if you have had any time to review my comments yet, but we have created some examples to trigger the error.

With the attached testmodule mailbox-loadtest.c I can trigger the error by attaching it to the two sides of an mailbox with the following devicetree code:

        mboxtest1 {
                compatible = "mailbox-loadtest";
                mbox-names = "ping", "pong";
                mboxes = <&mbox_loop_pri 0 &mbox_loop_pri 1>;

        mboxtest2 {
                compatible = "mailbox-loadtest";
                mbox-names = "pong", "ping";
                mboxes = <&mbox_loop_scd 0 &mbox_loop_scd 1>;

After that I create load on the mailbox by running (or respectively system) the following:

while echo 1 > /sys/kernel/debug/mboxtest1/ping ; do
usleep 1

while echo 1 > /sys/kernel/debug/mboxtest2/ping ; do
usleep 50000

After a few minutes (normally 2-5) I get errors.

Using the patch I sent earlier the errors goes away.

We also have created a mailbox-loopback.c that is a loopback mailbox that can be used on the same system (to make testing easier on systems that does not have a hardware mailbox), it is also attached. This can be probed by the following devicetree code:

        mbox_loop_pri: mailbox_loop_pri {
                compatible = "mailbox-loopback";
                #mbox-cells = <1>;
                side = <0>;
        mbox_loop_scd: mailbox_loop_scd {
                compatible = "mailbox-loopback";
                #mbox-cells = <1>;
                side = <1>;

And with this loopback mailbox we have also been able to reproduce the errors without the patch applied.

Best Regards,


// SPDX-License-Identifier: GPL-2.0-or-later

/* Module to test a mailbox with heavy load on two concurrent channels.
* Sends the value of the void pointer as the message, so no dereferencing
* of the pointers are done here, or can be done by the mailbox driver.

#include <linux/debugfs.h>
#include <linux/err.h>
#include <linux/fs.h>
#include <linux/io.h>
#include <linux/kernel.h>
#include <linux/mailbox_client.h>
#include <linux/module.h>
#include <linux/of.h>
#include <linux/platform_device.h>
#include <linux/poll.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#include <linux/interrupt.h>

struct mbox_loadtest_device {
struct device *dev;
struct dentry *root_debugfs_dir;

struct mbox_client ping_client;
struct mbox_chan *ping_channel;
struct completion completion;
struct mutex ping_lock;
u32 ping_rsp;

struct mbox_client pong_client;
struct mbox_chan *pong_channel;
struct tasklet_struct pong_tasklet;
u32 pong_rsp;

static void mbox_loadtest_receive_ping_message(struct mbox_client *client, void *message)
struct mbox_loadtest_device *tdev = container_of(client, struct mbox_loadtest_device, ping_client);

tdev->ping_rsp = (u32)(unsigned long)message;

static void mbox_loadtest_pong_tasklet(unsigned long data)
struct mbox_loadtest_device *tdev = (struct mbox_loadtest_device *)data;

mbox_send_message(tdev->pong_channel, (void *)(unsigned long)tdev->pong_rsp);

static void mbox_loadtest_receive_pong_message(struct mbox_client *client, void *message)
struct mbox_loadtest_device *tdev = container_of(client, struct mbox_loadtest_device, pong_client);

tdev->pong_rsp = ((u32)(unsigned long)message) + 1;
tasklet_init(&tdev->pong_tasklet, mbox_loadtest_pong_tasklet, (unsigned long)tdev);

static int mbox_loadtest_send_ping_message(struct mbox_loadtest_device *tdev, u32 message)
int compleated;
u32 rsp;

mbox_send_message(tdev->ping_channel, (void *)(unsigned long)message);

compleated = wait_for_completion_timeout(&tdev->completion, msecs_to_jiffies(20));
rsp = tdev->ping_rsp;

if (!compleated) {
dev_err(tdev->dev, "Timeout\n");
return -EFAULT;
if (rsp != message+1) {
dev_err(tdev->dev, "Wrong ans %i != %i\n", rsp, message);
return -EFAULT;

return 0;
static ssize_t mbox_loadtest_ping_write(struct file *filp,
const char __user *userbuf,
size_t count, loff_t *ppos)
int ret;
struct mbox_loadtest_device *tdev = filp->private_data;

ret = mbox_loadtest_send_ping_message(tdev, 0x42);
return ret < 0 ? ret : count;

static const struct file_operations mbox_loadtest_ping_ops = {
.write = mbox_loadtest_ping_write,
.open = simple_open,

static int mbox_loadtest_probe(struct platform_device *pdev)
struct mbox_loadtest_device *tdev;
struct device_node *np = pdev->dev.of_node;

tdev = devm_kzalloc(&pdev->dev, sizeof(*tdev), GFP_KERNEL);
if (!tdev)
return -ENOMEM;

if (of_property_match_string(np, "mbox-names", "ping") >= 0) {

tdev-> = &pdev->dev;
tdev->ping_client.rx_callback = mbox_loadtest_receive_ping_message;
tdev->ping_client.tx_done = NULL;
tdev->ping_client.tx_block = false;
tdev->ping_client.knows_txdone = false;
tdev->ping_client.tx_tout = 500;
tdev->ping_channel = mbox_request_channel_byname(&tdev->ping_client, "ping");
if (IS_ERR(tdev->ping_channel)) {

if (debugfs_initialized()) {
tdev->root_debugfs_dir = debugfs_create_dir(dev_name(&pdev->dev), NULL);
debugfs_create_file("ping", 0600, tdev->root_debugfs_dir,
tdev, &mbox_loadtest_ping_ops);

if (of_property_match_string(np, "mbox-names", "pong") >= 0) {

tdev-> = &pdev->dev;
tdev->pong_client.rx_callback = mbox_loadtest_receive_pong_message;
tdev->pong_client.tx_done = NULL;
tdev->pong_client.tx_block = false;
tdev->pong_client.knows_txdone = false;
tdev->pong_client.tx_tout = 500;
tdev->pong_channel = mbox_request_channel_byname(&tdev->pong_client, "pong");
if (IS_ERR(tdev->pong_channel)) {

tdev->dev = &pdev->dev;
platform_set_drvdata(pdev, tdev);
return 0;

static int mbox_loadtest_remove(struct platform_device *pdev)
struct mbox_loadtest_device *tdev = platform_get_drvdata(pdev);


if (tdev->ping_channel)
if (tdev->pong_channel)

return 0;

static const struct of_device_id mbox_loadtest_match[] = {
{ .compatible = "mailbox-loadtest" },
MODULE_DEVICE_TABLE(of, mbox_loadtest_match);

static struct platform_driver mbox_loadtest_driver = {
.driver = {
.name = "mailbox_loadtest",
.of_match_table = mbox_loadtest_match,
.probe = mbox_loadtest_probe,
.remove = mbox_loadtest_remove,

MODULE_DESCRIPTION("Mailbox Load Testing Facility");
// SPDX-License-Identifier: GPL-2.0

/* This is a loopback mailbox that can be used to test drivers on
* a single system. It uses a global memory so only 2 instances
* (one for each side) can be probed.

#include <linux/mailbox_controller.h>
#include <linux/platform_device.h>
#include <linux/irq_work.h>
#include <linux/device.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/of.h>

struct mbox_loopback_chan {
struct mbox_chan *self;
struct mbox_chan *other;
struct irq_work work;
struct hrtimer timer;
u32 msg;
bool pending;


struct mbox_loopback {
struct mbox_controller controller[2];
bool probed[2];
struct mbox_loopback_chan lchans[2*MAILBOX_NUM_CHANS];
struct mbox_chan chans[2*MAILBOX_NUM_CHANS];

/* A global shared memory for both sides of the mailbox */
static struct mbox_loopback mbox_loopback;

static void mbox_loopback_work(struct irq_work *work)
struct mbox_loopback_chan *lchan = container_of(work, struct mbox_loopback_chan, work);

mbox_chan_received_data(lchan->other, (void *)(unsigned long)lchan->msg);
lchan->pending = false;

static bool mbox_loopback_last_tx_done(struct mbox_chan *chan)
struct mbox_loopback_chan *lchan = chan->con_priv;

return !lchan->pending;

static int mbox_loopback_send_data(struct mbox_chan *chan, void *data)
struct mbox_loopback_chan *lchan = chan->con_priv;

lchan->msg = (u32)(unsigned long)data;
lchan->pending = true;

/* Start a timer that will trigger an IRQ in a short while */
hrtimer_start(&lchan->timer, ns_to_ktime(1000), HRTIMER_MODE_REL);
return 0;

static enum hrtimer_restart mbox_loopback_timer_callback(struct hrtimer *hrtimer)
struct mbox_loopback_chan *lchan = container_of(hrtimer, struct mbox_loopback_chan, timer);


static void mbox_loopback_shutdown(struct mbox_chan *chan)
struct mbox_loopback_chan *lchan = chan->con_priv;


static const struct mbox_chan_ops mbox_loopback_ops = {
.send_data = mbox_loopback_send_data,
.shutdown = mbox_loopback_shutdown,
.last_tx_done = mbox_loopback_last_tx_done,

static int mbox_loopback_probe(struct platform_device *pdev)
struct device *dev = &pdev->dev;
int i;
unsigned int side;

/* Check side (nothing or anything but 1 is primary side) */
of_property_read_u32(dev->of_node, "side", &side);
if (side != 1)
side = 0;

if (mbox_loopback.probed[side])
return -ENOMEM;
mbox_loopback.probed[side] = true;

mbox_loopback.controller[side].dev = dev;
mbox_loopback.controller[side].num_chans = MAILBOX_NUM_CHANS;
mbox_loopback.controller[side].txdone_irq = false;
mbox_loopback.controller[side].txdone_poll = true;
mbox_loopback.controller[side].txpoll_period = 1;
mbox_loopback.controller[side].chans = &mbox_loopback.chans[side * MAILBOX_NUM_CHANS];
mbox_loopback.controller[side].ops = &mbox_loopback_ops;

BUILD_BUG_ON(ARRAY_SIZE(mbox_loopback.chans) != ARRAY_SIZE(mbox_loopback.lchans));

for (i = 0; i < MAILBOX_NUM_CHANS; i++) {
int me = i + side * MAILBOX_NUM_CHANS;
int other;

if (me >= MAILBOX_NUM_CHANS) {
other = me - MAILBOX_NUM_CHANS;
} else {
other = me + MAILBOX_NUM_CHANS;

mbox_loopback.lchans[me].self = &mbox_loopback.chans[me];
mbox_loopback.lchans[me].other = &mbox_loopback.chans[other];
init_irq_work(&mbox_loopback.lchans[me].work, mbox_loopback_work);
hrtimer_init(&mbox_loopback.lchans[me].timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
mbox_loopback.lchans[me].timer.function = mbox_loopback_timer_callback;
mbox_loopback.chans[me].con_priv = &mbox_loopback.lchans[me];
return devm_mbox_controller_register(dev, &mbox_loopback.controller[side]);

static const struct of_device_id mbox_loopback_match[] = {
{ .compatible = "mailbox-loopback" },
{ /* sentinel */ }
MODULE_DEVICE_TABLE(of, mbox_loopback_match);

static struct platform_driver mbox_loopback_driver = {
.probe = mbox_loopback_probe,
.driver = {
.name = "mailbox-loopback",
.of_match_table = mbox_loopback_match,


MODULE_DESCRIPTION("Loopback Mailbox");