Revert^2 "Add MessageQueue for cross-thread communication"

This reverts commit c7edde06dc2cfb90114898e8de82e0e1287c2885.

Reason for revert: fixing bugs.

There were two issues. The first was a logic error in
MessageQueueTest::ReceiveInOrder.  The test would allow any message to
be replaced by a timeout message, but then the sequence would be out of
sync. The change now explicitly advances the message pointer, which
means the timeout message does not effectively replace an expected
message.

The seccond issue was one where MessageQueue may try to call
ConditionVariable::TimedWait with a negative timeout if the deadline had
already passed, which fails with an EINVAL error during the call to
futex. The code in MessageQueue::ReceiveMessage now checks for negative
timeouts before calling TimedWait.

Test: m test-art-host-gtest-art_runtime_tests
Bug: 174652565
Change-Id: I107af849e2a01727719f7662a7685d989b42d176
diff --git a/runtime/Android.bp b/runtime/Android.bp
index 3943836..7e0e282 100644
--- a/runtime/Android.bp
+++ b/runtime/Android.bp
@@ -651,6 +651,7 @@
         "arch/x86/instruction_set_features_x86_test.cc",
         "arch/x86_64/instruction_set_features_x86_64_test.cc",
         "barrier_test.cc",
+        "base/message_queue_test.cc",
         "base/mutex_test.cc",
         "base/timing_logger_test.cc",
         "cha_test.cc",
