summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Jakub Rotkiewicz <rotkiewicz@google.com> 2025-01-22 13:47:53 +0000
committer Jakub Rotkiewicz <rotkiewicz@google.com> 2025-02-05 18:58:23 +0000
commitb8c660d032e1a0284d580ea41b5aa74b1e1c5903 (patch)
tree262b9fc4f720c7c956ecc7c259f2d2813297e584
parent9e4ac5bafb8013ed2acd20feee2712f885a17210 (diff)
avatar: refactor a2dp_test test order
Keep the related tests close to each other for better readability. Bug: 372300895 Flag: EXEMPT - test only Test: atest avatar:'A2dpTest' -v Change-Id: Id803e0e62ca179a22477c7a96b479dc04dba8d21
-rw-r--r--android/pandora/test/a2dp_test.py236
1 files changed, 118 insertions, 118 deletions
diff --git a/android/pandora/test/a2dp_test.py b/android/pandora/test/a2dp_test.py
index bdf3a8ce26..236f8b8b99 100644
--- a/android/pandora/test/a2dp_test.py
+++ b/android/pandora/test/a2dp_test.py
@@ -372,124 +372,6 @@ class A2dpTest(base_test.BaseTestClass): # type: ignore[misc]
await asyncio.wait_for(avdtp_future, timeout=10.0)
@avatar.asynchronous
- async def test_avdt_signaling_channel_connection_collision_case1(self) -> None:
- """Test AVDTP signaling channel connection collision.
-
- Test steps after DUT and RD1 connected and paired:
- 1. RD1 connects DUT over AVDTP - first AVDTP signaling channel
- 2. AVDTP signaling channel configuration postponed until DUT tries to initiate AVDTP signaling channel connection
- 3. DUT tries connecting RD1 - collision simulated
- 4. RD1 rejects AVDTP signaling channel connection request from DUT
- 5. RD1 proceeds with first AVDTP signaling channel configuration
- 6. Channel established - collision avoided
- """
-
- @dataclasses.dataclass
- class L2capConfigurationRequest:
- connection: Optional[Connection] = None
- cid: Optional[int] = None
- request: Optional[L2CAP_Configure_Request] = None
-
- global pending_configuration_request
- pending_configuration_request = L2capConfigurationRequest()
-
- class TestChannelManager(ChannelManager):
-
- def __init__(
- self,
- device: BumblePandoraDevice,
- ) -> None:
- super().__init__(
- device.l2cap_channel_manager.extended_features,
- device.l2cap_channel_manager.connectionless_mtu,
- )
- self.register_fixed_channel(bumble.smp.SMP_CID, device.on_smp_pdu)
- device.sdp_server.register(self)
- self.register_fixed_channel(bumble.att.ATT_CID, device.on_gatt_pdu)
- self.host = device.host
-
- def on_l2cap_connection_request(self, connection: Connection, cid: int, request) -> None:
- global pending_configuration_request
- if request.psm == AVDTP_PSM and pending_configuration_request is not None:
- logger.info("<< 4. RD1 rejects AVDTP connection request from DUT >>")
- self.send_control_frame(
- connection,
- cid,
- L2CAP_Connection_Response(
- identifier=request.identifier,
- destination_cid=0,
- source_cid=request.source_cid,
- result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
- status=0x0000,
- ),
- )
- logger.info("<< 5. RD1 proceeds with first AVDTP channel configuration >>")
- chan_connection = pending_configuration_request.connection
- chan_cid = pending_configuration_request.cid
- chan_request = pending_configuration_request.request
- pending_configuration_request = None
- super().on_control_frame(connection=chan_connection, cid=chan_cid, control_frame=chan_request)
- return
- super().on_l2cap_connection_request(connection, cid, request)
-
- class TestClassicChannel(ClassicChannel):
-
- def on_connection_response(self, response):
- assert self.state == self.State.WAIT_CONNECT_RSP
- assert (response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
- ), f"Connection response: {response}"
- self.destination_cid = response.destination_cid
- self._change_state(self.State.WAIT_CONFIG)
- logger.info("<< 2. RD1 connected DUT, configuration postponed >>")
-
- def on_configure_request(self, request) -> None:
- global pending_configuration_request
- if pending_configuration_request is not None:
- logger.info("<< 3. Block RD1 until DUT tries AVDTP channel connection >>")
- pending_configuration_request.connection = self.connection
- pending_configuration_request.cid = self.source_cid
- pending_configuration_request.request = request
- else:
- super().on_configure_request(request)
-
- # Override L2CAP Channel Manager to control signaling
- self.ref1.device.l2cap_channel_manager = TestChannelManager(self.ref1.device)
-
- # Connect and pair DUT -> RD1.
- dut_ref1, ref1_dut = await asyncio.gather(
- initiate_pairing(self.dut, self.ref1.address),
- accept_pairing(self.ref1, self.dut.address),
- )
-
- # Retrieve Bumble connection object from Pandora connection token
- connection = pandora_snippet.get_raw_connection(device=self.ref1, connection=ref1_dut)
- # Find a free CID for a new channel
- connection_channels = self.ref1.device.l2cap_channel_manager.channels.setdefault(connection.handle, {})
- source_cid = self.ref1.device.l2cap_channel_manager.find_free_br_edr_cid(connection_channels)
- assert source_cid is not None, "source_cid is None"
-
- spec = ClassicChannelSpec(AVDTP_PSM)
- channel = TestClassicChannel(
- self.ref1.device.l2cap_channel_manager,
- connection,
- L2CAP_SIGNALING_CID,
- AVDTP_PSM,
- source_cid,
- spec.mtu,
- )
- connection_channels[source_cid] = channel
-
- logger.info("<< 1. RD1 connects DUT over AVDTP - first channel >>")
- await channel.connect()
- logger.info(f"<< 6. Channel established: {channel} >>")
- assert channel.state == ClassicChannel.State.OPEN
-
- # Initiate AVDTP with connected L2CAP signaling channel
- protocol = Protocol(channel)
- protocol.add_sink(sbc_codec_capabilities())
- logger.info("<< Test finished! >>")
-
- @avatar.asynchronous
async def test_reconfigure_codec_success(self) -> None:
"""Basic A2DP connection and codec reconfiguration.
@@ -720,6 +602,124 @@ class A2dpTest(base_test.BaseTestClass): # type: ignore[misc]
await asyncio.wait_for(avdtp_future, timeout=10.0)
@avatar.asynchronous
+ async def test_avdt_signaling_channel_connection_collision_case1(self) -> None:
+ """Test AVDTP signaling channel connection collision.
+
+ Test steps after DUT and RD1 connected and paired:
+ 1. RD1 connects DUT over AVDTP - first AVDTP signaling channel
+ 2. AVDTP signaling channel configuration postponed until DUT tries to initiate AVDTP signaling channel connection
+ 3. DUT tries connecting RD1 - collision simulated
+ 4. RD1 rejects AVDTP signaling channel connection request from DUT
+ 5. RD1 proceeds with first AVDTP signaling channel configuration
+ 6. Channel established - collision avoided
+ """
+
+ @dataclasses.dataclass
+ class L2capConfigurationRequest:
+ connection: Optional[Connection] = None
+ cid: Optional[int] = None
+ request: Optional[L2CAP_Configure_Request] = None
+
+ global pending_configuration_request
+ pending_configuration_request = L2capConfigurationRequest()
+
+ class TestChannelManager(ChannelManager):
+
+ def __init__(
+ self,
+ device: BumblePandoraDevice,
+ ) -> None:
+ super().__init__(
+ device.l2cap_channel_manager.extended_features,
+ device.l2cap_channel_manager.connectionless_mtu,
+ )
+ self.register_fixed_channel(bumble.smp.SMP_CID, device.on_smp_pdu)
+ device.sdp_server.register(self)
+ self.register_fixed_channel(bumble.att.ATT_CID, device.on_gatt_pdu)
+ self.host = device.host
+
+ def on_l2cap_connection_request(self, connection: Connection, cid: int, request) -> None:
+ global pending_configuration_request
+ if request.psm == AVDTP_PSM and pending_configuration_request is not None:
+ logger.info("<< 4. RD1 rejects AVDTP connection request from DUT >>")
+ self.send_control_frame(
+ connection,
+ cid,
+ L2CAP_Connection_Response(
+ identifier=request.identifier,
+ destination_cid=0,
+ source_cid=request.source_cid,
+ result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
+ status=0x0000,
+ ),
+ )
+ logger.info("<< 5. RD1 proceeds with first AVDTP channel configuration >>")
+ chan_connection = pending_configuration_request.connection
+ chan_cid = pending_configuration_request.cid
+ chan_request = pending_configuration_request.request
+ pending_configuration_request = None
+ super().on_control_frame(connection=chan_connection, cid=chan_cid, control_frame=chan_request)
+ return
+ super().on_l2cap_connection_request(connection, cid, request)
+
+ class TestClassicChannel(ClassicChannel):
+
+ def on_connection_response(self, response):
+ assert self.state == self.State.WAIT_CONNECT_RSP
+ assert (response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
+ ), f"Connection response: {response}"
+ self.destination_cid = response.destination_cid
+ self._change_state(self.State.WAIT_CONFIG)
+ logger.info("<< 2. RD1 connected DUT, configuration postponed >>")
+
+ def on_configure_request(self, request) -> None:
+ global pending_configuration_request
+ if pending_configuration_request is not None:
+ logger.info("<< 3. Block RD1 until DUT tries AVDTP channel connection >>")
+ pending_configuration_request.connection = self.connection
+ pending_configuration_request.cid = self.source_cid
+ pending_configuration_request.request = request
+ else:
+ super().on_configure_request(request)
+
+ # Override L2CAP Channel Manager to control signaling
+ self.ref1.device.l2cap_channel_manager = TestChannelManager(self.ref1.device)
+
+ # Connect and pair DUT -> RD1.
+ dut_ref1, ref1_dut = await asyncio.gather(
+ initiate_pairing(self.dut, self.ref1.address),
+ accept_pairing(self.ref1, self.dut.address),
+ )
+
+ # Retrieve Bumble connection object from Pandora connection token
+ connection = pandora_snippet.get_raw_connection(device=self.ref1, connection=ref1_dut)
+ # Find a free CID for a new channel
+ connection_channels = self.ref1.device.l2cap_channel_manager.channels.setdefault(connection.handle, {})
+ source_cid = self.ref1.device.l2cap_channel_manager.find_free_br_edr_cid(connection_channels)
+ assert source_cid is not None, "source_cid is None"
+
+ spec = ClassicChannelSpec(AVDTP_PSM)
+ channel = TestClassicChannel(
+ self.ref1.device.l2cap_channel_manager,
+ connection,
+ L2CAP_SIGNALING_CID,
+ AVDTP_PSM,
+ source_cid,
+ spec.mtu,
+ )
+ connection_channels[source_cid] = channel
+
+ logger.info("<< 1. RD1 connects DUT over AVDTP - first channel >>")
+ await channel.connect()
+ logger.info(f"<< 6. Channel established: {channel} >>")
+ assert channel.state == ClassicChannel.State.OPEN
+
+ # Initiate AVDTP with connected L2CAP signaling channel
+ protocol = Protocol(channel)
+ protocol.add_sink(sbc_codec_capabilities())
+ logger.info("<< Test finished! >>")
+
+ @avatar.asynchronous
@enableFlag(A2DP_SM_IGNORE_CONNECT_EVENTS_IN_CONNECTING_STATE)
async def test_avdt_signaling_channel_connection_collision_case2(self) -> None:
"""Test AVDTP signaling channel connection collision with Android as initiator.