rcu: Rework synchronize_sched_expedited() counter handling

Now that synchronize_sched_expedited() have a mutex, it can use simpler
work-already-done detection scheme.  This commit simplifies this scheme
by using something similar to the sequence-locking counter scheme.
A counter is incremented before and after each grace period, so that
the counter is odd in the midst of the grace period and even otherwise.
So if the counter has advanced to the second even number that is
greater than or equal to the snapshot, the required grace period has
already happened.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index ae39a49..3c182fd 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3283,56 +3283,24 @@
  * restructure your code to batch your updates, and then use a single
  * synchronize_sched() instead.
  *
- * This implementation can be thought of as an application of ticket
- * locking to RCU, with sync_sched_expedited_started and
- * sync_sched_expedited_done taking on the roles of the halves
- * of the ticket-lock word.  Each task atomically increments
- * sync_sched_expedited_started upon entry, snapshotting the old value,
- * then attempts to stop all the CPUs.  If this succeeds, then each
- * CPU will have executed a context switch, resulting in an RCU-sched
- * grace period.  We are then done, so we use atomic_cmpxchg() to
- * update sync_sched_expedited_done to match our snapshot -- but
- * only if someone else has not already advanced past our snapshot.
- *
- * On the other hand, if try_stop_cpus() fails, we check the value
- * of sync_sched_expedited_done.  If it has advanced past our
- * initial snapshot, then someone else must have forced a grace period
- * some time after we took our snapshot.  In this case, our work is
- * done for us, and we can simply return.  Otherwise, we try again,
- * but keep our initial snapshot for purposes of checking for someone
- * doing our work for us.
- *
- * If we fail too many times in a row, we fall back to synchronize_sched().
+ * This implementation can be thought of as an application of sequence
+ * locking to expedited grace periods, but using the sequence counter to
+ * determine when someone else has already done the work instead of for
+ * retrying readers.  We do a mutex_trylock() polling loop, but if we fail
+ * too many times in a row, we fall back to synchronize_sched().
  */
 void synchronize_sched_expedited(void)
 {
 	int cpu;
-	long firstsnap, s, snap;
+	long s;
 	int trycount = 0;
 	struct rcu_state *rsp = &rcu_sched_state;
 
-	/*
-	 * If we are in danger of counter wrap, just do synchronize_sched().
-	 * By allowing sync_sched_expedited_started to advance no more than
-	 * ULONG_MAX/8 ahead of sync_sched_expedited_done, we are ensuring
-	 * that more than 3.5 billion CPUs would be required to force a
-	 * counter wrap on a 32-bit system.  Quite a few more CPUs would of
-	 * course be required on a 64-bit system.
-	 */
-	if (ULONG_CMP_GE((ulong)atomic_long_read(&rsp->expedited_start),
-			 (ulong)atomic_long_read(&rsp->expedited_done) +
-			 ULONG_MAX / 8)) {
-		wait_rcu_gp(call_rcu_sched);
-		atomic_long_inc(&rsp->expedited_wrap);
-		return;
-	}
+	/* Take a snapshot of the sequence number.  */
+	smp_mb(); /* Caller's modifications seen first by other CPUs. */
+	s = (READ_ONCE(rsp->expedited_sequence) + 3) & ~0x1;
+	smp_mb(); /* Above access must not bleed into critical section. */
 
-	/*
-	 * Take a ticket.  Note that atomic_inc_return() implies a
-	 * full memory barrier.
-	 */
-	snap = atomic_long_inc_return(&rsp->expedited_start);
-	firstsnap = snap;
 	if (!try_get_online_cpus()) {
 		/* CPU hotplug operation in flight, fall back to normal GP. */
 		wait_rcu_gp(call_rcu_sched);
@@ -3342,16 +3310,15 @@
 	WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));
 
 	/*
-	 * Each pass through the following loop attempts to force a
-	 * context switch on each CPU.
+	 * Each pass through the following loop attempts to acquire
+	 * ->expedited_mutex, checking for others doing our work each time.
 	 */
 	while (!mutex_trylock(&rsp->expedited_mutex)) {
 		put_online_cpus();
 		atomic_long_inc(&rsp->expedited_tryfail);
 
 		/* Check to see if someone else did our work for us. */
-		s = atomic_long_read(&rsp->expedited_done);
-		if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
+		if (ULONG_CMP_GE(READ_ONCE(rsp->expedited_sequence), s)) {
 			/* ensure test happens before caller kfree */
 			smp_mb__before_atomic(); /* ^^^ */
 			atomic_long_inc(&rsp->expedited_workdone1);
@@ -3368,8 +3335,7 @@
 		}
 
 		/* Recheck to see if someone else did our work for us. */
-		s = atomic_long_read(&rsp->expedited_done);
-		if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
+		if (ULONG_CMP_GE(READ_ONCE(rsp->expedited_sequence), s)) {
 			/* ensure test happens before caller kfree */
 			smp_mb__before_atomic(); /* ^^^ */
 			atomic_long_inc(&rsp->expedited_workdone2);
@@ -3389,10 +3355,20 @@
 			atomic_long_inc(&rsp->expedited_normal);
 			return;
 		}
-		snap = atomic_long_read(&rsp->expedited_start);
-		smp_mb(); /* ensure read is before try_stop_cpus(). */
 	}
 