diff --git a/runtime/base/message_queue.h b/runtime/base/message_queue.h
new file mode 100644
index 0000000..b0f1460
--- /dev/null
+++ b/runtime/base/message_queue.h
@@ -0,0 +1,172 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ART_RUNTIME_BASE_MESSAGE_QUEUE_H_
+#define ART_RUNTIME_BASE_MESSAGE_QUEUE_H_
+
+#include <deque>
+#include <optional>
+#include <variant>
+
+#include "base/time_utils.h"
+#include "mutex.h"
+#include "thread.h"
+
+#pragma clang diagnostic push
+#pragma clang diagnostic error "-Wconversion"
+
+namespace art {
+
+struct TimeoutExpiredMessage {};
+
+// MessageQueue is an unbounded multiple producer, multiple consumer (MPMC) queue that can be
+// specialized to send messages between threads. The queue is parameterized by a set of types that
+// serve as the message types. Note that messages are passed by value, so smaller messages should be
+// used when possible.
+//
+// Example:
+//
+//     struct IntMessage { int value; };
+//     struct DoubleMessage { double value; };
+//
+//     MessageQueue<IntMessage, DoubleMessage> queue;
+//
+//     queue.SendMessage(IntMessage{42});
+//     queue.SendMessage(DoubleMessage{42.0});
+//
+//     auto message = queue.ReceiveMessage();  // message is a std::variant of the different
+//                                             // message types.
+//
+//     if (std::holds_alternative<IntMessage>(message)) {
+//       cout << "Received int message with value " << std::get<IntMessage>(message) << "\n";
+//     }
+//
+// The message queue also supports a special timeout message. This is scheduled to be sent by the
+// SetTimeout method, which will cause the MessageQueue to deliver a TimeoutExpiredMessage after the
+// time period has elapsed. Note that only one timeout can be active can be active at a time, and
+// subsequent calls to SetTimeout will overwrite any existing timeout.
+//
+// Example:
+//
+//     queue.SetTimeout(5000);  // request to send TimeoutExpiredMessage in 5000ms.
+//
+//     auto message = queue.ReceiveMessage();  // blocks for 5000ms and returns
+//                                             // TimeoutExpiredMessage
+//
+// Note additional messages can be sent in the meantime and a ReceiveMessage call will wake up to
+// return that message. The TimeoutExpiredMessage will still be sent at the right time.
+//
+// Finally, MessageQueue has a SwitchReceive method that can be used to run different code depending
+// on the type of message received. SwitchReceive takes a set of lambda expressions that take one
+// argument of one of the allowed message types. An additional lambda expression that takes a single
+// auto argument can be used to serve as a catch-all case.
+//
+// Example:
+//
+//     queue.SwitchReceive(
+//       [&](IntMessage message) {
+//         cout << "Received int: " << message.value << "\n";
+//       },
+//       [&](DoubleMessage message) {
+//         cout << "Received double: " << message.value << "\n";
+//       },
+//       [&](auto other_message) {
+//         // Another message was received. In this case, it's TimeoutExpiredMessage.
+//       }
+//     )
+//
+// For additional examples, see message_queue_test.cc.
+template <typename... MessageTypes>
+class MessageQueue {
+ public:
+  using Message = std::variant<TimeoutExpiredMessage, MessageTypes...>;
+
+  // Adds a message to the message queue, which can later be received with ReceiveMessage. See class
+  // comment for more details.
+  void SendMessage(Message message) {
+    // TimeoutExpiredMessage should not be sent manually.
+    DCHECK(!std::holds_alternative<TimeoutExpiredMessage>(message));
+    Thread* self = Thread::Current();
+    MutexLock lock{self, mutex_};
+    messages_.push_back(message);
+    cv_.Signal(self);
+  }
+
+  // Schedule a TimeoutExpiredMessage to be delivered in timeout_milliseconds. See class comment for
+  // more details.
+  void SetTimeout(uint64_t timeout_milliseconds) {
+    Thread* self = Thread::Current();
+    MutexLock lock{self, mutex_};
+    deadline_milliseconds_ = timeout_milliseconds + MilliTime();
+    cv_.Signal(self);
+  }
+
+  // Remove and return a message from the queue. If no message is available, ReceiveMessage will
+  // block until one becomes available. See class comment for more details.
+  Message ReceiveMessage() {
+    Thread* self = Thread::Current();
+    MutexLock lock{self, mutex_};
+
+    // Loop until we receive a message
+    while (true) {
+      uint64_t const current_time = MilliTime();
+      // First check if the deadline has passed.
+      if (deadline_milliseconds_.has_value() && deadline_milliseconds_.value() < current_time) {
+        deadline_milliseconds_.reset();
+        return TimeoutExpiredMessage{};
+      }
+
+      // Check if there is a message in the queue.
+      if (messages_.size() > 0) {
+        Message message = messages_.front();
+        messages_.pop_front();
+        return message;
+      }
+
+      // Otherwise, wait until we have a message or a timeout.
+      if (deadline_milliseconds_.has_value()) {
+        DCHECK_LE(current_time, deadline_milliseconds_.value());
+        int64_t timeout = static_cast<int64_t>(deadline_milliseconds_.value() - current_time);
+        cv_.TimedWait(self, timeout, /*ns=*/0);
+      } else {
+        cv_.Wait(self);
+      }
+    }
+  }
+
+  // Waits for a message and applies the appropriate function argument to the received message. See
+  // class comment for more details.
+  template <typename ReturnType = void, typename... Fn>
+  ReturnType SwitchReceive(Fn... case_fn) {
+    struct Matcher : Fn... {
+      using Fn::operator()...;
+    } matcher{case_fn...};
+    return std::visit(matcher, ReceiveMessage());
+  }
+
+ private:
+  Mutex mutex_{"MessageQueue Mutex"};
+  ConditionVariable cv_{"MessageQueue ConditionVariable", mutex_};
+
+  std::deque<Message> messages_ GUARDED_BY(mutex_);
+  std::optional<uint64_t> deadline_milliseconds_ GUARDED_BY(mutex_);
+};
+
+}  // namespace art
+
+#pragma clang diagnostic pop  // -Wconversion
+
+#endif  // ART_RUNTIME_BASE_MESSAGE_QUEUE_H_
diff --git a/runtime/base/message_queue_test.cc b/runtime/base/message_queue_test.cc
new file mode 100644
index 0000000..7a788a9
--- /dev/null
+++ b/runtime/base/message_queue_test.cc
@@ -0,0 +1,253 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "message_queue.h"
+
+#include <thread>
+
+#include "common_runtime_test.h"
+#include "thread-current-inl.h"
+
+namespace art {
+
+class MessageQueueTest : public CommonRuntimeTest {};
+
+namespace {
+
+// Define some message types
+struct EmptyMessage {};
+struct IntMessage {
+  int value;
+};
+struct OtherIntMessage {
+  int other_value;
+};
+struct TwoIntMessage {
+  int value1;
+  int value2;
+};
+struct StringMessage {
+  std::string message;
+};
+
+using TestMessageQueue =
+    MessageQueue<EmptyMessage, IntMessage, OtherIntMessage, TwoIntMessage, StringMessage>;
+
+}  // namespace
+
+TEST_F(MessageQueueTest, SendReceiveTest) {
+  TestMessageQueue queue;
+
+  queue.SendMessage(EmptyMessage{});
+  ASSERT_TRUE(std::holds_alternative<EmptyMessage>(queue.ReceiveMessage()));
+
+  queue.SendMessage(IntMessage{42});
+  ASSERT_TRUE(std::holds_alternative<IntMessage>(queue.ReceiveMessage()));
+
+  queue.SendMessage(OtherIntMessage{43});
+  ASSERT_TRUE(std::holds_alternative<OtherIntMessage>(queue.ReceiveMessage()));
+
+  queue.SendMessage(TwoIntMessage{1, 2});
+  ASSERT_TRUE(std::holds_alternative<TwoIntMessage>(queue.ReceiveMessage()));
+
+  queue.SendMessage(StringMessage{"Hello, World!"});
+  ASSERT_TRUE(std::holds_alternative<StringMessage>(queue.ReceiveMessage()));
+}
+
+TEST_F(MessageQueueTest, TestTimeout) {
+  TestMessageQueue queue;
+
+  constexpr uint64_t kDuration = 500;
+
+  const auto start = MilliTime();
+  queue.SetTimeout(kDuration);
+  ASSERT_TRUE(std::holds_alternative<TimeoutExpiredMessage>(queue.ReceiveMessage()));
+  const auto elapsed = MilliTime() - start;
+
+  ASSERT_GT(elapsed, kDuration);
+}
+
+TEST_F(MessageQueueTest, TwoWayMessaging) {
+  TestMessageQueue queue1;
+  TestMessageQueue queue2;
+
+  std::thread thread{[&]() {
+    // Tell the parent thread we are running.
+    queue1.SendMessage(EmptyMessage{});
+
+    // Wait for a message from the parent thread.
+    queue2.ReceiveMessage();
+  }};
+
+  queue1.ReceiveMessage();
+  queue2.SendMessage(EmptyMessage{});
+
+  thread.join();
+}
+
+TEST_F(MessageQueueTest, SwitchReceiveTest) {
+  TestMessageQueue queue;
+
+  queue.SendMessage(EmptyMessage{});
+  queue.SendMessage(IntMessage{42});
+  queue.SendMessage(OtherIntMessage{43});
+  queue.SendMessage(TwoIntMessage{1, 2});
+  queue.SendMessage(StringMessage{"Hello, World!"});
+  queue.SetTimeout(500);
+
+  bool empty_received = false;
+  bool int_received = false;
+  bool other_int_received = false;
+  bool two_int_received = false;
+  bool string_received = false;
+  bool timeout_received = false;
+
+  while (!(empty_received && int_received && other_int_received && two_int_received &&
+           string_received && timeout_received)) {
+    queue.SwitchReceive(
+        [&]([[maybe_unused]] const EmptyMessage& message) {
+          ASSERT_FALSE(empty_received);
+          empty_received = true;
+        },
+        [&](const IntMessage& message) {
+          ASSERT_FALSE(int_received);
+          int_received = true;
+
+          ASSERT_EQ(message.value, 42);
+        },
+        [&](const OtherIntMessage& message) {
+          ASSERT_FALSE(other_int_received);
+          other_int_received = true;
+
+          ASSERT_EQ(message.other_value, 43);
+        },
+        // The timeout message is here to make sure the cases can go in any order
+        [&]([[maybe_unused]] const TimeoutExpiredMessage& message) {
+          ASSERT_FALSE(timeout_received);
+          timeout_received = true;
+        },
+        [&](const TwoIntMessage& message) {
+          ASSERT_FALSE(two_int_received);
+          two_int_received = true;
+
+          ASSERT_EQ(message.value1, 1);
+          ASSERT_EQ(message.value2, 2);
+        },
+        [&](const StringMessage& message) {
+          ASSERT_FALSE(string_received);
+          string_received = true;
+
+          ASSERT_EQ(message.message, "Hello, World!");
+        });
+  }
+}
+
+TEST_F(MessageQueueTest, SwitchReceiveAutoTest) {
+  TestMessageQueue queue;
+
+  queue.SendMessage(EmptyMessage{});
+  queue.SendMessage(IntMessage{42});
+  queue.SendMessage(OtherIntMessage{43});
+  queue.SendMessage(TwoIntMessage{1, 2});
+  queue.SendMessage(StringMessage{"Hello, World!"});
+  queue.SetTimeout(500);
+
+  int pending_messages = 6;
+
+  while (pending_messages > 0) {
+    queue.SwitchReceive([&]([[maybe_unused]] auto message) { pending_messages--; });
+  }
+}
+
+TEST_F(MessageQueueTest, SwitchReceivePartialAutoTest) {
+  TestMessageQueue queue;
+
+  queue.SendMessage(EmptyMessage{});
+  queue.SendMessage(IntMessage{42});
+  queue.SendMessage(OtherIntMessage{43});
+  queue.SendMessage(TwoIntMessage{1, 2});
+  queue.SendMessage(StringMessage{"Hello, World!"});
+  queue.SetTimeout(500);
+
+  bool running = true;
+  while (running) {
+    queue.SwitchReceive(
+        [&](const StringMessage& message) {
+          ASSERT_EQ(message.message, "Hello, World!");
+          running = false;
+        },
+        [&]([[maybe_unused]] const auto& message) {
+          const bool is_string{std::is_same<StringMessage, decltype(message)>()};
+          ASSERT_FALSE(is_string);
+        });
+  }
+}
+
+TEST_F(MessageQueueTest, SwitchReceiveReturn) {
+  TestMessageQueue queue;
+
+  queue.SendMessage(EmptyMessage{});
+
+  ASSERT_TRUE(
+      queue.SwitchReceive<bool>([&]([[maybe_unused]] const EmptyMessage& message) { return true; },
+                                [&]([[maybe_unused]] const auto& message) { return false; }));
+
+  queue.SendMessage(IntMessage{42});
+
+  ASSERT_FALSE(
+      queue.SwitchReceive<bool>([&]([[maybe_unused]] const EmptyMessage& message) { return true; },
+                                [&]([[maybe_unused]] const auto& message) { return false; }));
+}
+
+TEST_F(MessageQueueTest, ReceiveInOrder) {
+  TestMessageQueue queue;
+
+  std::vector<TestMessageQueue::Message> messages{
+      EmptyMessage{},
+      IntMessage{42},
+      OtherIntMessage{43},
+      TwoIntMessage{1, 2},
+      StringMessage{"Hello, World!"},
+  };
+
+  // Send the messages
+  for (const auto& message : messages) {
+    queue.SendMessage(message);
+  }
+  queue.SetTimeout(500);
+
+  // Receive the messages. Make sure they came in order, except for the TimeoutExpiredMessage, which
+  // can come at any time.
+  bool received_timeout = false;
+  size_t i = 0;
+  while (i < messages.size()) {
+    auto message = queue.ReceiveMessage();
+    if (std::holds_alternative<TimeoutExpiredMessage>(message)) {
+      ASSERT_FALSE(received_timeout);
+      received_timeout = true;
+    } else {
+      ASSERT_EQ(message.index(), messages[i].index());
+      i++;
+    }
+  }
+  if (!received_timeout) {
+    // If we have not received the timeout yet, receive one more message and make sure it's the
+    // timeout.
+    ASSERT_TRUE(std::holds_alternative<TimeoutExpiredMessage>(queue.ReceiveMessage()));
+  }
+}
+
+}  // namespace art