diff options
author | 2023-12-25 10:53:09 +0200 | |
---|---|---|
committer | 2024-04-16 14:31:06 +0800 | |
commit | 9b217496b7bf790cb80e7a13b6987f97d3361bcd (patch) | |
tree | 44089aacf3a20a26f0923d54565b361aee237b4f | |
parent | 8e6a4445b4f32623876235e6f66f386fac1e52b8 (diff) |
Floss: Implement Pandora HFP profile (part1).
Bug: 300954040
Bug: 301036159
Bug: 300954041
Bug: 300954043
Bug: 300954042
Bug: 300942872
Bug: 300942873
Bug: 301000128
Bug: 300954044
Bug: 301000132
Bug: 301000135
Test: mma packages/modules/Bluetooth && pts-bot HFP
Tag: #floss
Flag: EXEMPT floss only changes
Change-Id: I76df131eeb6ccd18d8a2e5b324f76af571c6e805
-rw-r--r-- | floss/pandora/server/bluetooth.py | 33 | ||||
-rw-r--r-- | floss/pandora/server/hfp.py | 118 | ||||
-rw-r--r-- | floss/pandora/server/server.py | 5 |
3 files changed, 156 insertions, 0 deletions
diff --git a/floss/pandora/server/bluetooth.py b/floss/pandora/server/bluetooth.py index 5b4b32d462..1ef1496506 100644 --- a/floss/pandora/server/bluetooth.py +++ b/floss/pandora/server/bluetooth.py @@ -359,6 +359,39 @@ class Bluetooth(object): def write_characteristic(self, address, handle, write_type, auth_req, value): return self.gatt_client.write_characteristic(address, handle, write_type, auth_req, value) + def set_mps_qualification_enabled(self, enable): + return self.telephony_client.set_mps_qualification_enabled(enable) + + def incoming_call(self, number): + return self.telephony_client.incoming_call(number) + + def set_phone_ops_enabled(self, enable): + return self.telephony_client.set_phone_ops_enabled(enable) + + def dial_call(self, number): + return self.telephony_client.dialing_call(number) + + def answer_call(self): + return self.telephony_client.answer_call() + + def swap_active_call(self): + return self.telephony_client.hold_active_accept_held() + + def set_last_call(self, number=None): + return self.telephony_client.set_last_call(number) + + def set_memory_call(self, number=None): + return self.telephony_client.set_memory_call(number) + + def get_connected_audio_devices(self): + return self.media_client.devices + + def audio_connect(self, address): + return self.telephony_client.audio_connect(address) + + def audio_disconnect(self, address): + return self.telephony_client.audio_disconnect(address) + def gatt_connect(self, address, is_direct, transport): return self.gatt_client.connect_client(address, is_direct, transport) diff --git a/floss/pandora/server/hfp.py b/floss/pandora/server/hfp.py new file mode 100644 index 0000000000..d31439c6d3 --- /dev/null +++ b/floss/pandora/server/hfp.py @@ -0,0 +1,118 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""HFP grpc interface.""" + +from floss.pandora.server import bluetooth as bluetooth_module +import grpc +from pandora_experimental import hfp_grpc_aio +from pandora_experimental import hfp_pb2 + + +class HFPService(hfp_grpc_aio.HFPServicer): + """Service to trigger Bluetooth HFP procedures. + + This class implements the Pandora bluetooth test interfaces, + where the meta class definition is automatically generated by the protobuf. + The interface definition can be found in: + https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/hfp.proto + """ + + def __init__(self, bluetooth: bluetooth_module.Bluetooth): + self.bluetooth = bluetooth + + def enable_phone_for_testing(self): + self.bluetooth.set_phone_ops_enabled(True) + self.bluetooth.set_mps_qualification_enabled(True) + + async def MakeCall(self, request: hfp_pb2.MakeCallRequest, + context: grpc.ServicerContext) -> hfp_pb2.MakeCallResponse: + self.enable_phone_for_testing() + number = request.number + if number is None or len(number) == 0: + await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Cannot call empty number.') + call_result = self.bluetooth.dial_call(number) + if call_result is None or not call_result: + await context.abort(grpc.StatusCode.INTERNAL, 'Failed to make a call.') + return hfp_pb2.MakeCallResponse() + + async def MakeCallAsHandsfree(self, request: hfp_pb2.MakeCallAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.MakeCallAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def AnswerCall(self, request: hfp_pb2.AnswerCallRequest, + context: grpc.ServicerContext) -> hfp_pb2.AnswerCallResponse: + self.enable_phone_for_testing() + answer_result = self.bluetooth.answer_call() + if answer_result is None or not answer_result: + await context.abort(grpc.StatusCode.INTERNAL, 'Failed to answer call.') + return hfp_pb2.AnswerCallResponse() + + async def AnswerCallAsHandsfree(self, request: hfp_pb2.AnswerCallAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.AnswerCallAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def SetAudioPath(self, request: hfp_pb2.SetAudioPathRequest, + context: grpc.ServicerContext) -> hfp_pb2.SetAudioPathResponse: + self.enable_phone_for_testing() + connected_devices = self.bluetooth.get_connected_audio_devices() + if len(connected_devices) == 0: + await context.abort(grpc.StatusCode.INTERNAL, 'No connected devices.') + if request.audio_path == hfp_pb2.AUDIO_PATH_SPEAKERS: + self.bluetooth.audio_disconnect(connected_devices[0]) + elif request.audio_path == hfp_pb2.AUDIO_PATH_HANDSFREE: + self.bluetooth.audio_connect(connected_devices[0]) + return hfp_pb2.SetAudioPathResponse() + + async def SwapActiveCall(self, request: hfp_pb2.SwapActiveCallRequest, + context: grpc.ServicerContext) -> hfp_pb2.SwapActiveCallResponse: + self.enable_phone_for_testing() + swap_result = self.bluetooth.swap_active_call() + if swap_result is None or not swap_result: + await context.abort(grpc.StatusCode.INTERNAL, 'Failed to swap active call.') + return hfp_pb2.SwapActiveCallResponse() + + async def SetInBandRingtone(self, request: hfp_pb2.SetInBandRingtoneRequest, + context: grpc.ServicerContext) -> hfp_pb2.SetInBandRingtoneResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def ClearCallHistory(self, request: hfp_pb2.ClearCallHistoryRequest, + context: grpc.ServicerContext) -> hfp_pb2.ClearCallHistoryResponse: + self.enable_phone_for_testing() + if not all((self.bluetooth.set_memory_call(), self.bluetooth.set_last_call())): + await context.abort(grpc.StatusCode.INTERNAL, 'Failed to clear hall history.') + return hfp_pb2.ClearCallHistoryResponse() + + async def EndCallAsHandsfree(self, request: hfp_pb2.EndCallAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.EndCallAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def CallTransferAsHandsfree(self, request: hfp_pb2.CallTransferAsHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.CallTransferAsHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') + + async def SendDtmfFromHandsfree(self, request: hfp_pb2.SendDtmfFromHandsfreeRequest, + context: grpc.ServicerContext) -> hfp_pb2.SendDtmfFromHandsfreeResponse: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + context.set_details('Method not implemented!') # type: ignore + raise NotImplementedError('Method not implemented!') diff --git a/floss/pandora/server/server.py b/floss/pandora/server/server.py index 3be194977f..426d168558 100644 --- a/floss/pandora/server/server.py +++ b/floss/pandora/server/server.py @@ -19,6 +19,7 @@ import logging from floss.pandora.server import a2dp from floss.pandora.server import bluetooth as bluetooth_module from floss.pandora.server import gatt +from floss.pandora.server import hfp from floss.pandora.server import hid from floss.pandora.server import host from floss.pandora.server import l2cap @@ -30,6 +31,7 @@ from pandora import a2dp_grpc_aio from pandora import host_grpc_aio from pandora import security_grpc_aio from pandora_experimental import gatt_grpc_aio +from pandora_experimental import hfp_grpc_aio from pandora_experimental import hid_grpc_aio from pandora_experimental import l2cap_grpc_aio from pandora_experimental import modem_grpc_aio @@ -63,6 +65,9 @@ async def serve(port): modem_service = modem.Modem(bluetooth) modem_grpc_aio.add_ModemServicer_to_server(modem_service, server) + hfp_service = hfp.HFPService(bluetooth) + hfp_grpc_aio.add_HFPServicer_to_server(hfp_service, server) + hid_service = hid.HIDService(bluetooth) hid_grpc_aio.add_HIDServicer_to_server(hid_service, server) |