Implement mutex requeueing for cv broadcasts.

Make the mutex guarding a condition variable part of its state. On a
broadcast requeue waiters on the mutex so they are awoken as the mutex
is unlocked (thereby avoiding thundering herds). Explicit futex use
still guarded behind ART_USE_FUTEXES which remains disabled as I'm
unhappy with some of the warts of mutex usage. Uploading so that the API
changes can stabilize.

Change-Id: Iedb601856ccd8bbc3a64da4ba0cee82246e7bcbf
diff --git a/src/debugger.cc b/src/debugger.cc
index 1e1e7b6..a34f690 100644
--- a/src/debugger.cc
+++ b/src/debugger.cc
@@ -2425,7 +2425,7 @@
 
       // Wait for the request to finish executing.
       while (req->invoke_needed_) {
-        req->cond_.Wait(self, req->lock_);
+        req->cond_.Wait(self);
       }
     }
     VLOG(jdwp) << "    Control has returned from event thread";
diff --git a/src/debugger.h b/src/debugger.h
index c577590..f9d00d1 100644
--- a/src/debugger.h
+++ b/src/debugger.h
@@ -43,7 +43,7 @@
         arg_count_(0), arg_values_(NULL), options_(0), error(JDWP::ERR_NONE),
         result_tag(JDWP::JT_VOID), exception(0),
         lock_("a DebugInvokeReq lock"),
-        cond_("a DebugInvokeReq condition variable") {
+        cond_("a DebugInvokeReq condition variable", lock_) {
   }
 
   /* boolean; only set when we're in the tail end of an event handler */
@@ -68,8 +68,8 @@
   JDWP::ObjectId exception;
 
   /* condition variable to wait on while the method executes */
-  Mutex lock_;
-  ConditionVariable cond_;
+  Mutex lock_ DEFAULT_MUTEX_ACQUIRED_AFTER;
+  ConditionVariable cond_ GUARDED_BY(lock_);
 };
 
 class Dbg {
diff --git a/src/heap.cc b/src/heap.cc
index 2f5362e..98845d8 100644
--- a/src/heap.cc
+++ b/src/heap.cc
@@ -279,7 +279,8 @@
   // but we can create the heap lock now. We don't create it earlier to
   // make it clear that you can't use locks during heap initialization.
   gc_complete_lock_ = new Mutex("GC complete lock");
-  gc_complete_cond_.reset(new ConditionVariable("GC complete condition variable"));
+  gc_complete_cond_.reset(new ConditionVariable("GC complete condition variable",
+                                                *gc_complete_lock_));
 
   // Set up the cumulative timing loggers.
   for (size_t i = static_cast<size_t>(kGcTypeSticky); i < static_cast<size_t>(kGcTypeMax);
@@ -963,7 +964,7 @@
     is_gc_running_ = false;
     last_gc_type_ = gc_type;
     // Wake anyone who may have been waiting for the GC to complete.
-    gc_complete_cond_->Broadcast();
+    gc_complete_cond_->Broadcast(self);
   }
   // Inform DDMS that a GC completed.
   Dbg::GcDidFinish();
@@ -1803,7 +1804,7 @@
       {
         MutexLock mu(self, *gc_complete_lock_);
         while (is_gc_running_) {
-          gc_complete_cond_->Wait(self, *gc_complete_lock_);
+          gc_complete_cond_->Wait(self);
         }
         last_gc_type = last_gc_type_;
         wait_time = NanoTime() - wait_start;;
diff --git a/src/jdwp/jdwp_event.cc b/src/jdwp/jdwp_event.cc
index 2865f3a..de6a235 100644
--- a/src/jdwp/jdwp_event.cc
+++ b/src/jdwp/jdwp_event.cc
@@ -537,8 +537,9 @@
     pReq->invoke_needed_ = false;
 
     VLOG(jdwp) << "invoke complete, signaling and self-suspending";
-    MutexLock mu(Thread::Current(), pReq->lock_);
-    pReq->cond_.Signal();
+    Thread* self = Thread::Current();
+    MutexLock mu(self, pReq->lock_);
+    pReq->cond_.Signal(self);
   }
 }
 
@@ -595,7 +596,7 @@
   while (event_thread_id_ != 0) {
     VLOG(jdwp) << StringPrintf("event in progress (%#llx), %#llx sleeping", event_thread_id_, threadId);
     waited = true;
-    event_thread_cond_.Wait(self, event_thread_lock_);
+    event_thread_cond_.Wait(self);
   }
 
   if (waited || threadId != 0) {
@@ -615,14 +616,15 @@
    * function is called by dvmSuspendSelf(), and the transition back
    * to RUNNING would confuse it.
    */
-  MutexLock mu(Thread::Current(), event_thread_lock_);
+  Thread* self = Thread::Current();
+  MutexLock mu(self, event_thread_lock_);
 
   CHECK_NE(event_thread_id_, 0U);
   VLOG(jdwp) << StringPrintf("cleared event token (%#llx)", event_thread_id_);
 
   event_thread_id_ = 0;
 
-  event_thread_cond_.Signal();
+  event_thread_cond_.Signal(self);
 }
 
 
diff --git a/src/jdwp/jdwp_main.cc b/src/jdwp/jdwp_main.cc
index 69fc2fc..064e566 100644
--- a/src/jdwp/jdwp_main.cc
+++ b/src/jdwp/jdwp_main.cc
@@ -88,7 +88,7 @@
 JdwpState::JdwpState(const JdwpOptions* options)
     : options_(options),
       thread_start_lock_("JDWP thread start lock"),
-      thread_start_cond_("JDWP thread start condition variable"),
+      thread_start_cond_("JDWP thread start condition variable", thread_start_lock_),
       pthread_(0),
       thread_(NULL),
       debug_thread_started_(false),
@@ -97,7 +97,7 @@
       transport_(NULL),
       netState(NULL),
       attach_lock_("JDWP attach lock"),
-      attach_cond_("JDWP attach condition variable"),
+      attach_cond_("JDWP attach condition variable", attach_lock_),
       last_activity_time_ms_(0),
       serial_lock_("JDWP serial lock", kJdwpSerialLock),
       request_serial_(0x10000000),
