diff options
| author | 2022-11-10 22:44:03 +0000 | |
|---|---|---|
| committer | 2022-12-01 00:28:28 +0000 | |
| commit | 41b6cae9c74732b89c30d56f7a5a6ba7553889a0 (patch) | |
| tree | 3e4f9a75508e35c0ab07124e5fd51ebc72421a0d | |
| parent | d18c06ea231786fd4c6120c746974a623cbb5ec9 (diff) | |
RootCanal: Implement rootcanal_ll_test
This change adds a python binding for the DualModeController
class to enable isolated LL+HCI tests on one controller
instance.
Implement test LL/DDI/SCN/BV-13-C test from Bluetooth controller
certification test suite.
Test: atest --host rootcanal_ll_test
Change-Id: I5b0b9fd9608769346deb9c20dbf1d7aa809e8a0a
| -rw-r--r-- | tools/rootcanal/Android.bp | 107 | ||||
| -rw-r--r-- | tools/rootcanal/model/controller/dual_mode_controller_python3.cc | 230 | ||||
| -rw-r--r-- | tools/rootcanal/model/controller/link_layer_controller.cc | 5 | ||||
| -rw-r--r-- | tools/rootcanal/model/controller/link_layer_controller.h | 4 | ||||
| -rw-r--r-- | tools/rootcanal/py/bluetooth.py | 48 | ||||
| -rw-r--r-- | tools/rootcanal/py/controller.py | 119 | ||||
| -rw-r--r-- | tools/rootcanal/test/LL/DDI/SCN/BV_13_C.py | 129 | ||||
| -rw-r--r-- | tools/rootcanal/test/LL/DDI/SCN/BV_14_C.py | 85 | ||||
| -rw-r--r-- | tools/rootcanal/test/main.py | 29 |
9 files changed, 752 insertions, 4 deletions
diff --git a/tools/rootcanal/Android.bp b/tools/rootcanal/Android.bp index 52ec3745b1..fa5c135fe3 100644 --- a/tools/rootcanal/Android.bp +++ b/tools/rootcanal/Android.bp @@ -105,6 +105,84 @@ cc_library_static { ], } +// This library implements Python bindings to the DualModeController +// class to enable scripted testing in Python. +cc_library_host_shared { + name: "lib_rootcanal_python3", + defaults: [ + "bluetooth_py3_native_extension_defaults", + "rootcanal_defaults", + ], + srcs: [ + "model/controller/acl_connection.cc", + "model/controller/acl_connection_handler.cc", + "model/controller/controller_properties.cc", + "model/controller/dual_mode_controller.cc", + "model/controller/dual_mode_controller_python3.cc", + "model/controller/isochronous_connection_handler.cc", + "model/controller/le_advertiser.cc", + "model/controller/link_layer_controller.cc", + "model/controller/sco_connection.cc", + "model/controller/security_manager.cc", + "model/devices/device.cc", + "model/setup/async_manager.cc", + ":BluetoothPacketSources", + ":BluetoothHciClassSources", + ":BluetoothCryptoToolboxSources", + ], + export_include_dirs: [ + "include", + ".", + ], + stl: "libc++_static", + static_libs: [ + "libjsoncpp", + ], + whole_static_libs: [ + "libbase", + "liblmp", + ], + header_libs: [ + "pybind11_headers", + ], + cflags: [ + "-fexceptions", + ], + rtti: true, +} + +// Generate the python parser+serializer backend for +// packets/link_layer_packets.pdl. +genrule { + name: "link_layer_packets_python3_gen", + defaults: [ "pdl_python_generator_defaults" ], + cmd: "$(location :pdl) $(in) |" + + " $(location :pdl_python_generator)" + + " --output $(out) --custom-type-location py.bluetooth", + srcs: [ + "packets/link_layer_packets.pdl", + ], + out: [ + "link_layer_packets.py", + ], +} + +// Generate the python parser+serializer backend for +// hci_packets.pdl. +genrule { + name: "hci_packets_python3_gen", + defaults: [ "pdl_python_generator_defaults" ], + cmd: "$(location :pdl) $(in) |" + + " $(location :pdl_python_generator)" + + " --output $(out) --custom-type-location py.bluetooth", + srcs: [ + ":BluetoothHciPackets", + ], + out: [ + "hci_packets.py", + ], +} + cc_library_static { name: "libscriptedbeaconpayload-protos-lite", host_supported: true, @@ -162,6 +240,35 @@ cc_test_host { ], } +// Implement the Bluetooth official LL test suite for root-canal. +python_test_host { + name: "rootcanal_ll_test", + main: "test/main.py", + srcs: [ + "py/controller.py", + "py/bluetooth.py", + ":hci_packets_python3_gen", + ":link_layer_packets_python3_gen", + "test/main.py", + "test/LL/DDI/SCN/BV_13_C.py", + "test/LL/DDI/SCN/BV_14_C.py", + ], + data: [ + ":lib_rootcanal_python3", + ], + libs: [ + "typing_extensions", + ], + test_options: { + unit_test: true, + }, + version: { + py3: { + embedded_launcher: true, + }, + }, +} + // test-vendor unit tests for host cc_test_host { name: "rootcanal_test_host", diff --git a/tools/rootcanal/model/controller/dual_mode_controller_python3.cc b/tools/rootcanal/model/controller/dual_mode_controller_python3.cc new file mode 100644 index 0000000000..d24c792246 --- /dev/null +++ b/tools/rootcanal/model/controller/dual_mode_controller_python3.cc @@ -0,0 +1,230 @@ +/* + * Copyright 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <android-base/logging.h> +#include <pybind11/pybind11.h> +#include <pybind11/stl.h> + +#include "dual_mode_controller.h" + +using namespace std::literals; +namespace py = pybind11; + +namespace rootcanal { + +namespace hci { +enum Type { + CMD, + EVT, + ACL, + SCO, + ISO, +}; +} // namespace hci + +// Overload the class DualModeController to implement +// SendLinkLayerPacket as forwarding packets to a registered handler. +class BaseController : public DualModeController { + public: + BaseController() : DualModeController() { + RegisterTaskScheduler( + [this](std::chrono::milliseconds delay, TaskCallback const& task) { + return this->async_manager_.ExecAsync(0, delay, task); + }); + RegisterPeriodicTaskScheduler([this](std::chrono::milliseconds delay, + std::chrono::milliseconds period, + TaskCallback const& task) { + return this->async_manager_.ExecAsyncPeriodically(0, delay, period, task); + }); + RegisterTaskCancel([this](AsyncTaskId task_id) { + this->async_manager_.CancelAsyncTask(task_id); + }); + } + ~BaseController() = default; + + void RegisterLLChannel( + std::function<void(std::shared_ptr<std::vector<uint8_t>>)> const& + send_ll) { + send_ll_ = send_ll; + } + + void Start() { + if (timer_task_id_ == kInvalidTaskId) { + timer_task_id_ = async_manager_.ExecAsyncPeriodically( + 0, 0ms, 5ms, [this]() { this->TimerTick(); }); + } + } + + void Stop() { + if (timer_task_id_ != kInvalidTaskId) { + async_manager_.CancelAsyncTask(timer_task_id_); + timer_task_id_ = kInvalidTaskId; + } + } + + virtual void SendLinkLayerPacket( + std::shared_ptr<model::packets::LinkLayerPacketBuilder> packet, + Phy::Type phy_type_) override { + (void)phy_type_; + auto bytes = std::make_shared<std::vector<uint8_t>>(); + bluetooth::packet::BitInserter inserter(*bytes); + bytes->reserve(packet->size()); + packet->Serialize(inserter); + send_ll_(bytes); + } + + private: + std::function<void(std::shared_ptr<std::vector<uint8_t>>)> send_ll_{}; + AsyncManager async_manager_; + AsyncTaskId timer_task_id_; + + BaseController(BaseController const&) = delete; + DualModeController& operator=(BaseController const&) = delete; +}; + +PYBIND11_MODULE(lib_rootcanal_python3, m) { + m.doc() = "RootCanal controller plugin"; + + py::enum_<hci::Type>(m, "HciType") + .value("Cmd", hci::Type::CMD) + .value("Evt", hci::Type::EVT) + .value("Acl", hci::Type::ACL) + .value("Sco", hci::Type::SCO) + .value("Iso", hci::Type::ISO); + + m.def( + "generate_rpa", + [](py::bytes arg) { + std::string irk_str = arg; + irk_str.resize(LinkLayerController::kIrkSize); + + std::array<uint8_t, LinkLayerController::kIrkSize> irk{}; + std::copy(irk_str.begin(), irk_str.end(), irk.begin()); + + bluetooth::hci::Address rpa = + rootcanal::LinkLayerController::generate_rpa(irk); + return rpa.address; + }, + "Bluetooth RPA generation"); + + py::class_<rootcanal::BaseController, + std::shared_ptr<rootcanal::BaseController>> + basic_controller(m, "BaseController"); + + // Implement the constructor with two callback parameters to + // handle emitted HCI packets and LL packets. + basic_controller.def(py::init([](py::object hci_handler, + py::object ll_handler) { + std::shared_ptr<BaseController> controller = + std::make_shared<BaseController>(); + controller->RegisterEventChannel( + [=](std::shared_ptr<std::vector<uint8_t>> data) { + pybind11::gil_scoped_acquire acquire; + hci_handler( + hci::Type::EVT, + py::bytes(reinterpret_cast<char*>(data->data()), data->size())); + }); + controller->RegisterAclChannel( + [=](std::shared_ptr<std::vector<uint8_t>> data) { + pybind11::gil_scoped_acquire acquire; + hci_handler( + hci::Type::ACL, + py::bytes(reinterpret_cast<char*>(data->data()), data->size())); + }); + controller->RegisterScoChannel( + [=](std::shared_ptr<std::vector<uint8_t>> data) { + pybind11::gil_scoped_acquire acquire; + hci_handler( + hci::Type::SCO, + py::bytes(reinterpret_cast<char*>(data->data()), data->size())); + }); + controller->RegisterIsoChannel( + [=](std::shared_ptr<std::vector<uint8_t>> data) { + pybind11::gil_scoped_acquire acquire; + hci_handler( + hci::Type::ISO, + py::bytes(reinterpret_cast<char*>(data->data()), data->size())); + }); + controller->RegisterLLChannel( + [=](std::shared_ptr<std::vector<uint8_t>> data) { + pybind11::gil_scoped_acquire acquire; + ll_handler( + py::bytes(reinterpret_cast<char*>(data->data()), data->size())); + }); + return controller; + })); + + // Timer interface. + basic_controller.def("start", &BaseController::Start); + basic_controller.def("stop", &BaseController::Stop); + + // Implement method BaseController.receive_hci which + // injects HCI packets into the controller as if sent from the host. + basic_controller.def( + "send_hci", [](std::shared_ptr<rootcanal::BaseController> controller, + hci::Type typ, py::bytes data) { + std::string data_str = data; + std::shared_ptr<std::vector<uint8_t>> bytes = + std::make_shared<std::vector<uint8_t>>(data_str.begin(), + data_str.end()); + + switch (typ) { + case hci::Type::CMD: + controller->HandleCommand(bytes); + break; + case hci::Type::ACL: + controller->HandleAcl(bytes); + break; + case hci::Type::SCO: + controller->HandleSco(bytes); + break; + case hci::Type::ISO: + controller->HandleIso(bytes); + break; + default: + std::cerr << "Dropping HCI packet with unknown type " << typ + << std::endl; + break; + } + }); + + // Implement method BaseController.receive_hci which + // injects LL packets into the controller as if sent over the air. + basic_controller.def( + "send_ll", [](std::shared_ptr<rootcanal::BaseController> controller, + py::bytes data) { + std::string data_str = data; + std::shared_ptr<std::vector<uint8_t>> bytes = + std::make_shared<std::vector<uint8_t>>(data_str.begin(), + data_str.end()); + + model::packets::LinkLayerPacketView packet = + model::packets::LinkLayerPacketView::Create( + bluetooth::packet::PacketView<bluetooth::packet::kLittleEndian>( + bytes)); + if (!packet.IsValid()) { + std::cerr << "Dropping malformed LL packet" << std::endl; + return; + } + controller->IncomingPacket(std::move(packet)); + }); +} + +__attribute__((constructor)) static void ConfigureLogging() { + android::base::InitLogging({}, android::base::StdioLogger); +} + +} // namespace rootcanal diff --git a/tools/rootcanal/model/controller/link_layer_controller.cc b/tools/rootcanal/model/controller/link_layer_controller.cc index d03651c46c..f36b42f958 100644 --- a/tools/rootcanal/model/controller/link_layer_controller.cc +++ b/tools/rootcanal/model/controller/link_layer_controller.cc @@ -212,9 +212,6 @@ std::optional<AddressWithType> LinkLayerController::ResolvePrivateAddress( return {}; } -static Address generate_rpa( - std::array<uint8_t, LinkLayerController::kIrkSize> irk); - std::optional<AddressWithType> LinkLayerController::GenerateResolvablePrivateAddress(AddressWithType address, IrkSelection irk) { @@ -2621,7 +2618,7 @@ void LinkLayerController::IncomingKeypressNotificationPacket( } #endif /* !ROOTCANAL_LMP */ -static Address generate_rpa( +Address LinkLayerController::generate_rpa( std::array<uint8_t, LinkLayerController::kIrkSize> irk) { // most significant bit, bit7, bit6 is 01 to be resolvable random // Bits of the random part of prand shall not be all 1 or all 0 diff --git a/tools/rootcanal/model/controller/link_layer_controller.h b/tools/rootcanal/model/controller/link_layer_controller.h index e01511cb1c..de5b2a6fdf 100644 --- a/tools/rootcanal/model/controller/link_layer_controller.h +++ b/tools/rootcanal/model/controller/link_layer_controller.h @@ -62,6 +62,10 @@ class LinkLayerController { public: static constexpr size_t kIrkSize = 16; + // Generate a resolvable private address using the specified IRK. + static Address generate_rpa( + std::array<uint8_t, LinkLayerController::kIrkSize> irk); + LinkLayerController(const Address& address, const ControllerProperties& properties); diff --git a/tools/rootcanal/py/bluetooth.py b/tools/rootcanal/py/bluetooth.py new file mode 100644 index 0000000000..dba35cd53a --- /dev/null +++ b/tools/rootcanal/py/bluetooth.py @@ -0,0 +1,48 @@ +from dataclasses import dataclass, field +from typing import Tuple + + +@dataclass +class Address: + address: bytes = field(default=bytes([0, 0, 0, 0, 0, 0])) + + def __post_init__(self): + self.address = bytes(self.address) + + def from_str(address: str) -> 'Address': + return Address(bytes([int(b, 16) for b in address.split(':')])) + + def parse(span: bytes) -> Tuple['Address', bytes]: + assert len(span) > 6 + return (Address(bytes(reversed(span[:6]))), span[6:]) + + def parse_all(span: bytes) -> 'Address': + assert (len(span) == 6) + return Address(bytes(reversed(span))) + + def serialize(self) -> bytes: + return bytes(reversed(self.address)) + + def __repr__(self) -> str: + return ':'.join([f'{b:02x}' for b in self.address]) + + @property + def size(self) -> int: + return 6 + + +@dataclass +class ClassOfDevice: + + def parse(span: bytes) -> Tuple['Address', bytes]: + assert False + + def parse_all(span: bytes) -> 'Address': + assert False + + def serialize(self) -> bytes: + assert False + + @property + def size(self) -> int: + assert False diff --git a/tools/rootcanal/py/controller.py b/tools/rootcanal/py/controller.py new file mode 100644 index 0000000000..dafde53e51 --- /dev/null +++ b/tools/rootcanal/py/controller.py @@ -0,0 +1,119 @@ +import asyncio +import collections +import hci_packets as hci +import lib_rootcanal_python3 as rootcanal +import link_layer_packets as ll +import py.bluetooth +import unittest +from typing import Optional +from hci_packets import ErrorCode + + +class Controller(rootcanal.BaseController): + """Binder class to DualModeController. + The methods send_cmd, send_hci, send_ll are used to inject HCI or LL + packets into the controller, and receive_hci, receive_ll to + catch outgoing HCI packets of LL pdus.""" + + def __init__(self): + super().__init__(self.receive_hci_, self.receive_ll_) + self.evt_queue = collections.deque() + self.acl_queue = collections.deque() + self.ll_queue = collections.deque() + self.evt_queue_event = asyncio.Event() + self.acl_queue_event = asyncio.Event() + self.ll_queue_event = asyncio.Event() + + def receive_hci_(self, typ: rootcanal.HciType, packet: bytes): + if typ == rootcanal.HciType.Evt: + print(f"<-- received HCI event data={len(packet)}[..]") + self.evt_queue.append(packet) + self.evt_queue_event.set() + elif typ == rootcanal.HciType.Acl: + print(f"<-- received HCI ACL packet data={len(packet)}[..]") + self.acl_queue.append(packet) + self.acl_queue_event.set() + else: + print(f"ignoring HCI packet typ={typ}") + + def receive_ll_(self, packet: bytes): + print(f"<-- received LL pdu data={len(packet)}[..]") + self.ll_queue.append(packet) + self.ll_queue_event.set() + + def send_cmd(self, cmd: hci.Command): + print(f"--> sending HCI command {cmd.__class__.__name__}") + self.send_hci(rootcanal.HciType.Cmd, cmd.serialize()) + + def send_ll(self, pdu: ll.LinkLayerPacket, rssi: Optional[int] = None): + print(f"--> sending LL pdu {pdu.__class__.__name__}") + if rssi is not None: + pdu = ll.RssiWrapper(rssi=rssi, payload=pdu.serialize()) + super().send_ll(pdu.serialize()) + + def stop(self): + super().stop() + if self.evt_queue: + print("evt queue not empty at stop():") + for packet in self.evt_queue: + evt = hci.Event.parse_all(packet) + evt.show() + raise Exception("evt queue not empty at stop()") + + if self.ll_queue: + for packet in self.ll_queue: + pdu = ll.LinkLayerPacket.parse_all(packet) + pdu.show() + raise Exception("ll queue not empty at stop()") + + async def receive_evt(self): + while not self.evt_queue: + await self.evt_queue_event.wait() + self.evt_queue_event.clear() + return self.evt_queue.popleft() + + async def expect_evt(self, expected_evt: hci.Event): + packet = await self.receive_evt() + evt = hci.Event.parse_all(packet) + if evt != expected_evt: + print("received unexpected event") + print("expected event:") + expected_evt.show() + print("received event:") + evt.show() + raise Exception(f"unexpected evt {evt.__class__.__name__}") + + +class ControllerTest(unittest.IsolatedAsyncioTestCase): + """Helper class for writing controller tests using the python bindings. + The test setups the controller sending the Reset command and configuring + the event masks to allow all events.""" + + def setUp(self): + self.controller = Controller() + self.controller.start() + + async def asyncSetUp(self): + controller = self.controller + + # Reset the controller and enable all events and LE events. + controller.send_cmd(hci.Reset()) + await controller.expect_evt(hci.ResetComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + controller.send_cmd(hci.SetEventMask(event_mask=0xffffffffffffffff)) + await controller.expect_evt(hci.SetEventMaskComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + controller.send_cmd(hci.LeSetEventMask(le_event_mask=0xffffffffffffffff)) + await controller.expect_evt(hci.LeSetEventMaskComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + async def expect_evt(self, expected_evt: hci.Event): + packet = await self.controller.receive_evt() + evt = hci.Event.parse_all(packet) + if evt != expected_evt: + print("received unexpected event") + print("expected event:") + expected_evt.show() + print("received event:") + evt.show() + self.AssertTrue(False) + + def tearDown(self): + self.controller.stop() diff --git a/tools/rootcanal/test/LL/DDI/SCN/BV_13_C.py b/tools/rootcanal/test/LL/DDI/SCN/BV_13_C.py new file mode 100644 index 0000000000..371ba987a0 --- /dev/null +++ b/tools/rootcanal/test/LL/DDI/SCN/BV_13_C.py @@ -0,0 +1,129 @@ +import lib_rootcanal_python3 as rootcanal +import hci_packets as hci +import link_layer_packets as ll +import unittest +from hci_packets import ErrorCode +from py.bluetooth import Address +from py.controller import ControllerTest + + +class Test(ControllerTest): + + # LL/DDI/SCN/BV-13-C [Network Privacy – Passive Scanning, Peer IRK] + async def test(self): + # Test parameters. + LL_scanner_scanInterval_MIN = 0x2000 + LL_scanner_scanInterval_MAX = 0x2000 + LL_scanner_scanWindow_MIN = 0x200 + LL_scanner_scanWindow_MAX = 0x200 + LL_scanner_Adv_Channel_Map = 0x7 + + controller = self.controller + peer_irk = bytes([1] * 16) + peer_identity_address = Address.from_str('aa:bb:cc:dd:ee:ff') + peer_identity_address_type = hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS + peer_resolvable_address = Address(rootcanal.generate_rpa(peer_irk)) + + # 1. The Upper Tester populates the IUT resolving list with the peer IRK + # and identity address. + controller.send_cmd( + hci.LeAddDeviceToResolvingList(peer_irk=peer_irk, + local_irk=bytes([0] * 16), + peer_identity_address=peer_identity_address, + peer_identity_address_type=peer_identity_address_type)) + + await self.expect_evt( + hci.LeAddDeviceToResolvingListComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + controller.send_cmd(hci.LeSetResolvablePrivateAddressTimeout(rpa_timeout=0x10)) + + await self.expect_evt( + hci.LeSetResolvablePrivateAddressTimeoutComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + controller.send_cmd(hci.LeSetAddressResolutionEnable(address_resolution_enable=hci.Enable.ENABLED)) + + await self.expect_evt( + hci.LeSetAddressResolutionEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + # 2. The Upper Tester enables passive scanning in the IUT. + controller.send_cmd( + hci.LeSetScanParameters(le_scan_type=hci.LeScanType.PASSIVE, + le_scan_interval=LL_scanner_scanInterval_MAX, + le_scan_window=LL_scanner_scanWindow_MAX, + own_address_type=hci.OwnAddressType.RESOLVABLE_OR_PUBLIC_ADDRESS, + scanning_filter_policy=hci.LeScanningFilterPolicy.ACCEPT_ALL)) + + await self.expect_evt(hci.LeSetScanParametersComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + controller.send_cmd( + hci.LeSetScanEnable(le_scan_enable=hci.Enable.ENABLED, filter_duplicates=hci.Enable.DISABLED)) + + await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + # 3. Configure the Lower Tester to start advertising. The Lower Tester uses + # a resolvable private address in the AdvA field. + # 4. The Lower Tester sends an ADV_NONCONN_IND packet each advertising event + # using the selected advertising channel only. Repeat for at least 20 + # advertising intervals. + controller.send_ll(ll.LeLegacyAdvertisingPdu(source_address=peer_resolvable_address, + advertising_address_type=ll.AddressType.RANDOM, + advertising_type=ll.LegacyAdvertisingType.ADV_NONCONN_IND, + advertising_data=[1, 2, 3]), + rssi=0xf0) + + # 5. The Upper Tester receives at least one HCI_LE_Advertising_Report + # reporting the advertising packets sent by the Lower Tester. The address in + # the report is resolved by the IUT using the distributed IRK. + await self.expect_evt( + hci.LeAdvertisingReportRaw(responses=[ + hci.LeAdvertisingResponseRaw(event_type=hci.AdvertisingEventType.ADV_NONCONN_IND, + address_type=hci.AddressType.PUBLIC_IDENTITY_ADDRESS, + address=peer_identity_address, + advertising_data=[1, 2, 3], + rssi=0xf0) + ])) + + # 6. The Upper Tester sends an HCI_LE_Set_Scan_Enable to the IUT to stop the + # scanning function and receives an HCI_Command_Complete event in response. + controller.send_cmd(hci.LeSetScanEnable(le_scan_enable=hci.Enable.DISABLED)) + + await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + # 7. The Upper Tester disables address resolution. + controller.send_cmd(hci.LeSetAddressResolutionEnable(address_resolution_enable=hci.Enable.DISABLED)) + + await self.expect_evt( + hci.LeSetAddressResolutionEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + # 8. The Upper Tester enables passive scanning in the IUT. + controller.send_cmd( + hci.LeSetScanEnable(le_scan_enable=hci.Enable.ENABLED, filter_duplicates=hci.Enable.DISABLED)) + + await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + # 9. The Lower Tester sends an ADV_NONCONN_IND packet each advertising event + # using the selected advertising channel only. Repeat for at least 20 + # advertising intervals. + controller.send_ll(ll.LeLegacyAdvertisingPdu(source_address=peer_resolvable_address, + advertising_address_type=ll.AddressType.RANDOM, + advertising_type=ll.LegacyAdvertisingType.ADV_NONCONN_IND, + advertising_data=[1, 2, 3]), + rssi=0xf0) + + # 10. The IUT does not resolve the Lower Tester’s address and reports it + # unresolved (as received in the advertising PDU) in the advertising report + # events to the Upper Tester. + await self.expect_evt( + hci.LeAdvertisingReportRaw(responses=[ + hci.LeAdvertisingResponseRaw(event_type=hci.AdvertisingEventType.ADV_NONCONN_IND, + address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS, + address=peer_resolvable_address, + advertising_data=[1, 2, 3], + rssi=0xf0) + ])) + + # 11. The Upper Tester sends an HCI_LE_Set_Scan_Enable to the IUT to stop the + # scanning function and receives an HCI_Command_Complete event in response. + controller.send_cmd(hci.LeSetScanEnable(le_scan_enable=hci.Enable.DISABLED)) + + await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) diff --git a/tools/rootcanal/test/LL/DDI/SCN/BV_14_C.py b/tools/rootcanal/test/LL/DDI/SCN/BV_14_C.py new file mode 100644 index 0000000000..8d27956b96 --- /dev/null +++ b/tools/rootcanal/test/LL/DDI/SCN/BV_14_C.py @@ -0,0 +1,85 @@ +import lib_rootcanal_python3 as rootcanal +import hci_packets as hci +import link_layer_packets as ll +import unittest +from hci_packets import ErrorCode +from py.bluetooth import Address +from py.controller import ControllerTest + + +class Test(ControllerTest): + + # LL/DDI/SCN/BV-13-C [Network Privacy - Passive Scanning: Directed Events to an address + # different from the scanner’s address] + async def test(self): + # Test parameters. + RPA_timeout = 0x10 + LL_scanner_scanInterval_MIN = 0x2000 + LL_scanner_scanInterval_MAX = 0x2000 + LL_scanner_scanWindow_MIN = 0x200 + LL_scanner_scanWindow_MAX = 0x200 + LL_scanner_Adv_Channel_Map = 0x7 + + controller = self.controller + peer_irk = bytes([1] * 16) + local_irk = bytes([2] * 16) + peer_resolvable_address = Address(rootcanal.generate_rpa(peer_irk)) + local_resolvable_address_1 = Address(rootcanal.generate_rpa(local_irk)) + local_resolvable_address_2 = Address(rootcanal.generate_rpa(local_irk)) + + # 1. The Upper Tester sets a resolvable private address for the IUT to use. + controller.send_cmd(hci.LeSetRandomAddress(random_address=local_resolvable_address_1)) + + await self.expect_evt(hci.LeSetRandomAddressComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + controller.send_cmd(hci.LeSetResolvablePrivateAddressTimeout(rpa_timeout=RPA_timeout)) + + await self.expect_evt( + hci.LeSetResolvablePrivateAddressTimeoutComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + # 2. The Upper Tester enables passive scanning using filter policy 0x02 in the IUT. + controller.send_cmd( + hci.LeSetScanParameters(le_scan_type=hci.LeScanType.PASSIVE, + le_scan_interval=LL_scanner_scanInterval_MAX, + le_scan_window=LL_scanner_scanWindow_MAX, + own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS, + scanning_filter_policy=hci.LeScanningFilterPolicy.CHECK_INITIATORS_IDENTITY)) + + await self.expect_evt(hci.LeSetScanParametersComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + controller.send_cmd( + hci.LeSetScanEnable(le_scan_enable=hci.Enable.ENABLED, filter_duplicates=hci.Enable.DISABLED)) + + await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) + + # 3. Configure the Lower Tester to start advertising. The Lower Tester uses a resolvable private + # address type in the AdvA field. The InitA field also contains a resolvable private address, which + # does not match the address set by the Upper Tester in the IUT. + + # 4. The Lower Tester sends an ADV_ DIRECT _IND packet each advertising event using the + # selected advertising channel only. Repeat for at least 20 advertising intervals. + controller.send_ll(ll.LeLegacyAdvertisingPdu(source_address=peer_resolvable_address, + destination_address=local_resolvable_address_2, + advertising_address_type=ll.AddressType.RANDOM, + target_address_type=ll.AddressType.RANDOM, + advertising_type=ll.LegacyAdvertisingType.ADV_DIRECT_IND, + advertising_data=[1, 2, 3]), + rssi=0xf0) + + # 5. The Upper Tester receives at least one HCI_LE_Direct_Advertising_Report reporting the + # advertising packets sent by the Lower Tester. + await self.expect_evt( + hci.LeDirectedAdvertisingReport(responses=[ + hci.LeDirectedAdvertisingResponse(event_type=hci.AdvertisingEventType.ADV_DIRECT_IND, + address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS, + address=peer_resolvable_address, + direct_address_type=hci.DirectAddressType.RANDOM_DEVICE_ADDRESS, + direct_address=local_resolvable_address_2, + rssi=0xf0) + ])) + + # 6. The Upper Tester sends an HCI_LE_Set_Scan_Enable to the IUT to stop the scanning function + # and receives an HCI_Command_Complete event in response. + controller.send_cmd(hci.LeSetScanEnable(le_scan_enable=hci.Enable.DISABLED)) + + await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1)) diff --git a/tools/rootcanal/test/main.py b/tools/rootcanal/test/main.py new file mode 100644 index 0000000000..728307e439 --- /dev/null +++ b/tools/rootcanal/test/main.py @@ -0,0 +1,29 @@ +from importlib import resources +from pathlib import Path +import importlib +import tempfile + +# Python is not able to load the module lib_rootcanal_python3.so +# when the test target is configured with embedded_launcher: true. +# This code loads the file to a temporary directory and adds the +# path to the sys lookup. +with tempfile.TemporaryDirectory() as cache: + with (Path('lib_rootcanal_python3.so').open('rb') as fin, + Path(cache, 'lib_rootcanal_python3.so').open('wb') as fout): + fout.write(fin.read()) + sys.path.append(cache) + import lib_rootcanal_python3 + +import unittest + +tests = [ + 'LL.DDI.SCN.BV_13_C', + 'LL.DDI.SCN.BV_14_C' +] + +if __name__ == "__main__": + suite = unittest.TestSuite() + for test in tests: + module = importlib.import_module(f'test.{test}') + suite.addTest(unittest.defaultTestLoader.loadTestsFromModule(module)) + unittest.TextTestRunner(verbosity=3).run(suite) |