diff options
author | 2024-07-30 15:40:52 +0000 | |
---|---|---|
committer | 2024-07-30 15:40:52 +0000 | |
commit | c80c3632d61f6dea3f225bb6a0008418e13d6956 (patch) | |
tree | c633c65aa90387ab5ed594f1fb905d64df63dc93 /pandora | |
parent | 26b196f6ada546ae6ca194988b94a4114955426a (diff) | |
parent | 7f8500791c76cb80d28c987499c0c08137c49245 (diff) |
Merge "Bumble infra changes" into main
Diffstat (limited to 'pandora')
-rw-r--r-- | pandora/interfaces/pandora_experimental/hid.proto | 2 | ||||
-rw-r--r-- | pandora/server/bumble_experimental/hid.py | 369 |
2 files changed, 361 insertions, 10 deletions
diff --git a/pandora/interfaces/pandora_experimental/hid.proto b/pandora/interfaces/pandora_experimental/hid.proto index 1eb22768c6..7b1e7d2c8a 100644 --- a/pandora/interfaces/pandora_experimental/hid.proto +++ b/pandora/interfaces/pandora_experimental/hid.proto @@ -11,6 +11,8 @@ service HID { rpc ConnectHost(google.protobuf.Empty) returns (google.protobuf.Empty); // Disconnect HID Host rpc DisconnectHost(google.protobuf.Empty) returns (google.protobuf.Empty); + // Virtual Cable Unplug HID Host + rpc VirtualCableUnplugHost(google.protobuf.Empty) returns (google.protobuf.Empty); // Send a SET_REPORT command, acting as a HID host, to a connected HID device rpc SendHostReport(SendHostReportRequest) returns (SendHostReportResponse); } diff --git a/pandora/server/bumble_experimental/hid.py b/pandora/server/bumble_experimental/hid.py index 60b4ce48b3..512ff2204f 100644 --- a/pandora/server/bumble_experimental/hid.py +++ b/pandora/server/bumble_experimental/hid.py @@ -3,20 +3,28 @@ import asyncio import grpc import grpc.aio import logging +import struct from bumble.device import Device from google.protobuf import empty_pb2 # pytype: disable=pyi-error from pandora_experimental.hid_grpc_aio import HIDServicer +from bumble.pandora import utils from bumble.core import ( BT_BR_EDR_TRANSPORT, BT_L2CAP_PROTOCOL_ID, BT_HUMAN_INTERFACE_DEVICE_SERVICE, BT_HIDP_PROTOCOL_ID, UUID, + ProtocolError, ) +from bumble.hci import ( + HCI_StatusError, + HCI_CONNECTION_ALREADY_EXISTS_ERROR, + HCI_PAGE_TIMEOUT_ERROR, +) from bumble.hid import ( Device as HID_Device, HID_CONTROL_PSM, @@ -205,13 +213,137 @@ HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor 0x02, # . Report Count (2) 0x81, 0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position) - 0xC0, # . End Collection - 0xC0, # End Collection + 0xC0, # . End Collection (Physical) + 0xC0, # End Collection (Application) ]) # Default protocol mode set to report protocol protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL +from bumble.core import AdvertisingData +from bumble.device import Device, Connection, Peer +from bumble.gatt import ( + Descriptor, + Service, + Characteristic, + CharacteristicValue, + GATT_DEVICE_INFORMATION_SERVICE, + GATT_HUMAN_INTERFACE_DEVICE_SERVICE, + GATT_BATTERY_SERVICE, + GATT_BATTERY_LEVEL_CHARACTERISTIC, + GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, + GATT_REPORT_CHARACTERISTIC, + GATT_REPORT_MAP_CHARACTERISTIC, + GATT_PROTOCOL_MODE_CHARACTERISTIC, + GATT_HID_INFORMATION_CHARACTERISTIC, + GATT_HID_CONTROL_POINT_CHARACTERISTIC, + GATT_REPORT_REFERENCE_DESCRIPTOR, +) + +# ----------------------------------------------------------------------------- + +# Protocol Modes (HID Specification V1.1.1 Section 2.1.2) +HID_BOOT_PROTOCOL = 0x00 +HID_REPORT_PROTOCOL = 0x01 + +# Report Types (HID Specification V1.1.1 Section 2.1.1) +HID_INPUT_REPORT = 0x01 +HID_OUTPUT_REPORT = 0x02 +HID_FEATURE_REPORT = 0x03 + +# Report Map +HID_KEYBOARD_REPORT_MAP = bytes( + # pylint: disable=line-too-long + [ + 0x05, + 0x01, # Usage Page (Generic Desktop Controls) + 0x09, + 0x06, # Usage (Keyboard) + 0xA1, + 0x01, # Collection (Application) + 0x85, + 0x01, # . Report ID (1) + 0x05, + 0x07, # . Usage Page (Keyboard/Keypad) + 0x19, + 0xE0, # . Usage Minimum (0xE0) + 0x29, + 0xE7, # . Usage Maximum (0xE7) + 0x15, + 0x00, # . Logical Minimum (0) + 0x25, + 0x01, # . Logical Maximum (1) + 0x75, + 0x01, # . Report Size (1) + 0x95, + 0x08, # . Report Count (8) + 0x81, + 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x95, + 0x01, # . Report Count (1) + 0x75, + 0x08, # . Report Size (8) + 0x81, + 0x01, # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x95, + 0x06, # . Report Count (6) + 0x75, + 0x08, # . Report Size (8) + 0x15, + 0x00, # . Logical Minimum (0x00) + 0x25, + 0x94, # . Logical Maximum (0x94) + 0x05, + 0x07, # . Usage Page (Keyboard/Keypad) + 0x19, + 0x00, # . Usage Minimum (0x00) + 0x29, + 0x94, # . Usage Maximum (0x94) + 0x81, + 0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position) + 0x95, + 0x05, # . Report Count (5) + 0x75, + 0x01, # . Report Size (1) + 0x05, + 0x08, # . Usage Page (LEDs) + 0x19, + 0x01, # . Usage Minimum (Num Lock) + 0x29, + 0x05, # . Usage Maximum (Kana) + 0x91, + 0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) + 0x95, + 0x01, # . Report Count (1) + 0x75, + 0x03, # . Report Size (3) + 0x91, + 0x01, # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile) + 0xC0, # End Collection + ]) + + +# ----------------------------------------------------------------------------- +# pylint: disable=invalid-overridden-method +class ServerListener(Device.Listener, Connection.Listener): + + def __init__(self, device): + self.device = device + + @AsyncRunner.run_in_task() + async def on_connection(self, connection): + logging.info(f'=== Connected to {connection}') + connection.listener = self + + @AsyncRunner.run_in_task() + async def on_disconnection(self, reason): + logging.info(f'### Disconnected, reason={reason}') + + +# ----------------------------------------------------------------------------- +def on_hid_control_point_write(_connection, value): + logging.info(f'Control Point Write: {value}') + # ----------------------------------------------------------------------------- def sdp_records(): @@ -355,6 +487,186 @@ def sdp_records(): } +# ----------------------------------------------------------------------------- +def hogp_device(device): + global input_report_characteristic + # Create an 'input report' characteristic to send keyboard reports to the host + input_report_characteristic = Characteristic( + GATT_REPORT_CHARACTERISTIC, + Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.Properties.NOTIFY, + Characteristic.READABLE | Characteristic.WRITEABLE, + bytes([0, 0, 0, 0, 0, 0, 0, 0]), + [Descriptor( + GATT_REPORT_REFERENCE_DESCRIPTOR, + Descriptor.READABLE, + bytes([0x01, HID_INPUT_REPORT]), + )], + ) + + # Create an 'output report' characteristic to receive keyboard reports from the host + output_report_characteristic = Characteristic( + GATT_REPORT_CHARACTERISTIC, + Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE, + Characteristic.READABLE | Characteristic.WRITEABLE, + bytes([0]), + [Descriptor( + GATT_REPORT_REFERENCE_DESCRIPTOR, + Descriptor.READABLE, + bytes([0x01, HID_OUTPUT_REPORT]), + )], + ) + + # Add the services to the GATT sever + device.add_services([ + Service( + GATT_DEVICE_INFORMATION_SERVICE, + [ + Characteristic( + GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, + Characteristic.Properties.READ, + Characteristic.READABLE, + 'Bumble', + ) + ], + ), + Service( + GATT_HUMAN_INTERFACE_DEVICE_SERVICE, + [ + Characteristic( + GATT_PROTOCOL_MODE_CHARACTERISTIC, + Characteristic.Properties.READ, + Characteristic.READABLE, + bytes([HID_REPORT_PROTOCOL]), + ), + Characteristic( + GATT_HID_INFORMATION_CHARACTERISTIC, + Characteristic.Properties.READ, + Characteristic.READABLE, + # bcdHID=1.1, bCountryCode=0x00, + # Flags=RemoteWake|NormallyConnectable + bytes([0x11, 0x01, 0x00, 0x03]), + ), + Characteristic( + GATT_HID_CONTROL_POINT_CHARACTERISTIC, + Characteristic.WRITE_WITHOUT_RESPONSE, + Characteristic.WRITEABLE, + CharacteristicValue(write=on_hid_control_point_write), + ), + Characteristic( + GATT_REPORT_MAP_CHARACTERISTIC, + Characteristic.Properties.READ, + Characteristic.READABLE, + HID_KEYBOARD_REPORT_MAP, + ), + input_report_characteristic, + output_report_characteristic, + ], + ), + Service( + GATT_BATTERY_SERVICE, + [ + Characteristic( + GATT_BATTERY_LEVEL_CHARACTERISTIC, + Characteristic.Properties.READ, + Characteristic.READABLE, + bytes([100]), + ) + ], + ), + ]) + + # Debug print + for attribute in device.gatt_server.attributes: + logging.info(attribute) + + # Set the advertising data + device.advertising_data = bytes( + AdvertisingData([ + ( + AdvertisingData.COMPLETE_LOCAL_NAME, + bytes('Bumble Keyboard', 'utf-8'), + ), + ( + AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, + bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE), + ), + (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)), + (AdvertisingData.FLAGS, bytes([0x05])), + ])) + + # Attach a listener + device.listener = ServerListener(device) + + +async def handle_virtual_cable_unplug(): + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + await hid_device.disconnect_interrupt_channel() + await hid_device.disconnect_control_channel() + await hid_device.device.keystore.delete(hid_host_bd_addr) # type: ignore + connection = hid_device.connection + if connection is not None: + await connection.disconnect() + + +def on_get_report_cb(report_id: int, report_type: int, buffer_size: int): + retValue = hid_device.GetSetStatus() + logging.info("GET_REPORT report_id: " + str(report_id) + "report_type: " + str(report_type) + "buffer_size:" + + str(buffer_size)) + if report_type == Message.ReportType.INPUT_REPORT: + if report_id == 1: + retValue.data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + retValue.status = hid_device.GetSetReturn.SUCCESS + elif report_id == 2: + retValue.data = bytearray([0x02, 0x00, 0x00, 0x00]) + retValue.status = hid_device.GetSetReturn.SUCCESS + else: + retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND + + return retValue + + +def on_set_report_cb(report_id: int, report_type: int, report_size: int, data: bytes): + retValue = hid_device.GetSetStatus() + logging.info("SET_REPORT report_id: " + str(report_id) + "report_type: " + str(report_type) + "report_size " + + str(report_size) + "data:" + str(data)) + + if report_type == Message.ReportType.FEATURE_REPORT: + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_type == Message.ReportType.INPUT_REPORT: + if report_id == 1 and report_size != 9: + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_id == 2 and report_size != 4: + retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER + elif report_id == 3: + retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND + else: + retValue.status = hid_device.GetSetReturn.SUCCESS + else: + retValue.status = hid_device.GetSetReturn.SUCCESS + + return retValue + + +def on_get_protocol_cb(): + retValue = hid_device.GetSetStatus() + retValue.data = protocol_mode.to_bytes() + retValue.status = hid_device.GetSetReturn.SUCCESS + return retValue + + +def on_set_protocol_cb(protocol: int): + retValue = hid_device.GetSetStatus() + # We do not support SET_PROTOCOL. + logging.info(f"SET_PROTOCOL report_id: {protocol}") + retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST + return retValue + + +def on_virtual_cable_unplug_cb(): + logging.info('Received Virtual Cable Unplug') + asyncio.create_task(handle_virtual_cable_unplug()) + + # This class implements the Hid Pandora interface. class HIDService(HIDServicer): @@ -364,27 +676,43 @@ class HIDService(HIDServicer): super().__init__() self.device = device self.device.sdp_service_records.update(sdp_records()) + hogp_device(self.device) logging.info(f'Hid device register: ') global hid_device hid_device = HID_Device(self.device) + # Register for call backs + hid_device.register_get_report_cb(on_get_report_cb) + hid_device.register_set_report_cb(on_set_report_cb) + hid_device.register_get_protocol_cb(on_get_protocol_cb) + hid_device.register_set_protocol_cb(on_set_protocol_cb) + # Register for virtual cable unplug call back + hid_device.on('virtual_cable_unplug', on_virtual_cable_unplug_cb) + @utils.rpc async def ConnectHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: - logging.info(f'ConnectHidHost') - hid_host_bd_addr = str(hid_device.remote_device_bd_address) - connection = await self.device.connect(hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT) - await connection.authenticate() - await connection.encrypt() - if hid_device is not None: + logging.info(f'ConnectHost') + try: + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + connection = await self.device.connect(hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT) + await connection.authenticate() + await connection.encrypt() await hid_device.connect_control_channel() await hid_device.connect_interrupt_channel() + except AttributeError as e: + logging.error(f'Device does not exist') + raise e + except (HCI_StatusError, ProtocolError) as e: + logging.error(f"Connection failure error: {e}") + raise e return empty_pb2.Empty() + @utils.rpc async def DisconnectHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: - logging.info(f'DisconnectHidHost') - if hid_device is not None: + logging.info(f'DisconnectHost') + try: await hid_device.disconnect_interrupt_channel() await hid_device.disconnect_control_channel() connection = hid_device.connection @@ -392,4 +720,25 @@ class HIDService(HIDServicer): await connection.disconnect() else: logging.info(f'Already disconnected from Hid Host') + except AttributeError as e: + logging.error(f'Device does not exist') + raise e + + return empty_pb2.Empty() + + @utils.rpc + async def VirtualCableUnplugHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty: + + logging.info(f'VirtualCableUnplugHost') + try: + hid_device.virtual_cable_unplug() + try: + hid_host_bd_addr = str(hid_device.remote_device_bd_address) + await hid_device.device.keystore.delete(hid_host_bd_addr) + except KeyError: + logging.error(f'Device not found or Device already unpaired.') + raise + except AttributeError as e: + logging.exception(f'Device does not exist') + raise e return empty_pb2.Empty() |