@@ -106,7 +106,7 @@
       event_list_(NULL),
       event_list_size_(0),
       event_thread_lock_("JDWP event thread lock"),
-      event_thread_cond_("JDWP event thread condition variable"),
+      event_thread_cond_("JDWP event thread condition variable", event_thread_lock_),
       event_thread_id_(0),
       ddm_is_active_(false) {
 }
@@ -158,7 +158,7 @@
        * Wait until the thread finishes basic initialization.
        * TODO: cond vars should be waited upon in a loop
        */
-      state->thread_start_cond_.Wait(self, state->thread_start_lock_);
+      state->thread_start_cond_.Wait(self);
     } else {
       {
         MutexLock attach_locker(self, state->attach_lock_);
@@ -172,7 +172,7 @@
          * Wait until the thread finishes basic initialization.
          * TODO: cond vars should be waited upon in a loop
          */
-        state->thread_start_cond_.Wait(self, state->thread_start_lock_);
+        state->thread_start_cond_.Wait(self);
 
         /*
          * For suspend=y, wait for the debugger to connect to us or for us to
@@ -184,7 +184,7 @@
          */
         {
           ScopedThreadStateChange tsc(self, kWaitingForDebuggerToAttach);
-          state->attach_cond_.Wait(self, state->attach_lock_);
+          state->attach_cond_.Wait(self);
         }
       }
       if (!state->IsActive()) {
@@ -298,7 +298,7 @@
   {
     MutexLock locker(thread_, thread_start_lock_);
     debug_thread_started_ = true;
-    thread_start_cond_.Broadcast();
+    thread_start_cond_.Broadcast(thread_);
   }
 
   /* set the thread state to kWaitingInMainDebuggerLoop so GCs don't wait for us */
@@ -332,7 +332,7 @@
       if (!(*transport_->establish)(this, options_)) {
         /* wake anybody who was waiting for us to succeed */
         MutexLock mu(thread_, attach_lock_);
-        attach_cond_.Broadcast();
+        attach_cond_.Broadcast(thread_);
         break;
       }
     }
@@ -366,7 +366,7 @@
 
         /* wake anybody who's waiting for us */
         MutexLock mu(thread_, attach_lock_);
-        attach_cond_.Broadcast();
+        attach_cond_.Broadcast(thread_);
       }
     }
 
diff --git a/src/jni_internal.cc b/src/jni_internal.cc
index 22438a0..60b49de 100644
--- a/src/jni_internal.cc
+++ b/src/jni_internal.cc
@@ -509,7 +509,7 @@
         handle_(handle),
         class_loader_(class_loader),
         jni_on_load_lock_("JNI_OnLoad lock"),
-        jni_on_load_cond_("JNI_OnLoad condition variable"),
+        jni_on_load_cond_("JNI_OnLoad condition variable", jni_on_load_lock_),
         jni_on_load_thread_id_(Thread::Current()->GetThinLockId()),
         jni_on_load_result_(kPending) {
   }
@@ -543,7 +543,7 @@
       } else {
         while (jni_on_load_result_ == kPending) {
           VLOG(jni) << "[" << *self << " waiting for \"" << path_ << "\" " << "JNI_OnLoad...]";
-          jni_on_load_cond_.Wait(self, jni_on_load_lock_);
+          jni_on_load_cond_.Wait(self);
         }
 
         okay = (jni_on_load_result_ == kOkay);
@@ -556,13 +556,14 @@
   }
 
   void SetResult(bool result) LOCKS_EXCLUDED(jni_on_load_lock_) {
-    MutexLock mu(Thread::Current(), jni_on_load_lock_);
+    Thread* self = Thread::Current();
+    MutexLock mu(self, jni_on_load_lock_);
 
     jni_on_load_result_ = result ? kOkay : kFailed;
     jni_on_load_thread_id_ = 0;
 
     // Broadcast a wakeup to anybody sleeping on the condition variable.
-    jni_on_load_cond_.Broadcast();
+    jni_on_load_cond_.Broadcast(self);
   }
 
   void* FindSymbol(const std::string& symbol_name) {
@@ -588,7 +589,7 @@
   // Guards remaining items.
   Mutex jni_on_load_lock_ DEFAULT_MUTEX_ACQUIRED_AFTER;
   // Wait for JNI_OnLoad in other thread.
-  ConditionVariable jni_on_load_cond_;
+  ConditionVariable jni_on_load_cond_ GUARDED_BY(jni_on_load_lock_);
   // Recursive invocation guard.
   uint32_t jni_on_load_thread_id_ GUARDED_BY(jni_on_load_lock_);
   // Result of earlier JNI_OnLoad call.
diff --git a/src/monitor.cc b/src/monitor.cc
index 9890822..8977ec9 100644
--- a/src/monitor.cc
+++ b/src/monitor.cc
@@ -16,14 +16,6 @@
 
 #include "monitor.h"
 
-#include <errno.h>
-#include <fcntl.h>
-#include <pthread.h>
-#include <stdlib.h>
-#include <sys/time.h>
-#include <time.h>
-#include <unistd.h>
-
 #include <vector>
 
 #include "class_linker.h"
@@ -364,38 +356,6 @@
   return true;
 }
 
