rcu: Merge preemptable-RCU functionality into hierarchical RCU

Create a kernel/rcutree_plugin.h file that contains definitions
for preemptable RCU (or, under the #else branch of the #ifdef,
empty definitions for the classic non-preemptable semantics).
These definitions plug into the hooks provided for this purpose in
kernel/rcutree.c.
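
As a rough illustration of the plugin pattern (the real definitions
live in kernel/rcutree_plugin.h; the hook shown here is one of
several, and its body is elided), each hook is defined twice, once
per branch of the CONFIG_TREE_PREEMPT_RCU #ifdef:

    #ifdef CONFIG_TREE_PREEMPT_RCU

    /* Record a preemptable-RCU quiescent state for this CPU (sketch). */
    static void rcu_preempt_qs(int cpu)
    {
        /* ... note the quiescent state in this CPU's rcu_preempt_data ... */
    }

    #else /* #ifdef CONFIG_TREE_PREEMPT_RCU */

    /* Classic semantics: no preemptable-RCU state to record, so a no-op. */
    static void rcu_preempt_qs(int cpu)
    {
    }

    #endif /* #else #ifdef CONFIG_TREE_PREEMPT_RCU */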

This variant of preemptable RCU uses a new algorithm whose
read-side expense is roughly that of classic hierarchical RCU
under CONFIG_PREEMPT. This new algorithm's update-side expense
is similar to that of classic hierarchical RCU, and, in the absence
of read-side preemption or blocking, is exactly that of classic
hierarchical RCU.  Perhaps more important, this new algorithm
has a much simpler implementation, saving well over 1,000 lines
of code compared to mainline's implementation of preemptable
RCU, which will hopefully be retired in favor of this new
algorithm.

The simplifications are obtained by maintaining per-task
nesting state for running tasks, and using a simple
lock-protected algorithm to handle accounting when tasks block
within RCU read-side critical sections, making use of lessons
learned while creating numerous user-level RCU implementations
over the past 18 months.
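
As a simplified sketch of the resulting read-side fast path (memory
ordering and the blocked-task slow path are omitted; the actual code
is in kernel/rcutree_plugin.h), the read-side primitives reduce to
maintaining a per-task nesting counter, deferring the lock-protected
accounting to rcu_read_unlock_special(), which runs only when the
task was preempted or blocked within the critical section:

    void __rcu_read_lock(void)
    {
        /* The nesting count lives in the task structure, not per-CPU. */
        ACCESS_ONCE(current->rcu_read_lock_nesting)++;
        barrier();  /* Keep the critical section after the increment. */
    }

    void __rcu_read_unlock(void)
    {
        struct task_struct *t = current;

        barrier();  /* Keep the critical section before the decrement. */
        if (--ACCESS_ONCE(t->rcu_read_lock_nesting) == 0 &&
            unlikely(ACCESS_ONCE(t->rcu_read_unlock_special)))
            rcu_read_unlock_special(t);  /* Slow path: task blocked. */
    }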

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Cc: laijs@cn.fujitsu.com
Cc: dipankar@in.ibm.com
Cc: akpm@linux-foundation.org
Cc: mathieu.desnoyers@polymtl.ca
Cc: josht@linux.vnet.ibm.com
Cc: dvhltc@us.ibm.com
Cc: niv@us.ibm.com
Cc: peterz@infradead.org
Cc: rostedt@goodmis.org
LKML-Reference: <12509746134003-git-send-email->
Signed-off-by: Ingo Molnar <mingo@elte.hu>
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 4ce3adc..cc02557 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -80,6 +80,21 @@
 struct rcu_state rcu_bh_state = RCU_STATE_INITIALIZER(rcu_bh_state);
 DEFINE_PER_CPU(struct rcu_data, rcu_bh_data);
 
+extern long rcu_batches_completed_sched(void);
+static void cpu_quiet_msk(unsigned long mask, struct rcu_state *rsp,
+			  struct rcu_node *rnp, unsigned long flags);
+static void cpu_quiet_msk_finish(struct rcu_state *rsp, unsigned long flags);
+static void __rcu_process_callbacks(struct rcu_state *rsp,
+				    struct rcu_data *rdp);
+static void __call_rcu(struct rcu_head *head,
+		       void (*func)(struct rcu_head *rcu),
+		       struct rcu_state *rsp);
+static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp);
+static void __cpuinit rcu_init_percpu_data(int cpu, struct rcu_state *rsp,
+					   int preemptable);
+
+#include "rcutree_plugin.h"
+
 /*
  * Note a quiescent state.  Because we do not need to know
  * how many quiescent states passed, just if there was at least
@@ -87,16 +102,27 @@
  */
 void rcu_sched_qs(int cpu)
 {
-	struct rcu_data *rdp = &per_cpu(rcu_sched_data, cpu);
+	unsigned long flags;
+	struct rcu_data *rdp;
+
+	local_irq_save(flags);
+	rdp = &per_cpu(rcu_sched_data, cpu);
 	rdp->passed_quiesc = 1;
 	rdp->passed_quiesc_completed = rdp->completed;
+	rcu_preempt_qs(cpu);
+	local_irq_restore(flags);
 }
 
 void rcu_bh_qs(int cpu)
 {
-	struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
+	unsigned long flags;
+	struct rcu_data *rdp;
+
+	local_irq_save(flags);
+	rdp = &per_cpu(rcu_bh_data, cpu);
 	rdp->passed_quiesc = 1;
 	rdp->passed_quiesc_completed = rdp->completed;
+	local_irq_restore(flags);
 }
 
 #ifdef CONFIG_NO_HZ
@@ -123,16 +149,6 @@
 EXPORT_SYMBOL_GPL(rcu_batches_completed_sched);
 
 /*
- * Return the number of RCU batches processed thus far for debug & stats.
- * @@@ placeholder, maps to rcu_batches_completed_sched().
- */
-long rcu_batches_completed(void)
-{
-	return rcu_batches_completed_sched();
-}
-EXPORT_SYMBOL_GPL(rcu_batches_completed);
-
-/*
  * Return the number of RCU BH batches processed thus far for debug & stats.
  */
 long rcu_batches_completed_bh(void)
@@ -193,6 +209,10 @@
 		return 1;
 	}
 
+	/* If preemptable RCU, no point in sending reschedule IPI. */
+	if (rdp->preemptable)
+		return 0;
+
 	/* The CPU is online, so send it a reschedule IPI. */
 	if (rdp->cpu != smp_processor_id())
 		smp_send_reschedule(rdp->cpu);
@@ -473,6 +493,7 @@
 
 	printk(KERN_ERR "INFO: RCU detected CPU stalls:");
 	for (; rnp_cur < rnp_end; rnp_cur++) {
+		rcu_print_task_stall(rnp_cur);
 		if (rnp_cur->qsmask == 0)
 			continue;
 		for (cpu = 0; cpu <= rnp_cur->grphi - rnp_cur->grplo; cpu++)
@@ -686,6 +707,19 @@
 }
 
 /*
+ * Clean up after the prior grace period and let rcu_start_gp() start up
+ * the next grace period if one is needed.  Note that the caller must
+ * hold rnp->lock, as required by rcu_start_gp(), which will release it.
+ */
+static void cpu_quiet_msk_finish(struct rcu_state *rsp, unsigned long flags)
+	__releases(rnp->lock)
+{
+	rsp->completed = rsp->gpnum;
+	rcu_process_gp_end(rsp, rsp->rda[smp_processor_id()]);
+	rcu_start_gp(rsp, flags);  /* releases root node's rnp->lock. */
+}
+
+/*
  * Similar to cpu_quiet(), for which it is a helper function.  Allows
  * a group of CPUs to be quieted at one go, though all the CPUs in the
  * group must be represented by the same leaf rcu_node structure.
@@ -706,7 +740,7 @@
 			return;
 		}
 		rnp->qsmask &= ~mask;
-		if (rnp->qsmask != 0) {
+		if (rnp->qsmask != 0 || rcu_preempted_readers(rnp)) {
 
 			/* Other bits still set at this level, so done. */
 			spin_unlock_irqrestore(&rnp->lock, flags);
@@ -726,14 +760,10 @@
 
 	/*
 	 * Get here if we are the last CPU to pass through a quiescent
-	 * state for this grace period.  Clean up and let rcu_start_gp()
-	 * start up the next grace period if one is needed.  Note that
-	 * we still hold rnp->lock, as required by rcu_start_gp(), which
-	 * will release it.
+	 * state for this grace period.  Invoke cpu_quiet_msk_finish()
+	 * to clean up and start the next grace period if one is needed.
 	 */
-	rsp->completed = rsp->gpnum;
-	rcu_process_gp_end(rsp, rsp->rda[smp_processor_id()]);
-	rcu_start_gp(rsp, flags);  /* releases rnp->lock. */
+	cpu_quiet_msk_finish(rsp, flags); /* releases rnp->lock. */
 }
 
 /*
@@ -840,11 +870,11 @@
 		spin_lock(&rnp->lock);		/* irqs already disabled. */
 		rnp->qsmaskinit &= ~mask;
 		if (rnp->qsmaskinit != 0) {
-			spin_unlock(&rnp->lock); /* irqs already disabled. */
+			spin_unlock(&rnp->lock); /* irqs remain disabled. */
 			break;
 		}
 		mask = rnp->grpmask;
-		spin_unlock(&rnp->lock);	/* irqs already disabled. */
+		spin_unlock(&rnp->lock);	/* irqs remain disabled. */
 		rnp = rnp->parent;
 	} while (rnp != NULL);
 	lastcomp = rsp->completed;
@@ -1007,6 +1037,7 @@
 
 		rcu_bh_qs(cpu);
 	}
+	rcu_preempt_check_callbacks(cpu);
 	raise_softirq(RCU_SOFTIRQ);
 }
 
@@ -1188,6 +1219,7 @@
 	__rcu_process_callbacks(&rcu_sched_state,
 				&__get_cpu_var(rcu_sched_data));
 	__rcu_process_callbacks(&rcu_bh_state, &__get_cpu_var(rcu_bh_data));
+	rcu_preempt_process_callbacks();
 
 	/*
 	 * Memory references from any later RCU read-side critical sections
@@ -1252,17 +1284,6 @@
 EXPORT_SYMBOL_GPL(call_rcu_sched);
 
 /*
- * @@@ Queue an RCU callback for invocation after a grace period.
- * @@@ Placeholder pending rcutree_plugin.h.
- */
-void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
-{
-	call_rcu_sched(head, func);
-}
-EXPORT_SYMBOL_GPL(call_rcu);
-
-
-/*
  * Queue an RCU for invocation after a quicker grace period.
  */
 void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
@@ -1335,7 +1356,8 @@
 static int rcu_pending(int cpu)
 {
 	return __rcu_pending(&rcu_sched_state, &per_cpu(rcu_sched_data, cpu)) ||
-	       __rcu_pending(&rcu_bh_state, &per_cpu(rcu_bh_data, cpu));
+	       __rcu_pending(&rcu_bh_state, &per_cpu(rcu_bh_data, cpu)) ||
+	       rcu_preempt_pending(cpu);
 }
 
 /*
@@ -1348,7 +1370,8 @@
 {
 	/* RCU callbacks either ready or pending? */
 	return per_cpu(rcu_sched_data, cpu).nxtlist ||
-	       per_cpu(rcu_bh_data, cpu).nxtlist;
+	       per_cpu(rcu_bh_data, cpu).nxtlist ||
+	       rcu_preempt_needs_cpu(cpu);
 }
 
 /*
@@ -1383,7 +1406,7 @@
  * that this CPU cannot possibly have any RCU callbacks in flight yet.
  */
 static void __cpuinit
-rcu_init_percpu_data(int cpu, struct rcu_state *rsp)
+rcu_init_percpu_data(int cpu, struct rcu_state *rsp, int preemptable)
 {
 	unsigned long flags;
 	long lastcomp;
@@ -1399,6 +1422,7 @@
 	rdp->passed_quiesc = 0;  /* We could be racing with new GP, */
 	rdp->qs_pending = 1;	 /*  so set up to respond to current GP. */
 	rdp->beenonline = 1;	 /* We have now been online. */
+	rdp->preemptable = preemptable;
 	rdp->passed_quiesc_completed = lastcomp - 1;
 	rdp->blimit = blimit;
 	spin_unlock(&rnp->lock);		/* irqs remain disabled. */
@@ -1441,12 +1465,13 @@
 
 static void __cpuinit rcu_online_cpu(int cpu)
 {
-	rcu_init_percpu_data(cpu, &rcu_sched_state);
-	rcu_init_percpu_data(cpu, &rcu_bh_state);
+	rcu_init_percpu_data(cpu, &rcu_sched_state, 0);
+	rcu_init_percpu_data(cpu, &rcu_bh_state, 0);
+	rcu_preempt_init_percpu_data(cpu);
 }
 
 /*
- * Handle CPU online/offline notifcation events.
+ * Handle CPU online/offline notification events.
  */
 int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 			     unsigned long action, void *hcpu)
@@ -1521,6 +1546,7 @@
 		rnp = rsp->level[i];
 		for (j = 0; j < rsp->levelcnt[i]; j++, rnp++) {
 			spin_lock_init(&rnp->lock);
+			rnp->gpnum = 0;
 			rnp->qsmask = 0;
 			rnp->qsmaskinit = 0;
 			rnp->grplo = j * cpustride;
@@ -1538,13 +1564,16 @@
 					      j / rsp->levelspread[i - 1];
 			}
 			rnp->level = i;
+			INIT_LIST_HEAD(&rnp->blocked_tasks[0]);
+			INIT_LIST_HEAD(&rnp->blocked_tasks[1]);
 		}
 	}
 }
 
 /*
- * Helper macro for __rcu_init().  To be used nowhere else!
- * Assigns leaf node pointers into each CPU's rcu_data structure.
+ * Helper macro for __rcu_init() and __rcu_init_preempt().  To be used
+ * nowhere else!  Assigns leaf node pointers into each CPU's rcu_data
+ * structure.
  */
 #define RCU_INIT_FLAVOR(rsp, rcu_data) \
 do { \
@@ -1560,18 +1589,38 @@
 	} \
 } while (0)
 
-void __init __rcu_init(void)
+#ifdef CONFIG_TREE_PREEMPT_RCU
+
+void __init __rcu_init_preempt(void)
 {
-	int i;			/* All used by RCU_DATA_PTR_INIT(). */
+	int i;			/* All used by RCU_INIT_FLAVOR(). */
 	int j;
 	struct rcu_node *rnp;
 
-	printk(KERN_INFO "Hierarchical RCU implementation.\n");
+	RCU_INIT_FLAVOR(&rcu_preempt_state, rcu_preempt_data);
+}
+
+#else /* #ifdef CONFIG_TREE_PREEMPT_RCU */
+
+void __init __rcu_init_preempt(void)
+{
+}
+
+#endif /* #else #ifdef CONFIG_TREE_PREEMPT_RCU */
+
+void __init __rcu_init(void)
+{
+	int i;			/* All used by RCU_INIT_FLAVOR(). */
+	int j;
+	struct rcu_node *rnp;
+
+	rcu_bootup_announce();
 #ifdef CONFIG_RCU_CPU_STALL_DETECTOR
 	printk(KERN_INFO "RCU-based detection of stalled CPUs is enabled.\n");
 #endif /* #ifdef CONFIG_RCU_CPU_STALL_DETECTOR */
 	RCU_INIT_FLAVOR(&rcu_sched_state, rcu_sched_data);
 	RCU_INIT_FLAVOR(&rcu_bh_state, rcu_bh_data);
+	__rcu_init_preempt();
 	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
 }