| author | 2013-08-21 23:38:52 +0000 |
|---|---|
| committer | 2013-08-21 23:38:52 +0000 |
| commit | ee23f85dc66e651a1a220a612d3407689b8de5e8 (patch) |
| tree | cb6e257982a1f7d19ba99bff3a9132f3eb53f503 |
| parent | dd3413f4a1a4fcf9fd62b4660805a4ce64b22bd6 (diff) |
| parent | 2775ee4f82dff260663ca16adddc0b15327aaa42 (diff) |
Merge "Add more runtime options." into dalvik-dev
| -rw-r--r-- | runtime/gc/collector/mark_sweep.cc | 53 |
|---|---|---|
| -rw-r--r-- | runtime/gc/collector/mark_sweep.h | 6 |
| -rw-r--r-- | runtime/gc/heap.cc | 78 |
| -rw-r--r-- | runtime/gc/heap.h | 38 |
| -rw-r--r-- | runtime/runtime.cc | 33 |
| -rw-r--r-- | runtime/runtime.h | 6 |
| -rw-r--r-- | runtime/thread_pool.cc | 23 |
| -rw-r--r-- | runtime/thread_pool.h | 5 |
8 files changed, 172 insertions, 70 deletions
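
The change replaces the single `-XX:HeapGCThreads=` option with separate parallel/concurrent GC thread counts and adds logging-threshold and footprint options (see the `runtime/runtime.cc` hunks in the diff below). As a hypothetical illustration only, the value-taking and bare flags could look like this when handed to the runtime option parser; the option names come from the patch, but the values and the array wrapper are assumptions, not part of the change.

```cpp
// Hypothetical example: option strings matched by the new parsing code in
// Runtime::ParsedOptions::Create() below. Values (3, 1) are illustrative.
static const char* kExampleGcOptions[] = {
    "-XX:ParallelGCThreads=3",  // worker threads used for paused GC phases
    "-XX:ConcGCThreads=1",      // worker threads used for concurrent GC phases
    "-XX:IgnoreMaxFootprint",   // bare flag: let the heap grow to capacity (benchmarking)
    "-XX:LowMemoryMode",        // pre-existing bare flag, unchanged by this patch
};
```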
```diff
diff --git a/runtime/gc/collector/mark_sweep.cc b/runtime/gc/collector/mark_sweep.cc
index fd82e2c87c..cedea61bda 100644
--- a/runtime/gc/collector/mark_sweep.cc
+++ b/runtime/gc/collector/mark_sweep.cc
@@ -742,11 +742,23 @@ class CardScanTask : public MarkStackTask<false> {
   }
 };
 
+size_t MarkSweep::GetThreadCount(bool paused) const {
+  if (heap_->GetThreadPool() == nullptr || !heap_->CareAboutPauseTimes()) {
+    return 0;
+  }
+  if (paused) {
+    return heap_->GetParallelGCThreadCount() + 1;
+  } else {
+    return heap_->GetConcGCThreadCount() + 1;
+  }
+}
+
 void MarkSweep::ScanGrayObjects(bool paused, byte minimum_age) {
   accounting::CardTable* card_table = GetHeap()->GetCardTable();
   ThreadPool* thread_pool = GetHeap()->GetThreadPool();
-  const bool parallel = kParallelCardScan && thread_pool != nullptr;
-  if (parallel) {
+  size_t thread_count = GetThreadCount(paused);
+  // The parallel version with only one thread is faster for card scanning, TODO: fix.
+  if (kParallelCardScan && thread_count > 0) {
     Thread* self = Thread::Current();
     // Can't have a different split for each space since multiple spaces can have their cards being
     // scanned at the same time.
@@ -755,7 +767,6 @@ void MarkSweep::ScanGrayObjects(bool paused, byte minimum_age) {
     const Object** mark_stack_begin = const_cast<const Object**>(mark_stack_->Begin());
     const Object** mark_stack_end = const_cast<const Object**>(mark_stack_->End());
     const size_t mark_stack_size = mark_stack_end - mark_stack_begin;
-    const size_t thread_count = thread_pool->GetThreadCount() + 1;
     // Estimated number of work tasks we will create.
     const size_t mark_stack_tasks = GetHeap()->GetContinuousSpaces().size() * thread_count;
     DCHECK_NE(mark_stack_tasks, 0U);
@@ -788,8 +799,9 @@ void MarkSweep::ScanGrayObjects(bool paused, byte minimum_age) {
         card_begin += card_increment;
       }
     }
+    thread_pool->SetMaxActiveWorkers(thread_count - 1);
     thread_pool->StartWorkers(self);
-    thread_pool->Wait(self, paused, true);  // Only do work in the main thread if we are paused.
+    thread_pool->Wait(self, true, true);
    thread_pool->StopWorkers(self);
     timings_.EndSplit();
   } else {
@@ -885,7 +897,8 @@ void MarkSweep::RecursiveMark() {
   ScanObjectVisitor scan_visitor(this);
   auto* self = Thread::Current();
   ThreadPool* thread_pool = heap_->GetThreadPool();
-  const bool parallel = kParallelRecursiveMark && thread_pool != NULL;
+  size_t thread_count = GetThreadCount(false);
+  const bool parallel = kParallelRecursiveMark && thread_count > 1;
   mark_stack_->Reset();
   for (const auto& space : GetHeap()->GetContinuousSpaces()) {
     if ((space->GetGcRetentionPolicy() == space::kGcRetentionPolicyAlwaysCollect) ||
@@ -904,7 +917,7 @@ void MarkSweep::RecursiveMark() {
        atomic_finger_ = static_cast<int32_t>(0xFFFFFFFF);
 
         // Create a few worker tasks.
-        size_t n = (thread_pool->GetThreadCount() + 1) * 2;
+        const size_t n = thread_count * 2;
         while (begin != end) {
           uintptr_t start = begin;
           uintptr_t delta = (end - begin) / n;
@@ -915,8 +928,9 @@ void MarkSweep::RecursiveMark() {
                                                          begin);
           thread_pool->AddTask(self, task);
         }
+        thread_pool->SetMaxActiveWorkers(thread_count - 1);
         thread_pool->StartWorkers(self);
-        thread_pool->Wait(self, false, true);
+        thread_pool->Wait(self, true, true);
         thread_pool->StopWorkers(self);
       } else {
         // This function does not handle heap end increasing, so we must use the space end.
@@ -1369,13 +1383,11 @@ void MarkSweep::ScanObject(const Object* obj) {
   ScanObjectVisit(obj, visitor);
 }
 
-void MarkSweep::ProcessMarkStackParallel(bool paused) {
+void MarkSweep::ProcessMarkStackParallel(size_t thread_count) {
   Thread* self = Thread::Current();
   ThreadPool* thread_pool = GetHeap()->GetThreadPool();
-  const size_t num_threads = thread_pool->GetThreadCount();
-  const size_t chunk_size =
-      std::min(mark_stack_->Size() / num_threads + 1,
-               static_cast<size_t>(MarkStackTask<false>::kMaxSize));
+  const size_t chunk_size = std::min(mark_stack_->Size() / thread_count + 1,
+                                     static_cast<size_t>(MarkStackTask<false>::kMaxSize));
   CHECK_GT(chunk_size, 0U);
   // Split the current mark stack up into work tasks.
   for (mirror::Object **it = mark_stack_->Begin(), **end = mark_stack_->End(); it < end; ) {
@@ -1384,10 +1396,9 @@ void MarkSweep::ProcessMarkStackParallel(bool paused) {
                                                      const_cast<const mirror::Object**>(it)));
     it += delta;
   }
+  thread_pool->SetMaxActiveWorkers(thread_count - 1);
   thread_pool->StartWorkers(self);
-  // Don't do work in the main thread since it assumed at least one other thread will require CPU
-  // time during the GC.
-  thread_pool->Wait(self, paused, true);
+  thread_pool->Wait(self, true, true);
   thread_pool->StopWorkers(self);
   mark_stack_->Reset();
   CHECK_EQ(work_chunks_created_, work_chunks_deleted_) << " some of the work chunks were leaked";
@@ -1396,10 +1407,10 @@ void MarkSweep::ProcessMarkStackParallel(bool paused) {
 // Scan anything that's on the mark stack.
 void MarkSweep::ProcessMarkStack(bool paused) {
   timings_.StartSplit("ProcessMarkStack");
-  const bool parallel = kParallelProcessMarkStack && GetHeap()->GetThreadPool() &&
-      mark_stack_->Size() >= kMinimumParallelMarkStackSize;
-  if (parallel) {
-    ProcessMarkStackParallel(paused);
+  size_t thread_count = GetThreadCount(paused);
+  if (kParallelProcessMarkStack && thread_count > 1 &&
+      mark_stack_->Size() >= kMinimumParallelMarkStackSize) {
+    ProcessMarkStackParallel(thread_count);
   } else {
     // TODO: Tune this.
     static const size_t kFifoSize = 4;
@@ -1610,8 +1621,8 @@ void MarkSweep::FinishPhase() {
   total_time_ns_ += GetDurationNs();
   total_paused_time_ns_ += std::accumulate(GetPauseTimes().begin(), GetPauseTimes().end(), 0,
                                            std::plus<uint64_t>());
-  total_freed_objects_ += GetFreedObjects();
-  total_freed_bytes_ += GetFreedBytes();
+  total_freed_objects_ += GetFreedObjects() + GetFreedLargeObjects();
+  total_freed_bytes_ += GetFreedBytes() + GetFreedLargeObjectBytes();
 
   // Ensure that the mark stack is empty.
   CHECK(mark_stack_->IsEmpty());
diff --git a/runtime/gc/collector/mark_sweep.h b/runtime/gc/collector/mark_sweep.h
index 8430839e8a..dbec3e9064 100644
--- a/runtime/gc/collector/mark_sweep.h
+++ b/runtime/gc/collector/mark_sweep.h
@@ -308,6 +308,10 @@ class MarkSweep : public GarbageCollector {
   // Expand mark stack to 2x its current size. Thread safe.
   void ExpandMarkStack();
 
+  // Returns how many threads we should use for the current GC phase based on if we are paused,
+  // whether or not we care about pauses.
+  size_t GetThreadCount(bool paused) const;
+
   // Returns true if an object is inside of the immune region (assumed to be marked).
   bool IsImmune(const mirror::Object* obj) const {
     return obj >= immune_begin_ && obj < immune_end_;
@@ -367,7 +371,7 @@ class MarkSweep : public GarbageCollector {
       EXCLUSIVE_LOCKS_REQUIRED(Locks::heap_bitmap_lock_)
       SHARED_LOCKS_REQUIRED(Locks::mutator_lock_);
 
-  void ProcessMarkStackParallel(bool paused)
+  void ProcessMarkStackParallel(size_t thread_count)
       EXCLUSIVE_LOCKS_REQUIRED(Locks::heap_bitmap_lock_)
       SHARED_LOCKS_REQUIRED(Locks::mutator_lock_);
 
diff --git a/runtime/gc/heap.cc b/runtime/gc/heap.cc
index 800159a73f..e20c2c5f05 100644
--- a/runtime/gc/heap.cc
+++ b/runtime/gc/heap.cc
@@ -58,10 +58,6 @@ namespace art {
 
 namespace gc {
 
-// When to create a log message about a slow GC, 100ms.
-static constexpr uint64_t kSlowGcThreshold = MsToNs(100);
-// When to create a log message about a long pause, 5ms.
-static constexpr uint64_t kLongGcPauseThreshold = MsToNs(5);
 static constexpr bool kGCALotMode = false;
 static constexpr size_t kGcAlotInterval = KB;
 static constexpr bool kDumpGcPerformanceOnShutdown = false;
@@ -72,12 +68,18 @@ static constexpr bool kMeasureAllocationTime = false;
 
 Heap::Heap(size_t initial_size, size_t growth_limit, size_t min_free, size_t max_free,
            double target_utilization, size_t capacity, const std::string& original_image_file_name,
-           bool concurrent_gc, size_t num_gc_threads, bool low_memory_mode)
+           bool concurrent_gc, size_t parallel_gc_threads, size_t conc_gc_threads,
+           bool low_memory_mode, size_t long_pause_log_threshold, size_t long_gc_log_threshold,
+           bool ignore_max_footprint)
     : alloc_space_(NULL),
       card_table_(NULL),
       concurrent_gc_(concurrent_gc),
-      num_gc_threads_(num_gc_threads),
+      parallel_gc_threads_(parallel_gc_threads),
+      conc_gc_threads_(conc_gc_threads),
       low_memory_mode_(low_memory_mode),
+      long_pause_log_threshold_(long_pause_log_threshold),
+      long_gc_log_threshold_(long_gc_log_threshold),
+      ignore_max_footprint_(ignore_max_footprint),
       have_zygote_space_(false),
       soft_ref_queue_lock_(NULL),
       weak_ref_queue_lock_(NULL),
@@ -230,6 +232,11 @@ Heap::Heap(size_t initial_size, size_t growth_limit, size_t min_free, size_t max
   last_gc_time_ns_ = NanoTime();
   last_gc_size_ = GetBytesAllocated();
 
+  if (ignore_max_footprint_) {
+    SetIdealFootprint(std::numeric_limits<size_t>::max());
+    concurrent_start_bytes_ = max_allowed_footprint_;
+  }
+
   // Create our garbage collectors.
   for (size_t i = 0; i < 2; ++i) {
     const bool concurrent = i != 0;
@@ -245,13 +252,14 @@ Heap::Heap(size_t initial_size, size_t growth_limit, size_t min_free, size_t max
 }
 
 void Heap::CreateThreadPool() {
-  if (num_gc_threads_ != 0) {
-    thread_pool_.reset(new ThreadPool(num_gc_threads_));
+  const size_t num_threads = std::max(parallel_gc_threads_, conc_gc_threads_);
+  if (num_threads != 0) {
+    thread_pool_.reset(new ThreadPool(num_threads));
   }
 }
 
 void Heap::DeleteThreadPool() {
-  thread_pool_.reset(NULL);
+  thread_pool_.reset(nullptr);
 }
 
 static bool ReadStaticInt(JNIEnvExt* env, jclass clz, const char* name, int* out_value) {
@@ -1249,11 +1257,11 @@ collector::GcType Heap::CollectGarbageInternal(collector::GcType gc_type, GcCaus
   const size_t duration = collector->GetDurationNs();
   std::vector<uint64_t> pauses = collector->GetPauseTimes();
   // GC for alloc pauses the allocating thread, so consider it as a pause.
-  bool was_slow = duration > kSlowGcThreshold ||
-      (gc_cause == kGcCauseForAlloc && duration > kLongGcPauseThreshold);
+  bool was_slow = duration > long_gc_log_threshold_ ||
+      (gc_cause == kGcCauseForAlloc && duration > long_pause_log_threshold_);
   if (!was_slow) {
     for (uint64_t pause : pauses) {
-      was_slow = was_slow || pause > kLongGcPauseThreshold;
+      was_slow = was_slow || pause > long_pause_log_threshold_;
     }
   }
 
@@ -1702,7 +1710,7 @@ collector::GcType Heap::WaitForConcurrentGcToComplete(Thread* self) {
       wait_time = NanoTime() - wait_start;
       total_wait_time_ += wait_time;
     }
-    if (wait_time > kLongGcPauseThreshold) {
+    if (wait_time > long_pause_log_threshold_) {
       LOG(INFO) << "WaitForConcurrentGcToComplete blocked for " << PrettyDuration(wait_time);
     }
   }
@@ -1776,28 +1784,32 @@ void Heap::GrowForUtilization(collector::GcType gc_type, uint64_t gc_duration) {
       target_size = std::max(bytes_allocated, max_allowed_footprint_);
     }
   }
-  SetIdealFootprint(target_size);
-  // Calculate when to perform the next ConcurrentGC.
-  if (concurrent_gc_) {
-    // Calculate the estimated GC duration.
-    double gc_duration_seconds = NsToMs(gc_duration) / 1000.0;
-    // Estimate how many remaining bytes we will have when we need to start the next GC.
-    size_t remaining_bytes = allocation_rate_ * gc_duration_seconds;
-    remaining_bytes = std::max(remaining_bytes, kMinConcurrentRemainingBytes);
-    if (UNLIKELY(remaining_bytes > max_allowed_footprint_)) {
-      // A never going to happen situation that from the estimated allocation rate we will exceed
-      // the applications entire footprint with the given estimated allocation rate. Schedule
-      // another GC straight away.
-      concurrent_start_bytes_ = bytes_allocated;
-    } else {
-      // Start a concurrent GC when we get close to the estimated remaining bytes. When the
-      // allocation rate is very high, remaining_bytes could tell us that we should start a GC
-      // right away.
-      concurrent_start_bytes_ = std::max(max_allowed_footprint_ - remaining_bytes, bytes_allocated);
+  if (!ignore_max_footprint_) {
+    SetIdealFootprint(target_size);
+
+    if (concurrent_gc_) {
+      // Calculate when to perform the next ConcurrentGC.
+
+      // Calculate the estimated GC duration.
+      double gc_duration_seconds = NsToMs(gc_duration) / 1000.0;
+      // Estimate how many remaining bytes we will have when we need to start the next GC.
+      size_t remaining_bytes = allocation_rate_ * gc_duration_seconds;
+      remaining_bytes = std::max(remaining_bytes, kMinConcurrentRemainingBytes);
+      if (UNLIKELY(remaining_bytes > max_allowed_footprint_)) {
+        // A never going to happen situation that from the estimated allocation rate we will exceed
+        // the applications entire footprint with the given estimated allocation rate. Schedule
+        // another GC straight away.
+        concurrent_start_bytes_ = bytes_allocated;
+      } else {
+        // Start a concurrent GC when we get close to the estimated remaining bytes. When the
+        // allocation rate is very high, remaining_bytes could tell us that we should start a GC
+        // right away.
+        concurrent_start_bytes_ = std::max(max_allowed_footprint_ - remaining_bytes, bytes_allocated);
+      }
+      DCHECK_LE(concurrent_start_bytes_, max_allowed_footprint_);
+      DCHECK_LE(max_allowed_footprint_, growth_limit_);
     }
-    DCHECK_LE(concurrent_start_bytes_, max_allowed_footprint_);
-    DCHECK_LE(max_allowed_footprint_, growth_limit_);
   }
 
   UpdateMaxNativeFootprint();
 }
diff --git a/runtime/gc/heap.h b/runtime/gc/heap.h
index cda252e81c..c93dacb8fb 100644
--- a/runtime/gc/heap.h
+++ b/runtime/gc/heap.h
@@ -107,6 +107,8 @@ class Heap {
   static constexpr size_t kDefaultMaximumSize = 32 * MB;
   static constexpr size_t kDefaultMaxFree = 2 * MB;
   static constexpr size_t kDefaultMinFree = kDefaultMaxFree / 4;
+  static constexpr size_t kDefaultLongPauseLogThreshold = MsToNs(5);
+  static constexpr size_t kDefaultLongGCLogThreshold = MsToNs(100);
 
   // Default target utilization.
   static constexpr double kDefaultTargetUtilization = 0.5;
@@ -120,7 +122,8 @@ class Heap {
   explicit Heap(size_t initial_size, size_t growth_limit, size_t min_free, size_t max_free,
                 double target_utilization, size_t capacity,
                 const std::string& original_image_file_name, bool concurrent_gc,
-                size_t num_gc_threads, bool low_memory_mode);
+                size_t parallel_gc_threads, size_t conc_gc_threads, bool low_memory_mode,
+                size_t long_pause_threshold, size_t long_gc_threshold, bool ignore_max_footprint);
 
   ~Heap();
 
@@ -401,12 +404,23 @@ class Heap {
   // GC performance measuring
   void DumpGcPerformanceInfo(std::ostream& os);
 
+  // Returns true if we currently care about pause times.
+  bool CareAboutPauseTimes() const {
+    return care_about_pause_times_;
+  }
+
   // Thread pool.
   void CreateThreadPool();
   void DeleteThreadPool();
   ThreadPool* GetThreadPool() {
     return thread_pool_.get();
   }
+  size_t GetParallelGCThreadCount() const {
+    return parallel_gc_threads_;
+  }
+  size_t GetConcGCThreadCount() const {
+    return conc_gc_threads_;
+  }
 
  private:
   // Allocates uninitialized storage. Passing in a null space tries to place the object in the
@@ -514,12 +528,26 @@ class Heap {
   // false for stop-the-world mark sweep.
   const bool concurrent_gc_;
 
-  // How many GC threads we may use for garbage collection.
-  const size_t num_gc_threads_;
+  // How many GC threads we may use for paused parts of garbage collection.
+  const size_t parallel_gc_threads_;
+
+  // How many GC threads we may use for unpaused parts of garbage collection.
+  const size_t conc_gc_threads_;
 
   // Boolean for if we are in low memory mode.
   const bool low_memory_mode_;
 
+  // If we get a pause longer than long pause log threshold, then we print out the GC after it
+  // finishes.
+  const size_t long_pause_log_threshold_;
+
+  // If we get a GC longer than long GC log threshold, then we print out the GC after it finishes.
+  const size_t long_gc_log_threshold_;
+
+  // If we ignore the max footprint it lets the heap grow until it hits the heap capacity, this is
+  // useful for benchmarking since it reduces time spent in GC to a low %.
+  const bool ignore_max_footprint_;
+
   // If we have a zygote space.
   bool have_zygote_space_;
 
@@ -544,14 +572,18 @@ class Heap {
   // Maximum size that the heap can reach.
   const size_t capacity_;
+
   // The size the heap is limited to. This is initially smaller than capacity, but for largeHeap
   // programs it is "cleared" making it the same as capacity.
   size_t growth_limit_;
+
   // When the number of bytes allocated exceeds the footprint TryAllocate returns NULL indicating
   // a GC should be triggered.
   size_t max_allowed_footprint_;
+
   // The watermark at which a concurrent GC is requested by registerNativeAllocation.
   size_t native_footprint_gc_watermark_;
+
   // The watermark at which a GC is performed inside of registerNativeAllocation.
   size_t native_footprint_limit_;
diff --git a/runtime/runtime.cc b/runtime/runtime.cc
index 65bd495995..51a67c1bf4 100644
--- a/runtime/runtime.cc
+++ b/runtime/runtime.cc
@@ -339,7 +339,9 @@ Runtime::ParsedOptions* Runtime::ParsedOptions::Create(const Options& options, b
   parsed->heap_target_utilization_ = gc::Heap::kDefaultTargetUtilization;
   parsed->heap_growth_limit_ = 0;  // 0 means no growth limit.
   // Default to number of processors minus one since the main GC thread also does work.
-  parsed->heap_gc_threads_ = sysconf(_SC_NPROCESSORS_CONF) - 1;
+  parsed->parallel_gc_threads_ = sysconf(_SC_NPROCESSORS_CONF) - 1;
+  // Only the main GC thread, no workers.
+  parsed->conc_gc_threads_ = 0;
   parsed->stack_size_ = 0;  // 0 means default.
   parsed->low_memory_mode_ = false;
 
@@ -349,6 +351,10 @@ Runtime::ParsedOptions* Runtime::ParsedOptions::Create(const Options& options, b
   parsed->is_concurrent_gc_enabled_ = true;
   parsed->is_explicit_gc_disabled_ = false;
 
+  parsed->long_pause_log_threshold_ = gc::Heap::kDefaultLongPauseLogThreshold;
+  parsed->long_gc_log_threshold_ = gc::Heap::kDefaultLongGCLogThreshold;
+  parsed->ignore_max_footprint_ = false;
+
   parsed->lock_profiling_threshold_ = 0;
   parsed->hook_is_sensitive_thread_ = NULL;
 
@@ -480,9 +486,12 @@ Runtime::ParsedOptions* Runtime::ParsedOptions::Create(const Options& options, b
         return NULL;
       }
       parsed->heap_target_utilization_ = value;
-    } else if (StartsWith(option, "-XX:HeapGCThreads=")) {
-      parsed->heap_gc_threads_ =
-          ParseMemoryOption(option.substr(strlen("-XX:HeapGCThreads=")).c_str(), 1024);
+    } else if (StartsWith(option, "-XX:ParallelGCThreads=")) {
+      parsed->parallel_gc_threads_ =
+          ParseMemoryOption(option.substr(strlen("-XX:ParallelGCThreads=")).c_str(), 1024);
+    } else if (StartsWith(option, "-XX:ConcGCThreads=")) {
+      parsed->conc_gc_threads_ =
+          ParseMemoryOption(option.substr(strlen("-XX:ConcGCThreads=")).c_str(), 1024);
     } else if (StartsWith(option, "-Xss")) {
       size_t size = ParseMemoryOption(option.substr(strlen("-Xss")).c_str(), 1);
       if (size == 0) {
@@ -494,6 +503,14 @@ Runtime::ParsedOptions* Runtime::ParsedOptions::Create(const Options& options, b
         return NULL;
       }
       parsed->stack_size_ = size;
+    } else if (option == "-XX:LongPauseLogThreshold") {
+      parsed->long_pause_log_threshold_ =
+          ParseMemoryOption(option.substr(strlen("-XX:LongPauseLogThreshold=")).c_str(), 1024);
+    } else if (option == "-XX:LongGCLogThreshold") {
+      parsed->long_gc_log_threshold_ =
+          ParseMemoryOption(option.substr(strlen("-XX:LongGCLogThreshold")).c_str(), 1024);
+    } else if (option == "-XX:IgnoreMaxFootprint") {
+      parsed->ignore_max_footprint_ = true;
     } else if (option == "-XX:LowMemoryMode") {
       parsed->low_memory_mode_ = true;
     } else if (StartsWith(option, "-D")) {
@@ -865,8 +882,12 @@ bool Runtime::Init(const Options& raw_options, bool ignore_unrecognized) {
                        options->heap_maximum_size_,
                        options->image_,
                        options->is_concurrent_gc_enabled_,
-                       options->heap_gc_threads_,
-                       options->low_memory_mode_);
+                       options->parallel_gc_threads_,
+                       options->conc_gc_threads_,
+                       options->low_memory_mode_,
+                       options->long_pause_log_threshold_,
+                       options->long_gc_log_threshold_,
+                       options->ignore_max_footprint_);
 
   BlockSignals();
   InitPlatformSignalHandlers();
diff --git a/runtime/runtime.h b/runtime/runtime.h
index be29a865a4..50108ac4e2 100644
--- a/runtime/runtime.h
+++ b/runtime/runtime.h
@@ -100,13 +100,17 @@ class Runtime {
     bool interpreter_only_;
     bool is_concurrent_gc_enabled_;
     bool is_explicit_gc_disabled_;
+    size_t long_pause_log_threshold_;
+    size_t long_gc_log_threshold_;
+    bool ignore_max_footprint_;
     size_t heap_initial_size_;
     size_t heap_maximum_size_;
     size_t heap_growth_limit_;
-    size_t heap_gc_threads_;
     size_t heap_min_free_;
     size_t heap_max_free_;
     double heap_target_utilization_;
+    size_t parallel_gc_threads_;
+    size_t conc_gc_threads_;
     size_t stack_size_;
     bool low_memory_mode_;
     size_t lock_profiling_threshold_;
diff --git a/runtime/thread_pool.cc b/runtime/thread_pool.cc
index 39d30bb24d..674ab9d9ae 100644
--- a/runtime/thread_pool.cc
+++ b/runtime/thread_pool.cc
@@ -81,7 +81,8 @@ ThreadPool::ThreadPool(size_t num_threads)
     start_time_(0),
     total_wait_time_(0),
     // Add one since the caller of constructor waits on the barrier too.
-    creation_barier_(num_threads + 1) {
+    creation_barier_(num_threads + 1),
+    max_active_workers_(num_threads) {
   Thread* self = Thread::Current();
   while (GetThreadCount() < num_threads) {
     const std::string name = StringPrintf("Thread pool worker %zu", GetThreadCount());
@@ -91,6 +92,12 @@ ThreadPool::ThreadPool(size_t num_threads)
   creation_barier_.Wait(self);
 }
 
+void ThreadPool::SetMaxActiveWorkers(size_t threads) {
+  MutexLock mu(Thread::Current(), task_queue_lock_);
+  CHECK_LE(threads, GetThreadCount());
+  max_active_workers_ = threads;
+}
+
 ThreadPool::~ThreadPool() {
   {
     Thread* self = Thread::Current();
@@ -121,12 +128,18 @@ void ThreadPool::StopWorkers(Thread* self) {
 Task* ThreadPool::GetTask(Thread* self) {
   MutexLock mu(self, task_queue_lock_);
   while (!IsShuttingDown()) {
-    Task* task = TryGetTaskLocked(self);
-    if (task != NULL) {
-      return task;
+    const size_t thread_count = GetThreadCount();
+    // Ensure that we don't use more threads than the maximum active workers.
+    const size_t active_threads = thread_count - waiting_count_;
+    // <= since self is considered an active worker.
+    if (active_threads <= max_active_workers_) {
+      Task* task = TryGetTaskLocked(self);
+      if (task != NULL) {
+        return task;
+      }
     }
-    waiting_count_++;
+    ++waiting_count_;
 
     if (waiting_count_ == GetThreadCount() && tasks_.empty()) {
       // We may be done, lets broadcast to the completion condition.
       completion_condition_.Broadcast(self);
diff --git a/runtime/thread_pool.h b/runtime/thread_pool.h
index 9c6d47b0a9..b9a97a1427 100644
--- a/runtime/thread_pool.h
+++ b/runtime/thread_pool.h
@@ -90,6 +90,10 @@ class ThreadPool {
     return total_wait_time_;
   }
 
+  // Provides a way to bound the maximum number of worker threads, threads must be less the the
+  // thread count of the thread pool.
+  void SetMaxActiveWorkers(size_t threads);
+
 protected:
   // Get a task to run, blocks if there are no tasks left
   virtual Task* GetTask(Thread* self);
@@ -117,6 +121,7 @@ class ThreadPool {
   uint64_t start_time_ GUARDED_BY(task_queue_lock_);
   uint64_t total_wait_time_;
   Barrier creation_barier_;
+  size_t max_active_workers_ GUARDED_BY(task_queue_lock_);
 
  private:
  friend class ThreadPoolWorker;
```
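
For context on the `SetMaxActiveWorkers()` change, the sketch below is a small self-contained model (not ART code) of the admission check added to `ThreadPool::GetTask()`: a worker may only take a task while the number of non-waiting threads is still within the configured bound, and the `<=` comparison counts the asking thread itself as active, matching the comment in the patch.

```cpp
#include <cstddef>
#include <iostream>

// Standalone model of the GetTask() gating added in thread_pool.cc above.
// Returns true if a worker asking for a task is allowed to run one.
bool MayTakeTask(size_t thread_count, size_t waiting_count, size_t max_active_workers) {
  // Threads that are not waiting are considered active, including the asker.
  const size_t active_threads = thread_count - waiting_count;
  return active_threads <= max_active_workers;
}

int main() {
  // A pool of 3 workers capped at 2 active: with nobody waiting yet, 3 > 2,
  // so the extra worker is turned away and parks itself as waiting.
  std::cout << MayTakeTask(3, 0, 2) << "\n";  // prints 0
  // Once one worker is waiting, 2 <= 2, so tasks are handed out again.
  std::cout << MayTakeTask(3, 1, 2) << "\n";  // prints 1
  return 0;
}
```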