-// Converts the given waiting time (relative to "now") into an absolute time in 'ts'.
-static void ToAbsoluteTime(int64_t ms, int32_t ns, timespec* ts)
-    SHARED_LOCKS_REQUIRED(Locks::mutator_lock_) {
-  int64_t endSec;
-
-#ifdef HAVE_TIMEDWAIT_MONOTONIC
-  clock_gettime(CLOCK_MONOTONIC, ts);
-#else
-  {
-    timeval tv;
-    gettimeofday(&tv, NULL);
-    ts->tv_sec = tv.tv_sec;
-    ts->tv_nsec = tv.tv_usec * 1000;
-  }
-#endif
-  endSec = ts->tv_sec + ms / 1000;
-  if (endSec >= 0x7fffffff) {
-    std::ostringstream ss;
-    Thread::Current()->Dump(ss);
-    LOG(INFO) << "Note: end time exceeds epoch: " << ss.str();
-    endSec = 0x7ffffffe;
-  }
-  ts->tv_sec = endSec;
-  ts->tv_nsec = (ts->tv_nsec + (ms % 1000) * 1000000) + ns;
-
-  // Catch rollover.
-  if (ts->tv_nsec >= 1000000000L) {
-    ts->tv_sec++;
-    ts->tv_nsec -= 1000000000L;
-  }
-}
-
 /*
  * Wait on a monitor until timeout, interrupt, or notification.  Used for
  * Object.wait() and (somewhat indirectly) Thread.sleep() and Thread.join().
@@ -439,14 +399,6 @@
     return;
   }
 
-  // Compute absolute wakeup time, if necessary.
-  timespec ts;
-  bool timed = false;
-  if (ms != 0 || ns != 0) {
-    ToAbsoluteTime(ms, ns, &ts);
-    timed = true;
-  }
-
   /*
    * Add ourselves to the set of threads waiting on this monitor, and
    * release our hold.  We need to let it go even if we're a few levels
@@ -471,6 +423,7 @@
    * that we won't touch any references in this state, and we'll check
    * our suspend mode before we transition out.
    */
+  bool timed = (ms != 0 || ns != 0);
   self->TransitionFromRunnableToSuspended(timed ? kTimedWaiting : kWaiting);
 
   bool wasInterrupted = false;
@@ -496,9 +449,9 @@
     } else {
       // Wait for a notification or a timeout to occur.
       if (!timed) {
-        self->wait_cond_->Wait(self, *self->wait_mutex_);
+        self->wait_cond_->Wait(self);
       } else {
-        self->wait_cond_->TimedWait(self, *self->wait_mutex_, ts);
+        self->wait_cond_->TimedWait(self, ms, ns);
       }
       if (self->interrupted_) {
         wasInterrupted = true;
@@ -568,7 +521,7 @@
     // Check to see if the thread is still waiting.
     MutexLock mu(self, *thread->wait_mutex_);
     if (thread->wait_monitor_ != NULL) {
-      thread->wait_cond_->Signal();
+      thread->wait_cond_->Signal(self);
       return;
     }
   }
diff --git a/src/mutex.cc b/src/mutex.cc
index 96e04ef..18dc863 100644
--- a/src/mutex.cc
+++ b/src/mutex.cc
@@ -17,11 +17,13 @@
 #include "mutex.h"
 
 #include <errno.h>
+#include <sys/time.h>
 
 #include "cutils/atomic.h"
 #include "cutils/atomic-inline.h"
 #include "logging.h"
 #include "runtime.h"
+#include "scoped_thread_state_change.h"
 #include "thread.h"
 #include "utils.h"
 
@@ -41,8 +43,8 @@
 #ifndef SYS_futex
 #define SYS_futex __NR_futex
 #endif
-int futex(volatile int *uaddr, int op, int val, const struct timespec *timeout, int *, int ) {
-  return syscall(SYS_futex, uaddr, op, val, timeout, NULL, NULL);
+int futex(volatile int *uaddr, int op, int val, const struct timespec *timeout, volatile int *uaddr2, int val3) {
+  return syscall(SYS_futex, uaddr, op, val, timeout, uaddr2, val3);
 }
 #endif  // ART_USE_FUTEXES
 
@@ -96,6 +98,51 @@
   }
 }
 
+// Initialize a timespec to either an absolute or relative time.
+static void InitTimeSpec(Thread* self, bool absolute, int clock, int64_t ms, int32_t ns,
+                         timespec* ts) {
+  int64_t endSec;
+
+  if (absolute) {
+    clock_gettime(clock, ts);
+  } else {
+    ts->tv_sec = 0;
+    ts->tv_nsec = 0;
+  }
+  endSec = ts->tv_sec + ms / 1000;
+  if (UNLIKELY(endSec >= 0x7fffffff)) {
+    std::ostringstream ss;
+    ScopedObjectAccess soa(self);
+    self->Dump(ss);
+    LOG(INFO) << "Note: end time exceeds epoch: " << ss.str();
+    endSec = 0x7ffffffe;
+  }
+  ts->tv_sec = endSec;
+  ts->tv_nsec = (ts->tv_nsec + (ms % 1000) * 1000000) + ns;
+
+  // Catch rollover.
+  if (ts->tv_nsec >= 1000000000L) {
+    ts->tv_sec++;
+    ts->tv_nsec -= 1000000000L;
+  }
+}
+
+#if ART_USE_FUTEXES
+static bool ComputeRelativeTimeSpec(timespec* result_ts, const timespec& lhs, const timespec& rhs) {
+  const long int one_sec = 1000 * 1000 * 1000;  // one second in nanoseconds.
+  result_ts->tv_sec = lhs.tv_sec - rhs.tv_sec;
+  result_ts->tv_nsec = lhs.tv_nsec - rhs.tv_nsec;
+  if (result_ts->tv_nsec < 0) {
+    result_ts->tv_sec--;
+    result_ts->tv_nsec += one_sec;
+  } else if (result_ts->tv_nsec > one_sec) {
+    result_ts->tv_sec++;
+    result_ts->tv_nsec -= one_sec;
+  }
+  return result_ts->tv_sec < 0;
+}
+#endif
+
 BaseMutex::BaseMutex(const char* name, LockLevel level) : level_(level), name_(name) {}
 
 static void CheckUnattachedThread(LockLevel level) NO_THREAD_SAFETY_ANALYSIS {
@@ -176,7 +223,11 @@
 
 Mutex::Mutex(const char* name, LockLevel level, bool recursive)
     : BaseMutex(name, level), recursive_(recursive), recursion_count_(0) {
-#if defined(__BIONIC__) || defined(__APPLE__)
+#if ART_USE_FUTEXES
+  state_ = 0;
+  exclusive_owner_ = 0;
+  num_contenders_ = 0;
+#elif defined(__BIONIC__) || defined(__APPLE__)
   // Use recursive mutexes for bionic and Apple otherwise the
   // non-recursive mutexes don't have TIDs to check lock ownership of.
   pthread_mutexattr_t attributes;
@@ -190,6 +241,17 @@
 }
 
 Mutex::~Mutex() {
+#if ART_USE_FUTEXES
+  if (state_ != 0) {
+    MutexLock mu(Thread::Current(), *Locks::runtime_shutdown_lock_);
+    Runtime* runtime = Runtime::Current();
+    bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
+    LOG(shutting_down ? WARNING : FATAL) << "destroying mutex with owner: " << exclusive_owner_;
+  } else {
+    CHECK_EQ(exclusive_owner_, 0U)  << "unexpectedly found an owner on unlocked mutex " << name_;
+    CHECK_EQ(num_contenders_, 0) << "unexpectedly found a contender on mutex " << name_;
+  }
+#else
   // We can't use CHECK_MUTEX_CALL here because on shutdown a suspended daemon thread
   // may still be using locks.
   int rc = pthread_mutex_destroy(&mutex_);
@@ -201,6 +263,7 @@
     bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
     PLOG(shutting_down ? WARNING : FATAL) << "pthread_mutex_destroy failed for " << name_;
   }
+#endif
 }
 
 void Mutex::ExclusiveLock(Thread* self) {
@@ -209,7 +272,29 @@
     AssertNotHeld(self);
   }
   if (!recursive_ || !IsExclusiveHeld(self)) {
+#if ART_USE_FUTEXES
+    bool done = false;
+    do {
+      int32_t cur_state = state_;
+      if (cur_state == 0) {
+        // Change state from 0 to 1.
+        done = android_atomic_cmpxchg(0, 1, &state_) == 0;
+      } else {
+        // Failed to acquire, hang up.
+        android_atomic_inc(&num_contenders_);
+        if (futex(&state_, FUTEX_WAIT, 1, NULL, NULL, 0) != 0) {
+          if (errno != EAGAIN) {
+            PLOG(FATAL) << "futex wait failed for " << name_;
+          }
+        }
+        android_atomic_dec(&num_contenders_);
+      }
+    } while(!done);
+    DCHECK_EQ(state_, 1);
+    exclusive_owner_ = SafeGetTid(self);
+#else
     CHECK_MUTEX_CALL(pthread_mutex_lock, (&mutex_));
+#endif
     RegisterAsLocked(self);
   }
   recursion_count_++;
