Refactoring of ThreadPool
Introduce AbstractThreadPool for managing threads, and a subclass
ThreadPool for handling a regular task queue.
This is to prepare for a custom JIT thread pool with multiple queues.
Test: test.py
Change-Id: I180032d14b8946ea6787252bd4fe4c89409e089b
diff --git a/dex2oat/driver/compiler_driver.cc b/dex2oat/driver/compiler_driver.cc
index 460b5da..ee31c7e 100644
--- a/dex2oat/driver/compiler_driver.cc
+++ b/dex2oat/driver/compiler_driver.cc
@@ -2804,8 +2804,8 @@
void CompilerDriver::InitializeThreadPools() {
size_t parallel_count = parallel_thread_count_ > 0 ? parallel_thread_count_ - 1 : 0;
parallel_thread_pool_.reset(
- new ThreadPool("Compiler driver thread pool", parallel_count));
- single_thread_pool_.reset(new ThreadPool("Single-threaded Compiler driver thread pool", 0));
+ ThreadPool::Create("Compiler driver thread pool", parallel_count));
+ single_thread_pool_.reset(ThreadPool::Create("Single-threaded Compiler driver thread pool", 0));
}
void CompilerDriver::FreeThreadPools() {
diff --git a/dex2oat/linker/elf_writer_quick.cc b/dex2oat/linker/elf_writer_quick.cc
index 61e5783..10dc23e 100644
--- a/dex2oat/linker/elf_writer_quick.cc
+++ b/dex2oat/linker/elf_writer_quick.cc
@@ -254,7 +254,7 @@
builder_->GetDex()->Exists() ? builder_->GetDex()->GetAddress() : 0,
dex_section_size_,
debug_info);
- debug_info_thread_pool_ = std::make_unique<ThreadPool>("Mini-debug-info writer", 1);
+ debug_info_thread_pool_.reset(ThreadPool::Create("Mini-debug-info writer", 1));
debug_info_thread_pool_->AddTask(self, debug_info_task_.get());
debug_info_thread_pool_->StartWorkers(self);
}
diff --git a/runtime/barrier_test.cc b/runtime/barrier_test.cc
index 52959bd..e53241c 100644
--- a/runtime/barrier_test.cc
+++ b/runtime/barrier_test.cc
@@ -64,15 +64,16 @@
// Check that barrier wait and barrier increment work.
TEST_F(BarrierTest, CheckWait) {
Thread* self = Thread::Current();
- ThreadPool thread_pool("Barrier test thread pool", num_threads);
+ std::unique_ptr<ThreadPool> thread_pool(
+ ThreadPool::Create("Barrier test thread pool", num_threads));
Barrier barrier(num_threads + 1); // One extra Wait() in main thread.
Barrier timeout_barrier(0); // Only used for sleeping on timeout.
AtomicInteger count1(0);
AtomicInteger count2(0);
for (int32_t i = 0; i < num_threads; ++i) {
- thread_pool.AddTask(self, new CheckWaitTask(&barrier, &count1, &count2));
+ thread_pool->AddTask(self, new CheckWaitTask(&barrier, &count1, &count2));
}
- thread_pool.StartWorkers(self);
+ thread_pool->StartWorkers(self);
while (count1.load(std::memory_order_relaxed) != num_threads) {
timeout_barrier.Increment(self, 1, 100); // sleep 100 msecs
}
@@ -81,7 +82,7 @@
// Perform one additional Wait(), allowing pool threads to proceed.
barrier.Wait(self);
// Wait for all the threads to finish.
- thread_pool.Wait(self, true, false);
+ thread_pool->Wait(self, true, false);
// Both counts should be equal to num_threads now.
EXPECT_EQ(count1.load(std::memory_order_relaxed), num_threads);
EXPECT_EQ(count2.load(std::memory_order_relaxed), num_threads);
@@ -115,15 +116,16 @@
// Check that barrier pass through works.
TEST_F(BarrierTest, CheckPass) {
Thread* self = Thread::Current();
- ThreadPool thread_pool("Barrier test thread pool", num_threads);
+ std::unique_ptr<ThreadPool> thread_pool(
+ ThreadPool::Create("Barrier test thread pool", num_threads));
Barrier barrier(0);
AtomicInteger count(0);
const int32_t num_tasks = num_threads * 4;
const int32_t num_sub_tasks = 128;
for (int32_t i = 0; i < num_tasks; ++i) {
- thread_pool.AddTask(self, new CheckPassTask(&barrier, &count, num_sub_tasks));
+ thread_pool->AddTask(self, new CheckPassTask(&barrier, &count, num_sub_tasks));
}
- thread_pool.StartWorkers(self);
+ thread_pool->StartWorkers(self);
const int32_t expected_total_tasks = num_sub_tasks * num_tasks;
// Wait for all the tasks to complete using the barrier.
barrier.Increment(self, expected_total_tasks);
diff --git a/runtime/gc/heap.cc b/runtime/gc/heap.cc
index 4671b75..c10f417 100644
--- a/runtime/gc/heap.cc
+++ b/runtime/gc/heap.cc
@@ -1127,7 +1127,7 @@
num_threads = std::max(parallel_gc_threads_, conc_gc_threads_);
}
if (num_threads != 0) {
- thread_pool_.reset(new ThreadPool("Heap thread pool", num_threads));
+ thread_pool_.reset(ThreadPool::Create("Heap thread pool", num_threads));
}
}
diff --git a/runtime/gc/space/large_object_space_test.cc b/runtime/gc/space/large_object_space_test.cc
index 9736c5d..99a67c5 100644
--- a/runtime/gc/space/large_object_space_test.cc
+++ b/runtime/gc/space/large_object_space_test.cc
@@ -161,14 +161,15 @@
}
Thread* self = Thread::Current();
- ThreadPool thread_pool("Large object space test thread pool", kNumThreads);
+ std::unique_ptr<ThreadPool> thread_pool(
+ ThreadPool::Create("Large object space test thread pool", kNumThreads));
for (size_t i = 0; i < kNumThreads; ++i) {
- thread_pool.AddTask(self, new AllocRaceTask(i, kNumIterations, 16 * KB, los));
+ thread_pool->AddTask(self, new AllocRaceTask(i, kNumIterations, 16 * KB, los));
}
- thread_pool.StartWorkers(self);
+ thread_pool->StartWorkers(self);
- thread_pool.Wait(self, true, false);
+ thread_pool->Wait(self, true, false);
delete los;
}
diff --git a/runtime/gc/task_processor_test.cc b/runtime/gc/task_processor_test.cc
index a3666e0..3614a51 100644
--- a/runtime/gc/task_processor_test.cc
+++ b/runtime/gc/task_processor_test.cc
@@ -63,7 +63,7 @@
};
TEST_F(TaskProcessorTest, Interrupt) {
- ThreadPool thread_pool("task processor test", 1U);
+ std::unique_ptr<ThreadPool> thread_pool(ThreadPool::Create("task processor test", 1U));
Thread* const self = Thread::Current();
TaskProcessor task_processor;
static constexpr size_t kRecursion = 10;
@@ -72,8 +72,8 @@
task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion));
task_processor.Start(self);
// Add a task which will wait until interrupted to the thread pool.
- thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
- thread_pool.StartWorkers(self);
+ thread_pool->AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
+ thread_pool->StartWorkers(self);
ASSERT_FALSE(done_running);
// Wait until all the tasks are done, but since we didn't interrupt, done_running should be 0.
while (counter.load(std::memory_order_seq_cst) != kRecursion) {
@@ -81,7 +81,7 @@
}
ASSERT_FALSE(done_running);
task_processor.Stop(self);
- thread_pool.Wait(self, true, false);
+ thread_pool->Wait(self, true, false);
// After the interrupt and wait, the WorkUntilInterruptedTasktask should have terminated and
// set done_running_ to true.
ASSERT_TRUE(done_running.load(std::memory_order_seq_cst));
@@ -93,9 +93,9 @@
// working until all the tasks are completed.
task_processor.Stop(self);
task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion));
- thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
- thread_pool.StartWorkers(self);
- thread_pool.Wait(self, true, false);
+ thread_pool->AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
+ thread_pool->StartWorkers(self);
+ thread_pool->Wait(self, true, false);
ASSERT_TRUE(done_running.load(std::memory_order_seq_cst));
ASSERT_EQ(counter.load(std::memory_order_seq_cst), kRecursion);
}
@@ -133,13 +133,13 @@
auto* task = new TestOrderTask(pair.first, pair.second, &counter);
task_processor.AddTask(self, task);
}
- ThreadPool thread_pool("task processor test", 1U);
+ std::unique_ptr<ThreadPool> thread_pool(ThreadPool::Create("task processor test", 1U));
Atomic<bool> done_running(false);
// Add a task which will wait until interrupted to the thread pool.
- thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
+ thread_pool->AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
ASSERT_FALSE(done_running.load(std::memory_order_seq_cst));
- thread_pool.StartWorkers(self);
- thread_pool.Wait(self, true, false);
+ thread_pool->StartWorkers(self);
+ thread_pool->Wait(self, true, false);
ASSERT_TRUE(done_running.load(std::memory_order_seq_cst));
ASSERT_EQ(counter, kNumTasks);
}
diff --git a/runtime/jit/jit.cc b/runtime/jit/jit.cc
index f67ffdd..d2beacb 100644
--- a/runtime/jit/jit.cc
+++ b/runtime/jit/jit.cc
@@ -1183,7 +1183,7 @@
// We need peers as we may report the JIT thread, e.g., in the debugger.
constexpr bool kJitPoolNeedsPeers = true;
- thread_pool_.reset(new ThreadPool("Jit thread pool", 1, kJitPoolNeedsPeers));
+ thread_pool_.reset(ThreadPool::Create("Jit thread pool", 1, kJitPoolNeedsPeers));
Runtime* runtime = Runtime::Current();
thread_pool_->SetPthreadPriority(
diff --git a/runtime/monitor_test.cc b/runtime/monitor_test.cc
index 699f2b0..4270740 100644
--- a/runtime/monitor_test.cc
+++ b/runtime/monitor_test.cc
@@ -278,15 +278,15 @@
// Need to drop the mutator lock to allow barriers.
ScopedThreadSuspension sts(soa.Self(), ThreadState::kNative);
- ThreadPool thread_pool(pool_name, 3);
- thread_pool.AddTask(self, new CreateTask(test, create_sleep, c_millis, c_expected));
+ std::unique_ptr<ThreadPool> thread_pool(ThreadPool::Create(pool_name, 3));
+ thread_pool->AddTask(self, new CreateTask(test, create_sleep, c_millis, c_expected));
if (interrupt) {
- thread_pool.AddTask(self, new InterruptTask(test, use_sleep, static_cast<uint64_t>(u_millis)));
+ thread_pool->AddTask(self, new InterruptTask(test, use_sleep, static_cast<uint64_t>(u_millis)));
} else {
- thread_pool.AddTask(self, new UseTask(test, use_sleep, u_millis, u_expected));
+ thread_pool->AddTask(self, new UseTask(test, use_sleep, u_millis, u_expected));
}
- thread_pool.AddTask(self, new WatchdogTask(test));
- thread_pool.StartWorkers(self);
+ thread_pool->AddTask(self, new WatchdogTask(test));
+ thread_pool->StartWorkers(self);
// Wait on completion barrier.
test->complete_barrier_->Wait(self);
@@ -300,7 +300,7 @@
watchdog_obj->MonitorExit(self); // Release the lock.
}
- thread_pool.StopWorkers(self);
+ thread_pool->StopWorkers(self);
}
@@ -361,7 +361,7 @@
ScopedLogSeverity sls(LogSeverity::FATAL);
Thread* const self = Thread::Current();
- ThreadPool thread_pool("the pool", 2);
+ std::unique_ptr<ThreadPool> thread_pool(ThreadPool::Create("the pool", 2));
ScopedObjectAccess soa(self);
StackHandleScope<1> hs(self);
Handle<mirror::Object> obj1(
@@ -375,10 +375,10 @@
EXPECT_TRUE(trylock.Acquired());
}
// Test failure case.
- thread_pool.AddTask(self, new TryLockTask(g_obj1));
- thread_pool.StartWorkers(self);
+ thread_pool->AddTask(self, new TryLockTask(g_obj1));
+ thread_pool->StartWorkers(self);
ScopedThreadSuspension sts(self, ThreadState::kSuspended);
- thread_pool.Wait(Thread::Current(), /*do_work=*/false, /*may_hold_locks=*/false);
+ thread_pool->Wait(Thread::Current(), /*do_work=*/false, /*may_hold_locks=*/false);
}
// Test that the trylock actually locks the object.
{
@@ -388,7 +388,7 @@
// Since we hold the lock there should be no monitor state exeception.
self->AssertNoPendingException();
}
- thread_pool.StopWorkers(self);
+ thread_pool->StopWorkers(self);
}
diff --git a/runtime/oat_file_assistant_test.cc b/runtime/oat_file_assistant_test.cc
index 851c75b..132640d 100644
--- a/runtime/oat_file_assistant_test.cc
+++ b/runtime/oat_file_assistant_test.cc
@@ -1607,17 +1607,18 @@
const size_t kNumThreads = 16;
Thread* self = Thread::Current();
- ThreadPool thread_pool("Oat file assistant test thread pool", kNumThreads);
+ std::unique_ptr<ThreadPool> thread_pool(
+ ThreadPool::Create("Oat file assistant test thread pool", kNumThreads));
std::vector<std::unique_ptr<RaceGenerateTask>> tasks;
Mutex lock("RaceToGenerate");
for (size_t i = 0; i < kNumThreads; i++) {
std::unique_ptr<RaceGenerateTask> task(
new RaceGenerateTask(*this, dex_location, oat_location, &lock));
- thread_pool.AddTask(self, task.get());
+ thread_pool->AddTask(self, task.get());
tasks.push_back(std::move(task));
}
- thread_pool.StartWorkers(self);
- thread_pool.Wait(self, /* do_work= */ true, /* may_hold_locks= */ false);
+ thread_pool->StartWorkers(self);
+ thread_pool->Wait(self, /* do_work= */ true, /* may_hold_locks= */ false);
// Verify that tasks which got an oat file got a unique one.
std::set<const OatFile*> oat_files;
diff --git a/runtime/oat_file_manager.cc b/runtime/oat_file_manager.cc
index f5f72f3..f7b8e7d 100644
--- a/runtime/oat_file_manager.cc
+++ b/runtime/oat_file_manager.cc
@@ -853,7 +853,7 @@
WriterMutexLock mu(self, *Locks::oat_file_manager_lock_);
if (verification_thread_pool_ == nullptr) {
verification_thread_pool_.reset(
- new ThreadPool("Verification thread pool", /* num_threads= */ 1));
+ ThreadPool::Create("Verification thread pool", /* num_threads= */ 1));
verification_thread_pool_->StartWorkers(self);
}
}
diff --git a/runtime/runtime.cc b/runtime/runtime.cc
index 6f0a040..adf75fe 100644
--- a/runtime/runtime.cc
+++ b/runtime/runtime.cc
@@ -1259,7 +1259,8 @@
std::min(static_cast<size_t>(std::thread::hardware_concurrency()), kMaxRuntimeWorkers);
MutexLock mu(Thread::Current(), *Locks::runtime_thread_pool_lock_);
CHECK(thread_pool_ == nullptr);
- thread_pool_.reset(new ThreadPool("Runtime", num_workers, /*create_peers=*/false, kStackSize));
+ thread_pool_.reset(
+ ThreadPool::Create("Runtime", num_workers, /*create_peers=*/false, kStackSize));
thread_pool_->StartWorkers(Thread::Current());
}
diff --git a/runtime/thread_pool.cc b/runtime/thread_pool.cc
index de9ccba..94be457 100644
--- a/runtime/thread_pool.cc
+++ b/runtime/thread_pool.cc
@@ -46,7 +46,8 @@
static constexpr bool kUseCustomThreadPoolStack = true;
#endif
-ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name,
+ThreadPoolWorker::ThreadPoolWorker(AbstractThreadPool* thread_pool,
+ const std::string& name,
size_t stack_size)
: thread_pool_(thread_pool),
name_(name) {
@@ -159,17 +160,28 @@
// The ThreadPool is responsible for calling Finalize (which usually delete
// the task memory) on all the tasks.
Task* task = nullptr;
- while ((task = TryGetTask(self)) != nullptr) {
+ do {
+ {
+ MutexLock mu(self, task_queue_lock_);
+ if (tasks_.empty()) {
+ return;
+ }
+ task = tasks_.front();
+ tasks_.pop_front();
+ }
task->Finalize();
- }
- MutexLock mu(self, task_queue_lock_);
- tasks_.clear();
+ } while (true);
}
-ThreadPool::ThreadPool(const char* name,
- size_t num_threads,
- bool create_peers,
- size_t worker_stack_size)
+ThreadPool::~ThreadPool() {
+ DeleteThreads();
+ RemoveAllTasks(Thread::Current());
+}
+
+AbstractThreadPool::AbstractThreadPool(const char* name,
+ size_t num_threads,
+ bool create_peers,
+ size_t worker_stack_size)
: name_(name),
task_queue_lock_("task queue lock", kGenericBottomLock),
task_queue_condition_("task queue condition", task_queue_lock_),
@@ -182,11 +194,9 @@
creation_barier_(0),
max_active_workers_(num_threads),
create_peers_(create_peers),
- worker_stack_size_(worker_stack_size) {
- CreateThreads();
-}
+ worker_stack_size_(worker_stack_size) {}
-void ThreadPool::CreateThreads() {
+void AbstractThreadPool::CreateThreads() {
CHECK(threads_.empty());
Thread* self = Thread::Current();
{
@@ -203,17 +213,17 @@
}
}
-void ThreadPool::WaitForWorkersToBeCreated() {
+void AbstractThreadPool::WaitForWorkersToBeCreated() {
creation_barier_.Increment(Thread::Current(), 0);
}
-const std::vector<ThreadPoolWorker*>& ThreadPool::GetWorkers() {
+const std::vector<ThreadPoolWorker*>& AbstractThreadPool::GetWorkers() {
// Wait for all the workers to be created before returning them.
WaitForWorkersToBeCreated();
return threads_;
}
-void ThreadPool::DeleteThreads() {
+void AbstractThreadPool::DeleteThreads() {
{
Thread* self = Thread::Current();
MutexLock mu(self, task_queue_lock_);
@@ -229,18 +239,13 @@
STLDeleteElements(&threads_);
}
-void ThreadPool::SetMaxActiveWorkers(size_t max_workers) {
+void AbstractThreadPool::SetMaxActiveWorkers(size_t max_workers) {
MutexLock mu(Thread::Current(), task_queue_lock_);
CHECK_LE(max_workers, GetThreadCount());
max_active_workers_ = max_workers;
}
-ThreadPool::~ThreadPool() {
- DeleteThreads();
- RemoveAllTasks(Thread::Current());
-}
-
-void ThreadPool::StartWorkers(Thread* self) {
+void AbstractThreadPool::StartWorkers(Thread* self) {
MutexLock mu(self, task_queue_lock_);
started_ = true;
task_queue_condition_.Broadcast(self);
@@ -248,17 +253,17 @@
total_wait_time_ = 0;
}
-void ThreadPool::StopWorkers(Thread* self) {
+void AbstractThreadPool::StopWorkers(Thread* self) {
MutexLock mu(self, task_queue_lock_);
started_ = false;
}
-bool ThreadPool::HasStarted(Thread* self) {
+bool AbstractThreadPool::HasStarted(Thread* self) {
MutexLock mu(self, task_queue_lock_);
return started_;
}
-Task* ThreadPool::GetTask(Thread* self) {
+Task* AbstractThreadPool::GetTask(Thread* self) {
MutexLock mu(self, task_queue_lock_);
while (!IsShuttingDown()) {
const size_t thread_count = GetThreadCount();
@@ -290,7 +295,7 @@
return nullptr;
}
-Task* ThreadPool::TryGetTask(Thread* self) {
+Task* AbstractThreadPool::TryGetTask(Thread* self) {
MutexLock mu(self, task_queue_lock_);
return TryGetTaskLocked();
}
@@ -304,7 +309,7 @@
return nullptr;
}
-void ThreadPool::Wait(Thread* self, bool do_work, bool may_hold_locks) {
+void AbstractThreadPool::Wait(Thread* self, bool do_work, bool may_hold_locks) {
if (do_work) {
CHECK(!create_peers_);
Task* task = nullptr;
@@ -329,13 +334,13 @@
return tasks_.size();
}
-void ThreadPool::SetPthreadPriority(int priority) {
+void AbstractThreadPool::SetPthreadPriority(int priority) {
for (ThreadPoolWorker* worker : threads_) {
worker->SetPthreadPriority(priority);
}
}
-void ThreadPool::CheckPthreadPriority(int priority) {
+void AbstractThreadPool::CheckPthreadPriority(int priority) {
#if defined(ART_TARGET_ANDROID)
for (ThreadPoolWorker* worker : threads_) {
CHECK_EQ(worker->GetPthreadPriority(), priority);
diff --git a/runtime/thread_pool.h b/runtime/thread_pool.h
index 5c75733..3ea17a1 100644
--- a/runtime/thread_pool.h
+++ b/runtime/thread_pool.h
@@ -27,7 +27,7 @@
namespace art {
-class ThreadPool;
+class AbstractThreadPool;
class Closure {
public:
@@ -92,23 +92,23 @@
Thread* GetThread() const { return thread_; }
protected:
- ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size);
+ ThreadPoolWorker(AbstractThreadPool* thread_pool, const std::string& name, size_t stack_size);
static void* Callback(void* arg) REQUIRES(!Locks::mutator_lock_);
virtual void Run();
- ThreadPool* const thread_pool_;
+ AbstractThreadPool* const thread_pool_;
const std::string name_;
MemMap stack_;
pthread_t pthread_;
Thread* thread_;
private:
- friend class ThreadPool;
+ friend class AbstractThreadPool;
DISALLOW_COPY_AND_ASSIGN(ThreadPoolWorker);
};
// Note that thread pool workers will set Thread#setCanCallIntoJava to false.
-class ThreadPool {
+class AbstractThreadPool {
public:
// Returns the number of threads in the thread pool.
size_t GetThreadCount() const {
@@ -128,21 +128,12 @@
// Add a new task, the first available started worker will process it. Does not delete the task
// after running it, it is the caller's responsibility.
- void AddTask(Thread* self, Task* task) REQUIRES(!task_queue_lock_);
+ virtual void AddTask(Thread* self, Task* task) REQUIRES(!task_queue_lock_) = 0;
// Remove all tasks in the queue.
- void RemoveAllTasks(Thread* self) REQUIRES(!task_queue_lock_);
+ virtual void RemoveAllTasks(Thread* self) REQUIRES(!task_queue_lock_) = 0;
- // Create a named thread pool with the given number of threads.
- //
- // If create_peers is true, all worker threads will have a Java peer object. Note that if the
- // pool is asked to do work on the current thread (see Wait), a peer may not be available. Wait
- // will conservatively abort if create_peers and do_work are true.
- ThreadPool(const char* name,
- size_t num_threads,
- bool create_peers = false,
- size_t worker_stack_size = ThreadPoolWorker::kDefaultStackSize);
- virtual ~ThreadPool();
+ virtual size_t GetTaskCount(Thread* self) REQUIRES(!task_queue_lock_) = 0;
// Create the threads of this pool.
void CreateThreads();
@@ -155,8 +146,6 @@
// When the pool was created with peers for workers, do_work must not be true (see ThreadPool()).
void Wait(Thread* self, bool do_work, bool may_hold_locks) REQUIRES(!task_queue_lock_);
- size_t GetTaskCount(Thread* self) REQUIRES(!task_queue_lock_);
-
// Returns the total amount of workers waited for tasks.
uint64_t GetWaitTime() const {
return total_wait_time_;
@@ -176,22 +165,27 @@
// Wait for workers to be created.
void WaitForWorkersToBeCreated();
+ virtual ~AbstractThreadPool() {}
+
protected:
// get a task to run, blocks if there are no tasks left
- virtual Task* GetTask(Thread* self) REQUIRES(!task_queue_lock_);
+ Task* GetTask(Thread* self) REQUIRES(!task_queue_lock_);
// Try to get a task, returning null if there is none available.
Task* TryGetTask(Thread* self) REQUIRES(!task_queue_lock_);
- Task* TryGetTaskLocked() REQUIRES(task_queue_lock_);
+ virtual Task* TryGetTaskLocked() REQUIRES(task_queue_lock_) = 0;
// Are we shutting down?
bool IsShuttingDown() const REQUIRES(task_queue_lock_) {
return shutting_down_;
}
- bool HasOutstandingTasks() const REQUIRES(task_queue_lock_) {
- return started_ && !tasks_.empty();
- }
+ virtual bool HasOutstandingTasks() const REQUIRES(task_queue_lock_) = 0;
+
+ AbstractThreadPool(const char* name,
+ size_t num_threads,
+ bool create_peers,
+ size_t worker_stack_size);
const std::string name_;
Mutex task_queue_lock_;
@@ -201,7 +195,6 @@
volatile bool shutting_down_ GUARDED_BY(task_queue_lock_);
// How many worker threads are waiting on the condition.
volatile size_t waiting_count_ GUARDED_BY(task_queue_lock_);
- std::deque<Task*> tasks_ GUARDED_BY(task_queue_lock_);
std::vector<ThreadPoolWorker*> threads_;
// Work balance detection.
uint64_t start_time_ GUARDED_BY(task_queue_lock_);
@@ -214,6 +207,46 @@
private:
friend class ThreadPoolWorker;
friend class WorkStealingWorker;
+ DISALLOW_COPY_AND_ASSIGN(AbstractThreadPool);
+};
+
+class ThreadPool : public AbstractThreadPool {
+ public:
+ // Create a named thread pool with the given number of threads.
+ //
+ // If create_peers is true, all worker threads will have a Java peer object. Note that if the
+ // pool is asked to do work on the current thread (see Wait), a peer may not be available. Wait
+ // will conservatively abort if create_peers and do_work are true.
+ static ThreadPool* Create(const char* name,
+ size_t num_threads,
+ bool create_peers = false,
+ size_t worker_stack_size = ThreadPoolWorker::kDefaultStackSize) {
+ ThreadPool* pool = new ThreadPool(name, num_threads, create_peers, worker_stack_size);
+ pool->CreateThreads();
+ return pool;
+ }
+
+ void AddTask(Thread* self, Task* task) REQUIRES(!task_queue_lock_) override;
+ size_t GetTaskCount(Thread* self) REQUIRES(!task_queue_lock_) override;
+ void RemoveAllTasks(Thread* self) REQUIRES(!task_queue_lock_) override;
+ ~ThreadPool() override;
+
+ protected:
+ Task* TryGetTaskLocked() REQUIRES(task_queue_lock_) override;
+
+ bool HasOutstandingTasks() const REQUIRES(task_queue_lock_) override {
+ return started_ && !tasks_.empty();
+ }
+
+ private:
+ ThreadPool(const char* name,
+ size_t num_threads,
+ bool create_peers,
+ size_t worker_stack_size)
+ : AbstractThreadPool(name, num_threads, create_peers, worker_stack_size) {}
+
+ std::deque<Task*> tasks_ GUARDED_BY(task_queue_lock_);
+
DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};
diff --git a/runtime/thread_pool_test.cc b/runtime/thread_pool_test.cc
index 9e7c44a..db2cef0 100644
--- a/runtime/thread_pool_test.cc
+++ b/runtime/thread_pool_test.cc
@@ -61,65 +61,68 @@
// Check that the thread pool actually runs tasks that you assign it.
TEST_F(ThreadPoolTest, CheckRun) {
Thread* self = Thread::Current();
- ThreadPool thread_pool("Thread pool test thread pool", num_threads);
+ std::unique_ptr<ThreadPool> thread_pool(
+ ThreadPool::Create("Thread pool test thread pool", num_threads));
AtomicInteger count(0);
static const int32_t num_tasks = num_threads * 4;
for (int32_t i = 0; i < num_tasks; ++i) {
- thread_pool.AddTask(self, new CountTask(&count));
+ thread_pool->AddTask(self, new CountTask(&count));
}
- thread_pool.StartWorkers(self);
+ thread_pool->StartWorkers(self);
// Wait for tasks to complete.
- thread_pool.Wait(self, true, false);
+ thread_pool->Wait(self, true, false);
// Make sure that we finished all the work.
EXPECT_EQ(num_tasks, count.load(std::memory_order_seq_cst));
}
TEST_F(ThreadPoolTest, StopStart) {
Thread* self = Thread::Current();
- ThreadPool thread_pool("Thread pool test thread pool", num_threads);
+ std::unique_ptr<ThreadPool> thread_pool(
+ ThreadPool::Create("Thread pool test thread pool", num_threads));
AtomicInteger count(0);
static const int32_t num_tasks = num_threads * 4;
for (int32_t i = 0; i < num_tasks; ++i) {
- thread_pool.AddTask(self, new CountTask(&count));
+ thread_pool->AddTask(self, new CountTask(&count));
}
usleep(200);
// Check that no threads started prematurely.
EXPECT_EQ(0, count.load(std::memory_order_seq_cst));
// Signal the threads to start processing tasks.
- thread_pool.StartWorkers(self);
+ thread_pool->StartWorkers(self);
usleep(200);
- thread_pool.StopWorkers(self);
+ thread_pool->StopWorkers(self);
AtomicInteger bad_count(0);
- thread_pool.AddTask(self, new CountTask(&bad_count));
+ thread_pool->AddTask(self, new CountTask(&bad_count));
usleep(200);
// Ensure that the task added after the workers were stopped doesn't get run.
EXPECT_EQ(0, bad_count.load(std::memory_order_seq_cst));
// Allow tasks to finish up and delete themselves.
- thread_pool.StartWorkers(self);
- thread_pool.Wait(self, false, false);
+ thread_pool->StartWorkers(self);
+ thread_pool->Wait(self, false, false);
}
TEST_F(ThreadPoolTest, StopWait) {
Thread* self = Thread::Current();
- ThreadPool thread_pool("Thread pool test thread pool", num_threads);
+ std::unique_ptr<ThreadPool> thread_pool(
+ ThreadPool::Create("Thread pool test thread pool", num_threads));
AtomicInteger count(0);
static const int32_t num_tasks = num_threads * 100;
for (int32_t i = 0; i < num_tasks; ++i) {
- thread_pool.AddTask(self, new CountTask(&count));
+ thread_pool->AddTask(self, new CountTask(&count));
}
// Signal the threads to start processing tasks.
- thread_pool.StartWorkers(self);
+ thread_pool->StartWorkers(self);
usleep(200);
- thread_pool.StopWorkers(self);
+ thread_pool->StopWorkers(self);
- thread_pool.Wait(self, false, false); // We should not deadlock here.
+ thread_pool->Wait(self, false, false); // We should not deadlock here.
// Drain the task list. Note: we have to restart here, as no tasks will be finished when
// the pool is stopped.
- thread_pool.StartWorkers(self);
- thread_pool.Wait(self, /* do_work= */ true, false);
+ thread_pool->StartWorkers(self);
+ thread_pool->Wait(self, /* do_work= */ true, false);
}
class TreeTask : public Task {
@@ -151,12 +154,13 @@
// Test that adding new tasks from within a task works.
TEST_F(ThreadPoolTest, RecursiveTest) {
Thread* self = Thread::Current();
- ThreadPool thread_pool("Thread pool test thread pool", num_threads);
+ std::unique_ptr<ThreadPool> thread_pool(
+ ThreadPool::Create("Thread pool test thread pool", num_threads));
AtomicInteger count(0);
static const int depth = 8;
- thread_pool.AddTask(self, new TreeTask(&thread_pool, &count, depth));
- thread_pool.StartWorkers(self);
- thread_pool.Wait(self, true, false);
+ thread_pool->AddTask(self, new TreeTask(thread_pool.get(), &count, depth));
+ thread_pool->StartWorkers(self);
+ thread_pool->Wait(self, true, false);
EXPECT_EQ((1 << depth) - 1, count.load(std::memory_order_seq_cst));
}
@@ -192,10 +196,11 @@
TEST_F(ThreadPoolTest, PeerTest) {
Thread* self = Thread::Current();
{
- ThreadPool thread_pool("Thread pool test thread pool", 1);
- thread_pool.AddTask(self, new NoPeerTask());
- thread_pool.StartWorkers(self);
- thread_pool.Wait(self, false, false);
+ std::unique_ptr<ThreadPool> thread_pool(
+ ThreadPool::Create("Thread pool test thread pool", 1));
+ thread_pool->AddTask(self, new NoPeerTask());
+ thread_pool->StartWorkers(self);
+ thread_pool->Wait(self, false, false);
}
{
@@ -204,10 +209,11 @@
bool started = runtime_->Start();
ASSERT_TRUE(started);
- ThreadPool thread_pool("Thread pool test thread pool", 1, true);
- thread_pool.AddTask(self, new PeerTask());
- thread_pool.StartWorkers(self);
- thread_pool.Wait(self, false, false);
+ std::unique_ptr<ThreadPool> thread_pool(
+ ThreadPool::Create("Thread pool test thread pool", 1, true));
+ thread_pool->AddTask(self, new PeerTask());
+ thread_pool->StartWorkers(self);
+ thread_pool->Wait(self, false, false);
}
}
diff --git a/runtime/trace.cc b/runtime/trace.cc
index aa485be..89f9a52 100644
--- a/runtime/trace.cc
+++ b/runtime/trace.cc
@@ -809,7 +809,7 @@
// to stop and start this thread pool. Method tracing on zygote isn't a frequent use case and
// it is okay to flush on the main thread in such cases.
if (!Runtime::Current()->IsZygote()) {
- thread_pool_.reset(new ThreadPool("Trace writer pool", 1));
+ thread_pool_.reset(ThreadPool::Create("Trace writer pool", 1));
thread_pool_->StartWorkers(Thread::Current());
}
}