diff options
-rw-r--r-- | android/pandora/test/a2dp_test.py | 236 |
1 files changed, 118 insertions, 118 deletions
diff --git a/android/pandora/test/a2dp_test.py b/android/pandora/test/a2dp_test.py index bdf3a8ce26..236f8b8b99 100644 --- a/android/pandora/test/a2dp_test.py +++ b/android/pandora/test/a2dp_test.py @@ -372,124 +372,6 @@ class A2dpTest(base_test.BaseTestClass): # type: ignore[misc] await asyncio.wait_for(avdtp_future, timeout=10.0) @avatar.asynchronous - async def test_avdt_signaling_channel_connection_collision_case1(self) -> None: - """Test AVDTP signaling channel connection collision. - - Test steps after DUT and RD1 connected and paired: - 1. RD1 connects DUT over AVDTP - first AVDTP signaling channel - 2. AVDTP signaling channel configuration postponed until DUT tries to initiate AVDTP signaling channel connection - 3. DUT tries connecting RD1 - collision simulated - 4. RD1 rejects AVDTP signaling channel connection request from DUT - 5. RD1 proceeds with first AVDTP signaling channel configuration - 6. Channel established - collision avoided - """ - - @dataclasses.dataclass - class L2capConfigurationRequest: - connection: Optional[Connection] = None - cid: Optional[int] = None - request: Optional[L2CAP_Configure_Request] = None - - global pending_configuration_request - pending_configuration_request = L2capConfigurationRequest() - - class TestChannelManager(ChannelManager): - - def __init__( - self, - device: BumblePandoraDevice, - ) -> None: - super().__init__( - device.l2cap_channel_manager.extended_features, - device.l2cap_channel_manager.connectionless_mtu, - ) - self.register_fixed_channel(bumble.smp.SMP_CID, device.on_smp_pdu) - device.sdp_server.register(self) - self.register_fixed_channel(bumble.att.ATT_CID, device.on_gatt_pdu) - self.host = device.host - - def on_l2cap_connection_request(self, connection: Connection, cid: int, request) -> None: - global pending_configuration_request - if request.psm == AVDTP_PSM and pending_configuration_request is not None: - logger.info("<< 4. RD1 rejects AVDTP connection request from DUT >>") - self.send_control_frame( - connection, - cid, - L2CAP_Connection_Response( - identifier=request.identifier, - destination_cid=0, - source_cid=request.source_cid, - result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, - status=0x0000, - ), - ) - logger.info("<< 5. RD1 proceeds with first AVDTP channel configuration >>") - chan_connection = pending_configuration_request.connection - chan_cid = pending_configuration_request.cid - chan_request = pending_configuration_request.request - pending_configuration_request = None - super().on_control_frame(connection=chan_connection, cid=chan_cid, control_frame=chan_request) - return - super().on_l2cap_connection_request(connection, cid, request) - - class TestClassicChannel(ClassicChannel): - - def on_connection_response(self, response): - assert self.state == self.State.WAIT_CONNECT_RSP - assert (response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL - ), f"Connection response: {response}" - self.destination_cid = response.destination_cid - self._change_state(self.State.WAIT_CONFIG) - logger.info("<< 2. RD1 connected DUT, configuration postponed >>") - - def on_configure_request(self, request) -> None: - global pending_configuration_request - if pending_configuration_request is not None: - logger.info("<< 3. Block RD1 until DUT tries AVDTP channel connection >>") - pending_configuration_request.connection = self.connection - pending_configuration_request.cid = self.source_cid - pending_configuration_request.request = request - else: - super().on_configure_request(request) - - # Override L2CAP Channel Manager to control signaling - self.ref1.device.l2cap_channel_manager = TestChannelManager(self.ref1.device) - - # Connect and pair DUT -> RD1. - dut_ref1, ref1_dut = await asyncio.gather( - initiate_pairing(self.dut, self.ref1.address), - accept_pairing(self.ref1, self.dut.address), - ) - - # Retrieve Bumble connection object from Pandora connection token - connection = pandora_snippet.get_raw_connection(device=self.ref1, connection=ref1_dut) - # Find a free CID for a new channel - connection_channels = self.ref1.device.l2cap_channel_manager.channels.setdefault(connection.handle, {}) - source_cid = self.ref1.device.l2cap_channel_manager.find_free_br_edr_cid(connection_channels) - assert source_cid is not None, "source_cid is None" - - spec = ClassicChannelSpec(AVDTP_PSM) - channel = TestClassicChannel( - self.ref1.device.l2cap_channel_manager, - connection, - L2CAP_SIGNALING_CID, - AVDTP_PSM, - source_cid, - spec.mtu, - ) - connection_channels[source_cid] = channel - - logger.info("<< 1. RD1 connects DUT over AVDTP - first channel >>") - await channel.connect() - logger.info(f"<< 6. Channel established: {channel} >>") - assert channel.state == ClassicChannel.State.OPEN - - # Initiate AVDTP with connected L2CAP signaling channel - protocol = Protocol(channel) - protocol.add_sink(sbc_codec_capabilities()) - logger.info("<< Test finished! >>") - - @avatar.asynchronous async def test_reconfigure_codec_success(self) -> None: """Basic A2DP connection and codec reconfiguration. @@ -720,6 +602,124 @@ class A2dpTest(base_test.BaseTestClass): # type: ignore[misc] await asyncio.wait_for(avdtp_future, timeout=10.0) @avatar.asynchronous + async def test_avdt_signaling_channel_connection_collision_case1(self) -> None: + """Test AVDTP signaling channel connection collision. + + Test steps after DUT and RD1 connected and paired: + 1. RD1 connects DUT over AVDTP - first AVDTP signaling channel + 2. AVDTP signaling channel configuration postponed until DUT tries to initiate AVDTP signaling channel connection + 3. DUT tries connecting RD1 - collision simulated + 4. RD1 rejects AVDTP signaling channel connection request from DUT + 5. RD1 proceeds with first AVDTP signaling channel configuration + 6. Channel established - collision avoided + """ + + @dataclasses.dataclass + class L2capConfigurationRequest: + connection: Optional[Connection] = None + cid: Optional[int] = None + request: Optional[L2CAP_Configure_Request] = None + + global pending_configuration_request + pending_configuration_request = L2capConfigurationRequest() + + class TestChannelManager(ChannelManager): + + def __init__( + self, + device: BumblePandoraDevice, + ) -> None: + super().__init__( + device.l2cap_channel_manager.extended_features, + device.l2cap_channel_manager.connectionless_mtu, + ) + self.register_fixed_channel(bumble.smp.SMP_CID, device.on_smp_pdu) + device.sdp_server.register(self) + self.register_fixed_channel(bumble.att.ATT_CID, device.on_gatt_pdu) + self.host = device.host + + def on_l2cap_connection_request(self, connection: Connection, cid: int, request) -> None: + global pending_configuration_request + if request.psm == AVDTP_PSM and pending_configuration_request is not None: + logger.info("<< 4. RD1 rejects AVDTP connection request from DUT >>") + self.send_control_frame( + connection, + cid, + L2CAP_Connection_Response( + identifier=request.identifier, + destination_cid=0, + source_cid=request.source_cid, + result=L2CAP_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE, + status=0x0000, + ), + ) + logger.info("<< 5. RD1 proceeds with first AVDTP channel configuration >>") + chan_connection = pending_configuration_request.connection + chan_cid = pending_configuration_request.cid + chan_request = pending_configuration_request.request + pending_configuration_request = None + super().on_control_frame(connection=chan_connection, cid=chan_cid, control_frame=chan_request) + return + super().on_l2cap_connection_request(connection, cid, request) + + class TestClassicChannel(ClassicChannel): + + def on_connection_response(self, response): + assert self.state == self.State.WAIT_CONNECT_RSP + assert (response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL + ), f"Connection response: {response}" + self.destination_cid = response.destination_cid + self._change_state(self.State.WAIT_CONFIG) + logger.info("<< 2. RD1 connected DUT, configuration postponed >>") + + def on_configure_request(self, request) -> None: + global pending_configuration_request + if pending_configuration_request is not None: + logger.info("<< 3. Block RD1 until DUT tries AVDTP channel connection >>") + pending_configuration_request.connection = self.connection + pending_configuration_request.cid = self.source_cid + pending_configuration_request.request = request + else: + super().on_configure_request(request) + + # Override L2CAP Channel Manager to control signaling + self.ref1.device.l2cap_channel_manager = TestChannelManager(self.ref1.device) + + # Connect and pair DUT -> RD1. + dut_ref1, ref1_dut = await asyncio.gather( + initiate_pairing(self.dut, self.ref1.address), + accept_pairing(self.ref1, self.dut.address), + ) + + # Retrieve Bumble connection object from Pandora connection token + connection = pandora_snippet.get_raw_connection(device=self.ref1, connection=ref1_dut) + # Find a free CID for a new channel + connection_channels = self.ref1.device.l2cap_channel_manager.channels.setdefault(connection.handle, {}) + source_cid = self.ref1.device.l2cap_channel_manager.find_free_br_edr_cid(connection_channels) + assert source_cid is not None, "source_cid is None" + + spec = ClassicChannelSpec(AVDTP_PSM) + channel = TestClassicChannel( + self.ref1.device.l2cap_channel_manager, + connection, + L2CAP_SIGNALING_CID, + AVDTP_PSM, + source_cid, + spec.mtu, + ) + connection_channels[source_cid] = channel + + logger.info("<< 1. RD1 connects DUT over AVDTP - first channel >>") + await channel.connect() + logger.info(f"<< 6. Channel established: {channel} >>") + assert channel.state == ClassicChannel.State.OPEN + + # Initiate AVDTP with connected L2CAP signaling channel + protocol = Protocol(channel) + protocol.add_sink(sbc_codec_capabilities()) + logger.info("<< Test finished! >>") + + @avatar.asynchronous @enableFlag(A2DP_SM_IGNORE_CONNECT_EVENTS_IN_CONNECTING_STATE) async def test_avdt_signaling_channel_connection_collision_case2(self) -> None: """Test AVDTP signaling channel connection collision with Android as initiator. |