@@ -226,6 +311,20 @@
     AssertNotHeld(self);
   }
   if (!recursive_ || !IsExclusiveHeld(self)) {
+#if ART_USE_FUTEXES
+    bool done = false;
+    do {
+      int32_t cur_state = state_;
+      if (cur_state == 0) {
+        // Change state from 0 to 1.
+        done = android_atomic_cmpxchg(0, 1, &state_) == 0;
+      } else {
+        return false;
+      }
+    } while(!done);
+    DCHECK_EQ(state_, 1);
+    exclusive_owner_ = SafeGetTid(self);
+#else
     int result = pthread_mutex_trylock(&mutex_);
     if (result == EBUSY) {
       return false;
@@ -234,6 +333,7 @@
       errno = result;
       PLOG(FATAL) << "pthread_mutex_trylock failed for " << name_;
     }
+#endif
     RegisterAsLocked(self);
   }
   recursion_count_++;
@@ -255,7 +355,28 @@
           << name_ << " " << recursion_count_;
     }
     RegisterAsUnlocked(self);
+#if ART_USE_FUTEXES
+  bool done = false;
+  do {
+    int32_t cur_state = state_;
+    if (cur_state == 1) {
+      // We're no longer the owner.
+      exclusive_owner_ = 0;
+      // Change state to 0.
+      done = android_atomic_cmpxchg(cur_state, 0, &state_) == 0;
+      if (done) { // Spurious fail?
+        // Wake a contender
+        if (num_contenders_ > 0) {
+          futex(&state_, FUTEX_WAKE, 1, NULL, NULL, 0);
+        }
+      }
+    } else {
+      LOG(FATAL) << "Unexpected state_:" << cur_state << " for " << name_;
+    }
+  } while(!done);
+#else
     CHECK_MUTEX_CALL(pthread_mutex_unlock, (&mutex_));
+#endif
   }
 }
 
@@ -272,7 +393,9 @@
 }
 
 uint64_t Mutex::GetExclusiveOwnerTid() const {
-#if defined(__BIONIC__)
+#if ART_USE_FUTEXES
+  return exclusive_owner_;
+#elif defined(__BIONIC__)
   return static_cast<uint64_t>((mutex_.value >> 16) & 0xffff);
 #elif defined(__GLIBC__)
   return reinterpret_cast<const glibc_pthread_mutex_t*>(&mutex_)->owner;
@@ -396,10 +519,12 @@
 }
 
 #if HAVE_TIMED_RWLOCK
