rcu: Fix synchronization for rcu_process_gp_end() uses of ->completed counter

Impose a clear locking design on the rcu_process_gp_end()
function's use of the ->completed counter.  This is done by
creating a ->completed field in the rcu_node structure, which
can safely be accessed under the protection of that structure's
lock.  Performance and scalability are maintained by using a
form of double-checked locking, so that rcu_process_gp_end()
only acquires the leaf rcu_node structure's ->lock if a grace
period has recently ended.

This fix reduces rcutorture failure rate by at least two orders
of magnitude under heavy stress with force_quiescent_state()
being invoked artificially often.  Without this fix,
unsynchronized access to the ->completed field can cause
rcu_process_gp_end() to advance callbacks whose grace period has
not yet expired.  (Bad idea!)

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Cc: laijs@cn.fujitsu.com
Cc: dipankar@in.ibm.com
Cc: mathieu.desnoyers@polymtl.ca
Cc: josh@joshtriplett.org
Cc: dvhltc@us.ibm.com
Cc: niv@us.ibm.com
Cc: peterz@infradead.org
Cc: rostedt@goodmis.org
Cc: Valdis.Kletnieks@vt.edu
Cc: dhowells@redhat.com
Cc: <stable@kernel.org> # .32.x
LKML-Reference: <12571987494069-git-send-email->
Signed-off-by: Ingo Molnar <mingo@elte.hu>
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 26249ab..9e068d1 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -570,6 +570,76 @@
 }
 
 /*
+ * Advance this CPU's callbacks, but only if the current grace period
+ * has ended.  This may be called only from the CPU to whom the rdp
+ * belongs.  In addition, the corresponding leaf rcu_node structure's
+ * ->lock must be held by the caller, with irqs disabled.
+ */
+static void
+__rcu_process_gp_end(struct rcu_state *rsp, struct rcu_node *rnp, struct rcu_data *rdp)
+{
+	/* Did another grace period end? */
+	if (rdp->completed != rnp->completed) {
+
+		/* Advance callbacks.  No harm if list empty. */
+		rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[RCU_WAIT_TAIL];
+		rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_READY_TAIL];
+		rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+
+		/* Remember that we saw this grace-period completion. */
+		rdp->completed = rnp->completed;
+	}
+}
+
+/*
+ * Advance this CPU's callbacks, but only if the current grace period
+ * has ended.  This may be called only from the CPU to whom the rdp
+ * belongs.
+ */
+static void
+rcu_process_gp_end(struct rcu_state *rsp, struct rcu_data *rdp)
+{
+	unsigned long flags;
+	struct rcu_node *rnp;
+
+	local_irq_save(flags);
+	rnp = rdp->mynode;
+	if (rdp->completed == ACCESS_ONCE(rnp->completed) || /* outside lock. */
+	    !spin_trylock(&rnp->lock)) { /* irqs already off, retry later. */
+		local_irq_restore(flags);
+		return;
+	}
+	__rcu_process_gp_end(rsp, rnp, rdp);
+	spin_unlock_irqrestore(&rnp->lock, flags);
+}
+
+/*
+ * Do per-CPU grace-period initialization for running CPU.  The caller
+ * must hold the lock of the leaf rcu_node structure corresponding to
+ * this CPU.
+ */
+static void
+rcu_start_gp_per_cpu(struct rcu_state *rsp, struct rcu_node *rnp, struct rcu_data *rdp)
+{
+	/* Prior grace period ended, so advance callbacks for current CPU. */
+	__rcu_process_gp_end(rsp, rnp, rdp);
+
+	/*
+	 * Because this CPU just now started the new grace period, we know
+	 * that all of its callbacks will be covered by this upcoming grace
+	 * period, even the ones that were registered arbitrarily recently.
+	 * Therefore, advance all outstanding callbacks to RCU_WAIT_TAIL.
+	 *
+	 * Other CPUs cannot be sure exactly when the grace period started.
+	 * Therefore, their recently registered callbacks must pass through
+	 * an additional RCU_NEXT_READY stage, so that they will be handled
+	 * by the next RCU grace period.
+	 */
+	rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+	rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+}
+
+/*
  * Start a new RCU grace period if warranted, re-initializing the hierarchy
  * in preparation for detecting the next grace period.  The caller must hold
  * the root node's ->lock, which is released before return.  Hard irqs must
@@ -596,26 +666,14 @@
 	dyntick_record_completed(rsp, rsp->completed - 1);
 	note_new_gpnum(rsp, rdp);
 
-	/*
-	 * Because this CPU just now started the new grace period, we know
-	 * that all of its callbacks will be covered by this upcoming grace
-	 * period, even the ones that were registered arbitrarily recently.
-	 * Therefore, advance all outstanding callbacks to RCU_WAIT_TAIL.
-	 *
-	 * Other CPUs cannot be sure exactly when the grace period started.
-	 * Therefore, their recently registered callbacks must pass through
-	 * an additional RCU_NEXT_READY stage, so that they will be handled
-	 * by the next RCU grace period.
-	 */
-	rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
-	rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
-
 	/* Special-case the common single-level case. */
 	if (NUM_RCU_NODES == 1) {
 		rcu_preempt_check_blocked_tasks(rnp);
 		rnp->qsmask = rnp->qsmaskinit;
 		rnp->gpnum = rsp->gpnum;
+		rnp->completed = rsp->completed;
 		rsp->signaled = RCU_SIGNAL_INIT; /* force_quiescent_state OK. */
+		rcu_start_gp_per_cpu(rsp, rnp, rdp);
 		spin_unlock_irqrestore(&rnp->lock, flags);
 		return;
 	}
