summaryrefslogtreecommitdiff
path: root/floss
diff options
context:
space:
mode:
Diffstat (limited to 'floss')
-rw-r--r--floss/build/Dockerfile2
-rw-r--r--floss/pandora/floss/telephony_client.py334
-rw-r--r--floss/pandora/server/bluetooth.py51
-rw-r--r--floss/pandora/server/hfp.py191
-rw-r--r--floss/pandora/server/modem.py54
-rw-r--r--floss/pandora/server/server.py10
6 files changed, 640 insertions, 2 deletions
diff --git a/floss/build/Dockerfile b/floss/build/Dockerfile
index 2be3277773..54fd57ca46 100644
--- a/floss/build/Dockerfile
+++ b/floss/build/Dockerfile
@@ -61,7 +61,7 @@ RUN apt-get update && \
# Next install the Rust toolchain. Download the toolchain to the local folder
# using curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs
ADD rustup/rustup.sh /tmp
-RUN /tmp/rustup.sh -y --default-toolchain 1.68.2
+RUN /tmp/rustup.sh -y --default-toolchain 1.77.1
# Add .cargo/bin to $PATH
ENV PATH="/root/.cargo/bin:${PATH}"
diff --git a/floss/pandora/floss/telephony_client.py b/floss/pandora/floss/telephony_client.py
new file mode 100644
index 0000000000..afa9417fdf
--- /dev/null
+++ b/floss/pandora/floss/telephony_client.py
@@ -0,0 +1,334 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Client class to access the Floss telephony interface."""
+import logging
+
+from floss.pandora.floss import observer_base
+from floss.pandora.floss import utils
+
+
+class BluetoothTelephonyCallbacks:
+ """Callbacks for the telephony interface.
+
+ Implement this to observe these callbacks when exporting callbacks via register_callback.
+ """
+
+ def on_telephony_use(self, addr, state):
+ """Called when telephony is in use.
+
+ Args:
+ addr: The address of the telephony device.
+ state: The boolean value indicating the telephony state.
+ """
+ pass
+
+
+class FlossTelephonyClient:
+ """Handles method calls and callbacks from the telephony interface."""
+
+ TELEPHONY_SERVICE = 'org.chromium.bluetooth'
+ TELEPHONY_INTERFACE = 'org.chromium.bluetooth.BluetoothTelephony'
+ TELEPHONY_OBJECT_PATTERN = '/org/chromium/bluetooth/hci{}/telephony'
+ TELEPHONY_CB_INTF = 'org.chromium.bluetooth.BluetoothTelephonyCallback'
+ TELEPHONY_CB_OBJ_NAME = 'test_telephony_client'
+
+ class ExportedTelephonyCallbacks(observer_base.ObserverBase):
+ """
+ <node>
+ <interface name="org.chromium.bluetooth.BluetoothTelephonyCallback">
+ <method name="OnTelephonyUse">
+ <arg type="s" name="add" direction="in" />
+ <arg type="b" name="state" direction="in" />
+ </method>
+ </interface>
+ </node>
+ """
+
+ def __init__(self):
+ """Constructs exported callbacks object."""
+ observer_base.ObserverBase.__init__(self)
+
+ def OnTelephonyUse(self, addr, state):
+ """Handles telephony use callback.
+
+ Args:
+ addr: The address of the telephony device.
+ state: The boolean value indicating the telephony state.
+ """
+
+ for observer in self.observers.values():
+ observer.on_telephony_use(addr, state)
+
+ def __init__(self, bus, hci):
+ """Constructs the client.
+
+ Args:
+ bus: D-Bus bus over which we'll establish connections.
+ hci: HCI adapter index. Get this value from `get_default_adapter` on FlossManagerClient.
+ """
+ self.bus = bus
+ self.hci = hci
+ self.objpath = self.TELEPHONY_OBJECT_PATTERN.format(hci)
+
+ # We don't register callbacks by default.
+ self.callbacks = None
+
+ def __del__(self):
+ """Destructor."""
+ del self.callbacks
+
+ @utils.glib_callback()
+ def on_telephony_use(self, addr, state):
+ """Handles telephony use callback.
+
+ Args:
+ addr: The address of the telephony device.
+ state: The boolean value indicating the telephony state.
+ """
+ logging.debug('on_telephony_use: addr: %s, state: %s', addr, state)
+
+ def _make_dbus_phone_number(self, number):
+ """Makes struct for phone number D-Bus.
+
+ Args:
+ number : The phone number to use.
+
+ Returns:
+ Dictionary of phone number.
+ """
+ return utils.dbus_optional_value('s', number)
+
+ @utils.glib_call(False)
+ def has_proxy(self):
+ """Checks whether telephony proxy can be acquired."""
+ return bool(self.proxy())
+
+ def proxy(self):
+ """Gets proxy object to telephony interface for method calls."""
+ return self.bus.get(self.TELEPHONY_SERVICE, self.objpath)[self.TELEPHONY_INTERFACE]
+
+ @utils.glib_call(None)
+ def register_telephony_callback(self):
+ """Registers telephony callback for this client if one doesn't already exist.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ if self.callbacks:
+ return True
+
+ # Create and publish callbacks
+ self.callbacks = self.ExportedTelephonyCallbacks()
+ self.callbacks.add_observer('telephony_client', self)
+ objpath = utils.generate_dbus_cb_objpath(self.TELEPHONY_CB_OBJ_NAME, self.hci)
+ self.bus.register_object(objpath, self.callbacks, None)
+
+ # Register published callbacks with manager daemon
+ return self.proxy().RegisterTelephonyCallback(objpath)
+
+ @utils.glib_call(False)
+ def set_network_available(self, network_available):
+ """Sets network availability status.
+
+ Args:
+ network_available: A boolean value indicating whether the device is connected to the cellular network.
+
+ Returns:
+ True on success, False otherwise.
+ """
+ self.proxy().SetNetworkAvailable(network_available)
+ return True
+
+ @utils.glib_call(False)
+ def set_roaming(self, roaming):
+ """Sets roaming mode.
+
+ Args:
+ roaming: A boolean value indicating whether the device is in roaming mode.
+
+ Returns:
+ True on success, False otherwise.
+ """
+ self.proxy().SetRoaming(roaming)
+ return True
+
+ @utils.glib_call(None)
+ def set_signal_strength(self, signal_strength):
+ """Sets signal strength.
+
+ Args:
+ signal_strength: The signal strength value to be set, ranging from 0 to 5.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ return self.proxy().SetSignalStrength(signal_strength)
+
+ @utils.glib_call(None)
+ def set_battery_level(self, battery_level):
+ """Sets battery level.
+
+ Args:
+ battery_level: The battery level value to be set, ranging from 0 to 5.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ return self.proxy().SetBatteryLevel(battery_level)
+
+ @utils.glib_call(False)
+ def set_phone_ops_enabled(self, enable):
+ """Sets phone operations status.
+
+ Args:
+ enable: A boolean value indicating whether phone operations are enabled.
+
+ Returns:
+ True on success, False otherwise.
+ """
+ self.proxy().SetPhoneOpsEnabled(enable)
+ return True
+
+ @utils.glib_call(False)
+ def set_mps_qualification_enabled(self, enable):
+ """Sets MPS qualification status.
+
+ Args:
+ enable: A boolean value indicating whether MPS qualification is enabled.
+
+ Returns:
+ True on success, False otherwise.
+ """
+ self.proxy().SetMpsQualificationEnabled(enable)
+ return True
+
+ @utils.glib_call(None)
+ def incoming_call(self, number):
+ """Initiates an incoming call with the specified phone number.
+
+ Args:
+ number: The phone number of the incoming call.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+
+ return self.proxy().IncomingCall(number)
+
+ @utils.glib_call(None)
+ def dialing_call(self, number):
+ """Initiates a dialing call with the specified phone number.
+
+ Args:
+ number: The phone number to dial.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ return self.proxy().DialingCall(number)
+
+ @utils.glib_call(None)
+ def answer_call(self):
+ """Answers an incoming or dialing call.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ return self.proxy().AnswerCall()
+
+ @utils.glib_call(None)
+ def hangup_call(self):
+ """Hangs up an active, incoming, or dialing call.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ return self.proxy().HangupCall()
+
+ @utils.glib_call(None)
+ def set_last_call(self, number=None):
+ """Sets last call with the specified phone number.
+
+ Args:
+ number: Optional phone number value to be set as the last call, Defaults to None if not provided.
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ number = self._make_dbus_phone_number(number)
+ return self.proxy().SetLastCall(number)
+
+ @utils.glib_call(None)
+ def set_memory_call(self, number=None):
+ """Sets memory call with the specified phone number.
+
+ Args:
+ number: Optional phone number value to be set as the last call, Defaults to None if not provided.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ number = self._make_dbus_phone_number(number)
+ return self.proxy().SetMemoryCall(number)
+
+ @utils.glib_call(None)
+ def release_held(self):
+ """Releases all of the held calls.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ return self.proxy().ReleaseHeld()
+
+ @utils.glib_call(None)
+ def release_active_accept_held(self):
+ """Releases the active call and accepts a held call.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ return self.proxy().ReleaseActiveAcceptHeld()
+
+ @utils.glib_call(None)
+ def hold_active_accept_held(self):
+ """Holds the active call and accepts a held call.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ return self.proxy().HoldActiveAcceptHeld()
+
+ @utils.glib_call(None)
+ def audio_connect(self, address):
+ """Initiates an audio connection to the remote device.
+
+ Args:
+ address: The address of the remote device for audio connection.
+
+ Returns:
+ True on success, False on failure, None on DBus error.
+ """
+ return self.proxy().AudioConnect(address)
+
+ @utils.glib_call(False)
+ def audio_disconnect(self, address):
+ """Disconnects the audio connection to the remote device.
+
+ Args:
+ address: The address of the remote device for audio disconnection.
+
+ Returns:
+ True on success, False otherwise.
+ """
+ self.proxy().AudioDisconnect(address)
+ return True
diff --git a/floss/pandora/server/bluetooth.py b/floss/pandora/server/bluetooth.py
index fd2d3d3a6c..41254a9ada 100644
--- a/floss/pandora/server/bluetooth.py
+++ b/floss/pandora/server/bluetooth.py
@@ -27,6 +27,7 @@ from floss.pandora.floss import media_client
from floss.pandora.floss import qa_client
from floss.pandora.floss import scanner_client
from floss.pandora.floss import socket_manager
+from floss.pandora.floss import telephony_client
from floss.pandora.floss import utils
from gi.repository import GLib
import pydbus
@@ -71,6 +72,7 @@ class Bluetooth(object):
self.gatt_client = gatt_client.FlossGattClient(self.bus, self.DEFAULT_ADAPTER)
self.gatt_server = gatt_server.FlossGattServer(self.bus, self.DEFAULT_ADAPTER)
self.socket_manager = socket_manager.FlossSocketManagerClient(self.bus, self.DEFAULT_ADAPTER)
+ self.telephony_client = telephony_client.FlossTelephonyClient(self.bus, self.DEFAULT_ADAPTER)
def __del__(self):
if not self.is_clean:
@@ -147,6 +149,9 @@ class Bluetooth(object):
if not self.socket_manager.register_callbacks():
logging.error('scanner_client: Failed to register callbacks')
return False
+ if not self.telephony_client.register_telephony_callback():
+ logging.error('telephony_client: Failed to register callbacks')
+ return False
return True
def is_bluetoothd_proxy_valid(self):
@@ -161,7 +166,8 @@ class Bluetooth(object):
self.media_client.has_proxy(),
self.gatt_client.has_proxy(),
self.gatt_server.has_proxy(),
- self.socket_manager.has_proxy()
+ self.socket_manager.has_proxy(),
+ self.telephony_client.has_proxy()
])
if not proxy_ready:
@@ -199,6 +205,7 @@ class Bluetooth(object):
self.gatt_client = gatt_client.FlossGattClient(self.bus, default_adapter)
self.gatt_server = gatt_server.FlossGattServer(self.bus, default_adapter)
self.socket_manager = socket_manager.FlossSocketManagerClient(self.bus, default_adapter)
+ self.telephony_client = telephony_client.FlossTelephonyClient(self.bus, default_adapter)
try:
utils.poll_for_condition(
@@ -352,6 +359,45 @@ class Bluetooth(object):
def write_characteristic(self, address, handle, write_type, auth_req, value):
return self.gatt_client.write_characteristic(address, handle, write_type, auth_req, value)
+ def set_mps_qualification_enabled(self, enable):
+ return self.telephony_client.set_mps_qualification_enabled(enable)
+
+ def incoming_call(self, number):
+ return self.telephony_client.incoming_call(number)
+
+ def set_phone_ops_enabled(self, enable):
+ return self.telephony_client.set_phone_ops_enabled(enable)
+
+ def dial_call(self, number):
+ return self.telephony_client.dialing_call(number)
+
+ def answer_call(self):
+ return self.telephony_client.answer_call()
+
+ def swap_active_call(self):
+ return self.telephony_client.hold_active_accept_held()
+
+ def set_last_call(self, number=None):
+ return self.telephony_client.set_last_call(number)
+
+ def set_memory_call(self, number=None):
+ return self.telephony_client.set_memory_call(number)
+
+ def get_connected_audio_devices(self):
+ return self.media_client.devices
+
+ def audio_connect(self, address):
+ return self.telephony_client.audio_connect(address)
+
+ def audio_disconnect(self, address):
+ return self.telephony_client.audio_disconnect(address)
+
+ def hangup_call(self):
+ return self.telephony_client.hangup_call()
+
+ def set_battery_level(self, battery_level):
+ return self.telephony_client.set_battery_level(battery_level)
+
def gatt_connect(self, address, is_direct, transport):
return self.gatt_client.connect_client(address, is_direct, transport)
@@ -410,3 +456,6 @@ class Bluetooth(object):
def disconnect_media(self, address):
return self.media_client.disconnect(address)
+
+ def incoming_call(self, number):
+ return self.telephony_client.incoming_call(number)
diff --git a/floss/pandora/server/hfp.py b/floss/pandora/server/hfp.py
new file mode 100644
index 0000000000..59f2e81318
--- /dev/null
+++ b/floss/pandora/server/hfp.py
@@ -0,0 +1,191 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""HFP grpc interface."""
+
+import math
+
+from floss.pandora.floss import utils
+from floss.pandora.server import bluetooth as bluetooth_module
+from google.protobuf import empty_pb2
+import grpc
+from pandora_experimental import hfp_grpc_aio
+from pandora_experimental import hfp_pb2
+
+
+class HFPService(hfp_grpc_aio.HFPServicer):
+ """Service to trigger Bluetooth HFP procedures.
+
+ This class implements the Pandora bluetooth test interfaces,
+ where the meta class definition is automatically generated by the protobuf.
+ The interface definition can be found in:
+ https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/hfp.proto
+ """
+
+ def __init__(self, bluetooth: bluetooth_module.Bluetooth):
+ self.bluetooth = bluetooth
+
+ def enable_phone_for_testing(self):
+ self.bluetooth.set_phone_ops_enabled(True)
+ self.bluetooth.set_mps_qualification_enabled(True)
+
+ async def EnableSlc(self, request: hfp_pb2.EnableSlcRequest, context: grpc.ServicerContext) -> empty_pb2.Empty:
+ self.enable_phone_for_testing()
+ address = utils.connection_from(request.connection).address
+ self.bluetooth.connect_device(address)
+ return empty_pb2.Empty()
+
+ async def EnableSlcAsHandsfree(self, request: hfp_pb2.EnableSlcAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> empty_pb2.Empty:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details("Method not implemented!") # type: ignore
+ raise NotImplementedError("Method not implemented!")
+
+ async def DisableSlc(self, request: hfp_pb2.DisableSlcRequest, context: grpc.ServicerContext) -> empty_pb2.Empty:
+ address = utils.connection_from(request.connection).address
+ self.bluetooth.disconnect_device(address)
+ return empty_pb2.Empty()
+
+ async def DisableSlcAsHandsfree(self, request: hfp_pb2.DisableSlcAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> empty_pb2.Empty:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details("Method not implemented!") # type: ignore
+ raise NotImplementedError("Method not implemented!")
+
+ async def DeclineCall(self, request: hfp_pb2.DeclineCallRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.DeclineCallResponse:
+ self.enable_phone_for_testing()
+ self.bluetooth.hangup_call()
+ return hfp_pb2.DeclineCallResponse()
+
+ async def DeclineCallAsHandsfree(self, request: hfp_pb2.DeclineCallAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.DeclineCallAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details("Method not implemented!") # type: ignore
+ raise NotImplementedError("Method not implemented!")
+
+ async def SetBatteryLevel(self, request: hfp_pb2.SetBatteryLevelRequest,
+ context: grpc.ServicerContext) -> (empty_pb2.Empty):
+ self.enable_phone_for_testing()
+ if request.battery_percentage > 100 or request.battery_percentage < 0:
+ await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Wrong battery percentage.')
+ self.bluetooth.set_battery_level(math.floor((request.battery_percentage / 100) * 5))
+ return empty_pb2.Empty()
+
+ async def ConnectToAudioAsHandsfree(self, request: hfp_pb2.ConnectToAudioAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.ConnectToAudioAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details("Method not implemented!") # type: ignore
+ raise NotImplementedError("Method not implemented!")
+
+ async def DisconnectFromAudioAsHandsfree(
+ self, request: hfp_pb2.DisconnectFromAudioAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.DisconnectFromAudioAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details("Method not implemented!") # type: ignore
+ raise NotImplementedError("Method not implemented!")
+
+ async def SetVoiceRecognition(self, request: hfp_pb2.SetVoiceRecognitionRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.SetVoiceRecognitionResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details("Method not implemented!") # type: ignore
+ raise NotImplementedError("Method not implemented!")
+
+ async def SetVoiceRecognitionAsHandsfree(
+ self, request: hfp_pb2.SetVoiceRecognitionAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.SetVoiceRecognitionAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details("Method not implemented!") # type: ignore
+ raise NotImplementedError("Method not implemented!")
+
+ async def MakeCall(self, request: hfp_pb2.MakeCallRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.MakeCallResponse:
+ self.enable_phone_for_testing()
+ number = request.number
+ if number is None or len(number) == 0:
+ await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Cannot call empty number.')
+ call_result = self.bluetooth.dial_call(number)
+ if call_result is None or not call_result:
+ await context.abort(grpc.StatusCode.INTERNAL, 'Failed to make a call.')
+ return hfp_pb2.MakeCallResponse()
+
+ async def MakeCallAsHandsfree(self, request: hfp_pb2.MakeCallAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.MakeCallAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def AnswerCall(self, request: hfp_pb2.AnswerCallRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.AnswerCallResponse:
+ self.enable_phone_for_testing()
+ answer_result = self.bluetooth.answer_call()
+ if answer_result is None or not answer_result:
+ await context.abort(grpc.StatusCode.INTERNAL, 'Failed to answer call.')
+ return hfp_pb2.AnswerCallResponse()
+
+ async def AnswerCallAsHandsfree(self, request: hfp_pb2.AnswerCallAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.AnswerCallAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def SetAudioPath(self, request: hfp_pb2.SetAudioPathRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.SetAudioPathResponse:
+ self.enable_phone_for_testing()
+ connected_devices = self.bluetooth.get_connected_audio_devices()
+ if len(connected_devices) == 0:
+ await context.abort(grpc.StatusCode.INTERNAL, 'No connected devices.')
+ if request.audio_path == hfp_pb2.AUDIO_PATH_SPEAKERS:
+ self.bluetooth.audio_disconnect(connected_devices[0])
+ elif request.audio_path == hfp_pb2.AUDIO_PATH_HANDSFREE:
+ self.bluetooth.audio_connect(connected_devices[0])
+ return hfp_pb2.SetAudioPathResponse()
+
+ async def SwapActiveCall(self, request: hfp_pb2.SwapActiveCallRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.SwapActiveCallResponse:
+ self.enable_phone_for_testing()
+ swap_result = self.bluetooth.swap_active_call()
+ if swap_result is None or not swap_result:
+ await context.abort(grpc.StatusCode.INTERNAL, 'Failed to swap active call.')
+ return hfp_pb2.SwapActiveCallResponse()
+
+ async def SetInBandRingtone(self, request: hfp_pb2.SetInBandRingtoneRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.SetInBandRingtoneResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def ClearCallHistory(self, request: hfp_pb2.ClearCallHistoryRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.ClearCallHistoryResponse:
+ self.enable_phone_for_testing()
+ if not all((self.bluetooth.set_memory_call(), self.bluetooth.set_last_call())):
+ await context.abort(grpc.StatusCode.INTERNAL, 'Failed to clear hall history.')
+ return hfp_pb2.ClearCallHistoryResponse()
+
+ async def EndCallAsHandsfree(self, request: hfp_pb2.EndCallAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.EndCallAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def CallTransferAsHandsfree(self, request: hfp_pb2.CallTransferAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.CallTransferAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def SendDtmfFromHandsfree(self, request: hfp_pb2.SendDtmfFromHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.SendDtmfFromHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
diff --git a/floss/pandora/server/modem.py b/floss/pandora/server/modem.py
new file mode 100644
index 0000000000..65eaade611
--- /dev/null
+++ b/floss/pandora/server/modem.py
@@ -0,0 +1,54 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Modem grpc interface."""
+
+from floss.pandora.server import bluetooth as bluetooth_module
+import grpc
+from pandora_experimental import modem_grpc_aio
+from pandora_experimental import modem_pb2
+
+
+class Modem(modem_grpc_aio.ModemServicer):
+ """Service to trigger modem procedures.
+
+ This class implements the Pandora bluetooth test interfaces,
+ where the meta class definition is automatically generated by the protobuf.
+ The interface definition can be found in:
+ https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/modem.proto
+ """
+
+ def __init__(self, bluetooth: bluetooth_module.Bluetooth):
+ self.bluetooth = bluetooth
+
+ async def Call(self, request: modem_pb2.CallRequest, context: grpc.ServicerContext) -> modem_pb2.CallResponse:
+ phone_number = request.phone_number
+ if phone_number is None or len(phone_number) == 0:
+ await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Cannot call empty number.')
+
+ call_result = self.bluetooth.incoming_call(phone_number)
+ if not call_result:
+ await context.abort(grpc.StatusCode.INTERNAL, 'Failed to receive a call.')
+
+ return modem_pb2.CallResponse()
+
+ async def AnswerCall(self, request: modem_pb2.AnswerCallRequest,
+ context: grpc.ServicerContext) -> modem_pb2.AnswerCallResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def Close(self, request: modem_pb2.CloseRequest, context: grpc.ServicerContext) -> modem_pb2.CloseResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
diff --git a/floss/pandora/server/server.py b/floss/pandora/server/server.py
index 3c58f307fe..426d168558 100644
--- a/floss/pandora/server/server.py
+++ b/floss/pandora/server/server.py
@@ -19,9 +19,11 @@ import logging
from floss.pandora.server import a2dp
from floss.pandora.server import bluetooth as bluetooth_module
from floss.pandora.server import gatt
+from floss.pandora.server import hfp
from floss.pandora.server import hid
from floss.pandora.server import host
from floss.pandora.server import l2cap
+from floss.pandora.server import modem
from floss.pandora.server import rfcomm
from floss.pandora.server import security
import grpc
@@ -29,8 +31,10 @@ from pandora import a2dp_grpc_aio
from pandora import host_grpc_aio
from pandora import security_grpc_aio
from pandora_experimental import gatt_grpc_aio
+from pandora_experimental import hfp_grpc_aio
from pandora_experimental import hid_grpc_aio
from pandora_experimental import l2cap_grpc_aio
+from pandora_experimental import modem_grpc_aio
from pandora_experimental import rfcomm_grpc_aio
@@ -58,6 +62,12 @@ async def serve(port):
gatt_service = gatt.GATTService(bluetooth)
gatt_grpc_aio.add_GATTServicer_to_server(gatt_service, server)
+ modem_service = modem.Modem(bluetooth)
+ modem_grpc_aio.add_ModemServicer_to_server(modem_service, server)
+
+ hfp_service = hfp.HFPService(bluetooth)
+ hfp_grpc_aio.add_HFPServicer_to_server(hfp_service, server)
+
hid_service = hid.HIDService(bluetooth)
hid_grpc_aio.add_HIDServicer_to_server(hid_service, server)