Add thread pool class

Added a thread pool class loosely based on google3 code.

Modified the compiler to own a single thread pool that is reused by every ForAll call, instead of creating new threads each time ForAll runs.

Moved the barrier class to the top-level directory, as it is not GC-specific code.

Performance Timings:

Reference:
boot.oat: 14.306596s
time mm oat-target:
real    2m33.748s
user    10m23.190s
sys     5m54.140s

Thread pool:
boot.oat: 13.111049s
time mm oat-target:
real    2m29.372s
user    10m3.130s
sys     5m46.290s

The speed increase is probably just noise.

Change-Id: If3c1280cbaa4c7e4361127d064ac744ea12cdf49
diff --git a/src/compiler.cc b/src/compiler.cc
index 8d7f5b6..ddf9e87 100644
--- a/src/compiler.cc
+++ b/src/compiler.cc
@@ -35,6 +35,7 @@
 #include "ScopedLocalRef.h"
 #include "stl_util.h"
 #include "thread.h"
+#include "thread_pool.h"
 #include "timing_logger.h"
 #include "verifier/method_verifier.h"
 
@@ -313,7 +314,8 @@
       compiler_(NULL),
       compiler_context_(NULL),
       jni_compiler_(NULL),
-      create_invoke_stub_(NULL)
+      create_invoke_stub_(NULL),
+      thread_pool_(new ThreadPool(thread_count))
 {
   std::string compiler_so_name(MakeCompilerSoName(instruction_set_));
   compiler_library_ = dlopen(compiler_so_name.c_str(), RTLD_LAZY);
@@ -980,14 +982,18 @@
 
 class CompilationContext {
  public:
+  typedef void Callback(const CompilationContext* context, size_t index);
+
   CompilationContext(ClassLinker* class_linker,
           jobject class_loader,
           Compiler* compiler,
-          const DexFile* dex_file)
+          const DexFile* dex_file,
+          ThreadPool* thread_pool)
     : class_linker_(class_linker),
       class_loader_(class_loader),
       compiler_(compiler),
-      dex_file_(dex_file) {}
+      dex_file_(dex_file),
+      thread_pool_(thread_pool) {}
 
   ClassLinker* GetClassLinker() const {
     CHECK(class_linker_ != NULL);
@@ -1008,96 +1014,64 @@
     return dex_file_;
   }
 
+  void ForAll(size_t begin, size_t end, Callback callback, size_t work_units) {
+    Thread* self = Thread::Current();
+    self->AssertNoPendingException();
+    CHECK_GT(work_units, 0U);
+
+    std::vector<Closure*> closures(work_units);
+    for (size_t i = 0; i < work_units; ++i) {
+      closures[i] = new ForAllClosure(this, begin + i, end, callback, work_units);
+      thread_pool_->AddTask(self, closures[i]);
+    }
+    thread_pool_->StartWorkers(self);
+
+    // Ensure we're suspended while we're blocked in the Wait call below, until the thread pool
+    // workers have finished running the tasks queued above.
+    CHECK_NE(self->GetState(), kRunnable);
+
+    // Wait for all the worker threads to finish.
+    thread_pool_->Wait(self);
+
+    STLDeleteElements(&closures);
+  }
+
  private:
+
+  class ForAllClosure : public Closure {
+   public:
+    ForAllClosure(CompilationContext* context, size_t begin, size_t end, Callback* callback,
+                  size_t stripe)
+        : context_(context),
+          begin_(begin),
+          end_(end),
+          callback_(callback),
+          stripe_(stripe)
+    {
+
+    }
+
+    virtual void Run(Thread* self) {
+      for (size_t i = begin_; i < end_; i += stripe_) {
+        callback_(context_, i);
+        self->AssertNoPendingException();
+      }
+    }
+   private:
+    CompilationContext* const context_;
+    const size_t begin_;
+    const size_t end_;
+    const Callback* callback_;
+    const size_t stripe_;
+  };
+
   ClassLinker* const class_linker_;
   const jobject class_loader_;
   Compiler* const compiler_;
   const DexFile* const dex_file_;
+  ThreadPool* thread_pool_;
 };
 
