summaryrefslogtreecommitdiff
path: root/pandora
diff options
context:
space:
mode:
author Gopi Sakshihally Bhuthaiah (xWF) <bhuthaiah@google.com> 2024-07-30 15:40:52 +0000
committer Gerrit Code Review <noreply-gerritcodereview@google.com> 2024-07-30 15:40:52 +0000
commitc80c3632d61f6dea3f225bb6a0008418e13d6956 (patch)
treec633c65aa90387ab5ed594f1fb905d64df63dc93 /pandora
parent26b196f6ada546ae6ca194988b94a4114955426a (diff)
parent7f8500791c76cb80d28c987499c0c08137c49245 (diff)
Merge "Bumble infra changes" into main
Diffstat (limited to 'pandora')
-rw-r--r--pandora/interfaces/pandora_experimental/hid.proto2
-rw-r--r--pandora/server/bumble_experimental/hid.py369
2 files changed, 361 insertions, 10 deletions
diff --git a/pandora/interfaces/pandora_experimental/hid.proto b/pandora/interfaces/pandora_experimental/hid.proto
index 1eb22768c6..7b1e7d2c8a 100644
--- a/pandora/interfaces/pandora_experimental/hid.proto
+++ b/pandora/interfaces/pandora_experimental/hid.proto
@@ -11,6 +11,8 @@ service HID {
rpc ConnectHost(google.protobuf.Empty) returns (google.protobuf.Empty);
// Disconnect HID Host
rpc DisconnectHost(google.protobuf.Empty) returns (google.protobuf.Empty);
+ // Virtual Cable Unplug HID Host
+ rpc VirtualCableUnplugHost(google.protobuf.Empty) returns (google.protobuf.Empty);
// Send a SET_REPORT command, acting as a HID host, to a connected HID device
rpc SendHostReport(SendHostReportRequest) returns (SendHostReportResponse);
}
diff --git a/pandora/server/bumble_experimental/hid.py b/pandora/server/bumble_experimental/hid.py
index 60b4ce48b3..512ff2204f 100644
--- a/pandora/server/bumble_experimental/hid.py
+++ b/pandora/server/bumble_experimental/hid.py
@@ -3,20 +3,28 @@ import asyncio
import grpc
import grpc.aio
import logging
+import struct
from bumble.device import Device
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
from pandora_experimental.hid_grpc_aio import HIDServicer
+from bumble.pandora import utils
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_L2CAP_PROTOCOL_ID,
BT_HUMAN_INTERFACE_DEVICE_SERVICE,
BT_HIDP_PROTOCOL_ID,
UUID,
+ ProtocolError,
)
+from bumble.hci import (
+ HCI_StatusError,
+ HCI_CONNECTION_ALREADY_EXISTS_ERROR,
+ HCI_PAGE_TIMEOUT_ERROR,
+)
from bumble.hid import (
Device as HID_Device,
HID_CONTROL_PSM,
@@ -205,13 +213,137 @@ HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor
0x02, # . Report Count (2)
0x81,
0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position)
- 0xC0, # . End Collection
- 0xC0, # End Collection
+ 0xC0, # . End Collection (Physical)
+ 0xC0, # End Collection (Application)
])
# Default protocol mode set to report protocol
protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL
+from bumble.core import AdvertisingData
+from bumble.device import Device, Connection, Peer
+from bumble.gatt import (
+ Descriptor,
+ Service,
+ Characteristic,
+ CharacteristicValue,
+ GATT_DEVICE_INFORMATION_SERVICE,
+ GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
+ GATT_BATTERY_SERVICE,
+ GATT_BATTERY_LEVEL_CHARACTERISTIC,
+ GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
+ GATT_REPORT_CHARACTERISTIC,
+ GATT_REPORT_MAP_CHARACTERISTIC,
+ GATT_PROTOCOL_MODE_CHARACTERISTIC,
+ GATT_HID_INFORMATION_CHARACTERISTIC,
+ GATT_HID_CONTROL_POINT_CHARACTERISTIC,
+ GATT_REPORT_REFERENCE_DESCRIPTOR,
+)
+
+# -----------------------------------------------------------------------------
+
+# Protocol Modes (HID Specification V1.1.1 Section 2.1.2)
+HID_BOOT_PROTOCOL = 0x00
+HID_REPORT_PROTOCOL = 0x01
+
+# Report Types (HID Specification V1.1.1 Section 2.1.1)
+HID_INPUT_REPORT = 0x01
+HID_OUTPUT_REPORT = 0x02
+HID_FEATURE_REPORT = 0x03
+
+# Report Map
+HID_KEYBOARD_REPORT_MAP = bytes(
+ # pylint: disable=line-too-long
+ [
+ 0x05,
+ 0x01, # Usage Page (Generic Desktop Controls)
+ 0x09,
+ 0x06, # Usage (Keyboard)
+ 0xA1,
+ 0x01, # Collection (Application)
+ 0x85,
+ 0x01, # . Report ID (1)
+ 0x05,
+ 0x07, # . Usage Page (Keyboard/Keypad)
+ 0x19,
+ 0xE0, # . Usage Minimum (0xE0)
+ 0x29,
+ 0xE7, # . Usage Maximum (0xE7)
+ 0x15,
+ 0x00, # . Logical Minimum (0)
+ 0x25,
+ 0x01, # . Logical Maximum (1)
+ 0x75,
+ 0x01, # . Report Size (1)
+ 0x95,
+ 0x08, # . Report Count (8)
+ 0x81,
+ 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
+ 0x95,
+ 0x01, # . Report Count (1)
+ 0x75,
+ 0x08, # . Report Size (8)
+ 0x81,
+ 0x01, # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
+ 0x95,
+ 0x06, # . Report Count (6)
+ 0x75,
+ 0x08, # . Report Size (8)
+ 0x15,
+ 0x00, # . Logical Minimum (0x00)
+ 0x25,
+ 0x94, # . Logical Maximum (0x94)
+ 0x05,
+ 0x07, # . Usage Page (Keyboard/Keypad)
+ 0x19,
+ 0x00, # . Usage Minimum (0x00)
+ 0x29,
+ 0x94, # . Usage Maximum (0x94)
+ 0x81,
+ 0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
+ 0x95,
+ 0x05, # . Report Count (5)
+ 0x75,
+ 0x01, # . Report Size (1)
+ 0x05,
+ 0x08, # . Usage Page (LEDs)
+ 0x19,
+ 0x01, # . Usage Minimum (Num Lock)
+ 0x29,
+ 0x05, # . Usage Maximum (Kana)
+ 0x91,
+ 0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
+ 0x95,
+ 0x01, # . Report Count (1)
+ 0x75,
+ 0x03, # . Report Size (3)
+ 0x91,
+ 0x01, # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
+ 0xC0, # End Collection
+ ])
+
+
+# -----------------------------------------------------------------------------
+# pylint: disable=invalid-overridden-method
+class ServerListener(Device.Listener, Connection.Listener):
+
+ def __init__(self, device):
+ self.device = device
+
+ @AsyncRunner.run_in_task()
+ async def on_connection(self, connection):
+ logging.info(f'=== Connected to {connection}')
+ connection.listener = self
+
+ @AsyncRunner.run_in_task()
+ async def on_disconnection(self, reason):
+ logging.info(f'### Disconnected, reason={reason}')
+
+
+# -----------------------------------------------------------------------------
+def on_hid_control_point_write(_connection, value):
+ logging.info(f'Control Point Write: {value}')
+
# -----------------------------------------------------------------------------
def sdp_records():
@@ -355,6 +487,186 @@ def sdp_records():
}
+# -----------------------------------------------------------------------------
+def hogp_device(device):
+ global input_report_characteristic
+ # Create an 'input report' characteristic to send keyboard reports to the host
+ input_report_characteristic = Characteristic(
+ GATT_REPORT_CHARACTERISTIC,
+ Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.Properties.NOTIFY,
+ Characteristic.READABLE | Characteristic.WRITEABLE,
+ bytes([0, 0, 0, 0, 0, 0, 0, 0]),
+ [Descriptor(
+ GATT_REPORT_REFERENCE_DESCRIPTOR,
+ Descriptor.READABLE,
+ bytes([0x01, HID_INPUT_REPORT]),
+ )],
+ )
+
+ # Create an 'output report' characteristic to receive keyboard reports from the host
+ output_report_characteristic = Characteristic(
+ GATT_REPORT_CHARACTERISTIC,
+ Characteristic.Properties.READ | Characteristic.Properties.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
+ Characteristic.READABLE | Characteristic.WRITEABLE,
+ bytes([0]),
+ [Descriptor(
+ GATT_REPORT_REFERENCE_DESCRIPTOR,
+ Descriptor.READABLE,
+ bytes([0x01, HID_OUTPUT_REPORT]),
+ )],
+ )
+
+ # Add the services to the GATT sever
+ device.add_services([
+ Service(
+ GATT_DEVICE_INFORMATION_SERVICE,
+ [
+ Characteristic(
+ GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
+ Characteristic.Properties.READ,
+ Characteristic.READABLE,
+ 'Bumble',
+ )
+ ],
+ ),
+ Service(
+ GATT_HUMAN_INTERFACE_DEVICE_SERVICE,
+ [
+ Characteristic(
+ GATT_PROTOCOL_MODE_CHARACTERISTIC,
+ Characteristic.Properties.READ,
+ Characteristic.READABLE,
+ bytes([HID_REPORT_PROTOCOL]),
+ ),
+ Characteristic(
+ GATT_HID_INFORMATION_CHARACTERISTIC,
+ Characteristic.Properties.READ,
+ Characteristic.READABLE,
+ # bcdHID=1.1, bCountryCode=0x00,
+ # Flags=RemoteWake|NormallyConnectable
+ bytes([0x11, 0x01, 0x00, 0x03]),
+ ),
+ Characteristic(
+ GATT_HID_CONTROL_POINT_CHARACTERISTIC,
+ Characteristic.WRITE_WITHOUT_RESPONSE,
+ Characteristic.WRITEABLE,
+ CharacteristicValue(write=on_hid_control_point_write),
+ ),
+ Characteristic(
+ GATT_REPORT_MAP_CHARACTERISTIC,
+ Characteristic.Properties.READ,
+ Characteristic.READABLE,
+ HID_KEYBOARD_REPORT_MAP,
+ ),
+ input_report_characteristic,
+ output_report_characteristic,
+ ],
+ ),
+ Service(
+ GATT_BATTERY_SERVICE,
+ [
+ Characteristic(
+ GATT_BATTERY_LEVEL_CHARACTERISTIC,
+ Characteristic.Properties.READ,
+ Characteristic.READABLE,
+ bytes([100]),
+ )
+ ],
+ ),
+ ])
+
+ # Debug print
+ for attribute in device.gatt_server.attributes:
+ logging.info(attribute)
+
+ # Set the advertising data
+ device.advertising_data = bytes(
+ AdvertisingData([
+ (
+ AdvertisingData.COMPLETE_LOCAL_NAME,
+ bytes('Bumble Keyboard', 'utf-8'),
+ ),
+ (
+ AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
+ bytes(GATT_HUMAN_INTERFACE_DEVICE_SERVICE),
+ ),
+ (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
+ (AdvertisingData.FLAGS, bytes([0x05])),
+ ]))
+
+ # Attach a listener
+ device.listener = ServerListener(device)
+
+
+async def handle_virtual_cable_unplug():
+ hid_host_bd_addr = str(hid_device.remote_device_bd_address)
+ await hid_device.disconnect_interrupt_channel()
+ await hid_device.disconnect_control_channel()
+ await hid_device.device.keystore.delete(hid_host_bd_addr) # type: ignore
+ connection = hid_device.connection
+ if connection is not None:
+ await connection.disconnect()
+
+
+def on_get_report_cb(report_id: int, report_type: int, buffer_size: int):
+ retValue = hid_device.GetSetStatus()
+ logging.info("GET_REPORT report_id: " + str(report_id) + "report_type: " + str(report_type) + "buffer_size:" +
+ str(buffer_size))
+ if report_type == Message.ReportType.INPUT_REPORT:
+ if report_id == 1:
+ retValue.data = bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
+ retValue.status = hid_device.GetSetReturn.SUCCESS
+ elif report_id == 2:
+ retValue.data = bytearray([0x02, 0x00, 0x00, 0x00])
+ retValue.status = hid_device.GetSetReturn.SUCCESS
+ else:
+ retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
+
+ return retValue
+
+
+def on_set_report_cb(report_id: int, report_type: int, report_size: int, data: bytes):
+ retValue = hid_device.GetSetStatus()
+ logging.info("SET_REPORT report_id: " + str(report_id) + "report_type: " + str(report_type) + "report_size " +
+ str(report_size) + "data:" + str(data))
+
+ if report_type == Message.ReportType.FEATURE_REPORT:
+ retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
+ elif report_type == Message.ReportType.INPUT_REPORT:
+ if report_id == 1 and report_size != 9:
+ retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
+ elif report_id == 2 and report_size != 4:
+ retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
+ elif report_id == 3:
+ retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
+ else:
+ retValue.status = hid_device.GetSetReturn.SUCCESS
+ else:
+ retValue.status = hid_device.GetSetReturn.SUCCESS
+
+ return retValue
+
+
+def on_get_protocol_cb():
+ retValue = hid_device.GetSetStatus()
+ retValue.data = protocol_mode.to_bytes()
+ retValue.status = hid_device.GetSetReturn.SUCCESS
+ return retValue
+
+
+def on_set_protocol_cb(protocol: int):
+ retValue = hid_device.GetSetStatus()
+ # We do not support SET_PROTOCOL.
+ logging.info(f"SET_PROTOCOL report_id: {protocol}")
+ retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST
+ return retValue
+
+
+def on_virtual_cable_unplug_cb():
+ logging.info('Received Virtual Cable Unplug')
+ asyncio.create_task(handle_virtual_cable_unplug())
+
+
# This class implements the Hid Pandora interface.
class HIDService(HIDServicer):
@@ -364,27 +676,43 @@ class HIDService(HIDServicer):
super().__init__()
self.device = device
self.device.sdp_service_records.update(sdp_records())
+ hogp_device(self.device)
logging.info(f'Hid device register: ')
global hid_device
hid_device = HID_Device(self.device)
+ # Register for call backs
+ hid_device.register_get_report_cb(on_get_report_cb)
+ hid_device.register_set_report_cb(on_set_report_cb)
+ hid_device.register_get_protocol_cb(on_get_protocol_cb)
+ hid_device.register_set_protocol_cb(on_set_protocol_cb)
+ # Register for virtual cable unplug call back
+ hid_device.on('virtual_cable_unplug', on_virtual_cable_unplug_cb)
+ @utils.rpc
async def ConnectHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty:
- logging.info(f'ConnectHidHost')
- hid_host_bd_addr = str(hid_device.remote_device_bd_address)
- connection = await self.device.connect(hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT)
- await connection.authenticate()
- await connection.encrypt()
- if hid_device is not None:
+ logging.info(f'ConnectHost')
+ try:
+ hid_host_bd_addr = str(hid_device.remote_device_bd_address)
+ connection = await self.device.connect(hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT)
+ await connection.authenticate()
+ await connection.encrypt()
await hid_device.connect_control_channel()
await hid_device.connect_interrupt_channel()
+ except AttributeError as e:
+ logging.error(f'Device does not exist')
+ raise e
+ except (HCI_StatusError, ProtocolError) as e:
+ logging.error(f"Connection failure error: {e}")
+ raise e
return empty_pb2.Empty()
+ @utils.rpc
async def DisconnectHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty:
- logging.info(f'DisconnectHidHost')
- if hid_device is not None:
+ logging.info(f'DisconnectHost')
+ try:
await hid_device.disconnect_interrupt_channel()
await hid_device.disconnect_control_channel()
connection = hid_device.connection
@@ -392,4 +720,25 @@ class HIDService(HIDServicer):
await connection.disconnect()
else:
logging.info(f'Already disconnected from Hid Host')
+ except AttributeError as e:
+ logging.error(f'Device does not exist')
+ raise e
+
+ return empty_pb2.Empty()
+
+ @utils.rpc
+ async def VirtualCableUnplugHost(self, request: empty_pb2.Empty, context: grpc.ServicerContext) -> empty_pb2.Empty:
+
+ logging.info(f'VirtualCableUnplugHost')
+ try:
+ hid_device.virtual_cable_unplug()
+ try:
+ hid_host_bd_addr = str(hid_device.remote_device_bd_address)
+ await hid_device.device.keystore.delete(hid_host_bd_addr)
+ except KeyError:
+ logging.error(f'Device not found or Device already unpaired.')
+ raise
+ except AttributeError as e:
+ logging.exception(f'Device does not exist')
+ raise e
return empty_pb2.Empty()