-rw-r--r--  runtime/thread.cc      |   1
-rw-r--r--  runtime/thread.h       |   8
-rw-r--r--  runtime/thread_pool.h  |   2
-rw-r--r--  runtime/trace.cc       | 146
-rw-r--r--  runtime/trace.h        |  44
5 files changed, 167 insertions, 34 deletions
diff --git a/runtime/thread.cc b/runtime/thread.cc
index ee79281731..18bbdcf661 100644
--- a/runtime/thread.cc
+++ b/runtime/thread.cc
@@ -2607,7 +2607,6 @@ void Thread::Destroy(bool should_run_callbacks) {
 
   if (UNLIKELY(self->GetMethodTraceBuffer() != nullptr)) {
     Trace::FlushThreadBuffer(self);
-    self->ResetMethodTraceBuffer();
   }
 
   // this.nativePeer = 0;
diff --git a/runtime/thread.h b/runtime/thread.h
index 6de1e26615..6f46faa83a 100644
--- a/runtime/thread.h
+++ b/runtime/thread.h
@@ -1350,14 +1350,6 @@ class EXPORT Thread {
     return tlsPtr_.method_trace_buffer = buffer;
   }
 
-  void ResetMethodTraceBuffer() {
-    if (tlsPtr_.method_trace_buffer != nullptr) {
-      delete[] tlsPtr_.method_trace_buffer;
-    }
-    tlsPtr_.method_trace_buffer = nullptr;
-    tlsPtr_.method_trace_buffer_index = 0;
-  }
-
   uint64_t GetTraceClockBase() const {
     return tls64_.trace_clock_base;
   }
diff --git a/runtime/thread_pool.h b/runtime/thread_pool.h
index 3ea17a1a5b..9c0cbb03f1 100644
--- a/runtime/thread_pool.h
+++ b/runtime/thread_pool.h
@@ -238,13 +238,13 @@ class ThreadPool : public AbstractThreadPool {
     return started_ && !tasks_.empty();
   }
 
- private:
   ThreadPool(const char* name,
              size_t num_threads,
              bool create_peers,
              size_t worker_stack_size)
       : AbstractThreadPool(name, num_threads, create_peers, worker_stack_size) {}
 
+ private:
   std::deque<Task*> tasks_ GUARDED_BY(task_queue_lock_);
 
   DISALLOW_COPY_AND_ASSIGN(ThreadPool);
diff --git a/runtime/trace.cc b/runtime/trace.cc
index 4327a820af..8f9cee8a28 100644
--- a/runtime/trace.cc
+++ b/runtime/trace.cc
@@ -68,6 +68,7 @@
 static const uint16_t kTraceVersionSingleClock = 2;
 static const uint16_t kTraceVersionDualClock = 3;
 static const uint16_t kTraceRecordSizeSingleClock = 10;  // using v2
 static const uint16_t kTraceRecordSizeDualClock = 14;  // using v3 with two timestamps
+static const size_t kNumTracePoolBuffers = 32;
 
 TraceClockSource Trace::default_clock_source_ = kDefaultTraceClockSource;
@@ -241,11 +242,14 @@ uint16_t TraceWriter::GetThreadEncoding(pid_t thread_id) {
 
 class TraceWriterTask final : public SelfDeletingTask {
  public:
-  TraceWriterTask(TraceWriter* trace_writer, uintptr_t* buffer, size_t cur_offset, size_t thread_id)
+  TraceWriterTask(
+      TraceWriter* trace_writer, int index, uintptr_t* buffer, size_t cur_offset, size_t thread_id)
       : trace_writer_(trace_writer),
+        index_(index),
         buffer_(buffer),
         cur_offset_(cur_offset),
-        thread_id_(thread_id) {}
+        thread_id_(thread_id),
+        reserve_buf_for_tid_(0) {}
 
   void Run(Thread* self ATTRIBUTE_UNUSED) override {
     std::unordered_map<ArtMethod*, std::string> method_infos;
@@ -254,14 +258,36 @@ class TraceWriterTask final : public SelfDeletingTask {
       trace_writer_->PreProcessTraceForMethodInfos(buffer_, cur_offset_, method_infos);
     }
     trace_writer_->FlushBuffer(buffer_, cur_offset_, thread_id_, method_infos);
-    delete[] buffer_;
+    if (index_ == -1) {
+      // This was a temporary buffer we allocated since there are no more free buffers and we
+      // couldn't find one by flushing the pending tasks either. This should only happen when we
+      // have fewer buffers than the number of threads.
+      if (reserve_buf_for_tid_ == 0) {
+        // Just free the buffer here if it wasn't reserved for any thread.
+        free(buffer_);
+      }
+    } else {
+      trace_writer_->FetchTraceBufferForThread(index_, reserve_buf_for_tid_);
+    }
+  }
+
+  // Reserves the buffer for a particular thread. The thread is free to use this buffer once the
+  // task has finished running. This is used when there are no free buffers for the thread to use.
+  uintptr_t* ReserveBufferForTid(size_t tid) {
+    reserve_buf_for_tid_ = tid;
+    return buffer_;
   }
 
  private:
   TraceWriter* trace_writer_;
+  int index_;
   uintptr_t* buffer_;
   size_t cur_offset_;
   size_t thread_id_;
+  // Sometimes we want to acquire a buffer for a particular thread. This holds the tid of the
+  // thread that we want to acquire the buffer for. If this value is 0, the buffer can be used
+  // by other threads.
+  size_t reserve_buf_for_tid_;
 };
 
 std::vector<ArtMethod*>* Trace::AllocStackTrace() {
@@ -679,8 +705,8 @@ void Trace::StopTracing(bool flush_entries) {
         // We may have pending requests to flush the data. So just enqueue a
         // request to flush the current buffer so all the requests are
         // processed in order.
-        the_trace->trace_writer_->FlushBuffer(thread, /* is_sync= */ false);
-        thread->ResetMethodTraceBuffer();
+        the_trace->trace_writer_->FlushBuffer(
+            thread, /* is_sync= */ false, /* free_buffer= */ true);
       }
     }
   }
@@ -705,7 +731,7 @@ void Trace::StopTracing(bool flush_entries) {
 
 void Trace::FlushThreadBuffer(Thread* self) {
   MutexLock mu(self, *Locks::trace_lock_);
-  the_trace_->trace_writer_->FlushBuffer(self, /* is_sync= */ false);
+  the_trace_->trace_writer_->FlushBuffer(self, /* is_sync= */ false, /* free_buffer= */ true);
 }
 
 void Trace::Abort() {
@@ -771,6 +797,7 @@ TraceWriter::TraceWriter(File* trace_file,
                          TraceOutputMode output_mode,
                          TraceClockSource clock_source,
                          size_t buffer_size,
+                         int num_trace_buffers,
                          uint32_t clock_overhead_ns)
     : trace_file_(trace_file),
       trace_output_mode_(output_mode),
@@ -780,6 +807,7 @@ TraceWriter::TraceWriter(File* trace_file,
       start_time_(GetMicroTime(GetTimestamp())),
       overflow_(false),
       clock_overhead_ns_(clock_overhead_ns),
+      owner_tids_(num_trace_buffers),
       tracing_lock_("tracing lock", LockLevel::kTracingStreamingLock) {
   uint16_t trace_version = GetTraceVersion(clock_source_);
   if (output_mode == TraceOutputMode::kStreaming) {
@@ -815,9 +843,12 @@ TraceWriter::TraceWriter(File* trace_file,
   // to stop and start this thread pool. Method tracing on zygote isn't a frequent use case and
   // it is okay to flush on the main thread in such cases.
   if (!Runtime::Current()->IsZygote()) {
-    thread_pool_.reset(ThreadPool::Create("Trace writer pool", 1));
+    thread_pool_.reset(TraceWriterThreadPool::Create("Trace writer pool"));
     thread_pool_->StartWorkers(Thread::Current());
   }
+
+  // Initialize the pool of per-thread buffers.
+  InitializeTraceBuffers();
 }
 
 Trace::Trace(File* trace_file,
@@ -838,8 +869,12 @@ Trace::Trace(File* trace_file,
   size_t buf_size = (output_mode == TraceOutputMode::kStreaming) ?
                         kPerThreadBufSize * kScalingFactorEncodedEntries :
                         buffer_size;
-  trace_writer_.reset(new TraceWriter(
-      trace_file, output_mode, clock_source_, buf_size, GetClockOverheadNanoSeconds()));
+  trace_writer_.reset(new TraceWriter(trace_file,
+                                      output_mode,
+                                      clock_source_,
+                                      buf_size,
+                                      kNumTracePoolBuffers,
+                                      GetClockOverheadNanoSeconds()));
 }
 
 void TraceWriter::FinishTracing(int flags, bool flush_entries) {
@@ -1054,6 +1089,24 @@ void Trace::ReadClocks(Thread* thread, uint32_t* thread_clock_diff, uint64_t* ti
   }
 }
 
+uintptr_t* TraceWriterThreadPool::FinishTaskAndClaimBuffer(size_t tid) {
+  Thread* self = Thread::Current();
+  TraceWriterTask* task = static_cast<TraceWriterTask*>(TryGetTask(self));
+  if (task == nullptr) {
+    // TODO(mythria): We need to ensure we have at least as many buffers in the pool as the number
+    // of active threads for efficiency. It's a bit unlikely to hit this case, and it is not
+    // trivial to handle, so we haven't fixed it yet.
+    LOG(WARNING)
+        << "Fewer buffers in the pool than the number of threads. Might cause some slowdown";
+    return nullptr;
+  }
+
+  uintptr_t* buffer = task->ReserveBufferForTid(tid);
+  task->Run(self);
+  task->Finalize();
+  return buffer;
+}
+
 std::string TraceWriter::GetMethodLine(const std::string& method_line, uint32_t method_index) {
   return StringPrintf("%#x\t%s", (method_index << TraceActionBits), method_line.c_str());
 }
@@ -1167,7 +1220,7 @@ void TraceWriter::FlushAllThreadBuffers() {
   MutexLock mu(Thread::Current(), *Locks::thread_list_lock_);
   for (Thread* thread : Runtime::Current()->GetThreadList()->GetList()) {
     if (thread->GetMethodTraceBuffer() != nullptr) {
-      FlushBuffer(thread, /* is_sync= */ true);
+      FlushBuffer(thread, /* is_sync= */ true, /* free_buffer= */ false);
       // We cannot flush any more data, so just return.
       if (overflow_) {
         return;
@@ -1181,7 +1234,7 @@ uintptr_t* TraceWriter::PrepareBufferForNewEntries(Thread* thread) {
   if (trace_output_mode_ == TraceOutputMode::kStreaming) {
     // In streaming mode, just flush the per-thread buffer and reuse the
     // existing buffer for new entries.
-    FlushBuffer(thread, /* is_sync= */ false);
+    FlushBuffer(thread, /* is_sync= */ false, /* free_buffer= */ false);
     DCHECK_EQ(overflow_, false);
   } else {
     // For non-streaming mode, flush all the threads to check if we have space in the common
@@ -1194,7 +1247,52 @@ uintptr_t* TraceWriter::PrepareBufferForNewEntries(Thread* thread) {
   return thread->GetMethodTraceBuffer();
 }
 
-void TraceWriter::FlushBuffer(Thread* thread, bool is_sync) {
+void TraceWriter::InitializeTraceBuffers() {
+  for (size_t i = 0; i < owner_tids_.size(); i++) {
+    owner_tids_[i].store(0);
+  }
+
+  trace_buffer_.reset(reinterpret_cast<uintptr_t*>(
+      malloc(sizeof(uintptr_t) * kPerThreadBufSize * owner_tids_.size())));
+  CHECK(trace_buffer_.get() != nullptr);
+}
+
+uintptr_t* TraceWriter::AcquireTraceBuffer(size_t tid) {
+  for (size_t index = 0; index < owner_tids_.size(); index++) {
+    size_t owner = 0;
+    if (owner_tids_[index].compare_exchange_strong(owner, tid)) {
+      return trace_buffer_.get() + index * kPerThreadBufSize;
+    }
+  }
+
+  // No free buffers, so flush the buffer at the front of the task queue synchronously and then
+  // use that buffer.
+  uintptr_t* buffer = thread_pool_->FinishTaskAndClaimBuffer(tid);
+  if (buffer == nullptr) {
+    // We couldn't find a free buffer even after flushing all the tasks, so allocate a new buffer
+    // here. This should only happen if we have more threads than the number of pool buffers.
+    // TODO(mythria): Add a check for the above case here.
+    buffer = reinterpret_cast<uintptr_t*>(malloc(sizeof(uintptr_t) * kPerThreadBufSize));
+    CHECK(buffer != nullptr);
+  }
+  return buffer;
+}
+
+void TraceWriter::FetchTraceBufferForThread(int index, size_t tid) {
+  // Only the trace_writer_ thread can release the buffer.
+  owner_tids_[index].store(tid);
+}
+
+int TraceWriter::GetMethodTraceIndex(uintptr_t* current_buffer) {
+  if (current_buffer < trace_buffer_.get() ||
+      current_buffer > trace_buffer_.get() + (owner_tids_.size() - 1) * kPerThreadBufSize) {
+    // This was the temporary buffer we allocated.
+    return -1;
+  }
+  return (current_buffer - trace_buffer_.get()) / kPerThreadBufSize;
+}
+
+void TraceWriter::FlushBuffer(Thread* thread, bool is_sync, bool release) {
   uintptr_t* method_trace_entries = thread->GetMethodTraceBuffer();
   size_t* current_offset = thread->GetMethodTraceIndexPtr();
   size_t tid = thread->GetTid();
@@ -1209,17 +1307,22 @@ void TraceWriter::FlushBuffer(Thread* thread, bool is_sync) {
     // when the tracing has finished or in non-streaming mode.
     // Just reset the buffer pointer to the initial value, so we can reuse the same buffer.
     *current_offset = kPerThreadBufSize;
+    if (release) {
+      thread->SetMethodTraceBuffer(nullptr);
+    }
   } else {
-    // The TraceWriterTask takes the ownership of the buffer and delets the buffer once the
+    int old_index = GetMethodTraceIndex(method_trace_entries);
+    // The TraceWriterTask takes ownership of the buffer and releases the buffer once the
     // entries are flushed.
-    thread_pool_->AddTask(Thread::Current(),
-                          new TraceWriterTask(this, method_trace_entries, *current_offset, tid));
-
-    // Create a new buffer and update the per-thread buffer so we don't have to wait for the
-    // flushing to finish.
-    uintptr_t* method_trace_buffer = new uintptr_t[std::max(kMinBufSize, kPerThreadBufSize)]();
-    thread->SetMethodTraceBuffer(method_trace_buffer);
+    thread_pool_->AddTask(
+        Thread::Current(),
+        new TraceWriterTask(this, old_index, method_trace_entries, *current_offset, tid));
     *current_offset = kPerThreadBufSize;
+    if (release) {
+      thread->SetMethodTraceBuffer(nullptr);
+    } else {
+      thread->SetMethodTraceBuffer(AcquireTraceBuffer(tid));
+    }
   }
 
   return;
@@ -1330,7 +1433,8 @@ void Trace::LogMethodTraceEvent(Thread* thread,
   size_t* current_index = thread->GetMethodTraceIndexPtr();
   // Initialize the buffer lazily. It's just simpler to keep the creation at one place.
   if (method_trace_buffer == nullptr) {
-    method_trace_buffer = new uintptr_t[std::max(kMinBufSize, kPerThreadBufSize)]();
+    method_trace_buffer = trace_writer_->AcquireTraceBuffer(thread->GetTid());
+    DCHECK(method_trace_buffer != nullptr);
     thread->SetMethodTraceBuffer(method_trace_buffer);
     *current_index = kPerThreadBufSize;
     trace_writer_->RecordThreadInfo(thread);
diff --git a/runtime/trace.h b/runtime/trace.h
index ec9bedce0a..eb809f5246 100644
--- a/runtime/trace.h
+++ b/runtime/trace.h
@@ -121,12 +121,31 @@ static constexpr int32_t kHighTimestampOffsetInBytes =
 
 static constexpr uintptr_t kMaskTraceAction = ~0b11;
 
+class TraceWriterThreadPool : public ThreadPool {
+ public:
+  static TraceWriterThreadPool* Create(const char* name) {
+    TraceWriterThreadPool* pool = new TraceWriterThreadPool(name);
+    pool->CreateThreads();
+    return pool;
+  }
+
+  uintptr_t* FinishTaskAndClaimBuffer(size_t tid);
+
+ private:
+  explicit TraceWriterThreadPool(const char* name)
+      : ThreadPool(name,
+                   /* num_threads= */ 1,
+                   /* create_peers= */ false,
+                   /* worker_stack_size= */ ThreadPoolWorker::kDefaultStackSize) {}
+};
+
 class TraceWriter {
  public:
   TraceWriter(File* trace_file,
               TraceOutputMode output_mode,
               TraceClockSource clock_source,
               size_t buffer_size,
+              int num_trace_buffers,
               uint32_t clock_overhead_ns);
 
   // This encodes all the events in the per-thread trace buffer and writes it to the trace file /
@@ -134,8 +153,8 @@ class TraceWriter {
   // required to serialize these since each method is encoded with a unique id which is assigned
   // when the method is seen for the first time in the recorded events. So we need to serialize
   // these flushes across threads.
-  void FlushBuffer(Thread* thread, bool is_sync) REQUIRES_SHARED(Locks::mutator_lock_)
-      REQUIRES(!tracing_lock_);
+  void FlushBuffer(Thread* thread, bool is_sync, bool free_buffer)
+      REQUIRES_SHARED(Locks::mutator_lock_) REQUIRES(!tracing_lock_);
 
   // This is called when the per-thread buffer is full and a new entry needs to be recorded. This
   // returns a pointer to the new buffer where the entries should be recorded.
@@ -174,6 +193,22 @@ class TraceWriter {
   TraceOutputMode GetOutputMode() { return trace_output_mode_; }
   size_t GetBufferSize() { return buffer_size_; }
 
+  // Performs the initialization for the buffer pool. It marks all buffers as free by storing 0
+  // as the owner tid. This also allocates the buffer pool.
+  void InitializeTraceBuffers();
+
+  // Releases the trace buffer and transfers ownership to the specified tid. If the tid is 0,
+  // it means the buffer is free and other threads can claim it.
+  void FetchTraceBufferForThread(int index, size_t tid);
+
+  // Tries to find a free buffer (one whose owner tid is 0) in the pool. If there are no free
+  // buffers, it fetches a task, flushes the contents of that buffer and returns that buffer.
+  uintptr_t* AcquireTraceBuffer(size_t tid);
+
+  // Returns the index corresponding to the start of current_buffer. We allocate one large
+  // buffer and assign parts of it to each thread.
+  int GetMethodTraceIndex(uintptr_t* current_buffer);
+
  private:
   // Get a 32-bit id for the method and specify if the method hasn't been seen before. If this is
   // the first time we see this method record information (like method name, declaring class etc.,)
@@ -269,12 +304,15 @@ class TraceWriter {
   // Clock overhead.
   const uint32_t clock_overhead_ns_;
 
+  std::vector<std::atomic<size_t>> owner_tids_;
+  std::unique_ptr<uintptr_t> trace_buffer_;
+
   // Lock to protect common data structures accessed from multiple threads like
   // art_method_id_map_, thread_id_map_.
   Mutex tracing_lock_;
 
   // Thread pool to flush the trace entries to file.
-  std::unique_ptr<ThreadPool> thread_pool_;
+  std::unique_ptr<TraceWriterThreadPool> thread_pool_;
 };
 
 // Class for recording event traces. Trace data is either collected
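
The heart of this change is the claim protocol in `TraceWriter::AcquireTraceBuffer`: one large allocation is carved into `kNumTracePoolBuffers` fixed-size slots, and a slot is claimed by CAS-ing its owner tid from 0 (free) to the caller's tid. The standalone sketch below mirrors that loop under simplified assumptions; `BufferPool`, `kNumBuffers`, and `kEntriesPerBuffer` are illustrative names, not ART's.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <vector>

// Illustrative constants; ART sizes slots with kPerThreadBufSize instead.
constexpr size_t kNumBuffers = 32;
constexpr size_t kEntriesPerBuffer = 1024;

class BufferPool {
 public:
  BufferPool() : owners_(kNumBuffers), storage_(kNumBuffers * kEntriesPerBuffer) {
    // Tid 0 marks a slot as free, mirroring InitializeTraceBuffers.
    for (auto& owner : owners_) owner.store(0, std::memory_order_relaxed);
  }

  // Claim a free slot for `tid`. Mirrors the compare_exchange_strong loop in
  // TraceWriter::AcquireTraceBuffer: first CAS from 0 to tid wins the slot.
  uintptr_t* Acquire(size_t tid) {
    for (size_t i = 0; i < owners_.size(); ++i) {
      size_t expected = 0;
      if (owners_[i].compare_exchange_strong(expected, tid)) {
        return storage_.data() + i * kEntriesPerBuffer;
      }
    }
    return nullptr;  // Pool exhausted; the real code falls back further.
  }

  // Return a slot to the pool by storing the "free" tid again.
  void Release(uintptr_t* buf) {
    size_t index = (buf - storage_.data()) / kEntriesPerBuffer;
    owners_[index].store(0);
  }

 private:
  std::vector<std::atomic<size_t>> owners_;  // One owner tid per slot.
  std::vector<uintptr_t> storage_;           // One contiguous backing allocation.
};
```

In the actual change only the single trace-writer thread moves a slot back to the free state (via `FetchTraceBufferForThread`), which keeps the ownership handoff simple: writers CAS to claim, one thread stores to release.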
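When every slot is owned, `AcquireTraceBuffer` falls back to `TraceWriterThreadPool::FinishTaskAndClaimBuffer`: it dequeues the oldest pending flush task, runs it synchronously on the calling thread, and hands that task's buffer to the requester instead of returning it to the pool. Below is a minimal sketch of that handoff; `FlushTask` and `TaskQueue` are invented stand-ins for ART's `SelfDeletingTask` machinery, and the real code also calls `Finalize()` to delete the task.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// A queued flush request: the buffer to drain and the work that drains it.
struct FlushTask {
  uintptr_t* buffer = nullptr;
  std::function<void(uintptr_t*)> flush;  // Writes the entries out.
  size_t reserve_for_tid = 0;             // 0 => buffer goes back to the pool.
};

class TaskQueue {
 public:
  void Push(FlushTask task) {
    std::lock_guard<std::mutex> lock(mu_);
    tasks_.push_back(std::move(task));
  }

  // Analogue of TryGetTask + ReserveBufferForTid + Run: pop the oldest task,
  // flush it on the caller's thread, and claim its buffer for `tid`.
  // Returns nullptr when nothing is pending (the malloc fallback case).
  uintptr_t* FinishOneAndClaim(size_t tid) {
    FlushTask task;
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (tasks_.empty()) return nullptr;
      task = std::move(tasks_.front());
      tasks_.pop_front();
    }
    task.reserve_for_tid = tid;  // Buffer is handed over, not freed.
    task.flush(task.buffer);     // Drain entries synchronously.
    return task.buffer;          // Ownership passes to the requesting thread.
  }

 private:
  std::mutex mu_;
  std::deque<FlushTask> tasks_;
};
```

This explains the `reserve_buf_for_tid_` field on `TraceWriterTask`: when it is non-zero, `Run` must not return the buffer to the free state, because a specific thread is already waiting to reuse it.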
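Finally, a task must know whether its buffer came from the pool (release the slot) or from the malloc fallback (free it). `GetMethodTraceIndex` answers this with pointer arithmetic against the pooled allocation, returning -1 for out-of-pool buffers. A hedged sketch of that bounds check, with illustrative parameter names in place of the member state the real method reads:

```cpp
#include <cstddef>
#include <cstdint>

// Maps a buffer pointer back to its pool slot, or -1 if the buffer lies
// outside the pooled allocation (i.e., it was a temporary malloc'ed buffer).
// Only slot start addresses are ever passed in, as in the ART code.
int SlotIndex(uintptr_t* buf, uintptr_t* pool_base, size_t num_slots, size_t entries_per_slot) {
  uintptr_t* pool_end = pool_base + num_slots * entries_per_slot;
  if (buf < pool_base || buf >= pool_end) {
    return -1;  // Temporary buffer; Run() frees it after flushing.
  }
  return static_cast<int>((buf - pool_base) / entries_per_slot);
}
```

One caveat worth noting about the patch itself: `trace_buffer_` is declared as `std::unique_ptr<uintptr_t>` over `malloc`'ed memory, so its default deleter does not match the allocator; the `reset`/`free` pairing here relies on the writer managing lifetimes carefully rather than on the smart pointer's deleter.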