-bool ReaderWriterMutex::ExclusiveLockWithTimeout(Thread* self, const timespec& abs_timeout) {
+bool ReaderWriterMutex::ExclusiveLockWithTimeout(Thread* self, int64_t ms, int32_t ns) {
   DCHECK(self == NULL || self == Thread::Current());
 #if ART_USE_FUTEXES
   bool done = false;
+  timespec end_abs_ts;
+  InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &end_abs_ts);
   do {
     int32_t cur_state = state_;
     if (cur_state == 0) {
@@ -407,12 +532,18 @@
       done = android_atomic_cmpxchg(0, -1, &state_) == 0;
     } else {
       // Failed to acquire, hang up.
+      timespec now_abs_ts;
+      InitTimeSpec(self, true, CLOCK_REALTIME, 0, 0, &now_abs_ts);
+      timespec rel_ts;
+      if (ComputeRelativeTimeSpec(&rel_ts, end_abs_ts, now_abs_ts)) {
+        return false;  // Timed out.
+      }
       android_atomic_inc(&num_pending_writers_);
-      if (futex(&state_, FUTEX_WAIT, cur_state, &abs_timeout, NULL, 0) != 0) {
+      if (futex(&state_, FUTEX_WAIT, cur_state, &rel_ts, NULL, 0) != 0) {
         if (errno == ETIMEDOUT) {
           android_atomic_dec(&num_pending_writers_);
-          return false;
-        } else if (errno != EAGAIN) {
+          return false;  // Timed out.
+        } else if (errno != EAGAIN && errno != EINTR) {
           PLOG(FATAL) << "timed futex wait failed for " << name_;
         }
       }
@@ -421,7 +552,9 @@
   } while(!done);
   exclusive_owner_ = SafeGetTid(self);
 #else
-  int result = pthread_rwlock_timedwrlock(&rwlock_, &abs_timeout);
+  timespec ts;
+  InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &ts);
+  int result = pthread_rwlock_timedwrlock(&rwlock_, &ts);
   if (result == ETIMEDOUT) {
     return false;
   }
@@ -576,11 +709,19 @@
   return os << mu.Dump();
 }
 
-ConditionVariable::ConditionVariable(const std::string& name) : name_(name) {
+ConditionVariable::ConditionVariable(const std::string& name, Mutex& guard)
+    : name_(name), guard_(guard) {
+#if ART_USE_FUTEXES
+  state_ = 0;
+  num_waiters_ = 0;
+  num_awoken_ = 0;
+#else
   CHECK_MUTEX_CALL(pthread_cond_init, (&cond_, NULL));
+#endif
 }
 
 ConditionVariable::~ConditionVariable() {
+#if !ART_USE_FUTEXES
   // We can't use CHECK_MUTEX_CALL here because on shutdown a suspended daemon thread
   // may still be using condition variables.
   int rc = pthread_cond_destroy(&cond_);
@@ -591,39 +732,162 @@
     bool shutting_down = (runtime == NULL) || runtime->IsShuttingDown();
     PLOG(shutting_down ? WARNING : FATAL) << "pthread_cond_destroy failed for " << name_;
   }
+#endif
 }
 
-void ConditionVariable::Broadcast() {
+void ConditionVariable::Broadcast(Thread* self) {
+  DCHECK(self == NULL || self == Thread::Current());
+  // TODO: enable below, there's a race in thread creation that causes false failures currently.
+  // guard_.AssertExclusiveHeld(self);
+#if ART_USE_FUTEXES
+  if (num_waiters_ > 0) {
+    android_atomic_inc(&state_); // Indicate a wake has occurred to waiters coming in.
+    bool done = false;
+    do {
+       int32_t cur_state = state_;
+       // Compute number of waiters requeued and add to mutex contenders.
+       int32_t num_requeued = num_waiters_ - num_awoken_;
+       android_atomic_add(num_requeued, &guard_.num_contenders_);
+       // Requeue waiters onto contenders.
+       done = futex(&state_, FUTEX_CMP_REQUEUE, 0,
+                    reinterpret_cast<const timespec*>(std::numeric_limits<int32_t>::max()),
+                    &guard_.state_, cur_state) != -1;
+       if (!done) {
+         if (errno != EAGAIN) {
+           PLOG(FATAL) << "futex cmp requeue failed for " << name_;
+         }
+       } else {
+         num_awoken_ = num_waiters_;
+       }
+    } while (!done);
+  }
+#else
   CHECK_MUTEX_CALL(pthread_cond_broadcast, (&cond_));
+#endif
 }
 
-void ConditionVariable::Signal() {
+void ConditionVariable::Signal(Thread* self) {
+  DCHECK(self == NULL || self == Thread::Current());
+  guard_.AssertExclusiveHeld(self);
+#if ART_USE_FUTEXES
+  if (num_waiters_ > 0) {
+    android_atomic_inc(&state_); // Indicate a wake has occurred to waiters coming in.
+    // Futex wake 1 waiter who will then come and in contend on mutex. It'd be nice to requeue them
+    // to avoid this, however, requeueing can only move all waiters.
+    if (futex(&state_, FUTEX_WAKE, 1, NULL, NULL, 0) == 1) {
+      // Wake success.
+      // We weren't requeued, however, to make accounting simpler in the Wait code, increment the
+      // number of contenders on the mutex.
+      num_awoken_++;
+      android_atomic_inc(&guard_.num_contenders_);
+    }
+  }
+#else
   CHECK_MUTEX_CALL(pthread_cond_signal, (&cond_));
+#endif
 }
 
-void ConditionVariable::Wait(Thread* self, Mutex& mutex) {
-  mutex.CheckSafeToWait(self);
-  unsigned int old_recursion_count = mutex.recursion_count_;
-  mutex.recursion_count_ = 0;
-  CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &mutex.mutex_));
-  mutex.recursion_count_ = old_recursion_count;
+void ConditionVariable::Wait(Thread* self) {
+  DCHECK(self == NULL || self == Thread::Current());
+  guard_.AssertExclusiveHeld(self);
+  guard_.CheckSafeToWait(self);
+  unsigned int old_recursion_count = guard_.recursion_count_;
+#if ART_USE_FUTEXES
+  int32_t cur_state = state_;
+  num_waiters_++;
+  guard_.recursion_count_ = 1;
+  guard_.ExclusiveUnlock(self);
+  bool woken = true;
+  while (futex(&state_, FUTEX_WAIT, cur_state, NULL, NULL, 0) != 0) {
+    if (errno == EINTR || errno == EAGAIN) {
+      if (state_ != cur_state) {
+        // We failed and a signal has come in.
+        woken = false;
+        break;
+      }
+    } else {
+      PLOG(FATAL) << "futex wait failed for " << name_;
+    }
+  }
+  guard_.ExclusiveLock(self);
+  num_waiters_--;
+  if (woken) {
+    // If we were woken we were requeued on the mutex, decrement the mutex's contender count.
+    android_atomic_dec(&guard_.num_contenders_);
+    num_awoken_--;
+  }
+#else
+  guard_.recursion_count_ = 0;
+  CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &guard_.mutex_));
+#endif
+  guard_.recursion_count_ = old_recursion_count;
 }
 
