Revert^12 "Thread suspension cleanup and deadlock fix"

This reverts commit b6f3b439d4f12e89393ba8101eea8671c94ba237.

PS1: Identical to aosp/2652371.

PS2: Introduce kSuspensionImmune to disable suspension of a thread
  that is being relied upon to execute ResumeAll(). This replaces
  the test in SuspendAll() that checked whether the caller was being
  asked to suspend itself. That test was deadlock-prone, since it
  could cause a SuspendAll request from e.g. the GC to block, while
  GC progress might be required to resume the thread running the GC.
  Since SuspendAll() now only loops for a single reason, we no longer
  need to track why we looped.
  Reduce the number of iterations in each 129-ThreadGetId thread
  dramatically.

PS3: Address reviewer comments, including fixing a newly introduced
  bug in CheckSuspend(). Fix 129-ThreadGetId by dramatically reducing
  the iteration count when we appear to be running slowly, which is
  normally the case for gcstress. Earlier versions of this CL were
  apparently also failing on this test, but the failure was hidden by
  other failures. This mostly undoes the PS2 change to this test, now
  that the failure is better understood.

PS4: Rebase.

PS5: Fix 129-ThreadGetId code formatting.

PS6: Address more reviewer comments related to 129-ThreadGetId.

PS7: Remove DCHECK in EnsureFlipFunctionStarted. It was unsafe,
  since the thread may no longer be around.

Test: Treehugger.
Bug: 240742796
Bug: 203363895
Bug: 238032384
Bug: 253671779
Bug: 276660630
Bug: 295880862
Bug: 294334417
(and more)

Change-Id: I99260fdc4feb9bcdc8b8b566e40912532f1a4937
diff --git a/compiler/jni/jni_cfi_test_expected.inc b/compiler/jni/jni_cfi_test_expected.inc
index 3fe8226..47a67c7 100644
--- a/compiler/jni/jni_cfi_test_expected.inc
+++ b/compiler/jni/jni_cfi_test_expected.inc
@@ -1,9 +1,12 @@
 // TODO These arrays should be generated automatically or have instructions for re-creation.
+// For now, the is_gc_marking offset can be adjusted by following the last CL that made a
+// similar change.
+
 static constexpr uint8_t expected_asm_kThumb2[] = {
     0x2D, 0xE9, 0xE0, 0x4D, 0x2D, 0xED, 0x10, 0x8A, 0x81, 0xB0, 0x00, 0x90,
     0x19, 0x91, 0x8D, 0xED, 0x1A, 0x0A, 0x1B, 0x92, 0x1C, 0x93, 0x88, 0xB0,
     0x08, 0xB0, 0x01, 0xB0, 0xBD, 0xEC, 0x10, 0x8A, 0xBD, 0xE8, 0xE0, 0x4D,
-    0xD9, 0xF8, 0x24, 0x80, 0x70, 0x47,
+    0xD9, 0xF8, 0x20, 0x80, 0x70, 0x47,
 };
 static constexpr uint8_t expected_cfi_kThumb2[] = {
     0x44, 0x0E, 0x1C, 0x85, 0x07, 0x86, 0x06, 0x87, 0x05, 0x88, 0x04, 0x8A,
@@ -86,7 +89,7 @@
 // 0x00000024: .cfi_restore: r10
 // 0x00000024: .cfi_restore: r11
 // 0x00000024: .cfi_restore: r14
-// 0x00000024: ldr r8, [tr, #36] ; is_gc_marking
+// 0x00000024: ldr r8, [tr, #32] ; is_gc_marking
 // 0x00000028: bx lr
 // 0x0000002a: .cfi_restore_state
 // 0x0000002a: .cfi_def_cfa_offset: 112
@@ -101,7 +104,7 @@
     0xF3, 0x53, 0x45, 0xA9, 0xF5, 0x5B, 0x46, 0xA9, 0xF7, 0x63, 0x47, 0xA9,
     0xF9, 0x6B, 0x48, 0xA9, 0xFB, 0x73, 0x49, 0xA9, 0xFD, 0x7B, 0x4A, 0xA9,
     0xE8, 0x27, 0x41, 0x6D, 0xEA, 0x2F, 0x42, 0x6D, 0xEC, 0x37, 0x43, 0x6D,
-    0xEE, 0x3F, 0x44, 0x6D, 0x74, 0x26, 0x40, 0xB9, 0xFF, 0xC3, 0x02, 0x91,
+    0xEE, 0x3F, 0x44, 0x6D, 0x74, 0x22, 0x40, 0xB9, 0xFF, 0xC3, 0x02, 0x91,
     0xC0, 0x03, 0x5F, 0xD6,
 };
 static constexpr uint8_t expected_cfi_kArm64[] = {
@@ -188,7 +191,7 @@
 // 0x0000006c: ldp d14, d15, [sp, #64]
 // 0x00000070: .cfi_restore_extended: r78
 // 0x00000070: .cfi_restore_extended: r79
-// 0x00000070: ldr w20, [tr, #48] ; is_gc_marking
+// 0x00000070: ldr w20, [tr, #32] ; is_gc_marking
 // 0x00000074: add sp, sp, #0xb0 (176)
 // 0x00000078: .cfi_def_cfa_offset: 0
 // 0x00000078: ret
diff --git a/compiler/utils/assembler_thumb_test_expected.cc.inc b/compiler/utils/assembler_thumb_test_expected.cc.inc
index aea7f14..6e0048e 100644
--- a/compiler/utils/assembler_thumb_test_expected.cc.inc
+++ b/compiler/utils/assembler_thumb_test_expected.cc.inc
@@ -76,7 +76,7 @@
   "      e4: f1bb 0f00     cmp.w r11, #0\n"
   "      e8: bf18          it ne\n"
   "      ea: 46e3          movne r11, r12\n"
-  "      ec: f8d9 c09c     ldr.w r12, [r9, #156]\n"
+  "      ec: f8d9 c094     ldr.w r12, [r9, #148]\n"
   "      f0: f1bc 0f00     cmp.w r12, #0\n"
   "      f4: d16f          bne 0x1d6     @ imm = #222\n"
   "      f6: f8cd c7ff     str.w r12, [sp, #2047]\n"
@@ -151,10 +151,10 @@
   "     206: b001          add sp, #4\n"
   "     208: ecbd 8a10     vpop {s16, s17, s18, s19, s20, s21, s22, s23, s24, s25, s26, s27, s28, s29, s30, s31}\n"
   "     20c: e8bd 4de0     pop.w {r5, r6, r7, r8, r10, r11, lr}\n"
-  "     210: f8d9 8024     ldr.w r8, [r9, #36]\n"
+  "     210: f8d9 8020     ldr.w r8, [r9, #32]\n"
   "     214: 4770          bx lr\n"
-  "     216: f8d9 009c     ldr.w r0, [r9, #156]\n"
-  "     21a: f8d9 e2d0     ldr.w lr, [r9, #720]\n"
+  "     216: f8d9 0094     ldr.w r0, [r9, #148]\n"
+  "     21a: f8d9 e2c4     ldr.w lr, [r9, #708]\n"
   "     21e: 47f0          blx lr\n"
 };
 
diff --git a/openjdkjvmti/ti_stack.cc b/openjdkjvmti/ti_stack.cc
index 9af8861..f399a67 100644
--- a/openjdkjvmti/ti_stack.cc
+++ b/openjdkjvmti/ti_stack.cc
@@ -363,7 +363,13 @@
     REQUIRES_SHARED(art::Locks::mutator_lock_) {
   // Note: requires the mutator lock as the checkpoint requires the mutator lock.
   GetAllStackTracesVectorClosure<Data> closure(max_frame_count, data);
-  size_t barrier_count = art::Runtime::Current()->GetThreadList()->RunCheckpoint(&closure, nullptr);
+  // TODO(b/253671779): Replace this use of RunCheckpointUnchecked() with RunCheckpoint(). This is
+  // currently not possible, since the following undesirable call chain (abbreviated here) is then
+  // possible and exercised by current tests: (jvmti) GetAllStackTraces -> <this function> ->
+  // RunCheckpoint -> GetStackTraceVisitor -> EncodeMethodId -> Class::EnsureMethodIds ->
+  // Class::Alloc -> AllocObjectWithAllocator -> potentially suspends, or runs GC, etc. -> CHECK
+  // failure.
+  size_t barrier_count = art::Runtime::Current()->GetThreadList()->RunCheckpointUnchecked(&closure);
   if (barrier_count == 0) {
     return;
   }
diff --git a/openjdkjvmti/ti_thread.cc b/openjdkjvmti/ti_thread.cc
index 13eebbf..22aa8fe 100644
--- a/openjdkjvmti/ti_thread.cc
+++ b/openjdkjvmti/ti_thread.cc
@@ -547,6 +547,7 @@
 
 // Suspends the current thread if it has any suspend requests on it.
 void ThreadUtil::SuspendCheck(art::Thread* self) {
+  DCHECK(!self->ReadFlag(art::ThreadFlag::kSuspensionImmune));
   art::ScopedObjectAccess soa(self);
   // Really this is only needed if we are in FastJNI and actually have the mutator_lock_ already.
   self->FullSuspendCheck();
@@ -900,17 +901,13 @@
         }
       }
     }
-    bool timeout = true;
     art::Thread* ret_target = art::Runtime::Current()->GetThreadList()->SuspendThreadByPeer(
-        target_jthread,
-        art::SuspendReason::kForUserCode,
-        &timeout);
-    if (ret_target == nullptr && !timeout) {
+        target_jthread, art::SuspendReason::kForUserCode);
+    if (ret_target == nullptr) {
       // TODO It would be good to get more information about why exactly the thread failed to
       // suspend.
       return ERR(INTERNAL);
-    } else if (!timeout) {
-      // we didn't time out and got a result.
+    } else {
       return OK;
     }
     // We timed out. Just go around and try again.
@@ -927,11 +924,7 @@
       // This can only happen if we race with another thread to suspend 'self' and we lose.
       return ERR(THREAD_SUSPENDED);
     }
-    // We shouldn't be able to fail this.
-    if (!self->ModifySuspendCount(self, +1, nullptr, art::SuspendReason::kForUserCode)) {
-      // TODO More specific error would be nice.
-      return ERR(INTERNAL);
-    }
+    self->IncrementSuspendCount(self, nullptr, nullptr, art::SuspendReason::kForUserCode);
   }
   // Once we have requested the suspend we actually go to sleep. We need to do this after releasing
   // the suspend_lock to make sure we can be woken up. This call gains the mutator lock causing us
diff --git a/runtime/base/locks.h b/runtime/base/locks.h
index c15e5de..e8c83fe 100644
--- a/runtime/base/locks.h
+++ b/runtime/base/locks.h
@@ -108,10 +108,6 @@
   kClassLinkerClassesLock,  // TODO rename.
   kSubtypeCheckLock,
   kBreakpointLock,
-  // This is a generic lock level for a lock meant to be gained after having a
-  // monitor lock.
-  kPostMonitorLock,
-  kMonitorLock,
   kMonitorListLock,
   kThreadListLock,
   kAllocTrackerLock,
@@ -125,7 +121,10 @@
   kRuntimeShutdownLock,
   kTraceLock,
   kHeapBitmapLock,
-
+  // This is a generic lock level for a lock meant to be gained after having a
+  // monitor lock.
+  kPostMonitorLock,
+  kMonitorLock,
   // This is a generic lock level for a top-level lock meant to be gained after having the
   // mutator_lock_.
   kPostMutatorTopLockLevel,
@@ -138,7 +137,7 @@
   kUserCodeSuspensionLock,
   kZygoteCreationLock,
 
-  // The highest valid lock level. Use this if there is code that should only be called with no
+  // The highest valid lock level. Use this for locks that should only be acquired with no
   // other locks held. Since this is the highest lock level we also allow it to be held even if the
   // runtime or current thread is not fully set-up yet (for example during thread attach). Note that
   // this lock also has special behavior around the mutator_lock_. Since the mutator_lock_ is not
diff --git a/runtime/base/mutex-inl.h b/runtime/base/mutex-inl.h
index dba1e12..712b61d 100644
--- a/runtime/base/mutex-inl.h
+++ b/runtime/base/mutex-inl.h
@@ -60,43 +60,44 @@
   // The check below enumerates the cases where we expect not to be able to check the validity of
   // locks on a thread. Lock checking is disabled to avoid deadlock when checking shutdown lock.
   // TODO: tighten this check.
-  if (kDebugLocking) {
-    CHECK(!Locks::IsSafeToCallAbortRacy() ||
-          // Used during thread creation to avoid races with runtime shutdown. Thread::Current not
-          // yet established.
-          level == kRuntimeShutdownLock ||
-          // Thread Ids are allocated/released before threads are established.
-          level == kAllocatedThreadIdsLock ||
-          // Thread LDT's are initialized without Thread::Current established.
-          level == kModifyLdtLock ||
-          // Threads are unregistered while holding the thread list lock, during this process they
-          // no longer exist and so we expect an unlock with no self.
-          level == kThreadListLock ||
-          // Ignore logging which may or may not have set up thread data structures.
-          level == kLoggingLock ||
-          // When transitioning from suspended to runnable, a daemon thread might be in
-          // a situation where the runtime is shutting down. To not crash our debug locking
-          // mechanism we just pass null Thread* to the MutexLock during that transition
-          // (see Thread::TransitionFromSuspendedToRunnable).
-          level == kThreadSuspendCountLock ||
-          // Avoid recursive death.
-          level == kAbortLock ||
-          // Locks at the absolute top of the stack can be locked at any time.
-          level == kTopLockLevel ||
-          // The unexpected signal handler may be catching signals from any thread.
-          level == kUnexpectedSignalLock) << level;
-  }
+  CHECK(!Locks::IsSafeToCallAbortRacy() ||
+        // Used during thread creation to avoid races with runtime shutdown. Thread::Current not
+        // yet established.
+        level == kRuntimeShutdownLock ||
+        // Thread Ids are allocated/released before threads are established.
+        level == kAllocatedThreadIdsLock ||
+        // Thread LDT's are initialized without Thread::Current established.
+        level == kModifyLdtLock ||
+        // Threads are unregistered while holding the thread list lock, during this process they
+        // no longer exist and so we expect an unlock with no self.
+        level == kThreadListLock ||
+        // Ignore logging which may or may not have set up thread data structures.
+        level == kLoggingLock ||
+        // When transitioning from suspended to runnable, a daemon thread might be in
+        // a situation where the runtime is shutting down. To not crash our debug locking
+        // mechanism we just pass null Thread* to the MutexLock during that transition
+        // (see Thread::TransitionFromSuspendedToRunnable).
+        level == kThreadSuspendCountLock ||
+        // Avoid recursive death.
+        level == kAbortLock ||
+        // Locks at the absolute top of the stack can be locked at any time.
+        level == kTopLockLevel ||
+        // The unexpected signal handler may be catching signals from any thread.
+        level == kUnexpectedSignalLock)
+      << level;
 }
 
-inline void BaseMutex::RegisterAsLocked(Thread* self) {
+inline void BaseMutex::RegisterAsLocked(Thread* self, bool check) {
   if (UNLIKELY(self == nullptr)) {
-    CheckUnattachedThread(level_);
-    return;
+    if (check) {
+      CheckUnattachedThread(level_);
+    }
+  } else {
+    RegisterAsLockedImpl(self, level_, check);
   }
-  RegisterAsLockedImpl(self, level_);
 }
 
-inline void BaseMutex::RegisterAsLockedImpl(Thread* self, LockLevel level) {
+inline void BaseMutex::RegisterAsLockedImpl(Thread* self, LockLevel level, bool check) {
   DCHECK(self != nullptr);
   DCHECK_EQ(level_, level);
   // It would be nice to avoid this condition checking in the non-debug case,
@@ -107,7 +108,7 @@
   if (UNLIKELY(level == kThreadWaitLock) && self->GetHeldMutex(kThreadWaitLock) != nullptr) {
     level = kThreadWaitWakeLock;
   }
-  if (kDebugLocking) {
+  if (check) {
     // Check if a bad Mutex of this level or lower is held.
     bool bad_mutexes_held = false;
     // Specifically allow a kTopLockLevel lock to be gained when the current thread holds the
@@ -161,10 +162,12 @@
 
 inline void BaseMutex::RegisterAsUnlocked(Thread* self) {
   if (UNLIKELY(self == nullptr)) {
-    CheckUnattachedThread(level_);
-    return;
+    if (kDebugLocking) {
+      CheckUnattachedThread(level_);
+    }
+  } else {
+    RegisterAsUnlockedImpl(self, level_);
   }
-  RegisterAsUnlockedImpl(self , level_);
 }
 
 inline void BaseMutex::RegisterAsUnlockedImpl(Thread* self, LockLevel level) {
@@ -306,7 +309,7 @@
 }
 
 inline void MutatorMutex::TransitionFromSuspendedToRunnable(Thread* self) {
-  RegisterAsLockedImpl(self, kMutatorLock);
+  RegisterAsLockedImpl(self, kMutatorLock, kDebugLocking);
   AssertSharedHeld(self);
 }
 
diff --git a/runtime/base/mutex.cc b/runtime/base/mutex.cc
index 728dc84..66541f9 100644
--- a/runtime/base/mutex.cc
+++ b/runtime/base/mutex.cc
@@ -246,11 +246,12 @@
 }
 
 void BaseMutex::CheckSafeToWait(Thread* self) {
-  if (self == nullptr) {
-    CheckUnattachedThread(level_);
+  if (!kDebugLocking) {
     return;
   }
-  if (kDebugLocking) {
+  if (self == nullptr) {
+    CheckUnattachedThread(level_);
+  } else {
     CHECK(self->GetHeldMutex(level_) == this || level_ == kMonitorLock)
         << "Waiting on unacquired mutex: " << name_;
     bool bad_mutexes_held = false;
@@ -570,6 +571,7 @@
   }
 }
 
+template <bool kCheck>
 bool Mutex::ExclusiveTryLock(Thread* self) {
   DCHECK(self == nullptr || self == Thread::Current());
   if (kDebugLocking && !recursive_) {
@@ -600,7 +602,7 @@
 #endif
     DCHECK_EQ(GetExclusiveOwnerTid(), 0);
     exclusive_owner_.store(SafeGetTid(self), std::memory_order_relaxed);
-    RegisterAsLocked(self);
+    RegisterAsLocked(self, kCheck);
   }
   recursion_count_++;
   if (kDebugLocking) {
@@ -611,6 +613,9 @@
   return true;
 }
 
+template bool Mutex::ExclusiveTryLock<false>(Thread* self);
+template bool Mutex::ExclusiveTryLock<true>(Thread* self);
+
 bool Mutex::ExclusiveTryLockWithSpinning(Thread* self) {
   // Spin a small number of times, since this affects our ability to respond to suspension
   // requests. We spin repeatedly only if the mutex repeatedly becomes available and unavailable
@@ -721,6 +726,9 @@
       << name_
       << " level=" << static_cast<int>(level_)
       << " rec=" << recursion_count_
+#if ART_USE_FUTEXES
+      << " state_and_contenders = " << std::hex << state_and_contenders_ << std::dec
+#endif
       << " owner=" << GetExclusiveOwnerTid() << " ";
   DumpContention(os);
 }
@@ -923,7 +931,7 @@
 }
 #endif
 
-bool ReaderWriterMutex::SharedTryLock(Thread* self) {
+bool ReaderWriterMutex::SharedTryLock(Thread* self, bool check) {
   DCHECK(self == nullptr || self == Thread::Current());
 #if ART_USE_FUTEXES
   bool done = false;
@@ -947,7 +955,7 @@
     PLOG(FATAL) << "pthread_mutex_trylock failed for " << name_;
   }
 #endif
-  RegisterAsLocked(self);
+  RegisterAsLocked(self, check);
   AssertSharedHeld(self);
   return true;
 }
diff --git a/runtime/base/mutex.h b/runtime/base/mutex.h
index 87e9525..925f7a2 100644
--- a/runtime/base/mutex.h
+++ b/runtime/base/mutex.h
@@ -103,10 +103,11 @@
   BaseMutex(const char* name, LockLevel level);
   virtual ~BaseMutex();
 
-  // Add this mutex to those owned by self, and perform appropriate checking.
-  // For this call only, self may also be another suspended thread.
-  void RegisterAsLocked(Thread* self);
-  void RegisterAsLockedImpl(Thread* self, LockLevel level);
+  // Add this mutex to those owned by self, and optionally perform lock order checking.  Caller
+  // may wish to disable checking for trylock calls that cannot result in deadlock.  For this call
+  // only, self may also be another suspended thread.
+  void RegisterAsLocked(Thread* self, bool check = kDebugLocking);
+  void RegisterAsLockedImpl(Thread* self, LockLevel level, bool check);
 
   void RegisterAsUnlocked(Thread* self);
   void RegisterAsUnlockedImpl(Thread* self, LockLevel level);
@@ -183,7 +184,10 @@
   void ExclusiveLock(Thread* self) ACQUIRE();
   void Lock(Thread* self) ACQUIRE() {  ExclusiveLock(self); }
 
-  // Returns true if acquires exclusive access, false otherwise.
+  // Returns true if acquires exclusive access, false otherwise. The `check` argument specifies
+  // whether lock level checking should be performed.  Should be defaulted unless we are using
+  // TryLock instead of Lock for deadlock avoidance.
+  template <bool kCheck = kDebugLocking>
   bool ExclusiveTryLock(Thread* self) TRY_ACQUIRE(true);
   bool TryLock(Thread* self) TRY_ACQUIRE(true) { return ExclusiveTryLock(self); }
   // Equivalent to ExclusiveTryLock, but retry for a short period before giving up.
@@ -342,7 +346,7 @@
   void ReaderLock(Thread* self) ACQUIRE_SHARED() { SharedLock(self); }
 
   // Try to acquire share of ReaderWriterMutex.
-  bool SharedTryLock(Thread* self) SHARED_TRYLOCK_FUNCTION(true);
+  bool SharedTryLock(Thread* self, bool check = kDebugLocking) SHARED_TRYLOCK_FUNCTION(true);
 
   // Release a share of the access.
   void SharedUnlock(Thread* self) RELEASE_SHARED() ALWAYS_INLINE;
diff --git a/runtime/cha.cc b/runtime/cha.cc
index 8b77f76..0c548d3 100644
--- a/runtime/cha.cc
+++ b/runtime/cha.cc
@@ -237,11 +237,10 @@
       : barrier_(0),
         method_headers_(method_headers) {}
 
-  void Run(Thread* thread) override {
+  void Run(Thread* thread) override REQUIRES_SHARED(Locks::mutator_lock_) {
     // Note thread and self may not be equal if thread was already suspended at
     // the point of the request.
     Thread* self = Thread::Current();
-    ScopedObjectAccess soa(self);
     CHAStackVisitor visitor(thread, nullptr, method_headers_);
     visitor.WalkStack();
     barrier_.Pass(self);
diff --git a/runtime/debugger.cc b/runtime/debugger.cc
index a7b818e..3c8e6e6 100644
--- a/runtime/debugger.cc
+++ b/runtime/debugger.cc
@@ -340,7 +340,11 @@
       Dbg::DdmSendThreadNotification(thread, CHUNK_TYPE("THCR"));
       finish_barrier.Pass(cls_self);
     });
-    size_t checkpoints = Runtime::Current()->GetThreadList()->RunCheckpoint(&fc);
+    // TODO(b/253671779): The above eventually results in calls to EventHandler::DispatchEvent,
+    // which does a ScopedThreadStateChange, which amounts to a thread state change inside the
+    // checkpoint run method. Hence the normal check would fail, and thus we specify Unchecked
+    // here.
+    size_t checkpoints = Runtime::Current()->GetThreadList()->RunCheckpointUnchecked(&fc);
     ScopedThreadSuspension sts(self, ThreadState::kWaitingForCheckPointsToRun);
     finish_barrier.Increment(self, checkpoints);
   }
diff --git a/runtime/entrypoints_order_test.cc b/runtime/entrypoints_order_test.cc
index 9b4af66..a2d5a43 100644
--- a/runtime/entrypoints_order_test.cc
+++ b/runtime/entrypoints_order_test.cc
@@ -70,7 +70,6 @@
     EXPECT_OFFSET_DIFFP(Thread, tls32_, daemon, throwing_OutOfMemoryError, 4);
     EXPECT_OFFSET_DIFFP(Thread, tls32_, throwing_OutOfMemoryError, no_thread_suspension, 4);
     EXPECT_OFFSET_DIFFP(Thread, tls32_, no_thread_suspension, thread_exit_check_count, 4);
-    EXPECT_OFFSET_DIFFP(Thread, tls32_, thread_exit_check_count, is_transitioning_to_runnable, 4);
 
     // TODO: Better connection. Take alignment into account.
     EXPECT_OFFSET_DIFF_GT3(Thread, tls32_.thread_exit_check_count, tls64_.trace_clock_base, 4,
@@ -108,13 +107,14 @@
     EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, name, pthread_self, sizeof(void*));
     EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, pthread_self, last_no_thread_suspension_cause,
                         sizeof(void*));
-    EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, last_no_thread_suspension_cause, active_suspend_barriers,
-                        sizeof(void*));
-    EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, active_suspend_barriers, thread_local_start,
-                        sizeof(Thread::tls_ptr_sized_values::active_suspend_barriers));
-    EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, thread_local_start, thread_local_pos, sizeof(void*));
+    EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, last_no_thread_suspension_cause,
+                        active_suspendall_barrier, sizeof(void*));
+    EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, active_suspendall_barrier,
+                        active_suspend1_barriers, sizeof(void*));
+    EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, active_suspend1_barriers, thread_local_pos, sizeof(void*));
     EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, thread_local_pos, thread_local_end, sizeof(void*));
-    EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, thread_local_end, thread_local_limit, sizeof(void*));
+    EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, thread_local_end, thread_local_start, sizeof(void*));
+    EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, thread_local_start, thread_local_limit, sizeof(void*));
     EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, thread_local_limit, thread_local_objects, sizeof(void*));
     EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, thread_local_objects, checkpoint_function, sizeof(size_t));
     EXPECT_OFFSET_DIFFP(Thread, tlsPtr_, checkpoint_function, jni_entrypoints,
@@ -137,11 +137,12 @@
         Thread, tlsPtr_, top_reflective_handle_scope, method_trace_buffer, sizeof(void*));
     EXPECT_OFFSET_DIFFP(
         Thread, tlsPtr_, method_trace_buffer, method_trace_buffer_index, sizeof(void*));
+    EXPECT_OFFSET_DIFFP(
+        Thread, tlsPtr_, method_trace_buffer_index, thread_exit_flags, sizeof(void*));
     // The first field after tlsPtr_ is forced to a 16 byte alignment so it might have some space.
     auto offset_tlsptr_end = OFFSETOF_MEMBER(Thread, tlsPtr_) +
         sizeof(decltype(reinterpret_cast<Thread*>(16)->tlsPtr_));
-    CHECKED(offset_tlsptr_end - OFFSETOF_MEMBER(Thread, tlsPtr_.method_trace_buffer_index) ==
-                sizeof(void*),
+    CHECKED(offset_tlsptr_end - OFFSETOF_MEMBER(Thread, tlsPtr_.thread_exit_flags) == sizeof(void*),
             "async_exception last field");
   }
 
diff --git a/runtime/gc/collector/concurrent_copying.cc b/runtime/gc/collector/concurrent_copying.cc
index 787e997..0cb2609 100644
--- a/runtime/gc/collector/concurrent_copying.cc
+++ b/runtime/gc/collector/concurrent_copying.cc
@@ -480,7 +480,8 @@
   }
 
   void Run(Thread* thread) override REQUIRES_SHARED(Locks::mutator_lock_) {
-    // Note: self is not necessarily equal to thread since thread may be suspended.
+    // We are either running this in the target thread, or the target thread will wait for us
+    // before switching back to runnable.
     Thread* self = Thread::Current();
     CHECK(thread == self || thread->GetState() != ThreadState::kRunnable)
         << thread->GetState() << " thread " << thread << " self " << self;
@@ -495,7 +496,6 @@
     // We can use the non-CAS VisitRoots functions below because we update thread-local GC roots
     // only.
     thread->VisitRoots(this, kVisitRootFlagAllRoots);
-    concurrent_copying_->GetBarrier().Pass(self);
   }
 
   void VisitRoots(mirror::Object*** roots,
@@ -764,17 +764,12 @@
   }
   Thread* self = Thread::Current();
   Locks::mutator_lock_->AssertNotHeld(self);
-  gc_barrier_->Init(self, 0);
   ThreadFlipVisitor thread_flip_visitor(this, heap_->use_tlab_);
   FlipCallback flip_callback(this);
 
-  size_t barrier_count = Runtime::Current()->GetThreadList()->FlipThreadRoots(
+  Runtime::Current()->GetThreadList()->FlipThreadRoots(
       &thread_flip_visitor, &flip_callback, this, GetHeap()->GetGcPauseListener());
 
-  {
-    ScopedThreadStateChange tsc(self, ThreadState::kWaitingForCheckPointsToRun);
-    gc_barrier_->Increment(self, barrier_count);
-  }
   is_asserting_to_space_invariant_ = true;
   QuasiAtomic::ThreadFenceForConstructor();
   if (kVerboseMode) {
diff --git a/runtime/gc/collector/mark_compact.cc b/runtime/gc/collector/mark_compact.cc
index aee5b29..0e0d26f 100644
--- a/runtime/gc/collector/mark_compact.cc
+++ b/runtime/gc/collector/mark_compact.cc
@@ -740,7 +740,6 @@
     CHECK(collector_->compacting_);
     thread->SweepInterpreterCache(collector_);
     thread->AdjustTlab(collector_->black_objs_slide_diff_);
-    collector_->GetBarrier().Pass(self);
   }
 
  private:
@@ -788,15 +787,10 @@
 
   {
     // Compaction pause
-    gc_barrier_.Init(self, 0);
     ThreadFlipVisitor visitor(this);
     FlipCallback callback(this);
-    size_t barrier_count = runtime->GetThreadList()->FlipThreadRoots(
+    runtime->GetThreadList()->FlipThreadRoots(
         &visitor, &callback, this, GetHeap()->GetGcPauseListener());
-    {
-      ScopedThreadStateChange tsc(self, ThreadState::kWaitingForCheckPointsToRun);
-      gc_barrier_.Increment(self, barrier_count);
-    }
   }
 
   if (IsValidFd(uffd_)) {
diff --git a/runtime/gc/heap.cc b/runtime/gc/heap.cc
index a5cfb42..bcc6138 100644
--- a/runtime/gc/heap.cc
+++ b/runtime/gc/heap.cc
@@ -2723,113 +2723,122 @@
   }
   ScopedThreadStateChange tsc(self, ThreadState::kWaitingPerformingGc);
   Locks::mutator_lock_->AssertNotHeld(self);
-  if (self->IsHandlingStackOverflow()) {
-    // If we are throwing a stack overflow error we probably don't have enough remaining stack
-    // space to run the GC.
-    // Count this as a GC in case someone is waiting for it to complete.
-    gcs_completed_.fetch_add(1, std::memory_order_release);
-    return collector::kGcTypeNone;
-  }
-  bool compacting_gc;
+  SelfDeletingTask* clear;  // Unconditionally set below.
   {
-    gc_complete_lock_->AssertNotHeld(self);
-    ScopedThreadStateChange tsc2(self, ThreadState::kWaitingForGcToComplete);
-    MutexLock mu(self, *gc_complete_lock_);
-    // Ensure there is only one GC at a time.
-    WaitForGcToCompleteLocked(gc_cause, self);
-    if (requested_gc_num != GC_NUM_ANY && !GCNumberLt(GetCurrentGcNum(), requested_gc_num)) {
-      // The appropriate GC was already triggered elsewhere.
-      return collector::kGcTypeNone;
-    }
-    compacting_gc = IsMovingGc(collector_type_);
-    // GC can be disabled if someone has a used GetPrimitiveArrayCritical.
-    if (compacting_gc && disable_moving_gc_count_ != 0) {
-      LOG(WARNING) << "Skipping GC due to disable moving GC count " << disable_moving_gc_count_;
-      // Again count this as a GC.
+    // We should not ever become runnable and re-suspend while executing a GC.
+    // This would likely cause a deadlock if we acted on a suspension request.
+    // TODO: We really want to assert that we don't transition to kRunnable.
+    ScopedAssertNoThreadSuspension("Performing GC");
+    if (self->IsHandlingStackOverflow()) {
+      // If we are throwing a stack overflow error we probably don't have enough remaining stack
+      // space to run the GC.
+      // Count this as a GC in case someone is waiting for it to complete.
       gcs_completed_.fetch_add(1, std::memory_order_release);
       return collector::kGcTypeNone;
     }
-    if (gc_disabled_for_shutdown_) {
-      gcs_completed_.fetch_add(1, std::memory_order_release);
-      return collector::kGcTypeNone;
-    }
-    collector_type_running_ = collector_type_;
-    last_gc_cause_ = gc_cause;
-  }
-  if (gc_cause == kGcCauseForAlloc && runtime->HasStatsEnabled()) {
-    ++runtime->GetStats()->gc_for_alloc_count;
-    ++self->GetStats()->gc_for_alloc_count;
-  }
-  const size_t bytes_allocated_before_gc = GetBytesAllocated();
-
-  DCHECK_LT(gc_type, collector::kGcTypeMax);
-  DCHECK_NE(gc_type, collector::kGcTypeNone);
-
-  collector::GarbageCollector* collector = nullptr;
-  // TODO: Clean this up.
-  if (compacting_gc) {
-    DCHECK(current_allocator_ == kAllocatorTypeBumpPointer ||
-           current_allocator_ == kAllocatorTypeTLAB ||
-           current_allocator_ == kAllocatorTypeRegion ||
-           current_allocator_ == kAllocatorTypeRegionTLAB);
-    switch (collector_type_) {
-      case kCollectorTypeSS:
-        semi_space_collector_->SetFromSpace(bump_pointer_space_);
-        semi_space_collector_->SetToSpace(temp_space_);
-        semi_space_collector_->SetSwapSemiSpaces(true);
-        collector = semi_space_collector_;
-        break;
-      case kCollectorTypeCMC:
-        collector = mark_compact_;
-        break;
-      case kCollectorTypeCC:
-        collector::ConcurrentCopying* active_cc_collector;
-        if (use_generational_cc_) {
-          // TODO: Other threads must do the flip checkpoint before they start poking at
-          // active_concurrent_copying_collector_. So we should not concurrency here.
-          active_cc_collector = (gc_type == collector::kGcTypeSticky) ?
-                  young_concurrent_copying_collector_ : concurrent_copying_collector_;
-          active_concurrent_copying_collector_.store(active_cc_collector,
-                                                     std::memory_order_relaxed);
-          DCHECK(active_cc_collector->RegionSpace() == region_space_);
-          collector = active_cc_collector;
-        } else {
-          collector = active_concurrent_copying_collector_.load(std::memory_order_relaxed);
-        }
-        break;
-      default:
-        LOG(FATAL) << "Invalid collector type " << static_cast<size_t>(collector_type_);
-    }
-    // temp_space_ will be null for kCollectorTypeCMC.
-    if (temp_space_ != nullptr
-        && collector != active_concurrent_copying_collector_.load(std::memory_order_relaxed)) {
-      temp_space_->GetMemMap()->Protect(PROT_READ | PROT_WRITE);
-      if (kIsDebugBuild) {
-        // Try to read each page of the memory map in case mprotect didn't work properly b/19894268.
-        temp_space_->GetMemMap()->TryReadable();
+    bool compacting_gc;
+    {
+      gc_complete_lock_->AssertNotHeld(self);
+      ScopedThreadStateChange tsc2(self, ThreadState::kWaitingForGcToComplete);
+      MutexLock mu(self, *gc_complete_lock_);
+      // Ensure there is only one GC at a time.
+      WaitForGcToCompleteLocked(gc_cause, self);
+      if (requested_gc_num != GC_NUM_ANY && !GCNumberLt(GetCurrentGcNum(), requested_gc_num)) {
+        // The appropriate GC was already triggered elsewhere.
+        return collector::kGcTypeNone;
       }
-      CHECK(temp_space_->IsEmpty());
+      compacting_gc = IsMovingGc(collector_type_);
+      // GC can be disabled if someone has a used GetPrimitiveArrayCritical.
+      if (compacting_gc && disable_moving_gc_count_ != 0) {
+        LOG(WARNING) << "Skipping GC due to disable moving GC count " << disable_moving_gc_count_;
+        // Again count this as a GC.
+        gcs_completed_.fetch_add(1, std::memory_order_release);
+        return collector::kGcTypeNone;
+      }
+      if (gc_disabled_for_shutdown_) {
+        gcs_completed_.fetch_add(1, std::memory_order_release);
+        return collector::kGcTypeNone;
+      }
+      collector_type_running_ = collector_type_;
+      last_gc_cause_ = gc_cause;
     }
-  } else if (current_allocator_ == kAllocatorTypeRosAlloc ||
-      current_allocator_ == kAllocatorTypeDlMalloc) {
-    collector = FindCollectorByGcType(gc_type);
-  } else {
-    LOG(FATAL) << "Invalid current allocator " << current_allocator_;
-  }
+    if (gc_cause == kGcCauseForAlloc && runtime->HasStatsEnabled()) {
+      ++runtime->GetStats()->gc_for_alloc_count;
+      ++self->GetStats()->gc_for_alloc_count;
+    }
+    const size_t bytes_allocated_before_gc = GetBytesAllocated();
 
-  CHECK(collector != nullptr)
-      << "Could not find garbage collector with collector_type="
-      << static_cast<size_t>(collector_type_) << " and gc_type=" << gc_type;
-  collector->Run(gc_cause, clear_soft_references || runtime->IsZygote());
-  IncrementFreedEver();
-  RequestTrim(self);
-  // Collect cleared references.
-  SelfDeletingTask* clear = reference_processor_->CollectClearedReferences(self);
-  // Grow the heap so that we know when to perform the next GC.
-  GrowForUtilization(collector, bytes_allocated_before_gc);
-  old_native_bytes_allocated_.store(GetNativeBytes());
-  LogGC(gc_cause, collector);
-  FinishGC(self, gc_type);
+    DCHECK_LT(gc_type, collector::kGcTypeMax);
+    DCHECK_NE(gc_type, collector::kGcTypeNone);
+
+    collector::GarbageCollector* collector = nullptr;
+    // TODO: Clean this up.
+    if (compacting_gc) {
+      DCHECK(current_allocator_ == kAllocatorTypeBumpPointer ||
+             current_allocator_ == kAllocatorTypeTLAB ||
+             current_allocator_ == kAllocatorTypeRegion ||
+             current_allocator_ == kAllocatorTypeRegionTLAB);
+      switch (collector_type_) {
+        case kCollectorTypeSS:
+          semi_space_collector_->SetFromSpace(bump_pointer_space_);
+          semi_space_collector_->SetToSpace(temp_space_);
+          semi_space_collector_->SetSwapSemiSpaces(true);
+          collector = semi_space_collector_;
+          break;
+        case kCollectorTypeCMC:
+          collector = mark_compact_;
+          break;
+        case kCollectorTypeCC:
+          collector::ConcurrentCopying* active_cc_collector;
+          if (use_generational_cc_) {
+            // TODO: Other threads must do the flip checkpoint before they start poking at
+            // active_concurrent_copying_collector_. So we should not concurrency here.
+            active_cc_collector = (gc_type == collector::kGcTypeSticky) ?
+                                      young_concurrent_copying_collector_ :
+                                      concurrent_copying_collector_;
+            active_concurrent_copying_collector_.store(active_cc_collector,
+                                                       std::memory_order_relaxed);
+            DCHECK(active_cc_collector->RegionSpace() == region_space_);
+            collector = active_cc_collector;
+          } else {
+            collector = active_concurrent_copying_collector_.load(std::memory_order_relaxed);
+          }
+          break;
+        default:
+          LOG(FATAL) << "Invalid collector type " << static_cast<size_t>(collector_type_);
+      }
+      // temp_space_ will be null for kCollectorTypeCMC.
+      if (temp_space_ != nullptr &&
+          collector != active_concurrent_copying_collector_.load(std::memory_order_relaxed)) {
+        temp_space_->GetMemMap()->Protect(PROT_READ | PROT_WRITE);
+        if (kIsDebugBuild) {
+          // Try to read each page of the memory map in case mprotect didn't work properly
+          // b/19894268.
+          temp_space_->GetMemMap()->TryReadable();
+        }
+        CHECK(temp_space_->IsEmpty());
+      }
+    } else if (current_allocator_ == kAllocatorTypeRosAlloc ||
+               current_allocator_ == kAllocatorTypeDlMalloc) {
+      collector = FindCollectorByGcType(gc_type);
+    } else {
+      LOG(FATAL) << "Invalid current allocator " << current_allocator_;
+    }
+
+    CHECK(collector != nullptr) << "Could not find garbage collector with collector_type="
+                                << static_cast<size_t>(collector_type_)
+                                << " and gc_type=" << gc_type;
+    collector->Run(gc_cause, clear_soft_references || runtime->IsZygote());
+    IncrementFreedEver();
+    RequestTrim(self);
+    // Collect cleared references.
+    clear = reference_processor_->CollectClearedReferences(self);
+    // Grow the heap so that we know when to perform the next GC.
+    GrowForUtilization(collector, bytes_allocated_before_gc);
+    old_native_bytes_allocated_.store(GetNativeBytes());
+    LogGC(gc_cause, collector);
+    FinishGC(self, gc_type);
+  }
   // Actually enqueue all cleared references. Do this after the GC has officially finished since
   // otherwise we can deadlock.
   clear->Run(self);
diff --git a/runtime/mirror/object.cc b/runtime/mirror/object.cc
index 940b82d..995f06e 100644
--- a/runtime/mirror/object.cc
+++ b/runtime/mirror/object.cc
@@ -185,7 +185,8 @@
   hash_code_seed.store(new_seed, std::memory_order_relaxed);
 }
 
-int32_t Object::IdentityHashCode() {
+template <bool kAllowInflation>
+int32_t Object::IdentityHashCodeHelper() {
   ObjPtr<Object> current_this = this;  // The this pointer may get invalidated by thread suspension.
   while (true) {
     LockWord lw = current_this->GetLockWord(false);
@@ -203,6 +204,9 @@
         break;
       }
       case LockWord::kThinLocked: {
+        if (!kAllowInflation) {
+          return 0;
+        }
         // Inflate the thin lock to a monitor and stick the hash code inside of the monitor. May
         // fail spuriously.
         Thread* self = Thread::Current();
@@ -230,6 +234,12 @@
   }
 }
 
+int32_t Object::IdentityHashCode() { return IdentityHashCodeHelper</* kAllowInflation= */ true>(); }
+
+int32_t Object::IdentityHashCodeNoInflation() {
+  return IdentityHashCodeHelper</* kAllowInflation= */ false>();
+}
+
 void Object::CheckFieldAssignmentImpl(MemberOffset field_offset, ObjPtr<Object> new_value) {
   ObjPtr<Class> c = GetClass();
   Runtime* runtime = Runtime::Current();
diff --git a/runtime/mirror/object.h b/runtime/mirror/object.h
index 95b9f86..5e770f4 100644
--- a/runtime/mirror/object.h
+++ b/runtime/mirror/object.h
@@ -137,11 +137,16 @@
       REQUIRES_SHARED(Locks::mutator_lock_)
       REQUIRES(!Roles::uninterruptible_);
 
+  // Returns a nonzero value that fits into the lockword slot.
   int32_t IdentityHashCode()
       REQUIRES_SHARED(Locks::mutator_lock_)
       REQUIRES(!Locks::thread_list_lock_,
                !Locks::thread_suspend_count_lock_);
 
+  // Identical to the above, but returns 0 if monitor inflation would otherwise be needed.
+  int32_t IdentityHashCodeNoInflation() REQUIRES_SHARED(Locks::mutator_lock_)
+      REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
+
   static constexpr MemberOffset MonitorOffset() {
     return OFFSET_OF_OBJECT_MEMBER(Object, monitor_);
   }
@@ -726,6 +731,10 @@
       REQUIRES_SHARED(Locks::mutator_lock_);
 
  private:
+  template <bool kAllowInflation>
+  int32_t IdentityHashCodeHelper() REQUIRES_SHARED(Locks::mutator_lock_)
+      REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
+
   // Get a field with acquire semantics.
   template<typename kSize>
   ALWAYS_INLINE kSize GetFieldAcquire(MemberOffset field_offset)
diff --git a/runtime/monitor.cc b/runtime/monitor.cc
index 3fed8d4..4e9e4d9 100644
--- a/runtime/monitor.cc
+++ b/runtime/monitor.cc
@@ -1005,7 +1005,7 @@
     if (monitor->num_waiters_.load(std::memory_order_relaxed) > 0) {
       return false;
     }
-    if (!monitor->monitor_lock_.ExclusiveTryLock(self)) {
+    if (!monitor->monitor_lock_.ExclusiveTryLock</* check= */ false>(self)) {
       // We cannot deflate a monitor that's currently held. It's unclear whether we should if
       // we could.
       return false;
@@ -1065,13 +1065,10 @@
     ThreadList* thread_list = Runtime::Current()->GetThreadList();
     // Suspend the owner, inflate. First change to blocked and give up mutator_lock_.
     self->SetMonitorEnterObject(obj.Get());
-    bool timed_out;
     Thread* owner;
     {
       ScopedThreadSuspension sts(self, ThreadState::kWaitingForLockInflation);
-      owner = thread_list->SuspendThreadByThreadId(owner_thread_id,
-                                                   SuspendReason::kInternal,
-                                                   &timed_out);
+      owner = thread_list->SuspendThreadByThreadId(owner_thread_id, SuspendReason::kInternal);
     }
     if (owner != nullptr) {
       // We succeeded in suspending the thread, check the lock's status didn't change.
diff --git a/runtime/mutator_gc_coord.md b/runtime/mutator_gc_coord.md
index aba8421..01e3ef0 100644
--- a/runtime/mutator_gc_coord.md
+++ b/runtime/mutator_gc_coord.md
@@ -100,7 +100,11 @@
 
 Logically the mutator lock is held in shared/reader mode if ***either*** the
 underlying reader-writer lock is held in shared mode, ***or*** if a mutator is in
-runnable state.
+runnable state. These two ways of holding the mutator mutex are ***not***
+equivalent: In particular, we rely on the garbage collector never actually
+entering a "runnable" state while active (see below). However, it often runs with
+the explicit mutator mutex in shared mode, thus blocking others from acquiring it
+in exclusive mode.
 
 Suspension and checkpoint API
 -----------------------------
@@ -217,13 +221,99 @@
 checkpoints do not preclude client threads from being in the middle of an
 operation that involves a weak reference access, while nonempty checkpoints do.
 
+**Suspending the GC**
+Under unusual conditions, the GC can run on any thread. This means that when
+Thread *A* suspends Thread *B* for some other reason, Thread *B* might be
+running the garbage collector, and suspending it may thus block the GC. This
+would be very deadlock-prone: if Thread *A* allocates while Thread *B* is
+suspended in the GC, and the allocation requires the GC's help to complete, we
+deadlock.
 
-[^1]: Some comments in the code refer to a not-yet-really-implemented scheme in
-which the compiler-generated code would load through the address at
-`tlsPtr_.suspend_trigger`. A thread suspension is requested by setting this to
-null, triggering a `SIGSEGV`, causing that thread to check for GC cooperation
-requests. The real mechanism instead sets an appropriate `ThreadFlag` entry to
-request suspension or a checkpoint. Note that the actual checkpoint function
-value is set, along with the flag, while holding `suspend_count_lock_`. If the
-target thread notices that a checkpoint is requested, it then acquires
-the `suspend_count_lock_` to read the checkpoint function.
+Thus we ensure that the GC, together with anything else that can block GCs,
+cannot be blocked for thread suspension requests. This is accomplished by
+ensuring that it always appears to be in a suspended thread state. Since we
+only check for suspend requests when entering the runnable state, suspend
+requests go unnoticed until the GC completes. It may physically acquire and
+release the actual `mutator_lock_` in either shared or exclusive mode.
+
+Thread Suspension Mechanics
+---------------------------
+
+Thread suspension is initiated by a registered thread, except that, for testing
+purposes, `SuspendAll` may be invoked with `self == nullptr`.  We never suspend
+the initiating thread, explicitly excluding it from `SuspendAll()`, and failing
+`SuspendThreadBy...()` requests to that effect.
+
+The suspend calls invoke `IncrementSuspendCount()` to increment the thread
+suspend count for each thread. That adds a "suspend barrier" (atomic counter) to
+the per-thread list of such counters to decrement. It normally sets the
+`kSuspendRequest` ("should enter safepoint handler") and `kActiveSuspendBarrier`
+("need to notify us when suspended") flags.
+
+After setting these two flags, we check whether the thread is suspended and
+`kSuspendRequest` is still set. Since the thread is already suspended, it cannot
+be expected to respond to "pass the suspend barrier" (decrement the atomic
+counter) in a timely fashion.  Hence we do so on its behalf. This decrements
+the "barrier" and removes it from the thread's list of barriers to decrement,
+and clears `kActiveSuspendBarrier`. `kSuspendRequest` remains to ensure the
+thread doesn't prematurely return to runnable state.
+
+If `SuspendAllInternal()` does not immediately see a suspended state, then it is up
+to the target thread to decrement the suspend barrier.
+`TransitionFromRunnableToSuspended()` calls
+`TransitionToSuspendedAndRunCheckpoints()`, which changes the thread state
+and returns. `TransitionFromRunnableToSuspended()` then calls
+`CheckActiveSuspendBarriers()` to check for the `kActiveSuspendBarrier` flag
+and decrement the suspend barrier if set.
+
+The `suspend_count_lock_` is not consistently held in the target thread
+during this process.  Thus correctness in resolving the race between a
+suspension-requesting thread and a target thread voluntarily suspending relies
+on first requesting suspension, and then checking whether the target is
+already suspended. The detailed correctness argument is given in a comment
+inside `SuspendAllInternal()`. This also ensures that the barrier cannot be
+decremented after the stack frame holding the barrier goes away.
+
+This relies on the fact that the two stores in the two threads to the state and
+`kActiveSuspendBarrier` flag are ordered with respect to the later loads. That's
+guaranteed, since they are all stored in a single `atomic<>`. Thus even relaxed
+accesses are OK.
+
+The actual suspend barrier representation still varies between `SuspendAll()`
+and `SuspendThreadBy...()`.  The former relies on the fact that only one such
+barrier can be in use at a time, while the latter maintains a linked list of
+active suspend barriers for each target thread, relying on the fact that each
+one can appear on the list of only one thread, and we can thus use list nodes
+allocated in the stack frames of requesting threads.
+
+**Avoiding suspension cycles**
+
+Any thread can issue a `SuspendThreadByPeer()`, `SuspendThreadByThreadId()` or
+`SuspendAll()` request. But if Thread A increments Thread B's suspend count
+while Thread B increments Thread A's suspend count, and they then both suspend
+during a subsequent thread transition, we're deadlocked.
+
+For single-thread suspension requests, we refuse to initiate
+a suspend request from a registered thread that is also being asked to suspend
+(i.e. the suspend count is nonzero).  Instead the requestor waits for that
+condition to change.  This means that we cannot create a cycle in which each
+thread has asked to suspend the next one, and thus no thread can progress.  The
+required atomicity of the requestor's suspend count check with setting the suspend
+count of the target(s) is ensured by holding `suspend_count_lock_`.
+
+For `SuspendAll()`, we enforce a requirement that at most one `SuspendAll()`
+request is running at one time. We also set the `kSuspensionImmune` thread flag
+to prevent a single thread suspension of a thread currently between
+`SuspendAll()` and `ResumeAll()` calls. Thus once a `SuspendAll()` call starts,
+it will complete before it can be affected by suspension requests from other
+threads.
+
+[^1]: In the most recent versions of ART, compiler-generated code loads through
+    the address at `tlsPtr_.suspend_trigger`. A thread suspension is requested
+    by setting this to null, triggering a `SIGSEGV`, causing that thread to
+    check for GC cooperation requests. The older mechanism instead sets an
+    appropriate `ThreadFlag` entry to request suspension or a checkpoint. Note
+    that the actual checkpoint function value is set, along with the flag, while
+    holding `suspend_count_lock_`. If the target thread notices that a
+    checkpoint is requested, it then acquires the `suspend_count_lock_` to read
+    the checkpoint function.
diff --git a/runtime/native/dalvik_system_VMStack.cc b/runtime/native/dalvik_system_VMStack.cc
index 71078c9..bf441b4 100644
--- a/runtime/native/dalvik_system_VMStack.cc
+++ b/runtime/native/dalvik_system_VMStack.cc
@@ -48,19 +48,29 @@
   } else {
     // Never allow suspending the heap task thread since it may deadlock if allocations are
     // required for the stack trace.
-    Thread* heap_task_thread =
-        Runtime::Current()->GetHeap()->GetTaskProcessor()->GetRunningThread();
-    // heap_task_thread could be null if the daemons aren't yet started.
-    if (heap_task_thread != nullptr && decoded_peer == heap_task_thread->GetPeerFromOtherThread()) {
+    Thread* heap_task_thread = nullptr;
+    for (int i = 0; i < 20; ++i) {
+      heap_task_thread = Runtime::Current()->GetHeap()->GetTaskProcessor()->GetRunningThread();
+      if (heap_task_thread != nullptr) {
+        break;
+      }
+      // heap_task_thread could be null if the daemons aren't fully running yet.  But it can appear
+      // in enumerations, and thus we could try to get its stack even before that. Try to wait
+      // for a non-null value so we can avoid suspending it.
+      static constexpr int kHeapTaskDaemonSleepMicros = 5000;
+      usleep(kHeapTaskDaemonSleepMicros);
+    }
+    if (heap_task_thread == nullptr) {
+      LOG(ERROR) << "No HeapTaskDaemon; refusing to get thread stack.";
+      return nullptr;
+    }
+    if (decoded_peer == heap_task_thread->GetPeerFromOtherThread()) {
       return nullptr;
     }
     // Suspend thread to build stack trace.
     ScopedThreadSuspension sts(soa.Self(), ThreadState::kNative);
     ThreadList* thread_list = Runtime::Current()->GetThreadList();
-    bool timed_out;
-    Thread* thread = thread_list->SuspendThreadByPeer(peer,
-                                                      SuspendReason::kInternal,
-                                                      &timed_out);
+    Thread* thread = thread_list->SuspendThreadByPeer(peer, SuspendReason::kInternal);
     if (thread != nullptr) {
       // Must be runnable to create returned array.
       {
@@ -70,9 +80,6 @@
       // Restart suspended thread.
       bool resumed = thread_list->Resume(thread, SuspendReason::kInternal);
       DCHECK(resumed);
-    } else if (timed_out) {
-      LOG(ERROR) << "Trying to get thread's stack failed as the thread failed to suspend within a "
-          "generous timeout.";
     }
   }
   return trace;
diff --git a/runtime/native/java_lang_Thread.cc b/runtime/native/java_lang_Thread.cc
index 570c554..fd67a0a 100644
--- a/runtime/native/java_lang_Thread.cc
+++ b/runtime/native/java_lang_Thread.cc
@@ -147,11 +147,8 @@
   // thread list lock to avoid this, as setting the thread name causes mutator to lock/unlock
   // in the DDMS send code.
   ThreadList* thread_list = Runtime::Current()->GetThreadList();
-  bool timed_out;
   // Take suspend thread lock to avoid races with threads trying to suspend this one.
-  Thread* thread = thread_list->SuspendThreadByPeer(peer,
-                                                    SuspendReason::kInternal,
-                                                    &timed_out);
+  Thread* thread = thread_list->SuspendThreadByPeer(peer, SuspendReason::kInternal);
   if (thread != nullptr) {
     {
       ScopedObjectAccess soa(env);
@@ -159,9 +156,6 @@
     }
     bool resumed = thread_list->Resume(thread, SuspendReason::kInternal);
     DCHECK(resumed);
-  } else if (timed_out) {
-    LOG(ERROR) << "Trying to set thread name to '" << name.c_str() << "' failed as the thread "
-        "failed to suspend within a generous timeout.";
   }
 }
 
diff --git a/runtime/native/org_apache_harmony_dalvik_ddmc_DdmVmInternal.cc b/runtime/native/org_apache_harmony_dalvik_ddmc_DdmVmInternal.cc
index 081ec20..f20cd28 100644
--- a/runtime/native/org_apache_harmony_dalvik_ddmc_DdmVmInternal.cc
+++ b/runtime/native/org_apache_harmony_dalvik_ddmc_DdmVmInternal.cc
@@ -59,7 +59,6 @@
     trace = Thread::InternalStackTraceToStackTraceElementArray(soa, internal_trace);
   } else {
     ThreadList* thread_list = Runtime::Current()->GetThreadList();
-    bool timed_out;
 
     // Check for valid thread
     if (thin_lock_id == ThreadList::kInvalidThreadId) {
@@ -67,9 +66,7 @@
     }
 
     // Suspend thread to build stack trace.
-    Thread* thread = thread_list->SuspendThreadByThreadId(thin_lock_id,
-                                                          SuspendReason::kInternal,
-                                                          &timed_out);
+    Thread* thread = thread_list->SuspendThreadByThreadId(thin_lock_id, SuspendReason::kInternal);
     if (thread != nullptr) {
       {
         ScopedObjectAccess soa(env);
@@ -79,11 +76,6 @@
       // Restart suspended thread.
       bool resumed = thread_list->Resume(thread, SuspendReason::kInternal);
       DCHECK(resumed);
-    } else {
-      if (timed_out) {
-        LOG(ERROR) << "Trying to get thread's stack by id failed as the thread failed to suspend "
-            "within a generous timeout.";
-      }
     }
   }
   return trace;
diff --git a/runtime/oat.h b/runtime/oat.h
index 64ad9db..d1e1899 100644
--- a/runtime/oat.h
+++ b/runtime/oat.h
@@ -44,8 +44,8 @@
 class PACKED(4) OatHeader {
  public:
   static constexpr std::array<uint8_t, 4> kOatMagic { { 'o', 'a', 't', '\n' } };
-  // Last oat version changed reason: Disable implicit suspend checks; b/291839153
-  static constexpr std::array<uint8_t, 4> kOatVersion{{'2', '3', '4', '\0'}};
+  // Last oat version changed reason: Change suspend barrier data structure.
+  static constexpr std::array<uint8_t, 4> kOatVersion{{'2', '3', '5', '\0'}};
 
   static constexpr const char* kDex2OatCmdLineKey = "dex2oat-cmdline";
   static constexpr const char* kDebuggableKey = "debuggable";
diff --git a/runtime/thread-inl.h b/runtime/thread-inl.h
index ce50471..4bb8160 100644
--- a/runtime/thread-inl.h
+++ b/runtime/thread-inl.h
@@ -17,8 +17,6 @@
 #ifndef ART_RUNTIME_THREAD_INL_H_
 #define ART_RUNTIME_THREAD_INL_H_
 
-#include "thread.h"
-
 #include "arch/instruction_set.h"
 #include "base/aborting.h"
 #include "base/casts.h"
@@ -28,8 +26,8 @@
 #include "jni/jni_env_ext.h"
 #include "managed_stack-inl.h"
 #include "obj_ptr-inl.h"
-#include "suspend_reason.h"
 #include "thread-current-inl.h"
+#include "thread.h"
 #include "thread_pool.h"
 
 namespace art {
@@ -81,12 +79,15 @@
       break;
     } else if (state_and_flags.IsFlagSet(ThreadFlag::kCheckpointRequest)) {
       RunCheckpointFunction();
-    } else if (state_and_flags.IsFlagSet(ThreadFlag::kSuspendRequest)) {
+    } else if (state_and_flags.IsFlagSet(ThreadFlag::kSuspendRequest) &&
+               !state_and_flags.IsFlagSet(ThreadFlag::kSuspensionImmune)) {
       FullSuspendCheck(implicit);
       implicit = false;  // We do not need to `MadviseAwayAlternateSignalStack()` anymore.
-    } else {
-      DCHECK(state_and_flags.IsFlagSet(ThreadFlag::kEmptyCheckpointRequest));
+    } else if (state_and_flags.IsFlagSet(ThreadFlag::kEmptyCheckpointRequest)) {
       RunEmptyCheckpoint();
+    } else {
+      DCHECK(state_and_flags.IsFlagSet(ThreadFlag::kSuspensionImmune));
+      break;
     }
   }
   if (implicit) {
@@ -249,7 +250,8 @@
   }
 }
 
-inline void Thread::PassActiveSuspendBarriers() {
+inline void Thread::CheckActiveSuspendBarriers() {
+  DCHECK_NE(GetState(), ThreadState::kRunnable);
   while (true) {
     StateAndFlags state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
     if (LIKELY(!state_and_flags.IsFlagSet(ThreadFlag::kCheckpointRequest) &&
@@ -257,7 +259,7 @@
                !state_and_flags.IsFlagSet(ThreadFlag::kActiveSuspendBarrier))) {
       break;
     } else if (state_and_flags.IsFlagSet(ThreadFlag::kActiveSuspendBarrier)) {
-      PassActiveSuspendBarriers(this);
+      PassActiveSuspendBarriers();
     } else {
       // Impossible
       LOG(FATAL) << "Fatal, thread transitioned into suspended without running the checkpoint";
@@ -265,6 +267,20 @@
   }
 }
 
+inline void Thread::AddSuspend1Barrier(WrappedSuspend1Barrier* suspend1_barrier) {
+  suspend1_barrier->next_ = tlsPtr_.active_suspend1_barriers;
+  tlsPtr_.active_suspend1_barriers = suspend1_barrier;
+}
+
+inline void Thread::RemoveFirstSuspend1Barrier() {
+  tlsPtr_.active_suspend1_barriers = tlsPtr_.active_suspend1_barriers->next_;
+}
+
+inline bool Thread::HasActiveSuspendBarrier() {
+  return tlsPtr_.active_suspend1_barriers != nullptr ||
+         tlsPtr_.active_suspendall_barrier != nullptr;
+}
+
 inline void Thread::TransitionFromRunnableToSuspended(ThreadState new_state) {
   // Note: JNI stubs inline a fast path of this method that transitions to suspended if
   // there are no flags set and then clears the `held_mutexes[kMutatorLock]` (this comes
@@ -280,7 +296,7 @@
   // Mark the release of the share of the mutator lock.
   GetMutatorLock()->TransitionFromRunnableToSuspended(this);
   // Once suspended - check the active suspend barrier flag
-  PassActiveSuspendBarriers();
+  CheckActiveSuspendBarriers();
 }
 
 inline ThreadState Thread::TransitionFromSuspendedToRunnable() {
@@ -290,6 +306,7 @@
   // inlined from the `GetMutatorLock()->TransitionFromSuspendedToRunnable(this)` below).
   // Therefore any code added here (other than debug build assertions) should be gated
   // on some flag being set, so that the JNI stub can take the slow path to get here.
+  DCHECK(this == Current());
   StateAndFlags old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
   ThreadState old_state = old_state_and_flags.GetState();
   DCHECK_NE(old_state, ThreadState::kRunnable);
@@ -311,7 +328,7 @@
         break;
       }
     } else if (old_state_and_flags.IsFlagSet(ThreadFlag::kActiveSuspendBarrier)) {
-      PassActiveSuspendBarriers(this);
+      PassActiveSuspendBarriers();
     } else if (UNLIKELY(old_state_and_flags.IsFlagSet(ThreadFlag::kCheckpointRequest) ||
                         old_state_and_flags.IsFlagSet(ThreadFlag::kEmptyCheckpointRequest))) {
       // Checkpoint flags should not be set while in suspended state.
@@ -333,7 +350,6 @@
         thread_to_pass = this;
       }
       MutexLock mu(thread_to_pass, *Locks::thread_suspend_count_lock_);
-      ScopedTransitioningToRunnable scoped_transitioning_to_runnable(this);
       // Reload state and flags after locking the mutex.
       old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
       DCHECK_EQ(old_state, old_state_and_flags.GetState());
@@ -345,14 +361,15 @@
         DCHECK_EQ(old_state, old_state_and_flags.GetState());
       }
       DCHECK_EQ(GetSuspendCount(), 0);
-    } else if (UNLIKELY(old_state_and_flags.IsFlagSet(ThreadFlag::kRunningFlipFunction)) ||
-               UNLIKELY(old_state_and_flags.IsFlagSet(ThreadFlag::kWaitingForFlipFunction))) {
-      // It's possible that some thread runs this thread's flip-function in
-      // Thread::GetPeerFromOtherThread() even though it was runnable.
+    } else if (UNLIKELY(old_state_and_flags.IsFlagSet(ThreadFlag::kRunningFlipFunction))) {
+      DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kPendingFlipFunction));
+      // Do this before transitioning to runnable, both because we shouldn't wait in a runnable
+      // state, and so that the thread running the flip function can DCHECK we're not runnable.
       WaitForFlipFunction(this);
-    } else {
-      DCHECK(old_state_and_flags.IsFlagSet(ThreadFlag::kPendingFlipFunction));
-      if (EnsureFlipFunctionStarted(this, old_state_and_flags)) {
+    } else if (old_state_and_flags.IsFlagSet(ThreadFlag::kPendingFlipFunction)) {
+      // Logically acquire mutator lock in shared mode.
+      DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kRunningFlipFunction));
+      if (EnsureFlipFunctionStarted(this, this, old_state_and_flags)) {
         break;
       }
     }
@@ -360,6 +377,7 @@
     old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
     DCHECK_EQ(old_state, old_state_and_flags.GetState());
   }
+  DCHECK_EQ(this->GetState(), ThreadState::kRunnable);
   return static_cast<ThreadState>(old_state);
 }
 
@@ -438,35 +456,70 @@
   }
 }
 
-inline bool Thread::ModifySuspendCount(Thread* self,
-                                       int delta,
-                                       AtomicInteger* suspend_barrier,
-                                       SuspendReason reason) {
-  if (delta > 0 &&
-      (((gUseUserfaultfd || gUseReadBarrier) && this != self) || suspend_barrier != nullptr)) {
-    // When delta > 0 (requesting a suspend), ModifySuspendCountInternal() may fail either if
-    // active_suspend_barriers is full or we are in the middle of a thread flip. Retry in a loop.
-    while (true) {
-      if (LIKELY(ModifySuspendCountInternal(self, delta, suspend_barrier, reason))) {
-        return true;
-      } else {
-        // Failure means the list of active_suspend_barriers is full or we are in the middle of a
-        // thread flip, we should release the thread_suspend_count_lock_ (to avoid deadlock) and
-        // wait till the target thread has executed or Thread::PassActiveSuspendBarriers() or the
-        // flip function. Note that we could not simply wait for the thread to change to a suspended
-        // state, because it might need to run checkpoint function before the state change or
-        // resumes from the resume_cond_, which also needs thread_suspend_count_lock_.
-        //
-        // The list of active_suspend_barriers is very unlikely to be full since more than
-        // kMaxSuspendBarriers threads need to execute SuspendAllInternal() simultaneously, and
-        // target thread stays in kRunnable in the mean time.
-        Locks::thread_suspend_count_lock_->ExclusiveUnlock(self);
-        NanoSleep(100000);
-        Locks::thread_suspend_count_lock_->ExclusiveLock(self);
-      }
+inline void Thread::IncrementSuspendCount(Thread* self,
+                                          AtomicInteger* suspendall_barrier,
+                                          WrappedSuspend1Barrier* suspend1_barrier,
+                                          SuspendReason reason) {
+  if (kIsDebugBuild) {
+    Locks::thread_suspend_count_lock_->AssertHeld(self);
+    if (this != self && !IsSuspended()) {
+      Locks::thread_list_lock_->AssertHeld(self);
     }
-  } else {
-    return ModifySuspendCountInternal(self, delta, suspend_barrier, reason);
+  }
+  if (UNLIKELY(reason == SuspendReason::kForUserCode)) {
+    Locks::user_code_suspension_lock_->AssertHeld(self);
+  }
+
+  uint32_t flags = enum_cast<uint32_t>(ThreadFlag::kSuspendRequest);
+  if (suspendall_barrier != nullptr) {
+    DCHECK(suspend1_barrier == nullptr);
+    DCHECK(tlsPtr_.active_suspendall_barrier == nullptr);
+    tlsPtr_.active_suspendall_barrier = suspendall_barrier;
+    flags |= enum_cast<uint32_t>(ThreadFlag::kActiveSuspendBarrier);
+  } else if (suspend1_barrier != nullptr) {
+    AddSuspend1Barrier(suspend1_barrier);
+    flags |= enum_cast<uint32_t>(ThreadFlag::kActiveSuspendBarrier);
+  }
+
+  ++tls32_.suspend_count;
+  if (reason == SuspendReason::kForUserCode) {
+    ++tls32_.user_code_suspend_count;
+  }
+
+  // Two bits might be set simultaneously.
+  tls32_.state_and_flags.fetch_or(flags, std::memory_order_release);
+  TriggerSuspend();
+}
+
+inline void Thread::IncrementSuspendCount(Thread* self) {
+  IncrementSuspendCount(self, nullptr, nullptr, SuspendReason::kInternal);
+}
+
+inline void Thread::DecrementSuspendCount(Thread* self, bool for_user_code) {
+  if (kIsDebugBuild) {
+    Locks::thread_suspend_count_lock_->AssertHeld(self);
+    if (this != self && !IsSuspended()) {
+      Locks::thread_list_lock_->AssertHeld(self);
+    }
+  }
+  if (UNLIKELY(tls32_.suspend_count <= 0)) {
+    UnsafeLogFatalForSuspendCount(self, this);
+    UNREACHABLE();
+  }
+  if (for_user_code) {
+    Locks::user_code_suspension_lock_->AssertHeld(self);
+    if (UNLIKELY(tls32_.user_code_suspend_count <= 0)) {
+      LOG(ERROR) << "user_code_suspend_count incorrect";
+      UnsafeLogFatalForSuspendCount(self, this);
+      UNREACHABLE();
+    }
+    --tls32_.user_code_suspend_count;
+  }
+
+  --tls32_.suspend_count;
+
+  if (tls32_.suspend_count == 0) {
+    AtomicClearFlag(ThreadFlag::kSuspendRequest, std::memory_order_release);
   }
 }
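
For orientation, a minimal standalone sketch (plain C++, not ART code) of the bookkeeping these inlines implement: a mutex-guarded suspend count, a packed atomic flag word, and an intrusive singly-linked list of one-shot per-request barriers. Only the suspend1 list is shown; the single SuspendAll barrier slot is handled analogously. All names below are illustrative.

#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>

constexpr uint32_t kSuspendRequestBit = 1u << 0;
constexpr uint32_t kActiveSuspendBarrierBit = 1u << 1;

struct Suspend1Barrier {
  std::atomic<int> barrier{1};   // decremented by the target once it is suspended
  Suspend1Barrier* next = nullptr;
};

struct SketchThread {
  std::mutex suspend_count_lock;                 // stands in for thread_suspend_count_lock_
  int suspend_count = 0;                         // guarded by suspend_count_lock
  Suspend1Barrier* suspend1_barriers = nullptr;  // guarded by suspend_count_lock
  std::atomic<uint32_t> state_and_flags{0};

  // Caller holds suspend_count_lock.
  void IncrementSuspendCount(Suspend1Barrier* b) {
    uint32_t to_set = kSuspendRequestBit;
    if (b != nullptr) {
      b->next = suspend1_barriers;               // push onto the intrusive list
      suspend1_barriers = b;
      to_set |= kActiveSuspendBarrierBit;
    }
    ++suspend_count;
    // Both bits are published with a single release RMW.
    state_and_flags.fetch_or(to_set, std::memory_order_release);
  }

  // Caller holds suspend_count_lock.
  void DecrementSuspendCount() {
    assert(suspend_count > 0);
    if (--suspend_count == 0) {
      state_and_flags.fetch_and(~kSuspendRequestBit, std::memory_order_release);
    }
  }
};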
 
@@ -499,6 +552,34 @@
   tlsPtr_.stack_end = tlsPtr_.stack_begin + GetStackOverflowReservedBytes(kRuntimeISA);
 }
 
+inline void Thread::NotifyOnThreadExit(ThreadExitFlag* tef) {
+  tef->next_ = tlsPtr_.thread_exit_flags;
+  if (tef->next_ == nullptr) {
+    tlsPtr_.thread_exit_flags = tef;
+  } else {
+    DCHECK(!tef->next_->HasExited());
+    tef->next_->prev_ = tef;
+  }
+  tef->prev_ = nullptr;
+}
+
+inline void Thread::UnregisterThreadExitFlag(ThreadExitFlag* tef) {
+  if (tef->HasExited()) {
+    // List is no longer used; each client will deallocate its own ThreadExitFlag.
+    return;
+  }
+  // Remove tef from the list.
+  if (tef->next_ != nullptr) {
+    tef->next_->prev_ = tef->prev_;
+  }
+  if (tef->prev_ == nullptr) {
+    DCHECK_EQ(tlsPtr_.thread_exit_flags, tef);
+    tlsPtr_.thread_exit_flags = tef->next_;
+  } else {
+    tef->prev_->next_ = tef->next_;
+  }
+}
+
 }  // namespace art
 
 #endif  // ART_RUNTIME_THREAD_INL_H_
diff --git a/runtime/thread.cc b/runtime/thread.cc
index 4c9d5ef..f444655 100644
--- a/runtime/thread.cc
+++ b/runtime/thread.cc
@@ -144,9 +144,6 @@
 #endif
 
 static constexpr bool kVerifyImageObjectsMarked = kIsDebugBuild;
-// Amount of time (in microseconds) that we sleep if another thread is running
-// flip function of the thread that we are interested in.
-static constexpr size_t kSuspendTimeDuringFlip = 5'000;
 
 // For implicit overflow checks we reserve an extra piece of memory at the bottom
 // of the stack (lowest memory).  The higher portion of the memory
@@ -1454,7 +1451,7 @@
 }
 
 // Attempt to rectify locks so that we dump thread list with required locks before exiting.
-static void UnsafeLogFatalForSuspendCount(Thread* self, Thread* thread) NO_THREAD_SAFETY_ANALYSIS {
+void Thread::UnsafeLogFatalForSuspendCount(Thread* self, Thread* thread) NO_THREAD_SAFETY_ANALYSIS {
   LOG(ERROR) << *thread << " suspend count already zero.";
   Locks::thread_suspend_count_lock_->Unlock(self);
   if (!Locks::mutator_lock_->IsSharedHeld(self)) {
@@ -1472,141 +1469,53 @@
   std::ostringstream ss;
   Runtime::Current()->GetThreadList()->Dump(ss);
   LOG(FATAL) << ss.str();
+  UNREACHABLE();
 }
 
-bool Thread::ModifySuspendCountInternal(Thread* self,
-                                        int delta,
-                                        AtomicInteger* suspend_barrier,
-                                        SuspendReason reason) {
-  if (kIsDebugBuild) {
-    DCHECK(delta == -1 || delta == +1)
-          << reason << " " << delta << " " << this;
-    Locks::thread_suspend_count_lock_->AssertHeld(self);
-    if (this != self && !IsSuspended()) {
-      Locks::thread_list_lock_->AssertHeld(self);
-    }
-  }
-  // User code suspensions need to be checked more closely since they originate from code outside of
-  // the runtime's control.
-  if (UNLIKELY(reason == SuspendReason::kForUserCode)) {
-    Locks::user_code_suspension_lock_->AssertHeld(self);
-    if (UNLIKELY(delta + tls32_.user_code_suspend_count < 0)) {
-      LOG(ERROR) << "attempting to modify suspend count in an illegal way.";
-      return false;
-    }
-  }
-  if (UNLIKELY(delta < 0 && tls32_.suspend_count <= 0)) {
-    UnsafeLogFatalForSuspendCount(self, this);
-    return false;
-  }
-
-  if (delta > 0 && this != self && tlsPtr_.flip_function != nullptr) {
-    // Force retry of a suspend request if it's in the middle of a thread flip to avoid a
-    // deadlock. b/31683379.
-    return false;
-  }
-
-  uint32_t flags = enum_cast<uint32_t>(ThreadFlag::kSuspendRequest);
-  if (delta > 0 && suspend_barrier != nullptr) {
-    uint32_t available_barrier = kMaxSuspendBarriers;
-    for (uint32_t i = 0; i < kMaxSuspendBarriers; ++i) {
-      if (tlsPtr_.active_suspend_barriers[i] == nullptr) {
-        available_barrier = i;
-        break;
-      }
-    }
-    if (available_barrier == kMaxSuspendBarriers) {
-      // No barrier spaces available, we can't add another.
-      return false;
-    }
-    tlsPtr_.active_suspend_barriers[available_barrier] = suspend_barrier;
-    flags |= enum_cast<uint32_t>(ThreadFlag::kActiveSuspendBarrier);
-  }
-
-  tls32_.suspend_count += delta;
-  switch (reason) {
-    case SuspendReason::kForUserCode:
-      tls32_.user_code_suspend_count += delta;
-      break;
-    case SuspendReason::kInternal:
-      break;
-  }
-
-  if (tls32_.suspend_count == 0) {
-    AtomicClearFlag(ThreadFlag::kSuspendRequest);
-  } else {
-    // Two bits might be set simultaneously.
-    tls32_.state_and_flags.fetch_or(flags, std::memory_order_seq_cst);
-    TriggerSuspend();
-  }
-  return true;
-}
-
-bool Thread::PassActiveSuspendBarriers(Thread* self) {
-  // Grab the suspend_count lock and copy the current set of
-  // barriers. Then clear the list and the flag. The ModifySuspendCount
-  // function requires the lock so we prevent a race between setting
+bool Thread::PassActiveSuspendBarriers() {
+  DCHECK_EQ(this, Thread::Current());
+  DCHECK_NE(GetState(), ThreadState::kRunnable);
+  // Grab the suspend_count lock and copy the current set of barriers. Then clear the list and the
+  // flag. The IncrementSuspendCount function requires the lock so we prevent a race between setting
   // the kActiveSuspendBarrier flag and clearing it.
-  AtomicInteger* pass_barriers[kMaxSuspendBarriers];
+  std::vector<AtomicInteger*> pass_barriers{};
   {
-    MutexLock mu(self, *Locks::thread_suspend_count_lock_);
+    MutexLock mu(this, *Locks::thread_suspend_count_lock_);
     if (!ReadFlag(ThreadFlag::kActiveSuspendBarrier)) {
-      // quick exit test: the barriers have already been claimed - this is
-      // possible as there may be a race to claim and it doesn't matter
-      // who wins.
-      // All of the callers of this function (except the SuspendAllInternal)
-      // will first test the kActiveSuspendBarrier flag without lock. Here
-      // double-check whether the barrier has been passed with the
-      // suspend_count lock.
+      // quick exit test: the barriers have already been claimed - this is possible as there may
+      // be a race to claim and it doesn't matter who wins.
+      // All of the callers of this function (except the SuspendAllInternal) will first test the
+      // kActiveSuspendBarrier flag without lock. Here double-check whether the barrier has been
+      // passed with the suspend_count lock.
       return false;
     }
-
-    for (uint32_t i = 0; i < kMaxSuspendBarriers; ++i) {
-      pass_barriers[i] = tlsPtr_.active_suspend_barriers[i];
-      tlsPtr_.active_suspend_barriers[i] = nullptr;
+    if (tlsPtr_.active_suspendall_barrier != nullptr) {
+      // We have at most one active active_suspendall_barrier. See thread.h comment.
+      pass_barriers.push_back(tlsPtr_.active_suspendall_barrier);
+      tlsPtr_.active_suspendall_barrier = nullptr;
     }
+    for (WrappedSuspend1Barrier* w = tlsPtr_.active_suspend1_barriers; w != nullptr; w = w->next_) {
+      pass_barriers.push_back(&(w->barrier_));
+    }
+    tlsPtr_.active_suspend1_barriers = nullptr;
     AtomicClearFlag(ThreadFlag::kActiveSuspendBarrier);
   }
 
   uint32_t barrier_count = 0;
-  for (uint32_t i = 0; i < kMaxSuspendBarriers; i++) {
-    AtomicInteger* pending_threads = pass_barriers[i];
-    if (pending_threads != nullptr) {
-      bool done = false;
-      do {
-        int32_t cur_val = pending_threads->load(std::memory_order_relaxed);
-        CHECK_GT(cur_val, 0) << "Unexpected value for PassActiveSuspendBarriers(): " << cur_val;
-        // Reduce value by 1.
-        done = pending_threads->CompareAndSetWeakRelaxed(cur_val, cur_val - 1);
+  for (AtomicInteger* barrier : pass_barriers) {
+    ++barrier_count;
+    int32_t old_val = barrier->fetch_sub(1, std::memory_order_release);
+    CHECK_GT(old_val, 0) << "Unexpected value for PassActiveSuspendBarriers(): " << old_val;
 #if ART_USE_FUTEXES
-        if (done && (cur_val - 1) == 0) {  // Weak CAS may fail spuriously.
-          futex(pending_threads->Address(), FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
-        }
-#endif
-      } while (!done);
-      ++barrier_count;
+    if (old_val == 1) {
+      futex(barrier->Address(), FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
     }
+#endif
   }
   CHECK_GT(barrier_count, 0U);
   return true;
 }
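
The rewrite above replaces the old per-slot weak-CAS loop with a single release fetch_sub per barrier and a futex wake when the count reaches zero. A condensed standalone sketch of that hand-off, with a condition variable standing in for the futex (not ART code):

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>

struct SuspendBarrierSketch {
  std::atomic<int> pending;
  std::mutex m;
  std::condition_variable cv;
  explicit SuspendBarrierSketch(int n) : pending(n) {}

  // Called by each target thread once it is safely "suspended".
  void Pass() {
    int old_val = pending.fetch_sub(1, std::memory_order_release);
    assert(old_val > 0);
    if (old_val == 1) {                 // we performed the final decrement; wake the requester
      std::lock_guard<std::mutex> lg(m);
      cv.notify_all();
    }
  }

  // Called by the requesting thread.
  void Wait() {
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [this] { return pending.load(std::memory_order_acquire) == 0; });
  }
};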
 
-void Thread::ClearSuspendBarrier(AtomicInteger* target) {
-  CHECK(ReadFlag(ThreadFlag::kActiveSuspendBarrier));
-  bool clear_flag = true;
-  for (uint32_t i = 0; i < kMaxSuspendBarriers; ++i) {
-    AtomicInteger* ptr = tlsPtr_.active_suspend_barriers[i];
-    if (ptr == target) {
-      tlsPtr_.active_suspend_barriers[i] = nullptr;
-    } else if (ptr != nullptr) {
-      clear_flag = false;
-    }
-  }
-  if (LIKELY(clear_flag)) {
-    AtomicClearFlag(ThreadFlag::kActiveSuspendBarrier);
-  }
-}
-
 void Thread::RunCheckpointFunction() {
   DCHECK_EQ(Thread::Current(), this);
   CHECK(!GetStateAndFlags(std::memory_order_relaxed).IsAnyOfFlagsSet(FlipFunctionFlags()));
@@ -1641,28 +1550,25 @@
 }
 
 bool Thread::RequestCheckpoint(Closure* function) {
-  StateAndFlags old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
-  if (old_state_and_flags.GetState() != ThreadState::kRunnable) {
-    return false;  // Fail, thread is suspended and so can't run a checkpoint.
-  }
-
-  // We must be runnable to request a checkpoint.
-  DCHECK_EQ(old_state_and_flags.GetState(), ThreadState::kRunnable);
-  StateAndFlags new_state_and_flags = old_state_and_flags;
-  new_state_and_flags.SetFlag(ThreadFlag::kCheckpointRequest);
-  bool success = tls32_.state_and_flags.CompareAndSetStrongSequentiallyConsistent(
-      old_state_and_flags.GetValue(), new_state_and_flags.GetValue());
-  if (success) {
-    // Succeeded setting checkpoint flag, now insert the actual checkpoint.
-    if (tlsPtr_.checkpoint_function == nullptr) {
-      tlsPtr_.checkpoint_function = function;
-    } else {
-      checkpoint_overflow_.push_back(function);
+  StateAndFlags old_state_and_flags(0 /* unused */), new_state_and_flags(0 /* unused */);
+  do {
+    old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
+    if (old_state_and_flags.GetState() != ThreadState::kRunnable) {
+      return false;  // Fail, thread is suspended and so can't run a checkpoint.
     }
-    CHECK(ReadFlag(ThreadFlag::kCheckpointRequest));
-    TriggerSuspend();
+    new_state_and_flags = old_state_and_flags;
+    new_state_and_flags.SetFlag(ThreadFlag::kCheckpointRequest);
+  } while (!tls32_.state_and_flags.CompareAndSetWeakSequentiallyConsistent(
+      old_state_and_flags.GetValue(), new_state_and_flags.GetValue()));
+  // Succeeded setting checkpoint flag, now insert the actual checkpoint.
+  if (tlsPtr_.checkpoint_function == nullptr) {
+    tlsPtr_.checkpoint_function = function;
+  } else {
+    checkpoint_overflow_.push_back(function);
   }
-  return success;
+  DCHECK(ReadFlag(ThreadFlag::kCheckpointRequest));
+  TriggerSuspend();
+  return true;
 }
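
The new RequestCheckpoint is a standard "set a request bit only while the observed state still qualifies" retry loop; a weak CAS may fail spuriously, so the whole read-check-CAS sequence repeats. A standalone sketch with an invented bit layout (not ART's):

#include <atomic>
#include <cstdint>

constexpr uint32_t kStateMask = 0xffu;          // low byte: thread state
constexpr uint32_t kRunnableState = 1u;
constexpr uint32_t kCheckpointRequestBit = 1u << 8;

bool RequestCheckpointBit(std::atomic<uint32_t>& state_and_flags) {
  uint32_t old_val = state_and_flags.load(std::memory_order_relaxed);
  do {
    if ((old_val & kStateMask) != kRunnableState) {
      return false;  // target is suspended; it cannot run the checkpoint for us
    }
    // compare_exchange_weak refreshes old_val on failure, so the state is re-checked.
  } while (!state_and_flags.compare_exchange_weak(
      old_val, old_val | kCheckpointRequestBit, std::memory_order_seq_cst));
  return true;       // bit is set; the target will notice it at its next suspend point
}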
 
 bool Thread::RequestEmptyCheckpoint() {
@@ -1694,8 +1600,8 @@
     barrier_.Pass(self);
   }
 
-  void Wait(Thread* self, ThreadState suspend_state) {
-    if (suspend_state != ThreadState::kRunnable) {
+  void Wait(Thread* self, ThreadState wait_state) {
+    if (wait_state != ThreadState::kRunnable) {
       barrier_.Increment<Barrier::kDisallowHoldingLocks>(self, 1);
     } else {
       barrier_.Increment<Barrier::kAllowHoldingLocks>(self, 1);
@@ -1708,9 +1614,9 @@
 };
 
 // RequestSynchronousCheckpoint releases the thread_list_lock_ as a part of its execution.
-bool Thread::RequestSynchronousCheckpoint(Closure* function, ThreadState suspend_state) {
+bool Thread::RequestSynchronousCheckpoint(Closure* function, ThreadState wait_state) {
   Thread* self = Thread::Current();
-  if (this == Thread::Current()) {
+  if (this == self) {
     Locks::thread_list_lock_->AssertExclusiveHeld(self);
     // Unlock the tll before running so that the state is the same regardless of thread.
     Locks::thread_list_lock_->ExclusiveUnlock(self);
@@ -1726,181 +1632,225 @@
     return false;
   }
 
-  struct ScopedThreadListLockUnlock {
-    explicit ScopedThreadListLockUnlock(Thread* self_in) RELEASE(*Locks::thread_list_lock_)
-        : self_thread(self_in) {
-      Locks::thread_list_lock_->AssertHeld(self_thread);
-      Locks::thread_list_lock_->Unlock(self_thread);
-    }
-
-    ~ScopedThreadListLockUnlock() ACQUIRE(*Locks::thread_list_lock_) {
-      Locks::thread_list_lock_->AssertNotHeld(self_thread);
-      Locks::thread_list_lock_->Lock(self_thread);
-    }
-
-    Thread* self_thread;
-  };
-
-  for (;;) {
-    Locks::thread_list_lock_->AssertExclusiveHeld(self);
-    // If this thread is runnable, try to schedule a checkpoint. Do some gymnastics to not hold the
-    // suspend-count lock for too long.
-    if (GetState() == ThreadState::kRunnable) {
-      BarrierClosure barrier_closure(function);
-      bool installed = false;
-      {
-        MutexLock mu(self, *Locks::thread_suspend_count_lock_);
-        installed = RequestCheckpoint(&barrier_closure);
-      }
-      if (installed) {
-        // Relinquish the thread-list lock. We should not wait holding any locks. We cannot
-        // reacquire it since we don't know if 'this' hasn't been deleted yet.
-        Locks::thread_list_lock_->ExclusiveUnlock(self);
-        ScopedThreadStateChange sts(self, suspend_state);
-        barrier_closure.Wait(self, suspend_state);
-        return true;
-      }
-      // Fall-through.
-    }
-
-    // This thread is not runnable, make sure we stay suspended, then run the checkpoint.
-    // Note: ModifySuspendCountInternal also expects the thread_list_lock to be held in
-    //       certain situations.
+  Locks::thread_list_lock_->AssertExclusiveHeld(self);
+  // If target "this" thread is runnable, try to schedule a checkpoint. Do some gymnastics to not
+  // hold the suspend-count lock for too long.
+  if (GetState() == ThreadState::kRunnable) {
+    BarrierClosure barrier_closure(function);
+    bool installed = false;
     {
-      MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
-
-      if (!ModifySuspendCount(self, +1, nullptr, SuspendReason::kInternal)) {
-        // Just retry the loop.
-        sched_yield();
-        continue;
-      }
+      MutexLock mu(self, *Locks::thread_suspend_count_lock_);
+      installed = RequestCheckpoint(&barrier_closure);
     }
-
-    {
-      // Release for the wait. The suspension will keep us from being deleted. Reacquire after so
-      // that we can call ModifySuspendCount without racing against ThreadList::Unregister.
-      ScopedThreadListLockUnlock stllu(self);
-      {
-        ScopedThreadStateChange sts(self, suspend_state);
-        while (GetState() == ThreadState::kRunnable) {
-          // We became runnable again. Wait till the suspend triggered in ModifySuspendCount
-          // moves us to suspended.
-          sched_yield();
-        }
-      }
-      // Ensure that the flip function for this thread, if pending, is finished *before*
-      // the checkpoint function is run. Otherwise, we may end up with both `to' and 'from'
-      // space references on the stack, confusing the GC's thread-flip logic. The caller is
-      // runnable so can't have a pending flip function.
-      DCHECK_EQ(self->GetState(), ThreadState::kRunnable);
-      DCHECK(
-          !self->GetStateAndFlags(std::memory_order_relaxed).IsAnyOfFlagsSet(FlipFunctionFlags()));
-      EnsureFlipFunctionStarted(self);
-      while (GetStateAndFlags(std::memory_order_acquire).IsAnyOfFlagsSet(FlipFunctionFlags())) {
-        usleep(kSuspendTimeDuringFlip);
-      }
-
-      function->Run(this);
+    if (installed) {
+      // Relinquish the thread-list lock. We should not wait holding any locks. We cannot
+      // reacquire it since we don't know if 'this' hasn't been deleted yet.
+      Locks::thread_list_lock_->ExclusiveUnlock(self);
+      ScopedThreadStateChange sts(self, wait_state);
+      // Wait state can be kRunnable, in which case, for lock ordering purposes, it's as if we ran
+      // the closure ourselves. This means that the target thread should not acquire a pre-mutator
+      // lock without running the checkpoint, and the closure should not acquire a pre-mutator
+      // lock or suspend.
+      barrier_closure.Wait(self, wait_state);
+      return true;
     }
-
-    {
-      MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
-
-      DCHECK_NE(GetState(), ThreadState::kRunnable);
-      bool updated = ModifySuspendCount(self, -1, nullptr, SuspendReason::kInternal);
-      DCHECK(updated);
-      // Imitate ResumeAll, the thread may be waiting on Thread::resume_cond_ since we raised its
-      // suspend count. Now the suspend_count_ is lowered so we must do the broadcast.
-      Thread::resume_cond_->Broadcast(self);
-    }
-
-    // Release the thread_list_lock_ to be consistent with the barrier-closure path.
-    Locks::thread_list_lock_->ExclusiveUnlock(self);
-
-    return true;  // We're done, break out of the loop.
+    // No longer runnable. Fall-through.
   }
+
+  // Target "this" thread was not runnable. Suspend it, hopefully redundantly,
+  // but it might have become runnable in the meantime.
+  // Although this is a thread suspension, the target thread only blocks while we run the
+  // checkpoint, which is presumed to terminate quickly even if other threads are blocked.
+  // Note: IncrementSuspendCount also expects the thread_list_lock to be held in
+  //       certain situations.
+  {
+    bool is_suspended = false;
+    WrappedSuspend1Barrier wrapped_barrier{};
+
+    {
+      MutexLock suspend_count_mu(self, *Locks::thread_suspend_count_lock_);
+      // If wait_state is kRunnable, function may not suspend. We thus never block because
+      // we ourselves are being asked to suspend.
+      if (UNLIKELY(wait_state != ThreadState::kRunnable && self->GetSuspendCount() != 0)) {
+        // We are being asked to suspend while we are suspending another thread that may be
+        // responsible for our suspension. This is likely to result in deadlock if we each
+        // block on the suspension request. Instead we wait for the situation to change.
+        ThreadExitFlag target_status;
+        NotifyOnThreadExit(&target_status);
+        for (int iter_count = 1; self->GetSuspendCount() != 0; ++iter_count) {
+          Locks::thread_suspend_count_lock_->ExclusiveUnlock(self);
+          Locks::thread_list_lock_->ExclusiveUnlock(self);
+          {
+            ScopedThreadStateChange sts(self, wait_state);
+            usleep(ThreadList::kThreadSuspendSleepUs);
+          }
+          CHECK_LT(iter_count, ThreadList::kMaxSuspendRetries);
+          Locks::thread_list_lock_->Lock(self);
+          if (target_status.HasExited()) {
+            Locks::thread_list_lock_->ExclusiveUnlock(self);
+            return false;
+          }
+          Locks::thread_suspend_count_lock_->Lock(self);
+        }
+        UnregisterThreadExitFlag(&target_status);
+      }
+      IncrementSuspendCount(self, nullptr, &wrapped_barrier, SuspendReason::kInternal);
+      DCHECK_GT(GetSuspendCount(), 0);
+      DCHECK_EQ(self->GetSuspendCount(), 0);
+      // Since we've incremented the suspend count, "this" thread can no longer disappear.
+      Locks::thread_list_lock_->ExclusiveUnlock(self);
+      if (IsSuspended()) {
+        // See the discussion in mutator_gc_coord.md and SuspendAllInternal for the race here.
+        RemoveFirstSuspend1Barrier();
+        if (!HasActiveSuspendBarrier()) {
+          AtomicClearFlag(ThreadFlag::kActiveSuspendBarrier);
+        }
+        is_suspended = true;
+      }
+    }
+    if (!is_suspended) {
+      bool success =
+          Runtime::Current()->GetThreadList()->WaitForSuspendBarrier(&wrapped_barrier.barrier_);
+      CHECK(success);
+    }
+
+    // Ensure that the flip function for this thread, if pending, is finished *before*
+    // the checkpoint function is run. Otherwise, we may end up with both `to' and 'from'
+    // space references on the stack, confusing the GC's thread-flip logic. The caller is
+    // runnable so can't have a pending flip function.
+    DCHECK_EQ(self->GetState(), ThreadState::kRunnable);
+    DCHECK(!self->GetStateAndFlags(std::memory_order_relaxed).IsAnyOfFlagsSet(FlipFunctionFlags()));
+    EnsureFlipFunctionStarted(self, this);
+    if (ReadFlag(ThreadFlag::kRunningFlipFunction)) {
+      WaitForFlipFunction(self);
+    }
+    function->Run(this);
+  }
+
+  {
+    MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
+    DCHECK_NE(GetState(), ThreadState::kRunnable);
+    DCHECK_GT(GetSuspendCount(), 0);
+    DecrementSuspendCount(self);
+    resume_cond_->Broadcast(self);
+  }
+
+  Locks::thread_list_lock_->AssertNotHeld(self);
+  return true;
 }
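
On the runnable path above, the caller parks on a barrier closure until the target has executed the function. A simplified standalone sketch of such a waitable closure (not the actual BarrierClosure):

#include <condition_variable>
#include <functional>
#include <mutex>
#include <utility>

class WaitableClosure {
 public:
  explicit WaitableClosure(std::function<void()> fn) : fn_(std::move(fn)) {}

  // Executed by the target thread at its next checkpoint.
  void Run() {
    fn_();
    std::lock_guard<std::mutex> lg(m_);
    done_ = true;
    cv_.notify_all();
  }

  // Executed by the requesting thread after installing the closure.
  void Wait() {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [this] { return done_; });
  }

 private:
  std::function<void()> fn_;
  std::mutex m_;
  std::condition_variable cv_;
  bool done_ = false;
};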
 
 void Thread::SetFlipFunction(Closure* function) {
   // This is called with all threads suspended, except for the calling thread.
   DCHECK(IsSuspended() || Thread::Current() == this);
   DCHECK(function != nullptr);
-  DCHECK(tlsPtr_.flip_function == nullptr);
-  tlsPtr_.flip_function = function;
+  DCHECK(GetFlipFunction() == nullptr);
+  tlsPtr_.flip_function.store(function, std::memory_order_relaxed);
   DCHECK(!GetStateAndFlags(std::memory_order_relaxed).IsAnyOfFlagsSet(FlipFunctionFlags()));
   AtomicSetFlag(ThreadFlag::kPendingFlipFunction, std::memory_order_release);
 }
 
-bool Thread::EnsureFlipFunctionStarted(Thread* self, StateAndFlags old_state_and_flags) {
+bool Thread::EnsureFlipFunctionStarted(Thread* self,
+                                       Thread* target,
+                                       StateAndFlags old_state_and_flags,
+                                       ThreadExitFlag* tef,
+                                       bool* finished) {
+  //  Note: *target may have been destroyed. We have to be careful about
+  //  accessing it. That is the reason this is static and not a member function.
   bool become_runnable;
+  DCHECK(self == Current());
+  bool check_exited = (tef != nullptr);
+  auto maybe_release = [=]() NO_THREAD_SAFETY_ANALYSIS /* conditionally unlocks */ {
+    if (check_exited) {
+      Locks::thread_list_lock_->Unlock(self);
+    }
+  };
+  auto set_finished = [=](bool value) {
+    if (finished != nullptr) {
+      *finished = value;
+    }
+  };
+
+  if (check_exited) {
+    Locks::thread_list_lock_->Lock(self);
+    if (tef->HasExited()) {
+      Locks::thread_list_lock_->Unlock(self);
+      set_finished(true);
+      return false;
+    }
+  }
   if (old_state_and_flags.GetValue() == 0) {
     become_runnable = false;
-    old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
+    old_state_and_flags = target->GetStateAndFlags(std::memory_order_relaxed);
   } else {
     become_runnable = true;
+    DCHECK(!check_exited);
+    DCHECK(target == self);
+    DCHECK(old_state_and_flags.IsFlagSet(ThreadFlag::kPendingFlipFunction));
     DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kSuspendRequest));
   }
-
-  while (old_state_and_flags.IsFlagSet(ThreadFlag::kPendingFlipFunction)) {
+  while (true) {
+    DCHECK(!check_exited || (Locks::thread_list_lock_->IsExclusiveHeld(self) && !tef->HasExited()));
+    if (!old_state_and_flags.IsFlagSet(ThreadFlag::kPendingFlipFunction)) {
+      maybe_release();
+      set_finished(!old_state_and_flags.IsFlagSet(ThreadFlag::kRunningFlipFunction));
+      return false;
+    }
     DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kRunningFlipFunction));
     StateAndFlags new_state_and_flags =
         old_state_and_flags.WithFlag(ThreadFlag::kRunningFlipFunction)
                            .WithoutFlag(ThreadFlag::kPendingFlipFunction);
     if (become_runnable) {
-      DCHECK_EQ(self, this);
+      DCHECK_EQ(self, target);
       DCHECK_NE(self->GetState(), ThreadState::kRunnable);
       new_state_and_flags = new_state_and_flags.WithState(ThreadState::kRunnable);
     }
-    if (tls32_.state_and_flags.CompareAndSetWeakAcquire(old_state_and_flags.GetValue(),
-                                                        new_state_and_flags.GetValue())) {
+    if (target->tls32_.state_and_flags.CompareAndSetWeakAcquire(old_state_and_flags.GetValue(),
+                                                                new_state_and_flags.GetValue())) {
       if (become_runnable) {
-        GetMutatorLock()->TransitionFromSuspendedToRunnable(this);
+        self->GetMutatorLock()->TransitionFromSuspendedToRunnable(self);
       }
       art::Locks::mutator_lock_->AssertSharedHeld(self);
-      RunFlipFunction(self, /*notify=*/ true);
-      DCHECK(!GetStateAndFlags(std::memory_order_relaxed).IsAnyOfFlagsSet(FlipFunctionFlags()));
+      maybe_release();
+      // Thread will not go away while kRunningFlipFunction is set.
+      target->RunFlipFunction(self);
+      // At this point, no flip function flags should be set. It's unsafe to DCHECK that, since
+      // the thread may now have exited.
+      set_finished(true);
       return true;
-    } else {
-      old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
-      if (become_runnable && old_state_and_flags.IsFlagSet(ThreadFlag::kSuspendRequest)) {
-        break;
-      }
     }
+    if (become_runnable) {
+      DCHECK(!check_exited);  // We didn't acquire thread_list_lock_ .
+      // Let caller retry.
+      return false;
+    }
+    old_state_and_flags = target->GetStateAndFlags(std::memory_order_acquire);
   }
-  return false;
+  // Unreachable.
 }
 
-void Thread::RunFlipFunction(Thread* self, bool notify) {
-  // This function is called for suspended threads and by the thread running
-  // `ThreadList::FlipThreadRoots()` after we've successfully set the flag
-  // `ThreadFlag::kRunningFlipFunction`. This flag is not set if the thread is
-  // running the flip function right after transitioning to Runnable as
-  // no other thread may run checkpoints on a thread that's actually Runnable.
-  DCHECK_EQ(notify, ReadFlag(ThreadFlag::kRunningFlipFunction));
+void Thread::RunFlipFunction(Thread* self) {
+  // This function is called either by the thread running `ThreadList::FlipThreadRoots()` or when
+  // a thread becomes runnable, after we've successfully set the kRunningFlipFunction ThreadFlag.
+  DCHECK(ReadFlag(ThreadFlag::kRunningFlipFunction));
 
-  Closure* flip_function = tlsPtr_.flip_function;
-  tlsPtr_.flip_function = nullptr;
+  Closure* flip_function = GetFlipFunction();
+  tlsPtr_.flip_function.store(nullptr, std::memory_order_relaxed);
   DCHECK(flip_function != nullptr);
   flip_function->Run(this);
+  // The following also serves as a very approximate check for a deallocated thread.
+  DCHECK_NE(GetState(), ThreadState::kTerminated);
+  DCHECK(!ReadFlag(ThreadFlag::kPendingFlipFunction));
+  AtomicClearFlag(ThreadFlag::kRunningFlipFunction, std::memory_order_release);
+  // From here on this thread may go away, and it is no longer safe to access.
 
-  if (notify) {
-    // Clear the `ThreadFlag::kRunningFlipFunction` and `ThreadFlag::kWaitingForFlipFunction`.
-    // Check if the latter was actually set, indicating that there is at least one waiting thread.
-    constexpr uint32_t kFlagsToClear = enum_cast<uint32_t>(ThreadFlag::kRunningFlipFunction) |
-                                       enum_cast<uint32_t>(ThreadFlag::kWaitingForFlipFunction);
-    StateAndFlags old_state_and_flags(
-        tls32_.state_and_flags.fetch_and(~kFlagsToClear, std::memory_order_release));
-    if (old_state_and_flags.IsFlagSet(ThreadFlag::kWaitingForFlipFunction)) {
-      // Notify all threads that are waiting for completion (at least one).
-      // TODO: Should we create a separate mutex and condition variable instead
-      // of piggy-backing on the `thread_suspend_count_lock_` and `resume_cond_`?
-      MutexLock mu(self, *Locks::thread_suspend_count_lock_);
-      resume_cond_->Broadcast(self);
-    }
-  }
+  // Notify all threads that are waiting for completion.
+  // TODO: Should we create a separate mutex and condition variable instead
+  // of piggy-backing on the `thread_suspend_count_lock_` and `resume_cond_`?
+  MutexLock mu(self, *Locks::thread_suspend_count_lock_);
+  resume_cond_->Broadcast(self);
 }
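
The flip-function machinery boils down to a small flag protocol: claim the pending work by atomically swapping the pending bit for the running bit, run it, clear the running bit with release ordering, and notify waiters that poll the flags under a lock. A standalone sketch of that protocol with hypothetical names (not ART code):

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <utility>

constexpr uint32_t kPendingBit = 1u << 0;
constexpr uint32_t kRunningBit = 1u << 1;

struct FlipSlot {
  std::atomic<uint32_t> flags{0};
  std::function<void()> work;     // must be installed before kPendingBit is published
  std::mutex m;
  std::condition_variable cv;

  void Publish(std::function<void()> fn) {
    work = std::move(fn);
    flags.fetch_or(kPendingBit, std::memory_order_release);
  }

  // Returns true if this caller claimed and ran the work.
  bool EnsureStarted() {
    uint32_t old_val = flags.load(std::memory_order_relaxed);
    while (old_val & kPendingBit) {
      if (flags.compare_exchange_weak(old_val, (old_val & ~kPendingBit) | kRunningBit,
                                      std::memory_order_acquire)) {
        work();                                              // run the claimed work
        flags.fetch_and(~kRunningBit, std::memory_order_release);
        std::lock_guard<std::mutex> lg(m);
        cv.notify_all();                                     // wake WaitForCompletion()
        return true;
      }
      // CAS failure reloaded old_val; loop and re-examine the flags.
    }
    return false;                                            // someone else claimed it
  }

  void WaitForCompletion() {
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [this] {
      return (flags.load(std::memory_order_acquire) & (kPendingBit | kRunningBit)) == 0;
    });
  }
};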
 
-void Thread::WaitForFlipFunction(Thread* self) {
+void Thread::WaitForFlipFunction(Thread* self) const {
   // Another thread is running the flip function. Wait for it to complete.
   // Check the flag while holding the mutex so that we do not miss the broadcast.
   // Repeat the check after waiting to guard against spurious wakeups (and because
@@ -1910,24 +1860,46 @@
     StateAndFlags old_state_and_flags = GetStateAndFlags(std::memory_order_acquire);
     DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kPendingFlipFunction));
     if (!old_state_and_flags.IsFlagSet(ThreadFlag::kRunningFlipFunction)) {
-      DCHECK(!old_state_and_flags.IsAnyOfFlagsSet(FlipFunctionFlags()));
-      break;
+      return;
     }
-    if (!old_state_and_flags.IsFlagSet(ThreadFlag::kWaitingForFlipFunction)) {
-      // Mark that there is a waiting thread.
-      StateAndFlags new_state_and_flags =
-          old_state_and_flags.WithFlag(ThreadFlag::kWaitingForFlipFunction);
-      if (!tls32_.state_and_flags.CompareAndSetWeakRelaxed(old_state_and_flags.GetValue(),
-                                                           new_state_and_flags.GetValue())) {
-        continue;  // Retry.
-      }
+    // We sometimes hold mutator lock here. OK since the flip function must complete quickly.
+    resume_cond_->WaitHoldingLocks(self);
+  }
+}
+
+void Thread::WaitForFlipFunctionTestingExited(Thread* self, ThreadExitFlag* tef) {
+  Locks::thread_list_lock_->Lock(self);
+  if (tef->HasExited()) {
+    Locks::thread_list_lock_->Unlock(self);
+    return;
+  }
+  // We need to hold suspend_count_lock_ to avoid missed wakeups when the flip function finishes.
+  // We need to hold thread_list_lock_ because the tef test result is only valid while we hold the
+  // lock, and once kRunningFlipFunction is no longer set, "this" may be deallocated. Hence the
+  // complicated locking dance.
+  MutexLock mu(self, *Locks::thread_suspend_count_lock_);
+  while (true) {
+    StateAndFlags old_state_and_flags = GetStateAndFlags(std::memory_order_acquire);
+    DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kPendingFlipFunction));
+    Locks::thread_list_lock_->Unlock(self);  // So we can wait or return.
+    if (!old_state_and_flags.IsFlagSet(ThreadFlag::kRunningFlipFunction)) {
+      return;
     }
-    resume_cond_->Wait(self);
+    resume_cond_->WaitHoldingLocks(self);
+    Locks::thread_suspend_count_lock_->Unlock(self);  // To re-lock thread_list_lock.
+    Locks::thread_list_lock_->Lock(self);
+    Locks::thread_suspend_count_lock_->Lock(self);
+    if (tef->HasExited()) {
+      Locks::thread_list_lock_->Unlock(self);
+      return;
+    }
   }
 }
 
 void Thread::FullSuspendCheck(bool implicit) {
   ScopedTrace trace(__FUNCTION__);
+  DCHECK(!ReadFlag(ThreadFlag::kSuspensionImmune));
+  DCHECK(this == Thread::Current());
   VLOG(threads) << this << " self-suspending";
   // Make thread appear suspended to other threads, release mutator_lock_.
   // Transition to suspended and back to runnable, re-acquire share on mutator_lock_.
@@ -2242,19 +2214,27 @@
     if (obj == nullptr) {
       os << msg << "an unknown object";
     } else {
-      if ((obj->GetLockWord(true).GetState() == LockWord::kThinLocked) &&
-          Locks::mutator_lock_->IsExclusiveHeld(Thread::Current())) {
-        // Getting the identity hashcode here would result in lock inflation and suspension of the
-        // current thread, which isn't safe if this is the only runnable thread.
-        os << msg << StringPrintf("<@addr=0x%" PRIxPTR "> (a %s)",
-                                  reinterpret_cast<intptr_t>(obj.Ptr()),
-                                  obj->PrettyTypeOf().c_str());
+      const std::string pretty_type(obj->PrettyTypeOf());
+      // It's often unsafe to allow lock inflation here. We may be the only runnable thread, or
+      // this may be called from a checkpoint. We get the hashcode on a best effort basis.
+      static constexpr int kNumRetries = 3;
+      static constexpr int kSleepMicros = 10;
+      int32_t hash_code;
+      for (int i = 0;; ++i) {
+        hash_code = obj->IdentityHashCodeNoInflation();
+        if (hash_code != 0 || i == kNumRetries) {
+          break;
+        }
+        usleep(kSleepMicros);
+      }
+      if (hash_code == 0) {
+        os << msg
+           << StringPrintf("<@addr=0x%" PRIxPTR "> (a %s)",
+                           reinterpret_cast<intptr_t>(obj.Ptr()),
+                           pretty_type.c_str());
       } else {
-        // - waiting on <0x6008c468> (a java.lang.Class<java.lang.ref.ReferenceQueue>)
-        // Call PrettyTypeOf before IdentityHashCode since IdentityHashCode can cause thread
-        // suspension and move pretty_object.
-        const std::string pretty_type(obj->PrettyTypeOf());
-        os << msg << StringPrintf("<0x%08x> (a %s)", obj->IdentityHashCode(), pretty_type.c_str());
+        // - waiting on <0x608c468> (a java.lang.Class<java.lang.ref.ReferenceQueue>)
+        os << msg << StringPrintf("<0x%08x> (a %s)", hash_code, pretty_type.c_str());
       }
     }
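
The hash-code lookup above is a bounded best-effort retry: attempt a non-blocking variant a few times with short sleeps, then fall back to a degraded result instead of risking lock inflation. The same pattern, sketched generically (not ART code):

#include <chrono>
#include <functional>
#include <optional>
#include <thread>

std::optional<int> TryWithRetries(const std::function<std::optional<int>()>& attempt,
                                  int max_retries = 3,
                                  std::chrono::microseconds pause = std::chrono::microseconds(10)) {
  for (int i = 0; i <= max_retries; ++i) {
    std::optional<int> result = attempt();  // e.g. a hash code that cannot be computed right now
    if (result.has_value()) {
      return result;
    }
    if (i < max_retries) {
      std::this_thread::sleep_for(pause);
    }
  }
  return std::nullopt;                      // caller falls back to an address-based description
}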
     if (owner_tid != ThreadList::kInvalidThreadId) {
@@ -2452,6 +2432,13 @@
       soa.Self(), thread_group_object, thread_object);
 }
 
+void Thread::SignalExitFlags() {
+  for (ThreadExitFlag* tef = tlsPtr_.thread_exit_flags; tef != nullptr; tef = tef->next_) {
+    tef->exited_ = true;
+  }
+  tlsPtr_.thread_exit_flags = nullptr;  // Now unused.
+}
+
 Thread::Thread(bool daemon)
     : tls32_(daemon),
       wait_monitor_(nullptr),
@@ -2476,12 +2463,10 @@
             tlsPtr_.rosalloc_runs + kNumRosAllocThreadLocalSizeBracketsInThread,
             gc::allocator::RosAlloc::GetDedicatedFullRun());
   tlsPtr_.checkpoint_function = nullptr;
-  for (uint32_t i = 0; i < kMaxSuspendBarriers; ++i) {
-    tlsPtr_.active_suspend_barriers[i] = nullptr;
-  }
-  tlsPtr_.flip_function = nullptr;
+  tlsPtr_.active_suspendall_barrier = nullptr;
+  tlsPtr_.active_suspend1_barriers = nullptr;
+  tlsPtr_.flip_function.store(nullptr, std::memory_order_relaxed);
   tlsPtr_.thread_local_mark_stack = nullptr;
-  tls32_.is_transitioning_to_runnable = false;
   ResetTlab();
 }
 
@@ -2623,10 +2608,11 @@
   CHECK_NE(GetState(), ThreadState::kRunnable);
   CHECK(!ReadFlag(ThreadFlag::kCheckpointRequest));
   CHECK(!ReadFlag(ThreadFlag::kEmptyCheckpointRequest));
+  CHECK(!ReadFlag(ThreadFlag::kSuspensionImmune));
   CHECK(tlsPtr_.checkpoint_function == nullptr);
   CHECK_EQ(checkpoint_overflow_.size(), 0u);
-  CHECK(tlsPtr_.flip_function == nullptr);
-  CHECK_EQ(tls32_.is_transitioning_to_runnable, false);
+  // A pending flip function request is OK. FlipThreadRoots will have been notified that we
+  // exited, and nobody will attempt to process the request.
 
   // Make sure we processed all deoptimization requests.
   CHECK(tlsPtr_.deoptimization_context_stack == nullptr) << "Missed deoptimization";
@@ -4717,9 +4703,11 @@
 mirror::Object* Thread::GetPeerFromOtherThread() {
   DCHECK(tlsPtr_.jpeer == nullptr);
   // Ensure that opeer is not obsolete.
-  EnsureFlipFunctionStarted(Thread::Current());
-  while (GetStateAndFlags(std::memory_order_acquire).IsAnyOfFlagsSet(FlipFunctionFlags())) {
-    usleep(kSuspendTimeDuringFlip);
+  Thread* self = Thread::Current();
+  EnsureFlipFunctionStarted(self, this);
+  if (ReadFlag(ThreadFlag::kRunningFlipFunction)) {
+    // Does not release mutator lock. Hence no new flip requests can be issued.
+    WaitForFlipFunction(self);
   }
   return tlsPtr_.opeer;
 }
diff --git a/runtime/thread.h b/runtime/thread.h
index 08c803d..34ece64 100644
--- a/runtime/thread.h
+++ b/runtime/thread.h
@@ -47,6 +47,7 @@
 #include "reflective_handle_scope.h"
 #include "runtime_globals.h"
 #include "runtime_stats.h"
+#include "suspend_reason.h"
 #include "thread_state.h"
 
 namespace unwindstack {
@@ -101,7 +102,6 @@
 class ScopedObjectAccessAlreadyRunnable;
 class ShadowFrame;
 class StackedShadowFrameRecord;
-enum class SuspendReason : char;
 class Thread;
 class ThreadList;
 enum VisitRootFlags : uint8_t;
@@ -133,25 +133,31 @@
   kEmptyCheckpointRequest = 1u << 2,
 
   // Register that at least 1 suspend barrier needs to be passed.
+  // Changes to this flag are guarded by suspend_count_lock_ .
   kActiveSuspendBarrier = 1u << 3,
 
   // Marks that a "flip function" needs to be executed on this thread.
+  // Set only while holding thread_list_lock_.
   kPendingFlipFunction = 1u << 4,
 
   // Marks that the "flip function" is being executed by another thread.
   //
-  // This is used to guards against multiple threads trying to run the
+  // This is used to guard against multiple threads trying to run the
   // "flip function" for the same thread while the thread is suspended.
   //
-  // This is not needed when the thread is running the flip function
-  // on its own after transitioning to Runnable.
+  // Set either by the thread itself or while holding thread_list_lock_. Prevents a thread from
+  // exiting.
   kRunningFlipFunction = 1u << 5,
 
-  // Marks that a thread is wating for "flip function" to complete.
-  //
-  // This is used to check if we need to broadcast the completion of the
-  // "flip function" to other threads. See also `kRunningFlipFunction`.
-  kWaitingForFlipFunction = 1u << 6,
+  // We are responsible for resuming all other threads. We ignore suspension requests,
+  // but not checkpoint requests, until a more opportune time. GC code should
+  // in any case not check for such requests; other clients of SuspendAll might.
+  // Prevents a situation in which we are asked to suspend just before we suspend all
+  // other threads, and then notice the suspension request and suspend ourselves,
+  // leading to deadlock. Guarded by suspend_count_lock_ .
+  // TODO(b/296639267): Generalize use to prevent SuspendAll from blocking
+  // in-progress GC.
+  kSuspensionImmune = 1u << 6,
 
   // Request that compiled JNI stubs do not transition to Native or Runnable with
   // inlined code, but take a slow path for monitoring method entry and exit events.
@@ -187,6 +193,28 @@
   kDisabled
 };
 
+// See Thread.tlsPtr_.active_suspend1_barriers below for explanation.
+struct WrappedSuspend1Barrier {
+  WrappedSuspend1Barrier() : barrier_(1), next_(nullptr) {}
+  AtomicInteger barrier_;
+  struct WrappedSuspend1Barrier* next_ GUARDED_BY(Locks::thread_suspend_count_lock_);
+};
+
+// Mostly opaque structure allocated by the client of NotifyOnThreadExit.  Allows a client to
+// check whether the thread still exists after temporarily releasing thread_list_lock_, usually
+// because we need to wait for something.
+class ThreadExitFlag {
+ public:
+  ThreadExitFlag() : exited_(false) {}
+  bool HasExited() REQUIRES(Locks::thread_list_lock_) { return exited_; }
+
+ private:
+  ThreadExitFlag* next_ GUARDED_BY(Locks::thread_list_lock_);
+  ThreadExitFlag* prev_ GUARDED_BY(Locks::thread_list_lock_);
+  bool exited_ GUARDED_BY(*Locks::thread_list_lock_);
+  friend class Thread;
+};
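
A usage sketch of the exit-flag idea (illustrative only, not ART code; a vector replaces the intrusive doubly-linked list for brevity): register a stack-allocated flag under the list lock, never sleep while holding that lock, and re-check the flag after every re-acquisition before touching the target again.

#include <algorithm>
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

struct SketchExitFlag {
  bool exited = false;                        // guarded by list_lock
};

struct SketchTarget {
  std::vector<SketchExitFlag*> exit_flags;    // guarded by list_lock
  bool busy = true;                           // guarded by list_lock
};

std::mutex list_lock;                         // stands in for thread_list_lock_

// Run by the target as it goes away: notify and forget all registered flags.
void SignalExitFlags(SketchTarget* t) {
  std::lock_guard<std::mutex> lg(list_lock);
  for (SketchExitFlag* f : t->exit_flags) {
    f->exited = true;
  }
  t->exit_flags.clear();
}

// Waits for `t` to become idle; returns false if `t` exited in the meantime.
// Precondition: `t` has not exited when this is called.
bool WaitUntilIdle(SketchTarget* t) {
  SketchExitFlag tef;                         // lives on the waiter's stack
  std::unique_lock<std::mutex> lk(list_lock);
  t->exit_flags.push_back(&tef);              // register
  while (t->busy) {
    lk.unlock();                              // never sleep while holding the list lock
    std::this_thread::sleep_for(std::chrono::microseconds(100));
    lk.lock();
    if (tef.exited) {
      return false;                           // `t` is gone; do not touch *t again
    }
  }
  // `t` is idle and still alive: unregister before tef goes out of scope.
  auto& flags = t->exit_flags;
  flags.erase(std::remove(flags.begin(), flags.end(), &tef), flags.end());
  return true;
}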
+
 // This should match RosAlloc::kNumThreadLocalSizeBrackets.
 static constexpr size_t kNumRosAllocThreadLocalSizeBracketsInThread = 16;
 
@@ -320,7 +348,10 @@
   }
 
   bool IsSuspended() const {
-    StateAndFlags state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
+    // We need to ensure that once we return true, all prior accesses to the Java data by "this"
+    // thread are complete. Hence we need "acquire" ordering here, and "release" when the flags
+    // are set.
+    StateAndFlags state_and_flags = GetStateAndFlags(std::memory_order_acquire);
     return state_and_flags.GetState() != ThreadState::kRunnable &&
            state_and_flags.IsFlagSet(ThreadFlag::kSuspendRequest);
   }
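
In miniature, the acquire/release pairing that comment relies on (standalone sketch, not ART code):

#include <atomic>
#include <cassert>

std::atomic<bool> suspended{false};
int java_visible_data = 0;   // stands in for the thread's heap/stack writes

void TargetThread() {
  java_visible_data = 42;                              // ordered before the flag...
  suspended.store(true, std::memory_order_release);    // ...by the release store
}

void ObserverThread() {
  if (suspended.load(std::memory_order_acquire)) {     // pairs with the release store
    assert(java_visible_data == 42);                   // guaranteed to be visible
  }
}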
@@ -336,20 +367,30 @@
     return tls32_.define_class_counter;
   }
 
-  // If delta > 0 and (this != self or suspend_barrier is not null), this function may temporarily
-  // release thread_suspend_count_lock_ internally.
+  // Increment suspend count and optionally install at most one suspend barrier.
   ALWAYS_INLINE
-  bool ModifySuspendCount(Thread* self,
-                          int delta,
-                          AtomicInteger* suspend_barrier,
-                          SuspendReason reason)
-      WARN_UNUSED
+  void IncrementSuspendCount(Thread* self,
+                             AtomicInteger* suspendall_barrier,
+                             WrappedSuspend1Barrier* suspend1_barrier,
+                             SuspendReason reason) REQUIRES(Locks::thread_suspend_count_lock_);
+
+  // The same, but default reason to kInternal, and barriers to nullptr.
+  ALWAYS_INLINE void IncrementSuspendCount(Thread* self)
       REQUIRES(Locks::thread_suspend_count_lock_);
 
+  // Follows one of the above calls. For_user_code indicates if SuspendReason was kForUserCode.
+  ALWAYS_INLINE void DecrementSuspendCount(Thread* self, bool for_user_code = false)
+      REQUIRES(Locks::thread_suspend_count_lock_);
+
+ private:
+  NO_RETURN static void UnsafeLogFatalForSuspendCount(Thread* self, Thread* thread);
+
+ public:
   // Requests a checkpoint closure to run on another thread. The closure will be run when the
   // thread notices the request, either in an explicit runtime CheckSuspend() call, or in a call
   // originating from a compiler generated suspend point check. This returns true if the closure
-  // was added and will (eventually) be executed. It returns false otherwise.
+  // was added and will (eventually) be executed. It returns false if this was impossible
+  // because the thread was suspended, and we thus did nothing.
   //
   // Since multiple closures can be queued and some closures can delay other threads from running,
   // no closure should attempt to suspend another thread while running.
@@ -363,27 +404,37 @@
 
   // RequestSynchronousCheckpoint releases the thread_list_lock_ as a part of its execution. This is
   // due to the fact that Thread::Current() needs to go to sleep to allow the targeted thread to
-  // execute the checkpoint for us if it is Runnable. The suspend_state is the state that the thread
+  // execute the checkpoint for us if it is Runnable. The wait_state is the state that the thread
   // will go into while it is awaiting the checkpoint to be run.
-  // NB Passing ThreadState::kRunnable may cause the current thread to wait in a condition variable
-  // while holding the mutator_lock_.  Callers should ensure that this will not cause any problems
-  // for the closure or the rest of the system.
+  // The closure may be run on Thread::Current() on behalf of "this" thread.
+  // Thus for lock ordering purposes, the closure should be runnable by the caller. This also
+  // sometimes makes it reasonable to pass ThreadState::kRunnable as wait_state: We may wait on
+  // a condition variable for the "this" thread to act, but for lock ordering purposes, this is
+  // exactly as though Thread::Current() had run the closure.
   // NB Since multiple closures can be queued and some closures can delay other threads from running
   // no closure should attempt to suspend another thread while running.
   bool RequestSynchronousCheckpoint(Closure* function,
-                                    ThreadState suspend_state = ThreadState::kWaiting)
-      REQUIRES_SHARED(Locks::mutator_lock_)
-      RELEASE(Locks::thread_list_lock_)
-      REQUIRES(!Locks::thread_suspend_count_lock_);
+                                    ThreadState wait_state = ThreadState::kWaiting)
+      REQUIRES_SHARED(Locks::mutator_lock_) RELEASE(Locks::thread_list_lock_)
+          REQUIRES(!Locks::thread_suspend_count_lock_);
 
   bool RequestEmptyCheckpoint()
       REQUIRES(Locks::thread_suspend_count_lock_);
 
-  // Set the flip function. This is done with all threads suspended, except for the calling thread.
-  void SetFlipFunction(Closure* function);
+  Closure* GetFlipFunction() { return tlsPtr_.flip_function.load(std::memory_order_relaxed); }
 
-  // Wait for the flip function to complete if still running on another thread.
-  void WaitForFlipFunction(Thread* self);
+  // Set the flip function. This is done with all threads suspended, except for the calling thread.
+  void SetFlipFunction(Closure* function) REQUIRES(Locks::thread_suspend_count_lock_)
+      REQUIRES(Locks::thread_list_lock_);
+
+  // Wait for the flip function to complete if still running on another thread. Assumes the "this"
+  // thread remains live.
+  void WaitForFlipFunction(Thread* self) const REQUIRES(!Locks::thread_suspend_count_lock_);
+
+  // An enhanced version of the above that uses tef to safely return if the thread exited in the
+  // meantime.
+  void WaitForFlipFunctionTestingExited(Thread* self, ThreadExitFlag* tef)
+      REQUIRES(!Locks::thread_suspend_count_lock_, !Locks::thread_list_lock_);
 
   gc::accounting::AtomicStack<mirror::Object>* GetThreadLocalMarkStack() {
     CHECK(gUseReadBarrier);
@@ -405,6 +456,7 @@
 
   // Called when thread detected that the thread_suspend_count_ was non-zero. Gives up share of
   // mutator_lock_ and waits until it is resumed and thread_suspend_count_ is zero.
+  // Should be called only when the kSuspensionImmune flag is clear. Requires this == Current().
   void FullSuspendCheck(bool implicit = false)
       REQUIRES(!Locks::thread_suspend_count_lock_)
       REQUIRES_SHARED(Locks::mutator_lock_);
@@ -520,7 +572,7 @@
   // the thread's stack may have not been flipped yet and peer may be a from-space (stale) ref.
   // This function will force a flip for the other thread if necessary.
   // Since we hold a shared mutator lock, a new flip function cannot be concurrently
-  // installed
+  // installed.
   mirror::Object* GetPeerFromOtherThread() REQUIRES_SHARED(Locks::mutator_lock_);
 
   bool HasPeer() const {
@@ -644,6 +696,19 @@
   void NotifyThreadGroup(ScopedObjectAccessAlreadyRunnable& soa, jobject thread_group = nullptr)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
+  // Request notification when this thread is unregistered or exits. The caller must allocate a
+  // ThreadExitFlag, and pass it in. The caller is responsible for either waiting until the
+  // thread has exited, or unregistering the ThreadExitFlag, and then, and only then, deallocating
+  // the ThreadExitFlag. (This scheme avoids an allocation and questions about what to do if the
+  // allocation fails. It also allows detection of thread exit after temporary release of
+  // thread_list_lock_.)
+  void NotifyOnThreadExit(ThreadExitFlag* tef) REQUIRES(Locks::thread_list_lock_);
+  void UnregisterThreadExitFlag(ThreadExitFlag* tef) REQUIRES(Locks::thread_list_lock_);
+
+  // Called when thread is unregistered. May be called repeatedly, in which case only newly
+  // registered clients are processed.
+  void SignalExitFlags() REQUIRES(Locks::thread_list_lock_);
+
   // JNI methods
   JNIEnvExt* GetJniEnv() const {
     return tlsPtr_.jni_env;
@@ -1241,18 +1306,39 @@
     tlsPtr_.held_mutexes[level] = mutex;
   }
 
-  void ClearSuspendBarrier(AtomicInteger* target)
-      REQUIRES(Locks::thread_suspend_count_lock_);
+  // Possibly check that no mutexes at level kMonitorLock or above are subsequently acquired.
+  // Only invoked by the thread itself.
+  void DisallowPreMonitorMutexes() {
+    if (kIsDebugBuild) {
+      CHECK(this == Thread::Current());
+      CHECK(GetHeldMutex(kMonitorLock) == nullptr);
+      // Pretend we hold a kMonitorLock level mutex to detect disallowed mutex
+      // acquisitions by checkpoint Run() methods.  We don't normally register or thus check
+      // kMonitorLock level mutexes, but this is an exception.
+      static Mutex placeholder_mutex("checkpoint placeholder mutex", kMonitorLock);
+      SetHeldMutex(kMonitorLock, &placeholder_mutex);
+    }
+  }
+
+  // Undo the effect of the previous call. Again only invoked by the thread itself.
+  void AllowPreMonitorMutexes() {
+    if (kIsDebugBuild) {
+      CHECK(GetHeldMutex(kMonitorLock) != nullptr);
+      SetHeldMutex(kMonitorLock, nullptr);
+    }
+  }
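
A standalone sketch of the debug-only sentinel trick behind DisallowPreMonitorMutexes, with a thread_local flag standing in for the held-mutex table (not ART code):

#include <cassert>

thread_local bool pre_monitor_mutexes_disallowed = false;

void DisallowPreMonitorMutexesSketch() {
#ifndef NDEBUG
  assert(!pre_monitor_mutexes_disallowed);
  pre_monitor_mutexes_disallowed = true;   // "hold" the placeholder
#endif
}

void AllowPreMonitorMutexesSketch() {
#ifndef NDEBUG
  assert(pre_monitor_mutexes_disallowed);
  pre_monitor_mutexes_disallowed = false;
#endif
}

// Called from a checked lock implementation before acquiring a monitor-level mutex.
void AssertMonitorLevelLockAllowed() {
#ifndef NDEBUG
  assert(!pre_monitor_mutexes_disallowed);
#endif
}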
 
   bool ReadFlag(ThreadFlag flag) const {
     return GetStateAndFlags(std::memory_order_relaxed).IsFlagSet(flag);
   }
 
   void AtomicSetFlag(ThreadFlag flag, std::memory_order order = std::memory_order_seq_cst) {
+    // Since we discard the returned value, memory_order_release will often suffice.
     tls32_.state_and_flags.fetch_or(enum_cast<uint32_t>(flag), order);
   }
 
   void AtomicClearFlag(ThreadFlag flag, std::memory_order order = std::memory_order_seq_cst) {
+    // Since we discard the returned value, memory_order_release will often suffice.
     tls32_.state_and_flags.fetch_and(~enum_cast<uint32_t>(flag), order);
   }
 
@@ -1345,14 +1431,6 @@
   bool ProtectStack(bool fatal_on_error = true);
   bool UnprotectStack();
 
-  bool IsTransitioningToRunnable() const {
-    return tls32_.is_transitioning_to_runnable;
-  }
-
-  void SetIsTransitioningToRunnable(bool value) {
-    tls32_.is_transitioning_to_runnable = value;
-  }
-
   uint32_t DecrementForceInterpreterCount() REQUIRES(Locks::thread_list_lock_) {
     return --tls32_.force_interpreter_count;
   }
@@ -1463,8 +1541,7 @@
 
   static constexpr uint32_t FlipFunctionFlags() {
     return enum_cast<uint32_t>(ThreadFlag::kPendingFlipFunction) |
-           enum_cast<uint32_t>(ThreadFlag::kRunningFlipFunction) |
-           enum_cast<uint32_t>(ThreadFlag::kWaitingForFlipFunction);
+           enum_cast<uint32_t>(ThreadFlag::kRunningFlipFunction);
   }
 
   static constexpr uint32_t StoredThreadStateValue(ThreadState state) {
@@ -1486,6 +1563,7 @@
 
  private:
   explicit Thread(bool daemon);
+  // A runnable thread should only be deleted by the thread itself.
   ~Thread() REQUIRES(!Locks::mutator_lock_, !Locks::thread_suspend_count_lock_);
   void Destroy(bool should_run_callbacks);
 
@@ -1513,7 +1591,8 @@
 
   // Avoid use, callers should use SetState.
   // Used only by `Thread` destructor and stack trace collection in semi-space GC (currently
-  // disabled by `kStoreStackTraces = false`).
+  // disabled by `kStoreStackTraces = false`). May not be called on a runnable thread other
+  // than Thread::Current().
   // NO_THREAD_SAFETY_ANALYSIS: This function is "Unsafe" and can be called in
   // different states, so clang cannot perform the thread safety analysis.
   ThreadState SetStateUnsafe(ThreadState new_state) NO_THREAD_SAFETY_ANALYSIS {
@@ -1522,12 +1601,13 @@
     if (old_state == new_state) {
       // Nothing to do.
     } else if (old_state == ThreadState::kRunnable) {
+      DCHECK_EQ(this, Thread::Current());
       // Need to run pending checkpoint and suspend barriers. Run checkpoints in runnable state in
       // case they need to use a ScopedObjectAccess. If we are holding the mutator lock and a SOA
       // attempts to TransitionFromSuspendedToRunnable, it results in a deadlock.
       TransitionToSuspendedAndRunCheckpoints(new_state);
       // Since we transitioned to a suspended state, check the pass barrier requests.
-      PassActiveSuspendBarriers();
+      CheckActiveSuspendBarriers();
     } else {
       while (true) {
         StateAndFlags new_state_and_flags = old_state_and_flags;
@@ -1601,8 +1681,25 @@
       REQUIRES(!Locks::thread_suspend_count_lock_, !Roles::uninterruptible_)
       REQUIRES_SHARED(Locks::mutator_lock_);
 
-  ALWAYS_INLINE void PassActiveSuspendBarriers()
-      REQUIRES(!Locks::thread_suspend_count_lock_, !Roles::uninterruptible_);
+  // Call PassActiveSuspendBarriers() if there are active barriers. Only called on current thread.
+  ALWAYS_INLINE void CheckActiveSuspendBarriers()
+      REQUIRES(!Locks::thread_suspend_count_lock_, !Locks::mutator_lock_, !Roles::uninterruptible_);
+
+  // Decrement all "suspend barriers" for the current thread, notifying threads that requested our
+  // suspension. Only called on current thread, when suspended. If suspend_count_ > 0 then we
+  // promise that we are and will remain "suspended" until the suspend count is decremented.
+  bool PassActiveSuspendBarriers()
+      REQUIRES(!Locks::thread_suspend_count_lock_, !Locks::mutator_lock_);
+
+  // Add an entry to active_suspend1_barriers.
+  ALWAYS_INLINE void AddSuspend1Barrier(WrappedSuspend1Barrier* suspend1_barrier)
+      REQUIRES(Locks::thread_suspend_count_lock_);
+
+  // Remove last-added entry from active_suspend1_barriers.
+  // Only makes sense if we're still holding thread_suspend_count_lock_ since insertion.
+  ALWAYS_INLINE void RemoveFirstSuspend1Barrier() REQUIRES(Locks::thread_suspend_count_lock_);
+
+  ALWAYS_INLINE bool HasActiveSuspendBarrier() REQUIRES(Locks::thread_suspend_count_lock_);
 
   // Registers the current thread as the jit sensitive thread. Should be called just once.
   static void SetJitSensitiveThread() {
@@ -1618,13 +1715,6 @@
     is_sensitive_thread_hook_ = is_sensitive_thread_hook;
   }
 
-  bool ModifySuspendCountInternal(Thread* self,
-                                  int delta,
-                                  AtomicInteger* suspend_barrier,
-                                  SuspendReason reason)
-      WARN_UNUSED
-      REQUIRES(Locks::thread_suspend_count_lock_);
-
   // Runs a single checkpoint function. If there are no more pending checkpoint functions it will
   // clear the kCheckpointRequest flag. The caller is responsible for calling this in a loop until
   // the kCheckpointRequest flag is cleared.
@@ -1633,9 +1723,6 @@
       REQUIRES_SHARED(Locks::mutator_lock_);
   void RunEmptyCheckpoint();
 
-  bool PassActiveSuspendBarriers(Thread* self)
-      REQUIRES(!Locks::thread_suspend_count_lock_);
-
   // Install the protected region for implicit stack checks.
   void InstallImplicitProtection();
 
@@ -1739,21 +1826,31 @@
   // Format state and flags as a hex string. For diagnostic output.
   std::string StateAndFlagsAsHexString() const;
 
-  // Run the flip function and, if requested, notify other threads that may have tried
+  // Run the flip function and notify other threads that may have tried
   // to do that concurrently.
-  void RunFlipFunction(Thread* self, bool notify) REQUIRES_SHARED(Locks::mutator_lock_);
+  void RunFlipFunction(Thread* self) REQUIRES_SHARED(Locks::mutator_lock_);
 
-  // Ensure that thread flip function started running. If no other thread is executing
-  // it, the calling thread shall run the flip function and then notify other threads
+  // Ensure that thread flip function for thread target started running. If no other thread is
+  // executing it, the calling thread shall run the flip function and then notify other threads
   // that have tried to do that concurrently. After this function returns, the
-  // `ThreadFlag::kPendingFlipFunction` is cleared but another thread may still
-  // run the flip function as indicated by the `ThreadFlag::kRunningFlipFunction`.
-  // A non-zero 'old_state_and_flags' indicates that the thread should logically
-  // acquire mutator lock if we win the race to run the flip function, if a
-  // suspend request is not already set. A zero 'old_state_and_flags' indicates
-  // we already hold the mutator lock.
-  // Returns true if this call ran the flip function.
-  bool EnsureFlipFunctionStarted(Thread* self, StateAndFlags old_state_and_flags = StateAndFlags(0))
+  // `ThreadFlag::kPendingFlipFunction` is cleared but another thread may still be running the
+  // flip function as indicated by the `ThreadFlag::kRunningFlipFunction`. Optional arguments:
+  //  - old_state_and_flags indicates the current state and flags value for the thread, with
+  //    at least kPendingFlipFunction set. The thread should logically acquire the
+  //    mutator lock before running the flip function.  A special zero value indicates that the
+  //    thread already holds the mutator lock, and the actual state_and_flags must be read.
+  //    A non-zero value implies target == Current().
+  //  - If tef is non-null, we check that the target thread has not yet exited, as indicated by
+  //    tef. In that case, we acquire thread_list_lock_ as needed.
+  //  - If finished is non-null, we assign to *finished to indicate whether the flip was known to
+  //    be completed when we returned.
+  //  Returns true if and only if we acquired the mutator lock (which implies that we ran the flip
+  //  function after finding old_state_and_flags unchanged).
+  static bool EnsureFlipFunctionStarted(Thread* self,
+                                        Thread* target,
+                                        StateAndFlags old_state_and_flags = StateAndFlags(0),
+                                        ThreadExitFlag* tef = nullptr,
+                                        /*out*/ bool* finished = nullptr)
       TRY_ACQUIRE_SHARED(true, Locks::mutator_lock_);
 
   static void ThreadExitCallback(void* arg);
@@ -1799,7 +1896,6 @@
           throwing_OutOfMemoryError(false),
           no_thread_suspension(0),
           thread_exit_check_count(0),
-          is_transitioning_to_runnable(false),
           is_gc_marking(false),
           is_deopt_check_required(false),
           weak_ref_access_enabled(WeakRefAccessState::kVisiblyEnabled),
@@ -1809,8 +1905,7 @@
           make_visibly_initialized_counter(0),
           define_class_counter(0),
           num_name_readers(0),
-          shared_method_hotness(kSharedMethodHotnessThreshold)
-        {}
+          shared_method_hotness(kSharedMethodHotnessThreshold) {}
 
     // The state and flags field must be changed atomically so that flag values aren't lost.
     // See `StateAndFlags` for bit assignments of `ThreadFlag` and `ThreadState` values.
@@ -1846,11 +1941,6 @@
     // How many times has our pthread key's destructor been called?
     uint32_t thread_exit_check_count;
 
-    // True if the thread is in TransitionFromSuspendedToRunnable(). This is used to distinguish the
-    // non-runnable threads (eg. kNative, kWaiting) that are about to transition to runnable from
-    // the rest of them.
-    bool32_t is_transitioning_to_runnable;
-
     // True if the GC is in the marking phase. This is used for the CC collector only. This is
     // thread local so that we can simplify the logic to check for the fast path of read barriers of
     // GC roots.
@@ -1955,9 +2045,11 @@
                                name(nullptr),
                                pthread_self(0),
                                last_no_thread_suspension_cause(nullptr),
-                               thread_local_start(nullptr),
+                               active_suspendall_barrier(nullptr),
+                               active_suspend1_barriers(nullptr),
                                thread_local_pos(nullptr),
                                thread_local_end(nullptr),
+                               thread_local_start(nullptr),
                                thread_local_limit(nullptr),
                                thread_local_objects(0),
                                checkpoint_function(nullptr),
@@ -1969,7 +2061,8 @@
                                async_exception(nullptr),
                                top_reflective_handle_scope(nullptr),
                                method_trace_buffer(nullptr),
-                               method_trace_buffer_index(0) {
+                               method_trace_buffer_index(0),
+                               thread_exit_flags(nullptr) {
       std::fill(held_mutexes, held_mutexes + kLockLevelCount, nullptr);
     }
 
@@ -2067,27 +2160,37 @@
     // If no_thread_suspension_ is > 0, what is causing that assertion.
     const char* last_no_thread_suspension_cause;
 
-    // Pending barriers that require passing or NULL if non-pending. Installation guarding by
-    // Locks::thread_suspend_count_lock_.
-    // They work effectively as art::Barrier, but implemented directly using AtomicInteger and futex
-    // to avoid additional cost of a mutex and a condition variable, as used in art::Barrier.
-    AtomicInteger* active_suspend_barriers[kMaxSuspendBarriers];
-
-    // Thread-local allocation pointer. Moved here to force alignment for thread_local_pos on ARM.
-    uint8_t* thread_local_start;
+    // After a thread observes a suspend request and enters a suspended state,
+    // it notifies the requestor by arriving at a "suspend barrier". This consists of decrementing
+    // the atomic integer representing the barrier. (This implementation was introduced in 2015 to
+    // minimize cost. There may be other options.) These atomic integer barriers are always
+    // stored on the requesting thread's stack. They are referenced from the target thread's
+    // data structure in one of two ways; in either case the data structure referring to these
+    // barriers is guarded by suspend_count_lock:
+    // 1. A SuspendAll barrier is directly referenced from the target thread. Only one of these
+    // can be active at a time:
+    AtomicInteger* active_suspendall_barrier GUARDED_BY(Locks::thread_suspend_count_lock_);
+    // 2. For individual thread suspensions, active barriers are embedded in a struct that is used
+    // to link together all suspend requests for this thread. Unlike the SuspendAll case, each
+    // barrier is referenced by a single target thread, and thus can appear only on a single list.
+    // The struct as a whole is still stored on the requesting thread's stack.
+    WrappedSuspend1Barrier* active_suspend1_barriers GUARDED_BY(Locks::thread_suspend_count_lock_);
 
     // thread_local_pos and thread_local_end must be consecutive for ldrd and are 8 byte aligned for
     // potentially better performance.
     uint8_t* thread_local_pos;
     uint8_t* thread_local_end;
 
+    // Thread-local allocation pointer. Can be moved above the preceding two to correct alignment.
+    uint8_t* thread_local_start;
+
     // Thread local limit is how much we can expand the thread local buffer to, it is greater or
     // equal to thread_local_end.
     uint8_t* thread_local_limit;
 
     size_t thread_local_objects;
 
-    // Pending checkpoint function or null if non-pending. If this checkpoint is set and someone\
+    // Pending checkpoint function or null if non-pending. If this checkpoint is set and someone
     // requests another checkpoint, it goes to the checkpoint overflow list.
     Closure* checkpoint_function GUARDED_BY(Locks::thread_suspend_count_lock_);
 
@@ -2110,8 +2213,9 @@
     // Support for Mutex lock hierarchy bug detection.
     BaseMutex* held_mutexes[kLockLevelCount];
 
-    // The function used for thread flip.
-    Closure* flip_function;
+    // The function used for thread flip.  Set while holding Locks::thread_suspend_count_lock_ and
+    // with all other threads suspended.  May be cleared while being read.
+    std::atomic<Closure*> flip_function;
 
     union {
       // Thread-local mark stack for the concurrent copying collector.
@@ -2131,6 +2235,9 @@
 
     // The index of the next free entry in method_trace_buffer.
     size_t method_trace_buffer_index;
+
+    // Pointer to the first node of an intrusively doubly-linked list of ThreadExitFlags.
+    ThreadExitFlag* thread_exit_flags;
   } tlsPtr_;
 
   // Small thread-local cache to be used from the interpreter.
@@ -2278,20 +2385,6 @@
   Thread* const self_;
 };
 
-class ScopedTransitioningToRunnable : public ValueObject {
- public:
-  explicit ScopedTransitioningToRunnable(Thread* self)
-      : self_(self) {
-    DCHECK_EQ(self, Thread::Current());
-    self_->SetIsTransitioningToRunnable(true);
-  }
-
-  ~ScopedTransitioningToRunnable() { self_->SetIsTransitioningToRunnable(false); }
-
- private:
-  Thread* const self_;
-};
-
 class ThreadLifecycleCallback {
  public:
   virtual ~ThreadLifecycleCallback() {}
diff --git a/runtime/thread_list.cc b/runtime/thread_list.cc
index 5c71324..2ff1606 100644
--- a/runtime/thread_list.cc
+++ b/runtime/thread_list.cc
@@ -67,10 +67,6 @@
 using android::base::StringPrintf;
 
 static constexpr uint64_t kLongThreadSuspendThreshold = MsToNs(5);
-// Use 0 since we want to yield to prevent blocking for an unpredictable amount of time.
-static constexpr useconds_t kThreadSuspendInitialSleepUs = 0;
-static constexpr useconds_t kThreadSuspendMaxYieldUs = 3000;
-static constexpr useconds_t kThreadSuspendMaxSleepUs = 5000;
 
 // Whether we should try to dump the native stack of unattached threads. See commit ed8b723 for
 // some history.
@@ -79,7 +75,7 @@
 ThreadList::ThreadList(uint64_t thread_suspend_timeout_ns)
     : suspend_all_count_(0),
       unregistering_count_(0),
-      suspend_all_historam_("suspend all histogram", 16, 64),
+      suspend_all_histogram_("suspend all histogram", 16, 64),
       long_suspend_(false),
       shut_down_(false),
       thread_suspend_timeout_ns_(thread_suspend_timeout_ns),
@@ -140,10 +136,10 @@
   {
     ScopedObjectAccess soa(Thread::Current());
     // Only print if we have samples.
-    if (suspend_all_historam_.SampleSize() > 0) {
+    if (suspend_all_histogram_.SampleSize() > 0) {
       Histogram<uint64_t>::CumulativeData data;
-      suspend_all_historam_.CreateHistogram(&data);
-      suspend_all_historam_.PrintConfidenceIntervals(os, 0.99, data);  // Dump time to suspend.
+      suspend_all_histogram_.CreateHistogram(&data);
+      suspend_all_histogram_.PrintConfidenceIntervals(os, 0.99, data);  // Dump time to suspend.
     }
   }
   bool dump_native_stack = Runtime::Current()->GetDumpNativeStackOnSigQuit();
@@ -279,11 +275,11 @@
   }
 }
 
-void ThreadList::AssertThreadsAreSuspended(Thread* self, Thread* ignore1, Thread* ignore2) {
+void ThreadList::AssertOtherThreadsAreSuspended(Thread* self) {
   MutexLock mu(self, *Locks::thread_list_lock_);
   MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
   for (const auto& thread : list_) {
-    if (thread != ignore1 && thread != ignore2) {
+    if (thread != self) {
       CHECK(thread->IsSuspended())
             << "\nUnsuspended thread: <<" << *thread << "\n"
             << "self: <<" << *Thread::Current();
@@ -310,20 +306,9 @@
 }
 #endif
 
-// Unlike suspending all threads where we can wait to acquire the mutator_lock_, suspending an
-// individual thread requires polling. delay_us is the requested sleep wait. If delay_us is 0 then
-// we use sched_yield instead of calling usleep.
-// Although there is the possibility, here and elsewhere, that usleep could return -1 and
-// errno = EINTR, there should be no problem if interrupted, so we do not check.
-static void ThreadSuspendSleep(useconds_t delay_us) {
-  if (delay_us == 0) {
-    sched_yield();
-  } else {
-    usleep(delay_us);
-  }
-}
-
-size_t ThreadList::RunCheckpoint(Closure* checkpoint_function, Closure* callback) {
+size_t ThreadList::RunCheckpoint(Closure* checkpoint_function,
+                                 Closure* callback,
+                                 bool allow_lock_checking) {
   Thread* self = Thread::Current();
   Locks::mutator_lock_->AssertNotExclusiveHeld(self);
   Locks::thread_list_lock_->AssertNotHeld(self);
@@ -332,9 +317,12 @@
   std::vector<Thread*> suspended_count_modified_threads;
   size_t count = 0;
   {
-    // Call a checkpoint function for each thread, threads which are suspended get their checkpoint
-    // manually called.
+    // Call a checkpoint function for each thread. We directly invoke the function on behalf of
+    // suspended threads.
     MutexLock mu(self, *Locks::thread_list_lock_);
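+    // With allow_lock_checking, debug builds flag any attempt by checkpoint Run() methods to
+    // acquire mutexes at level kMonitorLock or above (see DisallowPreMonitorMutexes()).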
+    if (kIsDebugBuild && allow_lock_checking) {
+      self->DisallowPreMonitorMutexes();
+    }
     MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
     count = list_.size();
     for (const auto& thread : list_) {
@@ -345,39 +333,35 @@
             // This thread will run its checkpoint some time in the near future.
             if (requested_suspend) {
               // The suspend request is now unnecessary.
-              bool updated =
-                  thread->ModifySuspendCount(self, -1, nullptr, SuspendReason::kInternal);
-              DCHECK(updated);
+              thread->DecrementSuspendCount(self);
               requested_suspend = false;
             }
             break;
           } else {
-            // The thread is probably suspended, try to make sure that it stays suspended.
-            if (thread->GetState() == ThreadState::kRunnable) {
-              // Spurious fail, try again.
-              continue;
-            }
+            // The thread was, and probably still is, suspended.
             if (!requested_suspend) {
-              bool updated =
-                  thread->ModifySuspendCount(self, +1, nullptr, SuspendReason::kInternal);
-              DCHECK(updated);
+              // This does not risk suspension cycles: We may have a pending suspension request,
+              // but it cannot block us: Checkpoint Run() functions may not suspend, thus we cannot
+              // be blocked from decrementing the count again.
+              thread->IncrementSuspendCount(self);
               requested_suspend = true;
-              if (thread->IsSuspended()) {
-                break;
-              }
-              // The thread raced us to become Runnable. Try to RequestCheckpoint() again.
-            } else {
-              // The thread previously raced our suspend request to become Runnable but
-              // since it is suspended again, it must honor that suspend request now.
-              DCHECK(thread->IsSuspended());
+            }
+            if (thread->IsSuspended()) {
+              // We saw it suspended after incrementing suspend count, so it will stay that way.
               break;
             }
           }
+          // We only get here if the thread entered kRunnable again. Retry immediately.
         }
+        // At this point, either the thread was runnable, and will run the checkpoint itself,
+        // or requested_suspend is true, and the thread is safely suspended.
         if (requested_suspend) {
+          DCHECK(thread->IsSuspended());
           suspended_count_modified_threads.push_back(thread);
         }
       }
+      // Thread either has honored or will honor the checkpoint, or it has been added to
+      // suspended_count_modified_threads.
     }
     // Run the callback to be called inside this critical section.
     if (callback != nullptr) {
@@ -388,6 +372,7 @@
   // Run the checkpoint on ourself while we wait for threads to suspend.
   checkpoint_function->Run(self);
 
+  bool mutator_lock_held = Locks::mutator_lock_->IsSharedHeld(self);
   bool repeat = true;
   // Run the checkpoint on the suspended threads.
   while (repeat) {
@@ -396,21 +381,23 @@
       if (thread != nullptr) {
         // We know for sure that the thread is suspended at this point.
         DCHECK(thread->IsSuspended());
-        // Make sure there is no pending flip function before running checkpoint
-        // on behalf of thread.
-        thread->EnsureFlipFunctionStarted(self);
-        if (thread->GetStateAndFlags(std::memory_order_acquire)
-                .IsAnyOfFlagsSet(Thread::FlipFunctionFlags())) {
-          // There is another thread running the flip function for 'thread'.
-          // Instead of waiting for it to complete, move to the next thread.
-          repeat = true;
-          continue;
-        }
+        if (mutator_lock_held) {
+          // Make sure there is no pending flip function before running Java-heap-accessing
+          // checkpoint on behalf of thread.
+          Thread::EnsureFlipFunctionStarted(self, thread);
+          if (thread->GetStateAndFlags(std::memory_order_acquire)
+                  .IsAnyOfFlagsSet(Thread::FlipFunctionFlags())) {
+            // There is another thread running the flip function for 'thread'.
+            // Instead of waiting for it to complete, move to the next thread.
+            repeat = true;
+            continue;
+          }
+        }  // Otherwise the checkpoint will not access Java data structures, and doesn't care
+           // whether the flip function has been called.
         checkpoint_function->Run(thread);
         {
           MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
-          bool updated = thread->ModifySuspendCount(self, -1, nullptr, SuspendReason::kInternal);
-          DCHECK(updated);
+          thread->DecrementSuspendCount(self);
         }
         // We are done with 'thread' so set it to nullptr so that next outer
         // loop iteration, if any, skips 'thread'.
@@ -429,6 +416,9 @@
     Thread::resume_cond_->Broadcast(self);
   }
 
+  if (kIsDebugBuild && allow_lock_checking) {
+    self->AllowPreMonitorMutexes();
+  }
   return count;
 }
 
@@ -537,113 +527,153 @@
   }
 }
 
+// Separate function to disable just the right amount of thread-safety analysis.
+ALWAYS_INLINE void AcquireMutatorLockSharedUncontended(Thread* self)
+    ACQUIRE_SHARED(*Locks::mutator_lock_) NO_THREAD_SAFETY_ANALYSIS {
+  bool success = Locks::mutator_lock_->SharedTryLock(self, /*check=*/false);
+  CHECK(success);
+}
+
 // A checkpoint/suspend-all hybrid to switch thread roots from
 // from-space to to-space refs. Used to synchronize threads at a point
 // to mark the initiation of marking while maintaining the to-space
 // invariant.
-size_t ThreadList::FlipThreadRoots(Closure* thread_flip_visitor,
-                                   Closure* flip_callback,
-                                   gc::collector::GarbageCollector* collector,
-                                   gc::GcPauseListener* pause_listener) {
+void ThreadList::FlipThreadRoots(Closure* thread_flip_visitor,
+                                 Closure* flip_callback,
+                                 gc::collector::GarbageCollector* collector,
+                                 gc::GcPauseListener* pause_listener) {
   TimingLogger::ScopedTiming split("ThreadListFlip", collector->GetTimings());
   Thread* self = Thread::Current();
   Locks::mutator_lock_->AssertNotHeld(self);
   Locks::thread_list_lock_->AssertNotHeld(self);
   Locks::thread_suspend_count_lock_->AssertNotHeld(self);
   CHECK_NE(self->GetState(), ThreadState::kRunnable);
-  size_t runnable_thread_count = 0;
-  std::vector<Thread*> other_threads;
 
   collector->GetHeap()->ThreadFlipBegin(self);  // Sync with JNI critical calls.
 
   // ThreadFlipBegin happens before we suspend all the threads, so it does not
   // count towards the pause.
   const uint64_t suspend_start_time = NanoTime();
-  SuspendAllInternal(self, self, nullptr);
+  VLOG(threads) << "Suspending all for thread flip";
+  SuspendAllInternal(self);
   if (pause_listener != nullptr) {
     pause_listener->StartPause();
   }
 
   // Run the flip callback for the collector.
   Locks::mutator_lock_->ExclusiveLock(self);
-  suspend_all_historam_.AdjustAndAddValue(NanoTime() - suspend_start_time);
+  suspend_all_histogram_.AdjustAndAddValue(NanoTime() - suspend_start_time);
   flip_callback->Run(self);
-  // Releasing mutator-lock *before* setting up flip function in the threads
-  // leaves a gap for another thread trying to suspend all threads. That thread
-  // gets to run with mutator-lock, thereby accessing the heap, without running
-  // its flip function. It's not a problem with CC as the gc-thread hasn't
-  // started marking yet and the from-space is accessible. By delaying releasing
-  // mutator-lock until after the flip function are running on all threads we
-  // fix that without increasing pause time, except for any thread that might be
-  // trying to suspend all. Even though the change works irrespective of the GC,
-  // it has been limited to userfaultfd GC to keep the change behind the flag.
-  //
-  // TODO: It's a temporary change as aosp/2377951 is going to clean-up at a
-  // broad scale, including not allowing concurrent suspend-all.
 
-  // Resume runnable threads.
+  std::vector<Thread*> flipping_threads;  // All suspended threads. Includes us.
+  int thread_count;
+  // Flipping threads might exit between the time we resume them and the time we try to run the
+  // flip function. Track that in a parallel array of exit flags.
+  std::unique_ptr<ThreadExitFlag[]> exit_flags;
   {
     TimingLogger::ScopedTiming split2("ResumeRunnableThreads", collector->GetTimings());
     MutexLock mu(self, *Locks::thread_list_lock_);
     MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
-    --suspend_all_count_;
+    thread_count = list_.size();
+    exit_flags.reset(new ThreadExitFlag[thread_count]);
+    flipping_threads.resize(thread_count, nullptr);
+    int i = 1;
     for (Thread* thread : list_) {
       // Set the flip function for all threads because once we start resuming any threads,
       // they may need to run the flip function on behalf of other threads, even this one.
+      DCHECK(thread == self || thread->IsSuspended());
       thread->SetFlipFunction(thread_flip_visitor);
-      if (thread == self) {
-        continue;
-      }
-      // Resume early the threads that were runnable but are suspended just for this thread flip or
-      // about to transition from non-runnable (eg. kNative at the SOA entry in a JNI function) to
-      // runnable (both cases waiting inside Thread::TransitionFromSuspendedToRunnable), or waiting
-      // for the thread flip to end at the JNI critical section entry (kWaitingForGcThreadFlip),
-      ThreadState state = thread->GetState();
-      if ((state == ThreadState::kWaitingForGcThreadFlip || thread->IsTransitioningToRunnable()) &&
-          thread->GetSuspendCount() == 1) {
-        // The thread will resume right after the broadcast.
-        bool updated = thread->ModifySuspendCount(self, -1, nullptr, SuspendReason::kInternal);
-        DCHECK(updated);
-        ++runnable_thread_count;
-      } else {
-        other_threads.push_back(thread);
-      }
+      // Put ourselves first, so other threads are more likely to have finished before we get
+      // there.
+      int thread_index = thread == self ? 0 : i++;
+      flipping_threads[thread_index] = thread;
+      thread->NotifyOnThreadExit(&exit_flags[thread_index]);
     }
-    Thread::resume_cond_->Broadcast(self);
+    DCHECK(i == thread_count);
   }
 
-  collector->RegisterPause(NanoTime() - suspend_start_time);
   if (pause_listener != nullptr) {
     pause_listener->EndPause();
   }
-  collector->GetHeap()->ThreadFlipEnd(self);
+  // Any new threads created after this will be created by threads that already ran their flip
+  // functions. In the normal GC use case in which the flip function converts all local references
+  // to to-space references, these newly created threads will also see only to-space references.
+
+  // Resume threads, making sure that we do not release suspend_count_lock_ until we've reacquired
+  // the mutator_lock_ in shared mode, and decremented suspend_all_count_.  This avoids a
+  // concurrent SuspendAll, and ensures that newly started threads see a correct value of
+  // suspend_all_count.
+  {
+    MutexLock mu(self, *Locks::thread_list_lock_);
+    Locks::thread_suspend_count_lock_->Lock(self);
+    ResumeAllInternal(self);
+  }
+
+  collector->RegisterPause(NanoTime() - suspend_start_time);
+
+  // Since all threads were suspended, they will attempt to run the flip function before
+  // reentering a runnable state. We will also attempt to run the flip functions ourselves.  Any
+  // intervening checkpoint request will do the same.  Exactly one of those flip function attempts
+  // will succeed, and the target thread will not be able to reenter a runnable state until one of
+  // them does.
 
   // Try to run the closure on the other threads.
-  {
-    TimingLogger::ScopedTiming split3("FlipOtherThreads", collector->GetTimings());
-    for (Thread* thread : other_threads) {
-      thread->EnsureFlipFunctionStarted(self);
-      DCHECK(!thread->ReadFlag(ThreadFlag::kPendingFlipFunction));
+  TimingLogger::ScopedTiming split3("RunningThreadFlips", collector->GetTimings());
+  // Reacquire the mutator lock while holding suspend_count_lock. This cannot fail, since we
+  // do not acquire the mutator lock unless suspend_all_count was read as 0 while holding
+  // suspend_count_lock. We did not release suspend_count_lock since releasing the mutator
+  // lock.
+  AcquireMutatorLockSharedUncontended(self);
+
+  Locks::thread_suspend_count_lock_->Unlock(self);
+  // Concurrent SuspendAll may now see zero suspend_all_count_, but block on mutator_lock_.
+
+  collector->GetHeap()->ThreadFlipEnd(self);
+
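+  // Attempt to run each thread's flip function ourselves. If a thread's flip has already
+  // finished (or the thread exited), unregister its exit flag now; otherwise wait below.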
+  for (int i = 0; i < thread_count; ++i) {
+    bool finished;
+    Thread::EnsureFlipFunctionStarted(
+        self, flipping_threads[i], Thread::StateAndFlags(0), &exit_flags[i], &finished);
+    if (finished) {
+      MutexLock mu2(self, *Locks::thread_list_lock_);
+      flipping_threads[i]->UnregisterThreadExitFlag(&exit_flags[i]);
+      flipping_threads[i] = nullptr;
     }
-    // Try to run the flip function for self.
-    self->EnsureFlipFunctionStarted(self);
-    DCHECK(!self->ReadFlag(ThreadFlag::kPendingFlipFunction));
   }
-
-  Locks::mutator_lock_->ExclusiveUnlock(self);
-
-  // Resume other threads.
-  {
-    TimingLogger::ScopedTiming split4("ResumeOtherThreads", collector->GetTimings());
-    MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
-    for (const auto& thread : other_threads) {
-      bool updated = thread->ModifySuspendCount(self, -1, nullptr, SuspendReason::kInternal);
-      DCHECK(updated);
+  // Make sure all flips complete before we return.
+  for (int i = 0; i < thread_count; ++i) {
+    if (UNLIKELY(flipping_threads[i] != nullptr)) {
+      flipping_threads[i]->WaitForFlipFunctionTestingExited(self, &exit_flags[i]);
+      MutexLock mu2(self, *Locks::thread_list_lock_);
+      flipping_threads[i]->UnregisterThreadExitFlag(&exit_flags[i]);
     }
-    Thread::resume_cond_->Broadcast(self);
   }
+  Locks::mutator_lock_->SharedUnlock(self);
+}
 
-  return runnable_thread_count + other_threads.size() + 1;  // +1 for self.
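+// Wait for the suspend barrier to be passed (decremented to zero) by all waited-for threads.
+// Returns true once the barrier count reaches zero; returns false only if the futex-based wait
+// times out first.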
+bool ThreadList::WaitForSuspendBarrier(AtomicInteger* barrier) {
+#if ART_USE_FUTEXES
+  timespec wait_timeout;
+  InitTimeSpec(false, CLOCK_MONOTONIC, NsToMs(thread_suspend_timeout_ns_), 0, &wait_timeout);
+#endif
+  while (true) {
+    int32_t cur_val = barrier->load(std::memory_order_acquire);
+    if (cur_val <= 0) {
+      CHECK_EQ(cur_val, 0);
+      return true;
+    }
+#if ART_USE_FUTEXES
+    if (futex(barrier->Address(), FUTEX_WAIT_PRIVATE, cur_val, &wait_timeout, nullptr, 0) != 0) {
+      if (errno == ETIMEDOUT) {
+        return false;
+      } else if (errno != EAGAIN && errno != EINTR) {
+        PLOG(FATAL) << "futex wait for suspend barrier failed";
+      }
+    }
+#endif
+    // Else spin wait. This is likely to be slow, but ART_USE_FUTEXES is set on Linux,
+    // including all targets.
+  }
 }
 
 void ThreadList::SuspendAll(const char* cause, bool long_suspend) {
@@ -658,7 +688,7 @@
     ScopedTrace trace("Suspending mutator threads");
     const uint64_t start_time = NanoTime();
 
-    SuspendAllInternal(self, self);
+    SuspendAllInternal(self);
     // All threads are known to have suspended (but a thread may still own the mutator lock)
     // Make sure this thread grabs exclusive access to the mutator lock and its protected data.
 #if HAVE_TIMED_RWLOCK
@@ -682,16 +712,21 @@
 
     const uint64_t end_time = NanoTime();
     const uint64_t suspend_time = end_time - start_time;
-    suspend_all_historam_.AdjustAndAddValue(suspend_time);
+    suspend_all_histogram_.AdjustAndAddValue(suspend_time);
     if (suspend_time > kLongThreadSuspendThreshold) {
       LOG(WARNING) << "Suspending all threads took: " << PrettyDuration(suspend_time);
     }
 
     if (kDebugLocking) {
       // Debug check that all threads are suspended.
-      AssertThreadsAreSuspended(self, self);
+      AssertOtherThreadsAreSuspended(self);
     }
   }
+
+  // SuspendAllInternal blocks if we are in the middle of a flip.
+  DCHECK(!self->ReadFlag(ThreadFlag::kPendingFlipFunction));
+  DCHECK(!self->ReadFlag(ThreadFlag::kRunningFlipFunction));
+
   ATraceBegin((std::string("Mutator threads suspended for ") + cause).c_str());
 
   if (self != nullptr) {
@@ -702,10 +737,9 @@
 }
 
 // Ensures all threads running Java suspend and that those not running Java don't start.
-void ThreadList::SuspendAllInternal(Thread* self,
-                                    Thread* ignore1,
-                                    Thread* ignore2,
-                                    SuspendReason reason) {
+void ThreadList::SuspendAllInternal(Thread* self, SuspendReason reason) {
+  // self can be nullptr if this is an unregistered thread.
+  const uint64_t start_time = NanoTime();
   Locks::mutator_lock_->AssertNotExclusiveHeld(self);
   Locks::thread_list_lock_->AssertNotHeld(self);
   Locks::thread_suspend_count_lock_->AssertNotHeld(self);
@@ -723,91 +757,97 @@
 
   // The atomic counter for number of threads that need to pass the barrier.
   AtomicInteger pending_threads;
-  uint32_t num_ignored = 0;
-  if (ignore1 != nullptr) {
-    ++num_ignored;
-  }
-  if (ignore2 != nullptr && ignore1 != ignore2) {
-    ++num_ignored;
-  }
-  {
-    MutexLock mu(self, *Locks::thread_list_lock_);
-    MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
-    // Update global suspend all state for attaching threads.
-    ++suspend_all_count_;
-    pending_threads.store(list_.size() - num_ignored, std::memory_order_relaxed);
-    // Increment everybody's suspend count (except those that should be ignored).
-    for (const auto& thread : list_) {
-      if (thread == ignore1 || thread == ignore2) {
+
+  for (int iter_count = 1;; ++iter_count) {
+    {
+      MutexLock mu(self, *Locks::thread_list_lock_);
+      MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
+      if (suspend_all_count_ == 0) {
+        // Never run multiple SuspendAlls concurrently.
+        // If we are asked to suspend ourselves, we proceed anyway, but must ignore suspend
+        // requests from other threads until we resume them.
+        bool found_myself = false;
+        // Update global suspend all state for attaching threads.
+        ++suspend_all_count_;
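+        // We do not need to wait for ourselves, so exclude self (if attached) from the count.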
+        pending_threads.store(list_.size() - (self == nullptr ? 0 : 1), std::memory_order_relaxed);
+        // Increment everybody else's suspend count.
+        for (const auto& thread : list_) {
+          if (thread == self) {
+            found_myself = true;
+          } else {
+            VLOG(threads) << "requesting thread suspend: " << *thread;
+            DCHECK_EQ(suspend_all_count_, 1);
+            thread->IncrementSuspendCount(self, &pending_threads, nullptr, reason);
+            if (thread->IsSuspended()) {
+              // Effectively pass the barrier on behalf of the already suspended thread.
+              // The thread itself cannot yet have acted on our request since we still hold the
+              // suspend_count_lock_, and it will notice that kActiveSuspendBarrier has already
+              // been cleared if and when it acquires the lock in PassActiveSuspendBarriers().
+              DCHECK_EQ(thread->tlsPtr_.active_suspendall_barrier, &pending_threads);
+              pending_threads.fetch_sub(1, std::memory_order_seq_cst);
+              thread->tlsPtr_.active_suspendall_barrier = nullptr;
+              if (!thread->HasActiveSuspendBarrier()) {
+                thread->AtomicClearFlag(ThreadFlag::kActiveSuspendBarrier);
+              }
+            }
+            // else:
+            // The target thread was not yet suspended, and hence will be forced to execute
+            // TransitionFromRunnableToSuspended shortly. Since we set the kSuspendRequest flag
+            // before checking, and it checks kActiveSuspendBarrier after noticing kSuspendRequest,
+            // it must notice kActiveSuspendBarrier when it does. Thus it is guaranteed to
+            // decrement the suspend barrier. We're relying on store; load ordering here, but
+            // that's not a problem, since state and flags all reside in the same atomic, and
+            // are thus properly ordered, even for relaxed accesses.
+          }
+        }
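+        // Become immune to suspension requests from other threads; the flag is cleared again in
+        // ResumeAllInternal().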
+        self->AtomicSetFlag(ThreadFlag::kSuspensionImmune, std::memory_order_relaxed);
+        DCHECK(self == nullptr || found_myself);
+        break;
+      }
+    }
+    if (iter_count >= kMaxSuspendRetries) {
+      LOG(FATAL) << "Too many SuspendAll retries: " << iter_count;
+    } else {
+      MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
+      DCHECK_LE(suspend_all_count_, 1);
+      if (suspend_all_count_ != 0) {
+        // This may take a while, and we're not runnable, and thus would otherwise not block.
+        Thread::resume_cond_->WaitHoldingLocks(self);
         continue;
       }
-      VLOG(threads) << "requesting thread suspend: " << *thread;
-      bool updated = thread->ModifySuspendCount(self, +1, &pending_threads, reason);
-      DCHECK(updated);
-
-      // Must install the pending_threads counter first, then check thread->IsSuspend() and clear
-      // the counter. Otherwise there's a race with Thread::TransitionFromRunnableToSuspended()
-      // that can lead a thread to miss a call to PassActiveSuspendBarriers().
-      if (thread->IsSuspended()) {
-        // Only clear the counter for the current thread.
-        thread->ClearSuspendBarrier(&pending_threads);
-        pending_threads.fetch_sub(1, std::memory_order_seq_cst);
-      }
     }
+    // We're already not runnable, so an attempt to suspend us should succeed.
   }
 
-  // Wait for the barrier to be passed by all runnable threads. This wait
-  // is done with a timeout so that we can detect problems.
-#if ART_USE_FUTEXES
-  timespec wait_timeout;
-  InitTimeSpec(false, CLOCK_MONOTONIC, NsToMs(thread_suspend_timeout_ns_), 0, &wait_timeout);
-#endif
-  const uint64_t start_time = NanoTime();
-  while (true) {
-    int32_t cur_val = pending_threads.load(std::memory_order_relaxed);
-    if (LIKELY(cur_val > 0)) {
-#if ART_USE_FUTEXES
-      if (futex(pending_threads.Address(), FUTEX_WAIT_PRIVATE, cur_val, &wait_timeout, nullptr, 0)
-          != 0) {
-        if ((errno == EAGAIN) || (errno == EINTR)) {
-          // EAGAIN and EINTR both indicate a spurious failure, try again from the beginning.
-          continue;
-        }
-        if (errno == ETIMEDOUT) {
-          const uint64_t wait_time = NanoTime() - start_time;
-          MutexLock mu(self, *Locks::thread_list_lock_);
-          MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
-          std::ostringstream oss;
-          for (const auto& thread : list_) {
-            if (thread == ignore1 || thread == ignore2) {
-              continue;
-            }
-            if (!thread->IsSuspended()) {
-              oss << std::endl << "Thread not suspended: " << *thread;
-            }
-          }
-          LOG(kIsDebugBuild ? ::android::base::FATAL : ::android::base::ERROR)
-              << "Timed out waiting for threads to suspend, waited for "
-              << PrettyDuration(wait_time)
-              << oss.str();
-        } else {
-          PLOG(FATAL) << "futex wait failed for SuspendAllInternal()";
-        }
-      }  // else re-check pending_threads in the next iteration (this may be a spurious wake-up).
-#else
-      // Spin wait. This is likely to be slow, but on most architecture ART_USE_FUTEXES is set.
-      UNUSED(start_time);
-#endif
-    } else {
-      CHECK_EQ(cur_val, 0);
-      break;
+  if (!WaitForSuspendBarrier(&pending_threads)) {
+    const uint64_t wait_time = NanoTime() - start_time;
+    MutexLock mu(self, *Locks::thread_list_lock_);
+    MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
+    std::ostringstream oss;
+    for (const auto& thread : list_) {
+      if (thread != self && !thread->IsSuspended()) {
+        oss << std::endl << "Thread not suspended: " << *thread;
+      }
     }
+    LOG(::android::base::FATAL) << "Timed out waiting for threads to suspend, waited for "
+                                << PrettyDuration(wait_time) << oss.str();
   }
 }
 
 void ThreadList::ResumeAll() {
   Thread* self = Thread::Current();
+  if (kDebugLocking) {
+    // Debug check that all threads are suspended.
+    AssertOtherThreadsAreSuspended(self);
+  }
+  MutexLock mu(self, *Locks::thread_list_lock_);
+  MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
+  ResumeAllInternal(self);
+}
 
+// Called with thread_list_lock_ and suspend_count_lock_ held.
+void ThreadList::ResumeAllInternal(Thread* self) {
+  DCHECK_NE(self->GetState(), ThreadState::kRunnable);
   if (self != nullptr) {
     VLOG(threads) << *self << " ResumeAll starting";
   } else {
@@ -818,38 +858,32 @@
 
   ScopedTrace trace("Resuming mutator threads");
 
-  if (kDebugLocking) {
-    // Debug check that all threads are suspended.
-    AssertThreadsAreSuspended(self, self);
-  }
-
   long_suspend_ = false;
 
   Locks::mutator_lock_->ExclusiveUnlock(self);
-  {
-    MutexLock mu(self, *Locks::thread_list_lock_);
-    MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
-    // Update global suspend all state for attaching threads.
-    --suspend_all_count_;
-    // Decrement the suspend counts for all threads.
-    for (const auto& thread : list_) {
-      if (thread == self) {
-        continue;
-      }
-      bool updated = thread->ModifySuspendCount(self, -1, nullptr, SuspendReason::kInternal);
-      DCHECK(updated);
-    }
 
-    // Broadcast a notification to all suspended threads, some or all of
-    // which may choose to wake up.  No need to wait for them.
-    if (self != nullptr) {
-      VLOG(threads) << *self << " ResumeAll waking others";
-    } else {
-      VLOG(threads) << "Thread[null] ResumeAll waking others";
+  // Decrement the suspend counts for all threads.
+  for (const auto& thread : list_) {
+    if (thread != self) {
+      thread->DecrementSuspendCount(self);
     }
-    Thread::resume_cond_->Broadcast(self);
   }
 
+  // Update global suspend all state for attaching threads. Unblocks other SuspendAlls once
+  // suspend_count_lock_ is released.
+  --suspend_all_count_;
+  self->AtomicClearFlag(ThreadFlag::kSuspensionImmune, std::memory_order_relaxed);
+  // Pending suspend requests for us will be handled when we become Runnable again.
+
+  // Broadcast a notification to all suspended threads, some or all of
+  // which may choose to wake up.  No need to wait for them.
+  if (self != nullptr) {
+    VLOG(threads) << *self << " ResumeAll waking others";
+  } else {
+    VLOG(threads) << "Thread[null] ResumeAll waking others";
+  }
+  Thread::resume_cond_->Broadcast(self);
+
   if (self != nullptr) {
     VLOG(threads) << *self << " ResumeAll complete";
   } else {
@@ -882,11 +916,7 @@
           << ") thread not within thread list";
       return false;
     }
-    if (UNLIKELY(!thread->ModifySuspendCount(self, -1, nullptr, reason))) {
-      LOG(ERROR) << "Resume(" << reinterpret_cast<void*>(thread)
-                 << ") could not modify suspend count.";
-      return false;
-    }
+    thread->DecrementSuspendCount(self, /*for_user_code=*/(reason == SuspendReason::kForUserCode));
   }
 
   {
@@ -912,40 +942,19 @@
   }
 }
 
-Thread* ThreadList::SuspendThreadByPeer(jobject peer,
-                                        SuspendReason reason,
-                                        bool* timed_out) {
-  bool request_suspension = true;
-  const uint64_t start_time = NanoTime();
-  int self_suspend_count = 0;
-  useconds_t sleep_us = kThreadSuspendInitialSleepUs;
-  *timed_out = false;
+Thread* ThreadList::SuspendThreadByPeer(jobject peer, SuspendReason reason) {
+  bool is_suspended = false;
   Thread* const self = Thread::Current();
-  Thread* suspended_thread = nullptr;
   VLOG(threads) << "SuspendThreadByPeer starting";
-  while (true) {
-    Thread* thread;
+  Thread* thread;
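+  // Suspend barrier stored on our stack; the target thread decrements it once it suspends.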
+  WrappedSuspend1Barrier wrapped_barrier{};
+  for (int iter_count = 1;; ++iter_count) {
     {
-      // Note: this will transition to runnable and potentially suspend. We ensure only one thread
-      // is requesting another suspend, to avoid deadlock, by requiring this function be called
-      // holding Locks::thread_list_suspend_thread_lock_. Its important this thread suspend rather
-      // than request thread suspension, to avoid potential cycles in threads requesting each other
-      // suspend.
+      // Note: this will transition to runnable and potentially suspend.
       ScopedObjectAccess soa(self);
       MutexLock thread_list_mu(self, *Locks::thread_list_lock_);
       thread = Thread::FromManagedThread(soa, peer);
       if (thread == nullptr) {
-        if (suspended_thread != nullptr) {
-          MutexLock suspend_count_mu(self, *Locks::thread_suspend_count_lock_);
-          // If we incremented the suspend count but the thread reset its peer, we need to
-          // re-decrement it since it is shutting down and may deadlock the runtime in
-          // ThreadList::WaitForOtherNonDaemonThreadsToExit.
-          bool updated = suspended_thread->ModifySuspendCount(soa.Self(),
-                                                              -1,
-                                                              nullptr,
-                                                              reason);
-          DCHECK(updated);
-        }
         ThreadSuspendByPeerWarning(soa,
                                    ::android::base::WARNING,
                                     "No such thread for suspend",
@@ -953,84 +962,64 @@
         return nullptr;
       }
       if (!Contains(thread)) {
-        CHECK(suspended_thread == nullptr);
         VLOG(threads) << "SuspendThreadByPeer failed for unattached thread: "
             << reinterpret_cast<void*>(thread);
         return nullptr;
       }
+      // IsSuspended on the current thread will fail as the current thread is changed into
+      // Runnable above. As the suspend count is now raised if this is the current thread
+      // it will self suspend on transition to Runnable, making it hard to work with. It's simpler
+      // to just explicitly handle the current thread in the callers to this code.
+      CHECK_NE(thread, self) << "Attempt to suspend the current thread for the debugger";
       VLOG(threads) << "SuspendThreadByPeer found thread: " << *thread;
       {
         MutexLock suspend_count_mu(self, *Locks::thread_suspend_count_lock_);
-        if (request_suspension) {
-          if (self->GetSuspendCount() > 0) {
-            // We hold the suspend count lock but another thread is trying to suspend us. Its not
-            // safe to try to suspend another thread in case we get a cycle. Start the loop again
-            // which will allow this thread to be suspended.
-            ++self_suspend_count;
-            continue;
+        if (LIKELY(self->GetSuspendCount() == 0)) {
+          thread->IncrementSuspendCount(self, nullptr, &wrapped_barrier, reason);
+          if (thread->IsSuspended()) {
+            // See the discussion in mutator_gc_coord.md and SuspendAllInternal for the race here.
+            thread->RemoveFirstSuspend1Barrier();
+            if (!thread->HasActiveSuspendBarrier()) {
+              thread->AtomicClearFlag(ThreadFlag::kActiveSuspendBarrier);
+            }
+            is_suspended = true;
           }
-          CHECK(suspended_thread == nullptr);
-          suspended_thread = thread;
-          bool updated = suspended_thread->ModifySuspendCount(self, +1, nullptr, reason);
-          DCHECK(updated);
-          request_suspension = false;
-        } else {
-          // If the caller isn't requesting suspension, a suspension should have already occurred.
-          CHECK_GT(thread->GetSuspendCount(), 0);
+          DCHECK_GT(thread->GetSuspendCount(), 0);
+          break;
         }
-        // IsSuspended on the current thread will fail as the current thread is changed into
-        // Runnable above. As the suspend count is now raised if this is the current thread
-        // it will self suspend on transition to Runnable, making it hard to work with. It's simpler
-        // to just explicitly handle the current thread in the callers to this code.
-        CHECK_NE(thread, self) << "Attempt to suspend the current thread for the debugger";
-        // If thread is suspended (perhaps it was already not Runnable but didn't have a suspend
-        // count, or else we've waited and it has self suspended) or is the current thread, we're
-        // done.
-        if (thread->IsSuspended()) {
-          VLOG(threads) << "SuspendThreadByPeer thread suspended: " << *thread;
-          if (ATraceEnabled()) {
-            std::string name;
-            thread->GetThreadName(name);
-            ATraceBegin(StringPrintf("SuspendThreadByPeer suspended %s for peer=%p", name.c_str(),
-                                      peer).c_str());
-          }
-          return thread;
-        }
-        const uint64_t total_delay = NanoTime() - start_time;
-        if (total_delay >= thread_suspend_timeout_ns_) {
-          if (suspended_thread == nullptr) {
-            ThreadSuspendByPeerWarning(soa,
-                                       ::android::base::FATAL,
-                                       "Failed to issue suspend request",
-                                       peer);
-          } else {
-            CHECK_EQ(suspended_thread, thread);
-            LOG(WARNING) << "Suspended thread state_and_flags: "
-                         << suspended_thread->StateAndFlagsAsHexString()
-                         << ", self_suspend_count = " << self_suspend_count;
-            // Explicitly release thread_suspend_count_lock_; we haven't held it for long, so
-            // seeing threads blocked on it is not informative.
-            Locks::thread_suspend_count_lock_->Unlock(self);
-            ThreadSuspendByPeerWarning(soa,
-                                       ::android::base::FATAL,
-                                       "Thread suspension timed out",
-                                       peer);
-          }
-          UNREACHABLE();
-        } else if (sleep_us == 0 &&
-            total_delay > static_cast<uint64_t>(kThreadSuspendMaxYieldUs) * 1000) {
-          // We have spun for kThreadSuspendMaxYieldUs time, switch to sleeps to prevent
-          // excessive CPU usage.
-          sleep_us = kThreadSuspendMaxYieldUs / 2;
-        }
+        // Else we hold the suspend count lock but another thread is trying to suspend us,
+        // making it unsafe to try to suspend another thread in case we get a cycle.
+        // We start the loop again, which will allow this thread to be suspended.
       }
-      // Release locks and come out of runnable state.
     }
-    VLOG(threads) << "SuspendThreadByPeer waiting to allow thread chance to suspend";
-    ThreadSuspendSleep(sleep_us);
-    // This may stay at 0 if sleep_us == 0, but this is WAI since we want to avoid using usleep at
-    // all if possible. This shouldn't be an issue since time to suspend should always be small.
-    sleep_us = std::min(sleep_us * 2, kThreadSuspendMaxSleepUs);
+    // All locks are released, and we should quickly exit the suspend-unfriendly state. Retry.
+    if (iter_count >= kMaxSuspendRetries) {
+      LOG(FATAL) << "Too many suspend retries";
+    }
+    usleep(kThreadSuspendSleepUs);
+  }
+  // Now wait for target to decrement suspend barrier.
+  if (is_suspended || WaitForSuspendBarrier(&wrapped_barrier.barrier_)) {
+    // wrapped_barrier.barrier_ has been decremented and will no longer be accessed.
+    VLOG(threads) << "SuspendThreadByPeer thread suspended: " << *thread;
+    if (ATraceEnabled()) {
+      std::string name;
+      thread->GetThreadName(name);
+      ATraceBegin(
+          StringPrintf("SuspendThreadByPeer suspended %s for peer=%p", name.c_str(), peer).c_str());
+    }
+    DCHECK(thread->IsSuspended());
+    return thread;
+  } else {
+    LOG(WARNING) << "Suspended thread state_and_flags: " << thread->StateAndFlagsAsHexString();
+    // thread still has a pointer to wrapped_barrier. Returning and continuing would be unsafe
+    // without additional cleanup.
+    {
+      ScopedObjectAccess soa(self);
+      ThreadSuspendByPeerWarning(
+          soa, ::android::base::FATAL, "SuspendThreadByPeer timed out", peer);
+    }
+    UNREACHABLE();
   }
 }
 
@@ -1040,101 +1029,74 @@
   LOG(severity) << StringPrintf("%s: %d", message, thread_id);
 }
 
-Thread* ThreadList::SuspendThreadByThreadId(uint32_t thread_id,
-                                            SuspendReason reason,
-                                            bool* timed_out) {
-  const uint64_t start_time = NanoTime();
-  useconds_t sleep_us = kThreadSuspendInitialSleepUs;
-  *timed_out = false;
-  Thread* suspended_thread = nullptr;
+Thread* ThreadList::SuspendThreadByThreadId(uint32_t thread_id, SuspendReason reason) {
+  bool is_suspended = false;
   Thread* const self = Thread::Current();
   CHECK_NE(thread_id, kInvalidThreadId);
   VLOG(threads) << "SuspendThreadByThreadId starting";
-  while (true) {
+  Thread* thread;
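+  // As in SuspendThreadByPeer(), the barrier lives on our stack until the target passes it.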
+  WrappedSuspend1Barrier wrapped_barrier{};
+  for (int iter_count = 1;; ++iter_count) {
     {
-      // Note: this will transition to runnable and potentially suspend. We ensure only one thread
-      // is requesting another suspend, to avoid deadlock, by requiring this function be called
-      // holding Locks::thread_list_suspend_thread_lock_. Its important this thread suspend rather
-      // than request thread suspension, to avoid potential cycles in threads requesting each other
-      // suspend.
+      // Note: this will transition to runnable and potentially suspend.
       ScopedObjectAccess soa(self);
       MutexLock thread_list_mu(self, *Locks::thread_list_lock_);
-      Thread* thread = nullptr;
-      for (const auto& it : list_) {
-        if (it->GetThreadId() == thread_id) {
-          thread = it;
-          break;
-        }
-      }
+      thread = FindThreadByThreadId(thread_id);
       if (thread == nullptr) {
-        CHECK(suspended_thread == nullptr) << "Suspended thread " << suspended_thread
-            << " no longer in thread list";
         // There's a race in inflating a lock and the owner giving up ownership and then dying.
         ThreadSuspendByThreadIdWarning(::android::base::WARNING,
                                        "No such thread id for suspend",
                                        thread_id);
         return nullptr;
       }
-      VLOG(threads) << "SuspendThreadByThreadId found thread: " << *thread;
       DCHECK(Contains(thread));
+      CHECK_NE(thread, self) << "Attempt to suspend the current thread for the debugger";
+      VLOG(threads) << "SuspendThreadByThreadId found thread: " << *thread;
       {
         MutexLock suspend_count_mu(self, *Locks::thread_suspend_count_lock_);
-        if (suspended_thread == nullptr) {
-          if (self->GetSuspendCount() > 0) {
-            // We hold the suspend count lock but another thread is trying to suspend us. Its not
-            // safe to try to suspend another thread in case we get a cycle. Start the loop again
-            // which will allow this thread to be suspended.
-            continue;
+        if (LIKELY(self->GetSuspendCount() == 0)) {
+          thread->IncrementSuspendCount(self, nullptr, &wrapped_barrier, reason);
+          if (thread->IsSuspended()) {
+            // See the discussion in mutator_gc_coord.md and SuspendAllInternal for the race here.
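+            // The target is already suspended and may therefore never process this barrier
+            // itself; remove it here and skip the wait below.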
+            thread->RemoveFirstSuspend1Barrier();
+            if (!thread->HasActiveSuspendBarrier()) {
+              thread->AtomicClearFlag(ThreadFlag::kActiveSuspendBarrier);
+            }
+            is_suspended = true;
           }
-          bool updated = thread->ModifySuspendCount(self, +1, nullptr, reason);
-          DCHECK(updated);
-          suspended_thread = thread;
-        } else {
-          CHECK_EQ(suspended_thread, thread);
-          // If the caller isn't requesting suspension, a suspension should have already occurred.
-          CHECK_GT(thread->GetSuspendCount(), 0);
+          DCHECK_GT(thread->GetSuspendCount(), 0);
+          break;
         }
-        // IsSuspended on the current thread will fail as the current thread is changed into
-        // Runnable above. As the suspend count is now raised if this is the current thread
-        // it will self suspend on transition to Runnable, making it hard to work with. It's simpler
-        // to just explicitly handle the current thread in the callers to this code.
-        CHECK_NE(thread, self) << "Attempt to suspend the current thread for the debugger";
-        // If thread is suspended (perhaps it was already not Runnable but didn't have a suspend
-        // count, or else we've waited and it has self suspended) or is the current thread, we're
-        // done.
-        if (thread->IsSuspended()) {
-          if (ATraceEnabled()) {
-            std::string name;
-            thread->GetThreadName(name);
-            ATraceBegin(StringPrintf("SuspendThreadByThreadId suspended %s id=%d",
-                                      name.c_str(), thread_id).c_str());
-          }
-          VLOG(threads) << "SuspendThreadByThreadId thread suspended: " << *thread;
-          return thread;
-        }
-        const uint64_t total_delay = NanoTime() - start_time;
-        if (total_delay >= thread_suspend_timeout_ns_) {
-          ThreadSuspendByThreadIdWarning(::android::base::WARNING,
-                                         "Thread suspension timed out",
-                                         thread_id);
-          if (suspended_thread != nullptr) {
-            bool updated = thread->ModifySuspendCount(soa.Self(), -1, nullptr, reason);
-            DCHECK(updated);
-          }
-          *timed_out = true;
-          return nullptr;
-        } else if (sleep_us == 0 &&
-            total_delay > static_cast<uint64_t>(kThreadSuspendMaxYieldUs) * 1000) {
-          // We have spun for kThreadSuspendMaxYieldUs time, switch to sleeps to prevent
-          // excessive CPU usage.
-          sleep_us = kThreadSuspendMaxYieldUs / 2;
-        }
+        // Else we hold the suspend count lock but another thread is trying to suspend us,
+        // making it unsafe to try to suspend another thread in case we get a cycle.
+        // Start the loop again, which will allow this thread to be suspended.
       }
-      // Release locks and come out of runnable state.
     }
-    VLOG(threads) << "SuspendThreadByThreadId waiting to allow thread chance to suspend";
-    ThreadSuspendSleep(sleep_us);
-    sleep_us = std::min(sleep_us * 2, kThreadSuspendMaxSleepUs);
+    // All locks are released, and we should quickly exit the suspend-unfriendly state. Retry.
+    if (iter_count >= kMaxSuspendRetries) {
+      LOG(FATAL) << "Too many suspend retries";
+    }
+    usleep(kThreadSuspendSleepUs);
+  }
+  // Now wait for target to decrement suspend barrier.
+  if (is_suspended || WaitForSuspendBarrier(&wrapped_barrier.barrier_)) {
+    // wrapped_barrier.barrier_ has been decremented and will no longer be accessed.
+    VLOG(threads) << "SuspendThreadByThreadId thread suspended: " << *thread;
+    if (ATraceEnabled()) {
+      std::string name;
+      thread->GetThreadName(name);
+      ATraceBegin(
+          StringPrintf("SuspendThreadByThreadId suspended %s for id=%d", name.c_str(), thread_id)
+              .c_str());
+    }
+    DCHECK(thread->IsSuspended());
+    return thread;
+  } else {
+    // thread still has a pointer to wrapped_barrier. Returning and continuing would be unsafe
+    // without additional cleanup.
+    ThreadSuspendByThreadIdWarning(
+        ::android::base::FATAL, "SuspendThreadByThreadId timed out", thread_id);
+    UNREACHABLE();
   }
 }
 
@@ -1210,8 +1172,7 @@
       // daemons.
       CHECK(thread->IsDaemon()) << *thread;
       if (thread != self) {
-        bool updated = thread->ModifySuspendCount(self, +1, nullptr, SuspendReason::kInternal);
-        DCHECK(updated);
+        thread->IncrementSuspendCount(self);
         ++daemons_left;
       }
       // We are shutting down the runtime, set the JNI functions of all the JNIEnvs to be
@@ -1311,11 +1272,10 @@
   // SuspendAll requests.
   MutexLock mu(self, *Locks::thread_list_lock_);
   MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
-  // Modify suspend count in increments of 1 to maintain invariants in ModifySuspendCount. While
-  // this isn't particularly efficient the suspend counts are most commonly 0 or 1.
-  for (int delta = suspend_all_count_; delta > 0; delta--) {
-    bool updated = self->ModifySuspendCount(self, +1, nullptr, SuspendReason::kInternal);
-    DCHECK(updated);
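+  // If a SuspendAll is in progress, the newly attached thread must respect it, so it starts out
+  // with a raised suspend count.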
+  if (suspend_all_count_ == 1) {
+    self->IncrementSuspendCount(self);
+  } else {
+    DCHECK_EQ(suspend_all_count_, 0);
   }
   CHECK(!Contains(self));
   list_.push_back(self);
@@ -1361,6 +1321,7 @@
     // Note: deliberately not using MutexLock that could hold a stale self pointer.
     {
       MutexLock mu(self, *Locks::thread_list_lock_);
+      self->SignalExitFlags();
       if (!Contains(self)) {
         std::string thread_name;
         self->GetThreadName(thread_name);
@@ -1370,16 +1331,19 @@
         break;
       } else {
         MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
-        if (!self->IsSuspended()) {
-          list_.remove(self);
-          break;
+        if (!self->IsSuspended() && !self->ReadFlag(ThreadFlag::kRunningFlipFunction)) {
+          Thread::StateAndFlags state_and_flags = self->GetStateAndFlags(std::memory_order_acquire);
+          if (!state_and_flags.IsFlagSet(ThreadFlag::kRunningFlipFunction)) {
+            list_.remove(self);
+            break;
+          }
         }
       }
     }
     // In the case where we are not suspended yet, sleep to leave other threads time to execute.
     // This is important if there are realtime threads. b/111277984
     usleep(1);
-    // We failed to remove the thread due to a suspend request, loop and try again.
+    // We failed to remove the thread due to a suspend request or the like, loop and try again.
   }
   delete self;
 
@@ -1418,13 +1382,11 @@
     MutexLock mu(self, *Locks::thread_list_lock_);
     MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
     for (Thread* thread : list_) {
-      bool suspended = thread->ModifySuspendCount(self, +1, nullptr, SuspendReason::kInternal);
-      DCHECK(suspended);
+      thread->IncrementSuspendCount(self);
       if (thread == self || thread->IsSuspended()) {
         threads_to_visit.push_back(thread);
       } else {
-        bool resumed = thread->ModifySuspendCount(self, -1, nullptr, SuspendReason::kInternal);
-        DCHECK(resumed);
+        thread->DecrementSuspendCount(self);
       }
     }
   }
@@ -1439,9 +1401,9 @@
   {
     MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
     for (Thread* thread : threads_to_visit) {
-      bool updated = thread->ModifySuspendCount(self, -1, nullptr, SuspendReason::kInternal);
-      DCHECK(updated);
+      thread->DecrementSuspendCount(self);
     }
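+    // Wake up any threads that were waiting for their suspend counts to reach zero.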
+    Thread::resume_cond_->Broadcast(self);
   }
 }
 
diff --git a/runtime/thread_list.h b/runtime/thread_list.h
index 51fac4a..cad51aa 100644
--- a/runtime/thread_list.h
+++ b/runtime/thread_list.h
@@ -49,7 +49,11 @@
   static constexpr uint32_t kInvalidThreadId = 0;
   static constexpr uint32_t kMainThreadId = 1;
   static constexpr uint64_t kDefaultThreadSuspendTimeout =
-      kIsDebugBuild ? 50'000'000'000ull : 10'000'000'000ull;
+      kIsDebugBuild ? 2'000'000'000ull : 4'000'000'000ull;
+  // We fail more aggressively in debug builds to catch potential issues early.
+  // The number of times we may retry when we find ourselves in a suspend-unfriendly state.
+  static constexpr int kMaxSuspendRetries = kIsDebugBuild ? 500 : 5000;
+  static constexpr useconds_t kThreadSuspendSleepUs = 100;
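+  // Combined with kThreadSuspendSleepUs, this allows roughly 500 * 100us = 50ms (debug) or
+  // 5000 * 100us = 500ms (release) of retry sleeping before giving up.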
 
   explicit ThreadList(uint64_t thread_suspend_timeout_ns);
   ~ThreadList();
@@ -70,7 +74,7 @@
   bool Resume(Thread* thread, SuspendReason reason = SuspendReason::kInternal)
       REQUIRES(!Locks::thread_suspend_count_lock_) WARN_UNUSED;
 
-  // Suspends all threads and gets exclusive access to the mutator lock.
+  // Suspends all other threads and gets exclusive access to the mutator lock.
   // If long_suspend is true, then other threads who try to suspend will never timeout.
   // long_suspend is currenly used for hprof since large heaps take a long time.
   void SuspendAll(const char* cause, bool long_suspend = false)
@@ -81,10 +85,7 @@
 
   // Suspend a thread using a peer, typically used by the debugger. Returns the thread on success,
   // else null. The peer is used to identify the thread to avoid races with the thread terminating.
-  // If the suspension times out then *timeout is set to true.
-  Thread* SuspendThreadByPeer(jobject peer,
-                              SuspendReason reason,
-                              bool* timed_out)
+  Thread* SuspendThreadByPeer(jobject peer, SuspendReason reason)
       REQUIRES(!Locks::mutator_lock_,
                !Locks::thread_list_lock_,
                !Locks::thread_suspend_count_lock_);
@@ -92,8 +93,8 @@
   // Suspend a thread using its thread id, typically used by lock/monitor inflation. Returns the
   // thread on success else null. The thread id is used to identify the thread to avoid races with
   // the thread terminating. Note that as thread ids are recycled this may not suspend the expected
-  // thread, that may be terminating. If the suspension times out then *timeout is set to true.
-  Thread* SuspendThreadByThreadId(uint32_t thread_id, SuspendReason reason, bool* timed_out)
+  // thread, which may be terminating.
+  Thread* SuspendThreadByThreadId(uint32_t thread_id, SuspendReason reason)
       REQUIRES(!Locks::mutator_lock_,
                !Locks::thread_list_lock_,
                !Locks::thread_suspend_count_lock_);
@@ -113,11 +114,24 @@
   // Running threads are not suspended but run the checkpoint inside of the suspend check. The
   // return value includes already suspended threads for b/24191051. Runs or requests the
   // callback, if non-null, inside the thread_list_lock critical section after determining the
-  // runnable/suspended states of the threads. Does not wait for completion of the callbacks in
-  // running threads.
-  size_t RunCheckpoint(Closure* checkpoint_function, Closure* callback = nullptr)
+  // runnable/suspended states of the threads. Does not wait for completion of the checkpoint
+  // function in running threads. If the caller holds the mutator lock, then all instances of the
+  // checkpoint function are run with the mutator lock. If the caller does not hold the mutator
+  // lock (see mutator_gc_coord.md) then, since the checkpoint code may not acquire or release the
+  // mutator lock, the checkpoint will have no way to access Java data.
+  // TODO: Is it possible to just require the mutator lock here?
+  size_t RunCheckpoint(Closure* checkpoint_function,
+                       Closure* callback = nullptr,
+                       bool allow_lock_checking = true)
       REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
 
+  // Convenience version of the above to disable lock checking inside Run function. Hopefully this
+  // and the third parameter above will eventually disappear.
+  size_t RunCheckpointUnchecked(Closure* checkpoint_function, Closure* callback = nullptr)
+      REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_) {
+    return RunCheckpoint(checkpoint_function, callback, false);
+  }
+
   // Run an empty checkpoint on threads. Wait until threads pass the next suspend point or are
   // suspended. This is used to ensure that the threads finish or aren't in the middle of an
   // in-flight mutator heap access (eg. a read barrier.) Runnable threads will respond by
@@ -126,12 +140,17 @@
   void RunEmptyCheckpoint()
       REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
 
-  // Flip thread roots from from-space refs to to-space refs. Used by
-  // the concurrent moving collectors.
-  size_t FlipThreadRoots(Closure* thread_flip_visitor,
-                         Closure* flip_callback,
-                         gc::collector::GarbageCollector* collector,
-                         gc::GcPauseListener* pause_listener)
+  // Used to flip thread roots from from-space refs to to-space refs. Used only by the concurrent
+  // moving collectors during a GC, and hence cannot be called from multiple threads concurrently.
+  //
+  // Briefly suspends all threads to atomically install a checkpoint-like thread_flip_visitor
+  // function to be run on each thread. Runs flip_callback while threads are suspended.
+  // The thread_flip_visitor is run by each thread before it becomes runnable, or by us. We do not
+  // return until all thread_flip_visitors have been run.
+  void FlipThreadRoots(Closure* thread_flip_visitor,
+                       Closure* flip_callback,
+                       gc::collector::GarbageCollector* collector,
+                       gc::GcPauseListener* pause_listener)
       REQUIRES(!Locks::mutator_lock_,
                !Locks::thread_list_lock_,
                !Locks::thread_suspend_count_lock_);
@@ -186,26 +205,29 @@
       REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_,
                !Locks::mutator_lock_);
 
+  // Wait for suspend barrier to reach zero. Return false on timeout.
+  bool WaitForSuspendBarrier(AtomicInteger* barrier);
+
  private:
   uint32_t AllocThreadId(Thread* self);
   void ReleaseThreadId(Thread* self, uint32_t id) REQUIRES(!Locks::allocated_thread_ids_lock_);
 
-  size_t RunCheckpoint(Closure* checkpoint_function, bool includeSuspended)
-      REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
-
   void DumpUnattachedThreads(std::ostream& os, bool dump_native_stack)
       REQUIRES(!Locks::thread_list_lock_);
 
   void SuspendAllDaemonThreadsForShutdown()
       REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
 
-  void SuspendAllInternal(Thread* self,
-                          Thread* ignore1,
-                          Thread* ignore2 = nullptr,
-                          SuspendReason reason = SuspendReason::kInternal)
-      REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
+  void ResumeAllInternal(Thread* self)
+      REQUIRES(Locks::thread_list_lock_, Locks::thread_suspend_count_lock_)
+          UNLOCK_FUNCTION(Locks::mutator_lock_);
 
-  void AssertThreadsAreSuspended(Thread* self, Thread* ignore1, Thread* ignore2 = nullptr)
+  void SuspendAllInternal(Thread* self, SuspendReason reason = SuspendReason::kInternal)
+      REQUIRES(!Locks::thread_list_lock_,
+               !Locks::thread_suspend_count_lock_,
+               !Locks::mutator_lock_);
+
+  void AssertOtherThreadsAreSuspended(Thread* self)
       REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
 
   std::bitset<kMaxThreadId> allocated_ids_ GUARDED_BY(Locks::allocated_thread_ids_lock_);
@@ -213,7 +235,10 @@
   // The actual list of all threads.
   std::list<Thread*> list_ GUARDED_BY(Locks::thread_list_lock_);
 
-  // Ongoing suspend all requests, used to ensure threads added to list_ respect SuspendAll.
+  // Ongoing suspend all requests, used to ensure threads added to list_ respect SuspendAll, and
+  // to ensure that only one SuspendAll or FlipThreadRoots call is active at a time.  The value is
+  // always either 0 or 1. thread_suspend_count_lock_ must be held continuously while these two
+  // functions modify suspend counts of all other threads and modify suspend_all_count_.
   int suspend_all_count_ GUARDED_BY(Locks::thread_suspend_count_lock_);
 
   // Number of threads unregistering, ~ThreadList blocks until this hits 0.
@@ -221,7 +246,7 @@
 
   // Thread suspend time histogram. Only modified when all the threads are suspended, so guarding
   // by mutator lock ensures no thread can read when another thread is modifying it.
-  Histogram<uint64_t> suspend_all_historam_ GUARDED_BY(Locks::mutator_lock_);
+  Histogram<uint64_t> suspend_all_histogram_ GUARDED_BY(Locks::mutator_lock_);
 
   // Whether or not the current thread suspension is long.
   bool long_suspend_;
@@ -237,6 +262,9 @@
 
   friend class Thread;
 
+  friend class Mutex;
+  friend class BaseMutex;
+
   DISALLOW_COPY_AND_ASSIGN(ThreadList);
 };
 
diff --git a/test/129-ThreadGetId/expected-stdout.txt b/test/129-ThreadGetId/expected-stdout.txt
index aadf90d..4455320 100644
--- a/test/129-ThreadGetId/expected-stdout.txt
+++ b/test/129-ThreadGetId/expected-stdout.txt
@@ -1,2 +1,8 @@
+Thread finished
+Thread finished
+Thread finished
+Thread finished
+Thread finished
+All joined
 HeapTaskDaemon depth 0
 Finishing
diff --git a/test/129-ThreadGetId/src/Main.java b/test/129-ThreadGetId/src/Main.java
index 50e8c09..3b4b076 100644
--- a/test/129-ThreadGetId/src/Main.java
+++ b/test/129-ThreadGetId/src/Main.java
@@ -16,23 +16,71 @@
 
 import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class Main implements Runnable {
     static final int NUMBER_OF_THREADS = 5;
-    static final int TOTAL_OPERATIONS = 900;
+    static volatile int ops_per_thread = 1000;
+    static AtomicInteger operations_completed = new AtomicInteger(0);
+    static int[] progress = new int[NUMBER_OF_THREADS];
+    static AtomicInteger totalStackFrames = new AtomicInteger(0);
+    static final boolean printStats = false;  // True causes test to fail.
+    int index;
+
+    Main(int i) {
+        index = i;
+    }
 
     public static void main(String[] args) throws Exception {
         final Thread[] threads = new Thread[NUMBER_OF_THREADS];
+        Thread watchdog = new Thread() {
+            public void run() {
+                try {
+                    if (printStats) {
+                        System.out.println("ops_per_thread = " + ops_per_thread);
+                    }
+                    Thread.sleep(10_000);
+                    if (printStats) {
+                        System.out.println("Ops completed after 10 seconds: " +
+                                operations_completed.get());
+                    }
+                    if (operations_completed.get() < NUMBER_OF_THREADS * ops_per_thread / 2) {
+                        // We're in some sort of "go slow" mode, probably gcstress. Finish early.
+                        ops_per_thread /= 10;
+                    }
+                    if (printStats) {
+                        System.out.println("ops_per_thread = " + ops_per_thread);
+                    }
+                    Thread.sleep(200_000);
+                    System.out.print("Watchdog timed out: ");
+                    for (int i = 0; i < NUMBER_OF_THREADS; ++i) {
+                        System.out.print(progress[i] + ", ");
+                    }
+                    System.out.println("");
+                    System.err.println("Watchdog thread timed out");
+                    System.exit(1);
+                } catch (InterruptedException e) {}
+            }
+        };
+        watchdog.start();
+        long start_millis = System.currentTimeMillis();
         for (int t = 0; t < threads.length; t++) {
-            threads[t] = new Thread(new Main());
+            threads[t] = new Thread(new Main(t));
             threads[t].start();
         }
         for (Thread t : threads) {
             t.join();
         }
+        if (printStats) {
+            long elapsed_millis = System.currentTimeMillis() - start_millis;
+            System.out.println("Captured " + totalStackFrames + " stack frames in " +
+                    elapsed_millis + "msecs");
+        }
+        System.out.println("All joined");
         // Do this test after the other part to leave some time for the heap task daemon to start
         // up.
         test_getStackTraces();
+        watchdog.interrupt();
         System.out.println("Finishing");
     }
 
@@ -46,9 +94,9 @@
             Thread[] array = new Thread[activeCount];
             systemThreadGroup.enumerate(array);
             for (Thread thread : array) {
-               if (thread.getName().equals("HeapTaskDaemon") &&
-                   thread.getState() != Thread.State.NEW) {
-                  return thread;
+                if (thread.getName().equals("HeapTaskDaemon") &&
+                    thread.getState() != Thread.State.NEW) {
+                    return thread;
                 }
             }
             // Yield to eventually get the daemon started.
@@ -83,12 +131,16 @@
             if (thread.getId() <= 0) {
                 System.out.println("thread's ID is not positive: " + thread.getName());
             }
+            totalStackFrames.addAndGet(stMap.get(thread).length);
         }
     }
 
     public void run() {
-        for (int i = 0; i < TOTAL_OPERATIONS; ++i) {
+        for (int i = 1; i <= ops_per_thread; ++i) {
             test_getId();
+            operations_completed.addAndGet(1);
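+            // Recorded so the watchdog can report per-thread progress if it times out.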
+            progress[index] = i;
         }
+        System.out.println("Thread finished");
     }
 }
diff --git a/test/2011-stack-walk-concurrent-instrument/stack_walk_concurrent.cc b/test/2011-stack-walk-concurrent-instrument/stack_walk_concurrent.cc
index ae1d830..9ae1bed 100644
--- a/test/2011-stack-walk-concurrent-instrument/stack_walk_concurrent.cc
+++ b/test/2011-stack-walk-concurrent-instrument/stack_walk_concurrent.cc
@@ -81,10 +81,8 @@
                                                                    jobject target) {
   while (!instrument_waiting) {
   }
-  bool timed_out = false;
   Thread* other = Runtime::Current()->GetThreadList()->SuspendThreadByPeer(
-      target, SuspendReason::kInternal, &timed_out);
-  CHECK(!timed_out);
+      target, SuspendReason::kInternal);
   CHECK(other != nullptr);
   ScopedSuspendAll ssa(__FUNCTION__);
   Runtime::Current()->GetInstrumentation()->InstrumentThreadStack(other,