summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Hansong Zhang <hsz@google.com> 2018-12-12 14:52:00 -0800
committer Hansong Zhang <hsz@google.com> 2019-02-25 18:31:29 -0800
commit8bb04bb877a35b37202078ecf8ea8daee2fdfed7 (patch)
tree57bceed6b527f7a25d6a82437bf718f5f8157650
parentf8e86f4ce8864b05b1f64829dc30881c2b678ba2 (diff)
Reactor-based threading model
* Use Reactor+Thread (common/thread.h) to replace existing libchrome-based message_loop_thread * Use Handler to implement multiple message queue per thread, by using kernel-based eventfd Test: run unit test, and run benchmark Change-Id: Idd2e4ef99fb9a7b2c0956de0e372c67a1098f1b6
-rw-r--r--.clang-format3
-rw-r--r--system/bta/Android.bp1
-rw-r--r--system/common/Android.bp28
-rw-r--r--system/common/benchmark/thread_performance_benchmark.cc52
-rw-r--r--system/common/handler.cc87
-rw-r--r--system/common/handler.h59
-rw-r--r--system/common/handler_unittest.cc71
-rw-r--r--system/common/reactor.cc212
-rw-r--r--system/common/reactor.h78
-rw-r--r--system/common/reactor_unittest.cc283
-rw-r--r--system/common/thread.cc84
-rw-r--r--system/common/thread.h75
-rw-r--r--system/common/thread_unittest.cc99
-rw-r--r--system/common/utils.h31
-rw-r--r--system/stack/Android.bp1
-rwxr-xr-xsystem/test/gen_coverage.py5
16 files changed, 1165 insertions, 4 deletions
diff --git a/.clang-format b/.clang-format
index fc5f5fe2bf..1f1f586184 100644
--- a/.clang-format
+++ b/.clang-format
@@ -23,3 +23,6 @@ BasedOnStyle: Google
CommentPragmas: NOLINT:.*
DerivePointerAlignment: false
ColumnLimit: 120
+AllowShortFunctionsOnASingleLine: Empty
+ConstructorInitializerAllOnOneLineOrOnePerLine: false
+BreakConstructorInitializers: BeforeColon
diff --git a/system/bta/Android.bp b/system/bta/Android.bp
index 71652c5734..5826885a14 100644
--- a/system/bta/Android.bp
+++ b/system/bta/Android.bp
@@ -129,6 +129,7 @@ cc_test {
"test/gatt/database_test.cc",
],
shared_libs: [
+ "libcrypto",
"liblog",
"libprotobuf-cpp-lite",
],
diff --git a/system/common/Android.bp b/system/common/Android.bp
index 6312ebd605..e0c1bfb40f 100644
--- a/system/common/Android.bp
+++ b/system/common/Android.bp
@@ -1,6 +1,9 @@
cc_library_static {
name: "libbt-common",
- defaults: ["fluoride_defaults"],
+ defaults: [
+ "fluoride_defaults",
+ "clang_file_coverage",
+ ],
host_supported: true,
include_dirs: [
"packages/modules/Bluetooth/system",
@@ -8,10 +11,13 @@ cc_library_static {
],
srcs: [
"address_obfuscator.cc",
+ "handler.cc",
"message_loop_thread.cc",
"metrics.cc",
"once_timer.cc",
+ "reactor.cc",
"repeating_timer.cc",
+ "thread.cc",
"time_util.cc",
],
shared_libs: [
@@ -25,7 +31,10 @@ cc_library_static {
cc_test {
name: "bluetooth_test_common",
test_suites: ["device-tests"],
- defaults: ["fluoride_defaults"],
+ defaults: [
+ "fluoride_defaults",
+ "clang_coverage_bin",
+ ],
host_supported: true,
include_dirs: [
"packages/modules/Bluetooth/system",
@@ -33,12 +42,15 @@ cc_test {
],
srcs : [
"address_obfuscator_unittest.cc",
+ "handler_unittest.cc",
"leaky_bonded_queue_unittest.cc",
"message_loop_thread_unittest.cc",
"metrics_unittest.cc",
"once_timer_unittest.cc",
+ "reactor_unittest.cc",
"repeating_timer_unittest.cc",
"state_machine_unittest.cc",
+ "thread_unittest.cc",
"time_util_unittest.cc",
"id_generator_unittest.cc",
],
@@ -77,12 +89,16 @@ cc_test {
cc_benchmark {
name: "bluetooth_benchmark_thread_performance",
- defaults: ["fluoride_defaults"],
+ defaults: [
+ "fluoride_defaults",
+ ],
+ host_supported: true,
include_dirs: ["packages/modules/Bluetooth/system"],
srcs: [
"benchmark/thread_performance_benchmark.cc",
],
shared_libs: [
+ "libcrypto",
"liblog",
],
static_libs: [
@@ -93,13 +109,17 @@ cc_benchmark {
cc_benchmark {
name: "bluetooth_benchmark_timer_performance",
- defaults: ["fluoride_defaults"],
+ defaults: [
+ "fluoride_defaults",
+ ],
+ host_supported: false,
include_dirs: ["packages/modules/Bluetooth/system"],
srcs: [
"benchmark/timer_performance_benchmark.cc",
],
shared_libs: [
"liblog",
+ "libcrypto",
"libprotobuf-cpp-lite",
"libcutils",
],
diff --git a/system/common/benchmark/thread_performance_benchmark.cc b/system/common/benchmark/thread_performance_benchmark.cc
index 74f157f85b..01f332fe07 100644
--- a/system/common/benchmark/thread_performance_benchmark.cc
+++ b/system/common/benchmark/thread_performance_benchmark.cc
@@ -23,12 +23,16 @@
#include <memory>
#include <thread>
+#include "common/handler.h"
#include "common/message_loop_thread.h"
+#include "common/thread.h"
#include "osi/include/fixed_queue.h"
#include "osi/include/thread.h"
using ::benchmark::State;
+using bluetooth::common::Handler;
using bluetooth::common::MessageLoopThread;
+using bluetooth::common::Thread;
#define NUM_MESSAGES_TO_SEND 100000
@@ -419,6 +423,54 @@ BENCHMARK_F(BM_LibChromeThread, sequential_execution)(State& state) {
}
};
+class BM_ReactorThread : public BM_ThreadPerformance {
+ protected:
+ void SetUp(State& st) override {
+ BM_ThreadPerformance::SetUp(st);
+ std::future<void> set_up_future = set_up_promise_->get_future();
+ thread_ = new Thread("BM_ReactorThread thread", Thread::Priority::NORMAL);
+ handler_ = new Handler(thread_);
+ handler_->Post([this]() { set_up_promise_->set_value(); });
+ set_up_future.wait();
+ }
+
+ void TearDown(State& st) override {
+ delete handler_;
+ handler_ = nullptr;
+ thread_->Stop();
+ delete thread_;
+ thread_ = nullptr;
+ BM_ThreadPerformance::TearDown(st);
+ }
+
+ Thread* thread_ = nullptr;
+ Handler* handler_ = nullptr;
+};
+
+BENCHMARK_F(BM_ReactorThread, batch_enque_dequeue)(State& state) {
+ for (auto _ : state) {
+ g_counter = 0;
+ g_counter_promise = std::make_unique<std::promise<void>>();
+ std::future<void> counter_future = g_counter_promise->get_future();
+ for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
+ fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter);
+ handler_->Post([this]() { callback_batch(bt_msg_queue_, nullptr); });
+ }
+ counter_future.wait();
+ }
+};
+
+BENCHMARK_F(BM_ReactorThread, sequential_execution)(State& state) {
+ for (auto _ : state) {
+ for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
+ g_counter_promise = std::make_unique<std::promise<void>>();
+ std::future<void> counter_future = g_counter_promise->get_future();
+ handler_->Post([]() { callback_sequential(nullptr); });
+ counter_future.wait();
+ }
+ }
+};
+
int main(int argc, char** argv) {
// Disable LOG() output from libchrome
logging::LoggingSettings log_settings;
diff --git a/system/common/handler.cc b/system/common/handler.cc
new file mode 100644
index 0000000000..219dd82dde
--- /dev/null
+++ b/system/common/handler.cc
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "handler.h"
+
+#include <sys/eventfd.h>
+#include <cstring>
+
+#include "base/logging.h"
+
+#include "reactor.h"
+#include "utils.h"
+
+#ifndef EFD_SEMAPHORE
+#define EFD_SEMAPHORE 1
+#endif
+
+namespace bluetooth {
+namespace common {
+
+Handler::Handler(Thread* thread)
+ : thread_(thread),
+ fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) {
+ CHECK_NE(fd_, -1) << __func__ << ": cannot create eventfd: " << strerror(errno);
+
+ reactable_ = thread_->GetReactor()->Register(fd_, [this] { this->handle_next_event(); }, nullptr);
+}
+
+Handler::~Handler() {
+ thread_->GetReactor()->Unregister(reactable_);
+ reactable_ = nullptr;
+
+ int close_status;
+ RUN_NO_INTR(close_status = close(fd_));
+ CHECK_NE(close_status, -1) << __func__ << ": cannot close eventfd: " << strerror(errno);
+}
+
+void Handler::Post(Closure closure) {
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ tasks_.emplace(std::move(closure));
+ }
+ uint64_t val = 1;
+ auto write_result = eventfd_write(fd_, val);
+ CHECK_NE(write_result, -1) << __func__ << ": failed to write: " << strerror(errno);
+}
+
+void Handler::Clear() {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ std::queue<Closure> empty;
+ std::swap(tasks_, empty);
+
+ uint64_t val;
+ while (eventfd_read(fd_, &val) == 0) {
+ }
+}
+
+void Handler::handle_next_event() {
+ Closure closure;
+ uint64_t val = 0;
+ auto read_result = eventfd_read(fd_, &val);
+ CHECK_NE(read_result, -1) << __func__ << ": failed to read fd: " << strerror(errno);
+
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ closure = std::move(tasks_.front());
+ tasks_.pop();
+ }
+ closure();
+}
+
+} // namespace common
+} // namespace bluetooth
diff --git a/system/common/handler.h b/system/common/handler.h
new file mode 100644
index 0000000000..71b4dbaff8
--- /dev/null
+++ b/system/common/handler.h
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <queue>
+
+#include "common/thread.h"
+#include "common/utils.h"
+
+namespace bluetooth {
+namespace common {
+
+// A message-queue style handler for reactor-based thread to handle incoming events from different threads. When it's
+// constructed, it will register a reactable on the specified thread; when it's destroyed, it will unregister itself
+// from the thread.
+class Handler {
+ public:
+ // Create and register a handler on given thread
+ explicit Handler(Thread* thread);
+
+ // Unregister this handler from the thread and release resource. Unhandled events will be discarded and not executed.
+ ~Handler();
+
+ DISALLOW_COPY_AND_ASSIGN(Handler);
+
+ // Enqueue a closure to the queue of this handler
+ void Post(Closure closure);
+
+ // Remove all pending events from the queue of this handler
+ void Clear();
+
+ private:
+ std::queue<Closure> tasks_;
+ Thread* thread_;
+ int fd_;
+ Reactor::Reactable* reactable_;
+ mutable std::mutex mutex_;
+ void handle_next_event();
+};
+
+} // namespace common
+} // namespace bluetooth
diff --git a/system/common/handler_unittest.cc b/system/common/handler_unittest.cc
new file mode 100644
index 0000000000..c51e51b52b
--- /dev/null
+++ b/system/common/handler_unittest.cc
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "handler.h"
+
+#include <sys/eventfd.h>
+#include <thread>
+
+#include <gtest/gtest.h>
+#include "base/logging.h"
+
+namespace bluetooth {
+namespace common {
+namespace {
+
+class HandlerTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
+ handler_ = new Handler(thread_);
+ }
+ void TearDown() override {
+ delete handler_;
+ delete thread_;
+ }
+
+ Handler* handler_;
+ Thread* thread_;
+};
+
+TEST_F(HandlerTest, empty) {}
+
+TEST_F(HandlerTest, post_task_invoked) {
+ int val = 0;
+ Closure closure = [&val]() { val++; };
+ handler_->Post(closure);
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ EXPECT_EQ(val, 1);
+}
+
+TEST_F(HandlerTest, post_task_cleared) {
+ int val = 0;
+ Closure closure = [&val]() {
+ val++;
+ std::this_thread::sleep_for(std::chrono::milliseconds(5));
+ };
+ handler_->Post(std::move(closure));
+ closure = []() { LOG(FATAL) << "Should not happen"; };
+ std::this_thread::sleep_for(std::chrono::milliseconds(5));
+ handler_->Post(std::move(closure));
+ handler_->Clear();
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ EXPECT_EQ(val, 1);
+}
+
+} // namespace
+} // namespace common
+} // namespace bluetooth
diff --git a/system/common/reactor.cc b/system/common/reactor.cc
new file mode 100644
index 0000000000..61cc7d60fe
--- /dev/null
+++ b/system/common/reactor.cc
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "reactor.h"
+
+#include <fcntl.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <unistd.h>
+#include <algorithm>
+#include <cerrno>
+#include <cstring>
+
+#include "base/logging.h"
+
+namespace {
+
+// Use at most sizeof(epoll_event) * kEpollMaxEvents kernel memory
+constexpr int kEpollMaxEvents = 64;
+
+} // namespace
+
+namespace bluetooth {
+namespace common {
+
+class Reactor::Reactable {
+ public:
+ Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
+ : fd_(fd),
+ on_read_ready_(std::move(on_read_ready)),
+ on_write_ready_(std::move(on_write_ready)),
+ is_executing_(false) {}
+ const int fd_;
+ Closure on_read_ready_;
+ Closure on_write_ready_;
+ bool is_executing_;
+ std::recursive_mutex lock_;
+};
+
+Reactor::Reactor()
+ : epoll_fd_(0),
+ control_fd_(0),
+ is_running_(false),
+ reactable_removed_(false) {
+ RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
+ CHECK_NE(epoll_fd_, -1) << __func__ << ": cannot create epoll_fd: " << strerror(errno);
+
+ control_fd_ = eventfd(0, EFD_NONBLOCK);
+ CHECK_NE(control_fd_, -1) << __func__ << ": cannot create control_fd: " << strerror(errno);
+
+ epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
+ int result;
+ RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
+ CHECK_NE(result, -1) << __func__ << ": cannot register control_fd: " << strerror(errno);
+}
+
+Reactor::~Reactor() {
+ int result;
+ RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
+ CHECK_NE(result, -1) << __func__ << ": cannot unregister control_fd: " << strerror(errno);
+
+ RUN_NO_INTR(result = close(control_fd_));
+ CHECK_NE(result, -1) << __func__ << ": cannot close control_fd: " << strerror(errno);
+
+ RUN_NO_INTR(result = close(epoll_fd_));
+ CHECK_NE(result, -1) << __func__ << ": cannot close epoll_fd: " << strerror(errno);
+}
+
+void Reactor::Run() {
+ bool previously_running = is_running_.exchange(true);
+ CHECK_EQ(previously_running, false) << __func__ << ": already running";
+ LOG(INFO) << __func__ << ": started";
+
+ for (;;) {
+ invalidation_list_.clear();
+ epoll_event events[kEpollMaxEvents];
+ int count;
+ RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, -1));
+ CHECK_NE(count, -1) << __func__ << ": Error polling for fds: " << strerror(errno);
+
+ for (int i = 0; i < count; ++i) {
+ auto event = events[i];
+ CHECK_NE(event.events, 0u) << __func__ << ": no result in epoll result";
+
+ // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
+ if (event.data.ptr == nullptr) {
+ uint64_t value;
+ eventfd_read(control_fd_, &value);
+ LOG(INFO) << __func__ << ": stopped";
+ is_running_ = false;
+ return;
+ }
+ auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ // See if this reactable has been removed in the meantime.
+ if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) {
+ continue;
+ }
+
+ std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_);
+ lock.unlock();
+ reactable_removed_ = false;
+ reactable->is_executing_ = true;
+ if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && reactable->on_read_ready_ != nullptr) {
+ reactable->on_read_ready_();
+ }
+ if (!reactable_removed_ && event.events & EPOLLOUT && reactable->on_write_ready_ != nullptr) {
+ reactable->on_write_ready_();
+ }
+ reactable->is_executing_ = false;
+ }
+ if (reactable_removed_) {
+ delete reactable;
+ }
+ }
+ }
+}
+
+void Reactor::Stop() {
+ if (!is_running_) {
+ LOG(WARNING) << __func__ << ": not running, will stop once it's started";
+ }
+ auto control = eventfd_write(control_fd_, 1);
+ CHECK_NE(control, -1) << __func__ << ": failed: " << strerror(errno);
+}
+
+Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
+ uint32_t poll_event_type = 0;
+ if (on_read_ready != nullptr) {
+ poll_event_type |= (EPOLLIN | EPOLLRDHUP);
+ }
+ if (on_write_ready != nullptr) {
+ poll_event_type |= EPOLLOUT;
+ }
+ auto* reactable = new Reactable(fd, on_read_ready, on_write_ready);
+ epoll_event event = {
+ .events = poll_event_type,
+ {.ptr = reactable}
+ };
+ int register_fd;
+ RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
+ CHECK_NE(register_fd, -1) << __func__ << ": failed: " << strerror(errno);
+ return reactable;
+}
+
+void Reactor::Unregister(Reactor::Reactable* reactable) {
+ CHECK_NE(reactable, nullptr);
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ invalidation_list_.push_back(reactable);
+ }
+ {
+ int result;
+ std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_);
+ RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
+ if (result == -1 && errno == ENOENT) {
+ LOG(INFO) << __func__ << ": reactable is invalid or unregistered";
+ } else if (result == -1) {
+ LOG(FATAL) << __func__ << ": failed: " << strerror(errno);
+ }
+ // If we are unregistering during the callback event from this reactable, we delete it after the callback is executed.
+ // reactable->is_executing_ is protected by reactable->lock_, so it's thread safe.
+ if (reactable->is_executing_) {
+ reactable_removed_ = true;
+ }
+ }
+ // If we are unregistering outside of the callback event from this reactable, we delete it now
+ if (!reactable_removed_) {
+ delete reactable;
+ }
+}
+
+void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) {
+ CHECK_NE(reactable, nullptr);
+
+ uint32_t poll_event_type = 0;
+ if (on_read_ready != nullptr) {
+ poll_event_type |= (EPOLLIN | EPOLLRDHUP);
+ }
+ if (on_write_ready != nullptr) {
+ poll_event_type |= EPOLLOUT;
+ }
+ {
+ std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_);
+ reactable->on_read_ready_ = std::move(on_read_ready);
+ reactable->on_write_ready_ = std::move(on_write_ready);
+ }
+ epoll_event event = {
+ .events = poll_event_type,
+ {.ptr = reactable}
+ };
+ int modify_fd;
+ RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event));
+ CHECK_NE(modify_fd, -1) << __func__ << ": failed: " << strerror(errno);
+}
+
+} // namespace common
+} // namespace bluetooth
diff --git a/system/common/reactor.h b/system/common/reactor.h
new file mode 100644
index 0000000000..27528cea9c
--- /dev/null
+++ b/system/common/reactor.h
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <sys/epoll.h>
+#include <atomic>
+#include <functional>
+#include <list>
+#include <mutex>
+#include <thread>
+
+#include "common/utils.h"
+
+namespace bluetooth {
+namespace common {
+
+// Format of closure to be used in the entire stack
+using Closure = std::function<void()>;
+
+// A simple implementation of reactor-style looper.
+// When a reactor is running, the main loop is polling and blocked until at least one registered reactable is ready to
+// read or write. It will invoke on_read_ready() or on_write_ready(), which is registered with the reactor. Then, it
+// blocks again until ready event.
+class Reactor {
+ public:
+ // An object used for Unregister() and ModifyRegistration()
+ class Reactable;
+
+ // Construct a reactor on the current thread
+ Reactor();
+
+ // Destruct this reactor and release its resources
+ ~Reactor();
+
+ DISALLOW_COPY_AND_ASSIGN(Reactor);
+
+ // Start the reactor. The current thread will be blocked until Stop() is invoked and handled.
+ void Run();
+
+ // Stop the reactor. Must be invoked from a different thread. Note: all registered reactables will not be unregistered
+ // by Stop(). If the reactor is not running, it will be stopped once it's started.
+ void Stop();
+
+ // Register a reactable fd to this reactor. Returns a pointer to a Reactable. Caller must use this object to
+ // unregister or modify registration. Ownership of the memory space is NOT transferred to user.
+ Reactable* Register(int fd, Closure on_read_ready, Closure on_write_ready);
+
+ // Unregister a reactable from this reactor
+ void Unregister(Reactable* reactable);
+
+ // Modify the registration for a reactable with given reactable
+ void ModifyRegistration(Reactable* reactable, Closure on_read_ready, Closure on_write_ready);
+
+ private:
+ mutable std::mutex mutex_;
+ int epoll_fd_;
+ int control_fd_;
+ std::atomic<bool> is_running_;
+ std::list<Reactable*> invalidation_list_;
+ bool reactable_removed_;
+};
+
+} // namespace common
+} // namespace bluetooth
diff --git a/system/common/reactor_unittest.cc b/system/common/reactor_unittest.cc
new file mode 100644
index 0000000000..5c7e90ea47
--- /dev/null
+++ b/system/common/reactor_unittest.cc
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "reactor.h"
+
+#include <sys/eventfd.h>
+#include <chrono>
+#include <future>
+#include <thread>
+
+#include "base/logging.h"
+#include "gtest/gtest.h"
+
+namespace bluetooth {
+namespace common {
+namespace {
+
+constexpr int kReadReadyValue = 100;
+
+std::promise<int>* g_promise;
+
+class ReactorTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ g_promise = new std::promise<int>;
+ reactor_ = new Reactor;
+ }
+
+ void TearDown() override {
+ delete g_promise;
+ g_promise = nullptr;
+ delete reactor_;
+ reactor_ = nullptr;
+ }
+
+ Reactor* reactor_;
+};
+
+class SampleReactable {
+ public:
+ SampleReactable() : fd_(eventfd(0, EFD_NONBLOCK)) {
+ EXPECT_NE(fd_, 0);
+ }
+
+ ~SampleReactable() {
+ close(fd_);
+ }
+
+ void OnReadReady() {}
+
+ void OnWriteReady() {}
+
+ int fd_;
+};
+
+class FakeReactable {
+ public:
+ enum EventFdValue {
+ kSetPromise = 1,
+ kRegisterSampleReactable,
+ kUnregisterSampleReactable,
+ kSampleOutputValue,
+ };
+ FakeReactable() : fd_(eventfd(0, 0)), reactor_(nullptr) {
+ EXPECT_NE(fd_, 0);
+ }
+
+ FakeReactable(Reactor* reactor) : fd_(eventfd(0, 0)), reactor_(reactor) {
+ EXPECT_NE(fd_, 0);
+ }
+
+ ~FakeReactable() {
+ close(fd_);
+ }
+
+ void OnReadReady() {
+ uint64_t value = 0;
+ auto read_result = eventfd_read(fd_, &value);
+ EXPECT_EQ(read_result, 0);
+ if (value == kSetPromise && g_promise != nullptr) {
+ g_promise->set_value(kReadReadyValue);
+ }
+ if (value == kRegisterSampleReactable) {
+ reactable_ = reactor_->Register(sample_reactable_.fd_, [this] { this->sample_reactable_.OnReadReady(); },
+ [this] { this->sample_reactable_.OnWriteReady(); });
+ g_promise->set_value(kReadReadyValue);
+ }
+ if (value == kUnregisterSampleReactable) {
+ reactor_->Unregister(reactable_);
+ g_promise->set_value(kReadReadyValue);
+ }
+ }
+
+ void OnWriteReady() {
+ auto write_result = eventfd_write(fd_, output_data_);
+ output_data_ = 0;
+ EXPECT_EQ(write_result, 0);
+ }
+
+ SampleReactable sample_reactable_;
+ Reactor::Reactable* reactable_ = nullptr;
+ int fd_;
+
+ private:
+ Reactor* reactor_;
+ uint64_t output_data_ = kSampleOutputValue;
+};
+
+TEST_F(ReactorTest, start_and_stop) {
+ auto reactor_thread = std::thread(&Reactor::Run, reactor_);
+ reactor_->Stop();
+ reactor_thread.join();
+}
+
+TEST_F(ReactorTest, stop_and_start) {
+ auto reactor_thread = std::thread(&Reactor::Stop, reactor_);
+ auto another_thread = std::thread(&Reactor::Run, reactor_);
+ reactor_thread.join();
+ another_thread.join();
+}
+
+TEST_F(ReactorTest, stop_multi_times) {
+ auto reactor_thread = std::thread(&Reactor::Run, reactor_);
+ for (int i = 0; i < 5; i++) {
+ reactor_->Stop();
+ }
+ reactor_thread.join();
+}
+
+TEST_F(ReactorTest, cold_register_only) {
+ FakeReactable fake_reactable;
+ auto* reactable =
+ reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr);
+
+ reactor_->Unregister(reactable);
+}
+
+TEST_F(ReactorTest, cold_register) {
+ FakeReactable fake_reactable;
+ auto* reactable =
+ reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr);
+ auto reactor_thread = std::thread(&Reactor::Run, reactor_);
+ auto future = g_promise->get_future();
+
+ auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
+ EXPECT_EQ(write_result, 0);
+ EXPECT_EQ(future.get(), kReadReadyValue);
+ reactor_->Stop();
+ reactor_thread.join();
+ reactor_->Unregister(reactable);
+}
+
+TEST_F(ReactorTest, hot_register_from_different_thread) {
+ auto reactor_thread = std::thread(&Reactor::Run, reactor_);
+ auto future = g_promise->get_future();
+
+ FakeReactable fake_reactable;
+ auto* reactable =
+ reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr);
+ auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
+ EXPECT_EQ(write_result, 0);
+ EXPECT_EQ(future.get(), kReadReadyValue);
+ reactor_->Stop();
+ reactor_thread.join();
+
+ reactor_->Unregister(reactable);
+}
+
+TEST_F(ReactorTest, hot_unregister_from_different_thread) {
+ FakeReactable fake_reactable;
+ auto* reactable =
+ reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr);
+ auto reactor_thread = std::thread(&Reactor::Run, reactor_);
+ reactor_->Unregister(reactable);
+ auto future = g_promise->get_future();
+
+ auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise);
+ EXPECT_EQ(write_result, 0);
+ future.wait_for(std::chrono::milliseconds(10));
+ g_promise->set_value(2);
+ EXPECT_EQ(future.get(), 2);
+ reactor_->Stop();
+ reactor_thread.join();
+}
+
+TEST_F(ReactorTest, hot_register_from_same_thread) {
+ auto reactor_thread = std::thread(&Reactor::Run, reactor_);
+ auto future = g_promise->get_future();
+
+ FakeReactable fake_reactable(reactor_);
+ auto* reactable =
+ reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr);
+ auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kRegisterSampleReactable);
+ EXPECT_EQ(write_result, 0);
+ EXPECT_EQ(future.get(), kReadReadyValue);
+ reactor_->Stop();
+ reactor_thread.join();
+
+ reactor_->Unregister(reactable);
+}
+
+TEST_F(ReactorTest, hot_unregister_from_same_thread) {
+ auto reactor_thread = std::thread(&Reactor::Run, reactor_);
+ auto future = g_promise->get_future();
+
+ FakeReactable fake_reactable(reactor_);
+ auto* reactable =
+ reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr);
+ auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kRegisterSampleReactable);
+ EXPECT_EQ(write_result, 0);
+ EXPECT_EQ(future.get(), kReadReadyValue);
+ delete g_promise;
+ g_promise = new std::promise<int>;
+ future = g_promise->get_future();
+ write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kUnregisterSampleReactable);
+ EXPECT_EQ(write_result, 0);
+ EXPECT_EQ(future.get(), kReadReadyValue);
+ reactor_->Stop();
+ reactor_thread.join();
+ LOG(INFO);
+
+ reactor_->Unregister(reactable);
+ LOG(INFO);
+}
+
+TEST_F(ReactorTest, start_and_stop_multi_times) {
+ auto reactor_thread = std::thread(&Reactor::Run, reactor_);
+ reactor_->Stop();
+ reactor_thread.join();
+ for (int i = 0; i < 5; i++) {
+ reactor_thread = std::thread(&Reactor::Run, reactor_);
+ reactor_->Stop();
+ reactor_thread.join();
+ }
+}
+
+TEST_F(ReactorTest, on_write_ready) {
+ FakeReactable fake_reactable;
+ auto* reactable =
+ reactor_->Register(fake_reactable.fd_, nullptr, std::bind(&FakeReactable::OnWriteReady, &fake_reactable));
+ auto reactor_thread = std::thread(&Reactor::Run, reactor_);
+ uint64_t value = 0;
+ auto read_result = eventfd_read(fake_reactable.fd_, &value);
+ EXPECT_EQ(read_result, 0);
+ EXPECT_EQ(value, FakeReactable::kSampleOutputValue);
+
+ reactor_->Stop();
+ reactor_thread.join();
+
+ reactor_->Unregister(reactable);
+}
+
+TEST_F(ReactorTest, modify_registration) {
+ FakeReactable fake_reactable;
+ auto* reactable =
+ reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr);
+ reactor_->ModifyRegistration(reactable, nullptr, std::bind(&FakeReactable::OnWriteReady, &fake_reactable));
+ auto reactor_thread = std::thread(&Reactor::Run, reactor_);
+ uint64_t value = 0;
+ auto read_result = eventfd_read(fake_reactable.fd_, &value);
+ EXPECT_EQ(read_result, 0);
+ EXPECT_EQ(value, FakeReactable::kSampleOutputValue);
+
+ reactor_->Stop();
+ reactor_thread.join();
+}
+
+} // namespace
+} // namespace common
+} // namespace bluetooth
diff --git a/system/common/thread.cc b/system/common/thread.cc
new file mode 100644
index 0000000000..2cb22058d4
--- /dev/null
+++ b/system/common/thread.cc
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/thread.h"
+
+#include <fcntl.h>
+#include <sys/syscall.h>
+#include <cerrno>
+#include <cstring>
+
+#include "base/logging.h"
+
+namespace bluetooth {
+namespace common {
+
+namespace {
+constexpr int kRealTimeFifoSchedulingPriority = 1;
+}
+
+Thread::Thread(const std::string& name, const Priority priority)
+ : name_(name),
+ reactor_(),
+ running_thread_(&Thread::run, this, priority) {}
+
+void Thread::run(Priority priority) {
+ if (priority == Priority::REAL_TIME) {
+ struct sched_param rt_params = {.sched_priority = kRealTimeFifoSchedulingPriority};
+ auto linux_tid = static_cast<pid_t>(syscall(SYS_gettid));
+ int rc;
+ RUN_NO_INTR(rc = sched_setscheduler(linux_tid, SCHED_FIFO, &rt_params));
+ if (rc != 0) {
+ LOG(ERROR) << __func__ << ": unable to set SCHED_FIFO priority: " << strerror(errno);
+ }
+ }
+ reactor_.Run();
+}
+
+Thread::~Thread() {
+ Stop();
+}
+
+bool Thread::Stop() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ CHECK_NE(std::this_thread::get_id(), running_thread_.get_id());
+
+ if (!running_thread_.joinable()) {
+ return false;
+ }
+ reactor_.Stop();
+ running_thread_.join();
+ return true;
+}
+
+bool Thread::IsSameThread() const {
+ return std::this_thread::get_id() == running_thread_.get_id();
+}
+
+Reactor* Thread::GetReactor() const {
+ return &reactor_;
+}
+
+std::string Thread::GetThreadName() const {
+ return name_;
+}
+
+std::string Thread::ToString() const {
+ return "Thread " + name_;
+}
+
+} // namespace common
+} // namespace bluetooth
diff --git a/system/common/thread.h b/system/common/thread.h
new file mode 100644
index 0000000000..09e6b48328
--- /dev/null
+++ b/system/common/thread.h
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "common/reactor.h"
+#include "common/utils.h"
+
+namespace bluetooth {
+namespace common {
+
+// Reactor-based looper thread implementation. The thread runs immediately after it is constructed, and stops after
+// Stop() is invoked. To assign task to this thread, user needs to register a reactable object to the underlying
+// reactor.
+class Thread {
+ public:
+ // Used by thread constructor. Suggest the priority to the kernel scheduler. Use REAL_TIME if we need (soft) real-time
+ // scheduling guarantee for this thread; use NORMAL if no real-time guarantee is needed to save CPU time slice for
+ // other threads
+ enum class Priority {
+ REAL_TIME,
+ NORMAL,
+ };
+
+ // name: thread name for POSIX systems
+ // priority: priority for kernel scheduler
+ Thread(const std::string& name, Priority priority);
+
+ // Stop and destroy this thread
+ ~Thread();
+
+ DISALLOW_COPY_AND_ASSIGN(Thread);
+
+ // Stop this thread. Must be invoked from another thread. After this thread is stopped, it cannot be started again.
+ bool Stop();
+
+ // Return true if this function is invoked from this thread
+ bool IsSameThread() const;
+
+ // Return the POSIX thread name
+ std::string GetThreadName() const;
+
+ // Return a user-friendly string representation of this thread object
+ std::string ToString() const;
+
+ // Return the pointer of underlying reactor. The ownership is NOT transferred.
+ Reactor* GetReactor() const;
+
+ private:
+ void run(Priority priority);
+ mutable std::mutex mutex_;
+ const std::string name_;
+ mutable Reactor reactor_;
+ std::thread running_thread_;
+};
+
+} // namespace common
+} // namespace bluetooth
diff --git a/system/common/thread_unittest.cc b/system/common/thread_unittest.cc
new file mode 100644
index 0000000000..678f34da78
--- /dev/null
+++ b/system/common/thread_unittest.cc
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "thread.h"
+
+#include <sys/eventfd.h>
+
+#include "base/logging.h"
+#include "gtest/gtest.h"
+#include "reactor.h"
+
+namespace bluetooth {
+namespace common {
+namespace {
+
+constexpr int kCheckIsSameThread = 1;
+
+class SampleReactable {
+ public:
+ explicit SampleReactable(Thread* thread) : thread_(thread), fd_(eventfd(0, 0)), is_same_thread_checked_(false) {
+ EXPECT_NE(fd_, 0);
+ }
+
+ ~SampleReactable() {
+ close(fd_);
+ }
+
+ void OnReadReady() {
+ EXPECT_TRUE(thread_->IsSameThread());
+ is_same_thread_checked_ = true;
+ uint64_t val;
+ eventfd_read(fd_, &val);
+ }
+
+ bool IsSameThreadCheckDone() {
+ return is_same_thread_checked_;
+ }
+
+ Thread* thread_;
+ int fd_;
+ bool is_same_thread_checked_;
+};
+
+class ThreadTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ thread = new Thread("test", Thread::Priority::NORMAL);
+ }
+
+ void TearDown() override {
+ delete thread;
+ }
+ Thread* thread = nullptr;
+};
+
+TEST_F(ThreadTest, just_stop_no_op) {
+ thread->Stop();
+}
+
+TEST_F(ThreadTest, thread_name) {
+ EXPECT_EQ(thread->GetThreadName(), "test");
+}
+
+TEST_F(ThreadTest, thread_to_string) {
+ EXPECT_NE(thread->ToString().find("test"), std::string::npos);
+}
+
+TEST_F(ThreadTest, not_same_thread) {
+ EXPECT_FALSE(thread->IsSameThread());
+}
+
+TEST_F(ThreadTest, same_thread) {
+ Reactor* reactor = thread->GetReactor();
+ SampleReactable sample_reactable(thread);
+ auto* reactable =
+ reactor->Register(sample_reactable.fd_, std::bind(&SampleReactable::OnReadReady, &sample_reactable), nullptr);
+ int fd = sample_reactable.fd_;
+ int write_result = eventfd_write(fd, kCheckIsSameThread);
+ EXPECT_EQ(write_result, 0);
+ while (!sample_reactable.IsSameThreadCheckDone()) std::this_thread::yield();
+ reactor->Unregister(reactable);
+}
+
+} // namespace
+} // namespace common
+} // namespace bluetooth
diff --git a/system/common/utils.h b/system/common/utils.h
new file mode 100644
index 0000000000..26e8f23df9
--- /dev/null
+++ b/system/common/utils.h
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+// A macro to re-try a syscall when it receives EINTR
+#ifndef RUN_NO_INTR
+#define RUN_NO_INTR(fn) \
+ do { \
+ } while ((fn) == -1 && errno == EINTR)
+#endif
+
+// A macro to disallow the copy constructor and operator= functions
+#ifndef DISALLOW_COPY_AND_ASSIGN
+#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
+ TypeName(const TypeName&) = delete; \
+ void operator=(const TypeName&) = delete
+#endif
diff --git a/system/stack/Android.bp b/system/stack/Android.bp
index 640441e930..dcf68b120d 100644
--- a/system/stack/Android.bp
+++ b/system/stack/Android.bp
@@ -202,6 +202,7 @@ cc_test {
"test/stack_a2dp_test.cc",
],
shared_libs: [
+ "libcrypto",
"libhidlbase",
"liblog",
"libprotobuf-cpp-lite",
diff --git a/system/test/gen_coverage.py b/system/test/gen_coverage.py
index eedc27aa31..d4399f4d05 100755
--- a/system/test/gen_coverage.py
+++ b/system/test/gen_coverage.py
@@ -79,6 +79,11 @@ COVERAGE_TESTS = [
"covered_files": [
"packages/modules/Bluetooth/system/vendor_libs/test_vendor_lib/packets",
],
+ }, {
+ "test_name": "bluetooth_test_common",
+ "covered_files": [
+ "packages/modules/Bluetooth/system/common",
+ ],
},
]