Re: [PATCH rcu/dev -fixes 1/4] rcu/tree: Keep kfree_rcu() awake during lock contention

From: Uladzislau Rezki
Date: Tue Apr 21 2020 - 09:12:47 EST


On Mon, Apr 20, 2020 at 11:38:34AM -0400, Joel Fernandes (Google) wrote:
> On PREEMPT_RT kernels, contending on the krcp spinlock can cause
> sleeping because, on these kernels, the spinlock is converted to an
> rt-mutex. To prevent breakage of possible usage of kfree_rcu() now or
> in the future, make use of raw spinlocks, which are not subject to such
> conversions.
>
> Vetting all code paths, there is no reason to believe that the raw
> spinlock will be held for a long time, so PREEMPT_RT should not suffer
> from lengthy acquisitions of the lock.
>
> Cc: urezki@xxxxxxxxx
> Cc: bigeasy@xxxxxxxxxxxxx
> Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
> ---
> kernel/rcu/tree.c | 30 +++++++++++++++---------------
> 1 file changed, 15 insertions(+), 15 deletions(-)
>
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index f288477ee1c26..cf68d3d9f5b81 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -2905,7 +2905,7 @@ struct kfree_rcu_cpu {
> struct kfree_rcu_bulk_data *bhead;
> struct kfree_rcu_bulk_data *bcached;
> struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
> - spinlock_t lock;
> + raw_spinlock_t lock;
> struct delayed_work monitor_work;
> bool monitor_todo;
> bool initialized;
> @@ -2939,12 +2939,12 @@ static void kfree_rcu_work(struct work_struct *work)
> krwp = container_of(to_rcu_work(work),
> struct kfree_rcu_cpu_work, rcu_work);
> krcp = krwp->krcp;
> - spin_lock_irqsave(&krcp->lock, flags);
> + raw_spin_lock_irqsave(&krcp->lock, flags);
> head = krwp->head_free;
> krwp->head_free = NULL;
> bhead = krwp->bhead_free;
> krwp->bhead_free = NULL;
> - spin_unlock_irqrestore(&krcp->lock, flags);
> + raw_spin_unlock_irqrestore(&krcp->lock, flags);
>
> /* "bhead" is now private, so traverse locklessly. */
> for (; bhead; bhead = bnext) {
> @@ -3047,14 +3047,14 @@ static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
> krcp->monitor_todo = false;
> if (queue_kfree_rcu_work(krcp)) {
> // Success! Our job is done here.
> - spin_unlock_irqrestore(&krcp->lock, flags);
> + raw_spin_unlock_irqrestore(&krcp->lock, flags);
> return;
> }
>
> // Previous RCU batch still in progress, try again later.
> krcp->monitor_todo = true;
> schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
> - spin_unlock_irqrestore(&krcp->lock, flags);
> + raw_spin_unlock_irqrestore(&krcp->lock, flags);
> }
>
> /*
> @@ -3067,11 +3067,11 @@ static void kfree_rcu_monitor(struct work_struct *work)
> struct kfree_rcu_cpu *krcp = container_of(work, struct kfree_rcu_cpu,
> monitor_work.work);
>
> - spin_lock_irqsave(&krcp->lock, flags);
> + raw_spin_lock_irqsave(&krcp->lock, flags);
> if (krcp->monitor_todo)
> kfree_rcu_drain_unlock(krcp, flags);
> else
> - spin_unlock_irqrestore(&krcp->lock, flags);
> + raw_spin_unlock_irqrestore(&krcp->lock, flags);
> }
>
> static inline bool
> @@ -3142,7 +3142,7 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> local_irq_save(flags); // For safely calling this_cpu_ptr().
> krcp = this_cpu_ptr(&krc);
> if (krcp->initialized)
> - spin_lock(&krcp->lock);
> + raw_spin_lock(&krcp->lock);
>
> // Queue the object but don't yet schedule the batch.
> if (debug_rcu_head_queue(head)) {
> @@ -3173,7 +3173,7 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
>
> unlock_return:
> if (krcp->initialized)
> - spin_unlock(&krcp->lock);
> + raw_spin_unlock(&krcp->lock);
> local_irq_restore(flags);
> }
> EXPORT_SYMBOL_GPL(kfree_call_rcu);
> @@ -3205,11 +3205,11 @@ kfree_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
> struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
>
> count = krcp->count;
> - spin_lock_irqsave(&krcp->lock, flags);
> + raw_spin_lock_irqsave(&krcp->lock, flags);
> if (krcp->monitor_todo)
> kfree_rcu_drain_unlock(krcp, flags);
> else
> - spin_unlock_irqrestore(&krcp->lock, flags);
> + raw_spin_unlock_irqrestore(&krcp->lock, flags);
>
> sc->nr_to_scan -= count;
> freed += count;
> @@ -3236,15 +3236,15 @@ void __init kfree_rcu_scheduler_running(void)
> for_each_online_cpu(cpu) {
> struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
>
> - spin_lock_irqsave(&krcp->lock, flags);
> + raw_spin_lock_irqsave(&krcp->lock, flags);
> if (!krcp->head || krcp->monitor_todo) {
> - spin_unlock_irqrestore(&krcp->lock, flags);
> + raw_spin_unlock_irqrestore(&krcp->lock, flags);
> continue;
> }
> krcp->monitor_todo = true;
> schedule_delayed_work_on(cpu, &krcp->monitor_work,
> KFREE_DRAIN_JIFFIES);
> - spin_unlock_irqrestore(&krcp->lock, flags);
> + raw_spin_unlock_irqrestore(&krcp->lock, flags);
> }
> }
>
> @@ -4140,7 +4140,7 @@ static void __init kfree_rcu_batch_init(void)
> for_each_possible_cpu(cpu) {
> struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
>
> - spin_lock_init(&krcp->lock);
> + raw_spin_lock_init(&krcp->lock);
> for (i = 0; i < KFREE_N_BATCHES; i++) {
> INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
> krcp->krw_arr[i].krcp = krcp;
> --
> 2.26.1.301.g55bc3eb7cb9-goog
Reviewed-by: Uladzislau Rezki (Sony) <urezki@xxxxxxxxx>

If we decide to move the schedule_delayed_work() call outside of the
critical section, I think it would be better to submit a separate patch
with a good explanation of why we do it.

--
Vlad Rezki
>