summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Mohammad Sabri <mohammad.kh.sabri@exalt.ps> 2023-12-25 10:53:09 +0200
committer JohnLai <johnlai@google.com> 2024-04-16 14:31:06 +0800
commit9b217496b7bf790cb80e7a13b6987f97d3361bcd (patch)
tree44089aacf3a20a26f0923d54565b361aee237b4f
parent8e6a4445b4f32623876235e6f66f386fac1e52b8 (diff)
Floss: Implement Pandora HFP profile (part1).
Bug: 300954040 Bug: 301036159 Bug: 300954041 Bug: 300954043 Bug: 300954042 Bug: 300942872 Bug: 300942873 Bug: 301000128 Bug: 300954044 Bug: 301000132 Bug: 301000135 Test: mma packages/modules/Bluetooth && pts-bot HFP Tag: #floss Flag: EXEMPT floss only changes Change-Id: I76df131eeb6ccd18d8a2e5b324f76af571c6e805
-rw-r--r--floss/pandora/server/bluetooth.py33
-rw-r--r--floss/pandora/server/hfp.py118
-rw-r--r--floss/pandora/server/server.py5
3 files changed, 156 insertions, 0 deletions
diff --git a/floss/pandora/server/bluetooth.py b/floss/pandora/server/bluetooth.py
index 5b4b32d462..1ef1496506 100644
--- a/floss/pandora/server/bluetooth.py
+++ b/floss/pandora/server/bluetooth.py
@@ -359,6 +359,39 @@ class Bluetooth(object):
def write_characteristic(self, address, handle, write_type, auth_req, value):
return self.gatt_client.write_characteristic(address, handle, write_type, auth_req, value)
+ def set_mps_qualification_enabled(self, enable):
+ return self.telephony_client.set_mps_qualification_enabled(enable)
+
+ def incoming_call(self, number):
+ return self.telephony_client.incoming_call(number)
+
+ def set_phone_ops_enabled(self, enable):
+ return self.telephony_client.set_phone_ops_enabled(enable)
+
+ def dial_call(self, number):
+ return self.telephony_client.dialing_call(number)
+
+ def answer_call(self):
+ return self.telephony_client.answer_call()
+
+ def swap_active_call(self):
+ return self.telephony_client.hold_active_accept_held()
+
+ def set_last_call(self, number=None):
+ return self.telephony_client.set_last_call(number)
+
+ def set_memory_call(self, number=None):
+ return self.telephony_client.set_memory_call(number)
+
+ def get_connected_audio_devices(self):
+ return self.media_client.devices
+
+ def audio_connect(self, address):
+ return self.telephony_client.audio_connect(address)
+
+ def audio_disconnect(self, address):
+ return self.telephony_client.audio_disconnect(address)
+
def gatt_connect(self, address, is_direct, transport):
return self.gatt_client.connect_client(address, is_direct, transport)
diff --git a/floss/pandora/server/hfp.py b/floss/pandora/server/hfp.py
new file mode 100644
index 0000000000..d31439c6d3
--- /dev/null
+++ b/floss/pandora/server/hfp.py
@@ -0,0 +1,118 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""HFP grpc interface."""
+
+from floss.pandora.server import bluetooth as bluetooth_module
+import grpc
+from pandora_experimental import hfp_grpc_aio
+from pandora_experimental import hfp_pb2
+
+
+class HFPService(hfp_grpc_aio.HFPServicer):
+ """Service to trigger Bluetooth HFP procedures.
+
+ This class implements the Pandora bluetooth test interfaces,
+ where the meta class definition is automatically generated by the protobuf.
+ The interface definition can be found in:
+ https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/hfp.proto
+ """
+
+ def __init__(self, bluetooth: bluetooth_module.Bluetooth):
+ self.bluetooth = bluetooth
+
+ def enable_phone_for_testing(self):
+ self.bluetooth.set_phone_ops_enabled(True)
+ self.bluetooth.set_mps_qualification_enabled(True)
+
+ async def MakeCall(self, request: hfp_pb2.MakeCallRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.MakeCallResponse:
+ self.enable_phone_for_testing()
+ number = request.number
+ if number is None or len(number) == 0:
+ await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Cannot call empty number.')
+ call_result = self.bluetooth.dial_call(number)
+ if call_result is None or not call_result:
+ await context.abort(grpc.StatusCode.INTERNAL, 'Failed to make a call.')
+ return hfp_pb2.MakeCallResponse()
+
+ async def MakeCallAsHandsfree(self, request: hfp_pb2.MakeCallAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.MakeCallAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def AnswerCall(self, request: hfp_pb2.AnswerCallRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.AnswerCallResponse:
+ self.enable_phone_for_testing()
+ answer_result = self.bluetooth.answer_call()
+ if answer_result is None or not answer_result:
+ await context.abort(grpc.StatusCode.INTERNAL, 'Failed to answer call.')
+ return hfp_pb2.AnswerCallResponse()
+
+ async def AnswerCallAsHandsfree(self, request: hfp_pb2.AnswerCallAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.AnswerCallAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def SetAudioPath(self, request: hfp_pb2.SetAudioPathRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.SetAudioPathResponse:
+ self.enable_phone_for_testing()
+ connected_devices = self.bluetooth.get_connected_audio_devices()
+ if len(connected_devices) == 0:
+ await context.abort(grpc.StatusCode.INTERNAL, 'No connected devices.')
+ if request.audio_path == hfp_pb2.AUDIO_PATH_SPEAKERS:
+ self.bluetooth.audio_disconnect(connected_devices[0])
+ elif request.audio_path == hfp_pb2.AUDIO_PATH_HANDSFREE:
+ self.bluetooth.audio_connect(connected_devices[0])
+ return hfp_pb2.SetAudioPathResponse()
+
+ async def SwapActiveCall(self, request: hfp_pb2.SwapActiveCallRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.SwapActiveCallResponse:
+ self.enable_phone_for_testing()
+ swap_result = self.bluetooth.swap_active_call()
+ if swap_result is None or not swap_result:
+ await context.abort(grpc.StatusCode.INTERNAL, 'Failed to swap active call.')
+ return hfp_pb2.SwapActiveCallResponse()
+
+ async def SetInBandRingtone(self, request: hfp_pb2.SetInBandRingtoneRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.SetInBandRingtoneResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def ClearCallHistory(self, request: hfp_pb2.ClearCallHistoryRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.ClearCallHistoryResponse:
+ self.enable_phone_for_testing()
+ if not all((self.bluetooth.set_memory_call(), self.bluetooth.set_last_call())):
+ await context.abort(grpc.StatusCode.INTERNAL, 'Failed to clear hall history.')
+ return hfp_pb2.ClearCallHistoryResponse()
+
+ async def EndCallAsHandsfree(self, request: hfp_pb2.EndCallAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.EndCallAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def CallTransferAsHandsfree(self, request: hfp_pb2.CallTransferAsHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.CallTransferAsHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
+
+ async def SendDtmfFromHandsfree(self, request: hfp_pb2.SendDtmfFromHandsfreeRequest,
+ context: grpc.ServicerContext) -> hfp_pb2.SendDtmfFromHandsfreeResponse:
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
+ context.set_details('Method not implemented!') # type: ignore
+ raise NotImplementedError('Method not implemented!')
diff --git a/floss/pandora/server/server.py b/floss/pandora/server/server.py
index 3be194977f..426d168558 100644
--- a/floss/pandora/server/server.py
+++ b/floss/pandora/server/server.py
@@ -19,6 +19,7 @@ import logging
from floss.pandora.server import a2dp
from floss.pandora.server import bluetooth as bluetooth_module
from floss.pandora.server import gatt
+from floss.pandora.server import hfp
from floss.pandora.server import hid
from floss.pandora.server import host
from floss.pandora.server import l2cap
@@ -30,6 +31,7 @@ from pandora import a2dp_grpc_aio
from pandora import host_grpc_aio
from pandora import security_grpc_aio
from pandora_experimental import gatt_grpc_aio
+from pandora_experimental import hfp_grpc_aio
from pandora_experimental import hid_grpc_aio
from pandora_experimental import l2cap_grpc_aio
from pandora_experimental import modem_grpc_aio
@@ -63,6 +65,9 @@ async def serve(port):
modem_service = modem.Modem(bluetooth)
modem_grpc_aio.add_ModemServicer_to_server(modem_service, server)
+ hfp_service = hfp.HFPService(bluetooth)
+ hfp_grpc_aio.add_HFPServicer_to_server(hfp_service, server)
+
hid_service = hid.HIDService(bluetooth)
hid_grpc_aio.add_HIDServicer_to_server(hid_service, server)