@@ -648,6 +706,9 @@
 		rcu_preempt_check_blocked_tasks(rnp);
 		rnp->qsmask = rnp->qsmaskinit;
 		rnp->gpnum = rsp->gpnum;
+		rnp->completed = rsp->completed;
+		if (rnp == rdp->mynode)
+			rcu_start_gp_per_cpu(rsp, rnp, rdp);
 		spin_unlock(&rnp->lock);	/* irqs remain disabled. */
 	}
 
@@ -659,34 +720,6 @@
 }
 
 /*
- * Advance this CPU's callbacks, but only if the current grace period
- * has ended.  This may be called only from the CPU to whom the rdp
- * belongs.
- */
-static void
-rcu_process_gp_end(struct rcu_state *rsp, struct rcu_data *rdp)
-{
-	long completed_snap;
-	unsigned long flags;
-
-	local_irq_save(flags);
-	completed_snap = ACCESS_ONCE(rsp->completed);  /* outside of lock. */
-
-	/* Did another grace period end? */
-	if (rdp->completed != completed_snap) {
-
-		/* Advance callbacks.  No harm if list empty. */
-		rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[RCU_WAIT_TAIL];
-		rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_READY_TAIL];
-		rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
-
-		/* Remember that we saw this grace-period completion. */
-		rdp->completed = completed_snap;
-	}
-	local_irq_restore(flags);
-}
-
-/*
  * Clean up after the prior grace period and let rcu_start_gp() start up
  * the next grace period if one is needed.  Note that the caller must
  * hold rnp->lock, as required by rcu_start_gp(), which will release it.
@@ -697,7 +730,6 @@
 	WARN_ON_ONCE(!rcu_gp_in_progress(rsp));
 	rsp->completed = rsp->gpnum;
 	rsp->signaled = RCU_GP_IDLE;
-	rcu_process_gp_end(rsp, rsp->rda[smp_processor_id()]);
 	rcu_start_gp(rsp, flags);  /* releases root node's rnp->lock. */
 }
 
@@ -1539,21 +1571,16 @@
 rcu_init_percpu_data(int cpu, struct rcu_state *rsp, int preemptable)
 {
 	unsigned long flags;
-	long lastcomp;
 	unsigned long mask;
 	struct rcu_data *rdp = rsp->rda[cpu];
 	struct rcu_node *rnp = rcu_get_root(rsp);
 
 	/* Set up local state, ensuring consistent view of global state. */
 	spin_lock_irqsave(&rnp->lock, flags);
-	lastcomp = rsp->completed;
-	rdp->completed = lastcomp;
-	rdp->gpnum = lastcomp;
 	rdp->passed_quiesc = 0;  /* We could be racing with new GP, */
 	rdp->qs_pending = 1;	 /*  so set up to respond to current GP. */
 	rdp->beenonline = 1;	 /* We have now been online. */
 	rdp->preemptable = preemptable;
-	rdp->passed_quiesc_completed = lastcomp - 1;
 	rdp->qlen_last_fqs_check = 0;
 	rdp->n_force_qs_snap = rsp->n_force_qs;
 	rdp->blimit = blimit;
@@ -1575,6 +1602,11 @@
 		spin_lock(&rnp->lock);	/* irqs already disabled. */
 		rnp->qsmaskinit |= mask;
 		mask = rnp->grpmask;
+		if (rnp == rdp->mynode) {
+			rdp->gpnum = rnp->completed; /* if GP in progress... */
+			rdp->completed = rnp->completed;
+			rdp->passed_quiesc_completed = rnp->completed - 1;
+		}
 		spin_unlock(&rnp->lock); /* irqs already disabled. */
 		rnp = rnp->parent;
 	} while (rnp != NULL && !(rnp->qsmaskinit & mask));