summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Marie Janssen <jamuraa@google.com> 2024-10-23 21:26:22 +0000
committer Chris Suter <csuter@google.com> 2025-02-10 14:11:06 -0800
commitf26a364d98314eca29aeda783a6129521e551bc6 (patch)
treed97bd0883091ecb8ad420070d9dc2cac00330e6f
parentab0891868513de52e36aa440ff7616b28e952bc6 (diff)
system/rust: Restructure and add logs for FFI
Restructure the GlobalModuleRegistry: - Rename to RustModuleRunner - Clarify states - Internalize the static global - Add a number of logs when error states occur This is reland of https://r.android.com/3318393 except that starting and stopping the module multiple times is now supported. Tested with `atest net_test_bluetooth` to verify the cause of revert has been fixed. Fix: 356462170 Test: m com.android.btservices Flag: EXEMPT refactor only Change-Id: I2aece397937e075a494b0b6e200492bb5664f46c
-rw-r--r--system/rust/src/core/mod.rs20
-rw-r--r--system/rust/src/lib.rs120
2 files changed, 78 insertions, 62 deletions
diff --git a/system/rust/src/core/mod.rs b/system/rust/src/core/mod.rs
index 3d25dae585..e4f4932b79 100644
--- a/system/rust/src/core/mod.rs
+++ b/system/rust/src/core/mod.rs
@@ -6,13 +6,13 @@ pub mod shared_box;
pub mod shared_mutex;
pub mod uuid;
-use std::{pin::Pin, rc::Rc, thread};
+use std::pin::Pin;
use cxx::UniquePtr;
use crate::{
gatt::ffi::{AttTransportImpl, GattCallbacksImpl},
- GlobalModuleRegistry, MainThreadTxMessage, GLOBAL_MODULE_REGISTRY,
+ RustModuleRunner,
};
use self::ffi::{future_ready, Future, GattServerCallbacks};
@@ -21,21 +21,11 @@ fn start(
gatt_server_callbacks: UniquePtr<GattServerCallbacks>,
on_started: Pin<&'static mut Future>,
) {
- thread::spawn(move || {
- GlobalModuleRegistry::start(
- Rc::new(GattCallbacksImpl(gatt_server_callbacks)),
- Rc::new(AttTransportImpl()),
- || {
- future_ready(on_started);
- },
- );
+ RustModuleRunner::start(GattCallbacksImpl(gatt_server_callbacks), AttTransportImpl(), || {
+ future_ready(on_started);
});
}
fn stop() {
- let _ = GLOBAL_MODULE_REGISTRY
- .try_lock()
- .unwrap()
- .as_ref()
- .map(|registry| registry.task_tx.send(MainThreadTxMessage::Stop));
+ RustModuleRunner::stop();
}
diff --git a/system/rust/src/lib.rs b/system/rust/src/lib.rs
index adfb665de3..3886ee4772 100644
--- a/system/rust/src/lib.rs
+++ b/system/rust/src/lib.rs
@@ -19,7 +19,7 @@ use gatt::{channel::AttTransport, GattCallbacks};
use log::{info, warn};
use tokio::task::LocalSet;
-use std::{rc::Rc, sync::Mutex};
+use std::{rc::Rc, sync::Mutex, thread::JoinHandle};
use tokio::runtime::Builder;
use tokio::sync::mpsc;
@@ -29,9 +29,15 @@ pub mod gatt;
pub mod packets;
pub mod utils;
-/// The owner of the main Rust thread on which all Rust modules run
-struct GlobalModuleRegistry {
- pub task_tx: MainThreadTx,
+/// The Rust Modules runner. Starts and processes messages from Java / C++
+/// while the Rust thread is running. Starts in an idle state.
+#[derive(Default, Debug)]
+enum RustModuleRunner {
+ #[default]
+ NotRunning,
+ /// Main event loop is running and messages can be processed. Use [`RustModuleRunner::send`] to
+ /// queue a callback to be sent.
+ Running { thread: JoinHandle<()>, tx: mpsc::UnboundedSender<BoxedMainThreadCallback> },
}
/// The ModuleViews lets us access all publicly accessible Rust modules from
@@ -46,32 +52,65 @@ pub struct ModuleViews<'a> {
pub gatt_module: &'a mut gatt::server::GattModule,
}
-static GLOBAL_MODULE_REGISTRY: Mutex<Option<GlobalModuleRegistry>> = Mutex::new(None);
+static GLOBAL_MODULE_RUNNER: Mutex<RustModuleRunner> = Mutex::new(RustModuleRunner::new());
+
+impl RustModuleRunner {
+ const fn new() -> Self {
+ Self::NotRunning
+ }
-impl GlobalModuleRegistry {
/// Handles bringup of all Rust modules. This occurs after GD C++ modules
/// have started, but before the legacy stack has initialized.
/// Must be invoked from the Rust thread after JNI initializes it and passes
/// in JNI modules.
pub fn start(
+ gatt_callbacks: impl GattCallbacks + Send + 'static,
+ att_transport: impl AttTransport + Send + 'static,
+ on_started: impl FnOnce() + Send + 'static,
+ ) {
+ let mut runner = GLOBAL_MODULE_RUNNER.lock().unwrap();
+
+ if let Self::Running { .. } = &*runner {
+ panic!("Already running");
+ }
+
+ let (tx, rx) = mpsc::unbounded_channel();
+ let thread = std::thread::spawn(move || {
+ RustModuleRunner::run(Rc::new(gatt_callbacks), Rc::new(att_transport), on_started, rx)
+ });
+
+ *runner = Self::Running { thread, tx };
+ }
+
+ /// Externally stop the global runner.
+ pub fn stop() {
+ match std::mem::replace(&mut *GLOBAL_MODULE_RUNNER.lock().unwrap(), Self::NotRunning) {
+ Self::NotRunning => warn!("Already not running"),
+ Self::Running { thread, tx } => {
+ // Dropping the send end of the channel should cause the runner to stop.
+ std::mem::drop(tx);
+
+ // Wait for the thread to terminate.
+ let _ = thread.join();
+ }
+ }
+ }
+
+ fn run(
gatt_callbacks: Rc<dyn GattCallbacks>,
att_transport: Rc<dyn AttTransport>,
on_started: impl FnOnce(),
+ mut rx: mpsc::UnboundedReceiver<BoxedMainThreadCallback>,
) {
info!("starting Rust modules");
+
let rt = Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to start tokio runtime");
let local = LocalSet::new();
- let (tx, mut rx) = mpsc::unbounded_channel();
- let prev_registry = GLOBAL_MODULE_REGISTRY.lock().unwrap().replace(Self { task_tx: tx });
-
- // initialization should only happen once
- assert!(prev_registry.is_none());
-
- // First, setup FFI and C++ modules
+ // Setup FFI and C++ modules
let arbiter = gatt::arbiter::initialize_arbiter();
// Now enter the runtime
@@ -95,55 +134,42 @@ impl GlobalModuleRegistry {
// This is the core event loop that serializes incoming requests into the Rust
// thread do_in_rust_thread lets us post into here from foreign
// threads
- info!("starting Tokio event loop");
- while let Some(message) = rx.recv().await {
- match message {
- MainThreadTxMessage::Callback(f) => f(&mut modules),
- MainThreadTxMessage::Stop => {
- break;
- }
- }
+ info!("starting event loop");
+ while let Some(f) = rx.recv().await {
+ f(&mut modules);
}
});
- warn!("Rust thread queue has stopped, shutting down executor thread");
- GLOBAL_MODULE_REGISTRY.lock().unwrap().take();
+
+ info!("RustModuleRunner has stopped, shutting down executor thread");
+
gatt::arbiter::clean_arbiter();
}
+
+ #[allow(dead_code)]
+ fn send(&self, f: BoxedMainThreadCallback) -> Result<(), (String, BoxedMainThreadCallback)> {
+ match self {
+ Self::NotRunning => Err(("Not running".to_string(), f)),
+ Self::Running { tx, .. } => tx.send(f).map_err(|e| ("Failed to send".to_string(), e.0)),
+ }
+ }
}
type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>;
-enum MainThreadTxMessage {
- Callback(BoxedMainThreadCallback),
- Stop,
-}
-type MainThreadTx = mpsc::UnboundedSender<MainThreadTxMessage>;
-
-thread_local! {
- /// The TX end of a channel into the Rust thread, so external callers can
- /// access Rust modules. JNI / direct FFI should use do_in_rust_thread for
- /// convenience, but objects passed into C++ as callbacks should
- /// clone this channel to fail loudly if it's not yet initialized.
- ///
- /// This will be lazily initialized on first use from each client thread
- static MAIN_THREAD_TX: MainThreadTx =
- GLOBAL_MODULE_REGISTRY.lock().unwrap().as_ref().expect("stack not initialized").task_tx.clone();
-}
/// Posts a callback to the Rust thread and gives it access to public Rust
/// modules, used from JNI.
///
-/// Do not call this from Rust modules / the Rust thread! Instead, Rust modules
-/// should receive references to their dependent modules at startup. If passing
-/// callbacks into C++, don't use this method either - instead, acquire a clone
-/// of MAIN_THREAD_TX when the callback is created. This ensures that there
-/// never are "invalid" callbacks that may still work depending on when the
+/// Do not call this from Rust modules / the Rust thread! Instead, Rust modules should receive
+/// references to their dependent modules at startup. If passing callbacks into C++, don't use this
+/// method either - instead, acquire a clone of RustModule's `tx` when the callback is created. This
+/// ensures that there never are "invalid" callbacks that may still work depending on when the
/// GLOBAL_MODULE_REGISTRY is initialized.
pub fn do_in_rust_thread<F>(f: F)
where
F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static,
{
- let ret = MAIN_THREAD_TX.with(|tx| tx.send(MainThreadTxMessage::Callback(Box::new(f))));
- if ret.is_err() {
- panic!("Rust call failed");
+ if let Err((s, _f)) = GLOBAL_MODULE_RUNNER.lock().expect("lock not poisoned").send(Box::new(f))
+ {
+ panic!("Rust call failed: {s}");
}
}