diff options
| author | 2012-10-23 16:13:36 -0700 | |
|---|---|---|
| committer | 2012-10-26 12:00:03 -0700 | |
| commit | 0e4627e593bc39f8e3d89c31f8977d55054c07cc (patch) | |
| tree | 1d69558732c0c916e51a530985a26d4235ef0e6c /src/thread_pool.cc | |
| parent | 9281f004db3f194930ef34d31e5d80c98341f38f (diff) | |
Add thread pool class
Added a thread pool class loosely based on google3 code.
Modified the compiler to have a single thread pool instead of creating new threads in ForAll.
Moved barrier to be in top level directory as it is not GC specific code.
Performance Timings:
Reference:
boot.oat: 14.306596s
time mm oat-target:
real 2m33.748s
user 10m23.190s
sys 5m54.140s
Thread pool:
boot.oat: 13.111049s
time mm oat-target:
real 2m29.372s
user 10m3.130s
sys 5m46.290s
The speed increase is probably just noise.
Change-Id: If3c1280cbaa4c7e4361127d064ac744ea12cdf49
Diffstat (limited to 'src/thread_pool.cc')
| -rw-r--r-- | src/thread_pool.cc | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/src/thread_pool.cc b/src/thread_pool.cc new file mode 100644 index 0000000000..fa0cf794c4 --- /dev/null +++ b/src/thread_pool.cc @@ -0,0 +1,124 @@ +#include "runtime.h" +#include "stl_util.h" +#include "thread.h" +#include "thread_pool.h" + +namespace art { + +ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, + size_t stack_size) + : thread_pool_(thread_pool), + name_(name), + stack_size_(stack_size) { + const char* reason = "new thread pool worker thread"; + CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), reason); + CHECK_PTHREAD_CALL(pthread_attr_setstacksize, (&attr, stack_size), reason); + CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Callback, this), reason); + CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), reason); +} + +ThreadPoolWorker::~ThreadPoolWorker() { + CHECK_PTHREAD_CALL(pthread_join, (pthread_, NULL), "thread pool worker shutdown"); +} + +void ThreadPoolWorker::Run() { + Thread* self = Thread::Current(); + Closure* task = NULL; + while ((task = thread_pool_->GetTask(self)) != NULL) { + task->Run(self); + } +} + +void* ThreadPoolWorker::Callback(void* arg) { + ThreadPoolWorker* worker = reinterpret_cast<ThreadPoolWorker*>(arg); + Runtime* runtime = Runtime::Current(); + CHECK(runtime->AttachCurrentThread(worker->name_.c_str(), true, NULL)); + // Do work until its time to shut down. + worker->Run(); + runtime->DetachCurrentThread(); + return NULL; +} + +void ThreadPool::AddTask(Thread* self, Closure* task){ + MutexLock mu(self, task_queue_lock_); + tasks_.push_back(task); + // If we have any waiters, signal one. + if (waiting_count_ != 0) { + task_queue_condition_.Signal(self); + } +} + +void ThreadPool::AddThread(size_t stack_size) { + threads_.push_back( + new ThreadPoolWorker( + this, + StringPrintf("Thread pool worker %d", static_cast<int>(GetThreadCount())), + stack_size)); +} + +ThreadPool::ThreadPool(size_t num_threads) + : task_queue_lock_("task queue lock"), + task_queue_condition_("task queue condition", task_queue_lock_), + completion_condition_("task completion condition", task_queue_lock_), + started_(false), + shutting_down_(false), + waiting_count_(0) { + while (GetThreadCount() < num_threads) { + AddThread(ThreadPoolWorker::kDefaultStackSize); + } +} + +ThreadPool::~ThreadPool() { + // Tell any remaining workers to shut down. + shutting_down_ = true; + android_memory_barrier(); + // Broadcast to everyone waiting. + task_queue_condition_.Broadcast(Thread::Current()); + // Wait for the threads to finish. + STLDeleteElements(&threads_); +} + +void ThreadPool::StartWorkers(Thread* self) { + MutexLock mu(self, task_queue_lock_); + started_ = true; + android_memory_barrier(); + task_queue_condition_.Broadcast(self); +} + +void ThreadPool::StopWorkers(Thread* self) { + MutexLock mu(self, task_queue_lock_); + started_ = false; + android_memory_barrier(); +} + +Closure* ThreadPool::GetTask(Thread* self) { + MutexLock mu(self, task_queue_lock_); + while (!shutting_down_) { + if (started_ && !tasks_.empty()) { + Closure* task = tasks_.front(); + tasks_.pop_front(); + return task; + } + + waiting_count_++; + if (waiting_count_ == GetThreadCount() && tasks_.empty()) { + // We may be done, lets broadcast to the completion condition. + completion_condition_.Broadcast(self); + } + task_queue_condition_.Wait(self); + waiting_count_--; + } + + // We are shutting down, return NULL to tell the worker thread to stop looping. + return NULL; +} + +void ThreadPool::Wait(Thread* self) { + MutexLock mu(self, task_queue_lock_); + // Wait until each thread is waiting and the task list is empty. + while (waiting_count_ != GetThreadCount() || !tasks_.empty()) { + completion_condition_.Wait(self); + } +} + +} // namespace art |