blob: 7391b15b4f31a29e118f6c8cc939681a289296c1 [file] [log] [blame]
//! This module manages LE connection requests and active
//! LE connections. In particular, it de-duplicates connection requests,
//! avoids duplicate connections to the same devices (even with different RPAs),
//! and retries failed connections
use std::{
cell::RefCell, collections::HashSet, fmt::Debug, future::Future, hash::Hash, ops::Deref,
time::Duration,
};
use crate::{
core::{
address::AddressWithType,
shared_box::{SharedBox, WeakBox, WeakBoxRef},
},
gatt::ids::ServerId,
};
use self::{
acceptlist_manager::{determine_target_state, LeAcceptlistManager},
attempt_manager::{ConnectionAttempts, ConnectionMode},
le_manager::{ErrorCode, InactiveLeAclManager, LeAclManagerConnectionCallbacks},
};
mod acceptlist_manager;
mod attempt_manager;
mod ffi;
pub mod le_manager;
mod mocks;
pub use ffi::{register_callbacks, LeAclManagerImpl, LeAclManagerShim};
use log::info;
use scopeguard::ScopeGuard;
use tokio::{task::spawn_local, time::timeout};
/// Possible errors returned when making a connection attempt
#[derive(Debug, PartialEq, Eq)]
pub enum CreateConnectionFailure {
/// This client is already making a connection of the same type
/// to the same address.
ConnectionAlreadyPending,
}
/// Errors returned if a connection successfully starts but fails afterwards.
#[derive(Debug, PartialEq, Eq)]
pub enum ConnectionFailure {
/// The connection attempt was cancelled
Cancelled,
/// The connection completed but with an HCI error code
Error(ErrorCode),
}
/// Errors returned if the client fails to cancel their connection attempt
#[derive(Debug, PartialEq, Eq)]
pub enum CancelConnectFailure {
/// The connection attempt does not exist
ConnectionNotPending,
}
/// Unique identifiers for a client of the connection manager
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum ConnectionManagerClient {
/// A GATT client with given client ID
GattClient(u8),
/// A GATT server with given server ID
GattServer(ServerId),
}
/// An active connection
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct LeConnection {
/// The address of the peer device, as reported in the connection complete event
/// This is guaranteed to be unique across active connections, so we can implement
/// PartialEq/Eq on this.
pub remote_address: AddressWithType,
}
/// Responsible for managing the initiator state and the list of
/// devices on the filter accept list
#[derive(Debug)]
pub struct ConnectionManager {
state: RefCell<ConnectionManagerState>,
}
#[derive(Debug)]
struct ConnectionManagerState {
/// All pending connection attempts (unresolved direct + all background)
attempts: ConnectionAttempts,
/// The addresses we are currently connected to
current_connections: HashSet<AddressWithType>,
/// Tracks the state of the LE connect list, and updates it to drive to a
/// specified target state
acceptlist_manager: LeAcceptlistManager,
}
struct ConnectionManagerCallbackHandler(WeakBox<ConnectionManager>);
const DIRECT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(
29, /* ugly hack to avoid fighting with le_impl timeout, until I remove that timeout */
);
impl LeAclManagerConnectionCallbacks for ConnectionManagerCallbackHandler {
fn on_le_connect(&self, address: AddressWithType, result: Result<LeConnection, ErrorCode>) {
self.with_manager(|manager| manager.on_le_connect(address, result))
}
fn on_disconnect(&self, address: AddressWithType) {
self.with_manager(|manager| manager.on_disconnect(address))
}
}
impl ConnectionManagerCallbackHandler {
fn with_manager(&self, f: impl FnOnce(&ConnectionManager)) {
self.0.with(|manager| f(manager.expect("got connection event after stack died").deref()))
}
}
impl ConnectionManager {
/// Constructor
pub fn new(le_manager: impl InactiveLeAclManager) -> SharedBox<Self> {
SharedBox::new_cyclic(|weak| Self {
state: RefCell::new(ConnectionManagerState {
attempts: ConnectionAttempts::new(),
current_connections: HashSet::new(),
acceptlist_manager: LeAcceptlistManager::new(
le_manager.register_callbacks(ConnectionManagerCallbackHandler(weak)),
),
}),
})
}
}
/// Make the state of the LeAcceptlistManager consistent with the attempts tracked in ConnectionAttempts
fn reconcile_state(state: &mut ConnectionManagerState) {
state
.acceptlist_manager
.drive_to_state(determine_target_state(&state.attempts.active_attempts()));
}
impl WeakBoxRef<'_, ConnectionManager> {
/// Start a direct connection to a peer device from a specified client. If the peer
/// is connected, immediately resolve the attempt.
pub fn start_direct_connection(
&self,
client: ConnectionManagerClient,
address: AddressWithType,
) -> Result<(), CreateConnectionFailure> {
spawn_local(timeout(DIRECT_CONNECTION_TIMEOUT, self.direct_connection(client, address)?));
Ok(())
}
/// Start a direct connection to a peer device from a specified client.
///
/// # Cancellation Safety
/// If this future is dropped, the connection attempt will be cancelled. It can also be cancelled
/// from the separate API ConnectionManager#cancel_connection.
fn direct_connection(
&self,
client: ConnectionManagerClient,
address: AddressWithType,
) -> Result<
impl Future<Output = Result<LeConnection, ConnectionFailure>>,
CreateConnectionFailure,
> {
let mut state = self.state.borrow_mut();
// if connected, this is a no-op
let attempt_and_guard = if state.current_connections.contains(&address) {
None
} else {
let pending_attempt = state.attempts.register_direct_connection(client, address)?;
let attempt_id = pending_attempt.id;
reconcile_state(&mut state);
Some((
pending_attempt,
scopeguard::guard(self.downgrade(), move |this| {
// remove the attempt after we are cancelled
this.with(|this| {
this.map(|this| {
info!("Cancelling attempt {attempt_id:?}");
let mut state = this.state.borrow_mut();
state.attempts.cancel_attempt_with_id(attempt_id);
reconcile_state(&mut state);
})
});
}),
))
};
Ok(async move {
let Some((attempt, guard)) = attempt_and_guard else {
// if we did not make an attempt, the connection must be ready
return Ok(LeConnection { remote_address: address })
};
// otherwise, wait until the attempt resolves
let ret = attempt.await;
// defuse scopeguard (no need to cancel now)
ScopeGuard::into_inner(guard);
ret
})
}
}
impl ConnectionManager {
/// Start a background connection to a peer device with given parameters from a specified client.
pub fn add_background_connection(
&self,
client: ConnectionManagerClient,
address: AddressWithType,
) -> Result<(), CreateConnectionFailure> {
let mut state = self.state.borrow_mut();
state.attempts.register_background_connection(client, address)?;
reconcile_state(&mut state);
Ok(())
}
/// Cancel connection attempt from this client to the specified address with the specified mode.
pub fn cancel_connection(
&self,
client: ConnectionManagerClient,
address: AddressWithType,
mode: ConnectionMode,
) -> Result<(), CancelConnectFailure> {
let mut state = self.state.borrow_mut();
state.attempts.cancel_attempt(client, address, mode)?;
reconcile_state(&mut state);
Ok(())
}
/// Cancel all connection attempts to this address
pub fn cancel_unconditionally(&self, address: AddressWithType) {
let mut state = self.state.borrow_mut();
state.attempts.remove_unconditionally(address);
reconcile_state(&mut state);
}
/// Cancel all connection attempts from this client
pub fn remove_client(&self, client: ConnectionManagerClient) {
let mut state = self.state.borrow_mut();
state.attempts.remove_client(client);
reconcile_state(&mut state);
}
fn on_le_connect(&self, address: AddressWithType, result: Result<LeConnection, ErrorCode>) {
let mut state = self.state.borrow_mut();
// record this connection while it exists
state.current_connections.insert(address);
// all completed connections remove the address from the direct list
state.acceptlist_manager.on_connect_complete(address);
// invoke any pending callbacks, update set of attempts
state.attempts.process_connection(address, result);
// update the acceptlist
reconcile_state(&mut state);
}
fn on_disconnect(&self, address: AddressWithType) {
let mut state = self.state.borrow_mut();
state.current_connections.remove(&address);
reconcile_state(&mut state);
}
}
#[cfg(test)]
mod test {
use crate::{core::address::AddressType, utils::task::block_on_locally};
use super::{mocks::mock_le_manager::MockLeAclManager, *};
const CLIENT_1: ConnectionManagerClient = ConnectionManagerClient::GattClient(1);
const CLIENT_2: ConnectionManagerClient = ConnectionManagerClient::GattClient(2);
const ADDRESS_1: AddressWithType =
AddressWithType { address: [1, 2, 3, 4, 5, 6], address_type: AddressType::Public };
const ERROR: ErrorCode = ErrorCode(1);
#[test]
fn test_single_direct_connection() {
block_on_locally(async {
// arrange
let mock_le_manager = MockLeAclManager::new();
let connection_manager = ConnectionManager::new(mock_le_manager.clone());
// act: initiate a direct connection
connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
// assert: the direct connection is pending
assert_eq!(mock_le_manager.current_connection_mode(), Some(ConnectionMode::Direct));
assert_eq!(mock_le_manager.current_acceptlist().len(), 1);
assert!(mock_le_manager.current_acceptlist().contains(&ADDRESS_1));
});
}
#[test]
fn test_failed_direct_connection() {
block_on_locally(async {
// arrange: one pending direct connection
let mock_le_manager = MockLeAclManager::new();
let connection_manager = ConnectionManager::new(mock_le_manager.clone());
connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
// act: the connection attempt fails
mock_le_manager.on_le_connect(ADDRESS_1, ERROR);
// assert: the direct connection has stopped
assert_eq!(mock_le_manager.current_connection_mode(), None);
});
}
#[test]
fn test_single_background_connection() {
block_on_locally(async {
// arrange
let mock_le_manager = MockLeAclManager::new();
let connection_manager = ConnectionManager::new(mock_le_manager.clone());
// act: initiate a background connection
connection_manager.as_ref().add_background_connection(CLIENT_1, ADDRESS_1).unwrap();
// assert: the background connection is pending
assert_eq!(mock_le_manager.current_connection_mode(), Some(ConnectionMode::Background));
assert_eq!(mock_le_manager.current_acceptlist().len(), 1);
assert!(mock_le_manager.current_acceptlist().contains(&ADDRESS_1));
});
}
#[test]
fn test_resolved_connection() {
block_on_locally(async {
// arrange
let mock_le_manager = MockLeAclManager::new();
let connection_manager = ConnectionManager::new(mock_le_manager.clone());
// act: initiate a direct connection, that succeeds
connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
mock_le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);
// assert: no connection is pending
assert_eq!(mock_le_manager.current_connection_mode(), None);
});
}
#[test]
fn test_resolved_background_connection() {
block_on_locally(async {
// arrange
let mock_le_manager = MockLeAclManager::new();
let connection_manager = ConnectionManager::new(mock_le_manager.clone());
// act: initiate a background connection, that succeeds
connection_manager.as_ref().add_background_connection(CLIENT_1, ADDRESS_1).unwrap();
mock_le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);
// assert: no connection is pending
assert_eq!(mock_le_manager.current_connection_mode(), None);
});
}
#[test]
fn test_resolved_direct_connection_after_disconnect() {
block_on_locally(async {
// arrange
let mock_le_manager = MockLeAclManager::new();
let connection_manager = ConnectionManager::new(mock_le_manager.clone());
// act: initiate a direct connection, that succeeds, then disconnects
connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
mock_le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);
mock_le_manager.on_le_disconnect(ADDRESS_1);
// assert: no connection is pending
assert_eq!(mock_le_manager.current_connection_mode(), None);
});
}
#[test]
fn test_resolved_background_connection_after_disconnect() {
block_on_locally(async {
// arrange
let mock_le_manager = MockLeAclManager::new();
let connection_manager = ConnectionManager::new(mock_le_manager.clone());
// act: initiate a background connection, that succeeds, then disconnects
connection_manager.as_ref().add_background_connection(CLIENT_1, ADDRESS_1).unwrap();
mock_le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);
mock_le_manager.on_le_disconnect(ADDRESS_1);
// assert: the background connection has resumed
assert_eq!(mock_le_manager.current_connection_mode(), Some(ConnectionMode::Background));
});
}
#[test]
fn test_direct_connection_timeout() {
block_on_locally(async {
// arrange: a pending direct connection
let mock_le_manager = MockLeAclManager::new();
let connection_manager = ConnectionManager::new(mock_le_manager.clone());
connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
// act: let it timeout
tokio::time::sleep(DIRECT_CONNECTION_TIMEOUT).await;
// go forward one tick to ensure all timers are fired
// (since we are using fake time, this is not a race condition)
tokio::time::sleep(Duration::from_millis(1)).await;
// assert: it is cancelled and we are idle again
assert_eq!(mock_le_manager.current_connection_mode(), None);
});
}
#[test]
fn test_stacked_direct_connections_timeout() {
block_on_locally(async {
// arrange
let mock_le_manager = MockLeAclManager::new();
let connection_manager = ConnectionManager::new(mock_le_manager.clone());
// act: start a direct connection
connection_manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
tokio::time::sleep(DIRECT_CONNECTION_TIMEOUT * 3 / 4).await;
// act: after some time, start a second one
connection_manager.as_ref().start_direct_connection(CLIENT_2, ADDRESS_1).unwrap();
// act: wait for the first one (but not the second) to time out
tokio::time::sleep(DIRECT_CONNECTION_TIMEOUT * 3 / 4).await;
// assert: we are still doing a direct connection
assert_eq!(mock_le_manager.current_connection_mode(), Some(ConnectionMode::Direct));
});
}
}