diff options
author | 2025-01-30 17:29:33 -0800 | |
---|---|---|
committer | 2025-01-30 17:29:33 -0800 | |
commit | e4a9db4b71c297fa17272be4e2a3e8ab6a7f878e (patch) | |
tree | 942cdb094f3208bd3046bc8f6921796288bec6f7 | |
parent | c577dad4566ce47140b9e7ae01654acd0ae90a16 (diff) | |
parent | b182e3dd6daaa2c2f5076e18128e07c6b8c3c6a9 (diff) |
Merge "system/rust: Restructure and add logs for FFI" into main
-rw-r--r-- | system/rust/src/core/mod.rs | 10 | ||||
-rw-r--r-- | system/rust/src/lib.rs | 148 |
2 files changed, 108 insertions, 50 deletions
diff --git a/system/rust/src/core/mod.rs b/system/rust/src/core/mod.rs index 3d25dae585..d63fa55b4c 100644 --- a/system/rust/src/core/mod.rs +++ b/system/rust/src/core/mod.rs @@ -12,7 +12,7 @@ use cxx::UniquePtr; use crate::{ gatt::ffi::{AttTransportImpl, GattCallbacksImpl}, - GlobalModuleRegistry, MainThreadTxMessage, GLOBAL_MODULE_REGISTRY, + RustModuleRunner, }; use self::ffi::{future_ready, Future, GattServerCallbacks}; @@ -22,7 +22,7 @@ fn start( on_started: Pin<&'static mut Future>, ) { thread::spawn(move || { - GlobalModuleRegistry::start( + RustModuleRunner::run( Rc::new(GattCallbacksImpl(gatt_server_callbacks)), Rc::new(AttTransportImpl()), || { @@ -33,9 +33,5 @@ fn start( } fn stop() { - let _ = GLOBAL_MODULE_REGISTRY - .try_lock() - .unwrap() - .as_ref() - .map(|registry| registry.task_tx.send(MainThreadTxMessage::Stop)); + RustModuleRunner::stop(); } diff --git a/system/rust/src/lib.rs b/system/rust/src/lib.rs index adfb665de3..bcef07a97f 100644 --- a/system/rust/src/lib.rs +++ b/system/rust/src/lib.rs @@ -16,7 +16,7 @@ //! dependency order. use gatt::{channel::AttTransport, GattCallbacks}; -use log::{info, warn}; +use log::{error, info, warn}; use tokio::task::LocalSet; use std::{rc::Rc, sync::Mutex}; @@ -29,9 +29,21 @@ pub mod gatt; pub mod packets; pub mod utils; -/// The owner of the main Rust thread on which all Rust modules run -struct GlobalModuleRegistry { - pub task_tx: MainThreadTx, +/// The Rust Modules runner. Starts and processes messages from Java / C++ +/// while the Rust thread is running. Starts in an idle state. +#[derive(Default, Debug)] +enum RustModuleRunner { + /// Not started yet + #[default] + NotStarted, + /// Main event loop is running and messages can be processed. + /// Use [`RustModuleRunner::send`] to queue a callback to be sent. + Running { tx: mpsc::UnboundedSender<BoxedMainThreadCallback> }, + /// The event loop has been asked to stop from the FFI interface and will stop when all + /// messages in the queue are processed. No further messages can be sent. + Stopping, + /// The event loop has ended. `result` holds an error if the thread ended not gracefully. + Ended { result: Result<(), String> }, } /// The ModuleViews lets us access all publicly accessible Rust modules from @@ -46,36 +58,43 @@ pub struct ModuleViews<'a> { pub gatt_module: &'a mut gatt::server::GattModule, } -static GLOBAL_MODULE_REGISTRY: Mutex<Option<GlobalModuleRegistry>> = Mutex::new(None); +static GLOBAL_MODULE_RUNNER: Mutex<RustModuleRunner> = Mutex::new(RustModuleRunner::new()); + +impl RustModuleRunner { + const fn new() -> Self { + Self::NotStarted + } -impl GlobalModuleRegistry { /// Handles bringup of all Rust modules. This occurs after GD C++ modules /// have started, but before the legacy stack has initialized. /// Must be invoked from the Rust thread after JNI initializes it and passes /// in JNI modules. - pub fn start( + /// + /// This function can only be run once, if it is run more than once it will panic. + pub fn run( gatt_callbacks: Rc<dyn GattCallbacks>, att_transport: Rc<dyn AttTransport>, on_started: impl FnOnce(), ) { info!("starting Rust modules"); + let mut main_thread_rx = match GLOBAL_MODULE_RUNNER.lock().unwrap().start() { + Ok(main_thread_rx) => main_thread_rx, + Err(reason) => { + error!("Cannot start rust modules: {reason}"); + panic!("Bluetooth Rust modules: {reason}"); + } + }; let rt = Builder::new_current_thread() .enable_all() .build() .expect("failed to start tokio runtime"); let local = LocalSet::new(); - let (tx, mut rx) = mpsc::unbounded_channel(); - let prev_registry = GLOBAL_MODULE_REGISTRY.lock().unwrap().replace(Self { task_tx: tx }); - - // initialization should only happen once - assert!(prev_registry.is_none()); - - // First, setup FFI and C++ modules + // Setup FFI and C++ modules let arbiter = gatt::arbiter::initialize_arbiter(); // Now enter the runtime - local.block_on(&rt, async move { + let result = local.block_on(&rt, async move { // Then follow the pure-Rust modules let gatt_incoming_callbacks = Rc::new(gatt::callbacks::CallbackTransactionManager::new(gatt_callbacks.clone())); @@ -95,40 +114,82 @@ impl GlobalModuleRegistry { // This is the core event loop that serializes incoming requests into the Rust // thread do_in_rust_thread lets us post into here from foreign // threads - info!("starting Tokio event loop"); - while let Some(message) = rx.recv().await { - match message { - MainThreadTxMessage::Callback(f) => f(&mut modules), - MainThreadTxMessage::Stop => { - break; - } - } + info!("starting event loop"); + while let Some(f) = main_thread_rx.recv().await { + f(&mut modules); } + Ok(()) }); - warn!("Rust thread queue has stopped, shutting down executor thread"); - GLOBAL_MODULE_REGISTRY.lock().unwrap().take(); + warn!("RustModuleRunner has stopped, shutting down executor thread"); + + if let Err(e) = GLOBAL_MODULE_RUNNER.lock().unwrap().finished(result) { + warn!("failed to record runner finish: {e:?}"); + } + gatt::arbiter::clean_arbiter(); } -} -type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>; -enum MainThreadTxMessage { - Callback(BoxedMainThreadCallback), - Stop, -} -type MainThreadTx = mpsc::UnboundedSender<MainThreadTxMessage>; + /// Externally stop the global runner. + pub fn stop() { + GLOBAL_MODULE_RUNNER.lock().unwrap().shutdown(); + } -thread_local! { - /// The TX end of a channel into the Rust thread, so external callers can - /// access Rust modules. JNI / direct FFI should use do_in_rust_thread for - /// convenience, but objects passed into C++ as callbacks should - /// clone this channel to fail loudly if it's not yet initialized. - /// - /// This will be lazily initialized on first use from each client thread - static MAIN_THREAD_TX: MainThreadTx = - GLOBAL_MODULE_REGISTRY.lock().unwrap().as_ref().expect("stack not initialized").task_tx.clone(); + #[allow(dead_code)] + fn send(&self, f: BoxedMainThreadCallback) -> Result<(), (String, BoxedMainThreadCallback)> { + match self { + Self::NotStarted => Err(("Not started yet".to_string(), f)), + Self::Ended { .. } | Self::Stopping => Err(("Runner ended".to_string(), f)), + Self::Running { tx } => tx.send(f).map_err(|e| ("Failed to send".to_string(), e.0)), + } + } + + fn start(&mut self) -> Result<mpsc::UnboundedReceiver<BoxedMainThreadCallback>, String> { + match self { + Self::Running { .. } => { + return Err("Already started".to_string()); + } + Self::Ended { result } => { + return Err(format!("Already finished: {result:?}")); + } + Self::Stopping => { + return Err("Can't start, finishing".to_string()); + } + Self::NotStarted => {} + }; + + let (tx, rx) = mpsc::unbounded_channel(); + + *self = Self::Running { tx }; + Ok(rx) + } + + fn shutdown(&mut self) { + match std::mem::replace(self, Self::Stopping) { + Self::NotStarted => { + warn!("Runner being stopped when it hasn't been started"); + self.finished(Err("Never started".to_string())).unwrap(); + } + Self::Stopping => { + warn!("Asked to shutdown twice before stopped"); + } + Self::Ended { .. } | Self::Running { .. } => {} + } + } + + fn finished(&mut self, result: Result<(), String>) -> Result<(), String> { + match self { + Self::NotStarted => return Err("Not started".to_string()), + Self::Ended { result } => return Err(format!("Already finished with {result:?}")), + Self::Running { .. } | Self::Stopping => {} + } + + *self = Self::Ended { result }; + Ok(()) + } } +type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>; + /// Posts a callback to the Rust thread and gives it access to public Rust /// modules, used from JNI. /// @@ -142,8 +203,9 @@ pub fn do_in_rust_thread<F>(f: F) where F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static, { - let ret = MAIN_THREAD_TX.with(|tx| tx.send(MainThreadTxMessage::Callback(Box::new(f)))); - if ret.is_err() { + if let Err((s, _f)) = GLOBAL_MODULE_RUNNER.lock().expect("lock not poisoned").send(Box::new(f)) + { + error!("Failed to do_in_rust_thread, panicking: {s}"); panic!("Rust call failed"); } } |