diff options
author | 2018-12-12 14:52:00 -0800 | |
---|---|---|
committer | 2019-02-25 18:31:29 -0800 | |
commit | 8bb04bb877a35b37202078ecf8ea8daee2fdfed7 (patch) | |
tree | 57bceed6b527f7a25d6a82437bf718f5f8157650 | |
parent | f8e86f4ce8864b05b1f64829dc30881c2b678ba2 (diff) |
Reactor-based threading model
* Use Reactor+Thread (common/thread.h) to replace existing
libchrome-based message_loop_thread
* Use Handler to implement multiple message queue per thread, by using
kernel-based eventfd
Test: run unit test, and run benchmark
Change-Id: Idd2e4ef99fb9a7b2c0956de0e372c67a1098f1b6
-rw-r--r-- | .clang-format | 3 | ||||
-rw-r--r-- | system/bta/Android.bp | 1 | ||||
-rw-r--r-- | system/common/Android.bp | 28 | ||||
-rw-r--r-- | system/common/benchmark/thread_performance_benchmark.cc | 52 | ||||
-rw-r--r-- | system/common/handler.cc | 87 | ||||
-rw-r--r-- | system/common/handler.h | 59 | ||||
-rw-r--r-- | system/common/handler_unittest.cc | 71 | ||||
-rw-r--r-- | system/common/reactor.cc | 212 | ||||
-rw-r--r-- | system/common/reactor.h | 78 | ||||
-rw-r--r-- | system/common/reactor_unittest.cc | 283 | ||||
-rw-r--r-- | system/common/thread.cc | 84 | ||||
-rw-r--r-- | system/common/thread.h | 75 | ||||
-rw-r--r-- | system/common/thread_unittest.cc | 99 | ||||
-rw-r--r-- | system/common/utils.h | 31 | ||||
-rw-r--r-- | system/stack/Android.bp | 1 | ||||
-rwxr-xr-x | system/test/gen_coverage.py | 5 |
16 files changed, 1165 insertions, 4 deletions
diff --git a/.clang-format b/.clang-format index fc5f5fe2bf..1f1f586184 100644 --- a/.clang-format +++ b/.clang-format @@ -23,3 +23,6 @@ BasedOnStyle: Google CommentPragmas: NOLINT:.* DerivePointerAlignment: false ColumnLimit: 120 +AllowShortFunctionsOnASingleLine: Empty +ConstructorInitializerAllOnOneLineOrOnePerLine: false +BreakConstructorInitializers: BeforeColon diff --git a/system/bta/Android.bp b/system/bta/Android.bp index 71652c5734..5826885a14 100644 --- a/system/bta/Android.bp +++ b/system/bta/Android.bp @@ -129,6 +129,7 @@ cc_test { "test/gatt/database_test.cc", ], shared_libs: [ + "libcrypto", "liblog", "libprotobuf-cpp-lite", ], diff --git a/system/common/Android.bp b/system/common/Android.bp index 6312ebd605..e0c1bfb40f 100644 --- a/system/common/Android.bp +++ b/system/common/Android.bp @@ -1,6 +1,9 @@ cc_library_static { name: "libbt-common", - defaults: ["fluoride_defaults"], + defaults: [ + "fluoride_defaults", + "clang_file_coverage", + ], host_supported: true, include_dirs: [ "packages/modules/Bluetooth/system", @@ -8,10 +11,13 @@ cc_library_static { ], srcs: [ "address_obfuscator.cc", + "handler.cc", "message_loop_thread.cc", "metrics.cc", "once_timer.cc", + "reactor.cc", "repeating_timer.cc", + "thread.cc", "time_util.cc", ], shared_libs: [ @@ -25,7 +31,10 @@ cc_library_static { cc_test { name: "bluetooth_test_common", test_suites: ["device-tests"], - defaults: ["fluoride_defaults"], + defaults: [ + "fluoride_defaults", + "clang_coverage_bin", + ], host_supported: true, include_dirs: [ "packages/modules/Bluetooth/system", @@ -33,12 +42,15 @@ cc_test { ], srcs : [ "address_obfuscator_unittest.cc", + "handler_unittest.cc", "leaky_bonded_queue_unittest.cc", "message_loop_thread_unittest.cc", "metrics_unittest.cc", "once_timer_unittest.cc", + "reactor_unittest.cc", "repeating_timer_unittest.cc", "state_machine_unittest.cc", + "thread_unittest.cc", "time_util_unittest.cc", "id_generator_unittest.cc", ], @@ -77,12 +89,16 @@ cc_test { cc_benchmark { name: "bluetooth_benchmark_thread_performance", - defaults: ["fluoride_defaults"], + defaults: [ + "fluoride_defaults", + ], + host_supported: true, include_dirs: ["packages/modules/Bluetooth/system"], srcs: [ "benchmark/thread_performance_benchmark.cc", ], shared_libs: [ + "libcrypto", "liblog", ], static_libs: [ @@ -93,13 +109,17 @@ cc_benchmark { cc_benchmark { name: "bluetooth_benchmark_timer_performance", - defaults: ["fluoride_defaults"], + defaults: [ + "fluoride_defaults", + ], + host_supported: false, include_dirs: ["packages/modules/Bluetooth/system"], srcs: [ "benchmark/timer_performance_benchmark.cc", ], shared_libs: [ "liblog", + "libcrypto", "libprotobuf-cpp-lite", "libcutils", ], diff --git a/system/common/benchmark/thread_performance_benchmark.cc b/system/common/benchmark/thread_performance_benchmark.cc index 74f157f85b..01f332fe07 100644 --- a/system/common/benchmark/thread_performance_benchmark.cc +++ b/system/common/benchmark/thread_performance_benchmark.cc @@ -23,12 +23,16 @@ #include <memory> #include <thread> +#include "common/handler.h" #include "common/message_loop_thread.h" +#include "common/thread.h" #include "osi/include/fixed_queue.h" #include "osi/include/thread.h" using ::benchmark::State; +using bluetooth::common::Handler; using bluetooth::common::MessageLoopThread; +using bluetooth::common::Thread; #define NUM_MESSAGES_TO_SEND 100000 @@ -419,6 +423,54 @@ BENCHMARK_F(BM_LibChromeThread, sequential_execution)(State& state) { } }; +class BM_ReactorThread : public BM_ThreadPerformance { + protected: + void SetUp(State& st) override { + BM_ThreadPerformance::SetUp(st); + std::future<void> set_up_future = set_up_promise_->get_future(); + thread_ = new Thread("BM_ReactorThread thread", Thread::Priority::NORMAL); + handler_ = new Handler(thread_); + handler_->Post([this]() { set_up_promise_->set_value(); }); + set_up_future.wait(); + } + + void TearDown(State& st) override { + delete handler_; + handler_ = nullptr; + thread_->Stop(); + delete thread_; + thread_ = nullptr; + BM_ThreadPerformance::TearDown(st); + } + + Thread* thread_ = nullptr; + Handler* handler_ = nullptr; +}; + +BENCHMARK_F(BM_ReactorThread, batch_enque_dequeue)(State& state) { + for (auto _ : state) { + g_counter = 0; + g_counter_promise = std::make_unique<std::promise<void>>(); + std::future<void> counter_future = g_counter_promise->get_future(); + for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { + fixed_queue_enqueue(bt_msg_queue_, (void*)&g_counter); + handler_->Post([this]() { callback_batch(bt_msg_queue_, nullptr); }); + } + counter_future.wait(); + } +}; + +BENCHMARK_F(BM_ReactorThread, sequential_execution)(State& state) { + for (auto _ : state) { + for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) { + g_counter_promise = std::make_unique<std::promise<void>>(); + std::future<void> counter_future = g_counter_promise->get_future(); + handler_->Post([]() { callback_sequential(nullptr); }); + counter_future.wait(); + } + } +}; + int main(int argc, char** argv) { // Disable LOG() output from libchrome logging::LoggingSettings log_settings; diff --git a/system/common/handler.cc b/system/common/handler.cc new file mode 100644 index 0000000000..219dd82dde --- /dev/null +++ b/system/common/handler.cc @@ -0,0 +1,87 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "handler.h" + +#include <sys/eventfd.h> +#include <cstring> + +#include "base/logging.h" + +#include "reactor.h" +#include "utils.h" + +#ifndef EFD_SEMAPHORE +#define EFD_SEMAPHORE 1 +#endif + +namespace bluetooth { +namespace common { + +Handler::Handler(Thread* thread) + : thread_(thread), + fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { + CHECK_NE(fd_, -1) << __func__ << ": cannot create eventfd: " << strerror(errno); + + reactable_ = thread_->GetReactor()->Register(fd_, [this] { this->handle_next_event(); }, nullptr); +} + +Handler::~Handler() { + thread_->GetReactor()->Unregister(reactable_); + reactable_ = nullptr; + + int close_status; + RUN_NO_INTR(close_status = close(fd_)); + CHECK_NE(close_status, -1) << __func__ << ": cannot close eventfd: " << strerror(errno); +} + +void Handler::Post(Closure closure) { + { + std::lock_guard<std::mutex> lock(mutex_); + tasks_.emplace(std::move(closure)); + } + uint64_t val = 1; + auto write_result = eventfd_write(fd_, val); + CHECK_NE(write_result, -1) << __func__ << ": failed to write: " << strerror(errno); +} + +void Handler::Clear() { + std::lock_guard<std::mutex> lock(mutex_); + + std::queue<Closure> empty; + std::swap(tasks_, empty); + + uint64_t val; + while (eventfd_read(fd_, &val) == 0) { + } +} + +void Handler::handle_next_event() { + Closure closure; + uint64_t val = 0; + auto read_result = eventfd_read(fd_, &val); + CHECK_NE(read_result, -1) << __func__ << ": failed to read fd: " << strerror(errno); + + { + std::lock_guard<std::mutex> lock(mutex_); + closure = std::move(tasks_.front()); + tasks_.pop(); + } + closure(); +} + +} // namespace common +} // namespace bluetooth diff --git a/system/common/handler.h b/system/common/handler.h new file mode 100644 index 0000000000..71b4dbaff8 --- /dev/null +++ b/system/common/handler.h @@ -0,0 +1,59 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <mutex> +#include <queue> + +#include "common/thread.h" +#include "common/utils.h" + +namespace bluetooth { +namespace common { + +// A message-queue style handler for reactor-based thread to handle incoming events from different threads. When it's +// constructed, it will register a reactable on the specified thread; when it's destroyed, it will unregister itself +// from the thread. +class Handler { + public: + // Create and register a handler on given thread + explicit Handler(Thread* thread); + + // Unregister this handler from the thread and release resource. Unhandled events will be discarded and not executed. + ~Handler(); + + DISALLOW_COPY_AND_ASSIGN(Handler); + + // Enqueue a closure to the queue of this handler + void Post(Closure closure); + + // Remove all pending events from the queue of this handler + void Clear(); + + private: + std::queue<Closure> tasks_; + Thread* thread_; + int fd_; + Reactor::Reactable* reactable_; + mutable std::mutex mutex_; + void handle_next_event(); +}; + +} // namespace common +} // namespace bluetooth diff --git a/system/common/handler_unittest.cc b/system/common/handler_unittest.cc new file mode 100644 index 0000000000..c51e51b52b --- /dev/null +++ b/system/common/handler_unittest.cc @@ -0,0 +1,71 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "handler.h" + +#include <sys/eventfd.h> +#include <thread> + +#include <gtest/gtest.h> +#include "base/logging.h" + +namespace bluetooth { +namespace common { +namespace { + +class HandlerTest : public ::testing::Test { + protected: + void SetUp() override { + thread_ = new Thread("test_thread", Thread::Priority::NORMAL); + handler_ = new Handler(thread_); + } + void TearDown() override { + delete handler_; + delete thread_; + } + + Handler* handler_; + Thread* thread_; +}; + +TEST_F(HandlerTest, empty) {} + +TEST_F(HandlerTest, post_task_invoked) { + int val = 0; + Closure closure = [&val]() { val++; }; + handler_->Post(closure); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + EXPECT_EQ(val, 1); +} + +TEST_F(HandlerTest, post_task_cleared) { + int val = 0; + Closure closure = [&val]() { + val++; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + }; + handler_->Post(std::move(closure)); + closure = []() { LOG(FATAL) << "Should not happen"; }; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + handler_->Post(std::move(closure)); + handler_->Clear(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + EXPECT_EQ(val, 1); +} + +} // namespace +} // namespace common +} // namespace bluetooth diff --git a/system/common/reactor.cc b/system/common/reactor.cc new file mode 100644 index 0000000000..61cc7d60fe --- /dev/null +++ b/system/common/reactor.cc @@ -0,0 +1,212 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "reactor.h" + +#include <fcntl.h> +#include <sys/epoll.h> +#include <sys/eventfd.h> +#include <unistd.h> +#include <algorithm> +#include <cerrno> +#include <cstring> + +#include "base/logging.h" + +namespace { + +// Use at most sizeof(epoll_event) * kEpollMaxEvents kernel memory +constexpr int kEpollMaxEvents = 64; + +} // namespace + +namespace bluetooth { +namespace common { + +class Reactor::Reactable { + public: + Reactable(int fd, Closure on_read_ready, Closure on_write_ready) + : fd_(fd), + on_read_ready_(std::move(on_read_ready)), + on_write_ready_(std::move(on_write_ready)), + is_executing_(false) {} + const int fd_; + Closure on_read_ready_; + Closure on_write_ready_; + bool is_executing_; + std::recursive_mutex lock_; +}; + +Reactor::Reactor() + : epoll_fd_(0), + control_fd_(0), + is_running_(false), + reactable_removed_(false) { + RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC)); + CHECK_NE(epoll_fd_, -1) << __func__ << ": cannot create epoll_fd: " << strerror(errno); + + control_fd_ = eventfd(0, EFD_NONBLOCK); + CHECK_NE(control_fd_, -1) << __func__ << ": cannot create control_fd: " << strerror(errno); + + epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}}; + int result; + RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event)); + CHECK_NE(result, -1) << __func__ << ": cannot register control_fd: " << strerror(errno); +} + +Reactor::~Reactor() { + int result; + RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr)); + CHECK_NE(result, -1) << __func__ << ": cannot unregister control_fd: " << strerror(errno); + + RUN_NO_INTR(result = close(control_fd_)); + CHECK_NE(result, -1) << __func__ << ": cannot close control_fd: " << strerror(errno); + + RUN_NO_INTR(result = close(epoll_fd_)); + CHECK_NE(result, -1) << __func__ << ": cannot close epoll_fd: " << strerror(errno); +} + +void Reactor::Run() { + bool previously_running = is_running_.exchange(true); + CHECK_EQ(previously_running, false) << __func__ << ": already running"; + LOG(INFO) << __func__ << ": started"; + + for (;;) { + invalidation_list_.clear(); + epoll_event events[kEpollMaxEvents]; + int count; + RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, -1)); + CHECK_NE(count, -1) << __func__ << ": Error polling for fds: " << strerror(errno); + + for (int i = 0; i < count; ++i) { + auto event = events[i]; + CHECK_NE(event.events, 0u) << __func__ << ": no result in epoll result"; + + // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered + if (event.data.ptr == nullptr) { + uint64_t value; + eventfd_read(control_fd_, &value); + LOG(INFO) << __func__ << ": stopped"; + is_running_ = false; + return; + } + auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr); + { + std::unique_lock<std::mutex> lock(mutex_); + // See if this reactable has been removed in the meantime. + if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) { + continue; + } + + std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_); + lock.unlock(); + reactable_removed_ = false; + reactable->is_executing_ = true; + if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && reactable->on_read_ready_ != nullptr) { + reactable->on_read_ready_(); + } + if (!reactable_removed_ && event.events & EPOLLOUT && reactable->on_write_ready_ != nullptr) { + reactable->on_write_ready_(); + } + reactable->is_executing_ = false; + } + if (reactable_removed_) { + delete reactable; + } + } + } +} + +void Reactor::Stop() { + if (!is_running_) { + LOG(WARNING) << __func__ << ": not running, will stop once it's started"; + } + auto control = eventfd_write(control_fd_, 1); + CHECK_NE(control, -1) << __func__ << ": failed: " << strerror(errno); +} + +Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) { + uint32_t poll_event_type = 0; + if (on_read_ready != nullptr) { + poll_event_type |= (EPOLLIN | EPOLLRDHUP); + } + if (on_write_ready != nullptr) { + poll_event_type |= EPOLLOUT; + } + auto* reactable = new Reactable(fd, on_read_ready, on_write_ready); + epoll_event event = { + .events = poll_event_type, + {.ptr = reactable} + }; + int register_fd; + RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event)); + CHECK_NE(register_fd, -1) << __func__ << ": failed: " << strerror(errno); + return reactable; +} + +void Reactor::Unregister(Reactor::Reactable* reactable) { + CHECK_NE(reactable, nullptr); + { + std::lock_guard<std::mutex> lock(mutex_); + invalidation_list_.push_back(reactable); + } + { + int result; + std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_); + RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr)); + if (result == -1 && errno == ENOENT) { + LOG(INFO) << __func__ << ": reactable is invalid or unregistered"; + } else if (result == -1) { + LOG(FATAL) << __func__ << ": failed: " << strerror(errno); + } + // If we are unregistering during the callback event from this reactable, we delete it after the callback is executed. + // reactable->is_executing_ is protected by reactable->lock_, so it's thread safe. + if (reactable->is_executing_) { + reactable_removed_ = true; + } + } + // If we are unregistering outside of the callback event from this reactable, we delete it now + if (!reactable_removed_) { + delete reactable; + } +} + +void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) { + CHECK_NE(reactable, nullptr); + + uint32_t poll_event_type = 0; + if (on_read_ready != nullptr) { + poll_event_type |= (EPOLLIN | EPOLLRDHUP); + } + if (on_write_ready != nullptr) { + poll_event_type |= EPOLLOUT; + } + { + std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_); + reactable->on_read_ready_ = std::move(on_read_ready); + reactable->on_write_ready_ = std::move(on_write_ready); + } + epoll_event event = { + .events = poll_event_type, + {.ptr = reactable} + }; + int modify_fd; + RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event)); + CHECK_NE(modify_fd, -1) << __func__ << ": failed: " << strerror(errno); +} + +} // namespace common +} // namespace bluetooth diff --git a/system/common/reactor.h b/system/common/reactor.h new file mode 100644 index 0000000000..27528cea9c --- /dev/null +++ b/system/common/reactor.h @@ -0,0 +1,78 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <sys/epoll.h> +#include <atomic> +#include <functional> +#include <list> +#include <mutex> +#include <thread> + +#include "common/utils.h" + +namespace bluetooth { +namespace common { + +// Format of closure to be used in the entire stack +using Closure = std::function<void()>; + +// A simple implementation of reactor-style looper. +// When a reactor is running, the main loop is polling and blocked until at least one registered reactable is ready to +// read or write. It will invoke on_read_ready() or on_write_ready(), which is registered with the reactor. Then, it +// blocks again until ready event. +class Reactor { + public: + // An object used for Unregister() and ModifyRegistration() + class Reactable; + + // Construct a reactor on the current thread + Reactor(); + + // Destruct this reactor and release its resources + ~Reactor(); + + DISALLOW_COPY_AND_ASSIGN(Reactor); + + // Start the reactor. The current thread will be blocked until Stop() is invoked and handled. + void Run(); + + // Stop the reactor. Must be invoked from a different thread. Note: all registered reactables will not be unregistered + // by Stop(). If the reactor is not running, it will be stopped once it's started. + void Stop(); + + // Register a reactable fd to this reactor. Returns a pointer to a Reactable. Caller must use this object to + // unregister or modify registration. Ownership of the memory space is NOT transferred to user. + Reactable* Register(int fd, Closure on_read_ready, Closure on_write_ready); + + // Unregister a reactable from this reactor + void Unregister(Reactable* reactable); + + // Modify the registration for a reactable with given reactable + void ModifyRegistration(Reactable* reactable, Closure on_read_ready, Closure on_write_ready); + + private: + mutable std::mutex mutex_; + int epoll_fd_; + int control_fd_; + std::atomic<bool> is_running_; + std::list<Reactable*> invalidation_list_; + bool reactable_removed_; +}; + +} // namespace common +} // namespace bluetooth diff --git a/system/common/reactor_unittest.cc b/system/common/reactor_unittest.cc new file mode 100644 index 0000000000..5c7e90ea47 --- /dev/null +++ b/system/common/reactor_unittest.cc @@ -0,0 +1,283 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "reactor.h" + +#include <sys/eventfd.h> +#include <chrono> +#include <future> +#include <thread> + +#include "base/logging.h" +#include "gtest/gtest.h" + +namespace bluetooth { +namespace common { +namespace { + +constexpr int kReadReadyValue = 100; + +std::promise<int>* g_promise; + +class ReactorTest : public ::testing::Test { + protected: + void SetUp() override { + g_promise = new std::promise<int>; + reactor_ = new Reactor; + } + + void TearDown() override { + delete g_promise; + g_promise = nullptr; + delete reactor_; + reactor_ = nullptr; + } + + Reactor* reactor_; +}; + +class SampleReactable { + public: + SampleReactable() : fd_(eventfd(0, EFD_NONBLOCK)) { + EXPECT_NE(fd_, 0); + } + + ~SampleReactable() { + close(fd_); + } + + void OnReadReady() {} + + void OnWriteReady() {} + + int fd_; +}; + +class FakeReactable { + public: + enum EventFdValue { + kSetPromise = 1, + kRegisterSampleReactable, + kUnregisterSampleReactable, + kSampleOutputValue, + }; + FakeReactable() : fd_(eventfd(0, 0)), reactor_(nullptr) { + EXPECT_NE(fd_, 0); + } + + FakeReactable(Reactor* reactor) : fd_(eventfd(0, 0)), reactor_(reactor) { + EXPECT_NE(fd_, 0); + } + + ~FakeReactable() { + close(fd_); + } + + void OnReadReady() { + uint64_t value = 0; + auto read_result = eventfd_read(fd_, &value); + EXPECT_EQ(read_result, 0); + if (value == kSetPromise && g_promise != nullptr) { + g_promise->set_value(kReadReadyValue); + } + if (value == kRegisterSampleReactable) { + reactable_ = reactor_->Register(sample_reactable_.fd_, [this] { this->sample_reactable_.OnReadReady(); }, + [this] { this->sample_reactable_.OnWriteReady(); }); + g_promise->set_value(kReadReadyValue); + } + if (value == kUnregisterSampleReactable) { + reactor_->Unregister(reactable_); + g_promise->set_value(kReadReadyValue); + } + } + + void OnWriteReady() { + auto write_result = eventfd_write(fd_, output_data_); + output_data_ = 0; + EXPECT_EQ(write_result, 0); + } + + SampleReactable sample_reactable_; + Reactor::Reactable* reactable_ = nullptr; + int fd_; + + private: + Reactor* reactor_; + uint64_t output_data_ = kSampleOutputValue; +}; + +TEST_F(ReactorTest, start_and_stop) { + auto reactor_thread = std::thread(&Reactor::Run, reactor_); + reactor_->Stop(); + reactor_thread.join(); +} + +TEST_F(ReactorTest, stop_and_start) { + auto reactor_thread = std::thread(&Reactor::Stop, reactor_); + auto another_thread = std::thread(&Reactor::Run, reactor_); + reactor_thread.join(); + another_thread.join(); +} + +TEST_F(ReactorTest, stop_multi_times) { + auto reactor_thread = std::thread(&Reactor::Run, reactor_); + for (int i = 0; i < 5; i++) { + reactor_->Stop(); + } + reactor_thread.join(); +} + +TEST_F(ReactorTest, cold_register_only) { + FakeReactable fake_reactable; + auto* reactable = + reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr); + + reactor_->Unregister(reactable); +} + +TEST_F(ReactorTest, cold_register) { + FakeReactable fake_reactable; + auto* reactable = + reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr); + auto reactor_thread = std::thread(&Reactor::Run, reactor_); + auto future = g_promise->get_future(); + + auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise); + EXPECT_EQ(write_result, 0); + EXPECT_EQ(future.get(), kReadReadyValue); + reactor_->Stop(); + reactor_thread.join(); + reactor_->Unregister(reactable); +} + +TEST_F(ReactorTest, hot_register_from_different_thread) { + auto reactor_thread = std::thread(&Reactor::Run, reactor_); + auto future = g_promise->get_future(); + + FakeReactable fake_reactable; + auto* reactable = + reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr); + auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise); + EXPECT_EQ(write_result, 0); + EXPECT_EQ(future.get(), kReadReadyValue); + reactor_->Stop(); + reactor_thread.join(); + + reactor_->Unregister(reactable); +} + +TEST_F(ReactorTest, hot_unregister_from_different_thread) { + FakeReactable fake_reactable; + auto* reactable = + reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr); + auto reactor_thread = std::thread(&Reactor::Run, reactor_); + reactor_->Unregister(reactable); + auto future = g_promise->get_future(); + + auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise); + EXPECT_EQ(write_result, 0); + future.wait_for(std::chrono::milliseconds(10)); + g_promise->set_value(2); + EXPECT_EQ(future.get(), 2); + reactor_->Stop(); + reactor_thread.join(); +} + +TEST_F(ReactorTest, hot_register_from_same_thread) { + auto reactor_thread = std::thread(&Reactor::Run, reactor_); + auto future = g_promise->get_future(); + + FakeReactable fake_reactable(reactor_); + auto* reactable = + reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr); + auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kRegisterSampleReactable); + EXPECT_EQ(write_result, 0); + EXPECT_EQ(future.get(), kReadReadyValue); + reactor_->Stop(); + reactor_thread.join(); + + reactor_->Unregister(reactable); +} + +TEST_F(ReactorTest, hot_unregister_from_same_thread) { + auto reactor_thread = std::thread(&Reactor::Run, reactor_); + auto future = g_promise->get_future(); + + FakeReactable fake_reactable(reactor_); + auto* reactable = + reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr); + auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kRegisterSampleReactable); + EXPECT_EQ(write_result, 0); + EXPECT_EQ(future.get(), kReadReadyValue); + delete g_promise; + g_promise = new std::promise<int>; + future = g_promise->get_future(); + write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kUnregisterSampleReactable); + EXPECT_EQ(write_result, 0); + EXPECT_EQ(future.get(), kReadReadyValue); + reactor_->Stop(); + reactor_thread.join(); + LOG(INFO); + + reactor_->Unregister(reactable); + LOG(INFO); +} + +TEST_F(ReactorTest, start_and_stop_multi_times) { + auto reactor_thread = std::thread(&Reactor::Run, reactor_); + reactor_->Stop(); + reactor_thread.join(); + for (int i = 0; i < 5; i++) { + reactor_thread = std::thread(&Reactor::Run, reactor_); + reactor_->Stop(); + reactor_thread.join(); + } +} + +TEST_F(ReactorTest, on_write_ready) { + FakeReactable fake_reactable; + auto* reactable = + reactor_->Register(fake_reactable.fd_, nullptr, std::bind(&FakeReactable::OnWriteReady, &fake_reactable)); + auto reactor_thread = std::thread(&Reactor::Run, reactor_); + uint64_t value = 0; + auto read_result = eventfd_read(fake_reactable.fd_, &value); + EXPECT_EQ(read_result, 0); + EXPECT_EQ(value, FakeReactable::kSampleOutputValue); + + reactor_->Stop(); + reactor_thread.join(); + + reactor_->Unregister(reactable); +} + +TEST_F(ReactorTest, modify_registration) { + FakeReactable fake_reactable; + auto* reactable = + reactor_->Register(fake_reactable.fd_, std::bind(&FakeReactable::OnReadReady, &fake_reactable), nullptr); + reactor_->ModifyRegistration(reactable, nullptr, std::bind(&FakeReactable::OnWriteReady, &fake_reactable)); + auto reactor_thread = std::thread(&Reactor::Run, reactor_); + uint64_t value = 0; + auto read_result = eventfd_read(fake_reactable.fd_, &value); + EXPECT_EQ(read_result, 0); + EXPECT_EQ(value, FakeReactable::kSampleOutputValue); + + reactor_->Stop(); + reactor_thread.join(); +} + +} // namespace +} // namespace common +} // namespace bluetooth diff --git a/system/common/thread.cc b/system/common/thread.cc new file mode 100644 index 0000000000..2cb22058d4 --- /dev/null +++ b/system/common/thread.cc @@ -0,0 +1,84 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "common/thread.h" + +#include <fcntl.h> +#include <sys/syscall.h> +#include <cerrno> +#include <cstring> + +#include "base/logging.h" + +namespace bluetooth { +namespace common { + +namespace { +constexpr int kRealTimeFifoSchedulingPriority = 1; +} + +Thread::Thread(const std::string& name, const Priority priority) + : name_(name), + reactor_(), + running_thread_(&Thread::run, this, priority) {} + +void Thread::run(Priority priority) { + if (priority == Priority::REAL_TIME) { + struct sched_param rt_params = {.sched_priority = kRealTimeFifoSchedulingPriority}; + auto linux_tid = static_cast<pid_t>(syscall(SYS_gettid)); + int rc; + RUN_NO_INTR(rc = sched_setscheduler(linux_tid, SCHED_FIFO, &rt_params)); + if (rc != 0) { + LOG(ERROR) << __func__ << ": unable to set SCHED_FIFO priority: " << strerror(errno); + } + } + reactor_.Run(); +} + +Thread::~Thread() { + Stop(); +} + +bool Thread::Stop() { + std::lock_guard<std::mutex> lock(mutex_); + CHECK_NE(std::this_thread::get_id(), running_thread_.get_id()); + + if (!running_thread_.joinable()) { + return false; + } + reactor_.Stop(); + running_thread_.join(); + return true; +} + +bool Thread::IsSameThread() const { + return std::this_thread::get_id() == running_thread_.get_id(); +} + +Reactor* Thread::GetReactor() const { + return &reactor_; +} + +std::string Thread::GetThreadName() const { + return name_; +} + +std::string Thread::ToString() const { + return "Thread " + name_; +} + +} // namespace common +} // namespace bluetooth diff --git a/system/common/thread.h b/system/common/thread.h new file mode 100644 index 0000000000..09e6b48328 --- /dev/null +++ b/system/common/thread.h @@ -0,0 +1,75 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <mutex> +#include <string> +#include <thread> + +#include "common/reactor.h" +#include "common/utils.h" + +namespace bluetooth { +namespace common { + +// Reactor-based looper thread implementation. The thread runs immediately after it is constructed, and stops after +// Stop() is invoked. To assign task to this thread, user needs to register a reactable object to the underlying +// reactor. +class Thread { + public: + // Used by thread constructor. Suggest the priority to the kernel scheduler. Use REAL_TIME if we need (soft) real-time + // scheduling guarantee for this thread; use NORMAL if no real-time guarantee is needed to save CPU time slice for + // other threads + enum class Priority { + REAL_TIME, + NORMAL, + }; + + // name: thread name for POSIX systems + // priority: priority for kernel scheduler + Thread(const std::string& name, Priority priority); + + // Stop and destroy this thread + ~Thread(); + + DISALLOW_COPY_AND_ASSIGN(Thread); + + // Stop this thread. Must be invoked from another thread. After this thread is stopped, it cannot be started again. + bool Stop(); + + // Return true if this function is invoked from this thread + bool IsSameThread() const; + + // Return the POSIX thread name + std::string GetThreadName() const; + + // Return a user-friendly string representation of this thread object + std::string ToString() const; + + // Return the pointer of underlying reactor. The ownership is NOT transferred. + Reactor* GetReactor() const; + + private: + void run(Priority priority); + mutable std::mutex mutex_; + const std::string name_; + mutable Reactor reactor_; + std::thread running_thread_; +}; + +} // namespace common +} // namespace bluetooth diff --git a/system/common/thread_unittest.cc b/system/common/thread_unittest.cc new file mode 100644 index 0000000000..678f34da78 --- /dev/null +++ b/system/common/thread_unittest.cc @@ -0,0 +1,99 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "thread.h" + +#include <sys/eventfd.h> + +#include "base/logging.h" +#include "gtest/gtest.h" +#include "reactor.h" + +namespace bluetooth { +namespace common { +namespace { + +constexpr int kCheckIsSameThread = 1; + +class SampleReactable { + public: + explicit SampleReactable(Thread* thread) : thread_(thread), fd_(eventfd(0, 0)), is_same_thread_checked_(false) { + EXPECT_NE(fd_, 0); + } + + ~SampleReactable() { + close(fd_); + } + + void OnReadReady() { + EXPECT_TRUE(thread_->IsSameThread()); + is_same_thread_checked_ = true; + uint64_t val; + eventfd_read(fd_, &val); + } + + bool IsSameThreadCheckDone() { + return is_same_thread_checked_; + } + + Thread* thread_; + int fd_; + bool is_same_thread_checked_; +}; + +class ThreadTest : public ::testing::Test { + protected: + void SetUp() override { + thread = new Thread("test", Thread::Priority::NORMAL); + } + + void TearDown() override { + delete thread; + } + Thread* thread = nullptr; +}; + +TEST_F(ThreadTest, just_stop_no_op) { + thread->Stop(); +} + +TEST_F(ThreadTest, thread_name) { + EXPECT_EQ(thread->GetThreadName(), "test"); +} + +TEST_F(ThreadTest, thread_to_string) { + EXPECT_NE(thread->ToString().find("test"), std::string::npos); +} + +TEST_F(ThreadTest, not_same_thread) { + EXPECT_FALSE(thread->IsSameThread()); +} + +TEST_F(ThreadTest, same_thread) { + Reactor* reactor = thread->GetReactor(); + SampleReactable sample_reactable(thread); + auto* reactable = + reactor->Register(sample_reactable.fd_, std::bind(&SampleReactable::OnReadReady, &sample_reactable), nullptr); + int fd = sample_reactable.fd_; + int write_result = eventfd_write(fd, kCheckIsSameThread); + EXPECT_EQ(write_result, 0); + while (!sample_reactable.IsSameThreadCheckDone()) std::this_thread::yield(); + reactor->Unregister(reactable); +} + +} // namespace +} // namespace common +} // namespace bluetooth diff --git a/system/common/utils.h b/system/common/utils.h new file mode 100644 index 0000000000..26e8f23df9 --- /dev/null +++ b/system/common/utils.h @@ -0,0 +1,31 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +// A macro to re-try a syscall when it receives EINTR +#ifndef RUN_NO_INTR +#define RUN_NO_INTR(fn) \ + do { \ + } while ((fn) == -1 && errno == EINTR) +#endif + +// A macro to disallow the copy constructor and operator= functions +#ifndef DISALLOW_COPY_AND_ASSIGN +#define DISALLOW_COPY_AND_ASSIGN(TypeName) \ + TypeName(const TypeName&) = delete; \ + void operator=(const TypeName&) = delete +#endif diff --git a/system/stack/Android.bp b/system/stack/Android.bp index 640441e930..dcf68b120d 100644 --- a/system/stack/Android.bp +++ b/system/stack/Android.bp @@ -202,6 +202,7 @@ cc_test { "test/stack_a2dp_test.cc", ], shared_libs: [ + "libcrypto", "libhidlbase", "liblog", "libprotobuf-cpp-lite", diff --git a/system/test/gen_coverage.py b/system/test/gen_coverage.py index eedc27aa31..d4399f4d05 100755 --- a/system/test/gen_coverage.py +++ b/system/test/gen_coverage.py @@ -79,6 +79,11 @@ COVERAGE_TESTS = [ "covered_files": [ "packages/modules/Bluetooth/system/vendor_libs/test_vendor_lib/packets", ], + }, { + "test_name": "bluetooth_test_common", + "covered_files": [ + "packages/modules/Bluetooth/system/common", + ], }, ] |