[PATCH] Use alloc_percpu to allocate workqueues locally

This patch makes the workqueus use alloc_percpu instead of an array.  The
workqueues are placed on nodes local to each processor.

The workqueue structure can grow to a significant size on a system with
lots of processors if this patch is not applied.  64 bit architectures with
all debugging features enabled and configured for 512 processors will not
be able to boot without this patch.

Signed-off-by: Christoph Lameter <clameter@sgi.com>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 91bacb1..7cee222 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -12,6 +12,8 @@
  *   Andrew Morton <andrewm@uow.edu.au>
  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
  *   Theodore Ts'o <tytso@mit.edu>
+ *
+ * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
  */
 
 #include <linux/module.h>
@@ -57,7 +59,7 @@
  * per-CPU workqueues:
  */
 struct workqueue_struct {
-	struct cpu_workqueue_struct cpu_wq[NR_CPUS];
+	struct cpu_workqueue_struct *cpu_wq;
 	const char *name;
 	struct list_head list; 	/* Empty if single thread */
 };
@@ -102,7 +104,7 @@
 		if (unlikely(is_single_threaded(wq)))
 			cpu = 0;
 		BUG_ON(!list_empty(&work->entry));
-		__queue_work(wq->cpu_wq + cpu, work);
+		__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
 		ret = 1;
 	}
 	put_cpu();
@@ -118,7 +120,7 @@
 	if (unlikely(is_single_threaded(wq)))
 		cpu = 0;
 
-	__queue_work(wq->cpu_wq + cpu, work);
+	__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
 }
 
 int fastcall queue_delayed_work(struct workqueue_struct *wq,
@@ -265,13 +267,13 @@
 
 	if (is_single_threaded(wq)) {
 		/* Always use cpu 0's area. */
-		flush_cpu_workqueue(wq->cpu_wq + 0);
+		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, 0));
 	} else {
 		int cpu;
 
 		lock_cpu_hotplug();
 		for_each_online_cpu(cpu)
-			flush_cpu_workqueue(wq->cpu_wq + cpu);
+			flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
 		unlock_cpu_hotplug();
 	}
 }
@@ -279,7 +281,7 @@
 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
 						   int cpu)
 {
-	struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
+	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
 	struct task_struct *p;
 
 	spin_lock_init(&cwq->lock);
@@ -312,6 +314,7 @@
 	if (!wq)
 		return NULL;
 
+	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
 	wq->name = name;
 	/* We don't need the distraction of CPUs appearing and vanishing. */
 	lock_cpu_hotplug();
@@ -353,7 +356,7 @@
 	unsigned long flags;
 	struct task_struct *p;
 
-	cwq = wq->cpu_wq + cpu;
+	cwq = per_cpu_ptr(wq->cpu_wq, cpu);
 	spin_lock_irqsave(&cwq->lock, flags);
 	p = cwq->thread;
 	cwq->thread = NULL;
@@ -380,6 +383,7 @@
 		spin_unlock(&workqueue_lock);
 	}
 	unlock_cpu_hotplug();
+	free_percpu(wq->cpu_wq);
 	kfree(wq);
 }
 
@@ -458,7 +462,7 @@
 
 	BUG_ON(!keventd_wq);
 
-	cwq = keventd_wq->cpu_wq + cpu;
+	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
 	if (current == cwq->thread)
 		ret = 1;
 
@@ -470,7 +474,7 @@
 /* Take the work from this (downed) CPU. */
 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
 {
-	struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
+	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
 	LIST_HEAD(list);
 	struct work_struct *work;
 
@@ -481,7 +485,7 @@
 		printk("Taking work for %s\n", wq->name);
 		work = list_entry(list.next,struct work_struct,entry);
 		list_del(&work->entry);
-		__queue_work(wq->cpu_wq + smp_processor_id(), work);
+		__queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
 	}
 	spin_unlock_irq(&cwq->lock);
 }
@@ -508,15 +512,18 @@
 	case CPU_ONLINE:
 		/* Kick off worker threads. */
 		list_for_each_entry(wq, &workqueues, list) {
-			kthread_bind(wq->cpu_wq[hotcpu].thread, hotcpu);
-			wake_up_process(wq->cpu_wq[hotcpu].thread);
+			struct cpu_workqueue_struct *cwq;
+
+			cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
+			kthread_bind(cwq->thread, hotcpu);
+			wake_up_process(cwq->thread);
 		}
 		break;
 
 	case CPU_UP_CANCELED:
 		list_for_each_entry(wq, &workqueues, list) {
 			/* Unbind so it can run. */
-			kthread_bind(wq->cpu_wq[hotcpu].thread,
+			kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
 				     smp_processor_id());
 			cleanup_workqueue_thread(wq, hotcpu);
 		}