Implement mutex requeueing for cv broadcasts.
Make the mutex guarding a condition variable part of its state. On a
broadcast, requeue waiters onto the mutex so they are woken one at a
time as the mutex is unlocked (thereby avoiding thundering herds).
Explicit futex use is still guarded behind ART_USE_FUTEXES, which
remains disabled as I'm unhappy with some of the warts of mutex usage.
Uploading so that the API changes can stabilize.
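The shape of the new API: a ConditionVariable is permanently bound to
its guard Mutex at construction, Wait and friends take the current
Thread, and TimedWait takes a relative (ms, ns) timeout instead of an
absolute timespec. An illustrative sketch (the names below are invented
for the example, not part of this change):

    Mutex lock_("example lock");
    ConditionVariable cond_("example condition variable", lock_);
    bool ready_ = false;  // Guarded by lock_.

    void WaitForReady(Thread* self) {
      MutexLock mu(self, lock_);
      while (!ready_) {
        cond_.Wait(self);  // Was: cond_.Wait(self, lock_).
      }
    }

    void SetReady(Thread* self) {
      MutexLock mu(self, lock_);
      ready_ = true;
      cond_.Broadcast(self);  // Was: cond_.Broadcast().
    }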
Change-Id: Iedb601856ccd8bbc3a64da4ba0cee82246e7bcbf
diff --git a/src/debugger.cc b/src/debugger.cc
index 1e1e7b6..a34f690 100644
--- a/src/debugger.cc
+++ b/src/debugger.cc
@@ -2425,7 +2425,7 @@
// Wait for the request to finish executing.
while (req->invoke_needed_) {
- req->cond_.Wait(self, req->lock_);
+ req->cond_.Wait(self);
}
}
VLOG(jdwp) << " Control has returned from event thread";
diff --git a/src/debugger.h b/src/debugger.h
index c577590..f9d00d1 100644
--- a/src/debugger.h
+++ b/src/debugger.h
@@ -43,7 +43,7 @@
arg_count_(0), arg_values_(NULL), options_(0), error(JDWP::ERR_NONE),
result_tag(JDWP::JT_VOID), exception(0),
lock_("a DebugInvokeReq lock"),
- cond_("a DebugInvokeReq condition variable") {
+ cond_("a DebugInvokeReq condition variable", lock_) {
}
/* boolean; only set when we're in the tail end of an event handler */
@@ -68,8 +68,8 @@
JDWP::ObjectId exception;
/* condition variable to wait on while the method executes */
- Mutex lock_;
- ConditionVariable cond_;
+ Mutex lock_ DEFAULT_MUTEX_ACQUIRED_AFTER;
+ ConditionVariable cond_ GUARDED_BY(lock_);
};
class Dbg {
diff --git a/src/heap.cc b/src/heap.cc
index 2f5362e..98845d8 100644
--- a/src/heap.cc
+++ b/src/heap.cc
@@ -279,7 +279,8 @@
// but we can create the heap lock now. We don't create it earlier to
// make it clear that you can't use locks during heap initialization.
gc_complete_lock_ = new Mutex("GC complete lock");
- gc_complete_cond_.reset(new ConditionVariable("GC complete condition variable"));
+ gc_complete_cond_.reset(new ConditionVariable("GC complete condition variable",
+ *gc_complete_lock_));
// Set up the cumulative timing loggers.
for (size_t i = static_cast<size_t>(kGcTypeSticky); i < static_cast<size_t>(kGcTypeMax);
@@ -963,7 +964,7 @@
is_gc_running_ = false;
last_gc_type_ = gc_type;
// Wake anyone who may have been waiting for the GC to complete.
- gc_complete_cond_->Broadcast();
+ gc_complete_cond_->Broadcast(self);
}
// Inform DDMS that a GC completed.
Dbg::GcDidFinish();
@@ -1803,7 +1804,7 @@
{
MutexLock mu(self, *gc_complete_lock_);
while (is_gc_running_) {
- gc_complete_cond_->Wait(self, *gc_complete_lock_);
+ gc_complete_cond_->Wait(self);
}
last_gc_type = last_gc_type_;
wait_time = NanoTime() - wait_start;
diff --git a/src/jdwp/jdwp_event.cc b/src/jdwp/jdwp_event.cc
index 2865f3a..de6a235 100644
--- a/src/jdwp/jdwp_event.cc
+++ b/src/jdwp/jdwp_event.cc
@@ -537,8 +537,9 @@
pReq->invoke_needed_ = false;
VLOG(jdwp) << "invoke complete, signaling and self-suspending";
- MutexLock mu(Thread::Current(), pReq->lock_);
- pReq->cond_.Signal();
+ Thread* self = Thread::Current();
+ MutexLock mu(self, pReq->lock_);
+ pReq->cond_.Signal(self);
}
}
@@ -595,7 +596,7 @@
while (event_thread_id_ != 0) {
VLOG(jdwp) << StringPrintf("event in progress (%#llx), %#llx sleeping", event_thread_id_, threadId);
waited = true;
- event_thread_cond_.Wait(self, event_thread_lock_);
+ event_thread_cond_.Wait(self);
}
if (waited || threadId != 0) {
@@ -615,14 +616,15 @@
* function is called by dvmSuspendSelf(), and the transition back
* to RUNNING would confuse it.
*/
- MutexLock mu(Thread::Current(), event_thread_lock_);
+ Thread* self = Thread::Current();
+ MutexLock mu(self, event_thread_lock_);
CHECK_NE(event_thread_id_, 0U);
VLOG(jdwp) << StringPrintf("cleared event token (%#llx)", event_thread_id_);
event_thread_id_ = 0;
- event_thread_cond_.Signal();
+ event_thread_cond_.Signal(self);
}
diff --git a/src/jdwp/jdwp_main.cc b/src/jdwp/jdwp_main.cc
index 69fc2fc..064e566 100644
--- a/src/jdwp/jdwp_main.cc
+++ b/src/jdwp/jdwp_main.cc
@@ -88,7 +88,7 @@
JdwpState::JdwpState(const JdwpOptions* options)
: options_(options),
thread_start_lock_("JDWP thread start lock"),
- thread_start_cond_("JDWP thread start condition variable"),
+ thread_start_cond_("JDWP thread start condition variable", thread_start_lock_),
pthread_(0),
thread_(NULL),
debug_thread_started_(false),
@@ -97,7 +97,7 @@
transport_(NULL),
netState(NULL),
attach_lock_("JDWP attach lock"),
- attach_cond_("JDWP attach condition variable"),
+ attach_cond_("JDWP attach condition variable", attach_lock_),
last_activity_time_ms_(0),
serial_lock_("JDWP serial lock", kJdwpSerialLock),
request_serial_(0x10000000),
@@ -106,7 +106,7 @@
event_list_(NULL),
event_list_size_(0),
event_thread_lock_("JDWP event thread lock"),
- event_thread_cond_("JDWP event thread condition variable"),
+ event_thread_cond_("JDWP event thread condition variable", event_thread_lock_),
event_thread_id_(0),
ddm_is_active_(false) {
}
@@ -158,7 +158,7 @@
* Wait until the thread finishes basic initialization.
* TODO: cond vars should be waited upon in a loop
*/
- state->thread_start_cond_.Wait(self, state->thread_start_lock_);
+ state->thread_start_cond_.Wait(self);
} else {
{
MutexLock attach_locker(self, state->attach_lock_);
@@ -172,7 +172,7 @@
* Wait until the thread finishes basic initialization.
* TODO: cond vars should be waited upon in a loop
*/
- state->thread_start_cond_.Wait(self, state->thread_start_lock_);
+ state->thread_start_cond_.Wait(self);
/*
* For suspend=y, wait for the debugger to connect to us or for us to
@@ -184,7 +184,7 @@
*/
{
ScopedThreadStateChange tsc(self, kWaitingForDebuggerToAttach);
- state->attach_cond_.Wait(self, state->attach_lock_);
+ state->attach_cond_.Wait(self);
}
}
if (!state->IsActive()) {
@@ -298,7 +298,7 @@
{
MutexLock locker(thread_, thread_start_lock_);
debug_thread_started_ = true;
- thread_start_cond_.Broadcast();
+ thread_start_cond_.Broadcast(thread_);
}
/* set the thread state to kWaitingInMainDebuggerLoop so GCs don't wait for us */
@@ -332,7 +332,7 @@
if (!(*transport_->establish)(this, options_)) {
/* wake anybody who was waiting for us to succeed */
MutexLock mu(thread_, attach_lock_);
- attach_cond_.Broadcast();
+ attach_cond_.Broadcast(thread_);
break;
}
}
@@ -366,7 +366,7 @@
/* wake anybody who's waiting for us */
MutexLock mu(thread_, attach_lock_);
- attach_cond_.Broadcast();
+ attach_cond_.Broadcast(thread_);
}
}
diff --git a/src/jni_internal.cc b/src/jni_internal.cc
index 22438a0..60b49de 100644
--- a/src/jni_internal.cc
+++ b/src/jni_internal.cc
@@ -509,7 +509,7 @@
handle_(handle),
class_loader_(class_loader),
jni_on_load_lock_("JNI_OnLoad lock"),
- jni_on_load_cond_("JNI_OnLoad condition variable"),
+ jni_on_load_cond_("JNI_OnLoad condition variable", jni_on_load_lock_),
jni_on_load_thread_id_(Thread::Current()->GetThinLockId()),
jni_on_load_result_(kPending) {
}
@@ -543,7 +543,7 @@
} else {
while (jni_on_load_result_ == kPending) {
VLOG(jni) << "[" << *self << " waiting for \"" << path_ << "\" " << "JNI_OnLoad...]";
- jni_on_load_cond_.Wait(self, jni_on_load_lock_);
+ jni_on_load_cond_.Wait(self);
}
okay = (jni_on_load_result_ == kOkay);
@@ -556,13 +556,14 @@
}
void SetResult(bool result) LOCKS_EXCLUDED(jni_on_load_lock_) {
- MutexLock mu(Thread::Current(), jni_on_load_lock_);
+ Thread* self = Thread::Current();
+ MutexLock mu(self, jni_on_load_lock_);
jni_on_load_result_ = result ? kOkay : kFailed;
jni_on_load_thread_id_ = 0;
// Broadcast a wakeup to anybody sleeping on the condition variable.
- jni_on_load_cond_.Broadcast();
+ jni_on_load_cond_.Broadcast(self);
}
void* FindSymbol(const std::string& symbol_name) {
@@ -588,7 +589,7 @@
// Guards remaining items.
Mutex jni_on_load_lock_ DEFAULT_MUTEX_ACQUIRED_AFTER;
// Wait for JNI_OnLoad in other thread.
- ConditionVariable jni_on_load_cond_;
+ ConditionVariable jni_on_load_cond_ GUARDED_BY(jni_on_load_lock_);
// Recursive invocation guard.
uint32_t jni_on_load_thread_id_ GUARDED_BY(jni_on_load_lock_);
// Result of earlier JNI_OnLoad call.
diff --git a/src/monitor.cc b/src/monitor.cc
index 9890822..8977ec9 100644
--- a/src/monitor.cc
+++ b/src/monitor.cc
@@ -16,14 +16,6 @@
#include "monitor.h"
-#include <errno.h>
-#include <fcntl.h>
-#include <pthread.h>
-#include <stdlib.h>
-#include <sys/time.h>
-#include <time.h>
-#include <unistd.h>
-
#include <vector>
#include "class_linker.h"
@@ -364,38 +356,6 @@
return true;
}
-// Converts the given waiting time (relative to "now") into an absolute time in 'ts'.
-static void ToAbsoluteTime(int64_t ms, int32_t ns, timespec* ts)
- SHARED_LOCKS_REQUIRED(Locks::mutator_lock_) {
- int64_t endSec;
-
-#ifdef HAVE_TIMEDWAIT_MONOTONIC
- clock_gettime(CLOCK_MONOTONIC, ts);
-#else
- {
- timeval tv;
- gettimeofday(&tv, NULL);
- ts->tv_sec = tv.tv_sec;
- ts->tv_nsec = tv.tv_usec * 1000;
- }
-#endif
- endSec = ts->tv_sec + ms / 1000;
- if (endSec >= 0x7fffffff) {
- std::ostringstream ss;
- Thread::Current()->Dump(ss);
- LOG(INFO) << "Note: end time exceeds epoch: " << ss.str();
- endSec = 0x7ffffffe;
- }
- ts->tv_sec = endSec;
- ts->tv_nsec = (ts->tv_nsec + (ms % 1000) * 1000000) + ns;
-
- // Catch rollover.
- if (ts->tv_nsec >= 1000000000L) {
- ts->tv_sec++;
- ts->tv_nsec -= 1000000000L;
- }
-}
-
/*
* Wait on a monitor until timeout, interrupt, or notification. Used for
* Object.wait() and (somewhat indirectly) Thread.sleep() and Thread.join().
@@ -439,14 +399,6 @@
return;
}
- // Compute absolute wakeup time, if necessary.
- timespec ts;
- bool timed = false;
- if (ms != 0 || ns != 0) {
- ToAbsoluteTime(ms, ns, &ts);
- timed = true;
- }
-
/*
* Add ourselves to the set of threads waiting on this monitor, and
* release our hold. We need to let it go even if we're a few levels
@@ -471,6 +423,7 @@
* that we won't touch any references in this state, and we'll check
* our suspend mode before we transition out.
*/
+ bool timed = (ms != 0 || ns != 0);
self->TransitionFromRunnableToSuspended(timed ? kTimedWaiting : kWaiting);
bool wasInterrupted = false;
@@ -496,9 +449,9 @@
} else {
// Wait for a notification or a timeout to occur.
if (!timed) {
- self->wait_cond_->Wait(self, *self->wait_mutex_);
+ self->wait_cond_->Wait(self);
} else {
- self->wait_cond_->TimedWait(self, *self->wait_mutex_, ts);
+ self->wait_cond_->TimedWait(self, ms, ns);
}
if (self->interrupted_) {
wasInterrupted = true;
@@ -568,7 +521,7 @@
// Check to see if the thread is still waiting.
MutexLock mu(self, *thread->wait_mutex_);
if (thread->wait_monitor_ != NULL) {
- thread->wait_cond_->Signal();
+ thread->wait_cond_->Signal(self);
return;
}
}
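The absolute-time computation removed from monitor.cc above is not
lost: it moves into mutex.cc below as InitTimeSpec, which can fill in
either an absolute deadline or a relative timeout, and Monitor::Wait
now forwards the raw (ms, ns) pair to ConditionVariable::TimedWait.
A sketch of the two modes, using the signature introduced below:

    timespec ts;
    // Absolute deadline: read the clock, then add ms/ns (capping on overflow).
    InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &ts);
    // Relative timeout: start from zero, no clock read.
    InitTimeSpec(self, false, CLOCK_REALTIME, ms, ns, &ts);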
diff --git a/src/mutex.cc b/src/mutex.cc
index 96e04ef..18dc863 100644
--- a/src/mutex.cc
+++ b/src/mutex.cc
@@ -17,11 +17,13 @@
#include "mutex.h"
#include <errno.h>
+#include <sys/time.h>
#include "cutils/atomic.h"
#include "cutils/atomic-inline.h"
#include "logging.h"
#include "runtime.h"
+#include "scoped_thread_state_change.h"
#include "thread.h"
#include "utils.h"
@@ -41,8 +43,8 @@
#ifndef SYS_futex
#define SYS_futex __NR_futex
#endif
-int futex(volatile int *uaddr, int op, int val, const struct timespec *timeout, int *, int ) {
- return syscall(SYS_futex, uaddr, op, val, timeout, NULL, NULL);
+int futex(volatile int *uaddr, int op, int val, const struct timespec *timeout, volatile int *uaddr2, int val3) {
+ return syscall(SYS_futex, uaddr, op, val, timeout, uaddr2, val3);
}
#endif // ART_USE_FUTEXES
@@ -96,6 +98,51 @@
}
}
+// Initialize a timespec to either an absolute or relative time.
+static void InitTimeSpec(Thread* self, bool absolute, int clock, int64_t ms, int32_t ns,
+ timespec* ts) {
+ int64_t endSec;
+
+ if (absolute) {
+ clock_gettime(clock, ts);
+ } else {
+ ts->tv_sec = 0;
+ ts->tv_nsec = 0;
+ }
+ endSec = ts->tv_sec + ms / 1000;
+ if (UNLIKELY(endSec >= 0x7fffffff)) {
+ std::ostringstream ss;
+ ScopedObjectAccess soa(self);
+ self->Dump(ss);
+ LOG(INFO) << "Note: end time exceeds epoch: " << ss.str();
+ endSec = 0x7ffffffe;
+ }
+ ts->tv_sec = endSec;
+ ts->tv_nsec = (ts->tv_nsec + (ms % 1000) * 1000000) + ns;
+
+ // Catch rollover.
+ if (ts->tv_nsec >= 1000000000L) {
+ ts->tv_sec++;
+ ts->tv_nsec -= 1000000000L;
+ }
+}
+
+#if ART_USE_FUTEXES
+static bool ComputeRelativeTimeSpec(timespec* result_ts, const timespec& lhs, const timespec& rhs) {
+ const long int one_sec = 1000 * 1000 * 1000; // one second in nanoseconds.
+ result_ts->tv_sec = lhs.tv_sec - rhs.tv_sec;
+ result_ts->tv_nsec = lhs.tv_nsec - rhs.tv_nsec;
+ if (result_ts->tv_nsec < 0) {
+ result_ts->tv_sec--;
+ result_ts->tv_nsec += one_sec;
+ } else if (result_ts->tv_nsec > one_sec) {
+ result_ts->tv_sec++;
+ result_ts->tv_nsec -= one_sec;
+ }
+ return result_ts->tv_sec < 0;
+}
+#endif
+
BaseMutex::BaseMutex(const char* name, LockLevel level) : level_(level), name_(name) {}
static void CheckUnattachedThread(LockLevel level) NO_THREAD_SAFETY_ANALYSIS {
@@ -176,7 +223,11 @@
Mutex::Mutex(const char* name, LockLevel level, bool recursive)
: BaseMutex(name, level), recursive_(recursive), recursion_count_(0) {
-#if defined(__BIONIC__) || defined(__APPLE__)
+#if ART_USE_FUTEXES
+ state_ = 0;
+ exclusive_owner_ = 0;
+ num_contenders_ = 0;
+#elif defined(__BIONIC__) || defined(__APPLE__)
// Use recursive mutexes for bionic and Apple otherwise the
// non-recursive mutexes don't have TIDs to check lock ownership of.
pthread_mutexattr_t attributes;
@@ -190,6 +241,17 @@
}
Mutex::~Mutex() {
+#if ART_USE_FUTEXES
+ if (state_ != 0) {
+ MutexLock mu(Thread::Current(), *Locks::runtime_shutdown_lock_);
+ Runtime* runtime = Runtime::Current();
+ bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
+ LOG(shutting_down ? WARNING : FATAL) << "destroying mutex with owner: " << exclusive_owner_;
+ } else {
+ CHECK_EQ(exclusive_owner_, 0U) << "unexpectedly found an owner on unlocked mutex " << name_;
+ CHECK_EQ(num_contenders_, 0) << "unexpectedly found a contender on mutex " << name_;
+ }
+#else
// We can't use CHECK_MUTEX_CALL here because on shutdown a suspended daemon thread
// may still be using locks.
int rc = pthread_mutex_destroy(&mutex_);
@@ -201,6 +263,7 @@
bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
PLOG(shutting_down ? WARNING : FATAL) << "pthread_mutex_destroy failed for " << name_;
}
+#endif
}
void Mutex::ExclusiveLock(Thread* self) {
@@ -209,7 +272,29 @@
AssertNotHeld(self);
}
if (!recursive_ || !IsExclusiveHeld(self)) {
+#if ART_USE_FUTEXES
+ bool done = false;
+ do {
+ int32_t cur_state = state_;
+ if (cur_state == 0) {
+ // Change state from 0 to 1.
+ done = android_atomic_cmpxchg(0, 1, &state_) == 0;
+ } else {
+ // Failed to acquire, hang up.
+ android_atomic_inc(&num_contenders_);
+ if (futex(&state_, FUTEX_WAIT, 1, NULL, NULL, 0) != 0) {
+ if (errno != EAGAIN) {
+ PLOG(FATAL) << "futex wait failed for " << name_;
+ }
+ }
+ android_atomic_dec(&num_contenders_);
+ }
+ } while (!done);
+ DCHECK_EQ(state_, 1);
+ exclusive_owner_ = SafeGetTid(self);
+#else
CHECK_MUTEX_CALL(pthread_mutex_lock, (&mutex_));
+#endif
RegisterAsLocked(self);
}
recursion_count_++;
@@ -226,6 +311,20 @@
AssertNotHeld(self);
}
if (!recursive_ || !IsExclusiveHeld(self)) {
+#if ART_USE_FUTEXES
+ bool done = false;
+ do {
+ int32_t cur_state = state_;
+ if (cur_state == 0) {
+ // Change state from 0 to 1.
+ done = android_atomic_cmpxchg(0, 1, &state_) == 0;
+ } else {
+ return false;
+ }
+ } while (!done);
+ DCHECK_EQ(state_, 1);
+ exclusive_owner_ = SafeGetTid(self);
+#else
int result = pthread_mutex_trylock(&mutex_);
if (result == EBUSY) {
return false;
@@ -234,6 +333,7 @@
errno = result;
PLOG(FATAL) << "pthread_mutex_trylock failed for " << name_;
}
+#endif
RegisterAsLocked(self);
}
recursion_count_++;
@@ -255,7 +355,28 @@
<< name_ << " " << recursion_count_;
}
RegisterAsUnlocked(self);
+#if ART_USE_FUTEXES
+ bool done = false;
+ do {
+ int32_t cur_state = state_;
+ if (cur_state == 1) {
+ // We're no longer the owner.
+ exclusive_owner_ = 0;
+ // Change state to 0.
+ done = android_atomic_cmpxchg(cur_state, 0, &state_) == 0;
+ if (done) { // Spurious fail?
+ // Wake a contender
+ if (num_contenders_ > 0) {
+ futex(&state_, FUTEX_WAKE, 1, NULL, NULL, 0);
+ }
+ }
+ } else {
+ LOG(FATAL) << "Unexpected state_: " << cur_state << " for " << name_;
+ }
+ } while (!done);
+#else
CHECK_MUTEX_CALL(pthread_mutex_unlock, (&mutex_));
+#endif
}
}
@@ -272,7 +393,9 @@
}
uint64_t Mutex::GetExclusiveOwnerTid() const {
-#if defined(__BIONIC__)
+#if ART_USE_FUTEXES
+ return exclusive_owner_;
+#elif defined(__BIONIC__)
return static_cast<uint64_t>((mutex_.value >> 16) & 0xffff);
#elif defined(__GLIBC__)
return reinterpret_cast<const glibc_pthread_mutex_t*>(&mutex_)->owner;
@@ -396,10 +519,12 @@
}
#if HAVE_TIMED_RWLOCK
-bool ReaderWriterMutex::ExclusiveLockWithTimeout(Thread* self, const timespec& abs_timeout) {
+bool ReaderWriterMutex::ExclusiveLockWithTimeout(Thread* self, int64_t ms, int32_t ns) {
DCHECK(self == NULL || self == Thread::Current());
#if ART_USE_FUTEXES
bool done = false;
+ timespec end_abs_ts;
+ InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &end_abs_ts);
do {
int32_t cur_state = state_;
if (cur_state == 0) {
@@ -407,12 +532,18 @@
done = android_atomic_cmpxchg(0, -1, &state_) == 0;
} else {
// Failed to acquire, hang up.
+ timespec now_abs_ts;
+ InitTimeSpec(self, true, CLOCK_REALTIME, 0, 0, &now_abs_ts);
+ timespec rel_ts;
+ if (ComputeRelativeTimeSpec(&rel_ts, end_abs_ts, now_abs_ts)) {
+ return false; // Timed out.
+ }
android_atomic_inc(&num_pending_writers_);
- if (futex(&state_, FUTEX_WAIT, cur_state, &abs_timeout, NULL, 0) != 0) {
+ if (futex(&state_, FUTEX_WAIT, cur_state, &rel_ts, NULL, 0) != 0) {
if (errno == ETIMEDOUT) {
android_atomic_dec(&num_pending_writers_);
- return false;
- } else if (errno != EAGAIN) {
+ return false; // Timed out.
+ } else if (errno != EAGAIN && errno != EINTR) {
PLOG(FATAL) << "timed futex wait failed for " << name_;
}
}
@@ -421,7 +552,9 @@
} while(!done);
exclusive_owner_ = SafeGetTid(self);
#else
- int result = pthread_rwlock_timedwrlock(&rwlock_, &abs_timeout);
+ timespec ts;
+ InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &ts);
+ int result = pthread_rwlock_timedwrlock(&rwlock_, &ts);
if (result == ETIMEDOUT) {
return false;
}
@@ -576,11 +709,19 @@
return os << mu.Dump();
}
-ConditionVariable::ConditionVariable(const std::string& name) : name_(name) {
+ConditionVariable::ConditionVariable(const std::string& name, Mutex& guard)
+ : name_(name), guard_(guard) {
+#if ART_USE_FUTEXES
+ state_ = 0;
+ num_waiters_ = 0;
+ num_awoken_ = 0;
+#else
CHECK_MUTEX_CALL(pthread_cond_init, (&cond_, NULL));
+#endif
}
ConditionVariable::~ConditionVariable() {
+#if !ART_USE_FUTEXES
// We can't use CHECK_MUTEX_CALL here because on shutdown a suspended daemon thread
// may still be using condition variables.
int rc = pthread_cond_destroy(&cond_);
@@ -591,39 +732,162 @@
bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
PLOG(shutting_down ? WARNING : FATAL) << "pthread_cond_destroy failed for " << name_;
}
+#endif
}
-void ConditionVariable::Broadcast() {
+void ConditionVariable::Broadcast(Thread* self) {
+ DCHECK(self == NULL || self == Thread::Current());
+ // TODO: enable the assertion below; there's a race in thread creation that currently causes false failures.
+ // guard_.AssertExclusiveHeld(self);
+#if ART_USE_FUTEXES
+ if (num_waiters_ > 0) {
+ android_atomic_inc(&state_); // Indicate a wake has occurred to waiters coming in.
+ bool done = false;
+ do {
+ int32_t cur_state = state_;
+ // Compute number of waiters requeued and add to mutex contenders.
+ int32_t num_requeued = num_waiters_ - num_awoken_;
+ android_atomic_add(num_requeued, &guard_.num_contenders_);
+ // Requeue waiters onto contenders.
+ done = futex(&state_, FUTEX_CMP_REQUEUE, 0,
+ reinterpret_cast<const timespec*>(std::numeric_limits<int32_t>::max()),
+ &guard_.state_, cur_state) != -1;
+ if (!done) {
+ if (errno != EAGAIN) {
+ PLOG(FATAL) << "futex cmp requeue failed for " << name_;
+ }
+ } else {
+ num_awoken_ = num_waiters_;
+ }
+ } while (!done);
+ }
+#else
CHECK_MUTEX_CALL(pthread_cond_broadcast, (&cond_));
+#endif
}
-void ConditionVariable::Signal() {
+void ConditionVariable::Signal(Thread* self) {
+ DCHECK(self == NULL || self == Thread::Current());
+ guard_.AssertExclusiveHeld(self);
+#if ART_USE_FUTEXES
+ if (num_waiters_ > 0) {
+ android_atomic_inc(&state_); // Indicate a wake has occurred to waiters coming in.
+ // Futex-wake one waiter, who will then come in and contend on the mutex. It'd be nice to
+ // requeue them to avoid this; however, requeueing can only move all waiters.
+ if (futex(&state_, FUTEX_WAKE, 1, NULL, NULL, 0) == 1) {
+ // Wake success.
+ // We weren't requeued, however, to make accounting simpler in the Wait code, increment the
+ // number of contenders on the mutex.
+ num_awoken_++;
+ android_atomic_inc(&guard_.num_contenders_);
+ }
+ }
+#else
CHECK_MUTEX_CALL(pthread_cond_signal, (&cond_));
+#endif
}
-void ConditionVariable::Wait(Thread* self, Mutex& mutex) {
- mutex.CheckSafeToWait(self);
- unsigned int old_recursion_count = mutex.recursion_count_;
- mutex.recursion_count_ = 0;
- CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &mutex.mutex_));
- mutex.recursion_count_ = old_recursion_count;
+void ConditionVariable::Wait(Thread* self) {
+ DCHECK(self == NULL || self == Thread::Current());
+ guard_.AssertExclusiveHeld(self);
+ guard_.CheckSafeToWait(self);
+ unsigned int old_recursion_count = guard_.recursion_count_;
+#if ART_USE_FUTEXES
+ int32_t cur_state = state_;
+ num_waiters_++;
+ guard_.recursion_count_ = 1;
+ guard_.ExclusiveUnlock(self);
+ bool woken = true;
+ while (futex(&state_, FUTEX_WAIT, cur_state, NULL, NULL, 0) != 0) {
+ if (errno == EINTR || errno == EAGAIN) {
+ if (state_ != cur_state) {
+ // We failed and a signal has come in.
+ woken = false;
+ break;
+ }
+ } else {
+ PLOG(FATAL) << "futex wait failed for " << name_;
+ }
+ }
+ guard_.ExclusiveLock(self);
+ num_waiters_--;
+ if (woken) {
+ // If we were woken, we were requeued onto the mutex; decrement the mutex's contender count.
+ android_atomic_dec(&guard_.num_contenders_);
+ num_awoken_--;
+ }
+#else
+ guard_.recursion_count_ = 0;
+ CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &guard_.mutex_));
+#endif
+ guard_.recursion_count_ = old_recursion_count;
}
-void ConditionVariable::TimedWait(Thread* self, Mutex& mutex, const timespec& ts) {
+void ConditionVariable::TimedWait(Thread* self, int64_t ms, int32_t ns) {
+ DCHECK(self == NULL || self == Thread::Current());
+ guard_.AssertExclusiveHeld(self);
+ guard_.CheckSafeToWait(self);
+ unsigned int old_recursion_count = guard_.recursion_count_;
+#if ART_USE_FUTEXES
+ // Record the original end time so that if the futex call fails we can recompute the appropriate
+ // relative time.
+ timespec end_abs_ts;
+ InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &end_abs_ts);
+ timespec rel_ts;
+ InitTimeSpec(self, false, CLOCK_REALTIME, ms, ns, &rel_ts);
+ // Read state so that we can know if a signal comes in before we sleep.
+ int32_t cur_state = state_;
+ num_waiters_++;
+ guard_.recursion_count_ = 1;
+ guard_.ExclusiveUnlock(self);
+ bool woken = true; // Did the futex wait end because we were awoken?
+ while (futex(&state_, FUTEX_WAIT, cur_state, &rel_ts, NULL, 0) != 0) {
+ if (errno == ETIMEDOUT) {
+ woken = false;
+ break;
+ }
+ if ((errno == EINTR) || (errno == EAGAIN)) {
+ if (state_ != cur_state) {
+ // We failed and a signal has come in.
+ woken = false;
+ break;
+ }
+ timespec now_abs_ts;
+ InitTimeSpec(self, true, CLOCK_REALTIME, 0, 0, &now_abs_ts);
+ if (ComputeRelativeTimeSpec(&rel_ts, end_abs_ts, now_abs_ts)) {
+ // futex failed and we timed out in the meantime.
+ woken = false;
+ break;
+ }
+ } else {
+ PLOG(FATAL) << "timed futex wait failed for " << name_;
+ }
+ }
+ guard_.ExclusiveLock(self);
+ num_waiters_--;
+ if (woken) {
+ // If we were woken, we were requeued onto the mutex; decrement the mutex's contender count.
+ android_atomic_dec(&guard_.num_contenders_);
+ num_awoken_--;
+ }
+#else
#ifdef HAVE_TIMEDWAIT_MONOTONIC
#define TIMEDWAIT pthread_cond_timedwait_monotonic
+ int clock = CLOCK_MONOTONIC;
#else
#define TIMEDWAIT pthread_cond_timedwait
+ int clock = CLOCK_REALTIME;
#endif
- mutex.CheckSafeToWait(self);
- unsigned int old_recursion_count = mutex.recursion_count_;
- mutex.recursion_count_ = 0;
- int rc = TIMEDWAIT(&cond_, &mutex.mutex_, &ts);
- mutex.recursion_count_ = old_recursion_count;
+ guard_.recursion_count_ = 0;
+ timespec ts;
+ InitTimeSpec(self, true, clock, ms, ns, &ts);
+ int rc = TIMEDWAIT(&cond_, &guard_.mutex_, &ts);
if (rc != 0 && rc != ETIMEDOUT) {
errno = rc;
PLOG(FATAL) << "TimedWait failed for " << name_;
}
+#endif
+ guard_.recursion_count_ = old_recursion_count;
}
} // namespace art
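How the broadcast requeue above works, in outline: Broadcast() first
bumps the condition variable's state_ word so that any thread racing
into Wait() observes the change and fails its FUTEX_WAIT, then charges
the still-sleeping waiters to the mutex as contenders, and finally
moves them wholesale from the condition variable's futex onto the
mutex's futex with FUTEX_CMP_REQUEUE, waking zero threads;
ExclusiveUnlock() then wakes them one at a time. A stripped-down,
self-contained sketch of that pattern with raw futexes (the placeholder
words stand in for state_, guard_.state_ and guard_.num_contenders_):

    #include <cerrno>
    #include <climits>
    #include <cstdint>
    #include <linux/futex.h>
    #include <sys/syscall.h>
    #include <time.h>
    #include <unistd.h>

    static int futex(volatile int* uaddr, int op, int val,
                     const timespec* timeout, volatile int* uaddr2, int val3) {
      return syscall(SYS_futex, uaddr, op, val, timeout, uaddr2, val3);
    }

    // Requeue all sleepers on cv_word onto mutex_word without waking them.
    void BroadcastByRequeue(volatile int* cv_word, volatile int* mutex_word,
                            volatile int* mutex_contenders, int num_waiters) {
      __sync_fetch_and_add(cv_word, 1);  // Late-arriving waiters see a wake.
      __sync_fetch_and_add(mutex_contenders, num_waiters);
      for (;;) {
        int expected = *cv_word;
        // val=0: wake nobody; val2 (smuggled through the timeout slot)=INT_MAX:
        // requeue everyone; val3: only if *cv_word still equals expected.
        if (futex(cv_word, FUTEX_CMP_REQUEUE, 0,
                  reinterpret_cast<const timespec*>(static_cast<uintptr_t>(INT_MAX)),
                  mutex_word, expected) != -1) {
          break;  // Requeue succeeded.
        }
        if (errno != EAGAIN) {  // EAGAIN: cv_word changed under us; retry.
          break;  // Unexpected failure; the real code LOG(FATAL)s here.
        }
      }
    }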
diff --git a/src/mutex.h b/src/mutex.h
index d51e2cc..4af2ad7 100644
--- a/src/mutex.h
+++ b/src/mutex.h
@@ -132,7 +132,16 @@
std::string Dump() const;
private:
+#if ART_USE_FUTEXES
+ // 0 is unheld, 1 is held.
+ volatile int32_t state_;
+ // Exclusive owner.
+ volatile uint64_t exclusive_owner_;
+ // Number of waiting contenders.
+ volatile int32_t num_contenders_;
+#else
pthread_mutex_t mutex_;
+#endif
const bool recursive_; // Can the lock be recursively held?
unsigned int recursion_count_;
friend class ConditionVariable;
@@ -175,7 +184,7 @@
// Block until ReaderWriterMutex is free and acquire exclusive access. Returns true on success
// or false if timeout is reached.
#if HAVE_TIMED_RWLOCK
- bool ExclusiveLockWithTimeout(Thread* self, const timespec& abs_timeout)
+ bool ExclusiveLockWithTimeout(Thread* self, int64_t ms, int32_t ns)
EXCLUSIVE_TRYLOCK_FUNCTION(true);
#endif
@@ -254,17 +263,35 @@
// (Signal) or all at once (Broadcast).
class ConditionVariable {
public:
- explicit ConditionVariable(const std::string& name);
+ explicit ConditionVariable(const std::string& name, Mutex& mutex);
~ConditionVariable();
- void Broadcast();
- void Signal();
- void Wait(Thread* self, Mutex& mutex);
- void TimedWait(Thread* self, Mutex& mutex, const timespec& ts);
+ void Broadcast(Thread* self);
+ void Signal(Thread* self);
+ // TODO: No thread safety analysis on Wait and TimedWait as they call mutex operations via their
+ // pointer copy, thereby defeating annotalysis.
+ void Wait(Thread* self) NO_THREAD_SAFETY_ANALYSIS;
+ void TimedWait(Thread* self, int64_t ms, int32_t ns) NO_THREAD_SAFETY_ANALYSIS;
private:
- pthread_cond_t cond_;
std::string name_;
+ // The Mutex being used by waiters. It is an error to mix condition variables between different
+ // Mutexes.
+ Mutex& guard_;
+#if ART_USE_FUTEXES
+ // A counter that is modified by signals and broadcasts. This ensures that when a waiter gives up
+ // their Mutex and another thread takes it and signals, the waiting thread observes that state_
+ // changed and doesn't enter the wait.
+ volatile int32_t state_;
+ // Number of threads that have come in to wait, not the number of waiters on the futex, as
+ // waiters may have been requeued onto guard_. A non-zero value indicates that signal and
+ // broadcast should do something. Guarded by guard_.
+ volatile int32_t num_waiters_;
+ // Number of threads that have been awoken out of the pool of waiters.
+ volatile int32_t num_awoken_;
+#else
+ pthread_cond_t cond_;
+#endif
DISALLOW_COPY_AND_ASSIGN(ConditionVariable);
};
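A note on the timed paths: callers now pass a relative (ms, ns) pair
rather than an absolute timespec. On the futex path FUTEX_WAIT takes a
relative timeout, so TimedWait records the absolute deadline once and,
each time the wait is interrupted (EINTR/EAGAIN), recomputes the
remaining time with ComputeRelativeTimeSpec, whose return value doubles
as a timeout check. A worked example with assumed values:

    timespec end = {5, 200000000};  // Deadline: t = 5.2s.
    timespec now = {3, 800000000};  // Current time: t = 3.8s.
    timespec rel;
    bool timed_out = ComputeRelativeTimeSpec(&rel, end, now);
    // rel == {1, 400000000} (1.4s remaining), timed_out == false.
    // Had now been past end, tv_sec would go negative and timed_out true.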
diff --git a/src/mutex_test.cc b/src/mutex_test.cc
index 4dac3c6..0b0f2c9 100644
--- a/src/mutex_test.cc
+++ b/src/mutex_test.cc
@@ -97,13 +97,13 @@
struct RecursiveLockWait {
explicit RecursiveLockWait()
- : mu("test mutex", kDefaultMutexLevel, true), cv("test condition variable") {
+ : mu("test mutex", kDefaultMutexLevel, true), cv("test condition variable", mu) {
}
static void* Callback(void* arg) {
RecursiveLockWait* state = reinterpret_cast<RecursiveLockWait*>(arg);
state->mu.Lock(Thread::Current());
- state->cv.Signal();
+ state->cv.Signal(Thread::Current());
state->mu.Unlock(Thread::Current());
return NULL;
}
@@ -122,7 +122,7 @@
int pthread_create_result = pthread_create(&pthread, NULL, RecursiveLockWait::Callback, &state);
ASSERT_EQ(0, pthread_create_result);
- state.cv.Wait(Thread::Current(), state.mu);
+ state.cv.Wait(Thread::Current());
state.mu.Unlock(Thread::Current());
state.mu.Unlock(Thread::Current());
diff --git a/src/runtime.cc b/src/runtime.cc
index e2e419a..ee8a8c8 100644
--- a/src/runtime.cc
+++ b/src/runtime.cc
@@ -77,7 +77,7 @@
resolution_method_(NULL),
system_class_loader_(NULL),
threads_being_born_(0),
- shutdown_cond_(new ConditionVariable("Runtime shutdown")),
+ shutdown_cond_(new ConditionVariable("Runtime shutdown", *Locks::runtime_shutdown_lock_)),
shutting_down_(false),
shutting_down_started_(false),
started_(false),
@@ -120,7 +120,7 @@
MutexLock mu(self, *Locks::runtime_shutdown_lock_);
shutting_down_started_ = true;
while (threads_being_born_ > 0) {
- shutdown_cond_->Wait(self, *Locks::runtime_shutdown_lock_);
+ shutdown_cond_->Wait(self);
}
shutting_down_ = true;
}
@@ -685,7 +685,7 @@
DCHECK_GT(threads_being_born_, 0U);
threads_being_born_--;
if (shutting_down_started_ && threads_being_born_ == 0) {
- shutdown_cond_->Broadcast();
+ shutdown_cond_->Broadcast(Thread::Current());
}
}
diff --git a/src/signal_catcher.cc b/src/signal_catcher.cc
index b6f6a41..80c37d4 100644
--- a/src/signal_catcher.cc
+++ b/src/signal_catcher.cc
@@ -62,7 +62,7 @@
SignalCatcher::SignalCatcher(const std::string& stack_trace_file)
: stack_trace_file_(stack_trace_file),
lock_("SignalCatcher lock"),
- cond_("SignalCatcher::cond_"),
+ cond_("SignalCatcher::cond_", lock_),
thread_(NULL) {
SetHaltFlag(false);
@@ -72,7 +72,7 @@
Thread* self = Thread::Current();
MutexLock mu(self, lock_);
while (thread_ == NULL) {
- cond_.Wait(self, lock_);
+ cond_.Wait(self);
}
}
@@ -190,7 +190,7 @@
{
MutexLock mu(self, signal_catcher->lock_);
signal_catcher->thread_ = self;
- signal_catcher->cond_.Broadcast();
+ signal_catcher->cond_.Broadcast(self);
}
// Set up mask with signals we want to handle.
diff --git a/src/signal_catcher.h b/src/signal_catcher.h
index e8ac812..074267e 100644
--- a/src/signal_catcher.h
+++ b/src/signal_catcher.h
@@ -51,8 +51,8 @@
std::string stack_trace_file_;
- mutable Mutex lock_;
- ConditionVariable cond_;
+ mutable Mutex lock_ DEFAULT_MUTEX_ACQUIRED_AFTER;
+ ConditionVariable cond_ GUARDED_BY(lock_);
bool halt_ GUARDED_BY(lock_);
pthread_t pthread_ GUARDED_BY(lock_);
Thread* thread_ GUARDED_BY(lock_);
diff --git a/src/thread.cc b/src/thread.cc
index 485caab..6daeadf 100644
--- a/src/thread.cc
+++ b/src/thread.cc
@@ -591,7 +591,7 @@
DCHECK_EQ(old_state_and_flags.as_struct.state, old_state);
while ((old_state_and_flags.as_struct.flags & kSuspendRequest) != 0) {
// Re-check when Thread::resume_cond_ is notified.
- Thread::resume_cond_->Wait(this, *Locks::thread_suspend_count_lock_);
+ Thread::resume_cond_->Wait(this);
old_state_and_flags = state_and_flags_;
DCHECK_EQ(old_state_and_flags.as_struct.state, old_state);
}
@@ -875,7 +875,8 @@
void Thread::Startup() {
{
MutexLock mu(Thread::Current(), *Locks::thread_suspend_count_lock_); // Keep GCC happy.
- resume_cond_ = new ConditionVariable("Thread resumption condition variable");
+ resume_cond_ = new ConditionVariable("Thread resumption condition variable",
+ *Locks::thread_suspend_count_lock_);
}
// Allocate a TLS slot.
@@ -916,7 +917,7 @@
thin_lock_id_(0),
tid_(0),
wait_mutex_(new Mutex("a thread wait mutex")),
- wait_cond_(new ConditionVariable("a thread wait condition variable")),
+ wait_cond_(new ConditionVariable("a thread wait condition variable", *wait_mutex_)),
wait_monitor_(NULL),
interrupted_(false),
wait_next_(NULL),
@@ -1179,22 +1180,24 @@
}
void Thread::Interrupt() {
- MutexLock mu(Thread::Current(), *wait_mutex_);
+ Thread* self = Thread::Current();
+ MutexLock mu(self, *wait_mutex_);
if (interrupted_) {
return;
}
interrupted_ = true;
- NotifyLocked();
+ NotifyLocked(self);
}
void Thread::Notify() {
- MutexLock mu(Thread::Current(), *wait_mutex_);
- NotifyLocked();
+ Thread* self = Thread::Current();
+ MutexLock mu(self, *wait_mutex_);
+ NotifyLocked(self);
}
-void Thread::NotifyLocked() {
+void Thread::NotifyLocked(Thread* self) {
if (wait_monitor_ != NULL) {
- wait_cond_->Signal();
+ wait_cond_->Signal(self);
}
}
diff --git a/src/thread.h b/src/thread.h
index 3ba9f4a..d559449 100644
--- a/src/thread.h
+++ b/src/thread.h
@@ -615,7 +615,7 @@
void InitPthreadKeySelf();
void InitStackHwm();
- void NotifyLocked() EXCLUSIVE_LOCKS_REQUIRED(wait_mutex_);
+ void NotifyLocked(Thread* self) EXCLUSIVE_LOCKS_REQUIRED(wait_mutex_);
bool ReadFlag(ThreadFlag flag) const {
return (state_and_flags_.as_struct.flags & flag) != 0;
@@ -632,8 +632,7 @@
// Used to notify threads that they should attempt to resume, they will suspend again if
// their suspend count is > 0.
- static ConditionVariable* resume_cond_
- GUARDED_BY(Locks::thread_suspend_count_lock_);
+ static ConditionVariable* resume_cond_ GUARDED_BY(Locks::thread_suspend_count_lock_);
// --- Frequently accessed fields first for short offsets ---
diff --git a/src/thread_list.cc b/src/thread_list.cc
index 09a8de6..5ef8625 100644
--- a/src/thread_list.cc
+++ b/src/thread_list.cc
@@ -30,7 +30,7 @@
ThreadList::ThreadList()
: allocated_ids_lock_("allocated thread ids lock"),
suspend_all_count_(0), debug_suspend_all_count_(0),
- thread_exit_cond_("thread exit condition variable") {
+ thread_exit_cond_("thread exit condition variable", *Locks::thread_list_lock_) {
}
ThreadList::~ThreadList() {
@@ -184,10 +184,7 @@
// Block on the mutator lock until all Runnable threads release their share of access.
#if HAVE_TIMED_RWLOCK
// Timeout if we wait more than 30 seconds.
- timespec timeout;
- clock_gettime(CLOCK_REALTIME, &timeout);
- timeout.tv_sec += 30;
- if (UNLIKELY(!Locks::mutator_lock_->ExclusiveLockWithTimeout(self, timeout))) {
+ if (UNLIKELY(!Locks::mutator_lock_->ExclusiveLockWithTimeout(self, 30 * 1000, 0))) {
UnsafeLogFatalForThreadSuspendAllTimeout(self);
}
#else
@@ -226,7 +223,7 @@
// Broadcast a notification to all suspended threads, some or all of
// which may choose to wake up. No need to wait for them.
VLOG(threads) << *self << " ResumeAll waking others";
- Thread::resume_cond_->Broadcast();
+ Thread::resume_cond_->Broadcast(self);
}
VLOG(threads) << *self << " ResumeAll complete";
}
@@ -251,7 +248,7 @@
{
VLOG(threads) << "Resume(" << *thread << ") waking others";
MutexLock mu(self, *Locks::thread_suspend_count_lock_);
- Thread::resume_cond_->Broadcast();
+ Thread::resume_cond_->Broadcast(self);
}
VLOG(threads) << "Resume(" << *thread << ") complete";
@@ -286,10 +283,7 @@
// immediately unlock again.
#if HAVE_TIMED_RWLOCK
// Timeout if we wait more than 30 seconds.
- timespec timeout;
- clock_gettime(CLOCK_REALTIME, &timeout);
- timeout.tv_sec += 30;
- if (!Locks::mutator_lock_->ExclusiveLockWithTimeout(self, timeout)) {
+ if (!Locks::mutator_lock_->ExclusiveLockWithTimeout(self, 30 * 1000, 0)) {
UnsafeLogFatalForThreadSuspendAllTimeout(self);
} else {
Locks::mutator_lock_->ExclusiveUnlock(self);
@@ -328,7 +322,7 @@
Dbg::ClearWaitForEventThread();
while (self->suspend_count_ != 0) {
- Thread::resume_cond_->Wait(self, *Locks::thread_suspend_count_lock_);
+ Thread::resume_cond_->Wait(self);
if (self->suspend_count_ != 0) {
// The condition was signaled but we're still suspended. This
// can happen if the debugger lets go while a SIGQUIT thread
@@ -366,7 +360,7 @@
{
MutexLock mu(self, *Locks::thread_suspend_count_lock_);
- Thread::resume_cond_->Broadcast();
+ Thread::resume_cond_->Broadcast(self);
}
VLOG(threads) << "UndoDebuggerSuspensions(" << *self << ") complete";
@@ -396,7 +390,7 @@
}
if (!all_threads_are_daemons) {
// Wait for another thread to exit before re-checking.
- thread_exit_cond_.Wait(self, *Locks::thread_list_lock_);
+ thread_exit_cond_.Wait(self);
}
} while(!all_threads_are_daemons);
}
@@ -486,7 +480,7 @@
// Signal that a thread just detached.
MutexLock mu(NULL, *Locks::thread_list_lock_);
- thread_exit_cond_.Signal();
+ thread_exit_cond_.Signal(NULL);
}
void ThreadList::ForEach(void (*callback)(Thread*, void*), void* context) {