diff options
author | 2024-10-23 21:26:22 +0000 | |
---|---|---|
committer | 2025-02-10 14:11:06 -0800 | |
commit | f26a364d98314eca29aeda783a6129521e551bc6 (patch) | |
tree | d97bd0883091ecb8ad420070d9dc2cac00330e6f | |
parent | ab0891868513de52e36aa440ff7616b28e952bc6 (diff) |
system/rust: Restructure and add logs for FFI
Restructure the GlobalModuleRegistry:
- Rename to RustModuleRunner
- Clarify states
- Internalize the static global
- Add a number of logs when error states occur
This is reland of https://r.android.com/3318393 except that starting and
stopping the module multiple times is now supported.
Tested with `atest net_test_bluetooth` to verify the cause of revert has
been fixed.
Fix: 356462170
Test: m com.android.btservices
Flag: EXEMPT refactor only
Change-Id: I2aece397937e075a494b0b6e200492bb5664f46c
-rw-r--r-- | system/rust/src/core/mod.rs | 20 | ||||
-rw-r--r-- | system/rust/src/lib.rs | 120 |
2 files changed, 78 insertions, 62 deletions
diff --git a/system/rust/src/core/mod.rs b/system/rust/src/core/mod.rs index 3d25dae585..e4f4932b79 100644 --- a/system/rust/src/core/mod.rs +++ b/system/rust/src/core/mod.rs @@ -6,13 +6,13 @@ pub mod shared_box; pub mod shared_mutex; pub mod uuid; -use std::{pin::Pin, rc::Rc, thread}; +use std::pin::Pin; use cxx::UniquePtr; use crate::{ gatt::ffi::{AttTransportImpl, GattCallbacksImpl}, - GlobalModuleRegistry, MainThreadTxMessage, GLOBAL_MODULE_REGISTRY, + RustModuleRunner, }; use self::ffi::{future_ready, Future, GattServerCallbacks}; @@ -21,21 +21,11 @@ fn start( gatt_server_callbacks: UniquePtr<GattServerCallbacks>, on_started: Pin<&'static mut Future>, ) { - thread::spawn(move || { - GlobalModuleRegistry::start( - Rc::new(GattCallbacksImpl(gatt_server_callbacks)), - Rc::new(AttTransportImpl()), - || { - future_ready(on_started); - }, - ); + RustModuleRunner::start(GattCallbacksImpl(gatt_server_callbacks), AttTransportImpl(), || { + future_ready(on_started); }); } fn stop() { - let _ = GLOBAL_MODULE_REGISTRY - .try_lock() - .unwrap() - .as_ref() - .map(|registry| registry.task_tx.send(MainThreadTxMessage::Stop)); + RustModuleRunner::stop(); } diff --git a/system/rust/src/lib.rs b/system/rust/src/lib.rs index adfb665de3..3886ee4772 100644 --- a/system/rust/src/lib.rs +++ b/system/rust/src/lib.rs @@ -19,7 +19,7 @@ use gatt::{channel::AttTransport, GattCallbacks}; use log::{info, warn}; use tokio::task::LocalSet; -use std::{rc::Rc, sync::Mutex}; +use std::{rc::Rc, sync::Mutex, thread::JoinHandle}; use tokio::runtime::Builder; use tokio::sync::mpsc; @@ -29,9 +29,15 @@ pub mod gatt; pub mod packets; pub mod utils; -/// The owner of the main Rust thread on which all Rust modules run -struct GlobalModuleRegistry { - pub task_tx: MainThreadTx, +/// The Rust Modules runner. Starts and processes messages from Java / C++ +/// while the Rust thread is running. Starts in an idle state. +#[derive(Default, Debug)] +enum RustModuleRunner { + #[default] + NotRunning, + /// Main event loop is running and messages can be processed. Use [`RustModuleRunner::send`] to + /// queue a callback to be sent. + Running { thread: JoinHandle<()>, tx: mpsc::UnboundedSender<BoxedMainThreadCallback> }, } /// The ModuleViews lets us access all publicly accessible Rust modules from @@ -46,32 +52,65 @@ pub struct ModuleViews<'a> { pub gatt_module: &'a mut gatt::server::GattModule, } -static GLOBAL_MODULE_REGISTRY: Mutex<Option<GlobalModuleRegistry>> = Mutex::new(None); +static GLOBAL_MODULE_RUNNER: Mutex<RustModuleRunner> = Mutex::new(RustModuleRunner::new()); + +impl RustModuleRunner { + const fn new() -> Self { + Self::NotRunning + } -impl GlobalModuleRegistry { /// Handles bringup of all Rust modules. This occurs after GD C++ modules /// have started, but before the legacy stack has initialized. /// Must be invoked from the Rust thread after JNI initializes it and passes /// in JNI modules. pub fn start( + gatt_callbacks: impl GattCallbacks + Send + 'static, + att_transport: impl AttTransport + Send + 'static, + on_started: impl FnOnce() + Send + 'static, + ) { + let mut runner = GLOBAL_MODULE_RUNNER.lock().unwrap(); + + if let Self::Running { .. } = &*runner { + panic!("Already running"); + } + + let (tx, rx) = mpsc::unbounded_channel(); + let thread = std::thread::spawn(move || { + RustModuleRunner::run(Rc::new(gatt_callbacks), Rc::new(att_transport), on_started, rx) + }); + + *runner = Self::Running { thread, tx }; + } + + /// Externally stop the global runner. + pub fn stop() { + match std::mem::replace(&mut *GLOBAL_MODULE_RUNNER.lock().unwrap(), Self::NotRunning) { + Self::NotRunning => warn!("Already not running"), + Self::Running { thread, tx } => { + // Dropping the send end of the channel should cause the runner to stop. + std::mem::drop(tx); + + // Wait for the thread to terminate. + let _ = thread.join(); + } + } + } + + fn run( gatt_callbacks: Rc<dyn GattCallbacks>, att_transport: Rc<dyn AttTransport>, on_started: impl FnOnce(), + mut rx: mpsc::UnboundedReceiver<BoxedMainThreadCallback>, ) { info!("starting Rust modules"); + let rt = Builder::new_current_thread() .enable_all() .build() .expect("failed to start tokio runtime"); let local = LocalSet::new(); - let (tx, mut rx) = mpsc::unbounded_channel(); - let prev_registry = GLOBAL_MODULE_REGISTRY.lock().unwrap().replace(Self { task_tx: tx }); - - // initialization should only happen once - assert!(prev_registry.is_none()); - - // First, setup FFI and C++ modules + // Setup FFI and C++ modules let arbiter = gatt::arbiter::initialize_arbiter(); // Now enter the runtime @@ -95,55 +134,42 @@ impl GlobalModuleRegistry { // This is the core event loop that serializes incoming requests into the Rust // thread do_in_rust_thread lets us post into here from foreign // threads - info!("starting Tokio event loop"); - while let Some(message) = rx.recv().await { - match message { - MainThreadTxMessage::Callback(f) => f(&mut modules), - MainThreadTxMessage::Stop => { - break; - } - } + info!("starting event loop"); + while let Some(f) = rx.recv().await { + f(&mut modules); } }); - warn!("Rust thread queue has stopped, shutting down executor thread"); - GLOBAL_MODULE_REGISTRY.lock().unwrap().take(); + + info!("RustModuleRunner has stopped, shutting down executor thread"); + gatt::arbiter::clean_arbiter(); } + + #[allow(dead_code)] + fn send(&self, f: BoxedMainThreadCallback) -> Result<(), (String, BoxedMainThreadCallback)> { + match self { + Self::NotRunning => Err(("Not running".to_string(), f)), + Self::Running { tx, .. } => tx.send(f).map_err(|e| ("Failed to send".to_string(), e.0)), + } + } } type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>; -enum MainThreadTxMessage { - Callback(BoxedMainThreadCallback), - Stop, -} -type MainThreadTx = mpsc::UnboundedSender<MainThreadTxMessage>; - -thread_local! { - /// The TX end of a channel into the Rust thread, so external callers can - /// access Rust modules. JNI / direct FFI should use do_in_rust_thread for - /// convenience, but objects passed into C++ as callbacks should - /// clone this channel to fail loudly if it's not yet initialized. - /// - /// This will be lazily initialized on first use from each client thread - static MAIN_THREAD_TX: MainThreadTx = - GLOBAL_MODULE_REGISTRY.lock().unwrap().as_ref().expect("stack not initialized").task_tx.clone(); -} /// Posts a callback to the Rust thread and gives it access to public Rust /// modules, used from JNI. /// -/// Do not call this from Rust modules / the Rust thread! Instead, Rust modules -/// should receive references to their dependent modules at startup. If passing -/// callbacks into C++, don't use this method either - instead, acquire a clone -/// of MAIN_THREAD_TX when the callback is created. This ensures that there -/// never are "invalid" callbacks that may still work depending on when the +/// Do not call this from Rust modules / the Rust thread! Instead, Rust modules should receive +/// references to their dependent modules at startup. If passing callbacks into C++, don't use this +/// method either - instead, acquire a clone of RustModule's `tx` when the callback is created. This +/// ensures that there never are "invalid" callbacks that may still work depending on when the /// GLOBAL_MODULE_REGISTRY is initialized. pub fn do_in_rust_thread<F>(f: F) where F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static, { - let ret = MAIN_THREAD_TX.with(|tx| tx.send(MainThreadTxMessage::Callback(Box::new(f)))); - if ret.is_err() { - panic!("Rust call failed"); + if let Err((s, _f)) = GLOBAL_MODULE_RUNNER.lock().expect("lock not poisoned").send(Box::new(f)) + { + panic!("Rust call failed: {s}"); } } |