-void ConditionVariable::TimedWait(Thread* self, Mutex& mutex, const timespec& ts) {
+void ConditionVariable::TimedWait(Thread* self, int64_t ms, int32_t ns) {
+  DCHECK(self == NULL || self == Thread::Current());
+  guard_.AssertExclusiveHeld(self);
+  guard_.CheckSafeToWait(self);
+  unsigned int old_recursion_count = guard_.recursion_count_;
+#if ART_USE_FUTEXES
+  // Record the original end time so that if the futex call fails we can recompute the appropriate
+  // relative time.
+  timespec end_abs_ts;
+  InitTimeSpec(self, true, CLOCK_REALTIME, ms, ns, &end_abs_ts);
+  timespec rel_ts;
+  InitTimeSpec(self, false, CLOCK_REALTIME, ms, ns, &rel_ts);
+  // Read state so that we can know if a signal comes in during before we sleep.
+  int32_t cur_state = state_;
+  num_waiters_++;
+  guard_.recursion_count_ = 1;
+  guard_.ExclusiveUnlock(self);
+  bool woken = true;  // Did the futex wait end because we were awoken?
+  while (futex(&state_, FUTEX_WAIT, cur_state, &rel_ts, NULL, 0) != 0) {
+    if (errno == ETIMEDOUT) {
+      woken = false;
+      break;
+    }
+    if ((errno == EINTR) || (errno == EAGAIN)) {
+      if (state_ != cur_state) {
+        // We failed and a signal has come in.
+        woken = false;
+        break;
+      }
+      timespec now_abs_ts;
+      InitTimeSpec(self, true, CLOCK_REALTIME, 0, 0, &now_abs_ts);
+      if (ComputeRelativeTimeSpec(&rel_ts, end_abs_ts, now_abs_ts)) {
+        // futex failed and we timed out in the meantime.
+        woken = false;
+        break;
+      }
+    } else {
+      PLOG(FATAL) << "timed futex wait failed for " << name_;
+    }
+  }
+  guard_.ExclusiveLock(self);
+  num_waiters_--;
+  if (woken) {
+    // If we were woken we were requeued on the mutex, decrement the mutex's contender count.
+    android_atomic_dec(&guard_.num_contenders_);
+    num_awoken_--;
+  }
+#else
 #ifdef HAVE_TIMEDWAIT_MONOTONIC
 #define TIMEDWAIT pthread_cond_timedwait_monotonic
+  int clock = CLOCK_MONOTONIC;
 #else
 #define TIMEDWAIT pthread_cond_timedwait
+  int clock = CLOCK_REALTIME;
 #endif
-  mutex.CheckSafeToWait(self);
-  unsigned int old_recursion_count = mutex.recursion_count_;
-  mutex.recursion_count_ = 0;
-  int rc = TIMEDWAIT(&cond_, &mutex.mutex_, &ts);
-  mutex.recursion_count_ = old_recursion_count;
+  guard_.recursion_count_ = 0;
+  timespec ts;
+  InitTimeSpec(self, true, clock, ms, ns, &ts);
+  int rc = TIMEDWAIT(&cond_, &guard_.mutex_, &ts);
   if (rc != 0 && rc != ETIMEDOUT) {
     errno = rc;
     PLOG(FATAL) << "TimedWait failed for " << name_;
   }
+#endif
+  guard_.recursion_count_ = old_recursion_count;
 }
 
 }  // namespace art
diff --git a/src/mutex.h b/src/mutex.h
index d51e2cc..4af2ad7 100644
--- a/src/mutex.h
+++ b/src/mutex.h
@@ -132,7 +132,16 @@
   std::string Dump() const;
 
  private:
+#if ART_USE_FUTEXES
+  // 0 is unheld, 1 is held.
+  volatile int32_t state_;
+  // Exclusive owner.
+  volatile uint64_t exclusive_owner_;
+  // Number of waiting contenders.
+  volatile int32_t num_contenders_;
+#else
   pthread_mutex_t mutex_;
+#endif
   const bool recursive_;  // Can the lock be recursively held?
   unsigned int recursion_count_;
   friend class ConditionVariable;
@@ -175,7 +184,7 @@
   // Block until ReaderWriterMutex is free and acquire exclusive access. Returns true on success
   // or false if timeout is reached.
 #if HAVE_TIMED_RWLOCK
-  bool ExclusiveLockWithTimeout(Thread* self, const timespec& abs_timeout)
+  bool ExclusiveLockWithTimeout(Thread* self, int64_t ms, int32_t ns)
       EXCLUSIVE_TRYLOCK_FUNCTION(true);
 #endif
 
@@ -254,17 +263,35 @@
 // (Signal) or all at once (Broadcast).
 class ConditionVariable {
  public:
-  explicit ConditionVariable(const std::string& name);
+  explicit ConditionVariable(const std::string& name, Mutex& mutex);
   ~ConditionVariable();
 
-  void Broadcast();
-  void Signal();
-  void Wait(Thread* self, Mutex& mutex);
-  void TimedWait(Thread* self, Mutex& mutex, const timespec& ts);
+  void Broadcast(Thread* self);
+  void Signal(Thread* self);
+  // TODO: No thread safety analysis on Wait and TimedWait as they call mutex operations via their
+  //       pointer copy, thereby defeating annotalysis.
+  void Wait(Thread* self) NO_THREAD_SAFETY_ANALYSIS;
+  void TimedWait(Thread* self, int64_t ms, int32_t ns) NO_THREAD_SAFETY_ANALYSIS;
 
  private:
-  pthread_cond_t cond_;
   std::string name_;
+  // The Mutex being used by waiters. It is an error to mix condition variables between different
+  // Mutexes.
+  Mutex& guard_;
+#if ART_USE_FUTEXES
+  // A counter that is modified by signals and broadcasts. This ensures that when a waiter gives up
+  // their Mutex and another thread takes it and signals, the waiting thread observes that state_
+  // changed and doesn't enter the wait.
+  volatile int32_t state_;
+  // Number of threads that have come into to wait, not the length of the waiters on the futex as
+  // waiters may have been requeued onto guard_. A non-zero value indicates that signal and
+  // broadcast should do something. Guarded by guard_.
+  volatile int32_t num_waiters_;
+  // Number of threads that have been awoken out of the pool of waiters.
+  volatile int32_t num_awoken_;
+#else
+  pthread_cond_t cond_;
+#endif
   DISALLOW_COPY_AND_ASSIGN(ConditionVariable);
 };
 
