Re: [RFC][PATCH 12/13] stop_machine: Remove lglock

From: Paul E. McKenney
Date: Wed Jun 24 2015 - 13:20:35 EST


On Wed, Jun 24, 2015 at 10:10:04AM -0700, Paul E. McKenney wrote:
> On Wed, Jun 24, 2015 at 06:42:00PM +0200, Peter Zijlstra wrote:
> > On Wed, Jun 24, 2015 at 09:09:04AM -0700, Paul E. McKenney wrote:

[ . . . ]

> > > It looks like I do need to use smp_call_function_single() and your
> > > resched_cpu() because calling stop_one_cpu() sequentially is about
> > > twice as slow as try_stop_cpus() in rcutorture runs of up to 16 CPUs.
> > > But either way, your point about not stopping all the CPUs does hold.
> >
> > Bah, I was afraid of that, the problem is that we wait for the
> > individual stop_work to complete before sending another.
> >
> > The below is getting a little out of hand, but should avoid the problem
> > and might be easier than getting the IPI thing going, but who knows.
>
> OK, I will give this a try. Of course, the counter needs to be
> initialized to 1 rather than zero, and it needs to be atomically
> decremented after all stop_one_cpu_nowait() invocations, otherwise you
> can get an early wakeup due to the usual race conditions.

Except that I promised Ingo I would check for CPUs failing to schedule
quickly enough, which means that I must track them individually rather
than via a single counter...

You did have me going for a bit, though! ;-)

Thanx, Paul

