[RFC] locking: rwbase: Take care of ordering guarantee for fastpath reader

From: Boqun Feng
Date: Wed Sep 01 2021 - 11:07:50 EST


Readers of rwbase can lock and unlock without taking any inner lock, if
that happens, we need the ordering provided by atomic operations to
satisfy the ordering semantics of lock/unlock. Without that, considering
the follow case:

{ X = 0 initially }

CPU 0 CPU 1
===== =====
rt_write_lock();
X = 1
rt_write_unlock():
atomic_add(READER_BIAS - WRITER_BIAS, ->readers);
// ->readers is READER_BIAS.
rt_read_lock():
if ((r = atomic_read(->readers)) < 0) // True
atomic_try_cmpxchg(->readers, r, r + 1); // succeed.
<acquire the read lock via fast path>

r1 = X; // r1 may be 0, because nothing prevent the reordering
// of "X=1" and atomic_add() on CPU 1.

Therefore audit every usage of atomic operations that may happen in a
fast path, and add necessary barriers.

Signed-off-by: Boqun Feng <boqun.feng@xxxxxxxxx>
---
Hi Thomas and Peter,

Sorry I'm late for the party of PREEMPT_RT lock review. Just want to
point the problem with this patch. Not even compile test, but show the
idea and check if I'm missing something subtle.

Regards,
Boqun


kernel/locking/rwbase_rt.c | 34 ++++++++++++++++++++++++++++++----
1 file changed, 30 insertions(+), 4 deletions(-)

diff --git a/kernel/locking/rwbase_rt.c b/kernel/locking/rwbase_rt.c
index 4ba15088e640..a1886fd8bde6 100644
--- a/kernel/locking/rwbase_rt.c
+++ b/kernel/locking/rwbase_rt.c
@@ -41,6 +41,12 @@
* The risk of writer starvation is there, but the pathological use cases
* which trigger it are not necessarily the typical RT workloads.
*
+ * Fast-path orderings:
+ * The lock/unlock of readers can run in fast paths: lock and unlock are only
+ * atomic ops, and there is no inner lock to provide ACQUIRE and RELEASE
+ * semantics of rwbase_rt. Atomic ops then should be stronger than _acquire()
+ * and _release() to provide necessary ordering guarantee.
+ *
* Common code shared between RT rw_semaphore and rwlock
*/

@@ -53,6 +59,7 @@ static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
* set.
*/
for (r = atomic_read(&rwb->readers); r < 0;) {
+ /* Fully-ordered if cmpxchg() succeeds, provides ACQUIRE */
if (likely(atomic_try_cmpxchg(&rwb->readers, &r, r + 1)))
return 1;
}
@@ -162,6 +169,8 @@ static __always_inline void rwbase_read_unlock(struct rwbase_rt *rwb,
/*
* rwb->readers can only hit 0 when a writer is waiting for the
* active readers to leave the critical section.
+ *
+ * dec_and_test() is fully ordered, provides RELEASE.
*/
if (unlikely(atomic_dec_and_test(&rwb->readers)))
__rwbase_read_unlock(rwb, state);
@@ -172,7 +181,11 @@ static inline void __rwbase_write_unlock(struct rwbase_rt *rwb, int bias,
{
struct rt_mutex_base *rtm = &rwb->rtmutex;

- atomic_add(READER_BIAS - bias, &rwb->readers);
+ /*
+ * _release() is needed in case that reader is in fast path, pairing
+ * with atomic_try_cmpxchg() in rwbase_read_trylock(), provides RELEASE
+ */
+ (void)atomic_add_return_release(READER_BIAS - bias, &rwb->readers);
raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
rwbase_rtmutex_unlock(rtm);
}
@@ -216,8 +229,14 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
*/
rwbase_set_and_save_current_state(state);

- /* Block until all readers have left the critical section. */
- for (; atomic_read(&rwb->readers);) {
+ /*
+ * Block until all readers have left the critical section.
+ *
+ * _acqurie() is needed in case that the reader side runs in the fast
+ * path, pairing with the atomic_dec_and_test() in rwbase_read_unlock(),
+ * provides ACQUIRE.
+ */
+ for (; atomic_read_acquire(&rwb->readers);) {
/* Optimized out for rwlocks */
if (rwbase_signal_pending_state(state, current)) {
__set_current_state(TASK_RUNNING);
@@ -229,6 +248,9 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
/*
* Schedule and wait for the readers to leave the critical
* section. The last reader leaving it wakes the waiter.
+ *
+ * _acquire() is not needed, because we can rely on the smp_mb()
+ * in set_current_state() to provide ACQUIRE.
*/
if (atomic_read(&rwb->readers) != 0)
rwbase_schedule();
@@ -253,7 +275,11 @@ static inline int rwbase_write_trylock(struct rwbase_rt *rwb)
atomic_sub(READER_BIAS, &rwb->readers);

raw_spin_lock_irqsave(&rtm->wait_lock, flags);
- if (!atomic_read(&rwb->readers)) {
+ /*
+ * _acquire() is needed in case reader is in the fast path, pairing with
+ * rwbase_read_unlock(), provides ACQUIRE.
+ */
+ if (!atomic_read_acquire(&rwb->readers)) {
atomic_set(&rwb->readers, WRITER_BIAS);
raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
return 1;
--
2.32.0