diff options
| author | 2023-06-12 18:58:22 +0000 | |
|---|---|---|
| committer | 2023-06-12 19:02:28 +0000 | |
| commit | a0084b6c00139fcb4823f044d6e7a19f3749910a (patch) | |
| tree | d4c1f33ee22c71b5c0e41877ec0a6ac0f20c6248 | |
| parent | 901a2fa08b0c869b4b2c067232a41c04c49d66cb (diff) | |
RootCanal: Write helper script for dumping the controller configuration
Helpful for reading and reproducing the configuration
of a Bluetooth dongle of chipset.
Test: None
Bug: 253525084
Change-Id: I971c8dcb6d8e4b68f047219a7603707d674070af
| -rwxr-xr-x | tools/rootcanal/scripts/controller_info.py | 203 | 
1 files changed, 203 insertions, 0 deletions
| diff --git a/tools/rootcanal/scripts/controller_info.py b/tools/rootcanal/scripts/controller_info.py new file mode 100755 index 0000000000..10a74a0daf --- /dev/null +++ b/tools/rootcanal/scripts/controller_info.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 + +# Copyright 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Dump the configuration of a Bluetooth controller. + +The script expects to find the generated module hci_packets +in PYTHONPATH. + +The controller is expected to be available through HCI over TCP +at the port passed as parameter.""" + +import argparse +import asyncio +import collections +import sys + +import hci_packets as hci + +H4_IDC_CMD = 0x01 +H4_IDC_ACL = 0x02 +H4_IDC_SCO = 0x03 +H4_IDC_EVT = 0x04 +H4_IDC_ISO = 0x05 + +HCI_HEADER_SIZES = dict([(H4_IDC_CMD, 3), (H4_IDC_ACL, 4), (H4_IDC_SCO, 3), (H4_IDC_EVT, 2), (H4_IDC_ISO, 4)]) + + +class Host: + +    def __init__(self): +        self.evt_queue = collections.deque() +        self.evt_queue_event = asyncio.Event() + +    async def connect(self, ip: str, port: int): +        reader, writer = await asyncio.open_connection(ip, port) +        self.reader = asyncio.create_task(self._read(reader)) +        self.writer = writer + +    async def _read(self, reader): +        try: +            while True: +                idc = await reader.readexactly(1) + +                assert idc[0] in HCI_HEADER_SIZES +                header = await reader.readexactly(HCI_HEADER_SIZES[idc[0]]) + +                if idc[0] == H4_IDC_EVT: +                    evt = hci.Event.parse_all(header + (await reader.readexactly(header[1]))) +                    #print(f"<< {evt.__class__.__name__}") +                    self.evt_queue.append(evt) +                    self.evt_queue_event.set() +                else: +                    assert False +        except Exception as exn: +            print(f"Reader interrupted: {exn}") +            return + +    async def send_cmd(self, cmd: hci.Command): +        #print(f">> {cmd.__class__.__name__}") +        packet = bytes([H4_IDC_CMD]) + cmd.serialize() +        self.writer.write(packet) + +    async def recv_evt(self) -> hci.Event: +        while not self.evt_queue: +            await self.evt_queue_event.wait() +            self.evt_queue_event.clear() +        return self.evt_queue.popleft() + +    async def expect_evt(self, expected_evt: type) -> hci.Event: +        evt = await self.recv_evt() +        assert isinstance(evt, expected_evt) +        if not isinstance(evt, expected_evt): +            print("Received unexpected event:") +            evt.show() +            print(f"Expected event of type {expected_evt.__name__}") +            print(f"{list(evt.payload)}") +        return evt + + +async def br_edr_properties(host: Host): +    await host.send_cmd(hci.ReadLocalSupportedFeatures()) +    page0 = await host.expect_evt(hci.ReadLocalSupportedFeaturesComplete) +    await host.send_cmd(hci.ReadLocalExtendedFeatures(page_number=1)) +    page1 = await host.expect_evt(hci.ReadLocalExtendedFeaturesComplete) +    await host.send_cmd(hci.ReadLocalExtendedFeatures(page_number=2)) +    page2 = await host.expect_evt(hci.ReadLocalExtendedFeaturesComplete) + +    print( +        f"lmp_features: {{ 0x{page0.lmp_features:x}, 0x{page1.extended_lmp_features:x}, 0x{page2.extended_lmp_features:x} }}" +    ) + +    await host.send_cmd(hci.ReadBufferSize()) +    evt = await host.expect_evt(hci.ReadBufferSizeComplete) + +    print(f"acl_data_packet_length: {evt.acl_data_packet_length}") +    print(f"total_num_acl_data_packets: {evt.total_num_acl_data_packets}") +    print(f"sco_data_packet_length: {evt.synchronous_data_packet_length}") +    print(f"total_num_sco_data_packets: {evt.total_num_synchronous_data_packets}") + +    await host.send_cmd(hci.ReadNumberOfSupportedIac()) +    evt = await host.expect_evt(hci.ReadNumberOfSupportedIacComplete) + +    print(f"num_supported_iac: {evt.num_support_iac}") + + +async def le_properties(host: Host): +    await host.send_cmd(hci.LeReadLocalSupportedFeatures()) +    evt = await host.expect_evt(hci.LeReadLocalSupportedFeaturesComplete) + +    print(f"le_features: 0x{evt.le_features:x}") + +    await host.send_cmd(hci.LeReadBufferSizeV2()) +    evt = await host.expect_evt(hci.LeReadBufferSizeV2Complete) + +    print(f"le_acl_data_packet_length: {evt.le_buffer_size.le_data_packet_length}") +    print(f"total_num_le_acl_data_packets: {evt.le_buffer_size.total_num_le_packets}") +    print(f"iso_data_packet_length: {evt.iso_buffer_size.le_data_packet_length}") +    print(f"total_num_iso_data_packets: {evt.iso_buffer_size.total_num_le_packets}") + +    await host.send_cmd(hci.LeReadFilterAcceptListSize()) +    evt = await host.expect_evt(hci.LeReadFilterAcceptListSizeComplete) + +    print(f"le_filter_accept_list_size: {evt.filter_accept_list_size}") + +    await host.send_cmd(hci.LeReadResolvingListSize()) +    evt = await host.expect_evt(hci.LeReadResolvingListSizeComplete) + +    print(f"le_resolving_list_size: {evt.resolving_list_size}") + +    await host.send_cmd(hci.LeReadSupportedStates()) +    evt = await host.expect_evt(hci.LeReadSupportedStatesComplete) + +    print(f"le_supported_states: 0x{evt.le_states:x}") + +    await host.send_cmd(hci.LeReadMaximumAdvertisingDataLength()) +    evt = await host.expect_evt(hci.LeReadMaximumAdvertisingDataLengthComplete) + +    print(f"le_max_advertising_data_length: {evt.maximum_advertising_data_length}") + +    await host.send_cmd(hci.LeReadNumberOfSupportedAdvertisingSets()) +    evt = await host.expect_evt(hci.LeReadNumberOfSupportedAdvertisingSetsComplete) + +    print(f"le_num_supported_advertising_sets: {evt.number_supported_advertising_sets}") + +    await host.send_cmd(hci.LeReadPeriodicAdvertiserListSize()) +    evt = await host.expect_evt(hci.LeReadPeriodicAdvertiserListSizeComplete) + +    print(f"le_periodic_advertiser_list_size: {evt.periodic_advertiser_list_size}") + + +async def run(tcp_port: int): +    host = Host() +    await host.connect('127.0.0.1', tcp_port) + +    await host.send_cmd(hci.Reset()) +    await host.expect_evt(hci.ResetComplete) + +    await host.send_cmd(hci.ReadLocalVersionInformation()) +    evt = await host.expect_evt(hci.ReadLocalVersionInformationComplete) + +    print(f"hci_version: {evt.local_version_information.hci_version}") +    print(f"hci_subversion: 0x{evt.local_version_information.hci_revision:x}") +    print(f"lmp_version: {evt.local_version_information.lmp_version}") +    print(f"lmp_subversion: 0x{evt.local_version_information.lmp_subversion:x}") +    print(f"company_identifier: 0x{evt.local_version_information.manufacturer_name:x}") + +    await host.send_cmd(hci.ReadLocalSupportedCommands()) +    evt = await host.expect_evt(hci.ReadLocalSupportedCommandsComplete) + +    print(f"supported_commands: {{ {', '.join([f'0x{b:x}' for b in evt.supported_commands])} }}") + +    try: +        await br_edr_properties(host) +    except Exception: +        pass + +    try: +        await le_properties(host) +    except Exception: +        pass + + +def main() -> int: +    """Generate cxx PDL backend.""" +    parser = argparse.ArgumentParser(description=__doc__) +    parser.add_argument('tcp_port', type=int, help='HCI port') +    return asyncio.run(run(**vars(parser.parse_args()))) + + +if __name__ == '__main__': +    sys.exit(main()) |