> > ---
> > --- a/kernel/rcu/tree.c
> > +++ b/kernel/rcu/tree.c
> > @@ -103,6 +103,7 @@ struct rcu_state sname##_state = { \
> > .orphan_nxttail = &sname##_state.orphan_nxtlist, \
> > .orphan_donetail = &sname##_state.orphan_donelist, \
> > .barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
> > + .expedited_mutex = __MUTEX_INITIALIZER(sname##_state.expedited_mutex), \
> > .name = RCU_STATE_NAME(sname), \
> > .abbr = sabbr, \
> > }
> > @@ -3253,23 +3254,28 @@ void cond_synchronize_rcu(unsigned long
> > }
> > EXPORT_SYMBOL_GPL(cond_synchronize_rcu);
> >
> > +struct exp_stop_state {
> > + wait_queue_head_t *wq;
> > + atomic_t count;
> > +};
> > +
> > static int synchronize_sched_expedited_cpu_stop(void *data)
> > {
> > + struct exp_stop_state *ess = data;
> > +
> > /*
> > * There must be a full memory barrier on each affected CPU
> > * between the time that try_stop_cpus() is called and the
> > * time that it returns.
> > - *
> > - * In the current initial implementation of cpu_stop, the
> > - * above condition is already met when the control reaches
> > - * this point and the following smp_mb() is not strictly
> > - * necessary. Do smp_mb() anyway for documentation and
> > - * robustness against future implementation changes.
> > */
> > - smp_mb(); /* See above comment block. */
> > + if (atomic_dec_and_test(&ess->count))
> > + wake_up(ess->wq);
> > +
> > return 0;
> > }
> >
> > +static DEFINE_PER_CPU(struct cpu_stop_work, exp_stop_work);
> > +
> > /**
> > * synchronize_sched_expedited - Brute-force RCU-sched grace period
> > *
> > @@ -3304,12 +3310,11 @@ static int synchronize_sched_expedited_c
> > */
> > void synchronize_sched_expedited(void)
> > {
> > - cpumask_var_t cm;
> > - bool cma = false;
> > - int cpu;
> > - long firstsnap, s, snap;
> > - int trycount = 0;
> > + DECLARE_WAIT_QUEUE_HEAD_ONSTACK(stop_wait);
> > + struct exp_stop_state ess = { .wq = &stop_wait, };
> > struct rcu_state *rsp = &rcu_sched_state;
> > + long s, snap;
> > + int cpu;
> >
> > /*
> > * If we are in danger of counter wrap, just do synchronize_sched().
> > @@ -3332,7 +3337,6 @@ void synchronize_sched_expedited(void)
> > * full memory barrier.
> > */
> > snap = atomic_long_inc_return(&rsp->expedited_start);
> > - firstsnap = snap;
> > if (!try_get_online_cpus()) {
> > /* CPU hotplug operation in flight, fall back to normal GP. */
> > wait_rcu_gp(call_rcu_sched);
> > @@ -3341,82 +3345,44 @@ void synchronize_sched_expedited(void)
> > }
> > WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));
> >
> > - /* Offline CPUs, idle CPUs, and any CPU we run on are quiescent. */
> > - cma = zalloc_cpumask_var(&cm, GFP_KERNEL);
> > - if (cma) {
> > - cpumask_copy(cm, cpu_online_mask);
> > - cpumask_clear_cpu(raw_smp_processor_id(), cm);
> > - for_each_cpu(cpu, cm) {
> > - struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
> > -
> > - if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
> > - cpumask_clear_cpu(cpu, cm);
> > - }
> > - if (cpumask_weight(cm) == 0)
> > - goto all_cpus_idle;
> > - }
> > -
> > /*
> > * Each pass through the following loop attempts to force a
> > * context switch on each CPU.
> > */
> > - while (try_stop_cpus(cma ? cm : cpu_online_mask,
> > - synchronize_sched_expedited_cpu_stop,
> > - NULL) == -EAGAIN) {
> > - put_online_cpus();
> > - atomic_long_inc(&rsp->expedited_tryfail);
> > + mutex_lock(&rsp->expedited_mutex);
> >
> > - /* Check to see if someone else did our work for us. */
> > - s = atomic_long_read(&rsp->expedited_done);
> > - if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
> > - /* ensure test happens before caller kfree */
> > - smp_mb__before_atomic(); /* ^^^ */
> > - atomic_long_inc(&rsp->expedited_workdone1);
> > - free_cpumask_var(cm);
> > - return;
> > - }
> > + /*
> > + * Check to see if someone else did our work for us, while we were
> > + * waiting for the mutex.
> > + */
> > + s = atomic_long_read(&rsp->expedited_done);
> > + if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
> > + /* ensure test happens before caller kfree */
> > + smp_mb__before_atomic(); /* ^^^ */
> > + atomic_long_inc(&rsp->expedited_workdone1);
> > + goto unlock;
> > + }
> >
> > - /* No joy, try again later. Or just synchronize_sched(). */
> > - if (trycount++ < 10) {
> > - udelay(trycount * num_online_cpus());
> > - } else {
> > - wait_rcu_gp(call_rcu_sched);
> > - atomic_long_inc(&rsp->expedited_normal);
> > - free_cpumask_var(cm);
> > - return;
> > - }
> > + /* Stop each CPU that is online, non-idle, and not us. */
> > + for_each_online_cpu(cpu) {
> > + struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
> >
> > - /* Recheck to see if someone else did our work for us. */
> > - s = atomic_long_read(&rsp->expedited_done);
> > - if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
> > - /* ensure test happens before caller kfree */
> > - smp_mb__before_atomic(); /* ^^^ */
> > - atomic_long_inc(&rsp->expedited_workdone2);
> > - free_cpumask_var(cm);
> > - return;
> > - }
> > + /* Skip our CPU, */
> > + if (raw_smp_processor_id() == cpu)
> > + continue;
> >
> > - /*
> > - * Refetching sync_sched_expedited_started allows later
> > - * callers to piggyback on our grace period. We retry
> > - * after they started, so our grace period works for them,
> > - * and they started after our first try, so their grace
> > - * period works for us.
> > - */
> > - if (!try_get_online_cpus()) {
> > - /* CPU hotplug operation in flight, use normal GP. */
> > - wait_rcu_gp(call_rcu_sched);
> > - atomic_long_inc(&rsp->expedited_normal);
> > - free_cpumask_var(cm);
> > - return;
> > - }
> > - snap = atomic_long_read(&rsp->expedited_start);
> > - smp_mb(); /* ensure read is before try_stop_cpus(). */
> > + /* and any idle CPUs. */
> > + if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
> > + continue;
> > +
> > + atomic_inc(&ess.count);
> > + stop_one_cpu_nowait(cpu, synchronize_sched_expedited_cpu_stop, ,
> > + &per_cpu(exp_stop_work, cpu));
> > }
> > - atomic_long_inc(&rsp->expedited_stoppedcpus);
> >
> > -all_cpus_idle:
> > - free_cpumask_var(cm);
> > + wait_event(ess.wq, !atomic_read(&ess.count));
> > +
> > + atomic_long_inc(&rsp->expedited_stoppedcpus);
> >
> > /*
> > * Everyone up to our most recent fetch is covered by our grace
> > @@ -3435,6 +3401,8 @@ void synchronize_sched_expedited(void)
> > }
> > } while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
> > atomic_long_inc(&rsp->expedited_done_exit);
> > +unlock:
> > + mutex_unlock(&rsp->expedited_mutex);
> >
> > put_online_cpus();
> > }
> > --- a/kernel/rcu/tree.h
> > +++ b/kernel/rcu/tree.h
> > @@ -483,6 +483,7 @@ struct rcu_state {
> > /* _rcu_barrier(). */
> > /* End of fields guarded by barrier_mutex. */
> >
> > + struct mutex expedited_mutex; /* Serializes expediting. */
> > atomic_long_t expedited_start; /* Starting ticket. */
> > atomic_long_t expedited_done; /* Done ticket. */
> > atomic_long_t expedited_wrap; /* # near-wrap incidents. */
> >

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/