summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Henri Chataing <henrichataing@google.com> 2023-05-04 11:19:42 -0700
committer Henri Chataing <henrichataing@google.com> 2023-05-10 22:49:26 +0000
commitb0b04a38101d12f1e69548848b45e26f341adbdf (patch)
treed89450269fddffedea198b575ef35afc96673b6a
parente0cf5f94f2087f87c02b7f95e49bb8818cb48bea (diff)
RootCanal: Implement LL tests for Phy update procedure
Added tests: - LL/CON/CEN/BV-41-C - LL/CON/CEN/BV-43-C - LL/CON/PER/BV-40-C - LL/CON/PER/BV-42-C Test: atest rootcanal_ll_test Bug: 275970864 Change-Id: Id1063bdf91346226bf01c09e78f0780d8b7a34de
-rw-r--r--tools/rootcanal/Android.bp2
-rw-r--r--tools/rootcanal/py/controller.py47
-rw-r--r--tools/rootcanal/test/LL/CON_/CEN/BV_41_C.py218
-rw-r--r--tools/rootcanal/test/LL/CON_/CEN/BV_43_C.py237
-rw-r--r--tools/rootcanal/test/LL/CON_/PER/BV_40_C.py233
-rw-r--r--tools/rootcanal/test/LL/CON_/PER/BV_42_C.py241
-rw-r--r--tools/rootcanal/test/LL/DDI/ADV/BV_20_C.py4
-rw-r--r--tools/rootcanal/test/LL/DDI/ADV/BV_26_C.py14
-rw-r--r--tools/rootcanal/test/LL/DDI/SCN/BV_18_C.py12
-rw-r--r--tools/rootcanal/test/main.py4
10 files changed, 980 insertions, 32 deletions
diff --git a/tools/rootcanal/Android.bp b/tools/rootcanal/Android.bp
index 8f11a0571b..b9a012726f 100644
--- a/tools/rootcanal/Android.bp
+++ b/tools/rootcanal/Android.bp
@@ -289,6 +289,8 @@ python_test_host {
":link_layer_packets_python3_gen",
"py/bluetooth.py",
"py/controller.py",
+ "test/LL/CON_/CEN/*.py",
+ "test/LL/CON_/PER/*.py",
"test/LL/DDI/ADV/*.py",
"test/LL/DDI/SCN/*.py",
"test/main.py",
diff --git a/tools/rootcanal/py/controller.py b/tools/rootcanal/py/controller.py
index cc0dbe2360..f0a43d77dc 100644
--- a/tools/rootcanal/py/controller.py
+++ b/tools/rootcanal/py/controller.py
@@ -7,7 +7,7 @@ import py.bluetooth
import sys
import typing
import unittest
-from typing import Optional
+from typing import Optional, Tuple, Union
from hci_packets import ErrorCode
from ctypes import *
@@ -45,6 +45,7 @@ def generate_rpa(irk: bytes) -> hci.Address:
rpa = bytearray(6)
rpa_type = c_char * 6
rootcanal.ffi_generate_rpa(c_char_p(irk), rpa_type.from_buffer(rpa))
+ rpa.reverse()
return hci.Address(bytes(rpa))
@@ -167,12 +168,26 @@ class Controller:
return self.ll_queue.popleft()
+class Any:
+ """Helper class that will match all other values.
+ Use an element of this class in expected packets to match any value
+ returned by the Controller stack."""
+
+ def __eq__(self, other) -> bool:
+ return True
+
+ def __format__(self, format_spec: str) -> str:
+ return "_"
+
+
class ControllerTest(unittest.IsolatedAsyncioTestCase):
"""Helper class for writing controller tests using the python bindings.
The test setups the controller sending the Reset command and configuring
the event masks to allow all events. The local device address is
always configured as 11:11:11:11:11:11."""
+ Any = Any()
+
def setUp(self):
self.controller = Controller(hci.Address('11:11:11:11:11:11'))
@@ -196,49 +211,47 @@ class ControllerTest(unittest.IsolatedAsyncioTestCase):
evt = await self.expect_cmd_complete(hci.LeReadLocalSupportedFeaturesComplete)
controller.le_features = LeFeatures(evt.le_features)
- async def expect_evt(self, expected_evt: hci.Event, timeout: int = 3):
+ async def expect_evt(self, expected_evt: typing.Union[hci.Event, type], timeout: int = 3) -> hci.Event:
packet = await asyncio.wait_for(self.controller.receive_evt(), timeout=timeout)
evt = hci.Event.parse_all(packet)
- if evt != expected_evt:
+ if isinstance(expected_evt, type) and not isinstance(evt, expected_evt):
print("received unexpected event")
- print("expected event:")
- expected_evt.show()
+ print(f"expected event: {expected_evt.__class__.__name__}")
print("received event:")
evt.show()
self.assertTrue(False)
- async def expect_cmd_complete(self, expected_evt: type, timeout: int = 3) -> hci.Event:
- packet = await asyncio.wait_for(self.controller.receive_evt(), timeout=timeout)
- evt = hci.Event.parse_all(packet)
-
- if not isinstance(evt, expected_evt):
+ if isinstance(expected_evt, hci.Event) and evt != expected_evt:
print("received unexpected event")
- print("expected event:")
- print(expected_evt)
+ print(f"expected event:")
+ expected_evt.show()
print("received event:")
evt.show()
self.assertTrue(False)
+ return evt
+
+ async def expect_cmd_complete(self, expected_evt: type, timeout: int = 3) -> hci.Event:
+ evt = await self.expect_evt(expected_evt, timeout=timeout)
assert evt.status == ErrorCode.SUCCESS
assert evt.num_hci_command_packets == 1
return evt
async def expect_ll(self,
expected_pdus: typing.Union[list, typing.Union[ll.LinkLayerPacket, type]],
- timeout: int = 3) -> int:
+ timeout: int = 3) -> ll.LinkLayerPacket:
if not isinstance(expected_pdus, list):
expected_pdus = [expected_pdus]
packet = await asyncio.wait_for(self.controller.receive_ll(), timeout=timeout)
pdu = ll.LinkLayerPacket.parse_all(packet)
- matched_index = -1
- for (index, expected_pdu) in enumerate(expected_pdus):
+ for expected_pdu in expected_pdus:
if isinstance(expected_pdu, type) and isinstance(pdu, expected_pdu):
- return index
+ return pdu
if isinstance(expected_pdu, ll.LinkLayerPacket) and pdu == expected_pdu:
- return index
+ return pdu
print("received unexpected pdu:")
pdu.show()
diff --git a/tools/rootcanal/test/LL/CON_/CEN/BV_41_C.py b/tools/rootcanal/test/LL/CON_/CEN/BV_41_C.py
new file mode 100644
index 0000000000..4de878b0a3
--- /dev/null
+++ b/tools/rootcanal/test/LL/CON_/CEN/BV_41_C.py
@@ -0,0 +1,218 @@
+from dataclasses import dataclass
+import hci_packets as hci
+import link_layer_packets as ll
+import unittest
+from hci_packets import ErrorCode
+from py.bluetooth import Address
+from py.controller import ControllerTest
+
+
+@dataclass
+class TestRound:
+ req_all_phys: int
+ req_tx_phys: int
+ req_rx_phys: int
+ rsp_tx_phys: int
+ rsp_rx_phys: int
+
+
+class Test(ControllerTest):
+
+ # LL/CON/CEN/BV-41-C [Initiating PHY Update Procedure]
+ async def test(self):
+ # Test parameters.
+ controller = self.controller
+ acl_connection_handle = 0xefe
+ peer_address = Address('11:22:33:44:55:66')
+
+ # Prelude: Establish an ACL connection as central with the IUT.
+ controller.send_cmd(
+ hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_PEER_ADDRESS,
+ own_address_type=hci.OwnAddressType.PUBLIC_DEVICE_ADDRESS,
+ peer_address_type=hci.AddressType.PUBLIC_DEVICE_ADDRESS,
+ peer_address=peer_address,
+ initiating_phys=0x1,
+ phy_scan_parameters=[
+ hci.LeCreateConnPhyScanParameters(
+ scan_interval=0x200,
+ scan_window=0x100,
+ conn_interval_min=0x200,
+ conn_interval_max=0x200,
+ conn_latency=0x6,
+ supervision_timeout=0xc80,
+ min_ce_length=0,
+ max_ce_length=0,
+ )
+ ]))
+
+ await self.expect_evt(hci.LeExtendedCreateConnectionStatus(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_ll(ll.LeLegacyAdvertisingPdu(source_address=peer_address,
+ advertising_address_type=ll.AddressType.PUBLIC,
+ advertising_type=ll.LegacyAdvertisingType.ADV_IND,
+ advertising_data=[]),
+ rssi=-16)
+
+ await self.expect_ll(
+ ll.LeConnect(source_address=controller.address,
+ destination_address=peer_address,
+ initiating_address_type=ll.AddressType.PUBLIC,
+ advertising_address_type=ll.AddressType.PUBLIC,
+ conn_interval=0x200,
+ conn_peripheral_latency=0x6,
+ conn_supervision_timeout=0xc80))
+
+ controller.send_ll(
+ ll.LeConnectComplete(source_address=peer_address,
+ destination_address=controller.address,
+ initiating_address_type=ll.AddressType.PUBLIC,
+ advertising_address_type=ll.AddressType.PUBLIC,
+ conn_interval=0x200,
+ conn_peripheral_latency=0x6,
+ conn_supervision_timeout=0xc80))
+
+ await self.expect_evt(
+ hci.LeEnhancedConnectionComplete(status=ErrorCode.SUCCESS,
+ connection_handle=acl_connection_handle,
+ role=hci.Role.CENTRAL,
+ peer_address_type=hci.AddressType.PUBLIC_DEVICE_ADDRESS,
+ peer_address=peer_address,
+ conn_interval=0x200,
+ conn_latency=0x6,
+ supervision_timeout=0xc80,
+ central_clock_accuracy=hci.ClockAccuracy.PPM_500))
+
+ await self.expect_evt(
+ hci.LeChannelSelectionAlgorithm(connection_handle=acl_connection_handle,
+ channel_selection_algorithm=hci.ChannelSelectionAlgorithm.ALGORITHM_1))
+
+ test_rounds = [
+ TestRound(0x00, 0x02, 0x02, 0x02, 0x02),
+ TestRound(0x00, 0x01, 0x02, 0x01, 0x02),
+ TestRound(0x00, 0x02, 0x01, 0x02, 0x01),
+ TestRound(0x00, 0x01, 0x01, 0x01, 0x01),
+ TestRound(0x00, 0x03, 0x02, 0x03, 0x02),
+ TestRound(0x00, 0x03, 0x01, 0x03, 0x01),
+ TestRound(0x00, 0x01, 0x03, 0x01, 0x03),
+ TestRound(0x00, 0x02, 0x03, 0x02, 0x03),
+ TestRound(0x00, 0x03, 0x03, 0x03, 0x03),
+ TestRound(0x01, 0x00, 0x02, 0x03, 0x03),
+ TestRound(0x02, 0x02, 0x00, 0x03, 0x03),
+ TestRound(0x03, 0x00, 0x00, 0x03, 0x03),
+ TestRound(0x00, 0x04, 0x04, 0x04, 0x04),
+ TestRound(0x00, 0x01, 0x04, 0x01, 0x04),
+ TestRound(0x00, 0x04, 0x01, 0x04, 0x01),
+ TestRound(0x00, 0x01, 0x01, 0x01, 0x01),
+ TestRound(0x00, 0x05, 0x04, 0x05, 0x04),
+ TestRound(0x00, 0x05, 0x01, 0x05, 0x01),
+ TestRound(0x00, 0x01, 0x05, 0x01, 0x05),
+ TestRound(0x00, 0x04, 0x05, 0x04, 0x05),
+ TestRound(0x00, 0x05, 0x05, 0x05, 0x05),
+ TestRound(0x01, 0x00, 0x04, 0x05, 0x05),
+ TestRound(0x02, 0x04, 0x00, 0x05, 0x05),
+ TestRound(0x03, 0x00, 0x00, 0x05, 0x05),
+ TestRound(0x00, 0x06, 0x06, 0x02, 0x02),
+ TestRound(0x00, 0x06, 0x06, 0x04, 0x04),
+ TestRound(0x00, 0x06, 0x06, 0x02, 0x02),
+ TestRound(0x00, 0x02, 0x04, 0x02, 0x04),
+ TestRound(0x00, 0x04, 0x02, 0x04, 0x02),
+ TestRound(0x03, 0x00, 0x00, 0x04, 0x04),
+ TestRound(0x03, 0x00, 0x00, 0x02, 0x02),
+ TestRound(0x03, 0x00, 0x00, 0x01, 0x01),
+ ]
+
+ # Repeat steps 1-9 for each Round shown in Table 4.77.
+ phy_c_to_p = 0x1
+ phy_p_to_c = 0x1
+ for test_round in test_rounds:
+ (phy_c_to_p, phy_p_to_c) = await self.steps_1_9(peer_address, acl_connection_handle, phy_c_to_p, phy_p_to_c,
+ **vars(test_round))
+
+ async def steps_1_9(self, peer_address: Address, connection_handle: int, phy_c_to_p: int, phy_p_to_c: int,
+ req_all_phys: int, req_tx_phys: int, req_rx_phys: int, rsp_tx_phys: int, rsp_rx_phys: int):
+ controller = self.controller
+
+ def phy_from_mask(mask: int):
+ if mask & 0x4:
+ return hci.PhyType.LE_CODED
+ elif mask & 0x2:
+ return hci.PhyType.LE_2M
+ else:
+ return hci.PhyType.LE_1M
+
+ # 1. Upper Tester sends an HCI_LE_Set_PHY command to the IUT with the payload defined in the
+ # HCI_LE_Set_PHY section of Table 4.77 and PHY_options set to 0x0000.
+ controller.send_cmd(
+ hci.LeSetPhy(connection_handle=connection_handle,
+ all_phys_no_transmit_preference=(req_all_phys & 0x1) != 0,
+ all_phys_no_receive_preference=(req_all_phys & 0x2) != 0,
+ tx_phys=req_tx_phys,
+ rx_phys=req_rx_phys,
+ phy_options=hci.PhyOptions.NO_PREFERENCE))
+
+ # 2. The Upper Tester receives an HCI_Command_Status event from the IUT in response. If any bits
+ # set in TX_PHYS or RX_PHYS correspond to unsupported PHYs, the Status shall be set to
+ # “Unsupported Feature or Parameter Value (0x11)”. Otherwise the Status shall be set to zero.
+ await self.expect_evt(hci.LeSetPhyStatus(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ # 3. If the IUT does not initiate a PHY change, proceed to step 9 if the Status in step 2 was set to zero
+ # or proceed to the next round if the Status in step 2 was set to a nonzero value.
+
+ if (req_all_phys & 0x1) != 0:
+ req_tx_phys = 0x7
+ if (req_all_phys & 0x2) != 0:
+ req_rx_phys = 0x7
+
+ # 4. The Lower Tester receives an LL_PHY_REQ control PDU from the IUT with at least one bit set in
+ # each field (RX_PHYS, TX_PHYS). The Lower Tester acknowledges the IUT’s request and
+ # responds with an LL_PHY_RSP PDU with the payload defined in the LL_PHY_RSP section of
+ # Table 4.77.
+ await self.expect_ll(
+ ll.LlPhyReq(source_address=controller.address,
+ destination_address=peer_address,
+ tx_phys=req_tx_phys,
+ rx_phys=req_rx_phys))
+
+ controller.send_ll(
+ ll.LlPhyRsp(source_address=peer_address,
+ destination_address=controller.address,
+ tx_phys=rsp_tx_phys,
+ rx_phys=rsp_rx_phys))
+
+ # 5. Lower Tester receives an LL_PHY_UPDATE_IND with zero or one bits set in each field
+ # (PHY_C_TO_P, PHY_P_TO_C) and a selected PHY present in the payload sent in the
+ # LL_PHY_RSP PDU. If no bits are set in either field, proceed to step 8.
+ phy_update_ind = await self.expect_ll(
+ ll.LlPhyUpdateInd(source_address=controller.address,
+ destination_address=peer_address,
+ phy_c_to_p=self.Any,
+ phy_p_to_c=self.Any))
+
+ self.assertTrue((phy_update_ind.phy_c_to_p & ~req_tx_phys) == 0)
+ self.assertTrue((phy_update_ind.phy_c_to_p & ~rsp_rx_phys) == 0)
+ self.assertTrue((phy_update_ind.phy_p_to_c & ~req_rx_phys) == 0)
+ self.assertTrue((phy_update_ind.phy_p_to_c & ~rsp_tx_phys) == 0)
+ phy_c_to_p = phy_update_ind.phy_c_to_p or phy_c_to_p
+ phy_p_to_c = phy_update_ind.phy_p_to_c or phy_p_to_c
+
+ # 6. Maintain the connection using empty DATA packets until the event count matches the Instant
+ # indicated in the LL_PHY_UPDATE_IND packet.
+
+ # 7. Once the event count matches the time, the new PHY(s) selected by the IUT will be used.
+
+ # 8. IUT sends empty DATA packets to the Lower Tester, and the Lower Tester acknowledges these
+ # packets, using the selected PHY(s).
+
+ # 9. If the command was accepted in step 2 or at least one of the PHY fields in the
+ # LL_PHY_UPDATE_IND PDU was nonzero, the Upper Tester receives an
+ # LE_PHY_Update_Complete event from the IUT with a payload consistent with the PHY(s)
+ # indicated in the LL_PHY_UPDATE_IND PDU (or the prior PHY, in cases where a field in
+ # LL_PHY_UPDATE_IND was zero or LL_PHY_UPDATE_IND was not sent). Otherwise the Upper
+ # Tester receives no event.
+ await self.expect_evt(
+ hci.LePhyUpdateComplete(connection_handle=connection_handle,
+ status=ErrorCode.SUCCESS,
+ tx_phy=phy_from_mask(phy_c_to_p),
+ rx_phy=phy_from_mask(phy_p_to_c)))
+
+ return (phy_c_to_p, phy_p_to_c)
diff --git a/tools/rootcanal/test/LL/CON_/CEN/BV_43_C.py b/tools/rootcanal/test/LL/CON_/CEN/BV_43_C.py
new file mode 100644
index 0000000000..d475606a29
--- /dev/null
+++ b/tools/rootcanal/test/LL/CON_/CEN/BV_43_C.py
@@ -0,0 +1,237 @@
+from dataclasses import dataclass
+import hci_packets as hci
+import link_layer_packets as ll
+import unittest
+from hci_packets import ErrorCode
+from py.bluetooth import Address
+from py.controller import ControllerTest
+
+
+@dataclass
+class TestRound:
+ req_tx_phys: int
+ req_rx_phys: int
+
+
+class Test(ControllerTest):
+
+ # LL/CON/CEN/BV-43-C [Responding to PHY Update Procedure]
+ async def test(self):
+ # Test parameters.
+ controller = self.controller
+ acl_connection_handle = 0xefe
+ peer_address = Address('11:22:33:44:55:66')
+
+ # Prelude: Establish an ACL connection as central with the IUT.
+ controller.send_cmd(
+ hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_PEER_ADDRESS,
+ own_address_type=hci.OwnAddressType.PUBLIC_DEVICE_ADDRESS,
+ peer_address_type=hci.AddressType.PUBLIC_DEVICE_ADDRESS,
+ peer_address=peer_address,
+ initiating_phys=0x1,
+ phy_scan_parameters=[
+ hci.LeCreateConnPhyScanParameters(
+ scan_interval=0x200,
+ scan_window=0x100,
+ conn_interval_min=0x200,
+ conn_interval_max=0x200,
+ conn_latency=0x6,
+ supervision_timeout=0xc80,
+ min_ce_length=0,
+ max_ce_length=0,
+ )
+ ]))
+
+ await self.expect_evt(hci.LeExtendedCreateConnectionStatus(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_ll(ll.LeLegacyAdvertisingPdu(source_address=peer_address,
+ advertising_address_type=ll.AddressType.PUBLIC,
+ advertising_type=ll.LegacyAdvertisingType.ADV_IND,
+ advertising_data=[]),
+ rssi=-16)
+
+ await self.expect_ll(
+ ll.LeConnect(source_address=controller.address,
+ destination_address=peer_address,
+ initiating_address_type=ll.AddressType.PUBLIC,
+ advertising_address_type=ll.AddressType.PUBLIC,
+ conn_interval=0x200,
+ conn_peripheral_latency=0x6,
+ conn_supervision_timeout=0xc80))
+
+ controller.send_ll(
+ ll.LeConnectComplete(source_address=peer_address,
+ destination_address=controller.address,
+ initiating_address_type=ll.AddressType.PUBLIC,
+ advertising_address_type=ll.AddressType.PUBLIC,
+ conn_interval=0x200,
+ conn_peripheral_latency=0x6,
+ conn_supervision_timeout=0xc80))
+
+ await self.expect_evt(
+ hci.LeEnhancedConnectionComplete(status=ErrorCode.SUCCESS,
+ connection_handle=acl_connection_handle,
+ role=hci.Role.CENTRAL,
+ peer_address_type=hci.AddressType.PUBLIC_DEVICE_ADDRESS,
+ peer_address=peer_address,
+ conn_interval=0x200,
+ conn_latency=0x6,
+ supervision_timeout=0xc80,
+ central_clock_accuracy=hci.ClockAccuracy.PPM_500))
+
+ await self.expect_evt(
+ hci.LeChannelSelectionAlgorithm(connection_handle=acl_connection_handle,
+ channel_selection_algorithm=hci.ChannelSelectionAlgorithm.ALGORITHM_1))
+
+ # 1. The Upper Tester sends an HCI_LE_Set_PHY command to the IUT with the ALL_PHYS field set
+ # to a value of 0x03. The Upper Tester receives an HCI_Command_Status event indicating
+ # success in response. The controller may send a LL_PHY_REQ to the Lower Tester. In this case,
+ # the Lower Tester sends a LL_PHY_RSP specifying the current PHY in both directions in
+ # response and the IUT completes the transaction with an LL_PHY_UPDATE_IND. Whether or not
+ # the procedure is carried out with the Lower Tester, the Upper Tester receives an
+ # HCI_LE_PHY_Update_Complete event from the IUT indicating both directions are operating
+ # using the LE 1M PHY.
+ controller.send_cmd(
+ hci.LeSetPhy(connection_handle=acl_connection_handle,
+ all_phys_no_transmit_preference=True,
+ all_phys_no_receive_preference=True,
+ tx_phys=0,
+ rx_phys=0))
+
+ await self.expect_evt(hci.LeSetPhyStatus(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ await self.expect_ll(
+ ll.LlPhyReq(source_address=controller.address, destination_address=peer_address, tx_phys=0x7, rx_phys=0x7))
+
+ controller.send_ll(
+ ll.LlPhyRsp(source_address=peer_address, destination_address=controller.address, tx_phys=0x1, rx_phys=0x1))
+
+ await self.expect_ll(
+ ll.LlPhyUpdateInd(source_address=controller.address,
+ destination_address=peer_address,
+ phy_c_to_p=0x0,
+ phy_p_to_c=0x0))
+
+ await self.expect_evt(
+ hci.LePhyUpdateComplete(status=ErrorCode.SUCCESS,
+ connection_handle=acl_connection_handle,
+ tx_phy=hci.PhyType.LE_1M,
+ rx_phy=hci.PhyType.LE_1M))
+
+ test_rounds = [
+ TestRound(0x03, 0x01),
+ TestRound(0x05, 0x02),
+ TestRound(0x02, 0x04),
+ TestRound(0x01, 0x02),
+ TestRound(0x04, 0x01),
+ TestRound(0x03, 0x06),
+ TestRound(0x01, 0x01),
+ TestRound(0x04, 0x03),
+ TestRound(0x05, 0x01),
+ TestRound(0x04, 0x04),
+ TestRound(0x05, 0x07),
+ TestRound(0x05, 0x05),
+ TestRound(0x04, 0x02),
+ TestRound(0x03, 0x07),
+ TestRound(0x06, 0x06),
+ TestRound(0x03, 0x02),
+ TestRound(0x01, 0x06),
+ TestRound(0x05, 0x06),
+ TestRound(0x04, 0x05),
+ TestRound(0x01, 0x05),
+ TestRound(0x05, 0x03),
+ TestRound(0x01, 0x04),
+ TestRound(0x01, 0x03),
+ TestRound(0x03, 0x05),
+ TestRound(0x06, 0x04),
+ TestRound(0x02, 0x07),
+ TestRound(0x06, 0x01),
+ TestRound(0x02, 0x02),
+ TestRound(0x03, 0x04),
+ TestRound(0x07, 0x03),
+ TestRound(0x02, 0x01),
+ TestRound(0x03, 0x03),
+ TestRound(0x02, 0x03),
+ TestRound(0x04, 0x07),
+ TestRound(0x07, 0x04),
+ TestRound(0x07, 0x01),
+ TestRound(0x06, 0x05),
+ TestRound(0x02, 0x06),
+ TestRound(0x07, 0x07),
+ TestRound(0x04, 0x06),
+ TestRound(0x02, 0x05),
+ TestRound(0x06, 0x02),
+ TestRound(0x07, 0x02),
+ TestRound(0x07, 0x06),
+ TestRound(0x06, 0x07),
+ TestRound(0x06, 0x03),
+ TestRound(0x05, 0x04),
+ TestRound(0x07, 0x05),
+ TestRound(0x01, 0x07),
+ ]
+
+ # 2. Perform steps 3–9 2N times as follows, where N is the number of cases in Table 4.78, Table
+ # 4.79, or Table 4.80 (selected based on the supported PHY(s)):
+ # ▪ firstly using cases 1 to N from the relevant table in order;
+ # ▪ then using the cases from the relevant table in a random order.
+ phy_c_to_p = 0x1
+ phy_p_to_c = 0x1
+ for test_round in test_rounds:
+ (phy_c_to_p, phy_p_to_c) = await self.steps_3_9(peer_address, acl_connection_handle, phy_c_to_p, phy_p_to_c,
+ **vars(test_round))
+
+ async def steps_3_9(self, peer_address: Address, connection_handle: int, phy_c_to_p: int, phy_p_to_c: int,
+ req_tx_phys: int, req_rx_phys: int):
+ controller = self.controller
+
+ def phy_from_mask(mask: int):
+ if mask & 0x4:
+ return hci.PhyType.LE_CODED
+ elif mask & 0x2:
+ return hci.PhyType.LE_2M
+ else:
+ return hci.PhyType.LE_1M
+
+ # 3. Lower Tester sends an LL_PHY_REQ PDU to the IUT with the payload specified in the relevant
+ # table.
+ controller.send_ll(
+ ll.LlPhyReq(source_address=peer_address,
+ destination_address=controller.address,
+ tx_phys=req_tx_phys,
+ rx_phys=req_rx_phys))
+
+ # 4. Lower Tester receives an LL_PHY_UPDATE_IND PDU from the IUT with a value selected for
+ # PHY_C_TO_P and PHY_P_TO_C that is either a bit value present in the LL_PHY_REQ or zero,
+ # with a maximum of 1 bit set for each field. If either the PHY_C_TO_P or PHY_P_TO_C fields are
+ # nonzero, then the Instant shall have a valid value.
+ phy_update_ind = await self.expect_ll(
+ ll.LlPhyUpdateInd(source_address=controller.address,
+ destination_address=peer_address,
+ phy_c_to_p=self.Any,
+ phy_p_to_c=self.Any))
+
+ # 5. Maintain the connection using empty DATA packets until the event count matches the Instant
+ # indicated in the LL_PHY_UPDATE_IND packet.
+
+ # 6. Once the event count matches the time, the PHY(s) selected by the IUT in the
+ # LL_PHY_UPDATE_IND packet will be used.
+
+ # 7. At the Instant of the PHY change start maintaining the connection with the selected PHY(s).
+
+ # 8. IUT sends empty DATA packets to the Lower Tester, and Lower Tester acknowledges these
+ # packets, using the selected PHY(s).
+
+ # 9. If the PHY(s) were changed, Upper Tester receives an LE_PHY_Update_Complete event from
+ # the IUT containing the PHY(s) selected. If both PHYs were NOT changed, Upper Tester does
+ # NOT receive an LE_PHY_Update_Complete event.
+ next_phy_c_to_p = phy_update_ind.phy_c_to_p or phy_c_to_p
+ next_phy_p_to_c = phy_update_ind.phy_p_to_c or phy_p_to_c
+
+ if phy_update_ind.phy_c_to_p != 0 or phy_update_ind.phy_p_to_c != 0:
+ await self.expect_evt(
+ hci.LePhyUpdateComplete(connection_handle=connection_handle,
+ status=ErrorCode.SUCCESS,
+ tx_phy=phy_from_mask(next_phy_c_to_p),
+ rx_phy=phy_from_mask(next_phy_p_to_c)))
+
+ return (next_phy_c_to_p, next_phy_p_to_c)
diff --git a/tools/rootcanal/test/LL/CON_/PER/BV_40_C.py b/tools/rootcanal/test/LL/CON_/PER/BV_40_C.py
new file mode 100644
index 0000000000..635fcc6f7b
--- /dev/null
+++ b/tools/rootcanal/test/LL/CON_/PER/BV_40_C.py
@@ -0,0 +1,233 @@
+from dataclasses import dataclass
+import hci_packets as hci
+import link_layer_packets as ll
+import unittest
+from hci_packets import ErrorCode
+from py.bluetooth import Address
+from py.controller import ControllerTest
+
+
+@dataclass
+class TestRound:
+ req_all_phys: int
+ req_tx_phys: int
+ req_rx_phys: int
+ phy_ltpref_c_to_p: int
+ phy_ltpref_p_to_c: int
+
+
+class Test(ControllerTest):
+
+ # LL/CON/PER/BV-40-C [Initiating PHY Update Procedure]
+ async def test(self):
+ # Test parameters.
+ controller = self.controller
+ acl_connection_handle = 0xefe
+ peer_address = Address('11:22:33:44:55:66')
+
+ # Prelude: Establish an ACL connection as central with the IUT.
+ controller.send_cmd(
+ hci.LeSetAdvertisingParameters(advertising_interval_min=0x200,
+ advertising_interval_max=0x200,
+ advertising_type=hci.AdvertisingType.ADV_IND,
+ own_address_type=hci.OwnAddressType.PUBLIC_DEVICE_ADDRESS,
+ advertising_channel_map=0x7,
+ advertising_filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES))
+
+ await self.expect_evt(
+ hci.LeSetAdvertisingParametersComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_cmd(hci.LeSetAdvertisingEnable(advertising_enable=True))
+
+ await self.expect_evt(hci.LeSetAdvertisingEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_ll(ll.LeConnect(source_address=peer_address,
+ destination_address=controller.address,
+ initiating_address_type=ll.AddressType.PUBLIC,
+ advertising_address_type=ll.AddressType.PUBLIC,
+ conn_interval=0x200,
+ conn_peripheral_latency=0x200,
+ conn_supervision_timeout=0x200),
+ rssi=-16)
+
+ await self.expect_ll(
+ ll.LeConnectComplete(source_address=controller.address,
+ destination_address=peer_address,
+ conn_interval=0x200,
+ conn_peripheral_latency=0x200,
+ conn_supervision_timeout=0x200))
+
+ await self.expect_evt(
+ hci.LeEnhancedConnectionComplete(status=ErrorCode.SUCCESS,
+ connection_handle=acl_connection_handle,
+ role=hci.Role.PERIPHERAL,
+ peer_address_type=hci.AddressType.PUBLIC_DEVICE_ADDRESS,
+ peer_address=peer_address,
+ conn_interval=0x200,
+ conn_latency=0x200,
+ supervision_timeout=0x200,
+ central_clock_accuracy=hci.ClockAccuracy.PPM_500))
+
+ test_rounds = [
+ TestRound(0x00, 0x01, 0x03, 0x02, 0x00),
+ TestRound(0x00, 0x02, 0x05, 0x01, 0x02),
+ TestRound(0x00, 0x04, 0x02, 0x02, 0x04),
+ TestRound(0x00, 0x02, 0x01, 0x00, 0x02),
+ TestRound(0x00, 0x01, 0x04, 0x04, 0x00),
+ TestRound(0x00, 0x06, 0x03, 0x02, 0x02),
+ TestRound(0x00, 0x01, 0x01, 0x01, 0x01),
+ TestRound(0x00, 0x03, 0x04, 0x00, 0x01),
+ TestRound(0x00, 0x01, 0x05, 0x00, 0x00),
+ TestRound(0x00, 0x02, 0x04, 0x04, 0x02),
+ TestRound(0x00, 0x07, 0x03, 0x01, 0x00),
+ TestRound(0x00, 0x06, 0x06, 0x02, 0x04),
+ TestRound(0x00, 0x02, 0x03, 0x02, 0x02),
+ TestRound(0x00, 0x06, 0x01, 0x01, 0x04),
+ TestRound(0x00, 0x06, 0x05, 0x01, 0x04),
+ TestRound(0x00, 0x05, 0x04, 0x04, 0x01),
+ TestRound(0x00, 0x05, 0x01, 0x01, 0x00),
+ TestRound(0x00, 0x03, 0x05, 0x01, 0x02),
+ TestRound(0x00, 0x04, 0x06, 0x02, 0x04),
+ TestRound(0x00, 0x07, 0x02, 0x00, 0x01),
+ TestRound(0x00, 0x01, 0x06, 0x02, 0x00),
+ TestRound(0x00, 0x02, 0x02, 0x02, 0x00),
+ TestRound(0x00, 0x04, 0x03, 0x01, 0x04),
+ TestRound(0x00, 0x03, 0x07, 0x04, 0x01),
+ TestRound(0x00, 0x01, 0x02, 0x02, 0x01),
+ TestRound(0x00, 0x03, 0x03, 0x01, 0x01),
+ TestRound(0x00, 0x03, 0x02, 0x02, 0x01),
+ TestRound(0x00, 0x07, 0x04, 0x00, 0x02),
+ TestRound(0x00, 0x04, 0x07, 0x00, 0x00),
+ TestRound(0x00, 0x01, 0x07, 0x04, 0x00),
+ TestRound(0x00, 0x05, 0x06, 0x04, 0x01),
+ TestRound(0x00, 0x06, 0x02, 0x00, 0x04),
+ TestRound(0x00, 0x07, 0x07, 0x01, 0x01),
+ TestRound(0x00, 0x06, 0x04, 0x04, 0x02),
+ TestRound(0x00, 0x05, 0x02, 0x02, 0x01),
+ TestRound(0x00, 0x02, 0x06, 0x04, 0x00),
+ TestRound(0x00, 0x02, 0x07, 0x01, 0x00),
+ TestRound(0x00, 0x06, 0x07, 0x04, 0x04),
+ TestRound(0x00, 0x07, 0x06, 0x02, 0x02),
+ TestRound(0x00, 0x03, 0x06, 0x00, 0x02),
+ TestRound(0x00, 0x04, 0x05, 0x04, 0x04),
+ TestRound(0x00, 0x05, 0x07, 0x04, 0x01),
+ TestRound(0x00, 0x07, 0x01, 0x00, 0x04),
+ TestRound(0x00, 0x06, 0x06, 0x02, 0x02),
+ TestRound(0x00, 0x06, 0x06, 0x04, 0x04),
+ TestRound(0x01, 0x00, 0x01, 0x01, 0x01),
+ TestRound(0x02, 0x03, 0x00, 0x02, 0x02),
+ TestRound(0x03, 0x00, 0x00, 0x01, 0x01),
+ TestRound(0x02, 0x05, 0x00, 0x04, 0x04),
+ TestRound(0x03, 0x00, 0x00, 0x04, 0x04),
+ TestRound(0x03, 0x00, 0x00, 0x02, 0x02),
+ TestRound(0x01, 0x00, 0x06, 0x04, 0x04),
+ TestRound(0x01, 0x00, 0x06, 0x04, 0x02),
+ TestRound(0x02, 0x07, 0x00, 0x02, 0x01),
+ TestRound(0x02, 0x06, 0x00, 0x02, 0x02),
+ # Disabled test cases:
+ # the values are invalid as RFU, not certain
+ # if that is intended by the TS or an
+ # error.
+ #TestRound(0x01, 0x00, 0x08, 0x02, 0x02),
+ #TestRound(0x01, 0x00, 0x11, 0x01, 0x00),
+ #TestRound(0x01, 0x00, 0x20, 0x02, 0x01),
+ #TestRound(0x01, 0x00, 0x41, 0x01, 0x01),
+ #TestRound(0x01, 0x00, 0x80, 0x01, 0x02),
+ #TestRound(0x01, 0x00, 0xFF, 0x00, 0x01),
+ #TestRound(0x02, 0x08, 0x00, 0x00, 0x00),
+ #TestRound(0x02, 0x11, 0x00, 0x02, 0x00),
+ #TestRound(0x02, 0x20, 0x00, 0x00, 0x02),
+ #TestRound(0x02, 0x41, 0x00, 0x01, 0x01),
+ #TestRound(0x02, 0x80, 0x00, 0x02, 0x02),
+ #TestRound(0x02, 0xFF, 0x00, 0x01, 0x01),
+ ]
+
+ # Repeat steps 1-9 for each Round shown in Table 4.77.
+ phy_c_to_p = 0x1
+ phy_p_to_c = 0x1
+ for test_round in test_rounds:
+ (phy_c_to_p, phy_p_to_c) = await self.steps_1_9(peer_address, acl_connection_handle, phy_c_to_p, phy_p_to_c,
+ **vars(test_round))
+
+ async def steps_1_9(self, peer_address: Address, connection_handle: int, phy_c_to_p: int, phy_p_to_c: int,
+ req_all_phys: int, req_tx_phys: int, req_rx_phys: int, phy_ltpref_c_to_p: int,
+ phy_ltpref_p_to_c: int):
+ controller = self.controller
+
+ def phy_from_mask(mask: int):
+ if mask & 0x4:
+ return hci.PhyType.LE_CODED
+ elif mask & 0x2:
+ return hci.PhyType.LE_2M
+ else:
+ return hci.PhyType.LE_1M
+
+ # 1. Upper Tester sends an HCI_LE_Set_PHY command to the IUT with the payload defined in the
+ # HCI_LE_Set_PHY section of Table 4.66 and PHY_options set to 0x0000.
+ controller.send_cmd(
+ hci.LeSetPhy(connection_handle=connection_handle,
+ all_phys_no_transmit_preference=(req_all_phys & 0x1) != 0,
+ all_phys_no_receive_preference=(req_all_phys & 0x2) != 0,
+ tx_phys=req_tx_phys,
+ rx_phys=req_rx_phys,
+ phy_options=hci.PhyOptions.NO_PREFERENCE))
+
+ # 2. The Upper Tester receives an HCI_Command_Status event from the IUT in response. If any bits
+ # set in TX_PHYS or RX_PHYS correspond to unsupported PHYs, the Status shall be set to
+ # “Unsupported Feature or Parameter Value (0x11)”. If the IUT does not support Asymmetric
+ # Connections, when ALL_PHYS is 0x00 and TX_PHYS does not equal RX_PHYS, the Status
+ # shall be set to “Unsupported Feature or Parameter Value (0x11)”. Otherwise. the Status shall be
+ # set to zero.
+ await self.expect_evt(hci.LeSetPhyStatus(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ # 3. If the IUT does NOT initiate a PHY change, proceed to step 9 if the Status in step 2 was set to
+ # zero, or proceed to the next round if the Status in step 2 was set to a non-zero value.
+
+ if (req_all_phys & 0x1) != 0:
+ req_tx_phys = 0x7
+ if (req_all_phys & 0x2) != 0:
+ req_rx_phys = 0x7
+
+ # 4. The Lower Tester receives an LL_PHY_REQ control PDU from the IUT with at least one bit in
+ # each field (TX_PHYS, RX_PHYS) set. The Lower Tester responds with an
+ # LL_PHY_UPDATE_IND PDU with the values defined in the “Lower Tester preference” section of
+ # Table 4.66 bitwise ANDed against the value sent by the IUT in the LL_PHY_REQ PDU.
+ await self.expect_ll(
+ ll.LlPhyReq(source_address=controller.address,
+ destination_address=peer_address,
+ tx_phys=req_tx_phys,
+ rx_phys=req_rx_phys))
+
+ next_phy_c_to_p = (req_rx_phys & phy_ltpref_c_to_p) or phy_c_to_p
+ next_phy_p_to_c = (req_tx_phys & phy_ltpref_p_to_c) or phy_p_to_c
+ controller.send_ll(
+ ll.LlPhyUpdateInd(source_address=peer_address,
+ destination_address=controller.address,
+ phy_c_to_p=(0 if next_phy_c_to_p == phy_c_to_p else next_phy_c_to_p),
+ phy_p_to_c=(0 if next_phy_p_to_c == phy_p_to_c else next_phy_p_to_c)))
+
+ # 5. Lower Tester receives a packet from the IUT acknowledging the LL_PHY_UPDATE_IND.
+
+ # 6. Lower Tester sends empty DATA packets to the IUT, receiving acknowledgements until the event
+ # count matches the Instant indicated in the LL_PHY_UPDATE_IND packet.
+
+ # 7. At the Instant of the PHY change, start maintaining the connection with the new PHY(s) selected
+ # by the LL_PHY_UPDATE_IND PDU (or no change, if no change was specified in the
+ # LL_PHY_UPDATE_IND PDU).
+
+ # 8. Lower Tester sends empty DATA packets to the IUT, receiving acknowledgements. If PHY has
+ # changed, the Lower Tester shall use the new PHY.
+
+ # 9. If the command was accepted in step 2 or at least one of the PHY fields in the
+ # LL_PHY_UPDATE_IND PDU was non-zero, the Upper Tester receives an
+ # LE_PHY_Update_Complete event from the IUT with a payload consistent with the PHY(s)
+ # indicated in the LL_PHY_UPDATE_IND PDU (or the prior PHY, in cases where a field in the
+ # LL_PHY_UPDATE_IND PDU was zero or the LL_PHY_UPDATE_IND PDU was not sent).
+ # Otherwise the Upper Tester receives no event.
+ await self.expect_evt(
+ hci.LePhyUpdateComplete(connection_handle=connection_handle,
+ status=ErrorCode.SUCCESS,
+ tx_phy=phy_from_mask(next_phy_p_to_c),
+ rx_phy=phy_from_mask(next_phy_c_to_p)))
+
+ return (next_phy_c_to_p, next_phy_p_to_c)
diff --git a/tools/rootcanal/test/LL/CON_/PER/BV_42_C.py b/tools/rootcanal/test/LL/CON_/PER/BV_42_C.py
new file mode 100644
index 0000000000..fe781732d4
--- /dev/null
+++ b/tools/rootcanal/test/LL/CON_/PER/BV_42_C.py
@@ -0,0 +1,241 @@
+from dataclasses import dataclass
+import hci_packets as hci
+import link_layer_packets as ll
+import unittest
+from hci_packets import ErrorCode
+from py.bluetooth import Address
+from py.controller import ControllerTest
+from typing import List
+
+
+@dataclass
+class TestRound:
+ req_tx_phys: int
+ req_rx_phys: int
+ phy_ltpref_c_to_p: List[int]
+ phy_ltpref_p_to_c: List[int]
+
+
+class Test(ControllerTest):
+
+ # LL/CON/PER/BV-40-C [Initiating PHY Update Procedure]
+ async def test(self):
+ # Test parameters.
+ controller = self.controller
+ acl_connection_handle = 0xefe
+ peer_address = Address('11:22:33:44:55:66')
+
+ # Prelude: Establish an ACL connection as central with the IUT.
+ controller.send_cmd(
+ hci.LeSetAdvertisingParameters(advertising_interval_min=0x200,
+ advertising_interval_max=0x200,
+ advertising_type=hci.AdvertisingType.ADV_IND,
+ own_address_type=hci.OwnAddressType.PUBLIC_DEVICE_ADDRESS,
+ advertising_channel_map=0x7,
+ advertising_filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES))
+
+ await self.expect_evt(
+ hci.LeSetAdvertisingParametersComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_cmd(hci.LeSetAdvertisingEnable(advertising_enable=True))
+
+ await self.expect_evt(hci.LeSetAdvertisingEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_ll(ll.LeConnect(source_address=peer_address,
+ destination_address=controller.address,
+ initiating_address_type=ll.AddressType.PUBLIC,
+ advertising_address_type=ll.AddressType.PUBLIC,
+ conn_interval=0x200,
+ conn_peripheral_latency=0x200,
+ conn_supervision_timeout=0x200),
+ rssi=-16)
+
+ await self.expect_ll(
+ ll.LeConnectComplete(source_address=controller.address,
+ destination_address=peer_address,
+ conn_interval=0x200,
+ conn_peripheral_latency=0x200,
+ conn_supervision_timeout=0x200))
+
+ await self.expect_evt(
+ hci.LeEnhancedConnectionComplete(status=ErrorCode.SUCCESS,
+ connection_handle=acl_connection_handle,
+ role=hci.Role.PERIPHERAL,
+ peer_address_type=hci.AddressType.PUBLIC_DEVICE_ADDRESS,
+ peer_address=peer_address,
+ conn_interval=0x200,
+ conn_latency=0x200,
+ supervision_timeout=0x200,
+ central_clock_accuracy=hci.ClockAccuracy.PPM_500))
+
+ # 1. Upper Tester sends an HCI_LE_Set_PHY command to the IUT with the ALL_PHYS fields set to a
+ # value of 0x03. Upper Tester receives an HCI_Command_Status event indicating success in
+ # response.
+ controller.send_cmd(
+ hci.LeSetPhy(connection_handle=acl_connection_handle,
+ all_phys_no_transmit_preference=True,
+ all_phys_no_receive_preference=True,
+ tx_phys=0,
+ rx_phys=0))
+
+ await self.expect_evt(hci.LeSetPhyStatus(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ await self.expect_ll(
+ ll.LlPhyReq(source_address=controller.address, destination_address=peer_address, tx_phys=0x7, rx_phys=0x7))
+
+ controller.send_ll(
+ ll.LlPhyUpdateInd(source_address=peer_address,
+ destination_address=controller.address,
+ phy_c_to_p=0x2,
+ phy_p_to_c=0x2))
+
+ # 2. The Upper Tester receives an HCI_LE_PHY_Update_Complete event from the IUT.
+ await self.expect_evt(
+ hci.LePhyUpdateComplete(status=ErrorCode.SUCCESS,
+ connection_handle=acl_connection_handle,
+ tx_phy=hci.PhyType.LE_2M,
+ rx_phy=hci.PhyType.LE_2M))
+
+ test_rounds = [
+ TestRound(0x03, 0x01, [0x02], [0x01]),
+ TestRound(0x05, 0x02, [0x01], [0x02]),
+ TestRound(0x02, 0x04, [0x02], [0x04]),
+ TestRound(0x01, 0x02, [0x01], [0x02]),
+ TestRound(0x04, 0x01, [0x04], [0x01]),
+ TestRound(0x03, 0x06, [0x02], [0x02]),
+ TestRound(0x01, 0x01, [0x01], [0x01]),
+ TestRound(0x04, 0x03, [0x04], [0x01]),
+ TestRound(0x05, 0x01, [0x01], [0x01]),
+ TestRound(0x04, 0x04, [0x04], [0x04]),
+ TestRound(0x05, 0x07, [0x04], [0x02, 0x04]),
+ TestRound(0x05, 0x05, [0x04], [0x01]),
+ TestRound(0x04, 0x02, [0x04], [0x02]),
+ TestRound(0x03, 0x07, [0x01], [0x04, 0x01]),
+ TestRound(0x06, 0x06, [0x02], [0x04]),
+ TestRound(0x03, 0x02, [0x02], [0x02]),
+ TestRound(0x01, 0x06, [0x01], [0x04]),
+ TestRound(0x05, 0x06, [0x01], [0x04]),
+ TestRound(0x04, 0x05, [0x04], [0x01]),
+ TestRound(0x01, 0x05, [0x01], [0x04]),
+ TestRound(0x05, 0x03, [0x01], [0x02]),
+ TestRound(0x01, 0x04, [0x01], [0x04]),
+ TestRound(0x01, 0x03, [0x01], [0x02]),
+ TestRound(0x03, 0x05, [0x02], [0x01]),
+ TestRound(0x06, 0x04, [0x02], [0x04]),
+ TestRound(0x02, 0x07, [0x02], [0x01, 0x02]),
+ TestRound(0x06, 0x01, [0x02], [0x01]),
+ TestRound(0x02, 0x02, [0x02], [0x02]),
+ TestRound(0x03, 0x04, [0x01], [0x04]),
+ TestRound(0x07, 0x03, [0x04, 0x01], [0x01]),
+ TestRound(0x02, 0x01, [0x02], [0x01]),
+ TestRound(0x03, 0x03, [0x01], [0x01]),
+ TestRound(0x02, 0x03, [0x02], [0x01]),
+ TestRound(0x04, 0x07, [0x04], [0x02, 0x04]),
+ TestRound(0x07, 0x04, [0x01, 0x04], [0x04]),
+ TestRound(0x07, 0x01, [0x04, 0x02], [0x01]),
+ TestRound(0x06, 0x05, [0x04], [0x01]),
+ TestRound(0x02, 0x06, [0x02], [0x04]),
+ TestRound(0x07, 0x07, [0x01, 0x02], [0x01, 0x04]),
+ TestRound(0x04, 0x06, [0x04], [0x02]),
+ TestRound(0x02, 0x05, [0x02], [0x01]),
+ TestRound(0x06, 0x02, [0x04], [0x02]),
+ TestRound(0x07, 0x02, [0x01, 0x02], [0x02]),
+ TestRound(0x07, 0x06, [0x04, 0x01], [0x04]),
+ TestRound(0x06, 0x07, [0x02], [0x02, 0x01]),
+ TestRound(0x06, 0x03, [0x02], [0x02]),
+ TestRound(0x05, 0x04, [0x04], [0x04]),
+ TestRound(0x07, 0x05, [0x04, 0x02], [0x01]),
+ TestRound(0x01, 0x07, [0x01], [0x04, 0x02]),
+ ]
+
+ # 3. Perform steps 4 through 11 2N times as follows, where N is the number of cases in Table 4.67,
+ # Table 4.68, or Table 4.69 (selected based on the supported PHY(s)):
+ # ▪ firstly using cases 1 to N from the relevant table in order;
+ # ▪ then using the cases from the relevant table in a random order.
+ phy_c_to_p = 0x2
+ phy_p_to_c = 0x2
+ for test_round in test_rounds:
+ (phy_c_to_p, phy_p_to_c) = await self.steps_4_11(peer_address, acl_connection_handle, phy_c_to_p,
+ phy_p_to_c, **vars(test_round))
+
+ async def steps_4_11(self, peer_address: Address, connection_handle: int, phy_c_to_p: int, phy_p_to_c: int,
+ req_tx_phys: int, req_rx_phys: int, phy_ltpref_c_to_p: List[int],
+ phy_ltpref_p_to_c: List[int]):
+ controller = self.controller
+
+ def phy_from_mask(mask: int):
+ if mask & 0x4:
+ return hci.PhyType.LE_CODED
+ elif mask & 0x2:
+ return hci.PhyType.LE_2M
+ else:
+ return hci.PhyType.LE_1M
+
+ # 4. Lower Tester sends an LL_PHY_REQ PDU to the IUT to initiate a PHY change with the payload
+ # defined in the LL_PHY_REQ section of the relevant table.
+ controller.send_ll(
+ ll.LlPhyReq(source_address=peer_address,
+ destination_address=controller.address,
+ tx_phys=req_tx_phys,
+ rx_phys=req_rx_phys))
+
+ # 5. Lower Tester receives an LL_PHY_RSP control PDU from the IUT with at least one bit set in
+ # each field (TX_PHYS, RX_PHYS).
+ phy_rsp = await self.expect_ll(
+ ll.LlPhyRsp(source_address=controller.address,
+ destination_address=peer_address,
+ tx_phys=self.Any,
+ rx_phys=self.Any))
+
+ self.assertTrue(phy_rsp.tx_phys != 0)
+ self.assertTrue(phy_rsp.rx_phys != 0)
+
+ # 6. Lower Tester responds with an LL_PHY_UPDATE_IND PDU.
+ next_phy_c_to_p = req_tx_phys & phy_rsp.rx_phys
+ next_phy_p_to_c = req_rx_phys & phy_rsp.tx_phys
+
+ if next_phy_c_to_p.bit_count() > 1:
+ for phy in phy_ltpref_c_to_p:
+ if (next_phy_c_to_p & phy) != 0:
+ next_phy_c_to_p = phy
+ break
+
+ if next_phy_p_to_c.bit_count() > 1:
+ for phy in phy_ltpref_p_to_c:
+ if (next_phy_p_to_c & phy) != 0:
+ next_phy_p_to_c = phy
+ break
+
+ next_phy_c_to_p = next_phy_c_to_p or phy_c_to_p
+ next_phy_p_to_c = next_phy_p_to_c or phy_p_to_c
+
+ controller.send_ll(
+ ll.LlPhyUpdateInd(source_address=peer_address,
+ destination_address=controller.address,
+ phy_c_to_p=(0 if next_phy_c_to_p == phy_c_to_p else next_phy_c_to_p),
+ phy_p_to_c=(0 if next_phy_p_to_c == phy_p_to_c else next_phy_p_to_c)))
+
+ # 7. Lower Tester receives a packet from the IUT acknowledging the LL_PHY_UPDATE_IND.
+ # If both the PHY_C_TO_P and PHY_P_TO_C fields of the LL_PHY_UPDATE_IND are zero, skip
+ # to step 11.
+
+ # 8. Lower Tester sends empty DATA packets to the IUT, receiving acknowledgements until the event
+ # count matches the indicated Instant of the PHY change.
+
+ # 9. At the Instant of the PHY change the IUT starts maintaining the connection with the new PHY(s)
+ # selected by the Lower Tester.
+
+ # 10. Lower Tester sends empty DATA packets to the IUT, receiving acknowledgements. If the PHY(s)
+ # have changed, the Lower Tester shall use the new PHY(s).
+
+ # 11. If the PHY(s) were changed, Upper Tester receives a LE_PHY_Update_Complete event from the
+ # IUT containing the PHYs selected. If both PHYs were NOT changed, Upper Tester does NOT
+ # receive a LE_PHY_Update_Complete event
+ if next_phy_c_to_p != phy_c_to_p or next_phy_p_to_c != phy_p_to_c:
+ await self.expect_evt(
+ hci.LePhyUpdateComplete(connection_handle=connection_handle,
+ status=ErrorCode.SUCCESS,
+ tx_phy=phy_from_mask(next_phy_p_to_c),
+ rx_phy=phy_from_mask(next_phy_c_to_p)))
+
+ return (next_phy_c_to_p, next_phy_p_to_c)
diff --git a/tools/rootcanal/test/LL/DDI/ADV/BV_20_C.py b/tools/rootcanal/test/LL/DDI/ADV/BV_20_C.py
index b1c1769b5a..f342961cce 100644
--- a/tools/rootcanal/test/LL/DDI/ADV/BV_20_C.py
+++ b/tools/rootcanal/test/LL/DDI/ADV/BV_20_C.py
@@ -28,8 +28,8 @@ class Test(ControllerTest):
controller.send_cmd(
hci.LeSetDefaultPhy(all_phys_no_transmit_preference=False,
all_phys_no_receive_preference=False,
- tx_phys_bitmask=0x2,
- rx_phys_bitmask=0x2))
+ tx_phys=0x2,
+ rx_phys=0x2))
await self.expect_evt(hci.LeSetDefaultPhyComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
diff --git a/tools/rootcanal/test/LL/DDI/ADV/BV_26_C.py b/tools/rootcanal/test/LL/DDI/ADV/BV_26_C.py
index f3b1365ed9..6cdbaa7010 100644
--- a/tools/rootcanal/test/LL/DDI/ADV/BV_26_C.py
+++ b/tools/rootcanal/test/LL/DDI/ADV/BV_26_C.py
@@ -162,9 +162,10 @@ class Test(ControllerTest):
# until an AUX_CHAIN_IND PDU is received with no AuxPtr field and all data has been received.
# 12. Repeat steps 8–11 100 times.
- received = [0, 0]
+ received_extended_advertising_pdus = 0
+ received_periodic_advertising_pdus = 0
for n in range(15):
- index = await self.expect_ll([
+ pdu = await self.expect_ll([
ll.LeExtendedAdvertisingPdu(source_address=controller.address,
advertising_address_type=ll.AddressType.PUBLIC,
target_address_type=ll.AddressType.PUBLIC,
@@ -184,13 +185,16 @@ class Test(ControllerTest):
advertising_interval=0x100,
advertising_data=advertising_data)
])
- received[index] = received[index] + 1
+ if isinstance(pdu, ll.LeExtendedAdvertisingPdu):
+ received_extended_advertising_pdus += 1
+ if isinstance(pdu, ll.LePeriodicAdvertisingPdu):
+ received_periodic_advertising_pdus += 1
# Note: the extended advertising interval is twice the periodic
# advertising interval; the number of events received of each kind is
# deterministic.
- self.assertTrue(received[0] == 5)
- self.assertTrue(received[1] == 10)
+ self.assertTrue(received_extended_advertising_pdus == 5)
+ self.assertTrue(received_periodic_advertising_pdus == 10)
# 13. The Upper Tester disables extended advertising using the
# HCI_LE_Set_Extended_Advertising_Enable command but maintains periodic advertising.
diff --git a/tools/rootcanal/test/LL/DDI/SCN/BV_18_C.py b/tools/rootcanal/test/LL/DDI/SCN/BV_18_C.py
index a56ba366df..5ab25411b5 100644
--- a/tools/rootcanal/test/LL/DDI/SCN/BV_18_C.py
+++ b/tools/rootcanal/test/LL/DDI/SCN/BV_18_C.py
@@ -88,17 +88,13 @@ class Test(ControllerTest):
# 6. Lower Tester receives a SCAN_REQ packet T_IFS after any of the
# ADV_SCAN_IND packets. The ScanA field in the SCAN_REQ packet shall
# use the same resolvable private address.
- scan_req = await asyncio.wait_for(controller.receive_ll(), timeout=3)
- scan_req = ll.LinkLayerPacket.parse_all(scan_req)
- self.assertTrue(isinstance(scan_req, ll.LeScan))
- # TODO check that source_address is resolvable by lower tester.
- self.assertTrue(scan_req.source_address.is_resolvable())
- self.assertEqual(
- scan_req,
- ll.LeScan(source_address=scan_req.source_address,
+ scan_req = await self.expect_ll(
+ ll.LeScan(source_address=self.Any,
destination_address=peer_resolvable_address,
advertising_address_type=ll.AddressType.RANDOM,
scanning_address_type=ll.AddressType.RANDOM))
+ # TODO check that source_address is resolvable by lower tester.
+ self.assertTrue(scan_req.source_address.is_resolvable())
# 8. Interleave with step 6: Upper Tester receives an
# HCI_LE_Advertising_Report containing the information used in the
diff --git a/tools/rootcanal/test/main.py b/tools/rootcanal/test/main.py
index aefd3d4972..8868e26c0a 100644
--- a/tools/rootcanal/test/main.py
+++ b/tools/rootcanal/test/main.py
@@ -6,6 +6,10 @@ import tempfile
import unittest
tests = [
+ 'LL.CON_.CEN.BV_41_C',
+ 'LL.CON_.CEN.BV_43_C',
+ 'LL.CON_.PER.BV_40_C',
+ 'LL.CON_.PER.BV_42_C',
'LL.DDI.ADV.BV_01_C',
'LL.DDI.ADV.BV_02_C',
'LL.DDI.ADV.BV_03_C',