Re: [PATCH net-next] ptr_ring: make __ptr_ring_empty() checking more reliable

From: Yunsheng Lin
Date: Thu May 27 2021 - 02:08:30 EST


On 2021/5/27 12:57, Jason Wang wrote:
>
> On 2021/5/26 8:29 PM, Yunsheng Lin wrote:
>> Currently r->queue[] is cleared after r->consumer_head is moved
>> forward, which makes the __ptr_ring_empty() checking called in
>> page_pool_refill_alloc_cache() unreliable if the checking is done
>> after the r->queue clearing and before the consumer_head moving
>> forward.
>>
>> Move the r->queue[] clearing after consumer_head moving forward
>> to make __ptr_ring_empty() checking more reliable.
>
>
> If I understand this correctly, this can only happen if you run __ptr_ring_empty() in parallel with __ptr_ring_discard_one().

Yes.
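
To make the window concrete, here is a minimal userspace sketch of the two
orderings. It is only a model under stated assumptions: the toy_* names are
hypothetical and not part of ptr_ring.h, it is single-threaded (the check a
parallel reader would make is sampled by hand in the middle of the discard),
and it ignores wrap-around, the batch condition and READ_ONCE/WRITE_ONCE.

```c
#include <stdbool.h>
#include <stddef.h>

#define RING_SIZE 8

/* Hypothetical stand-in for the consumer side of struct ptr_ring. */
struct toy_ring {
	void *queue[RING_SIZE];
	int consumer_head;	/* next entry to consume */
	int consumer_tail;	/* first consumed entry not yet zeroed */
};

static int toy_item;	/* dummy payload; any non-NULL pointer will do */

/* Fill slots [0, filled) and pretend slots [0, head) were already consumed. */
static void toy_init(struct toy_ring *r, int filled, int head)
{
	for (int i = 0; i < RING_SIZE; i++)
		r->queue[i] = i < filled ? &toy_item : NULL;
	r->consumer_head = head;
	r->consumer_tail = 0;
}

/* Same test as __ptr_ring_empty(): is the slot at consumer_head NULL? */
static bool toy_empty(const struct toy_ring *r)
{
	return r->queue[r->consumer_head] == NULL;
}

/* Current order: zero the consumed slots first, then move consumer_head.
 * Returns what an emptiness check would see between the two steps.
 * Assumes consumer_head + 1 < RING_SIZE (no wrap-around).
 */
static bool window_clear_then_move(struct toy_ring *r)
{
	int head = r->consumer_head;
	int new_head = head + 1;
	bool seen_empty;

	while (head >= r->consumer_tail)
		r->queue[head--] = NULL;
	seen_empty = toy_empty(r);	/* reads the slot that was just zeroed */
	r->consumer_tail = new_head;
	r->consumer_head = new_head;
	return seen_empty;
}

/* Patched order: move consumer_head first, then zero the consumed slots.
 * Same assumption: no wrap-around.
 */
static bool window_move_then_clear(struct toy_ring *r)
{
	int tail = r->consumer_tail;
	int head = r->consumer_head + 1;
	bool seen_empty;

	r->consumer_tail = head;
	r->consumer_head = head;
	seen_empty = toy_empty(r);	/* reads the next unconsumed slot */
	while (--head >= tail)
		r->queue[head] = NULL;
	return seen_empty;
}
```

With four entries queued and consumer_head at 1 (toy_init(&r, 4, 1)),
window_clear_then_move() reports the ring as empty inside the window even
though slots 2 and 3 still hold entries, while window_move_then_clear()
reports it as non-empty.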

>
> I think those two need to be serialized. Or did I miss anything?

If the above is true, then I do not think we would need to keep
consumer_head valid at all times, as the comment below in
__ptr_ring_discard_one() requires, right?


/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
 * to work correctly.
 */

>
> Thanks
>
>
>>
>> Signed-off-by: Yunsheng Lin <linyunsheng@xxxxxxxxxx>
>> ---
>> include/linux/ptr_ring.h | 26 +++++++++++++++++---------
>> 1 file changed, 17 insertions(+), 9 deletions(-)
>>
>> diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
>> index 808f9d3..f32f052 100644
>> --- a/include/linux/ptr_ring.h
>> +++ b/include/linux/ptr_ring.h
>> @@ -261,8 +261,7 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
>>  	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
>>  	 * to work correctly.
>>  	 */
>> -	int consumer_head = r->consumer_head;
>> -	int head = consumer_head++;
>> +	int consumer_head = r->consumer_head + 1;
>>
>>  	/* Once we have processed enough entries invalidate them in
>>  	 * the ring all at once so producer can reuse their space in the ring.
>> @@ -271,19 +270,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
>>  	 */
>>  	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
>>  		     consumer_head >= r->size)) {
>> +		int tail = r->consumer_tail;
>> +		int head = consumer_head;
>> +
>> +		if (unlikely(consumer_head >= r->size)) {
>> +			r->consumer_tail = 0;
>> +			WRITE_ONCE(r->consumer_head, 0);
>> +		} else {
>> +			r->consumer_tail = consumer_head;
>> +			WRITE_ONCE(r->consumer_head, consumer_head);
>> +		}
>> +
>>  		/* Zero out entries in the reverse order: this way we touch the
>>  		 * cache line that producer might currently be reading the last;
>>  		 * producer won't make progress and touch other cache lines
>>  		 * besides the first one until we write out all entries.
>>  		 */
>> -		while (likely(head >= r->consumer_tail))
>> -			r->queue[head--] = NULL;
>> -		r->consumer_tail = consumer_head;
>> -	}
>> -	if (unlikely(consumer_head >= r->size)) {
>> -		consumer_head = 0;
>> -		r->consumer_tail = 0;
>> +		while (likely(--head >= tail))
>> +			r->queue[head] = NULL;
>> +
>> +		return;
>>  	}
>> +
>>  	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
>>  	WRITE_ONCE(r->consumer_head, consumer_head);
>>  }