summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Omair Kamil <okamil@google.com> 2025-01-30 17:29:33 -0800
committer Gerrit Code Review <noreply-gerritcodereview@google.com> 2025-01-30 17:29:33 -0800
commite4a9db4b71c297fa17272be4e2a3e8ab6a7f878e (patch)
tree942cdb094f3208bd3046bc8f6921796288bec6f7
parentc577dad4566ce47140b9e7ae01654acd0ae90a16 (diff)
parentb182e3dd6daaa2c2f5076e18128e07c6b8c3c6a9 (diff)
Merge "system/rust: Restructure and add logs for FFI" into main
-rw-r--r--system/rust/src/core/mod.rs10
-rw-r--r--system/rust/src/lib.rs148
2 files changed, 108 insertions, 50 deletions
diff --git a/system/rust/src/core/mod.rs b/system/rust/src/core/mod.rs
index 3d25dae585..d63fa55b4c 100644
--- a/system/rust/src/core/mod.rs
+++ b/system/rust/src/core/mod.rs
@@ -12,7 +12,7 @@ use cxx::UniquePtr;
use crate::{
gatt::ffi::{AttTransportImpl, GattCallbacksImpl},
- GlobalModuleRegistry, MainThreadTxMessage, GLOBAL_MODULE_REGISTRY,
+ RustModuleRunner,
};
use self::ffi::{future_ready, Future, GattServerCallbacks};
@@ -22,7 +22,7 @@ fn start(
on_started: Pin<&'static mut Future>,
) {
thread::spawn(move || {
- GlobalModuleRegistry::start(
+ RustModuleRunner::run(
Rc::new(GattCallbacksImpl(gatt_server_callbacks)),
Rc::new(AttTransportImpl()),
|| {
@@ -33,9 +33,5 @@ fn start(
}
fn stop() {
- let _ = GLOBAL_MODULE_REGISTRY
- .try_lock()
- .unwrap()
- .as_ref()
- .map(|registry| registry.task_tx.send(MainThreadTxMessage::Stop));
+ RustModuleRunner::stop();
}
diff --git a/system/rust/src/lib.rs b/system/rust/src/lib.rs
index adfb665de3..bcef07a97f 100644
--- a/system/rust/src/lib.rs
+++ b/system/rust/src/lib.rs
@@ -16,7 +16,7 @@
//! dependency order.
use gatt::{channel::AttTransport, GattCallbacks};
-use log::{info, warn};
+use log::{error, info, warn};
use tokio::task::LocalSet;
use std::{rc::Rc, sync::Mutex};
@@ -29,9 +29,21 @@ pub mod gatt;
pub mod packets;
pub mod utils;
-/// The owner of the main Rust thread on which all Rust modules run
-struct GlobalModuleRegistry {
- pub task_tx: MainThreadTx,
+/// The Rust Modules runner. Starts and processes messages from Java / C++
+/// while the Rust thread is running. Starts in an idle state.
+#[derive(Default, Debug)]
+enum RustModuleRunner {
+ /// Not started yet
+ #[default]
+ NotStarted,
+ /// Main event loop is running and messages can be processed.
+ /// Use [`RustModuleRunner::send`] to queue a callback to be sent.
+ Running { tx: mpsc::UnboundedSender<BoxedMainThreadCallback> },
+ /// The event loop has been asked to stop from the FFI interface and will stop when all
+ /// messages in the queue are processed. No further messages can be sent.
+ Stopping,
+ /// The event loop has ended. `result` holds an error if the thread ended not gracefully.
+ Ended { result: Result<(), String> },
}
/// The ModuleViews lets us access all publicly accessible Rust modules from
@@ -46,36 +58,43 @@ pub struct ModuleViews<'a> {
pub gatt_module: &'a mut gatt::server::GattModule,
}
-static GLOBAL_MODULE_REGISTRY: Mutex<Option<GlobalModuleRegistry>> = Mutex::new(None);
+static GLOBAL_MODULE_RUNNER: Mutex<RustModuleRunner> = Mutex::new(RustModuleRunner::new());
+
+impl RustModuleRunner {
+ const fn new() -> Self {
+ Self::NotStarted
+ }
-impl GlobalModuleRegistry {
/// Handles bringup of all Rust modules. This occurs after GD C++ modules
/// have started, but before the legacy stack has initialized.
/// Must be invoked from the Rust thread after JNI initializes it and passes
/// in JNI modules.
- pub fn start(
+ ///
+ /// This function can only be run once, if it is run more than once it will panic.
+ pub fn run(
gatt_callbacks: Rc<dyn GattCallbacks>,
att_transport: Rc<dyn AttTransport>,
on_started: impl FnOnce(),
) {
info!("starting Rust modules");
+ let mut main_thread_rx = match GLOBAL_MODULE_RUNNER.lock().unwrap().start() {
+ Ok(main_thread_rx) => main_thread_rx,
+ Err(reason) => {
+ error!("Cannot start rust modules: {reason}");
+ panic!("Bluetooth Rust modules: {reason}");
+ }
+ };
let rt = Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to start tokio runtime");
let local = LocalSet::new();
- let (tx, mut rx) = mpsc::unbounded_channel();
- let prev_registry = GLOBAL_MODULE_REGISTRY.lock().unwrap().replace(Self { task_tx: tx });
-
- // initialization should only happen once
- assert!(prev_registry.is_none());
-
- // First, setup FFI and C++ modules
+ // Setup FFI and C++ modules
let arbiter = gatt::arbiter::initialize_arbiter();
// Now enter the runtime
- local.block_on(&rt, async move {
+ let result = local.block_on(&rt, async move {
// Then follow the pure-Rust modules
let gatt_incoming_callbacks =
Rc::new(gatt::callbacks::CallbackTransactionManager::new(gatt_callbacks.clone()));
@@ -95,40 +114,82 @@ impl GlobalModuleRegistry {
// This is the core event loop that serializes incoming requests into the Rust
// thread do_in_rust_thread lets us post into here from foreign
// threads
- info!("starting Tokio event loop");
- while let Some(message) = rx.recv().await {
- match message {
- MainThreadTxMessage::Callback(f) => f(&mut modules),
- MainThreadTxMessage::Stop => {
- break;
- }
- }
+ info!("starting event loop");
+ while let Some(f) = main_thread_rx.recv().await {
+ f(&mut modules);
}
+ Ok(())
});
- warn!("Rust thread queue has stopped, shutting down executor thread");
- GLOBAL_MODULE_REGISTRY.lock().unwrap().take();
+ warn!("RustModuleRunner has stopped, shutting down executor thread");
+
+ if let Err(e) = GLOBAL_MODULE_RUNNER.lock().unwrap().finished(result) {
+ warn!("failed to record runner finish: {e:?}");
+ }
+
gatt::arbiter::clean_arbiter();
}
-}
-type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>;
-enum MainThreadTxMessage {
- Callback(BoxedMainThreadCallback),
- Stop,
-}
-type MainThreadTx = mpsc::UnboundedSender<MainThreadTxMessage>;
+ /// Externally stop the global runner.
+ pub fn stop() {
+ GLOBAL_MODULE_RUNNER.lock().unwrap().shutdown();
+ }
-thread_local! {
- /// The TX end of a channel into the Rust thread, so external callers can
- /// access Rust modules. JNI / direct FFI should use do_in_rust_thread for
- /// convenience, but objects passed into C++ as callbacks should
- /// clone this channel to fail loudly if it's not yet initialized.
- ///
- /// This will be lazily initialized on first use from each client thread
- static MAIN_THREAD_TX: MainThreadTx =
- GLOBAL_MODULE_REGISTRY.lock().unwrap().as_ref().expect("stack not initialized").task_tx.clone();
+ #[allow(dead_code)]
+ fn send(&self, f: BoxedMainThreadCallback) -> Result<(), (String, BoxedMainThreadCallback)> {
+ match self {
+ Self::NotStarted => Err(("Not started yet".to_string(), f)),
+ Self::Ended { .. } | Self::Stopping => Err(("Runner ended".to_string(), f)),
+ Self::Running { tx } => tx.send(f).map_err(|e| ("Failed to send".to_string(), e.0)),
+ }
+ }
+
+ fn start(&mut self) -> Result<mpsc::UnboundedReceiver<BoxedMainThreadCallback>, String> {
+ match self {
+ Self::Running { .. } => {
+ return Err("Already started".to_string());
+ }
+ Self::Ended { result } => {
+ return Err(format!("Already finished: {result:?}"));
+ }
+ Self::Stopping => {
+ return Err("Can't start, finishing".to_string());
+ }
+ Self::NotStarted => {}
+ };
+
+ let (tx, rx) = mpsc::unbounded_channel();
+
+ *self = Self::Running { tx };
+ Ok(rx)
+ }
+
+ fn shutdown(&mut self) {
+ match std::mem::replace(self, Self::Stopping) {
+ Self::NotStarted => {
+ warn!("Runner being stopped when it hasn't been started");
+ self.finished(Err("Never started".to_string())).unwrap();
+ }
+ Self::Stopping => {
+ warn!("Asked to shutdown twice before stopped");
+ }
+ Self::Ended { .. } | Self::Running { .. } => {}
+ }
+ }
+
+ fn finished(&mut self, result: Result<(), String>) -> Result<(), String> {
+ match self {
+ Self::NotStarted => return Err("Not started".to_string()),
+ Self::Ended { result } => return Err(format!("Already finished with {result:?}")),
+ Self::Running { .. } | Self::Stopping => {}
+ }
+
+ *self = Self::Ended { result };
+ Ok(())
+ }
}
+type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>;
+
/// Posts a callback to the Rust thread and gives it access to public Rust
/// modules, used from JNI.
///
@@ -142,8 +203,9 @@ pub fn do_in_rust_thread<F>(f: F)
where
F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static,
{
- let ret = MAIN_THREAD_TX.with(|tx| tx.send(MainThreadTxMessage::Callback(Box::new(f))));
- if ret.is_err() {
+ if let Err((s, _f)) = GLOBAL_MODULE_RUNNER.lock().expect("lock not poisoned").send(Box::new(f))
+ {
+ error!("Failed to do_in_rust_thread, panicking: {s}");
panic!("Rust call failed");
}
}