diff --git a/src/mutex_test.cc b/src/mutex_test.cc
index 4dac3c6..0b0f2c9 100644
--- a/src/mutex_test.cc
+++ b/src/mutex_test.cc
@@ -97,13 +97,13 @@
 
 struct RecursiveLockWait {
   explicit RecursiveLockWait()
-      : mu("test mutex", kDefaultMutexLevel, true), cv("test condition variable") {
+      : mu("test mutex", kDefaultMutexLevel, true), cv("test condition variable", mu) {
   }
 
   static void* Callback(void* arg) {
     RecursiveLockWait* state = reinterpret_cast<RecursiveLockWait*>(arg);
     state->mu.Lock(Thread::Current());
-    state->cv.Signal();
+    state->cv.Signal(Thread::Current());
     state->mu.Unlock(Thread::Current());
     return NULL;
   }
@@ -122,7 +122,7 @@
   int pthread_create_result = pthread_create(&pthread, NULL, RecursiveLockWait::Callback, &state);
   ASSERT_EQ(0, pthread_create_result);
 
-  state.cv.Wait(Thread::Current(), state.mu);
+  state.cv.Wait(Thread::Current());
 
   state.mu.Unlock(Thread::Current());
   state.mu.Unlock(Thread::Current());
diff --git a/src/runtime.cc b/src/runtime.cc
index e2e419a..ee8a8c8 100644
--- a/src/runtime.cc
+++ b/src/runtime.cc
@@ -77,7 +77,7 @@
       resolution_method_(NULL),
       system_class_loader_(NULL),
       threads_being_born_(0),
-      shutdown_cond_(new ConditionVariable("Runtime shutdown")),
+      shutdown_cond_(new ConditionVariable("Runtime shutdown", *Locks::runtime_shutdown_lock_)),
       shutting_down_(false),
       shutting_down_started_(false),
       started_(false),
@@ -120,7 +120,7 @@
     MutexLock mu(self, *Locks::runtime_shutdown_lock_);
     shutting_down_started_ = true;
     while (threads_being_born_ > 0) {
-      shutdown_cond_->Wait(self, *Locks::runtime_shutdown_lock_);
+      shutdown_cond_->Wait(self);
     }
     shutting_down_ = true;
   }
@@ -685,7 +685,7 @@
   DCHECK_GT(threads_being_born_, 0U);
   threads_being_born_--;
   if (shutting_down_started_ && threads_being_born_ == 0) {
-    shutdown_cond_->Broadcast();
+    shutdown_cond_->Broadcast(Thread::Current());
   }
 }
 
diff --git a/src/signal_catcher.cc b/src/signal_catcher.cc
index b6f6a41..80c37d4 100644
--- a/src/signal_catcher.cc
+++ b/src/signal_catcher.cc
@@ -62,7 +62,7 @@
 SignalCatcher::SignalCatcher(const std::string& stack_trace_file)
     : stack_trace_file_(stack_trace_file),
       lock_("SignalCatcher lock"),
-      cond_("SignalCatcher::cond_"),
+      cond_("SignalCatcher::cond_", lock_),
       thread_(NULL) {
   SetHaltFlag(false);
 
@@ -72,7 +72,7 @@
   Thread* self = Thread::Current();
   MutexLock mu(self, lock_);
   while (thread_ == NULL) {
-    cond_.Wait(self, lock_);
+    cond_.Wait(self);
   }
 }
 
@@ -190,7 +190,7 @@
   {
     MutexLock mu(self, signal_catcher->lock_);
     signal_catcher->thread_ = self;
-    signal_catcher->cond_.Broadcast();
+    signal_catcher->cond_.Broadcast(self);
   }
 
   // Set up mask with signals we want to handle.
diff --git a/src/signal_catcher.h b/src/signal_catcher.h
index e8ac812..074267e 100644
--- a/src/signal_catcher.h
+++ b/src/signal_catcher.h
@@ -51,8 +51,8 @@
 
   std::string stack_trace_file_;
 
-  mutable Mutex lock_;
-  ConditionVariable cond_;
+  mutable Mutex lock_ DEFAULT_MUTEX_ACQUIRED_AFTER;
+  ConditionVariable cond_ GUARDED_BY(lock_);
   bool halt_ GUARDED_BY(lock_);
   pthread_t pthread_ GUARDED_BY(lock_);
   Thread* thread_ GUARDED_BY(lock_);
diff --git a/src/thread.cc b/src/thread.cc
index 485caab..6daeadf 100644
--- a/src/thread.cc
+++ b/src/thread.cc
@@ -591,7 +591,7 @@
       DCHECK_EQ(old_state_and_flags.as_struct.state, old_state);
       while ((old_state_and_flags.as_struct.flags & kSuspendRequest) != 0) {
         // Re-check when Thread::resume_cond_ is notified.
-        Thread::resume_cond_->Wait(this, *Locks::thread_suspend_count_lock_);
+        Thread::resume_cond_->Wait(this);
         old_state_and_flags = state_and_flags_;
         DCHECK_EQ(old_state_and_flags.as_struct.state, old_state);
       }
@@ -875,7 +875,8 @@
 void Thread::Startup() {
   {
     MutexLock mu(Thread::Current(), *Locks::thread_suspend_count_lock_);  // Keep GCC happy.
-    resume_cond_ = new ConditionVariable("Thread resumption condition variable");
+    resume_cond_ = new ConditionVariable("Thread resumption condition variable",
+                                         *Locks::thread_suspend_count_lock_);
   }
 
   // Allocate a TLS slot.
@@ -916,7 +917,7 @@
       thin_lock_id_(0),
       tid_(0),
       wait_mutex_(new Mutex("a thread wait mutex")),
-      wait_cond_(new ConditionVariable("a thread wait condition variable")),
+      wait_cond_(new ConditionVariable("a thread wait condition variable", *wait_mutex_)),
       wait_monitor_(NULL),
       interrupted_(false),
       wait_next_(NULL),
@@ -1179,22 +1180,24 @@
 }
 
 void Thread::Interrupt() {
-  MutexLock mu(Thread::Current(), *wait_mutex_);
+  Thread* self = Thread::Current();
+  MutexLock mu(self, *wait_mutex_);
   if (interrupted_) {
     return;
   }
   interrupted_ = true;
-  NotifyLocked();
+  NotifyLocked(self);
 }
 
 void Thread::Notify() {
-  MutexLock mu(Thread::Current(), *wait_mutex_);
-  NotifyLocked();
+  Thread* self = Thread::Current();
+  MutexLock mu(self, *wait_mutex_);
+  NotifyLocked(self);
 }
 
