Re: [PATCH V12 3/9] PCI: Create PCIe library functions in support of DOE mailboxes.

From: Ira Weiny
Date: Tue Jun 28 2022 - 14:20:52 EST


On Tue, Jun 28, 2022 at 03:16:26PM +0100, Jonathan Cameron wrote:
> On Mon, 27 Jun 2022 21:15:21 -0700
> ira.weiny@xxxxxxxxx wrote:
>
> > From: Jonathan Cameron <Jonathan.Cameron@xxxxxxxxxx>
> >
> > Introduced in a PCIe r6.0, sec 6.30, DOE provides a config space based
> > mailbox with standard protocol discovery. Each mailbox is accessed
> > through a DOE Extended Capability.
> >
> > Each DOE mailbox must support the DOE discovery protocol in addition to
> > any number of additional protocols.
> >
> > Define core PCIe functionality to manage a single PCIe DOE mailbox at a
> > defined config space offset. Functionality includes iterating,
> > creating, query of supported protocol, and task submission. Destruction
> > of the mailboxes is device managed.
> >
> > If interrupts are desired, the interrupt number can be queried and
> > passed to the create function. Passing a negative value disables
> > interrupts for that mailbox. It is the caller's responsibility to ensure
> > enough interrupt vectors are allocated.
> >
> > Cc: "Li, Ming" <ming4.li@xxxxxxxxx>
> > Cc: Bjorn Helgaas <helgaas@xxxxxxxxxx>
> > Signed-off-by: Jonathan Cameron <Jonathan.Cameron@xxxxxxxxxx>
> > Co-developed-by: Ira Weiny <ira.weiny@xxxxxxxxx>
> > Signed-off-by: Ira Weiny <ira.weiny@xxxxxxxxx>
> >
> Hi Ira,
>
> Thanks for keeping at this!
>
> I think this has reintroduced some of the races around that annoying
> interrupt source form BUSY transitioning to low that has
> no explicit 'cause' flag. I think we'd hammered all those out in the
> previous version but maybe there were still some there...

Well I really tried hard not to introduce races which would be a problem. But
I would not be surprised.

>
> I 'think' it will work as is, but depending on the timing a given DOE
> implementation has, the interrupt may be completely pointless as it
> will be signaling the wrong event.

:-/

There is a chance that an IRQ comes in just after we timeout waiting for it. I
think that has always been the case and the IRQ will effectively be missed I
_think_.

>
> Jonathan
>

[snip]

>
> > +/*
> > + * Returned from the wait functions to indicate that an abort has been issued
> > + */
> > +#define DOE_WAIT_ABORT 1
> > +
> > +static int pci_doe_arm_wait(struct pci_doe_mb *doe_mb)
>
> Feels like there should be a naming to convey the return value as
> a boolean rather than pushing through a flag value.

Something like?

static bool pci_doe_arm_abort_seen(struct pci_doe_mb *doe_mb)
{
if (test_and_clear_bit(PCI_DOE_FLAG_ABORT, &doe_mb->flags))
return true;
clear_bit(PCI_DOE_FLAG_IRQ, &doe_mb->flags);
return false;
}

...
if (pci_doe_arm_abort_seen(mb))
... Process abort ...
...

>
> > +{
> > + if (test_and_clear_bit(PCI_DOE_FLAG_ABORT, &doe_mb->flags))
> > + return DOE_WAIT_ABORT;
> > + clear_bit(PCI_DOE_FLAG_IRQ, &doe_mb->flags);
> > + return 0;
> > +}
> > +
>
> > +static void pci_doe_write_ctrl(struct pci_doe_mb *doe_mb, u32 val)
> > +{
> > + struct pci_dev *pdev = doe_mb->pdev;
> > + int offset = doe_mb->cap_offset;
> > +
> > + if (pci_doe_irq_enabled(doe_mb))
> > + val |= PCI_DOE_CTRL_INT_EN;
> > + pci_write_config_dword(pdev, offset + PCI_DOE_CTRL, val);
> > +}
> > +
> > +static int pci_doe_issue_abort(struct pci_doe_mb *doe_mb)
> Can we rename this as it does more than simply issue the abort,
> it waits for it to finish