+	/* Recheck yet again to see if someone else did our work for us. */
+	if (ULONG_CMP_GE(READ_ONCE(rsp->expedited_sequence), s)) {
+		rsp->expedited_workdone3++;
+		mutex_unlock(&rsp->expedited_mutex);
+		smp_mb(); /* ensure test happens before caller kfree */
+		return;
+	}
+
+	WRITE_ONCE(rsp->expedited_sequence, rsp->expedited_sequence + 1);
+	smp_mb(); /* Ensure expedited GP seen after counter increment. */
+	WARN_ON_ONCE(!(rsp->expedited_sequence & 0x1));
+
 	/* Stop each CPU that is online, non-idle, and not us. */
 	for_each_online_cpu(cpu) {
 		struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
@@ -3403,26 +3379,12 @@
 			continue;
 		stop_one_cpu(cpu, synchronize_sched_expedited_cpu_stop, NULL);
 	}
-	atomic_long_inc(&rsp->expedited_stoppedcpus);
 
-	/*
-	 * Everyone up to our most recent fetch is covered by our grace
-	 * period.  Update the counter, but only if our work is still
-	 * relevant -- which it won't be if someone who started later
-	 * than we did already did their update.
-	 */
-	do {
-		atomic_long_inc(&rsp->expedited_done_tries);
-		s = atomic_long_read(&rsp->expedited_done);
-		if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
-			/* ensure test happens before caller kfree */
-			smp_mb__before_atomic(); /* ^^^ */
-			atomic_long_inc(&rsp->expedited_done_lost);
-			break;
-		}
-	} while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
-	atomic_long_inc(&rsp->expedited_done_exit);
+	smp_mb(); /* Ensure expedited GP seen before counter increment. */
+	WRITE_ONCE(rsp->expedited_sequence, rsp->expedited_sequence + 1);
+	WARN_ON_ONCE(rsp->expedited_sequence & 0x1);
 	mutex_unlock(&rsp->expedited_mutex);
+	smp_mb(); /* ensure subsequent action seen after grace period. */
 
 	put_online_cpus();
 }
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 7c25fe4..6a2b741 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -481,17 +481,12 @@
 	/* End of fields guarded by barrier_mutex. */
 
 	struct mutex  expedited_mutex;		/* Serializes expediting. */
-	atomic_long_t expedited_start;		/* Starting ticket. */
-	atomic_long_t expedited_done;		/* Done ticket. */
-	atomic_long_t expedited_wrap;		/* # near-wrap incidents. */
+	unsigned long expedited_sequence;	/* Take a ticket. */
 	atomic_long_t expedited_tryfail;	/* # acquisition failures. */
 	atomic_long_t expedited_workdone1;	/* # done by others #1. */
 	atomic_long_t expedited_workdone2;	/* # done by others #2. */
+	unsigned long expedited_workdone3;	/* # done by others #3. */
 	atomic_long_t expedited_normal;		/* # fallbacks to normal. */
-	atomic_long_t expedited_stoppedcpus;	/* # successful stop_cpus. */
-	atomic_long_t expedited_done_tries;	/* # tries to update _done. */
-	atomic_long_t expedited_done_lost;	/* # times beaten to _done. */
-	atomic_long_t expedited_done_exit;	/* # times exited _done loop. */
 
 	unsigned long jiffies_force_qs;		/* Time at which to invoke */
 						/*  force_quiescent_state(). */
diff --git a/kernel/rcu/tree_trace.c b/kernel/rcu/tree_trace.c
index 3ea7ffc..a1ab3a5 100644
--- a/kernel/rcu/tree_trace.c
+++ b/kernel/rcu/tree_trace.c
@@ -185,18 +185,14 @@
 {
 	struct rcu_state *rsp = (struct rcu_state *)m->private;
 
-	seq_printf(m, "s=%lu d=%lu w=%lu tf=%lu wd1=%lu wd2=%lu n=%lu sc=%lu dt=%lu dl=%lu dx=%lu\n",
-		   atomic_long_read(&rsp->expedited_start),
-		   atomic_long_read(&rsp->expedited_done),
-		   atomic_long_read(&rsp->expedited_wrap),
+	seq_printf(m, "t=%lu tf=%lu wd1=%lu wd2=%lu wd3=%lu n=%lu sc=%lu\n",
+		   rsp->expedited_sequence,
 		   atomic_long_read(&rsp->expedited_tryfail),
 		   atomic_long_read(&rsp->expedited_workdone1),
 		   atomic_long_read(&rsp->expedited_workdone2),
+		   rsp->expedited_workdone3,
 		   atomic_long_read(&rsp->expedited_normal),
-		   atomic_long_read(&rsp->expedited_stoppedcpus),
-		   atomic_long_read(&rsp->expedited_done_tries),
-		   atomic_long_read(&rsp->expedited_done_lost),
-		   atomic_long_read(&rsp->expedited_done_exit));
+		   rsp->expedited_sequence / 2);
 	return 0;
 }