-typedef void Callback(const CompilationContext* context, size_t index);
-
-static void ForAll(CompilationContext* context, size_t begin, size_t end, Callback callback,
-                   size_t thread_count);
-
-class WorkerThread {
- public:
-  WorkerThread(CompilationContext* context, size_t begin, size_t end, Callback callback, size_t stripe, bool spawn)
-      : spawn_(spawn), context_(context), begin_(begin), end_(end), callback_(callback), stripe_(stripe) {
-    if (spawn_) {
-      // Mac OS stacks are only 512KiB. Make sure we have the same stack size on all platforms.
-      pthread_attr_t attr;
-      CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), "new compiler worker thread");
-      CHECK_PTHREAD_CALL(pthread_attr_setstacksize, (&attr, 1*MB), "new compiler worker thread");
-      CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Go, this), "new compiler worker thread");
-      CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), "new compiler worker thread");
-    }
-  }
-
-  ~WorkerThread() {
-    if (spawn_) {
-      CHECK_PTHREAD_CALL(pthread_join, (pthread_, NULL), "compiler worker shutdown");
-    }
-  }
-
- private:
-  static void* Go(void* arg) LOCKS_EXCLUDED(Locks::mutator_lock_) {
-    WorkerThread* worker = reinterpret_cast<WorkerThread*>(arg);
-    Runtime* runtime = Runtime::Current();
-    if (worker->spawn_) {
-      CHECK(runtime->AttachCurrentThread("Compiler Worker", true, NULL));
-    }
-    worker->Run();
-    if (worker->spawn_) {
-      runtime->DetachCurrentThread();
-    }
-    return NULL;
-  }
-
-  void Go() LOCKS_EXCLUDED(Locks::mutator_lock_) {
-    Go(this);
-  }
-
-  void Run() LOCKS_EXCLUDED(Locks::mutator_lock_) {
-    Thread* self = Thread::Current();
-    for (size_t i = begin_; i < end_; i += stripe_) {
-      callback_(context_, i);
-      self->AssertNoPendingException();
-    }
-  }
-
-  pthread_t pthread_;
-  // Was this thread spawned or is it the main thread?
-  const bool spawn_;
-
-  const CompilationContext* const context_;
-  const size_t begin_;
-  const size_t end_;
-  Callback* callback_;
-  const size_t stripe_;
-
-  friend void ForAll(CompilationContext*, size_t, size_t, Callback, size_t);
-};
-
-static void ForAll(CompilationContext* context, size_t begin, size_t end, Callback callback,
-                   size_t thread_count)
-    LOCKS_EXCLUDED(Locks::mutator_lock_) {
-  Thread* self = Thread::Current();
-  self->AssertNoPendingException();
-  CHECK_GT(thread_count, 0U);
-
-  std::vector<WorkerThread*> threads;
-  for (size_t i = 0; i < thread_count; ++i) {
-    threads.push_back(new WorkerThread(context, begin + i, end, callback, thread_count, (i != 0)));
-  }
-  threads[0]->Go();
-
-  // Ensure we're suspended while we're blocked waiting for the other threads to finish (worker
-  // thread destructor's called below perform join).
-  CHECK_NE(self->GetState(), kRunnable);
-  STLDeleteElements(&threads);
-}
-
 // Return true if the class should be skipped during compilation. We
 // never skip classes in the boot class loader. However, if we have a
 // non-boot class loader and we can resolve the class in the boot
@@ -1216,11 +1190,11 @@
   // TODO: we could resolve strings here, although the string table is largely filled with class
   //       and method names.
 
-  CompilationContext context(class_linker, class_loader, this, &dex_file);
-  ForAll(&context, 0, dex_file.NumTypeIds(), ResolveType, thread_count_);
+  CompilationContext context(class_linker, class_loader, this, &dex_file, thread_pool_.get());
+  context.ForAll(0, dex_file.NumTypeIds(), ResolveType, thread_count_);
   timings.AddSplit("Resolve " + dex_file.GetLocation() + " Types");
 
-  ForAll(&context, 0, dex_file.NumClassDefs(), ResolveClassFieldsAndMethods, thread_count_);
+  context.ForAll(0, dex_file.NumClassDefs(), ResolveClassFieldsAndMethods, thread_count_);
   timings.AddSplit("Resolve " + dex_file.GetLocation() + " MethodsAndFields");
 }
 
@@ -1281,8 +1255,8 @@
 
 void Compiler::VerifyDexFile(jobject class_loader, const DexFile& dex_file, TimingLogger& timings) {
   ClassLinker* class_linker = Runtime::Current()->GetClassLinker();
-  CompilationContext context(class_linker, class_loader, this, &dex_file);
-  ForAll(&context, 0, dex_file.NumClassDefs(), VerifyClass, thread_count_);
+  CompilationContext context(class_linker, class_loader, this, &dex_file, thread_pool_.get());
+  context.ForAll(0, dex_file.NumClassDefs(), VerifyClass, thread_count_);
   timings.AddSplit("Verify " + dex_file.GetLocation());
 }
 
@@ -1326,8 +1300,8 @@
 void Compiler::InitializeClassesWithoutClinit(jobject jni_class_loader, const DexFile& dex_file,
                                               TimingLogger& timings) {
   ClassLinker* class_linker = Runtime::Current()->GetClassLinker();
-  CompilationContext context(class_linker, jni_class_loader, this, &dex_file);
-  ForAll(&context, 0, dex_file.NumClassDefs(), InitializeClassWithoutClinit, thread_count_);
+  CompilationContext context(class_linker, jni_class_loader, this, &dex_file, thread_pool_.get());
+  context.ForAll(0, dex_file.NumClassDefs(), InitializeClassWithoutClinit, thread_count_);
   timings.AddSplit("InitializeNoClinit " + dex_file.GetLocation());
 }
 
@@ -1416,8 +1390,8 @@
 
 void Compiler::CompileDexFile(jobject class_loader, const DexFile& dex_file,
                               TimingLogger& timings) {
-  CompilationContext context(NULL, class_loader, this, &dex_file);
-  ForAll(&context, 0, dex_file.NumClassDefs(), Compiler::CompileClass, thread_count_);
+  CompilationContext context(NULL, class_loader, this, &dex_file, thread_pool_.get());
+  context.ForAll(0, dex_file.NumClassDefs(), Compiler::CompileClass, thread_count_);
   timings.AddSplit("Compile " + dex_file.GetLocation());
 }