Sure.

How about just pci_doe_abort()? I'm probably going to open code that call now.

>
> > +{
> > + struct pci_dev *pdev = doe_mb->pdev;
> > + int offset = doe_mb->cap_offset;
> > + unsigned long timeout_jiffies;
> > +
> > + pci_dbg(pdev, "[%x] Issuing Abort\n", offset);
> > +
> > + /*
> > + * Abort detected while aborting; something is really broken or the
> > + * mailbox is being destroyed.
> > + */
> > + if (pci_doe_arm_wait(doe_mb))
> > + return -EIO;
> > +
> > + timeout_jiffies = jiffies + PCI_DOE_TIMEOUT;
> > + pci_doe_write_ctrl(doe_mb, PCI_DOE_CTRL_ABORT);
> > +
> > + do {
> > + u32 val;
> > +
> > + /*
> > + * Abort detected while aborting; something is really broken or
> > + * the mailbox is being destroyed.
> > + */
> > + if (pci_doe_wait_irq_or_poll(doe_mb))
> > + return -EIO;
> > + pci_read_config_dword(pdev, offset + PCI_DOE_STATUS, &val);
> > +
> > + /* Abort success! */
> > + if (!FIELD_GET(PCI_DOE_STATUS_ERROR, val) &&
> > + !FIELD_GET(PCI_DOE_STATUS_BUSY, val))
> > + return 0;
> > +
> > + } while (!time_after(jiffies, timeout_jiffies));
> > +
> > + /* Abort has timed out and the MB is dead */
> > + pci_err(pdev, "[%x] ABORT timed out\n", offset);
>
> Does this print mention it's a DOE somewhere?

Yep, per Bjorn's suggestion I removed all the 'DOE ' strings and use the
dev_fmt specifier.

[snip]

> > +
> > + /* First 2 dwords have already been read */
> > + length -= 2;
> > + payload_length = min(length, task->response_pl_sz / sizeof(u32));
> > + /* Read the rest of the response payload */
> > + for (i = 0; i < payload_length; i++) {
> > + pci_read_config_dword(pdev, offset + PCI_DOE_READ,
> > + &task->response_pl[i]);
> > + /* Prior to the last ack, ensure Data Object Ready */
> > + if (i == (payload_length-1) && !pci_doe_data_obj_ready(doe_mb))
>
> spaces around -

Done.

>
> > + return -EIO;
> > + pci_write_config_dword(pdev, offset + PCI_DOE_READ, 0);
> > + }
> > +
> > + /* Flush excess length */
> > + for (; i < length; i++) {
> > + pci_read_config_dword(pdev, offset + PCI_DOE_READ, &val);
> > + pci_write_config_dword(pdev, offset + PCI_DOE_READ, 0);
> > + }
> > +
> > + /* Final error check to pick up on any since Data Object Ready */
> > + pci_read_config_dword(pdev, offset + PCI_DOE_STATUS, &val);
> > + if (FIELD_GET(PCI_DOE_STATUS_ERROR, val))
> > + return -EIO;
> > +
> > + return min(length, task->response_pl_sz / sizeof(u32)) * sizeof(u32);
> > +}
> > +
>
> > +
> > +static void doe_statemachine_work(struct work_struct *work)
> > +{
> > + struct pci_doe_task *task = container_of(work, struct pci_doe_task,
> > + work);
> > + struct pci_doe_mb *doe_mb = task->doe_mb;
> > + struct pci_dev *pdev = doe_mb->pdev;
> > + int offset = doe_mb->cap_offset;
> > + unsigned int busy_retries = 0;
> > + unsigned long timeout_jiffies;
> > + u32 val;
> > + int rc;
> > +
> > + if (test_bit(PCI_DOE_FLAG_DEAD, &doe_mb->flags)) {
> > + signal_task_complete(task, -EIO);
> > + return;
> > + }
> > +
> > + /* Send request */
> > +retry_req:
> > + if (pci_doe_arm_wait(doe_mb)) {
> > + signal_task_abort(task, -EIO);
> > + return;
> > + }
>
> Is there a race here? If Busy drops at this point we queue up
> a message, but IRQ bit is already set. Hence when we hit
> wait_event_timeout() the flag is already set and IIRC we'll
> drop straight through.
>

I did not realize that the device would interrupt when Busy dropped? I was
thinking that V11 did not respond to IRQ but indeed it did via setting the work
item to run immediately...

However, I only see this in the spec:

1) System firmware/software checks that the DOE Busy bit is Clear to ensure
that the DOE instance is ready to receive a DOE request.

