Revert "Add MessageQueue for cross-thread communication"
This reverts commit de60ef3f91af06a3b8ef24f4bab5c547dc65e10d.
Reason for revert: bot failures - http://b/174652565
Change-Id: I23dc6d275ce04a8cd78c92728a8d33f66979e481
diff --git a/runtime/Android.bp b/runtime/Android.bp
index 3b04177..30473d9 100644
--- a/runtime/Android.bp
+++ b/runtime/Android.bp
@@ -651,7 +651,6 @@
"arch/x86/instruction_set_features_x86_test.cc",
"arch/x86_64/instruction_set_features_x86_64_test.cc",
"barrier_test.cc",
- "base/message_queue_test.cc",
"base/mutex_test.cc",
"base/timing_logger_test.cc",
"cha_test.cc",
diff --git a/runtime/base/message_queue.h b/runtime/base/message_queue.h
deleted file mode 100644
index 2c02b2a..0000000
--- a/runtime/base/message_queue.h
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Copyright (C) 2020 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef ART_RUNTIME_BASE_MESSAGE_QUEUE_H_
-#define ART_RUNTIME_BASE_MESSAGE_QUEUE_H_
-
-#include <deque>
-#include <optional>
-#include <variant>
-
-#include "base/time_utils.h"
-#include "mutex.h"
-#include "thread.h"
-
-namespace art {
-
-struct TimeoutExpiredMessage {};
-
-// MessageQueue is an unbounded multiple producer, multiple consumer (MPMC) queue that can be
-// specialized to send messages between threads. The queue is parameterized by a set of types that
-// serve as the message types. Note that messages are passed by value, so smaller messages should be
-// used when possible.
-//
-// Example:
-//
-// struct IntMessage { int value; };
-// struct DoubleMessage { double value; };
-//
-// MessageQueue<IntMessage, DoubleMessage> queue;
-//
-// queue.SendMessage(IntMessage{42});
-// queue.SendMessage(DoubleMessage{42.0});
-//
-// auto message = queue.ReceiveMessage(); // message is a std::variant of the different
-// // message types.
-//
-// if (std::holds_alternative<IntMessage>(message)) {
-// cout << "Received int message with value " << std::get<IntMessage>(message) << "\n";
-// }
-//
-// The message queue also supports a special timeout message. This is scheduled to be sent by the
-// SetTimeout method, which will cause the MessageQueue to deliver a TimeoutExpiredMessage after the
-// time period has elapsed. Note that only one timeout can be active can be active at a time, and
-// subsequent calls to SetTimeout will overwrite any existing timeout.
-//
-// Example:
-//
-// queue.SetTimeout(5000); // request to send TimeoutExpiredMessage in 5000ms.
-//
-// auto message = queue.ReceiveMessage(); // blocks for 5000ms and returns
-// // TimeoutExpiredMessage
-//
-// Note additional messages can be sent in the meantime and a ReceiveMessage call will wake up to
-// return that message. The TimeoutExpiredMessage will still be sent at the right time.
-//
-// Finally, MessageQueue has a SwitchReceive method that can be used to run different code depending
-// on the type of message received. SwitchReceive takes a set of lambda expressions that take one
-// argument of one of the allowed message types. An additional lambda expression that takes a single
-// auto argument can be used to serve as a catch-all case.
-//
-// Example:
-//
-// queue.SwitchReceive(
-// [&](IntMessage message) {
-// cout << "Received int: " << message.value << "\n";
-// },
-// [&](DoubleMessage message) {
-// cout << "Received double: " << message.value << "\n";
-// },
-// [&](auto other_message) {
-// // Another message was received. In this case, it's TimeoutExpiredMessage.
-// }
-// )
-//
-// For additional examples, see message_queue_test.cc.
-template <typename... MessageTypes>
-class MessageQueue {
- public:
- using Message = std::variant<TimeoutExpiredMessage, MessageTypes...>;
-
- // Adds a message to the message queue, which can later be received with ReceiveMessage. See class
- // comment for more details.
- void SendMessage(Message message) {
- // TimeoutExpiredMessage should not be sent manually.
- DCHECK(!std::holds_alternative<TimeoutExpiredMessage>(message));
- Thread* self = Thread::Current();
- MutexLock lock{self, mutex_};
- messages_.push_back(message);
- cv_.Signal(self);
- }
-
- // Schedule a TimeoutExpiredMessage to be delivered in timeout_milliseconds. See class comment for
- // more details.
- void SetTimeout(uint64_t timeout_milliseconds) {
- Thread* self = Thread::Current();
- MutexLock lock{self, mutex_};
- deadline_milliseconds_ = timeout_milliseconds + MilliTime();
- cv_.Signal(self);
- }
-
- // Remove and return a message from the queue. If no message is available, ReceiveMessage will
- // block until one becomes available. See class comment for more details.
- Message ReceiveMessage() {
- Thread* self = Thread::Current();
- MutexLock lock{self, mutex_};
-
- // Loop until we receive a message
- while (true) {
- // First check if the deadline has passed.
- if (deadline_milliseconds_.has_value() && deadline_milliseconds_.value() < MilliTime()) {
- deadline_milliseconds_.reset();
- return TimeoutExpiredMessage{};
- }
-
- // Check if there is a message in the queue.
- if (messages_.size() > 0) {
- Message message = messages_.front();
- messages_.pop_front();
- return message;
- }
-
- // Otherwise, wait until we have a message or a timeout.
- if (deadline_milliseconds_.has_value()) {
- uint64_t timeout = deadline_milliseconds_.value() - MilliTime();
- cv_.TimedWait(self, timeout, /*ns=*/0);
- } else {
- cv_.Wait(self);
- }
- }
- }
-
- // Waits for a message and applies the appropriate function argument to the received message. See
- // class comment for more details.
- template <typename ReturnType = void, typename... Fn>
- ReturnType SwitchReceive(Fn... case_fn) {
- struct Matcher : Fn... {
- using Fn::operator()...;
- } matcher{case_fn...};
- return std::visit(matcher, ReceiveMessage());
- }
-
- private:
- Mutex mutex_{"MessageQueue Mutex"};
- ConditionVariable cv_{"MessageQueue ConditionVariable", mutex_};
-
- std::deque<Message> messages_;
- std::optional<uint64_t> deadline_milliseconds_;
-};
-
-} // namespace art
-
-#endif // ART_RUNTIME_BASE_MESSAGE_QUEUE_H_
diff --git a/runtime/base/message_queue_test.cc b/runtime/base/message_queue_test.cc
deleted file mode 100644
index c3e5858..0000000
--- a/runtime/base/message_queue_test.cc
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Copyright (C) 2020 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "message_queue.h"
-
-#include <thread>
-
-#include "common_runtime_test.h"
-#include "thread-current-inl.h"
-
-namespace art {
-
-class MessageQueueTest : public CommonRuntimeTest {};
-
-namespace {
-
-// Define some message types
-struct EmptyMessage {};
-struct IntMessage {
- int value;
-};
-struct OtherIntMessage {
- int other_value;
-};
-struct TwoIntMessage {
- int value1;
- int value2;
-};
-struct StringMessage {
- std::string message;
-};
-
-using TestMessageQueue =
- MessageQueue<EmptyMessage, IntMessage, OtherIntMessage, TwoIntMessage, StringMessage>;
-
-} // namespace
-
-TEST_F(MessageQueueTest, SendReceiveTest) {
- TestMessageQueue queue;
-
- queue.SendMessage(EmptyMessage{});
- ASSERT_TRUE(std::holds_alternative<EmptyMessage>(queue.ReceiveMessage()));
-
- queue.SendMessage(IntMessage{42});
- ASSERT_TRUE(std::holds_alternative<IntMessage>(queue.ReceiveMessage()));
-
- queue.SendMessage(OtherIntMessage{43});
- ASSERT_TRUE(std::holds_alternative<OtherIntMessage>(queue.ReceiveMessage()));
-
- queue.SendMessage(TwoIntMessage{1, 2});
- ASSERT_TRUE(std::holds_alternative<TwoIntMessage>(queue.ReceiveMessage()));
-
- queue.SendMessage(StringMessage{"Hello, World!"});
- ASSERT_TRUE(std::holds_alternative<StringMessage>(queue.ReceiveMessage()));
-}
-
-TEST_F(MessageQueueTest, TestTimeout) {
- TestMessageQueue queue;
-
- constexpr uint64_t kDuration = 500;
-
- const auto start = MilliTime();
- queue.SetTimeout(kDuration);
- ASSERT_TRUE(std::holds_alternative<TimeoutExpiredMessage>(queue.ReceiveMessage()));
- const auto elapsed = MilliTime() - start;
-
- ASSERT_GT(elapsed, kDuration);
-}
-
-TEST_F(MessageQueueTest, TwoWayMessaging) {
- TestMessageQueue queue1;
- TestMessageQueue queue2;
-
- std::thread thread{[&]() {
- // Tell the parent thread we are running.
- queue1.SendMessage(EmptyMessage{});
-
- // Wait for a message from the parent thread.
- queue2.ReceiveMessage();
- }};
-
- queue1.ReceiveMessage();
- queue2.SendMessage(EmptyMessage{});
-
- thread.join();
-}
-
-TEST_F(MessageQueueTest, SwitchReceiveTest) {
- TestMessageQueue queue;
-
- queue.SendMessage(EmptyMessage{});
- queue.SendMessage(IntMessage{42});
- queue.SendMessage(OtherIntMessage{43});
- queue.SendMessage(TwoIntMessage{1, 2});
- queue.SendMessage(StringMessage{"Hello, World!"});
- queue.SetTimeout(500);
-
- bool empty_received = false;
- bool int_received = false;
- bool other_int_received = false;
- bool two_int_received = false;
- bool string_received = false;
- bool timeout_received = false;
-
- while (!(empty_received && int_received && other_int_received && two_int_received &&
- string_received && timeout_received)) {
- queue.SwitchReceive(
- [&]([[maybe_unused]] const EmptyMessage& message) {
- ASSERT_FALSE(empty_received);
- empty_received = true;
- },
- [&](const IntMessage& message) {
- ASSERT_FALSE(int_received);
- int_received = true;
-
- ASSERT_EQ(message.value, 42);
- },
- [&](const OtherIntMessage& message) {
- ASSERT_FALSE(other_int_received);
- other_int_received = true;
-
- ASSERT_EQ(message.other_value, 43);
- },
- // The timeout message is here to make sure the cases can go in any order
- [&]([[maybe_unused]] const TimeoutExpiredMessage& message) {
- ASSERT_FALSE(timeout_received);
- timeout_received = true;
- },
- [&](const TwoIntMessage& message) {
- ASSERT_FALSE(two_int_received);
- two_int_received = true;
-
- ASSERT_EQ(message.value1, 1);
- ASSERT_EQ(message.value2, 2);
- },
- [&](const StringMessage& message) {
- ASSERT_FALSE(string_received);
- string_received = true;
-
- ASSERT_EQ(message.message, "Hello, World!");
- });
- }
-}
-
-TEST_F(MessageQueueTest, SwitchReceiveAutoTest) {
- TestMessageQueue queue;
-
- queue.SendMessage(EmptyMessage{});
- queue.SendMessage(IntMessage{42});
- queue.SendMessage(OtherIntMessage{43});
- queue.SendMessage(TwoIntMessage{1, 2});
- queue.SendMessage(StringMessage{"Hello, World!"});
- queue.SetTimeout(500);
-
- int pending_messages = 6;
-
- while (pending_messages > 0) {
- queue.SwitchReceive([&]([[maybe_unused]] auto message) { pending_messages--; });
- }
-}
-
-TEST_F(MessageQueueTest, SwitchReceivePartialAutoTest) {
- TestMessageQueue queue;
-
- queue.SendMessage(EmptyMessage{});
- queue.SendMessage(IntMessage{42});
- queue.SendMessage(OtherIntMessage{43});
- queue.SendMessage(TwoIntMessage{1, 2});
- queue.SendMessage(StringMessage{"Hello, World!"});
- queue.SetTimeout(500);
-
- bool running = true;
- while (running) {
- queue.SwitchReceive(
- [&](const StringMessage& message) {
- ASSERT_EQ(message.message, "Hello, World!");
- running = false;
- },
- [&]([[maybe_unused]] const auto& message) {
- const bool is_string{std::is_same<StringMessage, decltype(message)>()};
- ASSERT_FALSE(is_string);
- });
- }
-}
-
-TEST_F(MessageQueueTest, SwitchReceiveReturn) {
- TestMessageQueue queue;
-
- queue.SendMessage(EmptyMessage{});
-
- ASSERT_TRUE(
- queue.SwitchReceive<bool>([&]([[maybe_unused]] const EmptyMessage& message) { return true; },
- [&]([[maybe_unused]] const auto& message) { return false; }));
-
- queue.SendMessage(IntMessage{42});
-
- ASSERT_FALSE(
- queue.SwitchReceive<bool>([&]([[maybe_unused]] const EmptyMessage& message) { return true; },
- [&]([[maybe_unused]] const auto& message) { return false; }));
-}
-
-TEST_F(MessageQueueTest, ReceiveInOrder) {
- TestMessageQueue queue;
-
- std::vector<TestMessageQueue::Message> messages{
- EmptyMessage{},
- IntMessage{42},
- OtherIntMessage{43},
- TwoIntMessage{1, 2},
- StringMessage{"Hello, World!"},
- };
-
- // Send the messages
- for (const auto& message : messages) {
- queue.SendMessage(message);
- }
- queue.SetTimeout(500);
-
- // Receive the messages. Make sure they came in order, except for the TimeoutExpiredMessage, which
- // can come at any time.
- bool received_timeout = false;
- for (const auto& expected : messages) {
- auto message = queue.ReceiveMessage();
- if (std::holds_alternative<TimeoutExpiredMessage>(message)) {
- ASSERT_FALSE(received_timeout);
- received_timeout = true;
- } else {
- ASSERT_EQ(message.index(), expected.index());
- }
- }
- if (!received_timeout) {
- // If we have not received the timeout yet, receive one more message and make sure it's the
- // timeout.
- ASSERT_TRUE(std::holds_alternative<TimeoutExpiredMessage>(queue.ReceiveMessage()));
- }
-}
-
-} // namespace art