diff options
Diffstat (limited to 'floss')
-rw-r--r-- | floss/build/Dockerfile | 2 | ||||
-rw-r--r-- | floss/pandora/floss/telephony_client.py | 334 | ||||
-rw-r--r-- | floss/pandora/server/bluetooth.py | 51 | ||||
-rw-r--r-- | floss/pandora/server/hfp.py | 191 | ||||
-rw-r--r-- | floss/pandora/server/modem.py | 54 | ||||
-rw-r--r-- | floss/pandora/server/server.py | 10 |
6 files changed, 640 insertions, 2 deletions
diff --git a/floss/build/Dockerfile b/floss/build/Dockerfile index 2be3277773..54fd57ca46 100644 --- a/floss/build/Dockerfile +++ b/floss/build/Dockerfile @@ -61,7 +61,7 @@ RUN apt-get update && \ # Next install the Rust toolchain. Download the toolchain to the local folder # using curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs ADD rustup/rustup.sh /tmp -RUN /tmp/rustup.sh -y --default-toolchain 1.68.2 +RUN /tmp/rustup.sh -y --default-toolchain 1.77.1 # Add .cargo/bin to $PATH ENV PATH="/root/.cargo/bin:${PATH}" diff --git a/floss/pandora/floss/telephony_client.py b/floss/pandora/floss/telephony_client.py new file mode 100644 index 0000000000..afa9417fdf --- /dev/null +++ b/floss/pandora/floss/telephony_client.py @@ -0,0 +1,334 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Client class to access the Floss telephony interface.""" +import logging + +from floss.pandora.floss import observer_base +from floss.pandora.floss import utils + + +class BluetoothTelephonyCallbacks: + """Callbacks for the telephony interface. + + Implement this to observe these callbacks when exporting callbacks via register_callback. + """ + + def on_telephony_use(self, addr, state): + """Called when telephony is in use. + + Args: + addr: The address of the telephony device. + state: The boolean value indicating the telephony state. + """ + pass + + +class FlossTelephonyClient: + """Handles method calls and callbacks from the telephony interface.""" + + TELEPHONY_SERVICE = 'org.chromium.bluetooth' + TELEPHONY_INTERFACE = 'org.chromium.bluetooth.BluetoothTelephony' + TELEPHONY_OBJECT_PATTERN = '/org/chromium/bluetooth/hci{}/telephony' + TELEPHONY_CB_INTF = 'org.chromium.bluetooth.BluetoothTelephonyCallback' + TELEPHONY_CB_OBJ_NAME = 'test_telephony_client' + + class ExportedTelephonyCallbacks(observer_base.ObserverBase): + """ + <node> + <interface name="org.chromium.bluetooth.BluetoothTelephonyCallback"> + <method name="OnTelephonyUse"> + <arg type="s" name="add" direction="in" /> + <arg type="b" name="state" direction="in" /> + </method> + </interface> + </node> + """ + + def __init__(self): + """Constructs exported callbacks object.""" + observer_base.ObserverBase.__init__(self) + + def OnTelephonyUse(self, addr, state): + """Handles telephony use callback. + + Args: + addr: The address of the telephony device. + state: The boolean value indicating the telephony state. + """ + + for observer in self.observers.values(): + observer.on_telephony_use(addr, state) + + def __init__(self, bus, hci): + """Constructs the client. + + Args: + bus: D-Bus bus over which we'll establish connections. + hci: HCI adapter index. Get this value from `get_default_adapter` on FlossManagerClient. + """ + self.bus = bus + self.hci = hci + self.objpath = self.TELEPHONY_OBJECT_PATTERN.format(hci) + + # We don't register callbacks by default. + self.callbacks = None + + def __del__(self): + """Destructor.""" + del self.callbacks + + @utils.glib_callback() + def on_telephony_use(self, addr, state): + """Handles telephony use callback. + + Args: + addr: The address of the telephony device. + state: The boolean value indicating the telephony state. + """ + logging.debug('on_telephony_use: addr: %s, state: %s', addr, state) + + def _make_dbus_phone_number(self, number): + """Makes struct for phone number D-Bus. + + Args: + number : The phone number to use. + + Returns: + Dictionary of phone number. + """ + return utils.dbus_optional_value('s', number) + + @utils.glib_call(False) + def has_proxy(self): + """Checks whether telephony proxy can be acquired.""" + return bool(self.proxy()) + + def proxy(self): + """Gets proxy object to telephony interface for method calls.""" + return self.bus.get(self.TELEPHONY_SERVICE, self.objpath)[self.TELEPHONY_INTERFACE] + + @utils.glib_call(None) + def register_telephony_callback(self): + """Registers telephony callback for this client if one doesn't already exist. + + Returns: + True on success, False on failure, None on DBus error. + """ + if self.callbacks: + return True + + # Create and publish callbacks + self.callbacks = self.ExportedTelephonyCallbacks() + self.callbacks.add_observer('telephony_client', self) + objpath = utils.generate_dbus_cb_objpath(self.TELEPHONY_CB_OBJ_NAME, self.hci) + self.bus.register_object(objpath, self.callbacks, None) + + # Register published callbacks with manager daemon + return self.proxy().RegisterTelephonyCallback(objpath) + + @utils.glib_call(False) + def set_network_available(self, network_available): + """Sets network availability status. + + Args: + network_available: A boolean value indicating whether the device is connected to the cellular network. + + Returns: + True on success, False otherwise. + """ + self.proxy().SetNetworkAvailable(network_available) + return True + + @utils.glib_call(False) + def set_roaming(self, roaming): + """Sets roaming mode. + + Args: + roaming: A boolean value indicating whether the device is in roaming mode. + + Returns: + True on success, False otherwise. + """ + self.proxy().SetRoaming(roaming) + return True + + @utils.glib_call(None) + def set_signal_strength(self, signal_strength): + """Sets signal strength. + + Args: + signal_strength: The signal strength value to be set, ranging from 0 to 5. + + Returns: + True on success, False on failure, None on DBus error. + """ + return self.proxy().SetSignalStrength(signal_strength) + + @utils.glib_call(None) + def set_battery_level(self, battery_level): + """Sets battery level. + + Args: + battery_level: The battery level value to be set, ranging from 0 to 5. + + Returns: + True on success, False on failure, None on DBus error. + """ + return self.proxy().SetBatteryLevel(battery_level) + + @utils.glib_call(False) + def set_phone_ops_enabled(self, enable): + """Sets phone operations status. + + Args: + enable: A boolean value indicating whether phone operations are enabled. + + Returns: + True on success, False otherwise. + """ + self.proxy().SetPhoneOpsEnabled(enable) + return True + + @utils.glib_call(False) + def set_mps_qualification_enabled(self, enable): + """Sets MPS qualification status. + + Args: + enable: A boolean value indicating whether MPS qualification is enabled. + + Returns: + True on success, False otherwise. + """ + self.proxy().SetMpsQualificationEnabled(enable) + return True + + @utils.glib_call(None) + def incoming_call(self, number): + """Initiates an incoming call with the specified phone number. + + Args: + number: The phone number of the incoming call. + + Returns: + True on success, False on failure, None on DBus error. + """ + + return self.proxy().IncomingCall(number) + + @utils.glib_call(None) + def dialing_call(self, number): + """Initiates a dialing call with the specified phone number. + + Args: + number: The phone number to dial. + + Returns: + True on success, False on failure, None on DBus error. + """ + return self.proxy().DialingCall(number) + + @utils.glib_call(None) + def answer_call(self): + """Answers an incoming or dialing call. + + Returns: + True on success, False on failure, None on DBus error. + """ + return self.proxy().AnswerCall() + + @utils.glib_call(None) + def hangup_call(self): + """Hangs up an active, incoming, or dialing call. + + Returns: + True on success, False on failure, None on DBus error. + """ + return self.proxy().HangupCall() + + @utils.glib_call(None) + def set_last_call(self, number=None): + """Sets last call with the specified phone number. + + Args: + number: Optional phone number value to be set as the last call, Defaults to None if not provided. + Returns: + True on success, False on failure, None on DBus error. + """ + number = self._make_dbus_phone_number(number) + return self.proxy().SetLastCall(number) + + @utils.glib_call(None) + def set_memory_call(self, number=None): + """Sets memory call with the specified phone number. + + Args: + number: Optional phone number value to be set as the last call, Defaults to None if not provided. + + Returns: + True on success, False on failure, None on DBus error. + """ + number = self._make_dbus_phone_number(number) + return self.proxy().SetMemoryCall(number) + + @utils.glib_call(None) + def release_held(self): + """Releases all of the held calls. + + Returns: + True on success, False on failure, None on DBus error. + """ + return self.proxy().ReleaseHeld() + + @utils.glib_call(None) + def release_active_accept_held(self): + """Releases the active call and accepts a held call. + + Returns: + True on success, False on failure, None on DBus error. + """ + return self.proxy().ReleaseActiveAcceptHeld() + + @utils.glib_call(None) + def hold_active_accept_held(self): + """Holds the active call and accepts a held call. + + Returns: + True on success, False on failure, None on DBus error. + """ + return self.proxy().HoldActiveAcceptHeld() + + @utils.glib_call(None) + def audio_connect(self, address): + """Initiates an audio connection to the remote device. + + Args: + address: The address of the remote device for audio connection. + + Returns: + True on success, False on failure, None on DBus error. + """ + return self.proxy().AudioConnect(address) + + @utils.glib_call(False) + def audio_disconnect(self, address): + """Disconnects the audio connection to the remote device. + + Args: + address: The address of the remote device for audio disconnection. + + Returns: + True on success, False otherwise. + """ + self.proxy().AudioDisconnect(address) + return True diff --git a/floss/pandora/server/bluetooth.py b/floss/pandora/server/bluetooth.py index fd2d3d3a6c..41254a9ada 100644 --- a/floss/pandora/server/bluetooth.py +++ b/floss/pandora/server/bluetooth.py @@ -27,6 +27,7 @@ from floss.pandora.floss import media_client from floss.pandora.floss import qa_client from floss.pandora.floss import scanner_client from floss.pandora.floss import socket_manager +from floss.pandora.floss import telephony_client from floss.pandora.floss import utils from gi.repository import GLib import pydbus @@ -71,6 +72,7 @@ class Bluetooth(object): self.gatt_client = gatt_client.FlossGattClient(self.bus, self.DEFAULT_ADAPTER) self.gatt_server = gatt_server.FlossGattServer(self.bus, self.DEFAULT_ADAPTER) self.socket_manager = socket_manager.FlossSocketManagerClient(self.bus, self.DEFAULT_ADAPTER) + self.telephony_client = telephony_client.FlossTelephonyClient(self.bus, self.DEFAULT_ADAPTER) def __del__(self): if not self.is_clean: @@ -147,6 +149,9 @@ class Bluetooth(object): if not self.socket_manager.register_callbacks(): logging.error('scanner_client: Failed to register callbacks') return False + if not self.telephony_client.register_telephony_callback(): + logging.error('telephony_client: Failed to register callbacks') + return False return True def is_bluetoothd_proxy_valid(self): @@ -161,7 +166,8 @@ class Bluetooth(object): self.media_client.has_proxy(), self.gatt_client.has_proxy(), self.gatt_server.has_proxy(), - self.socket_manager.has_proxy() + self.socket_manager.has_proxy(), + self.telephony_client.has_proxy() ]) if not proxy_ready: @@ -199,6 +205,7 @@ class Bluetooth(object): self.gatt_client = gatt_client.FlossGattClient(self.bus, default_adapter) self.gatt_server = gatt_server.FlossGattServer(self.bus, default_adapter) self.socket_manager = socket_manager.FlossSocketManagerClient(self.bus, default_adapter) + self.telephony_client = telephony_client.FlossTelephonyClient(self.bus, default_adapter) try: utils.poll_for_condition( @@ -352,6 +359,45 @@ class Bluetooth(object): def write_characteristic(self, address, handle, write_type, auth_req, value): return self.gatt_client.write_characteristic(address, handle, write_type, auth_req, value) + def set_mps_qualification_enabled(self, enable): + return self.telephony_client.set_mps_qualification_enabled(enable) + + def incoming_call(self, number): + return self.telephony_client.incoming_call(number) + + def set_phone_ops_enabled(self, enable): + return self.telephony_client.set_phone_ops_enabled(enable) + + def dial_call(self, number): + return self.telephony_client.dialing_call(number) + + def answer_call(self): + return self.telephony_client.answer_call() + + def swap_active_call(self): + return self.telephony_client.hold_active_accept_held() + + def set_last_call(self, number=None): + return self.telephony_client.set_last_call(number) + + def set_memory_call(self, number=None): + return self.telephony_client.set_memory_call(number) + + def get_connected_audio_devices(self): + return self.media_client.devices + + def audio_connect(self, address): + return self.telephony_client.audio_connect(address) + + def audio_disconnect(self, address): + return self.telephony_client.audio_disconnect(address) + + def hangup_call(self): + return self.telephony_client.hangup_call() + + def set_battery_level(self, battery_level): + return self.telephony_client.set_battery_level(battery_level) + def gatt_connect(self, address, is_direct, transport): return self.gatt_client.connect_client(address, is_direct, transport) @@ -410,3 +456,6 @@ class Bluetooth(object): def disconnect_media(self, address): return self.media_client.disconnect(address) + + def incoming_call(self, number): + return self.telephony_client.incoming_call(number) diff --git a/floss/pandora/server/hfp.py b/floss/pandora/server/hfp.py new file mode 100644 index 0000000000..59f2e81318 --- /dev/null +++ b/floss/pandora/server/hfp.py @@ -0,0 +1,191 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""HFP grpc interface.""" + +import math + +from floss.pandora.floss import utils +from floss.pandora.server import bluetooth as bluetooth_module +from google.protobuf import empty_pb2 +import grpc +from pandora_experimental import hfp_grpc_aio +from pandora_experimental import hfp_pb2 + + +class HFPService(hfp_grpc_aio.HFPServicer): + """Service to trigger Bluetooth HFP procedures. + + This class implements the Pandora bluetooth test interfaces, + where the meta class definition is automatically generated by the protobuf. + The interface definition can be found in: + https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/hfp.proto + """ + + def __init__(self, bluetooth: bluetooth_module.Bluetooth): + self.bluetooth = bluetooth + + def enable_phone_for_testing(self): + self.bluetooth.set_phone_ops_enabled(True) + self.bluetooth.set_mps_qualification_enabled(True) + + async def EnableSlc(self, request: hfp_pb2.EnableSlcRequest, context: grpc.ServicerContext) -> empty_pb2.Empty: + self.enable_phone_for_testing() + address = utils.connection_from(request.connection).address + self.bluetooth.connect_device(address) + return empty_pb2.Empty() + + async def EnableSlcAsHandsfree(self, request: hfp_pb2.EnableSlcAsHandsfreeRequest, + context: grpc.ServicerContext) -> empty_pb2.Empty: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details("Method not implemented!") # type: ignore + raise NotImplementedError("Method not implemented!") + + async def DisableSlc(self, request: hfp_pb2.DisableSlcRequest, context: grpc.ServicerContext) -> empty_pb2.Empty: + address = utils.connection_from(request.connection).address + self.bluetooth.disconnect_device(address) + return empty_pb2.Empty() + + async def DisableSlcAsHandsfree(self, request: hfp_pb2.DisableSlcAsHandsfreeRequest, + context: grpc.ServicerContext) -> empty_pb2.Empty: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details("Method not implemented!") # type: ignore + raise NotImplementedError("Method not implemented!") + + async def DeclineCall(self, request: hfp_pb2.DeclineCallRequest, + context: grpc.ServicerContext) -> hfp_pb2.DeclineCallResponse: + self.enable_phone_for_testing() + self.bluetooth.hangup_call() + return hfp_pb2.DeclineCallResponse() + + async def DeclineCallAsHandsfree(self, request: hfp_pb2.DeclineCallAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.DeclineCallAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details("Method not implemented!") # type: ignore + raise NotImplementedError("Method not implemented!") + + async def SetBatteryLevel(self, request: hfp_pb2.SetBatteryLevelRequest, + context: grpc.ServicerContext) -> (empty_pb2.Empty): + self.enable_phone_for_testing() + if request.battery_percentage > 100 or request.battery_percentage < 0: + await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Wrong battery percentage.') + self.bluetooth.set_battery_level(math.floor((request.battery_percentage / 100) * 5)) + return empty_pb2.Empty() + + async def ConnectToAudioAsHandsfree(self, request: hfp_pb2.ConnectToAudioAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.ConnectToAudioAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details("Method not implemented!") # type: ignore + raise NotImplementedError("Method not implemented!") + + async def DisconnectFromAudioAsHandsfree( + self, request: hfp_pb2.DisconnectFromAudioAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.DisconnectFromAudioAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details("Method not implemented!") # type: ignore + raise NotImplementedError("Method not implemented!") + + async def SetVoiceRecognition(self, request: hfp_pb2.SetVoiceRecognitionRequest, + context: grpc.ServicerContext) -> hfp_pb2.SetVoiceRecognitionResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details("Method not implemented!") # type: ignore + raise NotImplementedError("Method not implemented!") + + async def SetVoiceRecognitionAsHandsfree( + self, request: hfp_pb2.SetVoiceRecognitionAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.SetVoiceRecognitionAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details("Method not implemented!") # type: ignore + raise NotImplementedError("Method not implemented!") + + async def MakeCall(self, request: hfp_pb2.MakeCallRequest, + context: grpc.ServicerContext) -> hfp_pb2.MakeCallResponse: + self.enable_phone_for_testing() + number = request.number + if number is None or len(number) == 0: + await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Cannot call empty number.') + call_result = self.bluetooth.dial_call(number) + if call_result is None or not call_result: + await context.abort(grpc.StatusCode.INTERNAL, 'Failed to make a call.') + return hfp_pb2.MakeCallResponse() + + async def MakeCallAsHandsfree(self, request: hfp_pb2.MakeCallAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.MakeCallAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def AnswerCall(self, request: hfp_pb2.AnswerCallRequest, + context: grpc.ServicerContext) -> hfp_pb2.AnswerCallResponse: + self.enable_phone_for_testing() + answer_result = self.bluetooth.answer_call() + if answer_result is None or not answer_result: + await context.abort(grpc.StatusCode.INTERNAL, 'Failed to answer call.') + return hfp_pb2.AnswerCallResponse() + + async def AnswerCallAsHandsfree(self, request: hfp_pb2.AnswerCallAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.AnswerCallAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def SetAudioPath(self, request: hfp_pb2.SetAudioPathRequest, + context: grpc.ServicerContext) -> hfp_pb2.SetAudioPathResponse: + self.enable_phone_for_testing() + connected_devices = self.bluetooth.get_connected_audio_devices() + if len(connected_devices) == 0: + await context.abort(grpc.StatusCode.INTERNAL, 'No connected devices.') + if request.audio_path == hfp_pb2.AUDIO_PATH_SPEAKERS: + self.bluetooth.audio_disconnect(connected_devices[0]) + elif request.audio_path == hfp_pb2.AUDIO_PATH_HANDSFREE: + self.bluetooth.audio_connect(connected_devices[0]) + return hfp_pb2.SetAudioPathResponse() + + async def SwapActiveCall(self, request: hfp_pb2.SwapActiveCallRequest, + context: grpc.ServicerContext) -> hfp_pb2.SwapActiveCallResponse: + self.enable_phone_for_testing() + swap_result = self.bluetooth.swap_active_call() + if swap_result is None or not swap_result: + await context.abort(grpc.StatusCode.INTERNAL, 'Failed to swap active call.') + return hfp_pb2.SwapActiveCallResponse() + + async def SetInBandRingtone(self, request: hfp_pb2.SetInBandRingtoneRequest, + context: grpc.ServicerContext) -> hfp_pb2.SetInBandRingtoneResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def ClearCallHistory(self, request: hfp_pb2.ClearCallHistoryRequest, + context: grpc.ServicerContext) -> hfp_pb2.ClearCallHistoryResponse: + self.enable_phone_for_testing() + if not all((self.bluetooth.set_memory_call(), self.bluetooth.set_last_call())): + await context.abort(grpc.StatusCode.INTERNAL, 'Failed to clear hall history.') + return hfp_pb2.ClearCallHistoryResponse() + + async def EndCallAsHandsfree(self, request: hfp_pb2.EndCallAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.EndCallAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def CallTransferAsHandsfree(self, request: hfp_pb2.CallTransferAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.CallTransferAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def SendDtmfFromHandsfree(self, request: hfp_pb2.SendDtmfFromHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.SendDtmfFromHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') diff --git a/floss/pandora/server/modem.py b/floss/pandora/server/modem.py new file mode 100644 index 0000000000..65eaade611 --- /dev/null +++ b/floss/pandora/server/modem.py @@ -0,0 +1,54 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Modem grpc interface.""" + +from floss.pandora.server import bluetooth as bluetooth_module +import grpc +from pandora_experimental import modem_grpc_aio +from pandora_experimental import modem_pb2 + + +class Modem(modem_grpc_aio.ModemServicer): + """Service to trigger modem procedures. + + This class implements the Pandora bluetooth test interfaces, + where the meta class definition is automatically generated by the protobuf. + The interface definition can be found in: + https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/modem.proto + """ + + def __init__(self, bluetooth: bluetooth_module.Bluetooth): + self.bluetooth = bluetooth + + async def Call(self, request: modem_pb2.CallRequest, context: grpc.ServicerContext) -> modem_pb2.CallResponse: + phone_number = request.phone_number + if phone_number is None or len(phone_number) == 0: + await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Cannot call empty number.') + + call_result = self.bluetooth.incoming_call(phone_number) + if not call_result: + await context.abort(grpc.StatusCode.INTERNAL, 'Failed to receive a call.') + + return modem_pb2.CallResponse() + + async def AnswerCall(self, request: modem_pb2.AnswerCallRequest, + context: grpc.ServicerContext) -> modem_pb2.AnswerCallResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def Close(self, request: modem_pb2.CloseRequest, context: grpc.ServicerContext) -> modem_pb2.CloseResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') diff --git a/floss/pandora/server/server.py b/floss/pandora/server/server.py index 3c58f307fe..426d168558 100644 --- a/floss/pandora/server/server.py +++ b/floss/pandora/server/server.py @@ -19,9 +19,11 @@ import logging from floss.pandora.server import a2dp from floss.pandora.server import bluetooth as bluetooth_module from floss.pandora.server import gatt +from floss.pandora.server import hfp from floss.pandora.server import hid from floss.pandora.server import host from floss.pandora.server import l2cap +from floss.pandora.server import modem from floss.pandora.server import rfcomm from floss.pandora.server import security import grpc @@ -29,8 +31,10 @@ from pandora import a2dp_grpc_aio from pandora import host_grpc_aio from pandora import security_grpc_aio from pandora_experimental import gatt_grpc_aio +from pandora_experimental import hfp_grpc_aio from pandora_experimental import hid_grpc_aio from pandora_experimental import l2cap_grpc_aio +from pandora_experimental import modem_grpc_aio from pandora_experimental import rfcomm_grpc_aio @@ -58,6 +62,12 @@ async def serve(port): gatt_service = gatt.GATTService(bluetooth) gatt_grpc_aio.add_GATTServicer_to_server(gatt_service, server) + modem_service = modem.Modem(bluetooth) + modem_grpc_aio.add_ModemServicer_to_server(modem_service, server) + + hfp_service = hfp.HFPService(bluetooth) + hfp_grpc_aio.add_HFPServicer_to_server(hfp_service, server) + hid_service = hid.HIDService(bluetooth) hid_grpc_aio.add_HIDServicer_to_server(hid_service, server) |