>
> It'll probably be fine because it will end up polling below
> but doesn't look ideal.

I agree it would not be ideal but I think if we are waiting for Busy to be
cleared then the pci_doe_arm_wait() should be benign.

>
> Upshot is that you sort of have to handle "spurious interrupts"
> cleanly and rewait on the interrupt if you get one whilst also handling
> race conditions around RW1C of the interrupt status flag.

Sorry I'm not sure what 'RW1C' means here?

Anyway, spurious interrupts was something I was concerned about but I don't
think there is anything we can do about an interrupt coming in when we are
expecting one but the device did not really send one. AFAIK that is virtually
impossible anyway.

If we actually 'miss' one because we timed out before the device sent it then I
think we are going to ignore the PCI_DOE_FLAG_IRQ flag on the next go around.

Actually timeout is handled by the abort call and that IRQ will, depending on
timing, cause a full PCI_DOE_TIMEOUT to expire. :-( That is indeed not
ideal. However, by that time the error and busy flags should be clear and we
can safely continue. Otherwise we are going to take the mailbox down.

It may seem better to arm wait on each iteration through the abort loop. But
this is not logically correct because the abort operation should trigger an
IRQ. So there is always a race if we missed an IRQ because we timed out early.

>
>
> > +
> > + rc = pci_doe_send_req(doe_mb, task);
> > +
> > + /*
> > + * The specification does not provide any guidance on how long
> > + * some other entity could keep the DOE busy, so try for 1
> > + * second then fail. Busy handling is best effort only, because
> > + * there is no way of avoiding racing against another user of
> > + * the DOE.
> > + */
> > + if (rc == -EBUSY) {
> > + busy_retries++;
> > + if (busy_retries == PCI_DOE_BUSY_MAX_RETRIES) {
> > + pci_warn(pdev,
> > + "[%x] busy for too long (> 1 sec)\n",
> > + offset);
> > + signal_task_complete(task, rc);
> > + return;
> > + }
> > + if (pci_doe_wait_poll(doe_mb, HZ / PCI_DOE_BUSY_MAX_RETRIES)) {
> > + signal_task_abort(task, rc);
> > + return;
> > + }
> > + goto retry_req;
> > + } else if (rc) {
> > + signal_task_abort(task, rc);
> > + return;
> > + }
> > +
> > + timeout_jiffies = jiffies + HZ;
> > + if (pci_doe_wait_irq_or_poll(doe_mb)) {
>
> So this may well be passed as a result of a BUSY transition to 0 very soon
> after the doe_send_req but well before the data is ready....

I think the simple fix is to make the BUSY wait on an IRQ. Something like:

21:13:53 > git di
diff --git a/drivers/pci/doe.c b/drivers/pci/doe.c
index 12f9f8045eb7..afd326320798 100644
--- a/drivers/pci/doe.c
+++ b/drivers/pci/doe.c
@@ -352,7 +352,7 @@ static void doe_statemachine_work(struct work_struct *work)
signal_task_complete(task, rc);
return;
}
- if (pci_doe_wait_poll(doe_mb, HZ / PCI_DOE_BUSY_MAX_RETRIES)) {
+ if (pci_doe_wait_irq_or_poll(...)) {
signal_task_abort(task, rc);
return;
}

>
>
> > + signal_task_abort(task, -EIO);
> > + return;
> > + }
> > +
> > + /* Poll for response */
> > +retry_resp:
> > + if (pci_doe_arm_wait(doe_mb)) {
> I think we can get here between Busy drop and Object Ready which means
> this can get another IRQ_FLAG setting just after it. Does it matter?
> Don't think so, as we don't use that bit again in this run through
> and it will be cleared at beginning of next one,

Yea basically I agree.

> but if so why is
> this call here?

Seemed like the right thing to do at the time... ;-) j/k

> I think it's only useful for detecting an abort, if
> so do that explicitly.

Actually it is the right thing to do... However, the wait poll below also
needs to be an IRQ or poll. I'm not sure how I missed that logic.

>
> > + signal_task_abort(task, -EIO);
> > + return;
> > + }
> > +
> > + pci_read_config_dword(pdev, offset + PCI_DOE_STATUS, &val);
> > + if (FIELD_GET(PCI_DOE_STATUS_ERROR, val)) {
> > + signal_task_abort(task, -EIO);
> > + return;
> > + }
> > +
> > + if (!FIELD_GET(PCI_DOE_STATUS_DATA_OBJECT_READY, val)) {
> > + if (time_after(jiffies, timeout_jiffies)) {
> > + signal_task_abort(task, -ETIMEDOUT);
> > + return;
> > + }
> > + if (pci_doe_wait_poll(doe_mb, PCI_DOE_POLL_INTERVAL)) {
>
> Following on from above....
> As a result of the interrupt having fired on the BUSY off transition,
> I think we will almost always end up spinning here until Object Ready
> is set. Fine, but seems a shame we don't let an interrupt do this
> for us in most cases. (note in QEMU response is instantaneous so
> when the interrupt for Busy drop is set, object ready also true so
> by the time we get here data ready will already be sent).

This needs to be pci_doe_wait_irq_or_poll() as well and the arm wait above
ensures we continue to look for that interrupt.

I'm starting to see how I got confused. The timeout values all vary and
mod_delayed_work() made it very easy for you to interrupt any of those.

I tried to bundle the poll vs interrupt modes in pci_doe_wait_irq_or_poll() and
got confused. :-(

>
> > + signal_task_abort(task, -EIO);
> > + return;
> > + }
> > + goto retry_resp;
> > + }
> > +
> > + rc = pci_doe_recv_resp(doe_mb, task);
> > + if (rc < 0) {
> > + signal_task_abort(task, rc);
> > + return;
> > + }
> > +
> > + signal_task_complete(task, rc);
> > +}
> > +
>
>
> > +static void pci_doe_free_mb(struct pci_doe_mb *doe_mb)
> > +{
> > + if (doe_mb->work_queue)
>
> I'm not a great fan of free functions that check a bunch of conditions
> because they may be called before things are set up.

I'll see what I can do. I do kind of like this but I think it gets muddled and
I'm not dead set on either way.

> To my
> mind that generally means we should be calling individual cleanup
> in the appropriate error handlers.
>
> Either that or just use devm handling for each item. Sure
> it's a few more lines of code, but I find it a lot easier to go
>
> Oh look that thing we just set up is cleaned up by this.
>
>
> > + destroy_workqueue(doe_mb->work_queue);
> > + if (pci_doe_irq_enabled(doe_mb))
> > + pci_free_irq(doe_mb->pdev, doe_mb->int_msg_num, doe_mb);
> > + xa_destroy(&doe_mb->prots);
> > + kfree(doe_mb);
> > +}
> > +
>
> ...
>
> > +
> > +static void pci_doe_destroy_mb(void *mb)
> > +{
> > + struct pci_doe_mb *doe_mb = mb;
> > +
> > + /* Mark going down */
> > + set_bit(PCI_DOE_FLAG_DEAD, &doe_mb->flags);
> > +
> > + /* Abort any in progress work items */
> > + pci_doe_abort(doe_mb);
>
> Abort is getting used for two things in here. Perhaps
> rename this one to
> pci_doe_abort_tasks() or something like that?

What do you mean two things? Oh I think I see. You mean abort the work item
vs abort sent to the hardware?

This no longer aborts all the tasks just the one which may be in progress.
Because the work queue is ordered only one task may be in progress. I'll clean
up the comment too.

This sets the abort flag and wakes it up if it is sleeping. If not then the
abort flag will be detected in the next arm.

FWIW I think I may just remove this call and open code it here.

>
> > +
> > + /* Flush remaining work items */
> > + flush_workqueue(doe_mb->work_queue);
> > +
> > + pci_doe_free_mb(doe_mb);
> > +}
> > +
> > +/**
> > + * pcim_doe_create_mb() - Create a DOE mailbox object
> > + *
> > + * @pdev: PCI device to create the DOE mailbox for
> > + * @cap_offset: Offset of the DOE mailbox
> > + * @int_msg_num: Interrupt message number to use; a negative value means don't
> > + * use interrupts
> > + *
> > + * Create a single mailbox object to manage the mailbox protocol at the
> > + * cap_offset specified.
> > + *
> > + * Caller should allocate PCI IRQ vectors before passing a possitive value for
>
> positive

Thanks fixed.

>
> > + * int_msg_num.
> > + *
> > + * RETURNS: created mailbox object on success
> > + * ERR_PTR(-errno) on failure
> > + */
> > +struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset,
> > + int int_msg_num)
> > +{
> > + struct pci_doe_mb *doe_mb;
> > + int rc;
> > +
> > + doe_mb = kzalloc(sizeof(*doe_mb), GFP_KERNEL);
> > + if (!doe_mb)
> > + return ERR_PTR(-ENOMEM);
> > +
> > + doe_mb->pdev = pdev;
> > + doe_mb->int_msg_num = -1;
> > + doe_mb->cap_offset = cap_offset;
> > +
> > + xa_init(&doe_mb->prots);
> > + init_waitqueue_head(&doe_mb->wq);
> > +
> > + if (int_msg_num >= 0) {
> > + rc = pci_doe_enable_irq(doe_mb, int_msg_num);
> > + if (rc)
> > + pci_err(pdev,
> > + "[%x] enable requested IRQ (%d) failed : %d\n",
> > + doe_mb->cap_offset, int_msg_num, rc);
>
> If we are printing an error, I'd argue we should not continue.
> Or at very least we should a comment here to say why we should do so...
>

Not continue seems reasonable.

>
> > + }
> > +
> > + doe_mb->work_queue = alloc_ordered_workqueue("DOE: [%x]", 0,
> > + doe_mb->cap_offset);
> > + if (!doe_mb->work_queue) {
> > + pci_err(pdev, "[%x] failed to allocate work queue\n",
> > + doe_mb->cap_offset);
> > + pci_doe_free_mb(doe_mb);
>
> As above, I'd rather this explicitly freed what has been set up
> and only that rather than calling a free function that does a bunch of
> stuff conditionally.

I think I can make that work. This is the only conditional in free however,
because the other conditional is the IRQ support which may not be set up.

Thanks again for the in depth review!
Ira

>
>
> > + return ERR_PTR(-ENOMEM);
> > + }
> > +
> > + /* Reset the mailbox by issuing an abort */
> > + rc = pci_doe_issue_abort(doe_mb);
> > + if (rc) {
> > + pci_err(pdev, "[%x] failed to reset : %d\n",
> > + doe_mb->cap_offset, rc);
> > + pci_doe_free_mb(doe_mb);
> > + return ERR_PTR(rc);
> > + }
> > +
> > + if (devm_add_action_or_reset(&pdev->dev, pci_doe_destroy_mb, doe_mb))
> > + return ERR_PTR(-EIO);
> > +
> > + rc = pci_doe_cache_protocols(doe_mb);
> > + if (rc) {
> > + pci_err(pdev, "[%x] failed to cache protocols : %d\n",
> > + doe_mb->cap_offset, rc);
> > + return ERR_PTR(rc);
> > + }
> > +
> > + return doe_mb;
> > +}
> > +EXPORT_SYMBOL_GPL(pcim_doe_create_mb);
> > +
>
>
>