[RFC PATCH 1/5] Priority Sifting Reader-Writer Lock v13

From: Mathieu Desnoyers
Date: Mon Sep 08 2008 - 21:21:52 EST


Priority Sifting Reader-Writer Lock (psrwlock) excludes reader execution
contexts one at a time, thus increasing the writer priority in stages. It
favors writers over readers, but lets higher-priority readers access the lock
even when subscribed writers are waiting for the lock at a lower priority.
Very frequent writers could starve reader threads.


I used LTTng traces and eventually made a small patch to lockdep to detect
whenever a spinlock or an rwlock is used both with interrupts enabled and
disabled. Those sites are likely to produce very high latencies and should IMHO
be considered bogus. The basic bogus scenario is a spinlock held on CPU A with
interrupts enabled being interrupted, after which a softirq runs. On CPU B, the
same lock is acquired with interrupts off. We therefore disable interrupts on
CPU B for the duration of the softirq currently running on CPU A, which really
does not help keeping latencies short. My preliminary results show that there
are a lot of inconsistent spinlock/rwlock irq on/off uses in the kernel.

This kind of scenario is pretty easy to fix for spinlocks: either move the
interrupt disabling inside the spinlock critical section if the spinlock is
never used by an interrupt handler, or make sure that every user has
interrupts disabled.
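
For instance, with a hypothetical lock "my_lock" that is never taken from an
interrupt handler, either fix would look roughly like this (illustrative
sketch only):

	/* Fix 1: disable interrupts inside the critical section. */
	spin_lock(&my_lock);
	local_irq_save(flags);
	...
	local_irq_restore(flags);
	spin_unlock(&my_lock);

	/* Fix 2: make every user take the lock with interrupts disabled. */
	spin_lock_irqsave(&my_lock, flags);
	...
	spin_unlock_irqrestore(&my_lock, flags);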

The problem comes with rwlocks: it is correct to have readers both with
and without irqs disabled, even when interrupt handlers use the read lock.
However, the write lock has to disable interrupts in that case, and we
suffer from the high latency I pointed out. The tasklist_lock is the
perfect example of this. In the following patch, I try to address this
issue.
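
As an example, a tasklist_lock-style user, with readers in both irq and
preemptable thread context and the writer in preemptable thread context, would
be declared and used roughly as follows (adapted from the usage example in
psrwlock.h below; "mylock" is only illustrative):

	static DEFINE_PSRWLOCK(mylock, PSRW_PRIO_P, PSR_IRQ | PSR_PTHREAD);
	CHECK_PSRWLOCK_MAP(mylock, PSRW_PRIO_P, PSR_IRQ | PSR_PTHREAD);

	/* reader from irq context or with irqs disabled */
	psread_lock_irq(&mylock);
	...
	psread_unlock(&mylock);

	/* reader from preemptable thread context */
	psread_lock(&mylock);
	...
	psread_unlock(&mylock);

	/* writer from preemptable thread context */
	pswrite_lock(&mylock, PSRW_PRIO_P, PSR_IRQ | PSR_PTHREAD);
	...
	pswrite_unlock(&mylock, PSRW_PRIO_P, PSR_IRQ | PSR_PTHREAD);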

TODO:
- Add writer-writer fairness using "tickets" instead of single-bit mutexes

- Add lockdep support
- Create a compatibility layer to make port of current rwlock easier
- Use a priority barrel (shifting the reader priority bits in a loop to
generalize the number of reader priorities up to a maximum of 64 with a u64).
- For -rt: support priority inheritance

Other name ideas (RFC):
- Priority Sifting Reader-Writer Lock
- Staircase Reader-Writer Lock
- Staged Priority Elevation Reader-Writer Lock

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@xxxxxxxxxx>
CC: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
Cc: "H. Peter Anvin" <hpa@xxxxxxxxx>
CC: Jeremy Fitzhardinge <jeremy@xxxxxxxx>
CC: Andrew Morton <akpm@xxxxxxxxxxxxxxxxxxxx>
CC: Ingo Molnar <mingo@xxxxxxx>
CC: "Paul E. McKenney" <paulmck@xxxxxxxxxxxxxxxxxx>
CC: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
CC: Joe Perches <joe@xxxxxxxxxxx>
CC: Wei Weng <wweng@xxxxxxxxxx>
---
include/linux/psrwlock-types.h | 92 ++++
include/linux/psrwlock.h | 384 ++++++++++++++++++
lib/Kconfig.debug | 3
lib/Makefile | 3
lib/psrwlock.c | 839 +++++++++++++++++++++++++++++++++++++++++
5 files changed, 1321 insertions(+)

Index: linux-2.6-lttng/include/linux/psrwlock.h
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ linux-2.6-lttng/include/linux/psrwlock.h 2008-09-08 20:29:11.000000000 -0400
@@ -0,0 +1,384 @@
+#ifndef _LINUX_PSRWLOCK_H
+#define _LINUX_PSRWLOCK_H
+
+/*
+ * Priority Sifting Reader-Writer Lock
+ *
+ * Priority Sifting Reader-Writer Lock (psrwlock) excludes reader execution
+ * contexts one at a time, thus increasing the writer priority in stages. It
+ * favors writers over reader threads, but lets higher-priority readers in
+ * even when there are subscribed writers waiting for the lock at a given lower
+ * priority. Very frequent writers could starve reader threads.
+ *
+ * See psrwlock-types.h for types definitions.
+ * See psrwlock.c for algorithmic details.
+ *
+ * Mathieu Desnoyers <mathieu.desnoyers@xxxxxxxxxx>
+ * August 2008
+ */
+
+#include <linux/hardirq.h>
+#include <linux/wait.h>
+#include <linux/psrwlock-types.h>
+
+#include <asm/atomic.h>
+
+#define NR_PREEMPT_BUSY_LOOPS 100
+
+/*
+ * Uncontended word bits (32 bits)
+ *
+ * Because we deal with overflow by busy-looping waiting for the counter to
+ * decrement, make sure the maximum allowed for lower-priority execution
+ * contexts is lower than the maximum for higher priority execution contexts.
+ * Therefore, all contexts use the same counter bits, but they reach their
+ * overflow capacity one bit apart from each other (only used in the slow path).
+ *
+ * 3 bits for status
+ * 29 bits for reader count
+ * reserve 1 high bit for irqs
+ * reserve 1 high bit for bh
+ * reserve 1 high bit for non-preemptable threads
+ * 26 bits left for preemptable readers count
+ */
+#define UC_READER_MAX (1U << 29)
+#define UC_HARDIRQ_READER_MAX UC_READER_MAX
+#define UC_SOFTIRQ_READER_MAX (UC_HARDIRQ_READER_MAX >> 1)
+#define UC_NPTHREAD_READER_MAX (UC_SOFTIRQ_READER_MAX >> 1)
+#define UC_PTHREAD_READER_MAX (UC_NPTHREAD_READER_MAX >> 1)
+
+#define UC_WRITER (1U << 0)
+#define UC_SLOW_WRITER (1U << 1)
+#define UC_WQ_ACTIVE (1U << 2)
+#define UC_READER_OFFSET (1U << 3)
+#define UC_HARDIRQ_READER_MASK ((UC_HARDIRQ_READER_MAX - 1) * UC_READER_OFFSET)
+#define UC_SOFTIRQ_READER_MASK ((UC_SOFTIRQ_READER_MAX - 1) * UC_READER_OFFSET)
+#define UC_NPTHREAD_READER_MASK \
+ ((UC_NPTHREAD_READER_MAX - 1) * UC_READER_OFFSET)
+#define UC_PTHREAD_READER_MASK ((UC_PTHREAD_READER_MAX - 1) * UC_READER_OFFSET)
+#define UC_READER_MASK UC_HARDIRQ_READER_MASK
+
+
+/*
+ * Writers in slow path count and mutexes (32 bits)
+ *
+ * 1 bit for WS_WQ_MUTEX (wait queue mutex, always taken with irqs off)
+ * 1 bit for WS_COUNT_MUTEX (protects writer count and UC_SLOW_WRITER updates,
+ * taken in initial writer context).
+ * 1 bit for WS_LOCK_MUTEX (single writer in critical section)
+ * 29 bits for writer count.
+ */
+#define WS_WQ_MUTEX (1U << 0)
+#define WS_COUNT_MUTEX (1U << 1)
+#define WS_LOCK_MUTEX (1U << 2)
+
+#define WS_MAX (1U << 29)
+#define WS_OFFSET (1U << 3)
+#define WS_MASK ((WS_MAX - 1) * WS_OFFSET)
+
+
+/*
+ * Per-context slow path reader and writer count maximum, offset and mask.
+ * unsigned long type. Used to atomically detect that there is no contention in
+ * a given slow path context and subscribe a writer or let a reader take the
+ * slow path context lock.
+ */
+#define CTX_WOFFSET (1UL << 0)
+#define CTX_WMAX (1UL << (BITS_PER_LONG/2))
+#define CTX_WMASK ((CTX_WMAX - 1) * CTX_WOFFSET)
+
+#define CTX_ROFFSET CTX_WMAX
+#define CTX_RMAX (1UL << (BITS_PER_LONG/2))
+#define CTX_RMASK ((CTX_RMAX - 1) * CTX_ROFFSET)
+
+
+/*
+ * Internal slow paths.
+ */
+extern asmregparm
+void _psread_lock_slow_irq(unsigned int uc, psrwlock_t *rwlock);
+extern asmregparm
+int _psread_trylock_slow_irq(unsigned int uc, psrwlock_t *rwlock);
+extern asmregparm
+void _psread_lock_slow_bh(unsigned int uc, psrwlock_t *rwlock);
+extern asmregparm
+int _psread_trylock_slow_bh(unsigned int uc, psrwlock_t *rwlock);
+extern asmregparm
+void _psread_lock_slow_inatomic(unsigned int uc, psrwlock_t *rwlock);
+extern asmregparm
+int _psread_trylock_slow_inatomic(unsigned int uc, psrwlock_t *rwlock);
+extern asmregparm
+void _psread_lock_slow(unsigned int uc, psrwlock_t *rwlock);
+extern asmregparm
+int _psread_trylock_slow(unsigned int uc, psrwlock_t *rwlock);
+
+extern asmregparm
+void _pswrite_lock_slow(unsigned int uc, psrwlock_t *rwlock);
+extern asmregparm
+int _pswrite_trylock_slow(unsigned int uc, psrwlock_t *rwlock);
+extern asmregparm
+void _pswrite_unlock_slow(unsigned int uc, psrwlock_t *rwlock);
+extern asmregparm
+void _psrwlock_wakeup(unsigned int uc, psrwlock_t *rwlock);
+
+#ifdef CONFIG_HAVE_PSRWLOCK_ASM_CALL
+#include <asm/call_64.h>
+#else
+#define psread_lock_slow_irq _psread_lock_slow_irq
+#define psread_trylock_slow_irq _psread_trylock_slow_irq
+#define psread_lock_slow_bh _psread_lock_slow_bh
+#define psread_trylock_slow_bh _psread_trylock_slow_bh
+#define psread_lock_slow_inatomic _psread_lock_slow_inatomic
+#define psread_trylock_slow_inatomic _psread_trylock_slow_inatomic
+#define psread_lock_slow _psread_lock_slow
+#define psread_trylock_slow _psread_trylock_slow
+
+#define pswrite_lock_slow _pswrite_lock_slow
+#define pswrite_trylock_slow _pswrite_trylock_slow
+#define pswrite_unlock_slow _pswrite_unlock_slow
+#define psrwlock_wakeup _psrwlock_wakeup
+#endif
+
+/*
+ * psrwlock-specific latency tracing, maps to standard macros by default.
+ */
+#ifdef CONFIG_PSRWLOCK_LATENCY_TEST
+#include <linux/psrwlock-latency-trace.h>
+#else
+static inline void psrwlock_profile_latency_reset(void)
+{ }
+static inline void psrwlock_profile_latency_print(void)
+{ }
+
+#define psrwlock_irq_save(flags) local_irq_save(flags)
+#define psrwlock_irq_restore(flags) local_irq_restore(flags)
+#define psrwlock_irq_disable() local_irq_disable()
+#define psrwlock_irq_enable() local_irq_enable()
+#define psrwlock_bh_disable() local_bh_disable()
+#define psrwlock_bh_enable() local_bh_enable()
+#define psrwlock_bh_enable_ip(ip) local_bh_enable_ip(ip)
+#define psrwlock_preempt_disable() preempt_disable()
+#define psrwlock_preempt_enable() preempt_enable()
+#define psrwlock_preempt_enable_no_resched() preempt_enable_no_resched()
+#endif
+
+/*
+ * Internal preemption/softirq/irq disabling helpers. Optimized into simple use
+ * of standard local_irq_disable, local_bh_disable, preempt_disable by the
+ * compiler since wctx and rctx are constant.
+ */
+
+static inline void write_context_disable(enum psrw_prio wctx, u32 rctx)
+{
+ if (wctx != PSRW_PRIO_IRQ && (rctx & PSR_IRQ))
+ psrwlock_irq_disable();
+ else if (wctx != PSRW_PRIO_BH && (rctx & PSR_BH))
+ psrwlock_bh_disable();
+ else if (wctx != PSRW_PRIO_NP && (rctx & PSR_NPTHREAD))
+ psrwlock_preempt_disable();
+}
+
+static inline void write_context_enable(enum psrw_prio wctx, u32 rctx)
+{
+ if (wctx != PSRW_PRIO_IRQ && (rctx & PSR_IRQ))
+ psrwlock_irq_enable();
+ else if (wctx != PSRW_PRIO_BH && (rctx & PSR_BH))
+ psrwlock_bh_enable();
+ else if (wctx != PSRW_PRIO_NP && (rctx & PSR_NPTHREAD))
+ psrwlock_preempt_enable();
+}
+
+/*
+ * psrwlock_preempt_check must have a uc parameter read with a memory
+ * barrier making sure the slow path variable writes and the UC_WQ_ACTIVE flag
+ * read are done in this order (either a smp_mb() or an atomic_sub_return()).
+ */
+static inline void psrwlock_preempt_check(unsigned int uc,
+ psrwlock_t *rwlock)
+{
+ if (unlikely(uc & UC_WQ_ACTIVE))
+ psrwlock_wakeup(uc, rwlock);
+}
+
+
+/*
+ * API
+ */
+
+/* Reader lock */
+
+/*
+ * many readers, from irq/softirq/non-preemptable and preemptable thread
+ * context. Protects against writers.
+ *
+ * Read lock fastpath:
+ *
+ * A cmpxchg is used here and _not_ a simple add because a lower-priority reader
+ * could block the writer while it is waiting for readers to clear the
+ * uncontended path. This would happen if, for instance, the reader gets
+ * interrupted between the add and the moment it gets to the slow path.
+ */
+
+/*
+ * Called from any context.
+ */
+static inline void psread_unlock(psrwlock_t *rwlock)
+{
+ unsigned int uc = atomic_sub_return(UC_READER_OFFSET, &rwlock->uc);
+ psrwlock_preempt_check(uc, rwlock);
+}
+
+/*
+ * Called from interrupt disabled or interrupt context.
+ */
+static inline void psread_lock_irq(psrwlock_t *rwlock)
+{
+ unsigned int uc = atomic_cmpxchg(&rwlock->uc, 0, UC_READER_OFFSET);
+ if (likely(!uc))
+ return;
+ psread_lock_slow_irq(uc, rwlock);
+}
+
+static inline int psread_trylock_irq(psrwlock_t *rwlock)
+{
+ unsigned int uc = atomic_cmpxchg(&rwlock->uc, 0, UC_READER_OFFSET);
+ if (likely(!uc))
+ return 1;
+ return psread_trylock_slow_irq(uc, rwlock);
+}
+
+/*
+ * Called from softirq context.
+ */
+
+static inline void psread_lock_bh(psrwlock_t *rwlock)
+{
+ unsigned int uc = atomic_cmpxchg(&rwlock->uc, 0, UC_READER_OFFSET);
+ if (likely(!uc))
+ return;
+ psread_lock_slow_bh(uc, rwlock);
+}
+
+static inline int psread_trylock_bh(psrwlock_t *rwlock)
+{
+ unsigned int uc = atomic_cmpxchg(&rwlock->uc, 0, UC_READER_OFFSET);
+ if (likely(!uc))
+ return 1;
+ return psread_trylock_slow_bh(uc, rwlock);
+}
+
+
+/*
+ * Called from non-preemptable thread context.
+ */
+
+static inline void psread_lock_inatomic(psrwlock_t *rwlock)
+{
+ unsigned int uc = atomic_cmpxchg(&rwlock->uc, 0, UC_READER_OFFSET);
+ if (likely(!uc))
+ return;
+ psread_lock_slow_inatomic(uc, rwlock);
+}
+
+static inline int psread_trylock_inatomic(psrwlock_t *rwlock)
+{
+ unsigned int uc = atomic_cmpxchg(&rwlock->uc, 0, UC_READER_OFFSET);
+ if (likely(!uc))
+ return 1;
+ return psread_trylock_slow_inatomic(uc, rwlock);
+}
+
+
+/*
+ * Called from preemptable thread context.
+ */
+
+static inline void psread_lock(psrwlock_t *rwlock)
+{
+ unsigned int uc = atomic_cmpxchg(&rwlock->uc, 0, UC_READER_OFFSET);
+ if (likely(!uc))
+ return;
+ psread_lock_slow(uc, rwlock);
+}
+
+static inline int psread_trylock(psrwlock_t *rwlock)
+{
+ unsigned int uc = atomic_cmpxchg(&rwlock->uc, 0, UC_READER_OFFSET);
+ if (likely(!uc))
+ return 1;
+ return psread_trylock_slow(uc, rwlock);
+}
+
+
+/* Writer Lock */
+
+/*
+ * ctx is the context map showing which contexts can take the read lock and
+ * which context is using the write lock.
+ *
+ * Write lock use example, where the lock is used by readers in interrupt,
+ * preemptable context and non-preemptable context. The writer lock is taken in
+ * preemptable context.
+ *
+ * static DEFINE_PSRWLOCK(lock, PSRW_PRIO_P, PSR_IRQ | PSR_PTHREAD);
+ * CHECK_PSRWLOCK_MAP(lock, PSRW_PRIO_P, PSR_IRQ | PSR_PTHREAD);
+ *
+ * pswrite_lock(&lock, PSRW_PRIO_P, PSR_IRQ | PSR_PTHREAD);
+ * ...
+ * pswrite_unlock(&lock, PSRW_PRIO_P, PSR_IRQ | PSR_PTHREAD);
+ */
+static inline
+void pswrite_lock(psrwlock_t *rwlock, enum psrw_prio wctx, u32 rctx)
+{
+ unsigned int uc;
+
+ write_context_disable(wctx, rctx);
+ /* no other reader nor writer present, try to take the lock */
+ uc = atomic_cmpxchg(&rwlock->uc, 0, UC_WRITER);
+ if (likely(!uc))
+ return;
+ else
+ pswrite_lock_slow(uc, rwlock);
+}
+
+static inline
+int pswrite_trylock(psrwlock_t *rwlock, enum psrw_prio wctx, u32 rctx)
+{
+ unsigned int uc;
+
+ write_context_disable(wctx, rctx);
+ /* no other reader nor writer present, try to take the lock */
+ uc = atomic_cmpxchg(&rwlock->uc, 0, UC_WRITER);
+ if (likely(!uc))
+ return 1;
+ else
+ return pswrite_trylock_slow(uc, rwlock);
+}
+
+static inline
+void pswrite_unlock(psrwlock_t *rwlock, enum psrw_prio wctx, u32 rctx)
+{
+ unsigned int uc;
+
+ /*
+ * atomic_cmpxchg makes sure we commit the data before reenabling
+ * the lock. Will take the slow path if there are active readers, if
+ * UC_SLOW_WRITER is set or if there are threads in the wait queue.
+ */
+ uc = atomic_cmpxchg(&rwlock->uc, UC_WRITER, 0);
+ if (likely(uc == UC_WRITER)) {
+ write_context_enable(wctx, rctx);
+ /*
+ * no need to check preempt because all wait queue masks
+ * were 0. An active wait queue would trigger the slow path.
+ */
+ return;
+ }
+ /*
+ * Go through the slow unlock path to check if we must clear the
+ * UC_SLOW_WRITER bit.
+ */
+ pswrite_unlock_slow(uc, rwlock);
+}
+
+#endif /* _LINUX_PSRWLOCK_H */
Index: linux-2.6-lttng/lib/Makefile
===================================================================
--- linux-2.6-lttng.orig/lib/Makefile 2008-09-08 20:28:14.000000000 -0400
+++ linux-2.6-lttng/lib/Makefile 2008-09-08 20:29:11.000000000 -0400
@@ -43,6 +43,9 @@ obj-$(CONFIG_DEBUG_PREEMPT) += smp_proce
obj-$(CONFIG_DEBUG_LIST) += list_debug.o
obj-$(CONFIG_DEBUG_OBJECTS) += debugobjects.o

+obj-y += psrwlock.o
+obj-$(CONFIG_PSRWLOCK_LATENCY_TEST) += psrwlock-latency-trace.o
+
ifneq ($(CONFIG_HAVE_DEC_LOCK),y)
lib-y += dec_and_lock.o
endif
Index: linux-2.6-lttng/lib/psrwlock.c
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ linux-2.6-lttng/lib/psrwlock.c 2008-09-08 20:29:11.000000000 -0400
@@ -0,0 +1,839 @@
+/*
+ * Priority Sifting Reader-Writer Lock
+ *
+ * Priority Sifting Reader-Writer Lock (psrwlock) excludes reader execution
+ * contexts one at a time, thus increasing the writer priority in stages. It
+ * favors writers over reader threads, but lets higher-priority readers in
+ * even when there are subscribed writers waiting for the lock at a given lower
+ * priority. Very frequent writers could starve reader threads.
+ *
+ * Copyright 2008 Mathieu Desnoyers <mathieu.desnoyers@xxxxxxxxxx>
+ */
+
+#include <linux/psrwlock.h>
+#include <linux/wait.h>
+#include <linux/freezer.h>
+#include <linux/module.h>
+
+#include <asm/processor.h>
+
+#ifdef WBIAS_RWLOCK_DEBUG
+#define printk_dbg printk
+#else
+#define printk_dbg(fmt, args...)
+#endif
+
+enum preempt_type {
+ PSRW_PREEMPT, /* preemptable */
+ PSRW_NON_PREEMPT, /* non-preemptable */
+};
+
+enum lock_type {
+ PSRW_READ,
+ PSRW_WRITE,
+};
+
+enum v_type {
+ V_INT,
+ V_LONG,
+};
+
+static void rwlock_wait(void *vptr, psrwlock_t *rwlock,
+ unsigned long mask, unsigned long test_mask,
+ unsigned long full_mask, int check_full_mask,
+ enum v_type vtype, enum lock_type ltype);
+
+/*
+ * Lock out a specific uncontended execution context from the read lock. Wait
+ * for the rmask (readers in previous context count) and for the writer count in
+ * the new context not to be full before proceeding to subscribe to the new
+ * write context.
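+ *
+ * Waits (busy-looping, or sleeping on the wait queue for PSRW_PREEMPT once
+ * NR_PREEMPT_BUSY_LOOPS is exhausted) while (v & wait_mask) is set or
+ * (v & test_mask) >= full_mask, then atomically adds "offset" to *vptr with
+ * a cmpxchg. Returns 1 on success, with *v_inout updated to the new value;
+ * returns 0 if trylock is set and the operation would have to be retried.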
+ */
+static int _pswrite_lock_ctx_wait_sub(void *v_inout,
+ void *vptr, psrwlock_t *rwlock,
+ unsigned long wait_mask, unsigned long test_mask,
+ unsigned long full_mask, long offset,
+ enum v_type vtype, enum lock_type ltype,
+ enum preempt_type ptype, int trylock)
+{
+ long try = NR_PREEMPT_BUSY_LOOPS;
+ unsigned long newv;
+ unsigned long v;
+
+ if (vtype == V_LONG)
+ v = *(unsigned long *)v_inout;
+ else
+ v = *(unsigned int *)v_inout;
+
+ printk_dbg("wait sub start v %lX, new %lX, wait_mask %lX, "
+ "test_mask %lX, full_mask %lX, offset %lX\n",
+ v, v + offset, wait_mask, test_mask, full_mask, offset);
+
+ for (;;) {
+ if (v & wait_mask || (v & test_mask) >= full_mask) {
+ if (trylock)
+ return 0;
+ if (ptype == PSRW_PREEMPT && unlikely(!(--try))) {
+ rwlock_wait(vptr, rwlock, wait_mask,
+ test_mask, full_mask, 1,
+ vtype, ltype);
+ try = NR_PREEMPT_BUSY_LOOPS;
+ } else
+ cpu_relax(); /* Order v reads */
+ if (vtype == V_LONG)
+ v = atomic_long_read((atomic_long_t *)vptr);
+ else
+ v = atomic_read((atomic_t *)vptr);
+ continue;
+ }
+ if (vtype == V_LONG)
+ newv = atomic_long_cmpxchg((atomic_long_t *)vptr,
+ v, v + offset);
+ else
+ newv = atomic_cmpxchg((atomic_t *)vptr,
+ (int)v, (int)v + (int)offset);
+ if (likely(newv == v))
+ break;
+ else {
+ if (trylock)
+ return 0;
+ v = newv;
+ }
+ }
+ printk_dbg("wait sub end v %lX, new %lX, wait_mask %lX, "
+ "test_mask %lX, full_mask %lX, offset %lX\n",
+ v, v + offset, wait_mask, test_mask, full_mask, offset);
+ /* cmpxchg orders memory reads and writes */
+ v += offset;
+ if (vtype == V_LONG)
+ *(unsigned long *)v_inout = v;
+ else
+ *(unsigned int *)v_inout = v;
+ return 1;
+}
+
+static int _pswrite_lock_ctx_wait(unsigned long v_in, void *vptr,
+ psrwlock_t *rwlock, unsigned long wait_mask,
+ enum v_type vtype, enum lock_type ltype,
+ enum preempt_type ptype, int trylock)
+{
+ int try = NR_PREEMPT_BUSY_LOOPS;
+ unsigned long v = v_in;
+
+ printk_dbg("wait start v %lX, wait_mask %lX\n", v, wait_mask);
+ /* order all read and write memory operations. */
+ smp_mb();
+ while (v & wait_mask) {
+ if (ptype == PSRW_PREEMPT && unlikely(!(--try))) {
+ if (trylock)
+ return 0;
+ rwlock_wait(vptr, rwlock, wait_mask, 0, 0, 0, vtype,
+ ltype);
+ try = NR_PREEMPT_BUSY_LOOPS;
+ } else
+ cpu_relax(); /* Order v reads */
+ if (vtype == V_LONG)
+ v = atomic_long_read((atomic_long_t *)vptr);
+ else
+ v = atomic_read((atomic_t *)vptr);
+ }
+ /* order all read and write memory operations. */
+ smp_mb();
+ printk_dbg("wait end v %lX, wait_mask %lX\n", v, wait_mask);
+
+ return 1;
+}
+
+/*
+ * Go into a wait queue.
+ *
+ * The conditions for which we wait are (v & mask) nonzero or, when
+ * check_full_mask is set, (v & test_mask) >= full_mask.
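+ *
+ * Protocol: busy-wait for WS_WQ_MUTEX with interrupts disabled, raise
+ * UC_WQ_ACTIVE if the wait queues were empty, re-check the wait condition,
+ * and only then add ourself to the proper wait queue and sleep. The mutex is
+ * released before sleeping and taken again upon wakeup to remove ourself
+ * from the queue and possibly clear UC_WQ_ACTIVE.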
+ */
+static void rwlock_wait(void *vptr, psrwlock_t *rwlock,
+ unsigned long mask, unsigned long test_mask,
+ unsigned long full_mask, int check_full_mask,
+ enum v_type vtype, enum lock_type ltype)
+{
+ DECLARE_WAITQUEUE(psrwlock_wq, current);
+ unsigned long v;
+ int wq_active, ws;
+
+ /*
+ * Busy-loop waiting for the waitqueue mutex.
+ */
+ psrwlock_irq_disable();
+ ws = atomic_read(&rwlock->ws);
+ _pswrite_lock_ctx_wait_sub(&ws, &rwlock->ws, rwlock,
+ 0, WS_WQ_MUTEX, WS_WQ_MUTEX, WS_WQ_MUTEX,
+ V_INT, ltype, PSRW_NON_PREEMPT, 0);
+ /*
+ * Got the waitqueue mutex, get into the wait queue.
+ */
+ wq_active = waitqueue_active(&rwlock->wq_read)
+ || waitqueue_active(&rwlock->wq_write);
+ if (!wq_active)
+ atomic_add(UC_WQ_ACTIVE, &rwlock->uc);
+ /* Set the UC_WQ_ACTIVE flag before testing the condition. */
+ smp_mb();
+ /*
+ * Before we go to sleep, check that the lock we were expecting
+ * did not free between the moment we last checked for the lock and the
+ * moment we raised the UC_WQ_ACTIVE flag.
+ */
+ if (vtype == V_LONG)
+ v = atomic_long_read((atomic_long_t *)vptr);
+ else
+ v = atomic_read((atomic_t *)vptr);
+ if (unlikely(!(v & mask || (check_full_mask
+ && (v & test_mask) >= full_mask))))
+ goto skip_sleep;
+ /*
+ * Only one thread will be woken up at a time.
+ */
+ if (ltype == PSRW_WRITE)
+ add_wait_queue_exclusive_locked(&rwlock->wq_write,
+ &psrwlock_wq);
+ else
+ __add_wait_queue(&rwlock->wq_read, &psrwlock_wq);
+ __set_current_state(TASK_UNINTERRUPTIBLE);
+ smp_mb(); /* Ensure memory ordering when clearing the mutex. */
+ atomic_sub(WS_WQ_MUTEX, &rwlock->ws);
+ psrwlock_irq_enable();
+
+ try_to_freeze();
+ schedule();
+
+ /*
+ * Woken up; Busy-loop waiting for the waitqueue mutex.
+ */
+ psrwlock_irq_disable();
+ ws = atomic_read(&rwlock->ws);
+ _pswrite_lock_ctx_wait_sub(&ws, &rwlock->ws, rwlock,
+ 0, WS_WQ_MUTEX, WS_WQ_MUTEX, WS_WQ_MUTEX,
+ V_INT, ltype, PSRW_NON_PREEMPT, 0);
+ __set_current_state(TASK_RUNNING);
+ if (ltype == PSRW_WRITE)
+ remove_wait_queue_locked(&rwlock->wq_write, &psrwlock_wq);
+ else
+ remove_wait_queue_locked(&rwlock->wq_read, &psrwlock_wq);
+skip_sleep:
+ wq_active = waitqueue_active(&rwlock->wq_read)
+ || waitqueue_active(&rwlock->wq_write);
+ if (!wq_active)
+ atomic_sub(UC_WQ_ACTIVE, &rwlock->uc);
+ smp_mb(); /* Ensure memory ordering when clearing the mutex. */
+ atomic_sub(WS_WQ_MUTEX, &rwlock->ws);
+ psrwlock_irq_enable();
+}
+
+/*
+ * Reader lock
+ */
+
+/*
+ * _psread_lock_fast_check
+ *
+ * Second cmpxchg taken in case of many active readers.
+ * Will busy-loop if cmpxchg fails even in trylock mode.
+ *
+ * First try to get the uncontended lock. If it is non-zero (can be common,
+ * since we allow multiple readers), pass the returned cmpxchg v to the loop
+ * to try to get the reader lock.
+ *
+ * trylock will fail if a writer is subscribed or holds the lock, but will
+ * spin if there is concurrency to win the cmpxchg. That can happen if, for
+ * instance, other concurrent readers need to update the roffset or if a
+ * writer updated lock bits which do not contend with us. Since many
+ * concurrent readers are a common case, it makes sense not to fail if that
+ * happens.
+ *
+ * The non-trylock case will spin in both situations.
+ *
+ * Busy-loop if the reader count is full.
+ */
+static int _psread_lock_fast_check(unsigned int uc, psrwlock_t *rwlock,
+ unsigned int uc_rmask)
+{
+ unsigned int newuc;
+
+ /*
+ * This is the second cmpxchg taken in case of many active readers.
+ */
+ while (likely(!(uc & (UC_SLOW_WRITER | UC_WRITER))
+ && (uc & UC_READER_MASK) < uc_rmask)) {
+ newuc = atomic_cmpxchg(&rwlock->uc, uc, uc + UC_READER_OFFSET);
+ if (likely(newuc == uc))
+ return 1;
+ else
+ uc = newuc;
+ }
+ return 0;
+}
+
+int __psread_lock_slow(psrwlock_t *rwlock,
+ unsigned int uc_rmask, atomic_long_t *vptr,
+ int trylock, enum preempt_type ptype)
+{
+ u32 rctx = rwlock->rctx_bitmap;
+ unsigned long v;
+ unsigned int uc;
+ int ret;
+
+ if (unlikely(in_irq() || irqs_disabled()))
+ WARN_ON_ONCE(!(rctx & PSR_IRQ) || ptype != PSRW_NON_PREEMPT);
+ else if (in_softirq())
+ WARN_ON_ONCE(!(rctx & PSR_BH) || ptype != PSRW_NON_PREEMPT);
+#ifdef CONFIG_PREEMPT
+ else if (in_atomic())
+ WARN_ON_ONCE(!(rctx & PSR_NPTHREAD)
+ || ptype != PSRW_NON_PREEMPT);
+ else
+ WARN_ON_ONCE(!(rctx & PSR_PTHREAD) || ptype != PSRW_PREEMPT);
+#else
+ else
+ WARN_ON_ONCE((!(rctx & PSR_NPTHREAD)
+ || ptype != PSRW_NON_PREEMPT)
+ && (!(rctx & PSR_PTHREAD)
+ || ptype != PSRW_PREEMPT));
+#endif
+
+ /*
+ * A cmpxchg read uc, which implies strict ordering.
+ */
+ v = atomic_long_read(vptr);
+ ret = _pswrite_lock_ctx_wait_sub(&v, vptr, rwlock,
+ CTX_WMASK, CTX_RMASK, CTX_RMASK, CTX_ROFFSET,
+ V_LONG, PSRW_READ, ptype, trylock);
+ if (unlikely(!ret))
+ goto fail;
+
+ /*
+ * We are in! Well, we just have to busy-loop waiting for any
+ * uncontended writer to release its lock.
+ *
+ * In this exact order :
+ * - increment the uncontended readers count.
+ * - decrement the current context reader count we just previously got.
+ *
+ * This makes sure we always count in either the slow path per context
+ * count or the uncontended reader count starting from the moment we got
+ * the slow path count to the moment we will release the uncontended
+ * reader count at the unlock.
+ *
+ * This implies a strict read/write ordering of these two variables.
+ * Reading first "uc" and then "v" is strictly required. The current
+ * reader count can be summed twice in the worst case, but we are only
+ * interested in knowing whether there is _any_ reader left.
+ */
+ uc = atomic_read(&rwlock->uc);
+ ret = _pswrite_lock_ctx_wait_sub(&uc, &rwlock->uc, rwlock,
+ UC_WRITER, UC_READER_MASK, uc_rmask, UC_READER_OFFSET,
+ V_INT, PSRW_READ, ptype, trylock);
+ /*
+ * _pswrite_lock_ctx_wait_sub has a memory barrier
+ */
+ atomic_long_sub(CTX_ROFFSET, vptr);
+ /*
+ * don't care about v ordering wrt memory operations inside the
+ * read lock. It's uc which holds our read count.
+ */
+ if (unlikely(!ret))
+ goto fail_preempt;
+
+ /* Success */
+ return 1;
+
+ /* Failure */
+fail_preempt:
+ /* write v before reading uc */
+ smp_mb();
+ uc = atomic_read(&rwlock->uc);
+ psrwlock_preempt_check(uc, rwlock);
+fail:
+ cpu_relax();
+ return 0;
+
+}
+
+/*
+ * _psread_lock_slow: read lock slow path.
+ *
+ * Non-preemptable:
+ * Busy-wait for the specific context lock.
+ * Preemptable:
+ * Busy-wait for the specific context lock for NR_PREEMPT_BUSY_LOOPS loops,
+ * then go to the wait queue.
+ *
+ * _psread_trylock_slow: read trylock slow path.
+ *
+ * Try to get the read lock. Returns 1 if it succeeds, else returns 0.
+ */
+
+asmregparm
+void _psread_lock_slow_irq(unsigned int uc, psrwlock_t *rwlock)
+{
+ int ret;
+
+ ret = _psread_lock_fast_check(uc, rwlock, UC_HARDIRQ_READER_MASK);
+ if (ret)
+ return;
+ __psread_lock_slow(rwlock, UC_HARDIRQ_READER_MASK,
+ &rwlock->prio[PSRW_PRIO_IRQ],
+ 0, PSRW_NON_PREEMPT);
+}
+EXPORT_SYMBOL(_psread_lock_slow_irq);
+
+asmregparm
+void _psread_lock_slow_bh(unsigned int uc, psrwlock_t *rwlock)
+{
+ int ret;
+
+ ret = _psread_lock_fast_check(uc, rwlock, UC_SOFTIRQ_READER_MASK);
+ if (ret)
+ return;
+ __psread_lock_slow(rwlock, UC_SOFTIRQ_READER_MASK,
+ &rwlock->prio[PSRW_PRIO_BH],
+ 0, PSRW_NON_PREEMPT);
+}
+EXPORT_SYMBOL(_psread_lock_slow_bh);
+
+asmregparm
+void _psread_lock_slow_inatomic(unsigned int uc, psrwlock_t *rwlock)
+{
+ int ret;
+
+ ret = _psread_lock_fast_check(uc, rwlock, UC_NPTHREAD_READER_MASK);
+ if (ret)
+ return;
+ __psread_lock_slow(rwlock, UC_NPTHREAD_READER_MASK,
+ &rwlock->prio[PSRW_PRIO_NP],
+ 0, PSRW_NON_PREEMPT);
+}
+EXPORT_SYMBOL(_psread_lock_slow_inatomic);
+
+asmregparm
+void _psread_lock_slow(unsigned int uc, psrwlock_t *rwlock)
+{
+ int ret;
+
+ ret = _psread_lock_fast_check(uc, rwlock, UC_PTHREAD_READER_MASK);
+ if (ret)
+ return;
+ __psread_lock_slow(rwlock, UC_PTHREAD_READER_MASK,
+ &rwlock->prio[PSRW_PRIO_P],
+ 0, PSRW_PREEMPT);
+}
+EXPORT_SYMBOL(_psread_lock_slow);
+
+asmregparm
+int _psread_trylock_slow_irq(unsigned int uc, psrwlock_t *rwlock)
+{
+ int ret;
+
+ ret = _psread_lock_fast_check(uc, rwlock, UC_HARDIRQ_READER_MASK);
+ if (ret)
+ return 1;
+ return __psread_lock_slow(rwlock, UC_HARDIRQ_READER_MASK,
+ &rwlock->prio[PSRW_PRIO_IRQ],
+ 1, PSRW_NON_PREEMPT);
+}
+EXPORT_SYMBOL(_psread_trylock_slow_irq);
+
+asmregparm
+int _psread_trylock_slow_bh(unsigned int uc, psrwlock_t *rwlock)
+{
+ int ret;
+
+ ret = _psread_lock_fast_check(uc, rwlock, UC_SOFTIRQ_READER_MASK);
+ if (ret)
+ return 1;
+ return __psread_lock_slow(rwlock, UC_SOFTIRQ_READER_MASK,
+ &rwlock->prio[PSRW_PRIO_BH],
+ 1, PSRW_NON_PREEMPT);
+}
+EXPORT_SYMBOL(_psread_trylock_slow_bh);
+
+asmregparm
+int _psread_trylock_slow_inatomic(unsigned int uc, psrwlock_t *rwlock)
+{
+ int ret;
+
+ ret = _psread_lock_fast_check(uc, rwlock, UC_NPTHREAD_READER_MASK);
+ if (ret)
+ return 1;
+ return __psread_lock_slow(rwlock, UC_NPTHREAD_READER_MASK,
+ &rwlock->prio[PSRW_PRIO_NP],
+ 1, PSRW_NON_PREEMPT);
+}
+EXPORT_SYMBOL(_psread_trylock_slow_inatomic);
+
+asmregparm
+int _psread_trylock_slow(unsigned int uc, psrwlock_t *rwlock)
+{
+ int ret;
+
+ ret = _psread_lock_fast_check(uc, rwlock, UC_PTHREAD_READER_MASK);
+ if (ret)
+ return 1;
+ return __psread_lock_slow(rwlock, UC_PTHREAD_READER_MASK,
+ &rwlock->prio[PSRW_PRIO_P],
+ 1, PSRW_PREEMPT);
+}
+EXPORT_SYMBOL(_psread_trylock_slow);
+
+
+/* Writer lock */
+
+static int _pswrite_lock_out_context(unsigned int *uc_inout,
+ atomic_long_t *vptr, psrwlock_t *rwlock,
+ enum preempt_type ptype, int trylock)
+{
+ int ret;
+ unsigned long v;
+
+ /* lock out read slow paths */
+ v = atomic_long_read(vptr);
+ ret = _pswrite_lock_ctx_wait_sub(&v, vptr, rwlock,
+ 0, CTX_WMASK, CTX_WMASK, CTX_WOFFSET,
+ V_LONG, PSRW_WRITE, ptype, trylock);
+ if (unlikely(!ret))
+ return 0;
+ /*
+ * continue when no reader threads are left, but keep the subscription; it
+ * will be removed by the next subscription.
+ */
+ ret = _pswrite_lock_ctx_wait(v, vptr, rwlock,
+ CTX_RMASK, V_LONG, PSRW_WRITE, ptype, trylock);
+ if (unlikely(!ret))
+ goto fail_clean_slow;
+ /* Wait for uncontended readers and writers to unlock */
+ *uc_inout = atomic_read(&rwlock->uc);
+ ret = _pswrite_lock_ctx_wait(*uc_inout, &rwlock->uc, rwlock,
+ UC_WRITER | UC_READER_MASK,
+ V_INT, PSRW_WRITE, ptype, trylock);
+ if (!ret)
+ goto fail_clean_slow;
+ return 1;
+
+fail_clean_slow:
+ atomic_long_sub(CTX_WOFFSET, vptr);
+ return 0;
+}
+
+static void writer_count_inc(unsigned int *uc, psrwlock_t *rwlock,
+ enum preempt_type ptype)
+{
+ unsigned int ws;
+
+ ws = atomic_read(&rwlock->ws);
+ /*
+ * Take the mutex and increment the writer count at once.
+ * Never fail.
+ */
+ _pswrite_lock_ctx_wait_sub(&ws, &rwlock->ws, rwlock,
+ WS_COUNT_MUTEX, WS_MASK, WS_MASK,
+ WS_COUNT_MUTEX + WS_OFFSET,
+ V_INT, PSRW_WRITE, ptype, 0);
+ /* First writer in slow path? */
+ if ((ws & WS_MASK) == WS_OFFSET) {
+ atomic_add(UC_SLOW_WRITER, &rwlock->uc);
+ *uc += UC_SLOW_WRITER;
+ }
+ smp_mb(); /* serialize memory operations with mutex */
+ atomic_sub(WS_COUNT_MUTEX, &rwlock->ws);
+}
+
+static void writer_count_dec(unsigned int *uc, psrwlock_t *rwlock,
+ enum preempt_type ptype)
+{
+ unsigned int ws;
+
+ ws = atomic_read(&rwlock->ws);
+ /*
+ * Take the mutex and decrement the writer count at once.
+ * Never fail.
+ */
+ _pswrite_lock_ctx_wait_sub(&ws, &rwlock->ws, rwlock,
+ WS_COUNT_MUTEX, WS_COUNT_MUTEX, WS_COUNT_MUTEX,
+ WS_COUNT_MUTEX - WS_OFFSET,
+ V_INT, PSRW_WRITE, ptype, 0);
+ /* Last writer in slow path? */
+ if (!(ws & WS_MASK)) {
+ atomic_sub(UC_SLOW_WRITER, &rwlock->uc);
+ *uc -= UC_SLOW_WRITER;
+ }
+ smp_mb(); /* serialize memory operations with mutex */
+ atomic_sub(WS_COUNT_MUTEX, &rwlock->ws);
+}
+
+static int __pswrite_lock_slow(unsigned int uc, psrwlock_t *rwlock,
+ int trylock)
+{
+ enum psrw_prio wctx = rwlock->wctx;
+ u32 rctx = rwlock->rctx_bitmap;
+ enum preempt_type ptype;
+ unsigned int ws;
+ int ret;
+
+ write_context_enable(wctx, rctx);
+
+ if (wctx == PSRW_PRIO_IRQ)
+ WARN_ON_ONCE(!in_irq() && !irqs_disabled());
+ else if (wctx == PSRW_PRIO_BH)
+ WARN_ON_ONCE(!in_softirq());
+#ifdef CONFIG_PREEMPT
+ else if (wctx == PSRW_PRIO_NP)
+ WARN_ON_ONCE(!in_atomic());
+#endif
+
+ /*
+ * We got here because the uncontended word was not zero: we are therefore
+ * contending with readers, a fast-path writer or other slow-path writers.
+ * A cmpxchg reads uc, which implies strict ordering.
+ */
+ if (wctx == PSRW_PRIO_P)
+ ptype = PSRW_PREEMPT;
+ else
+ ptype = PSRW_NON_PREEMPT;
+
+ /* Increment the slow path writer count */
+ writer_count_inc(&uc, rwlock, ptype);
+
+ if (rctx & PSR_PTHREAD) {
+ ptype = PSRW_PREEMPT;
+ ret = _pswrite_lock_out_context(&uc,
+ &rwlock->prio[PSRW_PRIO_P], rwlock, ptype, trylock);
+ if (unlikely(!ret))
+ goto fail_dec_count;
+ }
+
+ /*
+ * lock out non-preemptable threads.
+ */
+ if (rctx & PSR_NPTHREAD) {
+ if (wctx != PSRW_PRIO_NP)
+ psrwlock_preempt_disable();
+ ptype = PSRW_NON_PREEMPT;
+ ret = _pswrite_lock_out_context(&uc,
+ &rwlock->prio[PSRW_PRIO_NP], rwlock, ptype, trylock);
+ if (unlikely(!ret))
+ goto fail_unsub_pthread;
+ }
+
+ /* lock out softirqs */
+ if (rctx & PSR_BH) {
+ if (wctx != PSRW_PRIO_BH)
+ psrwlock_bh_disable();
+ ptype = PSRW_NON_PREEMPT;
+ ret = _pswrite_lock_out_context(&uc,
+ &rwlock->prio[PSRW_PRIO_BH], rwlock,
+ ptype, trylock);
+ if (unlikely(!ret))
+ goto fail_unsub_npthread;
+ }
+
+ /* lock out hardirqs */
+ if (rctx & PSR_IRQ) {
+ if (wctx != PSRW_PRIO_IRQ)
+ psrwlock_irq_disable();
+ ptype = PSRW_NON_PREEMPT;
+ ret = _pswrite_lock_out_context(&uc,
+ &rwlock->prio[PSRW_PRIO_IRQ], rwlock,
+ ptype, trylock);
+ if (unlikely(!ret))
+ goto fail_unsub_bh;
+ }
+
+ /*
+ * Finally, take the mutex.
+ */
+ if (rctx & (PSR_NPTHREAD | PSR_BH | PSR_IRQ))
+ ptype = PSRW_NON_PREEMPT;
+ else
+ ptype = PSRW_PREEMPT;
+ ws = atomic_read(&rwlock->ws);
+ ret = _pswrite_lock_ctx_wait_sub(&ws, &rwlock->ws, rwlock,
+ 0, WS_LOCK_MUTEX, WS_LOCK_MUTEX, WS_LOCK_MUTEX,
+ V_INT, PSRW_WRITE, ptype, trylock);
+ if (unlikely(!ret))
+ goto fail_unsub_irq;
+ /* atomic_cmpxchg orders writes */
+
+ return 1; /* success */
+
+ /* Failure paths */
+fail_unsub_irq:
+ if (rctx & PSR_IRQ)
+ atomic_long_sub(CTX_WOFFSET, &rwlock->prio[PSRW_PRIO_IRQ]);
+fail_unsub_bh:
+ if ((rctx & PSR_IRQ) && wctx != PSRW_PRIO_IRQ)
+ psrwlock_irq_enable();
+ if (rctx & PSR_BH)
+ atomic_long_sub(CTX_WOFFSET, &rwlock->prio[PSRW_PRIO_BH]);
+fail_unsub_npthread:
+ if ((rctx & PSR_BH) && wctx != PSRW_PRIO_BH)
+ psrwlock_bh_enable();
+ if (rctx & PSR_NPTHREAD)
+ atomic_long_sub(CTX_WOFFSET, &rwlock->prio[PSRW_PRIO_NP]);
+fail_unsub_pthread:
+ if ((rctx & PSR_NPTHREAD) && wctx != PSRW_PRIO_NP)
+ psrwlock_preempt_enable();
+ if (rctx & PSR_PTHREAD)
+ atomic_long_sub(CTX_WOFFSET, &rwlock->prio[PSRW_PRIO_P]);
+fail_dec_count:
+ if (wctx == PSRW_PRIO_P)
+ ptype = PSRW_PREEMPT;
+ else
+ ptype = PSRW_NON_PREEMPT;
+ writer_count_dec(&uc, rwlock, ptype);
+ psrwlock_preempt_check(uc, rwlock);
+ cpu_relax();
+ return 0;
+}
+
+/*
+ * _pswrite_lock_slow: Writer-biased rwlock write lock slow path.
+ *
+ * Locks out execution contexts one by one.
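+ *
+ * Order: subscribe as a slow-path writer, then lock out preemptable threads,
+ * non-preemptable threads, softirqs and hardirqs, in that order (only the
+ * contexts present in the lock's read context mask), and finally take the
+ * writer lock mutex.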
+ */
+asmregparm void _pswrite_lock_slow(unsigned int uc, psrwlock_t *rwlock)
+{
+ __pswrite_lock_slow(uc, rwlock, 0);
+}
+EXPORT_SYMBOL_GPL(_pswrite_lock_slow);
+
+/*
+ * _pswrite_trylock_slow: Try to take a write lock.
+ */
+asmregparm
+int _pswrite_trylock_slow(unsigned int uc, psrwlock_t *rwlock)
+{
+ return __pswrite_lock_slow(uc, rwlock, 1);
+}
+EXPORT_SYMBOL_GPL(_pswrite_trylock_slow);
+
+asmregparm
+void _pswrite_unlock_slow(unsigned int uc, psrwlock_t *rwlock)
+{
+ enum psrw_prio wctx = rwlock->wctx;
+ u32 rctx = rwlock->rctx_bitmap;
+ enum preempt_type ptype;
+
+ /*
+ * We get here either:
+ * - from the fast-path unlock, when a slow-path writer has set the
+ * UC_SLOW_WRITER bit, or
+ * - still holding the slow-path locks.
+ *
+ * We have to know if we must decrement the WS_OFFSET count.
+ *
+ * uc, received as parameter, was read by an atomic cmpxchg, which
+ * implies strict memory ordering. It orders memory accesses done within
+ * the critical section with the lock.
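+ *
+ * In the slow-path case, release the lock mutex first, then drop the
+ * per-context write subscriptions from highest (irq) to lowest (preemptable
+ * thread) priority, re-enabling each context as we go, and finally drop the
+ * slow-path writer count.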
+ */
+ if (uc & UC_WRITER) {
+ uc = atomic_sub_return(UC_WRITER, &rwlock->uc);
+ write_context_enable(wctx, rctx);
+ psrwlock_preempt_check(uc, rwlock);
+ } else {
+ /*
+ * Release the slow path lock.
+ */
+ smp_mb(); /* ensure memory ordering with the lock mutex */
+ atomic_sub(WS_LOCK_MUTEX, &rwlock->ws);
+ if (rctx & PSR_IRQ) {
+ atomic_long_sub(CTX_WOFFSET,
+ &rwlock->prio[PSRW_PRIO_IRQ]);
+ if (wctx != PSRW_PRIO_IRQ)
+ psrwlock_irq_enable();
+ }
+ if (rctx & PSR_BH) {
+ atomic_long_sub(CTX_WOFFSET,
+ &rwlock->prio[PSRW_PRIO_BH]);
+ if (wctx != PSRW_PRIO_BH)
+ psrwlock_bh_enable();
+ }
+ if (rctx & PSR_NPTHREAD) {
+ atomic_long_sub(CTX_WOFFSET,
+ &rwlock->prio[PSRW_PRIO_NP]);
+ if (wctx != PSRW_PRIO_NP)
+ psrwlock_preempt_enable();
+ }
+ if (rctx & PSR_PTHREAD)
+ atomic_long_sub(CTX_WOFFSET,
+ &rwlock->prio[PSRW_PRIO_P]);
+
+ if (wctx == PSRW_PRIO_P)
+ ptype = PSRW_PREEMPT;
+ else
+ ptype = PSRW_NON_PREEMPT;
+ writer_count_dec(&uc, rwlock, ptype);
+ psrwlock_preempt_check(uc, rwlock);
+ }
+}
+EXPORT_SYMBOL_GPL(_pswrite_unlock_slow);
+
+/*
+ * _psrwlock_wakeup: Wake up tasks waiting for a write or read lock.
+ *
+ * Called from any context (irq/softirq/preempt/non-preempt). Contains a
+ * busy-loop; must therefore disable interrupts, but only for a short time.
+ */
+asmregparm void _psrwlock_wakeup(unsigned int uc, psrwlock_t *rwlock)
+{
+ unsigned long flags;
+ unsigned int ws;
+
+ /*
+ * Busy-loop waiting for the waitqueue mutex.
+ */
+ psrwlock_irq_save(flags);
+ /*
+ * Pass PSRW_READ since unused in PSRW_NON_PREEMPT.
+ */
+ ws = atomic_read(&rwlock->ws);
+ _pswrite_lock_ctx_wait_sub(&ws, &rwlock->ws, rwlock,
+ 0, WS_WQ_MUTEX, WS_WQ_MUTEX, WS_WQ_MUTEX,
+ V_INT, PSRW_READ, PSRW_NON_PREEMPT, 0);
+ /*
+ * If there is at least one non-preemptable writer subscribed or holding
+ * higher priority write masks, let it handle the wakeup when it exits
+ * its critical section which excludes any preemptable context anyway.
+ * The same applies to preemptable readers, which are the only ones
+ * which can cause a preemptable writer to sleep.
+ *
+ * The conditions here are all the states in which we are sure to reach
+ * a preempt check without blocking on the lock.
+ */
+ uc = atomic_read(&rwlock->uc);
+ if (!(uc & UC_WQ_ACTIVE) || uc & UC_READER_MASK
+ || (atomic_long_read(&rwlock->prio[PSRW_PRIO_IRQ])
+ & CTX_WMASK)
+ || (atomic_long_read(&rwlock->prio[PSRW_PRIO_BH])
+ & CTX_WMASK)
+ || (atomic_long_read(&rwlock->prio[PSRW_PRIO_NP])
+ & CTX_WMASK)) {
+ smp_mb(); /*
+ * Ensure memory ordering when clearing the
+ * mutex.
+ */
+ atomic_sub(WS_WQ_MUTEX, &rwlock->ws);
+ psrwlock_irq_restore(flags);
+ return;
+ }
+
+ /*
+ * First do an exclusive wake-up of the first writer if there is one
+ * waiting, else wake-up the readers.
+ */
+ if (waitqueue_active(&rwlock->wq_write))
+ wake_up_locked(&rwlock->wq_write);
+ else
+ wake_up_locked(&rwlock->wq_read);
+ smp_mb(); /*
+ * Ensure global memory ordering when clearing the mutex.
+ */
+ atomic_sub(WS_WQ_MUTEX, &rwlock->ws);
+ psrwlock_irq_restore(flags);
+}
+EXPORT_SYMBOL_GPL(_psrwlock_wakeup);
Index: linux-2.6-lttng/lib/Kconfig.debug
===================================================================
--- linux-2.6-lttng.orig/lib/Kconfig.debug 2008-09-08 20:28:14.000000000 -0400
+++ linux-2.6-lttng/lib/Kconfig.debug 2008-09-08 20:29:11.000000000 -0400
@@ -680,6 +680,9 @@ config FAULT_INJECTION_STACKTRACE_FILTER
help
Provide stacktrace filter for fault-injection capabilities

+config HAVE_PSRWLOCK_ASM_CALL
+ def_bool n
+
config LATENCYTOP
bool "Latency measuring infrastructure"
select FRAME_POINTER if !MIPS
Index: linux-2.6-lttng/include/linux/psrwlock-types.h
===================================================================
--- /dev/null 1970-01-01 00:00:00.000000000 +0000
+++ linux-2.6-lttng/include/linux/psrwlock-types.h 2008-09-08 20:29:11.000000000 -0400
@@ -0,0 +1,92 @@
+#ifndef _LINUX_PSRWLOCK_TYPES_H
+#define _LINUX_PSRWLOCK_TYPES_H
+
+/*
+ * Priority Sifting Reader-Writer Lock types definition
+ *
+ * Mathieu Desnoyers <mathieu.desnoyers@xxxxxxxxxx>
+ * August 2008
+ */
+
+#include <linux/wait.h>
+#include <asm/atomic.h>
+
+/*
+ * This table gives the lowest read priority context that can be used, given
+ * the highest read priority context and the context in which the write lock
+ * is taken.
+ *
+ * e.g. if the highest priority context from which we take the read lock is
+ * interrupt context (IRQ) and the context where the write lock is taken is
+ * non-preemptable (NP), we should never have a reader in a context lower
+ * than NP.
+ *
+ * X means: don't!
+ *
+ * X axis : Priority of writer
+ * Y axis : Max priority of reader
+ * Maps to : Minimum priority of a reader.
+ *
+ * Highest Read Prio / Write Prio | P NP BH IRQ
+ * ------------------------------------------------------------------------
+ * P | P X X X
+ * NP | P NP X X
+ * BH | P NP BH X
+ * IRQ | P NP BH IRQ
+ *
+ * This table is verified by the CHECK_PSRWLOCK_MAP macro.
+ */
+
+enum psrw_prio {
+ PSRW_PRIO_P,
+ PSRW_PRIO_NP,
+ PSRW_PRIO_BH,
+ PSRW_PRIO_IRQ,
+ PSRW_NR_PRIO,
+};
+
+/*
+ * Possible execution contexts for readers.
+ */
+#define PSR_PTHREAD (1U << PSRW_PRIO_P)
+#define PSR_NPTHREAD (1U << PSRW_PRIO_NP)
+#define PSR_BH (1U << PSRW_PRIO_BH)
+#define PSR_IRQ (1U << PSRW_PRIO_IRQ)
+#define PSR_NR PSRW_NR_PRIO
+#define PSR_MASK (PSR_PTHREAD | PSR_NPTHREAD | PSR_BH | PSR_IRQ)
+
+typedef struct psrwlock {
+ atomic_t uc; /* Uncontended word */
+ atomic_t ws; /* Writers in the slow path count */
+ atomic_long_t prio[PSRW_NR_PRIO]; /* Per priority slow path counts */
+ u32 rctx_bitmap; /* Allowed read execution ctx */
+ enum psrw_prio wctx; /* Allowed write execution ctx */
+ wait_queue_head_t wq_read; /* Preemptable readers wait queue */
+ wait_queue_head_t wq_write; /* Preemptable writers wait queue */
+} psrwlock_t;
+
+#define __PSRWLOCK_UNLOCKED(x, _wctx, _rctx) \
+ { \
+ .uc = { 0 }, \
+ .ws = { 0 }, \
+ .prio[0 ... (PSRW_NR_PRIO - 1)] = { 0 }, \
+ .rctx_bitmap = (_rctx), \
+ .wctx = (_wctx), \
+ .wq_read = __WAIT_QUEUE_HEAD_INITIALIZER((x).wq_read), \
+ .wq_write = __WAIT_QUEUE_HEAD_INITIALIZER((x).wq_write),\
+ }
+
+#define DEFINE_PSRWLOCK(x, wctx, rctx) \
+ psrwlock_t x = __PSRWLOCK_UNLOCKED(x, wctx, rctx)
+
+/*
+ * Statically check that no reader with priority lower than the writer is
+ * possible.
+ */
+#define CHECK_PSRWLOCK_MAP(x, wctx, rctx) \
+ static inline void __psrwlock_bad_context_map_##x(void) \
+ { \
+ BUILD_BUG_ON((~(~0UL << (wctx))) & (rctx)); \
+ }
+
+#endif /* _LINUX_PSRWLOCK_TYPES_H */

--
Mathieu Desnoyers
Computer Engineering Ph.D. Student, Ecole Polytechnique de Montreal
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68