adb: guarantee that fdevent_run_on_main_thread happens last.

Make it so that we handle run_on_main_thread calls after regular socket
events, so that we can use it as a way to ensure we've processed all
pending socket events.

Test: adb_test
Test: wine adb_test.exe
Change-Id: Ic215c7fed19a8e1699e759970658b3775aa08c45
diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp
index 9776c1b..cf441cf 100644
--- a/adb/fdevent.cpp
+++ b/adb/fdevent.cpp
@@ -75,6 +75,7 @@
 static bool main_thread_valid;
 static uint64_t main_thread_id;
 
+static bool run_needs_flush = false;
 static auto& run_queue_notify_fd = *new unique_fd();
 static auto& run_queue_mutex = *new std::mutex();
 static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::deque<std::function<void()>>();
@@ -317,7 +318,8 @@
         PLOG(FATAL) << "failed to empty run queue notify fd";
     }
 
-    fdevent_run_flush();
+    // Mark that we need to flush, and then run it at the end of fdevent_loop.
+    run_needs_flush = true;
 }
 
 static void fdevent_run_setup() {
@@ -378,6 +380,11 @@
             g_pending_list.pop_front();
             fdevent_call_fdfunc(fde);
         }
+
+        if (run_needs_flush) {
+            fdevent_run_flush();
+            run_needs_flush = false;
+        }
     }
 }
 
diff --git a/adb/fdevent_test.h b/adb/fdevent_test.h
index 1a2d41c..15ded21 100644
--- a/adb/fdevent_test.h
+++ b/adb/fdevent_test.h
@@ -21,6 +21,24 @@
 #include "socket.h"
 #include "sysdeps.h"
 
+static void WaitForFdeventLoop() {
+    // Sleep for a bit to make sure that network events have propagated.
+    std::this_thread::sleep_for(100ms);
+
+    // fdevent_run_on_main_thread has a guaranteed ordering, and is guaranteed to happen after
+    // socket events, so as soon as our function is called, we know that we've processed all
+    // previous events.
+    std::mutex mutex;
+    std::condition_variable cv;
+    std::unique_lock<std::mutex> lock(mutex);
+    fdevent_run_on_main_thread([&]() {
+        mutex.lock();
+        mutex.unlock();
+        cv.notify_one();
+    });
+    cv.wait(lock);
+}
+
 class FdeventTest : public ::testing::Test {
   protected:
     int dummy = -1;
diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp
index f587fdb..068c175 100644
--- a/adb/socket_test.cpp
+++ b/adb/socket_test.cpp
@@ -42,10 +42,6 @@
 
 class LocalSocketTest : public FdeventTest {};
 
-static void WaitForFdeventLoop() {
-    std::this_thread::sleep_for(100ms);
-}
-
 TEST_F(LocalSocketTest, smoke) {
     // Join two socketpairs with a chain of intermediate socketpairs.
     int first[2];