diff options
author | 2025-01-30 21:39:58 -0800 | |
---|---|---|
committer | 2025-01-30 21:39:58 -0800 | |
commit | add1b56df349fefafd3763233be1ad5cc17e9310 (patch) | |
tree | 99b0844dd5427c1eba0be5a8f072a34a7cd0e553 | |
parent | b182e3dd6daaa2c2f5076e18128e07c6b8c3c6a9 (diff) |
Revert "system/rust: Restructure and add logs for FFI"
This reverts commit b182e3dd6daaa2c2f5076e18128e07c6b8c3c6a9.
Reason for revert: b/393481018
Change-Id: I26d3fe5ffc9cd7854e2e3b8e7b9226b976978bbb
-rw-r--r-- | system/rust/src/core/mod.rs | 10 | ||||
-rw-r--r-- | system/rust/src/lib.rs | 148 |
2 files changed, 50 insertions, 108 deletions
diff --git a/system/rust/src/core/mod.rs b/system/rust/src/core/mod.rs index d63fa55b4c..3d25dae585 100644 --- a/system/rust/src/core/mod.rs +++ b/system/rust/src/core/mod.rs @@ -12,7 +12,7 @@ use cxx::UniquePtr; use crate::{ gatt::ffi::{AttTransportImpl, GattCallbacksImpl}, - RustModuleRunner, + GlobalModuleRegistry, MainThreadTxMessage, GLOBAL_MODULE_REGISTRY, }; use self::ffi::{future_ready, Future, GattServerCallbacks}; @@ -22,7 +22,7 @@ fn start( on_started: Pin<&'static mut Future>, ) { thread::spawn(move || { - RustModuleRunner::run( + GlobalModuleRegistry::start( Rc::new(GattCallbacksImpl(gatt_server_callbacks)), Rc::new(AttTransportImpl()), || { @@ -33,5 +33,9 @@ fn start( } fn stop() { - RustModuleRunner::stop(); + let _ = GLOBAL_MODULE_REGISTRY + .try_lock() + .unwrap() + .as_ref() + .map(|registry| registry.task_tx.send(MainThreadTxMessage::Stop)); } diff --git a/system/rust/src/lib.rs b/system/rust/src/lib.rs index bcef07a97f..adfb665de3 100644 --- a/system/rust/src/lib.rs +++ b/system/rust/src/lib.rs @@ -16,7 +16,7 @@ //! dependency order. use gatt::{channel::AttTransport, GattCallbacks}; -use log::{error, info, warn}; +use log::{info, warn}; use tokio::task::LocalSet; use std::{rc::Rc, sync::Mutex}; @@ -29,21 +29,9 @@ pub mod gatt; pub mod packets; pub mod utils; -/// The Rust Modules runner. Starts and processes messages from Java / C++ -/// while the Rust thread is running. Starts in an idle state. -#[derive(Default, Debug)] -enum RustModuleRunner { - /// Not started yet - #[default] - NotStarted, - /// Main event loop is running and messages can be processed. - /// Use [`RustModuleRunner::send`] to queue a callback to be sent. - Running { tx: mpsc::UnboundedSender<BoxedMainThreadCallback> }, - /// The event loop has been asked to stop from the FFI interface and will stop when all - /// messages in the queue are processed. No further messages can be sent. - Stopping, - /// The event loop has ended. `result` holds an error if the thread ended not gracefully. - Ended { result: Result<(), String> }, +/// The owner of the main Rust thread on which all Rust modules run +struct GlobalModuleRegistry { + pub task_tx: MainThreadTx, } /// The ModuleViews lets us access all publicly accessible Rust modules from @@ -58,43 +46,36 @@ pub struct ModuleViews<'a> { pub gatt_module: &'a mut gatt::server::GattModule, } -static GLOBAL_MODULE_RUNNER: Mutex<RustModuleRunner> = Mutex::new(RustModuleRunner::new()); - -impl RustModuleRunner { - const fn new() -> Self { - Self::NotStarted - } +static GLOBAL_MODULE_REGISTRY: Mutex<Option<GlobalModuleRegistry>> = Mutex::new(None); +impl GlobalModuleRegistry { /// Handles bringup of all Rust modules. This occurs after GD C++ modules /// have started, but before the legacy stack has initialized. /// Must be invoked from the Rust thread after JNI initializes it and passes /// in JNI modules. - /// - /// This function can only be run once, if it is run more than once it will panic. - pub fn run( + pub fn start( gatt_callbacks: Rc<dyn GattCallbacks>, att_transport: Rc<dyn AttTransport>, on_started: impl FnOnce(), ) { info!("starting Rust modules"); - let mut main_thread_rx = match GLOBAL_MODULE_RUNNER.lock().unwrap().start() { - Ok(main_thread_rx) => main_thread_rx, - Err(reason) => { - error!("Cannot start rust modules: {reason}"); - panic!("Bluetooth Rust modules: {reason}"); - } - }; let rt = Builder::new_current_thread() .enable_all() .build() .expect("failed to start tokio runtime"); let local = LocalSet::new(); - // Setup FFI and C++ modules + let (tx, mut rx) = mpsc::unbounded_channel(); + let prev_registry = GLOBAL_MODULE_REGISTRY.lock().unwrap().replace(Self { task_tx: tx }); + + // initialization should only happen once + assert!(prev_registry.is_none()); + + // First, setup FFI and C++ modules let arbiter = gatt::arbiter::initialize_arbiter(); // Now enter the runtime - let result = local.block_on(&rt, async move { + local.block_on(&rt, async move { // Then follow the pure-Rust modules let gatt_incoming_callbacks = Rc::new(gatt::callbacks::CallbackTransactionManager::new(gatt_callbacks.clone())); @@ -114,81 +95,39 @@ impl RustModuleRunner { // This is the core event loop that serializes incoming requests into the Rust // thread do_in_rust_thread lets us post into here from foreign // threads - info!("starting event loop"); - while let Some(f) = main_thread_rx.recv().await { - f(&mut modules); + info!("starting Tokio event loop"); + while let Some(message) = rx.recv().await { + match message { + MainThreadTxMessage::Callback(f) => f(&mut modules), + MainThreadTxMessage::Stop => { + break; + } + } } - Ok(()) }); - warn!("RustModuleRunner has stopped, shutting down executor thread"); - - if let Err(e) = GLOBAL_MODULE_RUNNER.lock().unwrap().finished(result) { - warn!("failed to record runner finish: {e:?}"); - } - + warn!("Rust thread queue has stopped, shutting down executor thread"); + GLOBAL_MODULE_REGISTRY.lock().unwrap().take(); gatt::arbiter::clean_arbiter(); } - - /// Externally stop the global runner. - pub fn stop() { - GLOBAL_MODULE_RUNNER.lock().unwrap().shutdown(); - } - - #[allow(dead_code)] - fn send(&self, f: BoxedMainThreadCallback) -> Result<(), (String, BoxedMainThreadCallback)> { - match self { - Self::NotStarted => Err(("Not started yet".to_string(), f)), - Self::Ended { .. } | Self::Stopping => Err(("Runner ended".to_string(), f)), - Self::Running { tx } => tx.send(f).map_err(|e| ("Failed to send".to_string(), e.0)), - } - } - - fn start(&mut self) -> Result<mpsc::UnboundedReceiver<BoxedMainThreadCallback>, String> { - match self { - Self::Running { .. } => { - return Err("Already started".to_string()); - } - Self::Ended { result } => { - return Err(format!("Already finished: {result:?}")); - } - Self::Stopping => { - return Err("Can't start, finishing".to_string()); - } - Self::NotStarted => {} - }; - - let (tx, rx) = mpsc::unbounded_channel(); - - *self = Self::Running { tx }; - Ok(rx) - } - - fn shutdown(&mut self) { - match std::mem::replace(self, Self::Stopping) { - Self::NotStarted => { - warn!("Runner being stopped when it hasn't been started"); - self.finished(Err("Never started".to_string())).unwrap(); - } - Self::Stopping => { - warn!("Asked to shutdown twice before stopped"); - } - Self::Ended { .. } | Self::Running { .. } => {} - } - } - - fn finished(&mut self, result: Result<(), String>) -> Result<(), String> { - match self { - Self::NotStarted => return Err("Not started".to_string()), - Self::Ended { result } => return Err(format!("Already finished with {result:?}")), - Self::Running { .. } | Self::Stopping => {} - } - - *self = Self::Ended { result }; - Ok(()) - } } type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>; +enum MainThreadTxMessage { + Callback(BoxedMainThreadCallback), + Stop, +} +type MainThreadTx = mpsc::UnboundedSender<MainThreadTxMessage>; + +thread_local! { + /// The TX end of a channel into the Rust thread, so external callers can + /// access Rust modules. JNI / direct FFI should use do_in_rust_thread for + /// convenience, but objects passed into C++ as callbacks should + /// clone this channel to fail loudly if it's not yet initialized. + /// + /// This will be lazily initialized on first use from each client thread + static MAIN_THREAD_TX: MainThreadTx = + GLOBAL_MODULE_REGISTRY.lock().unwrap().as_ref().expect("stack not initialized").task_tx.clone(); +} /// Posts a callback to the Rust thread and gives it access to public Rust /// modules, used from JNI. @@ -203,9 +142,8 @@ pub fn do_in_rust_thread<F>(f: F) where F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static, { - if let Err((s, _f)) = GLOBAL_MODULE_RUNNER.lock().expect("lock not poisoned").send(Box::new(f)) - { - error!("Failed to do_in_rust_thread, panicking: {s}"); + let ret = MAIN_THREAD_TX.with(|tx| tx.send(MainThreadTxMessage::Callback(Box::new(f)))); + if ret.is_err() { panic!("Rust call failed"); } } |