-void Thread::NotifyLocked() {
+void Thread::NotifyLocked(Thread* self) {
   if (wait_monitor_ != NULL) {
-    wait_cond_->Signal();
+    wait_cond_->Signal(self);
   }
 }
 
diff --git a/src/thread.h b/src/thread.h
index 3ba9f4a..d559449 100644
--- a/src/thread.h
+++ b/src/thread.h
@@ -615,7 +615,7 @@
   void InitPthreadKeySelf();
   void InitStackHwm();
 
-  void NotifyLocked() EXCLUSIVE_LOCKS_REQUIRED(wait_mutex_);
+  void NotifyLocked(Thread* self) EXCLUSIVE_LOCKS_REQUIRED(wait_mutex_);
 
   bool ReadFlag(ThreadFlag flag) const {
     return (state_and_flags_.as_struct.flags & flag) != 0;
@@ -632,8 +632,7 @@
 
   // Used to notify threads that they should attempt to resume, they will suspend again if
   // their suspend count is > 0.
-  static ConditionVariable* resume_cond_
-      GUARDED_BY(Locks::thread_suspend_count_lock_);
+  static ConditionVariable* resume_cond_ GUARDED_BY(Locks::thread_suspend_count_lock_);
 
   // --- Frequently accessed fields first for short offsets ---
 
diff --git a/src/thread_list.cc b/src/thread_list.cc
index 09a8de6..5ef8625 100644
--- a/src/thread_list.cc
+++ b/src/thread_list.cc
@@ -30,7 +30,7 @@
 ThreadList::ThreadList()
     : allocated_ids_lock_("allocated thread ids lock"),
       suspend_all_count_(0), debug_suspend_all_count_(0),
-      thread_exit_cond_("thread exit condition variable") {
+      thread_exit_cond_("thread exit condition variable", *Locks::thread_list_lock_) {
 }
 
 ThreadList::~ThreadList() {
@@ -184,10 +184,7 @@
   // Block on the mutator lock until all Runnable threads release their share of access.
 #if HAVE_TIMED_RWLOCK
   // Timeout if we wait more than 30 seconds.
-  timespec timeout;
-  clock_gettime(CLOCK_REALTIME, &timeout);
-  timeout.tv_sec += 30;
-  if (UNLIKELY(!Locks::mutator_lock_->ExclusiveLockWithTimeout(self, timeout))) {
+  if (UNLIKELY(!Locks::mutator_lock_->ExclusiveLockWithTimeout(self, 30 * 1000, 0))) {
     UnsafeLogFatalForThreadSuspendAllTimeout(self);
   }
 #else
@@ -226,7 +223,7 @@
     // Broadcast a notification to all suspended threads, some or all of
     // which may choose to wake up.  No need to wait for them.
     VLOG(threads) << *self << " ResumeAll waking others";
-    Thread::resume_cond_->Broadcast();
+    Thread::resume_cond_->Broadcast(self);
   }
   VLOG(threads) << *self << " ResumeAll complete";
 }
@@ -251,7 +248,7 @@
   {
     VLOG(threads) << "Resume(" << *thread << ") waking others";
     MutexLock mu(self, *Locks::thread_suspend_count_lock_);
-    Thread::resume_cond_->Broadcast();
+    Thread::resume_cond_->Broadcast(self);
   }
 
   VLOG(threads) << "Resume(" << *thread << ") complete";
@@ -286,10 +283,7 @@
   // immediately unlock again.
 #if HAVE_TIMED_RWLOCK
   // Timeout if we wait more than 30 seconds.
-  timespec timeout;
-  clock_gettime(CLOCK_REALTIME, &timeout);
-  timeout.tv_sec += 30;
-  if (!Locks::mutator_lock_->ExclusiveLockWithTimeout(self, timeout)) {
+  if (!Locks::mutator_lock_->ExclusiveLockWithTimeout(self, 30 * 1000, 0)) {
     UnsafeLogFatalForThreadSuspendAllTimeout(self);
   } else {
     Locks::mutator_lock_->ExclusiveUnlock(self);
@@ -328,7 +322,7 @@
   Dbg::ClearWaitForEventThread();
 
   while (self->suspend_count_ != 0) {
-    Thread::resume_cond_->Wait(self, *Locks::thread_suspend_count_lock_);
+    Thread::resume_cond_->Wait(self);
     if (self->suspend_count_ != 0) {
       // The condition was signaled but we're still suspended. This
       // can happen if the debugger lets go while a SIGQUIT thread
@@ -366,7 +360,7 @@
 
   {
     MutexLock mu(self, *Locks::thread_suspend_count_lock_);
-    Thread::resume_cond_->Broadcast();
+    Thread::resume_cond_->Broadcast(self);
   }
 
   VLOG(threads) << "UndoDebuggerSuspensions(" << *self << ") complete";
@@ -396,7 +390,7 @@
     }
     if (!all_threads_are_daemons) {
       // Wait for another thread to exit before re-checking.
-      thread_exit_cond_.Wait(self, *Locks::thread_list_lock_);
+      thread_exit_cond_.Wait(self);
     }
   } while(!all_threads_are_daemons);
 }
@@ -486,7 +480,7 @@
 
   // Signal that a thread just detached.
   MutexLock mu(NULL, *Locks::thread_list_lock_);
-  thread_exit_cond_.Signal();
+  thread_exit_cond_.Signal(NULL);
 }
 
 void ThreadList::ForEach(void (*callback)(Thread*, void*), void* context) {