summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Omair Kamil <okamil@google.com> 2025-01-30 21:39:58 -0800
committer Gerrit Code Review <noreply-gerritcodereview@google.com> 2025-01-30 21:39:58 -0800
commitadd1b56df349fefafd3763233be1ad5cc17e9310 (patch)
tree99b0844dd5427c1eba0be5a8f072a34a7cd0e553
parentb182e3dd6daaa2c2f5076e18128e07c6b8c3c6a9 (diff)
Revert "system/rust: Restructure and add logs for FFI"
This reverts commit b182e3dd6daaa2c2f5076e18128e07c6b8c3c6a9. Reason for revert: b/393481018 Change-Id: I26d3fe5ffc9cd7854e2e3b8e7b9226b976978bbb
-rw-r--r--system/rust/src/core/mod.rs10
-rw-r--r--system/rust/src/lib.rs148
2 files changed, 50 insertions, 108 deletions
diff --git a/system/rust/src/core/mod.rs b/system/rust/src/core/mod.rs
index d63fa55b4c..3d25dae585 100644
--- a/system/rust/src/core/mod.rs
+++ b/system/rust/src/core/mod.rs
@@ -12,7 +12,7 @@ use cxx::UniquePtr;
use crate::{
gatt::ffi::{AttTransportImpl, GattCallbacksImpl},
- RustModuleRunner,
+ GlobalModuleRegistry, MainThreadTxMessage, GLOBAL_MODULE_REGISTRY,
};
use self::ffi::{future_ready, Future, GattServerCallbacks};
@@ -22,7 +22,7 @@ fn start(
on_started: Pin<&'static mut Future>,
) {
thread::spawn(move || {
- RustModuleRunner::run(
+ GlobalModuleRegistry::start(
Rc::new(GattCallbacksImpl(gatt_server_callbacks)),
Rc::new(AttTransportImpl()),
|| {
@@ -33,5 +33,9 @@ fn start(
}
fn stop() {
- RustModuleRunner::stop();
+ let _ = GLOBAL_MODULE_REGISTRY
+ .try_lock()
+ .unwrap()
+ .as_ref()
+ .map(|registry| registry.task_tx.send(MainThreadTxMessage::Stop));
}
diff --git a/system/rust/src/lib.rs b/system/rust/src/lib.rs
index bcef07a97f..adfb665de3 100644
--- a/system/rust/src/lib.rs
+++ b/system/rust/src/lib.rs
@@ -16,7 +16,7 @@
//! dependency order.
use gatt::{channel::AttTransport, GattCallbacks};
-use log::{error, info, warn};
+use log::{info, warn};
use tokio::task::LocalSet;
use std::{rc::Rc, sync::Mutex};
@@ -29,21 +29,9 @@ pub mod gatt;
pub mod packets;
pub mod utils;
-/// The Rust Modules runner. Starts and processes messages from Java / C++
-/// while the Rust thread is running. Starts in an idle state.
-#[derive(Default, Debug)]
-enum RustModuleRunner {
- /// Not started yet
- #[default]
- NotStarted,
- /// Main event loop is running and messages can be processed.
- /// Use [`RustModuleRunner::send`] to queue a callback to be sent.
- Running { tx: mpsc::UnboundedSender<BoxedMainThreadCallback> },
- /// The event loop has been asked to stop from the FFI interface and will stop when all
- /// messages in the queue are processed. No further messages can be sent.
- Stopping,
- /// The event loop has ended. `result` holds an error if the thread ended not gracefully.
- Ended { result: Result<(), String> },
+/// The owner of the main Rust thread on which all Rust modules run
+struct GlobalModuleRegistry {
+ pub task_tx: MainThreadTx,
}
/// The ModuleViews lets us access all publicly accessible Rust modules from
@@ -58,43 +46,36 @@ pub struct ModuleViews<'a> {
pub gatt_module: &'a mut gatt::server::GattModule,
}
-static GLOBAL_MODULE_RUNNER: Mutex<RustModuleRunner> = Mutex::new(RustModuleRunner::new());
-
-impl RustModuleRunner {
- const fn new() -> Self {
- Self::NotStarted
- }
+static GLOBAL_MODULE_REGISTRY: Mutex<Option<GlobalModuleRegistry>> = Mutex::new(None);
+impl GlobalModuleRegistry {
/// Handles bringup of all Rust modules. This occurs after GD C++ modules
/// have started, but before the legacy stack has initialized.
/// Must be invoked from the Rust thread after JNI initializes it and passes
/// in JNI modules.
- ///
- /// This function can only be run once, if it is run more than once it will panic.
- pub fn run(
+ pub fn start(
gatt_callbacks: Rc<dyn GattCallbacks>,
att_transport: Rc<dyn AttTransport>,
on_started: impl FnOnce(),
) {
info!("starting Rust modules");
- let mut main_thread_rx = match GLOBAL_MODULE_RUNNER.lock().unwrap().start() {
- Ok(main_thread_rx) => main_thread_rx,
- Err(reason) => {
- error!("Cannot start rust modules: {reason}");
- panic!("Bluetooth Rust modules: {reason}");
- }
- };
let rt = Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to start tokio runtime");
let local = LocalSet::new();
- // Setup FFI and C++ modules
+ let (tx, mut rx) = mpsc::unbounded_channel();
+ let prev_registry = GLOBAL_MODULE_REGISTRY.lock().unwrap().replace(Self { task_tx: tx });
+
+ // initialization should only happen once
+ assert!(prev_registry.is_none());
+
+ // First, setup FFI and C++ modules
let arbiter = gatt::arbiter::initialize_arbiter();
// Now enter the runtime
- let result = local.block_on(&rt, async move {
+ local.block_on(&rt, async move {
// Then follow the pure-Rust modules
let gatt_incoming_callbacks =
Rc::new(gatt::callbacks::CallbackTransactionManager::new(gatt_callbacks.clone()));
@@ -114,81 +95,39 @@ impl RustModuleRunner {
// This is the core event loop that serializes incoming requests into the Rust
// thread do_in_rust_thread lets us post into here from foreign
// threads
- info!("starting event loop");
- while let Some(f) = main_thread_rx.recv().await {
- f(&mut modules);
+ info!("starting Tokio event loop");
+ while let Some(message) = rx.recv().await {
+ match message {
+ MainThreadTxMessage::Callback(f) => f(&mut modules),
+ MainThreadTxMessage::Stop => {
+ break;
+ }
+ }
}
- Ok(())
});
- warn!("RustModuleRunner has stopped, shutting down executor thread");
-
- if let Err(e) = GLOBAL_MODULE_RUNNER.lock().unwrap().finished(result) {
- warn!("failed to record runner finish: {e:?}");
- }
-
+ warn!("Rust thread queue has stopped, shutting down executor thread");
+ GLOBAL_MODULE_REGISTRY.lock().unwrap().take();
gatt::arbiter::clean_arbiter();
}
-
- /// Externally stop the global runner.
- pub fn stop() {
- GLOBAL_MODULE_RUNNER.lock().unwrap().shutdown();
- }
-
- #[allow(dead_code)]
- fn send(&self, f: BoxedMainThreadCallback) -> Result<(), (String, BoxedMainThreadCallback)> {
- match self {
- Self::NotStarted => Err(("Not started yet".to_string(), f)),
- Self::Ended { .. } | Self::Stopping => Err(("Runner ended".to_string(), f)),
- Self::Running { tx } => tx.send(f).map_err(|e| ("Failed to send".to_string(), e.0)),
- }
- }
-
- fn start(&mut self) -> Result<mpsc::UnboundedReceiver<BoxedMainThreadCallback>, String> {
- match self {
- Self::Running { .. } => {
- return Err("Already started".to_string());
- }
- Self::Ended { result } => {
- return Err(format!("Already finished: {result:?}"));
- }
- Self::Stopping => {
- return Err("Can't start, finishing".to_string());
- }
- Self::NotStarted => {}
- };
-
- let (tx, rx) = mpsc::unbounded_channel();
-
- *self = Self::Running { tx };
- Ok(rx)
- }
-
- fn shutdown(&mut self) {
- match std::mem::replace(self, Self::Stopping) {
- Self::NotStarted => {
- warn!("Runner being stopped when it hasn't been started");
- self.finished(Err("Never started".to_string())).unwrap();
- }
- Self::Stopping => {
- warn!("Asked to shutdown twice before stopped");
- }
- Self::Ended { .. } | Self::Running { .. } => {}
- }
- }
-
- fn finished(&mut self, result: Result<(), String>) -> Result<(), String> {
- match self {
- Self::NotStarted => return Err("Not started".to_string()),
- Self::Ended { result } => return Err(format!("Already finished with {result:?}")),
- Self::Running { .. } | Self::Stopping => {}
- }
-
- *self = Self::Ended { result };
- Ok(())
- }
}
type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>;
+enum MainThreadTxMessage {
+ Callback(BoxedMainThreadCallback),
+ Stop,
+}
+type MainThreadTx = mpsc::UnboundedSender<MainThreadTxMessage>;
+
+thread_local! {
+ /// The TX end of a channel into the Rust thread, so external callers can
+ /// access Rust modules. JNI / direct FFI should use do_in_rust_thread for
+ /// convenience, but objects passed into C++ as callbacks should
+ /// clone this channel to fail loudly if it's not yet initialized.
+ ///
+ /// This will be lazily initialized on first use from each client thread
+ static MAIN_THREAD_TX: MainThreadTx =
+ GLOBAL_MODULE_REGISTRY.lock().unwrap().as_ref().expect("stack not initialized").task_tx.clone();
+}
/// Posts a callback to the Rust thread and gives it access to public Rust
/// modules, used from JNI.
@@ -203,9 +142,8 @@ pub fn do_in_rust_thread<F>(f: F)
where
F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static,
{
- if let Err((s, _f)) = GLOBAL_MODULE_RUNNER.lock().expect("lock not poisoned").send(Box::new(f))
- {
- error!("Failed to do_in_rust_thread, panicking: {s}");
+ let ret = MAIN_THREAD_TX.with(|tx| tx.send(MainThreadTxMessage::Callback(Box::new(f))));
+ if ret.is_err() {
panic!("Rust call failed");
}
}