summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Henri Chataing <henrichataing@google.com> 2022-11-10 22:44:03 +0000
committer Henri Chataing <henrichataing@google.com> 2022-12-01 00:28:28 +0000
commit41b6cae9c74732b89c30d56f7a5a6ba7553889a0 (patch)
tree3e4f9a75508e35c0ab07124e5fd51ebc72421a0d
parentd18c06ea231786fd4c6120c746974a623cbb5ec9 (diff)
RootCanal: Implement rootcanal_ll_test
This change adds a python binding for the DualModeController class to enable isolated LL+HCI tests on one controller instance. Implement test LL/DDI/SCN/BV-13-C test from Bluetooth controller certification test suite. Test: atest --host rootcanal_ll_test Change-Id: I5b0b9fd9608769346deb9c20dbf1d7aa809e8a0a
-rw-r--r--tools/rootcanal/Android.bp107
-rw-r--r--tools/rootcanal/model/controller/dual_mode_controller_python3.cc230
-rw-r--r--tools/rootcanal/model/controller/link_layer_controller.cc5
-rw-r--r--tools/rootcanal/model/controller/link_layer_controller.h4
-rw-r--r--tools/rootcanal/py/bluetooth.py48
-rw-r--r--tools/rootcanal/py/controller.py119
-rw-r--r--tools/rootcanal/test/LL/DDI/SCN/BV_13_C.py129
-rw-r--r--tools/rootcanal/test/LL/DDI/SCN/BV_14_C.py85
-rw-r--r--tools/rootcanal/test/main.py29
9 files changed, 752 insertions, 4 deletions
diff --git a/tools/rootcanal/Android.bp b/tools/rootcanal/Android.bp
index 52ec3745b1..fa5c135fe3 100644
--- a/tools/rootcanal/Android.bp
+++ b/tools/rootcanal/Android.bp
@@ -105,6 +105,84 @@ cc_library_static {
],
}
+// This library implements Python bindings to the DualModeController
+// class to enable scripted testing in Python.
+cc_library_host_shared {
+ name: "lib_rootcanal_python3",
+ defaults: [
+ "bluetooth_py3_native_extension_defaults",
+ "rootcanal_defaults",
+ ],
+ srcs: [
+ "model/controller/acl_connection.cc",
+ "model/controller/acl_connection_handler.cc",
+ "model/controller/controller_properties.cc",
+ "model/controller/dual_mode_controller.cc",
+ "model/controller/dual_mode_controller_python3.cc",
+ "model/controller/isochronous_connection_handler.cc",
+ "model/controller/le_advertiser.cc",
+ "model/controller/link_layer_controller.cc",
+ "model/controller/sco_connection.cc",
+ "model/controller/security_manager.cc",
+ "model/devices/device.cc",
+ "model/setup/async_manager.cc",
+ ":BluetoothPacketSources",
+ ":BluetoothHciClassSources",
+ ":BluetoothCryptoToolboxSources",
+ ],
+ export_include_dirs: [
+ "include",
+ ".",
+ ],
+ stl: "libc++_static",
+ static_libs: [
+ "libjsoncpp",
+ ],
+ whole_static_libs: [
+ "libbase",
+ "liblmp",
+ ],
+ header_libs: [
+ "pybind11_headers",
+ ],
+ cflags: [
+ "-fexceptions",
+ ],
+ rtti: true,
+}
+
+// Generate the python parser+serializer backend for
+// packets/link_layer_packets.pdl.
+genrule {
+ name: "link_layer_packets_python3_gen",
+ defaults: [ "pdl_python_generator_defaults" ],
+ cmd: "$(location :pdl) $(in) |" +
+ " $(location :pdl_python_generator)" +
+ " --output $(out) --custom-type-location py.bluetooth",
+ srcs: [
+ "packets/link_layer_packets.pdl",
+ ],
+ out: [
+ "link_layer_packets.py",
+ ],
+}
+
+// Generate the python parser+serializer backend for
+// hci_packets.pdl.
+genrule {
+ name: "hci_packets_python3_gen",
+ defaults: [ "pdl_python_generator_defaults" ],
+ cmd: "$(location :pdl) $(in) |" +
+ " $(location :pdl_python_generator)" +
+ " --output $(out) --custom-type-location py.bluetooth",
+ srcs: [
+ ":BluetoothHciPackets",
+ ],
+ out: [
+ "hci_packets.py",
+ ],
+}
+
cc_library_static {
name: "libscriptedbeaconpayload-protos-lite",
host_supported: true,
@@ -162,6 +240,35 @@ cc_test_host {
],
}
+// Implement the Bluetooth official LL test suite for root-canal.
+python_test_host {
+ name: "rootcanal_ll_test",
+ main: "test/main.py",
+ srcs: [
+ "py/controller.py",
+ "py/bluetooth.py",
+ ":hci_packets_python3_gen",
+ ":link_layer_packets_python3_gen",
+ "test/main.py",
+ "test/LL/DDI/SCN/BV_13_C.py",
+ "test/LL/DDI/SCN/BV_14_C.py",
+ ],
+ data: [
+ ":lib_rootcanal_python3",
+ ],
+ libs: [
+ "typing_extensions",
+ ],
+ test_options: {
+ unit_test: true,
+ },
+ version: {
+ py3: {
+ embedded_launcher: true,
+ },
+ },
+}
+
// test-vendor unit tests for host
cc_test_host {
name: "rootcanal_test_host",
diff --git a/tools/rootcanal/model/controller/dual_mode_controller_python3.cc b/tools/rootcanal/model/controller/dual_mode_controller_python3.cc
new file mode 100644
index 0000000000..d24c792246
--- /dev/null
+++ b/tools/rootcanal/model/controller/dual_mode_controller_python3.cc
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <android-base/logging.h>
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+
+#include "dual_mode_controller.h"
+
+using namespace std::literals;
+namespace py = pybind11;
+
+namespace rootcanal {
+
+namespace hci {
+enum Type {
+ CMD,
+ EVT,
+ ACL,
+ SCO,
+ ISO,
+};
+} // namespace hci
+
+// Overload the class DualModeController to implement
+// SendLinkLayerPacket as forwarding packets to a registered handler.
+class BaseController : public DualModeController {
+ public:
+ BaseController() : DualModeController() {
+ RegisterTaskScheduler(
+ [this](std::chrono::milliseconds delay, TaskCallback const& task) {
+ return this->async_manager_.ExecAsync(0, delay, task);
+ });
+ RegisterPeriodicTaskScheduler([this](std::chrono::milliseconds delay,
+ std::chrono::milliseconds period,
+ TaskCallback const& task) {
+ return this->async_manager_.ExecAsyncPeriodically(0, delay, period, task);
+ });
+ RegisterTaskCancel([this](AsyncTaskId task_id) {
+ this->async_manager_.CancelAsyncTask(task_id);
+ });
+ }
+ ~BaseController() = default;
+
+ void RegisterLLChannel(
+ std::function<void(std::shared_ptr<std::vector<uint8_t>>)> const&
+ send_ll) {
+ send_ll_ = send_ll;
+ }
+
+ void Start() {
+ if (timer_task_id_ == kInvalidTaskId) {
+ timer_task_id_ = async_manager_.ExecAsyncPeriodically(
+ 0, 0ms, 5ms, [this]() { this->TimerTick(); });
+ }
+ }
+
+ void Stop() {
+ if (timer_task_id_ != kInvalidTaskId) {
+ async_manager_.CancelAsyncTask(timer_task_id_);
+ timer_task_id_ = kInvalidTaskId;
+ }
+ }
+
+ virtual void SendLinkLayerPacket(
+ std::shared_ptr<model::packets::LinkLayerPacketBuilder> packet,
+ Phy::Type phy_type_) override {
+ (void)phy_type_;
+ auto bytes = std::make_shared<std::vector<uint8_t>>();
+ bluetooth::packet::BitInserter inserter(*bytes);
+ bytes->reserve(packet->size());
+ packet->Serialize(inserter);
+ send_ll_(bytes);
+ }
+
+ private:
+ std::function<void(std::shared_ptr<std::vector<uint8_t>>)> send_ll_{};
+ AsyncManager async_manager_;
+ AsyncTaskId timer_task_id_;
+
+ BaseController(BaseController const&) = delete;
+ DualModeController& operator=(BaseController const&) = delete;
+};
+
+PYBIND11_MODULE(lib_rootcanal_python3, m) {
+ m.doc() = "RootCanal controller plugin";
+
+ py::enum_<hci::Type>(m, "HciType")
+ .value("Cmd", hci::Type::CMD)
+ .value("Evt", hci::Type::EVT)
+ .value("Acl", hci::Type::ACL)
+ .value("Sco", hci::Type::SCO)
+ .value("Iso", hci::Type::ISO);
+
+ m.def(
+ "generate_rpa",
+ [](py::bytes arg) {
+ std::string irk_str = arg;
+ irk_str.resize(LinkLayerController::kIrkSize);
+
+ std::array<uint8_t, LinkLayerController::kIrkSize> irk{};
+ std::copy(irk_str.begin(), irk_str.end(), irk.begin());
+
+ bluetooth::hci::Address rpa =
+ rootcanal::LinkLayerController::generate_rpa(irk);
+ return rpa.address;
+ },
+ "Bluetooth RPA generation");
+
+ py::class_<rootcanal::BaseController,
+ std::shared_ptr<rootcanal::BaseController>>
+ basic_controller(m, "BaseController");
+
+ // Implement the constructor with two callback parameters to
+ // handle emitted HCI packets and LL packets.
+ basic_controller.def(py::init([](py::object hci_handler,
+ py::object ll_handler) {
+ std::shared_ptr<BaseController> controller =
+ std::make_shared<BaseController>();
+ controller->RegisterEventChannel(
+ [=](std::shared_ptr<std::vector<uint8_t>> data) {
+ pybind11::gil_scoped_acquire acquire;
+ hci_handler(
+ hci::Type::EVT,
+ py::bytes(reinterpret_cast<char*>(data->data()), data->size()));
+ });
+ controller->RegisterAclChannel(
+ [=](std::shared_ptr<std::vector<uint8_t>> data) {
+ pybind11::gil_scoped_acquire acquire;
+ hci_handler(
+ hci::Type::ACL,
+ py::bytes(reinterpret_cast<char*>(data->data()), data->size()));
+ });
+ controller->RegisterScoChannel(
+ [=](std::shared_ptr<std::vector<uint8_t>> data) {
+ pybind11::gil_scoped_acquire acquire;
+ hci_handler(
+ hci::Type::SCO,
+ py::bytes(reinterpret_cast<char*>(data->data()), data->size()));
+ });
+ controller->RegisterIsoChannel(
+ [=](std::shared_ptr<std::vector<uint8_t>> data) {
+ pybind11::gil_scoped_acquire acquire;
+ hci_handler(
+ hci::Type::ISO,
+ py::bytes(reinterpret_cast<char*>(data->data()), data->size()));
+ });
+ controller->RegisterLLChannel(
+ [=](std::shared_ptr<std::vector<uint8_t>> data) {
+ pybind11::gil_scoped_acquire acquire;
+ ll_handler(
+ py::bytes(reinterpret_cast<char*>(data->data()), data->size()));
+ });
+ return controller;
+ }));
+
+ // Timer interface.
+ basic_controller.def("start", &BaseController::Start);
+ basic_controller.def("stop", &BaseController::Stop);
+
+ // Implement method BaseController.receive_hci which
+ // injects HCI packets into the controller as if sent from the host.
+ basic_controller.def(
+ "send_hci", [](std::shared_ptr<rootcanal::BaseController> controller,
+ hci::Type typ, py::bytes data) {
+ std::string data_str = data;
+ std::shared_ptr<std::vector<uint8_t>> bytes =
+ std::make_shared<std::vector<uint8_t>>(data_str.begin(),
+ data_str.end());
+
+ switch (typ) {
+ case hci::Type::CMD:
+ controller->HandleCommand(bytes);
+ break;
+ case hci::Type::ACL:
+ controller->HandleAcl(bytes);
+ break;
+ case hci::Type::SCO:
+ controller->HandleSco(bytes);
+ break;
+ case hci::Type::ISO:
+ controller->HandleIso(bytes);
+ break;
+ default:
+ std::cerr << "Dropping HCI packet with unknown type " << typ
+ << std::endl;
+ break;
+ }
+ });
+
+ // Implement method BaseController.receive_hci which
+ // injects LL packets into the controller as if sent over the air.
+ basic_controller.def(
+ "send_ll", [](std::shared_ptr<rootcanal::BaseController> controller,
+ py::bytes data) {
+ std::string data_str = data;
+ std::shared_ptr<std::vector<uint8_t>> bytes =
+ std::make_shared<std::vector<uint8_t>>(data_str.begin(),
+ data_str.end());
+
+ model::packets::LinkLayerPacketView packet =
+ model::packets::LinkLayerPacketView::Create(
+ bluetooth::packet::PacketView<bluetooth::packet::kLittleEndian>(
+ bytes));
+ if (!packet.IsValid()) {
+ std::cerr << "Dropping malformed LL packet" << std::endl;
+ return;
+ }
+ controller->IncomingPacket(std::move(packet));
+ });
+}
+
+__attribute__((constructor)) static void ConfigureLogging() {
+ android::base::InitLogging({}, android::base::StdioLogger);
+}
+
+} // namespace rootcanal
diff --git a/tools/rootcanal/model/controller/link_layer_controller.cc b/tools/rootcanal/model/controller/link_layer_controller.cc
index d03651c46c..f36b42f958 100644
--- a/tools/rootcanal/model/controller/link_layer_controller.cc
+++ b/tools/rootcanal/model/controller/link_layer_controller.cc
@@ -212,9 +212,6 @@ std::optional<AddressWithType> LinkLayerController::ResolvePrivateAddress(
return {};
}
-static Address generate_rpa(
- std::array<uint8_t, LinkLayerController::kIrkSize> irk);
-
std::optional<AddressWithType>
LinkLayerController::GenerateResolvablePrivateAddress(AddressWithType address,
IrkSelection irk) {
@@ -2621,7 +2618,7 @@ void LinkLayerController::IncomingKeypressNotificationPacket(
}
#endif /* !ROOTCANAL_LMP */
-static Address generate_rpa(
+Address LinkLayerController::generate_rpa(
std::array<uint8_t, LinkLayerController::kIrkSize> irk) {
// most significant bit, bit7, bit6 is 01 to be resolvable random
// Bits of the random part of prand shall not be all 1 or all 0
diff --git a/tools/rootcanal/model/controller/link_layer_controller.h b/tools/rootcanal/model/controller/link_layer_controller.h
index e01511cb1c..de5b2a6fdf 100644
--- a/tools/rootcanal/model/controller/link_layer_controller.h
+++ b/tools/rootcanal/model/controller/link_layer_controller.h
@@ -62,6 +62,10 @@ class LinkLayerController {
public:
static constexpr size_t kIrkSize = 16;
+ // Generate a resolvable private address using the specified IRK.
+ static Address generate_rpa(
+ std::array<uint8_t, LinkLayerController::kIrkSize> irk);
+
LinkLayerController(const Address& address,
const ControllerProperties& properties);
diff --git a/tools/rootcanal/py/bluetooth.py b/tools/rootcanal/py/bluetooth.py
new file mode 100644
index 0000000000..dba35cd53a
--- /dev/null
+++ b/tools/rootcanal/py/bluetooth.py
@@ -0,0 +1,48 @@
+from dataclasses import dataclass, field
+from typing import Tuple
+
+
+@dataclass
+class Address:
+ address: bytes = field(default=bytes([0, 0, 0, 0, 0, 0]))
+
+ def __post_init__(self):
+ self.address = bytes(self.address)
+
+ def from_str(address: str) -> 'Address':
+ return Address(bytes([int(b, 16) for b in address.split(':')]))
+
+ def parse(span: bytes) -> Tuple['Address', bytes]:
+ assert len(span) > 6
+ return (Address(bytes(reversed(span[:6]))), span[6:])
+
+ def parse_all(span: bytes) -> 'Address':
+ assert (len(span) == 6)
+ return Address(bytes(reversed(span)))
+
+ def serialize(self) -> bytes:
+ return bytes(reversed(self.address))
+
+ def __repr__(self) -> str:
+ return ':'.join([f'{b:02x}' for b in self.address])
+
+ @property
+ def size(self) -> int:
+ return 6
+
+
+@dataclass
+class ClassOfDevice:
+
+ def parse(span: bytes) -> Tuple['Address', bytes]:
+ assert False
+
+ def parse_all(span: bytes) -> 'Address':
+ assert False
+
+ def serialize(self) -> bytes:
+ assert False
+
+ @property
+ def size(self) -> int:
+ assert False
diff --git a/tools/rootcanal/py/controller.py b/tools/rootcanal/py/controller.py
new file mode 100644
index 0000000000..dafde53e51
--- /dev/null
+++ b/tools/rootcanal/py/controller.py
@@ -0,0 +1,119 @@
+import asyncio
+import collections
+import hci_packets as hci
+import lib_rootcanal_python3 as rootcanal
+import link_layer_packets as ll
+import py.bluetooth
+import unittest
+from typing import Optional
+from hci_packets import ErrorCode
+
+
+class Controller(rootcanal.BaseController):
+ """Binder class to DualModeController.
+ The methods send_cmd, send_hci, send_ll are used to inject HCI or LL
+ packets into the controller, and receive_hci, receive_ll to
+ catch outgoing HCI packets of LL pdus."""
+
+ def __init__(self):
+ super().__init__(self.receive_hci_, self.receive_ll_)
+ self.evt_queue = collections.deque()
+ self.acl_queue = collections.deque()
+ self.ll_queue = collections.deque()
+ self.evt_queue_event = asyncio.Event()
+ self.acl_queue_event = asyncio.Event()
+ self.ll_queue_event = asyncio.Event()
+
+ def receive_hci_(self, typ: rootcanal.HciType, packet: bytes):
+ if typ == rootcanal.HciType.Evt:
+ print(f"<-- received HCI event data={len(packet)}[..]")
+ self.evt_queue.append(packet)
+ self.evt_queue_event.set()
+ elif typ == rootcanal.HciType.Acl:
+ print(f"<-- received HCI ACL packet data={len(packet)}[..]")
+ self.acl_queue.append(packet)
+ self.acl_queue_event.set()
+ else:
+ print(f"ignoring HCI packet typ={typ}")
+
+ def receive_ll_(self, packet: bytes):
+ print(f"<-- received LL pdu data={len(packet)}[..]")
+ self.ll_queue.append(packet)
+ self.ll_queue_event.set()
+
+ def send_cmd(self, cmd: hci.Command):
+ print(f"--> sending HCI command {cmd.__class__.__name__}")
+ self.send_hci(rootcanal.HciType.Cmd, cmd.serialize())
+
+ def send_ll(self, pdu: ll.LinkLayerPacket, rssi: Optional[int] = None):
+ print(f"--> sending LL pdu {pdu.__class__.__name__}")
+ if rssi is not None:
+ pdu = ll.RssiWrapper(rssi=rssi, payload=pdu.serialize())
+ super().send_ll(pdu.serialize())
+
+ def stop(self):
+ super().stop()
+ if self.evt_queue:
+ print("evt queue not empty at stop():")
+ for packet in self.evt_queue:
+ evt = hci.Event.parse_all(packet)
+ evt.show()
+ raise Exception("evt queue not empty at stop()")
+
+ if self.ll_queue:
+ for packet in self.ll_queue:
+ pdu = ll.LinkLayerPacket.parse_all(packet)
+ pdu.show()
+ raise Exception("ll queue not empty at stop()")
+
+ async def receive_evt(self):
+ while not self.evt_queue:
+ await self.evt_queue_event.wait()
+ self.evt_queue_event.clear()
+ return self.evt_queue.popleft()
+
+ async def expect_evt(self, expected_evt: hci.Event):
+ packet = await self.receive_evt()
+ evt = hci.Event.parse_all(packet)
+ if evt != expected_evt:
+ print("received unexpected event")
+ print("expected event:")
+ expected_evt.show()
+ print("received event:")
+ evt.show()
+ raise Exception(f"unexpected evt {evt.__class__.__name__}")
+
+
+class ControllerTest(unittest.IsolatedAsyncioTestCase):
+ """Helper class for writing controller tests using the python bindings.
+ The test setups the controller sending the Reset command and configuring
+ the event masks to allow all events."""
+
+ def setUp(self):
+ self.controller = Controller()
+ self.controller.start()
+
+ async def asyncSetUp(self):
+ controller = self.controller
+
+ # Reset the controller and enable all events and LE events.
+ controller.send_cmd(hci.Reset())
+ await controller.expect_evt(hci.ResetComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+ controller.send_cmd(hci.SetEventMask(event_mask=0xffffffffffffffff))
+ await controller.expect_evt(hci.SetEventMaskComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+ controller.send_cmd(hci.LeSetEventMask(le_event_mask=0xffffffffffffffff))
+ await controller.expect_evt(hci.LeSetEventMaskComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ async def expect_evt(self, expected_evt: hci.Event):
+ packet = await self.controller.receive_evt()
+ evt = hci.Event.parse_all(packet)
+ if evt != expected_evt:
+ print("received unexpected event")
+ print("expected event:")
+ expected_evt.show()
+ print("received event:")
+ evt.show()
+ self.AssertTrue(False)
+
+ def tearDown(self):
+ self.controller.stop()
diff --git a/tools/rootcanal/test/LL/DDI/SCN/BV_13_C.py b/tools/rootcanal/test/LL/DDI/SCN/BV_13_C.py
new file mode 100644
index 0000000000..371ba987a0
--- /dev/null
+++ b/tools/rootcanal/test/LL/DDI/SCN/BV_13_C.py
@@ -0,0 +1,129 @@
+import lib_rootcanal_python3 as rootcanal
+import hci_packets as hci
+import link_layer_packets as ll
+import unittest
+from hci_packets import ErrorCode
+from py.bluetooth import Address
+from py.controller import ControllerTest
+
+
+class Test(ControllerTest):
+
+ # LL/DDI/SCN/BV-13-C [Network Privacy – Passive Scanning, Peer IRK]
+ async def test(self):
+ # Test parameters.
+ LL_scanner_scanInterval_MIN = 0x2000
+ LL_scanner_scanInterval_MAX = 0x2000
+ LL_scanner_scanWindow_MIN = 0x200
+ LL_scanner_scanWindow_MAX = 0x200
+ LL_scanner_Adv_Channel_Map = 0x7
+
+ controller = self.controller
+ peer_irk = bytes([1] * 16)
+ peer_identity_address = Address.from_str('aa:bb:cc:dd:ee:ff')
+ peer_identity_address_type = hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS
+ peer_resolvable_address = Address(rootcanal.generate_rpa(peer_irk))
+
+ # 1. The Upper Tester populates the IUT resolving list with the peer IRK
+ # and identity address.
+ controller.send_cmd(
+ hci.LeAddDeviceToResolvingList(peer_irk=peer_irk,
+ local_irk=bytes([0] * 16),
+ peer_identity_address=peer_identity_address,
+ peer_identity_address_type=peer_identity_address_type))
+
+ await self.expect_evt(
+ hci.LeAddDeviceToResolvingListComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_cmd(hci.LeSetResolvablePrivateAddressTimeout(rpa_timeout=0x10))
+
+ await self.expect_evt(
+ hci.LeSetResolvablePrivateAddressTimeoutComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_cmd(hci.LeSetAddressResolutionEnable(address_resolution_enable=hci.Enable.ENABLED))
+
+ await self.expect_evt(
+ hci.LeSetAddressResolutionEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ # 2. The Upper Tester enables passive scanning in the IUT.
+ controller.send_cmd(
+ hci.LeSetScanParameters(le_scan_type=hci.LeScanType.PASSIVE,
+ le_scan_interval=LL_scanner_scanInterval_MAX,
+ le_scan_window=LL_scanner_scanWindow_MAX,
+ own_address_type=hci.OwnAddressType.RESOLVABLE_OR_PUBLIC_ADDRESS,
+ scanning_filter_policy=hci.LeScanningFilterPolicy.ACCEPT_ALL))
+
+ await self.expect_evt(hci.LeSetScanParametersComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_cmd(
+ hci.LeSetScanEnable(le_scan_enable=hci.Enable.ENABLED, filter_duplicates=hci.Enable.DISABLED))
+
+ await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ # 3. Configure the Lower Tester to start advertising. The Lower Tester uses
+ # a resolvable private address in the AdvA field.
+ # 4. The Lower Tester sends an ADV_NONCONN_IND packet each advertising event
+ # using the selected advertising channel only. Repeat for at least 20
+ # advertising intervals.
+ controller.send_ll(ll.LeLegacyAdvertisingPdu(source_address=peer_resolvable_address,
+ advertising_address_type=ll.AddressType.RANDOM,
+ advertising_type=ll.LegacyAdvertisingType.ADV_NONCONN_IND,
+ advertising_data=[1, 2, 3]),
+ rssi=0xf0)
+
+ # 5. The Upper Tester receives at least one HCI_LE_Advertising_Report
+ # reporting the advertising packets sent by the Lower Tester. The address in
+ # the report is resolved by the IUT using the distributed IRK.
+ await self.expect_evt(
+ hci.LeAdvertisingReportRaw(responses=[
+ hci.LeAdvertisingResponseRaw(event_type=hci.AdvertisingEventType.ADV_NONCONN_IND,
+ address_type=hci.AddressType.PUBLIC_IDENTITY_ADDRESS,
+ address=peer_identity_address,
+ advertising_data=[1, 2, 3],
+ rssi=0xf0)
+ ]))
+
+ # 6. The Upper Tester sends an HCI_LE_Set_Scan_Enable to the IUT to stop the
+ # scanning function and receives an HCI_Command_Complete event in response.
+ controller.send_cmd(hci.LeSetScanEnable(le_scan_enable=hci.Enable.DISABLED))
+
+ await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ # 7. The Upper Tester disables address resolution.
+ controller.send_cmd(hci.LeSetAddressResolutionEnable(address_resolution_enable=hci.Enable.DISABLED))
+
+ await self.expect_evt(
+ hci.LeSetAddressResolutionEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ # 8. The Upper Tester enables passive scanning in the IUT.
+ controller.send_cmd(
+ hci.LeSetScanEnable(le_scan_enable=hci.Enable.ENABLED, filter_duplicates=hci.Enable.DISABLED))
+
+ await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ # 9. The Lower Tester sends an ADV_NONCONN_IND packet each advertising event
+ # using the selected advertising channel only. Repeat for at least 20
+ # advertising intervals.
+ controller.send_ll(ll.LeLegacyAdvertisingPdu(source_address=peer_resolvable_address,
+ advertising_address_type=ll.AddressType.RANDOM,
+ advertising_type=ll.LegacyAdvertisingType.ADV_NONCONN_IND,
+ advertising_data=[1, 2, 3]),
+ rssi=0xf0)
+
+ # 10. The IUT does not resolve the Lower Tester’s address and reports it
+ # unresolved (as received in the advertising PDU) in the advertising report
+ # events to the Upper Tester.
+ await self.expect_evt(
+ hci.LeAdvertisingReportRaw(responses=[
+ hci.LeAdvertisingResponseRaw(event_type=hci.AdvertisingEventType.ADV_NONCONN_IND,
+ address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS,
+ address=peer_resolvable_address,
+ advertising_data=[1, 2, 3],
+ rssi=0xf0)
+ ]))
+
+ # 11. The Upper Tester sends an HCI_LE_Set_Scan_Enable to the IUT to stop the
+ # scanning function and receives an HCI_Command_Complete event in response.
+ controller.send_cmd(hci.LeSetScanEnable(le_scan_enable=hci.Enable.DISABLED))
+
+ await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
diff --git a/tools/rootcanal/test/LL/DDI/SCN/BV_14_C.py b/tools/rootcanal/test/LL/DDI/SCN/BV_14_C.py
new file mode 100644
index 0000000000..8d27956b96
--- /dev/null
+++ b/tools/rootcanal/test/LL/DDI/SCN/BV_14_C.py
@@ -0,0 +1,85 @@
+import lib_rootcanal_python3 as rootcanal
+import hci_packets as hci
+import link_layer_packets as ll
+import unittest
+from hci_packets import ErrorCode
+from py.bluetooth import Address
+from py.controller import ControllerTest
+
+
+class Test(ControllerTest):
+
+ # LL/DDI/SCN/BV-13-C [Network Privacy - Passive Scanning: Directed Events to an address
+ # different from the scanner’s address]
+ async def test(self):
+ # Test parameters.
+ RPA_timeout = 0x10
+ LL_scanner_scanInterval_MIN = 0x2000
+ LL_scanner_scanInterval_MAX = 0x2000
+ LL_scanner_scanWindow_MIN = 0x200
+ LL_scanner_scanWindow_MAX = 0x200
+ LL_scanner_Adv_Channel_Map = 0x7
+
+ controller = self.controller
+ peer_irk = bytes([1] * 16)
+ local_irk = bytes([2] * 16)
+ peer_resolvable_address = Address(rootcanal.generate_rpa(peer_irk))
+ local_resolvable_address_1 = Address(rootcanal.generate_rpa(local_irk))
+ local_resolvable_address_2 = Address(rootcanal.generate_rpa(local_irk))
+
+ # 1. The Upper Tester sets a resolvable private address for the IUT to use.
+ controller.send_cmd(hci.LeSetRandomAddress(random_address=local_resolvable_address_1))
+
+ await self.expect_evt(hci.LeSetRandomAddressComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_cmd(hci.LeSetResolvablePrivateAddressTimeout(rpa_timeout=RPA_timeout))
+
+ await self.expect_evt(
+ hci.LeSetResolvablePrivateAddressTimeoutComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ # 2. The Upper Tester enables passive scanning using filter policy 0x02 in the IUT.
+ controller.send_cmd(
+ hci.LeSetScanParameters(le_scan_type=hci.LeScanType.PASSIVE,
+ le_scan_interval=LL_scanner_scanInterval_MAX,
+ le_scan_window=LL_scanner_scanWindow_MAX,
+ own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
+ scanning_filter_policy=hci.LeScanningFilterPolicy.CHECK_INITIATORS_IDENTITY))
+
+ await self.expect_evt(hci.LeSetScanParametersComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ controller.send_cmd(
+ hci.LeSetScanEnable(le_scan_enable=hci.Enable.ENABLED, filter_duplicates=hci.Enable.DISABLED))
+
+ await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
+
+ # 3. Configure the Lower Tester to start advertising. The Lower Tester uses a resolvable private
+ # address type in the AdvA field. The InitA field also contains a resolvable private address, which
+ # does not match the address set by the Upper Tester in the IUT.
+
+ # 4. The Lower Tester sends an ADV_ DIRECT _IND packet each advertising event using the
+ # selected advertising channel only. Repeat for at least 20 advertising intervals.
+ controller.send_ll(ll.LeLegacyAdvertisingPdu(source_address=peer_resolvable_address,
+ destination_address=local_resolvable_address_2,
+ advertising_address_type=ll.AddressType.RANDOM,
+ target_address_type=ll.AddressType.RANDOM,
+ advertising_type=ll.LegacyAdvertisingType.ADV_DIRECT_IND,
+ advertising_data=[1, 2, 3]),
+ rssi=0xf0)
+
+ # 5. The Upper Tester receives at least one HCI_LE_Direct_Advertising_Report reporting the
+ # advertising packets sent by the Lower Tester.
+ await self.expect_evt(
+ hci.LeDirectedAdvertisingReport(responses=[
+ hci.LeDirectedAdvertisingResponse(event_type=hci.AdvertisingEventType.ADV_DIRECT_IND,
+ address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS,
+ address=peer_resolvable_address,
+ direct_address_type=hci.DirectAddressType.RANDOM_DEVICE_ADDRESS,
+ direct_address=local_resolvable_address_2,
+ rssi=0xf0)
+ ]))
+
+ # 6. The Upper Tester sends an HCI_LE_Set_Scan_Enable to the IUT to stop the scanning function
+ # and receives an HCI_Command_Complete event in response.
+ controller.send_cmd(hci.LeSetScanEnable(le_scan_enable=hci.Enable.DISABLED))
+
+ await self.expect_evt(hci.LeSetScanEnableComplete(status=ErrorCode.SUCCESS, num_hci_command_packets=1))
diff --git a/tools/rootcanal/test/main.py b/tools/rootcanal/test/main.py
new file mode 100644
index 0000000000..728307e439
--- /dev/null
+++ b/tools/rootcanal/test/main.py
@@ -0,0 +1,29 @@
+from importlib import resources
+from pathlib import Path
+import importlib
+import tempfile
+
+# Python is not able to load the module lib_rootcanal_python3.so
+# when the test target is configured with embedded_launcher: true.
+# This code loads the file to a temporary directory and adds the
+# path to the sys lookup.
+with tempfile.TemporaryDirectory() as cache:
+ with (Path('lib_rootcanal_python3.so').open('rb') as fin,
+ Path(cache, 'lib_rootcanal_python3.so').open('wb') as fout):
+ fout.write(fin.read())
+ sys.path.append(cache)
+ import lib_rootcanal_python3
+
+import unittest
+
+tests = [
+ 'LL.DDI.SCN.BV_13_C',
+ 'LL.DDI.SCN.BV_14_C'
+]
+
+if __name__ == "__main__":
+ suite = unittest.TestSuite()
+ for test in tests:
+ module = importlib.import_module(f'test.{test}')
+ suite.addTest(unittest.defaultTestLoader.loadTestsFromModule(module))
+ unittest.TextTestRunner(verbosity=3).run(suite)