diff options
author | 2021-05-21 00:59:23 +0000 | |
---|---|---|
committer | 2021-05-24 18:05:04 +0000 | |
commit | 02cfdb090aae0997ad619b4cb5b5a106dc0061a7 (patch) | |
tree | 51fb94ce8dbbb6344bb1c701d7444895ffa406f6 | |
parent | 85ec1138e4d94f266f8dcdb30e7854ffe992afba (diff) |
Project import generated by Copybara.
Tag: #compatibility
Bug: 189117116
Test: NA
PiperOrigin-RevId: 374987840
Change-Id: I1b512359e184ddc47145aa9339ddf3c75e28a2dc
31 files changed, 6469 insertions, 0 deletions
diff --git a/system/blueberry/controllers/android_bt_target_device.py b/system/blueberry/controllers/android_bt_target_device.py new file mode 100644 index 0000000000..9e7aebf752 --- /dev/null +++ b/system/blueberry/controllers/android_bt_target_device.py @@ -0,0 +1,515 @@ +"""Controller class for an android bt device with git_master-bds-dev build. + +The config for this derived_bt_target_device in mobileharness is: +- name: android_bt_target_device + devices: + - type: MiscTestbedSubDevice + dimensions: + mobly_type: DerivedBtDevice + properties: + ModuleName: android_bt_target_device + ClassName: AndroidBtTargetDevice + Params: + config: + device_id: phone_serial_number + audio_params: + channel: 2 + duration: 50 + music_file: "music.wav" + sample_rate: 44100 +""" + +import logging +import os +import time + +from mobly import asserts +from mobly.controllers.android_device import AndroidDevice +from mobly.signals import ControllerError +# Internal import +from blueberry.utils import bt_constants +from blueberry.utils.android_bluetooth_decorator import AndroidBluetoothDecorator +import blueberry.utils.bt_test_utils as btutils + +ADB_FILE = "rec.pcm" +ADB_PATH = "/sdcard/Music/" +WAVE_FILE_TEMPLATE = "recorded_audio_%s.wav" +DEFAULT_WAIT_TIME = 3.0 + +# A MediaBrowserService implemented in the SL4A app to intercept Media keys and +# commands. +BLUETOOTH_SL4A_AUDIO_SRC_MBS = "BluetoothSL4AAudioSrcMBS" + +A2DP_HFP_PROFILES = [ + bt_constants.BluetoothProfile.A2DP_SINK, + bt_constants.BluetoothProfile.HEADSET_CLIENT +] + + +class AndroidBtTargetDevice(object): + """Implements an android device as a hfp and a2dp sink device. + + With git_master-bds-dev build, the android device can act as a bluetooth + hfp and a2dp sink device. + """ + + def __init__(self, config): + """Initializes an android hfp device.""" + logging.info("Initializes the android hfp device") + self.pri_ad = None + self.sec_ad = None + self.serial = config.get("device_id", None) + self.audio_params = config.get("audio_params", None) + + if self.serial: + # self._ad for accessing the device at the end of the test + self._ad = AndroidDevice(self.serial) + self.aud = adb_ui_device.AdbUiDevice(self._ad) + self.pri_ad = AndroidBluetoothDecorator(self._ad) + self.pri_ad.init_setup() + self.pri_ad.sl4a_setup() + self.sl4a = self._ad.services.sl4a + self.mac_address = self.sl4a.bluetoothGetLocalAddress() + + if self.audio_params: + self._initialize_audio_params() + self.avrcp_ready = False + + def __getattr__(self, name): + return getattr(self.pri_ad, name) + + def _disable_profiles(self): + if self.sec_ad is None: + raise MissingBtClientDeviceError("Please provide sec_ad forsetting" + "profiles") + self.set_profiles_policy_off(self.sec_ad, A2DP_HFP_PROFILES) + + def _initialize_audio_params(self): + self.audio_capture_path = os.path.join(self._ad.log_path, "audio_capture") + os.makedirs(self.audio_capture_path) + self.adb_path = os.path.join(ADB_PATH, ADB_FILE) + self.wave_file_template = os.path.join(self.audio_capture_path, + WAVE_FILE_TEMPLATE) + self.wave_file_number = 0 + + def _verify_pri_ad(self): + if not self.pri_ad: + raise ControllerError("No be target device") + + def clean_up(self): + """Resets Bluetooth and stops all services when the device is destroyed.""" + self.deactivate_ble_pairing_mode() + self.factory_reset_bluetooth() + self._ad.services.stop_all() + + def a2dp_sink_connect(self): + """Establishes the hft connection between self.pri_ad and self.sec_ad.""" + self._verify_pri_ad() + connected = self.pri_ad.a2dp_sink_connect(self.sec_ad) + asserts.assert_true( + connected, "The a2dp sink connection between {} and {} failed".format( + self.serial, self.sec_ad.serial)) + self.log.info("The a2dp sink connection between %s and %s succeeded", + self.serial, self.sec_ad.serial) + return True + + def activate_pairing_mode(self): + """Makes the android hfp device discoverable over Bluetooth.""" + self.log.info("Activating the pairing mode of the android target device") + self.pri_ad.activate_pairing_mode() + + def activate_ble_pairing_mode(self): + """Activates BLE pairing mode on an AndroidBtTargetDevice.""" + self.pri_ad.activate_ble_pairing_mode() + + def deactivate_ble_pairing_mode(self): + """Deactivates BLE pairing mode on an AndroidBtTargetDevice.""" + self.pri_ad.deactivate_ble_pairing_mode() + + def add_pri_ad_device(self, pri_ad): + """Adds primary android device as bt target device. + + The primary android device should have been initialized with + android_bluetooth_decorator. + + Args: + pri_ad: the primary android device as bt target device. + """ + self._ad = pri_ad + self.pri_ad = pri_ad + self.sl4a = self._ad.services.sl4a + self.mac_address = self.sl4a.bluetoothGetLocalAddress() + self.log = self.pri_ad.log + self.serial = self.pri_ad.serial + self.log.info( + "Adds primary android device with id %s for the bluetooth" + "connection", pri_ad.serial) + if self.audio_params: + self._initialize_audio_params() + + def add_sec_ad_device(self, sec_ad): + """Adds second android device for bluetooth connection. + + The second android device should have sl4a service acitvated. + + Args: + sec_ad: the second android device for bluetooth connection. + """ + self.log.info( + "Adds second android device with id %s for the bluetooth" + "connection", sec_ad.serial) + self.sec_ad = sec_ad + self.sec_ad_mac_address = self.sec_ad.sl4a.bluetoothGetLocalAddress() + + def answer_phone_call(self): + """Answers an incoming phone call.""" + if not self.is_hfp_connected(): + self.hfp_connect() + # Make sure the device is in ringing state. + if not self.wait_for_call_state( + bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC): + raise ControllerError( + "Timed out after %ds waiting for the device %s to be ringing state " + "before anwsering the incoming phone call." % + (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial)) + self.log.info("Answers the incoming phone call from hf phone %s for %s", + self.mac_address, self.sec_ad_mac_address) + return self.sl4a.bluetoothHfpClientAcceptCall(self.sec_ad_mac_address) + + def call_volume_down(self): + """Lowers the volume.""" + current_volume = self.mbs.getVoiceCallVolume() + if current_volume > 0: + change_volume = current_volume - 1 + self.log.debug("Set voice call volume from %d to %d." % + (current_volume, change_volume)) + self.mbs.setVoiceCallVolume(change_volume) + + def call_volume_up(self): + """Raises the volume.""" + current_volume = self.mbs.getVoiceCallVolume() + if current_volume < self.mbs.getVoiceCallMaxVolume(): + change_volume = current_volume + 1 + self.log.debug("Set voice call volume from %d to %d." % + (current_volume, change_volume)) + self.mbs.setVoiceCallVolume(change_volume) + + def disconnect_all(self): + self._disable_profiles() + + def factory_reset_bluetooth(self): + """Factory resets Bluetooth on the android hfp device.""" + self.log.info("Factory resets Bluetooth on the android target device") + self.pri_ad.factory_reset_bluetooth() + + def get_bluetooth_mac_address(self): + """Gets Bluetooth mac address of this android_bt_device.""" + self.log.info("Getting Bluetooth mac address for AndroidBtTargetDevice.") + mac_address = self.sl4a.bluetoothGetLocalAddress() + self.log.info("Bluetooth mac address of AndroidBtTargetDevice: %s", + mac_address) + return mac_address + + def get_audio_params(self): + """Gets audio params from the android_bt_target_device.""" + return self.audio_params + + def get_new_wave_file_path(self): + """Gets a new wave file path for the audio capture.""" + wave_file_path = self.wave_file_template % self.wave_file_number + while os.path.exists(wave_file_path): + self.wave_file_number += 1 + wave_file_path = self.wave_file_template % self.wave_file_number + return wave_file_path + + def get_unread_messages(self) -> None: + """Gets unread messages from the connected device (MSE).""" + self.sl4a.mapGetUnreadMessages(self.sec_ad_mac_address) + + def hangup_phone_call(self): + """Hangs up an ongoing phone call.""" + if not self.is_hfp_connected(): + self.hfp_connect() + self.log.info("Hangs up the phone call from hf phone %s for %s", + self.mac_address, self.sec_ad_mac_address) + return self.sl4a.bluetoothHfpClientTerminateAllCalls( + self.sec_ad_mac_address) + + def hfp_connect(self): + """Establishes the hft connection between self.pri_ad and self.sec_ad.""" + self._verify_pri_ad() + connected = self.pri_ad.hfp_connect(self.sec_ad) + asserts.assert_true( + connected, "The hfp connection between {} and {} failed".format( + self.serial, self.sec_ad.serial)) + self.log.info("The hfp connection between %s and %s succeed", self.serial, + self.sec_ad.serial) + return connected + + def init_ambs_for_avrcp(self): + """Initializes media browser service for avrcp. + + This is required to be done before running any of the passthrough + commands. + + Steps: + 1. Starts up the AvrcpMediaBrowserService on the A2dp source phone. This + MediaBrowserService is part of the SL4A app. + 2. Switch the playback state to be paused. + 3. Connects a MediaBrowser to the A2dp sink's A2dpMediaBrowserService. + + Returns: + True: if it is avrcp ready after the initialization. + False: if it is still not avrcp ready after the initialization. + + Raises: + Signals.ControllerError: raise if AvrcpMediaBrowserService on the A2dp + source phone fails to be started. + """ + if self.is_avrcp_ready(): + return True + if not self.is_a2dp_sink_connected(): + self.a2dp_sink_connect() + + self.sec_ad.log.info("Starting AvrcpMediaBrowserService") + self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStart() + + time.sleep(DEFAULT_WAIT_TIME) + + # Check if the media session "BluetoothSL4AAudioSrcMBS" is active on sec_ad. + active_sessions = self.sec_ad.sl4a.bluetoothMediaGetActiveMediaSessions() + if BLUETOOTH_SL4A_AUDIO_SRC_MBS not in active_sessions: + raise ControllerError("Failed to start AvrcpMediaBrowserService.") + + self.log.info("Connecting to A2dp media browser service") + self.sl4a.bluetoothMediaConnectToCarMBS() + + # TODO(user) Wait for an event back instead of sleep + time.sleep(DEFAULT_WAIT_TIME) + self.avrcp_ready = True + return self.avrcp_ready + + def is_avrcp_ready(self): + """Checks if the pri_ad and sec_ad are ready for avrcp.""" + self._verify_pri_ad() + if self.avrcp_ready: + return True + active_sessions = self.sl4a.bluetoothMediaGetActiveMediaSessions() + if not active_sessions: + self.log.info("The device is not avrcp ready") + self.avrcp_ready = False + else: + self.log.info("The device is avrcp ready") + self.avrcp_ready = True + return self.avrcp_ready + + def is_hfp_connected(self): + """Checks if the pri_ad and sec_ad are hfp connected.""" + self._verify_pri_ad() + if self.sec_ad is None: + raise MissingBtClientDeviceError("The sec_ad was not added") + return self.sl4a.bluetoothHfpClientGetConnectionStatus( + self.sec_ad_mac_address) + + def is_a2dp_sink_connected(self): + """Checks if the pri_ad and sec_ad are hfp connected.""" + self._verify_pri_ad() + if self.sec_ad is None: + raise MissingBtClientDeviceError("The sec_ad was not added") + return self.sl4a.bluetoothA2dpSinkGetConnectionStatus( + self.sec_ad_mac_address) + + def last_number_dial(self): + """Redials last outgoing phone number.""" + if not self.is_hfp_connected(): + self.hfp_connect() + self.log.info("Redials last number from hf phone %s for %s", + self.mac_address, self.sec_ad_mac_address) + self.sl4a.bluetoothHfpClientDial(self.sec_ad_mac_address, None) + + def map_connect(self): + """Establishes the map connection between self.pri_ad and self.sec_ad.""" + self._verify_pri_ad() + connected = self.pri_ad.map_connect(self.sec_ad) + asserts.assert_true( + connected, "The map connection between {} and {} failed".format( + self.serial, self.sec_ad.serial)) + self.log.info("The map connection between %s and %s succeed", self.serial, + self.sec_ad.serial) + + def map_disconnect(self) -> None: + """Initiates a map disconnection to the connected device. + + Raises: + BluetoothProfileConnectionError: raised if failed to disconnect. + """ + self._verify_pri_ad() + if not self.pri_ad.map_disconnect(self.sec_ad_mac_address): + raise BluetoothProfileConnectionError( + 'Failed to terminate the MAP connection with the device "%s".' % + self.sec_ad_mac_address) + + def pbap_connect(self): + """Establishes the pbap connection between self.pri_ad and self.sec_ad.""" + connected = self.pri_ad.pbap_connect(self.sec_ad) + asserts.assert_true( + connected, "The pbap connection between {} and {} failed".format( + self.serial, self.sec_ad.serial)) + self.log.info("The pbap connection between %s and %s succeed", self.serial, + self.sec_ad.serial) + + def pause(self): + """Sends Avrcp pause command.""" + self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PAUSE, self.sec_ad) + + def play(self): + """Sends Avrcp play command.""" + self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PLAY, self.sec_ad) + + def power_on(self): + """Turns the Bluetooth on the android bt garget device.""" + self.log.info("Turns on the bluetooth") + return self.sl4a.bluetoothToggleState(True) + + def power_off(self): + """Turns the Bluetooth off the android bt garget device.""" + self.log.info("Turns off the bluetooth") + return self.sl4a.bluetoothToggleState(False) + + def route_call_audio(self, connect=False): + """Routes call audio during a call.""" + if not self.is_hfp_connected(): + self.hfp_connect() + self.log.info( + "Routes call audio during a call from hf phone %s for %s " + "audio connection %s after routing", self.mac_address, + self.sec_ad_mac_address, connect) + if connect: + self.sl4a.bluetoothHfpClientConnectAudio(self.sec_ad_mac_address) + else: + self.sl4a.bluetoothHfpClientDisconnectAudio(self.sec_ad_mac_address) + + def reject_phone_call(self): + """Rejects an incoming phone call.""" + if not self.is_hfp_connected(): + self.hfp_connect() + # Make sure the device is in ringing state. + if not self.wait_for_call_state( + bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC): + raise ControllerError( + "Timed out after %ds waiting for the device %s to be ringing state " + "before rejecting the incoming phone call." % + (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial)) + self.log.info("Rejects the incoming phone call from hf phone %s for %s", + self.mac_address, self.sec_ad_mac_address) + return self.sl4a.bluetoothHfpClientRejectCall(self.sec_ad_mac_address) + + def set_audio_params(self, audio_params): + """Sets audio params to the android_bt_target_device.""" + self.audio_params = audio_params + + def track_previous(self): + """Sends Avrcp skip prev command.""" + self.send_media_passthrough_cmd( + bt_constants.CMD_MEDIA_SKIP_PREV, self.sec_ad) + + def track_next(self): + """Sends Avrcp skip next command.""" + self.send_media_passthrough_cmd( + bt_constants.CMD_MEDIA_SKIP_NEXT, self.sec_ad) + + def start_audio_capture(self): + """Starts the audio capture over adb.""" + if self.audio_params is None: + raise MissingAudioParamsError("Missing audio params for captureing audio") + if not self.is_a2dp_sink_connected(): + self.a2dp_sink_connect() + cmd = "ap2f --usage 1 --start --duration {} --target {}".format( + self.audio_params["duration"], self.adb_path) + self.log.info("Starts capturing audio with adb shell command %s", cmd) + self.adb.shell(cmd) + + def stop_audio_capture(self): + """Stops the audio capture and stores it in wave file. + + Returns: + File name of the recorded file. + + Raises: + MissingAudioParamsError: when self.audio_params is None + """ + if self.audio_params is None: + raise MissingAudioParamsError("Missing audio params for captureing audio") + if not self.is_a2dp_sink_connected(): + self.a2dp_sink_connect() + adb_pull_args = [self.adb_path, self.audio_capture_path] + self.log.info("start adb -s %s pull %s", self.serial, adb_pull_args) + self._ad.adb.pull(adb_pull_args) + pcm_file_path = os.path.join(self.audio_capture_path, ADB_FILE) + self.log.info("delete the recored file %s", self.adb_path) + self._ad.adb.shell("rm {}".format(self.adb_path)) + wave_file_path = self.get_new_wave_file_path() + self.log.info("convert pcm file %s to wav file %s", pcm_file_path, + wave_file_path) + btutils.convert_pcm_to_wav(pcm_file_path, wave_file_path, self.audio_params) + return wave_file_path + + def stop_all_services(self): + """Stops all services for the pri_ad device.""" + self.log.info("Stops all services on the android bt target device") + self._ad.services.stop_all() + + def stop_ambs_for_avrcp(self): + """Stops media browser service for avrcp.""" + if self.is_avrcp_ready(): + self.log.info("Stops avrcp connection") + self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStop() + self.avrcp_ready = False + + def stop_voice_dial(self): + """Stops voice dial.""" + if not self.is_hfp_connected(): + self.hfp_connect() + self.log.info("Stops voice dial from hf phone %s for %s", self.mac_address, + self.sec_ad_mac_address) + if self.is_hfp_connected(): + self.sl4a.bluetoothHfpClientStopVoiceRecognition( + self.sec_ad_mac_address) + + def take_bug_report(self, + test_name=None, + begin_time=None, + timeout=300, + destination=None): + """Wrapper method to capture bugreport on the android bt target device.""" + self._ad.take_bug_report(test_name, begin_time, timeout, destination) + + def voice_dial(self): + """Triggers voice dial.""" + if not self.is_hfp_connected(): + self.hfp_connect() + self.log.info("Triggers voice dial from hf phone %s for %s", + self.mac_address, self.sec_ad_mac_address) + if self.is_hfp_connected(): + self.sl4a.bluetoothHfpClientStartVoiceRecognition( + self.sec_ad_mac_address) + + def log_type(self): + """Gets the log type of Android bt target device. + + Returns: + A string, the log type of Android bt target device. + """ + return bt_constants.LogType.BLUETOOTH_DEVICE_SIMULATOR + + +class BluetoothProfileConnectionError(Exception): + """Error for Bluetooth Profile connection problems.""" + + +class MissingBtClientDeviceError(Exception): + """Error for missing required bluetooth client device.""" + + +class MissingAudioParamsError(Exception): + """Error for missing the audio params.""" diff --git a/system/blueberry/controllers/bt_stub.py b/system/blueberry/controllers/bt_stub.py new file mode 100644 index 0000000000..0cd024e23b --- /dev/null +++ b/system/blueberry/controllers/bt_stub.py @@ -0,0 +1,294 @@ +"""Bluetooth stub class. + +This controller offers no direct control to any device. It simply prompts the +user to perform a certain action on the device it is standing in for. For use +in test scripts where no controller for the DUT exists. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import six + + +class BtStub(object): + """Stub for when controller class does not exist for a Bluetooth device. + + This class will simulate semi-automation by prompting user to manually + perform actions on the Bluetooth device. + """ + + # Connection Commands + def power_off(self): + """Prompt the user to power off the Bluetooth device.""" + six.moves.input("Power Off Bluetooth device, then press enter.") + + def power_on(self): + """Prompt the user to power on the Bluetooth device.""" + six.moves.input("Power ON Bluetooth device, then press enter.") + + def activate_pairing_mode(self): + """Prompt the user to put the Bluetooth device into pairing mode.""" + six.moves.input("Put Bluetooth device into pairing mode," + "then press enter.") + + def get_bluetooth_mac_address(self): + """Prompt the user to input the Bluetooth MAC address for this device. + + Returns: + mac_address (str): the string received from user input. + """ + mac_address = six.moves.input("Enter BT MAC address, then press enter.") + return mac_address + + def set_device_name(self, device_name): + """Prompt the user to set the device name (Carkit Only). + + Args: + device_name: String of device name to be set. + + Returns: None + """ + six.moves.input("Device name is: %s", device_name) + + def factory_reset_bluetooth(self): + """Prompt the user to factory reset Bluetooth on the device.""" + six.moves.input("Factory reset Bluetooth on the Bluetooth device, " + "then press enter.") + + # A2DP: Bluetooth stereo streaming protocol methods. + def is_audio_playing(self): + """Prompt the user to indicate if the audio is playing. + + Returns: + A Bool, true is audio is playing, false if not. + """ + audio_playing = six.moves.input("Indicate if audio is playing: " + "true/false.") + return bool(audio_playing) + + # AVRCP Commands + def volume_up(self): + """Prompt the user to raise the volume on the Bluetooth device.""" + six.moves.input("Press the Volume Up Button on the Bluetooth device, " + "then press enter.") + + def volume_down(self): + """Prompt the user to lower the volume on the Bluetooth device.""" + six.moves.input("Press the Volume Down Button on the Bluetooth device, " + "then press enter.") + + def track_next(self): + """Prompt the user to skip the track on the Bluetooth device.""" + six.moves.input("Press the Skip Track Button on the Bluetooth device, " + "then press enter.") + + def track_previous(self): + """Prompt the user to rewind the track on the Bluetooth device.""" + six.moves.input("Press the Rewind Track Button on the Bluetooth device, " + "then press enter.") + + def play(self): + """Prompt the user to press play on the Bluetooth device.""" + six.moves.input("Press the Play Button on the Bluetooth device, " + "then press enter.") + + def pause(self): + """Prompt the user to press pause on the Bluetooth device.""" + six.moves.input("Press the Pause Button on the Bluetooth device, " + "then press enter.") + + def repeat(self): + """Prompt the user to set the repeat option on the device.""" + six.moves.input("Press the Repeat Button on the Bluetooth device, " + "then press enter.") + + def fast_forward(self): + """Prompt the user to press the fast forward option/button on the device. + + Returns: None + """ + six.moves.input("Press the Fast Forward Button on the Bluetooth device, " + "then press enter.") + + def rewind(self): + """Prompt the user to press Rewind option on the device. + + Returns: None + """ + six.moves.input("Press the Rewind option on the Bluetooth device, " + "then press enter.") + + # TODO(user): browse_media_content may need more work in terms of input + # params and value(s) returned + def browse_media_content(self, directory=""): + """Prompt the user to enter to the paired device media folders. + + Args: + directory: A path to the directory to browse to. + + Returns: + List - empty + """ + six.moves.input("Navigate to directory: %s", directory) + return [] + + def delete_song(self, file_path=""): + """Prompt the user to delete a song. + + Args: + file_path (optional): A file path to the song to be deleted. + + Returns: None + """ + six.moves.input("Delete a song %s", file_path) + + def shuffle_song(self): + """Prompt the user to shuffle a playlist. + + Returns: None + """ + six.moves.input("Shuffle a playlist") + + # HFP (Hands Free Phone protocol) Commands + def call_volume_up(self): + """Prompt the user to press the volume up button on an active call. + + Returns: None + """ + six.moves.input("Press the volume up button for an active call.") + + def call_volume_down(self): + """Prompt the user to press the volume down button on an active call. + + Returns: None + """ + six.moves.input("Press the volume down button for an active call.") + + def answer_phone_call(self): + """Prompt the user to press the button to answer a phone call.. + + Returns: None + """ + six.moves.input("Press the button to answer the phone call.") + + def hangup_phone_call(self): + """Prompt the user to press the button to hang up on an active phone call. + + Returns: None + """ + six.moves.input("Press the button to hang up on the phone call.") + + def call_contact(self, name): + """Prompt the user to select a contact from the phonebook and call. + + Args: + name: string name of contact to call + + Returns: None + """ + six.moves.input("Select contact, %s, to call.", name) + + def call_number(self, phone_number): + """Prompt the user to dial a phone number and call. + + Args: + phone_number: string of phone number to dial and call + + Returns: None + """ + six.moves.input("Dial phone number and initiate a call. %s", phone_number) + + def swap_call(self): + """Prompt the user to push the button to swap. + + Function swaps between the primary and secondary calls. One call will + be active and the other will be on hold. + + Returns: None + """ + six.moves.input("Press the button to swap calls.") + + def merge_call(self): + """Prompt the user to push the button to merge calls. + + Merges calls between the primary and secondary calls into a conference + call. + + Returns: None + """ + six.moves.input("Press the button to merge calls into a conference call.") + + def hold_call(self): + """Prompt the user to put the primary call on hold. + + Primary call will be on hold, while the secondary call becomes active. + + Returns: None + """ + six.moves.input("Press the hold button to put primary call on hold.") + + def mute_call(self): + """Prompt the user to mute the ongoing active call. + + Returns: None + """ + six.moves.input("Press Mute button on active call.") + + def unmute_call(self): + """Prompt the user to unmute the ongoing active call. + + Returns: None + """ + six.moves.input("Press the Unmute button on an active call.") + + def reject_phone_call(self): + """Prompt the user to reject an incoming call. + + Returns: None + """ + six.moves.input("Press the Reject button to reject an incoming call.") + + def answer_voip_call(self): + """Prompt the user to press the button to answer a VOIP call. + + Returns: None + """ + six.moves.input("Press the Answer button on an incoming VOIP phone call.") + + def hangup_voip_call(self): + """Prompt the user to press the button to hangup on the active VOIP call. + + Returns: None + """ + six.moves.input("Press the hangup button on the active VOIP call.") + + def reject_voip_call(self): + """Prompt the user to press the Reject button on the incoming VOIP call. + + Returns: None + """ + six.moves.input("Press the Reject button on the incoming VOIP call.") + + def voice_dial(self): + """Prompt user to initiate a voice dial from the phone. + + Returns: None + """ + six.moves.input("Initiate a voice dial.") + + def last_number_dial(self): + """Prompt user to iniate a call to the last number dialed. + + Returns: None + """ + six.moves.input("Initiate a call to the last number dialed.") + + # TODO(user): does this method need a input parameter? + def route_call_audio(self): + """Prompt user to route a call from AG to HF, and vice versa. + + Returns: None + """ + six.moves.input("Reroute call audio.") diff --git a/system/blueberry/controllers/derived_bt_device.py b/system/blueberry/controllers/derived_bt_device.py new file mode 100644 index 0000000000..37fafc1c64 --- /dev/null +++ b/system/blueberry/controllers/derived_bt_device.py @@ -0,0 +1,116 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Controller class for a Bluetooth Device. + +This controller will instantiate derived classes from BtDevice and the +module/class specified via strings in configs dictionary. + +The idea is to allow vendors to run blueberry tests with their controller class +through this controller module, eliminating the need to edit the test classes +themselves. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import importlib +import logging +import yaml + +MOBLY_CONTROLLER_CONFIG_NAME = "DerivedBtDevice" +MOBLY_CONTROLLER_CONFIG_MODULE_KEY = "ModuleName" +MOBLY_CONTROLLER_CONFIG_CLASS_KEY = "ClassName" +MOBLY_CONTROLLER_CONFIG_PARAMS_KEY = "Params" + + +def create(configs): + """Creates DerivedBtDevice controller objects. + + For each config dict in configs: + Import desired controller class from config, compose DerivedBtDevice class + from that class and BtDevice, instantiate with params from config. + + Args: + configs (list): A list of dicts, each representing a configuration for a + Bluetooth device. Each dict should be of the format: + {"ModuleName": <name of module in blueberry.controllers>, + "ClassName": <name of class to derive controller from>, + "Params": <kwargs in dict form to instantiate class with>} + + Returns: + A list with DerivedBtDevice objects. + """ + return [_create_bt_device_class(config) for config in configs] + + +def _create_bt_device_class(config): + """Created new device class from associated device controller from config.""" + module = importlib.import_module( + "blueberry.controllers.%s" % + config[MOBLY_CONTROLLER_CONFIG_MODULE_KEY]) + logging.info("Creating DerivedBtDevice from %r", config) + cls = getattr(module, config[MOBLY_CONTROLLER_CONFIG_CLASS_KEY]) + params = yaml.safe_load("%s" % + config.get(MOBLY_CONTROLLER_CONFIG_PARAMS_KEY, {})) + new_class = type(MOBLY_CONTROLLER_CONFIG_NAME, (cls, BtDevice), params) + new_class_inst = new_class(**params) + return new_class_inst + + +def destroy(derived_bt_devices): + """Cleans up DerivedBtDevice objects.""" + for device in derived_bt_devices: + # Execute cleanup if the controller class has the method "clean_up". + if hasattr(device, "clean_up"): + device.clean_up() + del derived_bt_devices + + +class BtDevice(object): + """Base class for all Bluetooth Devices. + + Provides additional necessary functionality for use within blueberry. + """ + + def __init__(self): + """Initializes a derived bt base class.""" + self._user_params = {} + + def setup(self): + """For devices that need extra setup.""" + + def set_user_params(self, params): + """Intended for passing mobly user_params into a derived device class. + + Args: + params: Mobly user params. + """ + self._user_params = params + + def get_user_params(self): + """Return saved user_params. + + Returns: + user_params. + """ + return self._user_params + + def factory_reset_bluetooth(self) -> None: + """Factory resets Bluetooth on an BT Device.""" + raise NotImplementedError + + def activate_pairing_mode(self) -> None: + """Activates pairing mode on an AndroidDevice.""" + raise NotImplementedError diff --git a/system/blueberry/controllers/grpc_bt_sync_mock.py b/system/blueberry/controllers/grpc_bt_sync_mock.py new file mode 100644 index 0000000000..d3e00e8e78 --- /dev/null +++ b/system/blueberry/controllers/grpc_bt_sync_mock.py @@ -0,0 +1,81 @@ +"""A generic gRPC mock device controller. + +Example MH testbed config for Hostside: +- name: GrpcBtSyncStub-1 + devices: + - type: MiscTestbedSubDevice + dimensions: + mobly_type: DerivedBtDevice + properties: + ModuleName: grpc_bt_sync_mock + ClassName: GrpcBtSyncMock + Params: + config: + mac_address: FE:ED:BE:EF:CA:FE + dimensions: + device: GrpcBtSyncStub +""" +import subprocess + +from absl import flags +from absl import logging +import grpc + +# Internal import +from blueberry.grpc.proto import blueberry_device_controller_pb2 +from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc + +FLAGS = flags.FLAGS +flags.DEFINE_string('server', 'dns:///[::1]:10000', 'server address') + + +class GrpcBtSyncMock(object): + """Generic GRPC device controller.""" + + def __init__(self, config): + """Initialize GRPC object.""" + super(GrpcBtSyncMock, self).__init__() + self.mac_address = config['mac_address'] + + def __del__(self): + self.server_proc.terminate() + del self.channel_creds + del self.channel + del self.stub + + def setup(self): + """Setup the gRPC server that the sync mock will respond to.""" + server_path = self.get_user_params()['mh_files']['grpc_server'][0] + logging.info('Start gRPC server: %s', server_path) + self.server_proc = subprocess.Popen([server_path], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + bufsize=0) + + self.channel_creds = loas2.loas2_channel_credentials() + self.channel = grpc.secure_channel(FLAGS.server, self.channel_creds) + grpc.channel_ready_future(self.channel).result() + self.stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub( + self.channel) + + def init_setup(self): + logging.info('init setup TO BE IMPLEMENTED') + + def set_target(self, bt_device): + self._target_device = bt_device + + def pair_and_connect_bluetooth(self, target_mac_address): + """Pair and connect to a peripheral Bluetooth device.""" + request = blueberry_device_controller_pb2.TargetMacAddress( + mac_address=target_mac_address) + try: + response = self.stub.PairAndConnectBluetooth(request) + logging.info('pair and connect bluetooth response: %s', response) + if response.error: + print('error handler TO BE IMPLEMENTED') + else: + return response.pairing_time_sec, response.connection_time_sec + except grpc.RpcError as rpc_error: + print(rpc_error) diff --git a/system/blueberry/controllers/grpc_bt_target_mock.py b/system/blueberry/controllers/grpc_bt_target_mock.py new file mode 100644 index 0000000000..85f6b516ec --- /dev/null +++ b/system/blueberry/controllers/grpc_bt_target_mock.py @@ -0,0 +1,78 @@ +"""gRPC mock target for testing purposes. + +Example MH testbed config for Hostside: +- name: GrpcBtTargetStub-1 + devices: + - type: MiscTestbedSubDevice + dimensions: + mobly_type: DerivedBtDevice + properties: + ModuleName: grpc_bt_target_mock + ClassName: GrpcBtTargetMock + Params: + config: + mac_address: FE:ED:BE:EF:CA:FE + dimensions: + device: GrpcBtTargetStub +""" +import subprocess + +from absl import flags +from absl import logging +import grpc + +# Internal import +from blueberry.grpc.proto import blueberry_device_controller_pb2 +from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc + +FLAGS = flags.FLAGS + + +class GrpcBtTargetMock(object): + """BT Mock Target for testing the GRPC interface.""" + + def __init__(self, config): + """Initialize GRPC object.""" + super(GrpcBtTargetMock, self).__init__() + self.mac_address = config['mac_address'] + + def __del__(self): + self.server_proc.terminate() + del self.channel_creds + del self.channel + del self.stub + + def setup(self): + """Setup the gRPC server that the target mock will respond to.""" + server_path = self.get_user_params()['mh_files']['grpc_server'][0] + logging.info('Start gRPC server: %s', server_path) + self.server_proc = subprocess.Popen([server_path], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + bufsize=0) + + self.channel_creds = loas2.loas2_channel_credentials() + self.channel = grpc.secure_channel(FLAGS.server, self.channel_creds) + grpc.channel_ready_future(self.channel).result() + self.stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub( + self.channel) + + def activate_pairing_mode(self): + logging.info('activate pairing mode TO BE IMPLEMENTED') + request = blueberry_device_controller_pb2.DiscoverableMode(mode=True) + try: + response = self.stub.SetDiscoverableMode(request) + logging.info('set discoverageble response: %s', response) + return 0 + except grpc.RpcError as rpc_error: + print(rpc_error) + return -1 + + def factory_reset_bluetooth(self): + logging.info('factory reset TO BE IMPLEMENTED') + + def get_bluetooth_mac_address(self): + logging.info('mac_address: %s', self.mac_address) + return self.mac_address diff --git a/system/blueberry/decorators/android_bluetooth_client_decorator.py b/system/blueberry/decorators/android_bluetooth_client_decorator.py new file mode 100644 index 0000000000..f9834ea00f --- /dev/null +++ b/system/blueberry/decorators/android_bluetooth_client_decorator.py @@ -0,0 +1,48 @@ +"""A Bluetooth Client Decorator util for an Android Device. + +This utility allows the user to decorate an device with a custom decorator from +the blueberry/decorators directory. +""" + +from __future__ import absolute_import +from __future__ import division + +from __future__ import print_function + +import importlib +import re +from mobly.controllers.android_device import AndroidDevice + + +def decorate(ad, decorator): + """Utility to decorate an AndroidDevice. + + Args: + ad: Device, must be of type AndroidDevice. + decorator: String, class name of the decorator to use. + Returns: + AndroidDevice object. + """ + + if not isinstance(ad, AndroidDevice): + raise TypeError('Must apply AndroidBluetoothClientDecorator to an ' + 'AndroidDevice') + decorator_module = camel_to_snake(decorator) + module = importlib.import_module( + 'blueberry.decorators.%s' % decorator_module) + cls = getattr(module, decorator) + ad = cls(ad) + + return ad + + +def camel_to_snake(cls_name): + """Utility to convert a class name from camel case to snake case. + + Args: + cls_name: string + Returns: + string + """ + s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', cls_name) + return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower() diff --git a/system/blueberry/decorators/android_bluetooth_client_test_decorator.py b/system/blueberry/decorators/android_bluetooth_client_test_decorator.py new file mode 100644 index 0000000000..769579e6fd --- /dev/null +++ b/system/blueberry/decorators/android_bluetooth_client_test_decorator.py @@ -0,0 +1,25 @@ +"""An example Bluetooth Client Decorator. +""" + +from __future__ import absolute_import +from __future__ import division + +from __future__ import print_function + +from mobly.controllers.android_device import AndroidDevice + + +class AndroidBluetoothClientTestDecorator(AndroidDevice): + """A class used to test Blueberry's BT Client Profile decoration.""" + + def __init__(self, ad): + self._ad = ad + if not isinstance(self._ad, AndroidDevice): + raise TypeError('Must apply AndroidBluetoothClientTestDecorator to an ' + 'AndroidDevice') + + def __getattr__(self, name): + return getattr(self._ad, name) + + def test_decoration(self): + return 'I make this device fancy!' diff --git a/system/blueberry/grpc/blueberry_device_controller.py b/system/blueberry/grpc/blueberry_device_controller.py new file mode 100644 index 0000000000..c99a204f0d --- /dev/null +++ b/system/blueberry/grpc/blueberry_device_controller.py @@ -0,0 +1,40 @@ +"""Blueberry gRPC device controller. + +This is a server to act as a mock device for testing the Blueberry gRPC +interface. +""" + +from concurrent import futures +from absl import app +from absl import flags + +import grpc + +# Internal import +from blueberry.grpc import blueberry_device_controller_service +from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc + + +_HOST = '[::]' + +FLAGS = flags.FLAGS +flags.DEFINE_integer('port', 10000, 'port to listen on') +flags.DEFINE_integer('threads', 10, 'number of worker threads in thread pool') + + +def main(unused_argv): + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=FLAGS.threads), + ports=(FLAGS.port,)) # pytype: disable=wrong-keyword-args + servicer = ( + blueberry_device_controller_service.BlueberryDeviceControllerServicer()) + blueberry_device_controller_pb2_grpc.add_BlueberryDeviceControllerServicer_to_server( + servicer, server) + server_creds = loas2.loas2_server_credentials() + server.add_secure_port(f'{_HOST}:{FLAGS.port}', server_creds) + server.start() + server.wait_for_termination() + + +if __name__ == '__main__': + app.run(main) diff --git a/system/blueberry/grpc/blueberry_device_controller_service.py b/system/blueberry/grpc/blueberry_device_controller_service.py new file mode 100644 index 0000000000..a17d7db25b --- /dev/null +++ b/system/blueberry/grpc/blueberry_device_controller_service.py @@ -0,0 +1,37 @@ +"""Blueberry gRPC Mock Service. + +This is simple mock service that is used to verify the implementation of the +Blueberry gRPC device controller interface. +""" + +from blueberry.grpc.proto import blueberry_device_controller_pb2 +from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc + + +class BlueberryDeviceControllerServicer( + blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerServicer): + """A BlueberryTest gRPC server.""" + + def __init__(self, *args, **kwargs): + super(BlueberryDeviceControllerServicer, self).__init__(*args, **kwargs) + self._error = "testing 123" + + def SetDiscoverableMode(self, request, servicer_context): + """Sets the device's discoverable mode. + + Args: + request: a blueberry_test_server_pb2.DiscoverableMode object containing + the "mode" to set the device to. + servicer_context: A grpc.ServicerContext for use during service of the + RPC. + + Returns: + A blueberry_test_server_pb2.DiscoverableResult + """ + return blueberry_device_controller_pb2.DiscoverableResult( + result=True, + error=self._error) + + def PairAndConnectBluetooth(self, request, servicer_context): + return blueberry_device_controller_pb2.PairAndConnectBluetoothResult( + pairing_time_sec=0.1, connection_time_sec=0.2, error=None) diff --git a/system/blueberry/grpc/blueberry_test_client.py b/system/blueberry/grpc/blueberry_test_client.py new file mode 100644 index 0000000000..1fe8eb1b1c --- /dev/null +++ b/system/blueberry/grpc/blueberry_test_client.py @@ -0,0 +1,46 @@ +"""Blueberry Test Client. + +Simple gRPC client to test the Blueberry Mock server. +""" + +from absl import app +from absl import flags + +import grpc + +# Internal import +from blueberry.grpc.proto import blueberry_device_controller_pb2 +from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc + +FLAGS = flags.FLAGS +flags.DEFINE_string('server', 'dns:///[::1]:10000', 'server address') + + +def _UpdateDiscoveryMode(stub, request): + try: + print('try SetDiscoverableMode') + response = stub.SetDiscoverableMode(request) + print('complete response') + print(response) + return 0 + except grpc.RpcError as rpc_error: + print(rpc_error) + return -1 + + +def main(unused_argv): + channel_creds = loas2.loas2_channel_credentials() + with grpc.secure_channel(FLAGS.server, channel_creds) as channel: + grpc.channel_ready_future(channel).result() + stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub( + channel) + + print('request grpc') + request = blueberry_device_controller_pb2.DiscoverableMode( + mode=True) + print('Call _UpdateDiscoveryMode') + return _UpdateDiscoveryMode(stub, request) + + +if __name__ == '__main__': + app.run(main) diff --git a/system/blueberry/grpc/proto/blueberry_device_controller.proto b/system/blueberry/grpc/proto/blueberry_device_controller.proto new file mode 100644 index 0000000000..d3f39063aa --- /dev/null +++ b/system/blueberry/grpc/proto/blueberry_device_controller.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package wearables.qa.blueberry.grpc; + +option java_multiple_files = true; +option jspb_use_correct_proto2_semantics = true; + +message DiscoverableMode { + bool mode = 1; // True to set discoverable on, False to set discoverable off. +} + +message DiscoverableResult { + bool result = 1; // True if successful, False if unsuccessful. + string error = 2; // Error message if unsuccessful. +} + +message TargetMacAddress { + string mac_address = 1; // Mac Address of target device. +} + +message PairAndConnectBluetoothResult { + double pairing_time_sec = + 1; // The time it takes in seconds to pair the devices. + double connection_time_sec = + 2; // The time it takes in seconds to connect the devices. + string error = 3; // Error message if unsuccessful. +} + +service BlueberryDeviceController { + // Returns the result from a request to set device to discoverable. + rpc SetDiscoverableMode(DiscoverableMode) returns (DiscoverableResult) {} + + // Returns the result from a request to connect to a target device. + rpc PairAndConnectBluetooth(TargetMacAddress) + returns (PairAndConnectBluetoothResult) {} +} diff --git a/system/blueberry/sample_testbed.yaml b/system/blueberry/sample_testbed.yaml new file mode 100644 index 0000000000..99da384bde --- /dev/null +++ b/system/blueberry/sample_testbed.yaml @@ -0,0 +1,10 @@ +TestBeds: + - Name: SampleTestBed + Controllers: + AndroidDevice: + - serial: 94GAZ00A5C + phone_number: 12341234 + DerivedBtDevice: + - ModuleName: iclever_hb01 + ClassName: IcleverHb01 + Params: '{"config":{"mac_address":"C4:45:67:02:94:F0","fifo_id":"AH06IVJP","arduino_port":"1"}}' diff --git a/system/blueberry/tests/a2dp/bluetooth_a2dp_test.py b/system/blueberry/tests/a2dp/bluetooth_a2dp_test.py new file mode 100644 index 0000000000..9b578be27a --- /dev/null +++ b/system/blueberry/tests/a2dp/bluetooth_a2dp_test.py @@ -0,0 +1,221 @@ +# Lint as: python3 +"""Tests for Bluetooth A2DP audio streamming.""" + +import time + +from mobly import asserts +from mobly import test_runner +from mobly import signals + +from blueberry.controllers import android_bt_target_device +from blueberry.utils import blueberry_base_test +from blueberry.utils import bt_audio_utils +from blueberry.utils import bt_constants +from blueberry.utils import bt_test_utils + +# Number of seconds for A2DP audio check. +A2DP_AUDIO_CHECK_TIMEOUT_SEC = 3 + +# Number of seconds for duration of reference audio. +REFERENCE_AUDIO_DURATION_SEC = 1.0 + +# Default threshold of the mean opinion score (MOS). +MOS_THRESHOLD = 4.5 + +# The audio parameters for creating a sine wave. +AUDIO_PARAMS = { + 'file_path': '/sdcard/Music', + 'frequency': 480, + 'channel': 2, + 'sample_rate': 48000, + 'sample_format': 16, + 'duration_sec': 60 +} + + +class BluetoothA2dpTest(blueberry_base_test.BlueberryBaseTest): + """Test class for Bluetooth A2DP test. + + This test uses a fixed frequency sine wave to be the reference audio, makes a + DUT play this audio and then starts audio capture from a connected Bluetooth + sink device, measures mean opinion score (MOS) of the recorded audio and + compares the MOS with a threshold to detemine the test result. + """ + + def setup_class(self): + """Standard Mobly setup class.""" + super(BluetoothA2dpTest, self).setup_class() + + # Threshold of the MOS to determine a test result. Default value is 4.5. + self.threshold = float(self.user_params.get('mos_threshold', MOS_THRESHOLD)) + + self.phone = self.android_devices[0] + self.phone.init_setup() + self.phone.sl4a_setup() + + # Generates a sine wave to be reference audio in comparison, and push it to + # the phone storage. + self.audio_file_on_device, self.audio_file_on_host = ( + bt_audio_utils.generate_sine_wave_to_device( + self.phone, + AUDIO_PARAMS['file_path'], + AUDIO_PARAMS['frequency'], + AUDIO_PARAMS['channel'], + AUDIO_PARAMS['sample_rate'], + AUDIO_PARAMS['sample_format'], + AUDIO_PARAMS['duration_sec']) + ) + + # Trims the audio to 1 second duration for reference. + self.reference_audio_file = bt_audio_utils.trim_audio( + audio_file=self.audio_file_on_host, + duration_sec=REFERENCE_AUDIO_DURATION_SEC) + + self.derived_bt_device = self.derived_bt_devices[0] + self.derived_bt_device.factory_reset_bluetooth() + self.derived_bt_device.activate_pairing_mode() + self.mac_address = self.derived_bt_device.get_bluetooth_mac_address() + self.phone.pair_and_connect_bluetooth(self.mac_address) + + # Sleep until the connection stabilizes. + time.sleep(3) + + # Adds the phone to be the secondary device in the android-to-android case. + if isinstance(self.derived_bt_device, + android_bt_target_device.AndroidBtTargetDevice): + self.derived_bt_device.add_sec_ad_device(self.phone) + + def assert_a2dp_expected_status(self, is_playing, fail_msg): + """Asserts that A2DP audio is in the expected status. + + Args: + is_playing: bool, True if A2DP audio is playing as expected. + fail_msg: string, a message explaining the details of test failure. + """ + bt_test_utils.wait_until( + timeout_sec=A2DP_AUDIO_CHECK_TIMEOUT_SEC, + condition_func=self.phone.mbs.btIsA2dpPlaying, + func_args=[self.mac_address], + expected_value=is_playing, + exception=signals.TestFailure(fail_msg)) + + def test_play_a2dp_audio(self): + """Test for playing A2DP audio through Bluetooth.""" + + # Plays media audio from the phone. + audio_file_url = 'file://' + self.audio_file_on_device + if not self.phone.sl4a.mediaPlayOpen(audio_file_url): + raise signals.TestError( + 'Failed to open and play "%s" on the phone "%s".' % + (self.audio_file_on_device, self.phone.serial)) + self.phone.sl4a.mediaPlayStart() + + # Starts audio capture for Bluetooth audio stream. + self.derived_bt_device.start_audio_capture() + + # Stops audio capture and generates an recorded audio file. + recorded_audio_file = self.derived_bt_device.stop_audio_capture() + self.phone.sl4a.mediaPlayStop() + self.phone.sl4a.mediaPlayClose() + + # Measures MOS for the recorded audio. + mos = bt_audio_utils.measure_audio_mos(recorded_audio_file, + self.reference_audio_file) + + # Asserts that the measured MOS should be more than the threshold. + asserts.assert_true( + mos >= self.threshold, + 'MOS of the recorded audio "%.3f" is lower than the threshold "%.3f".' % + (mos, self.threshold)) + + def test_resume_a2dp_audio_after_phone_call_ended(self): + """Test for resuming A2DP audio after a phone call ended. + + Tests that A2DP audio can be paused when receiving a incoming phone call, + and resumed after this phone call ended. + """ + # Checks if two android device exist. + if len(self.android_devices) < 2: + raise signals.TestError('This test requires two android devices.') + pri_phone = self.phone + sec_phone = self.android_devices[1] + sec_phone.init_setup() + pri_number = pri_phone.dimensions.get('phone_number') + if not pri_number: + raise signals.TestError('Please set the dimension "phone_number" to the ' + 'primary phone.') + sec_number = sec_phone.dimensions.get('phone_number') + if not sec_number: + raise signals.TestError('Please set the dimension "phone_number" to the ' + 'secondary phone.') + + # Plays media audio from the phone. + audio_file_url = 'file://' + self.audio_file_on_device + if not self.phone.sl4a.mediaPlayOpen(audio_file_url): + raise signals.TestError( + 'Failed to open and play "%s" on the phone "%s".' % + (self.audio_file_on_device, self.phone.serial)) + self.phone.sl4a.mediaPlayStart() + + # Checks if A2DP audio is playing. + self.assert_a2dp_expected_status( + is_playing=True, + fail_msg='A2DP audio is not playing.') + + try: + # Makes a incoming phone call. + sec_phone.sl4a.telecomCallNumber(pri_number) + sec_phone.log.info('Made a phone call to device "%s".' % pri_phone.serial) + pri_phone.log.info('Waiting for the incoming call from device "%s"...' + % sec_phone.serial) + + is_ringing = pri_phone.wait_for_call_state( + bt_constants.CALL_STATE_RINGING, + bt_constants.CALL_STATE_TIMEOUT_SEC) + if not is_ringing: + raise signals.TestError( + 'Timed out after %ds waiting for the incoming call from device ' + '"%s".' % (bt_constants.CALL_STATE_TIMEOUT_SEC, sec_phone.serial)) + + # Checks if A2DP audio is paused. + self.assert_a2dp_expected_status( + is_playing=False, + fail_msg='A2DP audio is not paused when receiving a phone call.') + finally: + # Ends the incoming phone call. + sec_phone.sl4a.telecomEndCall() + sec_phone.log.info('Ended the phone call.') + is_idle = pri_phone.wait_for_call_state( + bt_constants.CALL_STATE_IDLE, + bt_constants.CALL_STATE_TIMEOUT_SEC) + if not is_idle: + raise signals.TestError( + 'Timed out after %ds waiting for the phone call to be ended.' % + bt_constants.CALL_STATE_TIMEOUT_SEC) + + # Checks if A2DP audio is resumed. + self.assert_a2dp_expected_status( + is_playing=True, + fail_msg='A2DP audio is not resumed when the phone call is ended.') + + # Starts audio capture for Bluetooth audio stream. + self.derived_bt_device.start_audio_capture() + + # Stops audio capture and generates an recorded audio file. + recorded_audio_file = self.derived_bt_device.stop_audio_capture() + pri_phone.sl4a.mediaPlayStop() + pri_phone.sl4a.mediaPlayClose() + + # Measures MOS for the recorded audio. + mos = bt_audio_utils.measure_audio_mos(recorded_audio_file, + self.reference_audio_file) + + # Asserts that the measured MOS should be more than the threshold. + asserts.assert_true( + mos >= self.threshold, + 'MOS of the recorded audio "%.3f" is lower than the threshold "%.3f".' % + (mos, self.threshold)) + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/audio_capture/bluetooth_audio_capture_test.py b/system/blueberry/tests/audio_capture/bluetooth_audio_capture_test.py new file mode 100644 index 0000000000..74e8705b32 --- /dev/null +++ b/system/blueberry/tests/audio_capture/bluetooth_audio_capture_test.py @@ -0,0 +1,93 @@ +# Lint as: python3 +"""Tests for testing audio capture in android bt target controller. + + location of the controller: + blueberry.controllers.android_bt_target_device + Before the test, the music file should be copied to the location of the + pri_phone. The a2dp sink phone should be with the android build that can + support a2dp sink profile (for example, the git_master-bds-dev). +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging +import time + +from mobly import asserts +from mobly import test_runner +from blueberry.utils import blueberry_base_test +from blueberry.utils import bt_test_utils + +MUSIC_FILE = '1khz.wav' + + +class BluetoothAudioCaptureTest(blueberry_base_test.BlueberryBaseTest): + + def setup_class(self): + """Standard Mobly setup class.""" + super(BluetoothAudioCaptureTest, self).setup_class() + self.derived_bt_device = self.derived_bt_devices[0] + for device in self.android_devices: + device.init_setup() + device.sl4a_setup() + self.pri_phone = self.android_devices[0] + self.mac_address = self.derived_bt_device.get_bluetooth_mac_address() + self.derived_bt_device.activate_pairing_mode() + self.pri_phone.sl4a.bluetoothDiscoverAndBond(self.mac_address) + self.pri_phone.wait_for_connection_success(self.mac_address) + # Gives more time for the pairing between the pri_phone and the + # derived_bt_device (the android bt target device) + time.sleep(3) + self.derived_bt_device.add_sec_ad_device(self.pri_phone) + self.derived_bt_device.disconnect_all() + self.duration = self.derived_bt_device.audio_params['duration'] + self.recorded_duration = 0 + + def setup_test(self): + """Setup for bluetooth latency test.""" + logging.info('Setup Test for audio capture test') + super(BluetoothAudioCaptureTest, self).setup_test() + asserts.assert_true(self.derived_bt_device.a2dp_sink_connect(), + 'Failed to establish A2dp Sink connection') + + def test_audio_capture(self): + """Tests the audio capture for the android bt target device.""" + + music_file = self.derived_bt_device.audio_params.get( + 'music_file', MUSIC_FILE) + music_file = 'file:///sdcard/Music/{}'.format(music_file) + self.pri_phone.sl4a.mediaPlayOpen(music_file) + self.pri_phone.sl4a.mediaPlaySetLooping() + self.pri_phone.sl4a.mediaPlayStart() + time.sleep(3) + self.pri_phone.log.info(self.pri_phone.sl4a.mediaPlayGetInfo()) + self.derived_bt_device.start_audio_capture() + time.sleep(self.duration) + audio_captured = self.derived_bt_device.stop_audio_capture() + self.pri_phone.sl4a.mediaPlayStop() + self.pri_phone.sl4a.mediaPlayClose() + self.derived_bt_device.log.info('Audio play and record stopped') + self.recorded_duration = bt_test_utils.get_duration_seconds(audio_captured) + self.derived_bt_device.log.info( + 'The capture duration is %s s and the recorded duration is %s s', + self.duration, self.recorded_duration) + + def teardown_class(self): + logging.info('Factory resetting Bluetooth on devices.') + self.pri_phone.factory_reset_bluetooth() + self.derived_bt_device.factory_reset_bluetooth() + super(BluetoothAudioCaptureTest, self).teardown_class() + self.derived_bt_device.stop_all_services() + self.record_data({ + 'Test Name': 'test_audio_capture', + 'sponge_properties': { + 'duration': self.duration, + 'recorded duration': self.recorded_duration + } + }) + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/avrcp/bluetooth_avrcp_test.py b/system/blueberry/tests/avrcp/bluetooth_avrcp_test.py new file mode 100644 index 0000000000..484fcd9587 --- /dev/null +++ b/system/blueberry/tests/avrcp/bluetooth_avrcp_test.py @@ -0,0 +1,349 @@ +# Lint as: python3 +"""Tests for AVRCP basic functionality.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +from mobly import test_runner +from mobly import signals +from mobly.controllers.android_device_lib import adb +from blueberry.controllers import android_bt_target_device +from blueberry.utils import blueberry_base_test +from blueberry.utils import bt_constants + +# The audio source path of BluetoothMediaPlayback in the SL4A app. +ANDROID_MEDIA_PATH = '/sdcard/Music/test' + +# Timeout for track change and playback state update in second. +MEDIA_UPDATE_TIMEOUT_SEC = 3 + + +class BluetoothAvrcpTest(blueberry_base_test.BlueberryBaseTest): + """Test Class for Bluetooth AVRCP. + + This test requires two or more audio files exist in path "/sdcard/Music/test" + on the primary device, and device controllers need to have the following APIs: + 1. play() + 2. pause() + 3. track_previous() + 4. track_next() + """ + + def setup_class(self): + """Standard Mobly setup class.""" + super(BluetoothAvrcpTest, self).setup_class() + + for device in self.android_devices: + device.init_setup() + device.sl4a_setup() + + # The device which role is AVRCP Target (TG). + self.pri_device = self.android_devices[0] + + if len(self.android_devices) > 1 and not self.derived_bt_devices: + self.derived_bt_device = self.android_devices[1] + else: + self.derived_bt_device = self.derived_bt_devices[0] + + # Check if the derived bt device is android bt target device. + self.is_android_bt_target_device = isinstance( + self.derived_bt_device, android_bt_target_device.AndroidBtTargetDevice) + + # Check if the audio files exist on the primary device. + try: + self.audio_files = self.pri_device.adb.shell( + 'ls %s' % ANDROID_MEDIA_PATH).decode().split('\n')[:-1] + if len(self.audio_files) < 2: + raise signals.TestError( + 'Please push two or more audio files to %s on the primary device ' + '"%s".' % (ANDROID_MEDIA_PATH, self.pri_device.serial)) + except adb.AdbError as error: + if 'No such file or directory' in str(error): + raise signals.TestError( + 'No directory "%s" found on the primary device "%s".' % + (ANDROID_MEDIA_PATH, self.pri_device.serial)) + raise error + + self.mac_address = self.derived_bt_device.get_bluetooth_mac_address() + self.derived_bt_device.activate_pairing_mode() + self.pri_device.set_target(self.derived_bt_device) + self.pri_device.pair_and_connect_bluetooth(self.mac_address) + self.pri_device.allow_extra_permissions() + # Gives more time for the pairing between two devices. + time.sleep(3) + + if self.is_android_bt_target_device: + self.derived_bt_device.add_sec_ad_device(self.pri_device) + + # Starts BluetoothSL4AAudioSrcMBS on the phone. + if self.is_android_bt_target_device: + self.derived_bt_device.init_ambs_for_avrcp() + else: + self.pri_device.sl4a.bluetoothMediaPhoneSL4AMBSStart() + # Waits for BluetoothSL4AAudioSrcMBS to be active. + time.sleep(1) + # Changes the playback state to Playing in order to other Media passthrough + # commands can work. + self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone( + bt_constants.CMD_MEDIA_PLAY) + + # Collects media metadata of all tracks. + self.tracks = [] + for _ in range(len(self.audio_files)): + self.tracks.append(self.pri_device.get_current_track_info()) + self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone( + bt_constants.CMD_MEDIA_SKIP_NEXT) + self.pri_device.log.info('Tracks: %s' % self.tracks) + + # Sets Playback state to Paused as default. + self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone( + bt_constants.CMD_MEDIA_PAUSE) + + def teardown_class(self): + """Teardown class for bluetooth avrcp media play test.""" + super(BluetoothAvrcpTest, self).teardown_class() + # Stops BluetoothSL4AAudioSrcMBS after all test methods finish. + if self.is_android_bt_target_device: + self.derived_bt_device.stop_ambs_for_avrcp() + else: + self.pri_device.sl4a.bluetoothMediaPhoneSL4AMBSStop() + + def teardown_test(self): + """Teardown test for bluetooth avrcp media play test.""" + super(BluetoothAvrcpTest, self).teardown_test() + # Sets Playback state to Paused after a test method finishes. + self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone( + bt_constants.CMD_MEDIA_PAUSE) + + def wait_for_media_info_sync(self): + """Waits for sync Media information between two sides. + + Waits for sync the current playback state and Now playing track info from + the android bt target device to the phone. + """ + # Check if Playback state is sync. + expected_state = self.pri_device.get_current_playback_state() + self.derived_bt_device.verify_playback_state_changed( + expected_state=expected_state, + exception=signals.TestError( + 'Playback state is not equivalent between two sides. ' + '"%s" != "%s"' % + (self.derived_bt_device.get_current_playback_state(), + expected_state))) + + # Check if Now Playing track is sync. + expected_track = self.pri_device.get_current_track_info() + self.derived_bt_device.verify_current_track_changed( + expected_track=expected_track, + exception=signals.TestError( + 'Now Playing track is not equivalent between two sides. ' + '"%s" != "%s"' % + (self.derived_bt_device.get_current_track_info(), expected_track))) + + def execute_media_play_pause_test_logic(self, command_sender, test_command): + """Executes the test logic of the media command "play" or "pause". + + Steps: + 1. Correct the playback state if needed. + 2. Send a media passthrough command. + 3. Verify that the playback state is changed from AVRCP TG and CT. + + Args: + command_sender: a device controller sending the command. + test_command: string, the media passthrough command for testing, either + "play" or "pause". + + Raises: + signals.TestError: raised if the test command is invalid. + """ + # Checks if the test command is valid. + if test_command not in [bt_constants.CMD_MEDIA_PLAY, + bt_constants.CMD_MEDIA_PAUSE]: + raise signals.TestError( + 'Command "%s" is invalid. The test command should be "%s" or "%s".' % + (test_command, bt_constants.CMD_MEDIA_PLAY, + bt_constants.CMD_MEDIA_PAUSE)) + + # Make sure the playback state is playing if testing the command "pause". + if (self.pri_device.get_current_playback_state() != + bt_constants.STATE_PLAYING and + test_command == bt_constants.CMD_MEDIA_PAUSE): + self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone( + bt_constants.CMD_MEDIA_PLAY) + + # Makes sure Media info is the same between two sides. + if self.is_android_bt_target_device: + self.wait_for_media_info_sync() + self.pri_device.log.info( + 'Current playback state: %s' % + self.pri_device.get_current_playback_state()) + + expected_state = None + if test_command == bt_constants.CMD_MEDIA_PLAY: + command_sender.play() + expected_state = bt_constants.STATE_PLAYING + elif test_command == bt_constants.CMD_MEDIA_PAUSE: + command_sender.pause() + expected_state = bt_constants.STATE_PAUSED + + # Verify that the playback state is changed. + self.pri_device.log.info('Expected playback state: %s' % expected_state) + device_check_list = [self.pri_device] + # Check the playback state from the android bt target device. + if self.is_android_bt_target_device: + device_check_list.append(self.derived_bt_device) + for device in device_check_list: + device.verify_playback_state_changed( + expected_state=expected_state, + exception=signals.TestFailure( + 'Playback state is not changed to "%s" from the device "%s". ' + 'Current state: %s' % + (expected_state, device.serial, + device.get_current_playback_state()))) + + def execute_skip_next_prev_test_logic(self, command_sender, test_command): + """Executes the test logic of the media command "skipNext" or "skipPrev". + + Steps: + 1. Correct the Now Playing track if needed. + 2. Send a media passthrough command. + 3. Verify that the Now Playing track is changed from AVRCP TG and CT. + + Args: + command_sender: a device controller sending the command. + test_command: string, the media passthrough command for testing, either + "skipNext" or "skipPrev". + + Raises: + signals.TestError: raised if the test command is invalid. + """ + # Checks if the test command is valid. + if test_command not in [bt_constants.CMD_MEDIA_SKIP_NEXT, + bt_constants.CMD_MEDIA_SKIP_PREV]: + raise signals.TestError( + 'Command "%s" is invalid. The test command should be "%s" or "%s".' % + (test_command, bt_constants.CMD_MEDIA_SKIP_NEXT, + bt_constants.CMD_MEDIA_SKIP_PREV)) + + # Make sure the track index is not 0 if testing the command "skipPrev". + if (self.tracks.index(self.pri_device.get_current_track_info()) == 0 + and test_command == bt_constants.CMD_MEDIA_SKIP_PREV): + self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone( + bt_constants.CMD_MEDIA_SKIP_NEXT) + + # Makes sure Media info is the same between two sides. + if self.is_android_bt_target_device: + self.wait_for_media_info_sync() + current_track = self.pri_device.get_current_track_info() + current_index = self.tracks.index(current_track) + self.pri_device.log.info('Current track: %s' % current_track) + + expected_track = None + if test_command == bt_constants.CMD_MEDIA_SKIP_NEXT: + command_sender.track_next() + # It will return to the first track by skipNext if now playing is the last + # track. + if current_index + 1 == len(self.tracks): + expected_track = self.tracks[0] + else: + expected_track = self.tracks[current_index + 1] + elif test_command == bt_constants.CMD_MEDIA_SKIP_PREV: + command_sender.track_previous() + expected_track = self.tracks[current_index - 1] + + # Verify that the now playing track is changed. + self.pri_device.log.info('Expected track: %s' % expected_track) + device_check_list = [self.pri_device] + # Check the playback state from the android bt target device. + if self.is_android_bt_target_device: + device_check_list.append(self.derived_bt_device) + for device in device_check_list: + device.verify_current_track_changed( + expected_track=expected_track, + exception=signals.TestFailure( + 'Now Playing track is not changed to "%s" from the device "%s". ' + 'Current track: %s' % + (expected_track, device.serial, device.get_current_track_info()))) + + def test_media_pause(self): + """Tests the media pause from AVRCP Controller.""" + self.execute_media_play_pause_test_logic( + command_sender=self.derived_bt_device, + test_command=bt_constants.CMD_MEDIA_PAUSE) + + def test_media_play(self): + """Tests the media play from AVRCP Controller.""" + self.execute_media_play_pause_test_logic( + command_sender=self.derived_bt_device, + test_command=bt_constants.CMD_MEDIA_PLAY) + + def test_media_skip_prev(self): + """Tests the media skip prev from AVRCP Controller.""" + self.execute_skip_next_prev_test_logic( + command_sender=self.derived_bt_device, + test_command=bt_constants.CMD_MEDIA_SKIP_PREV) + + def test_media_skip_next(self): + """Tests the media skip next from AVRCP Controller.""" + self.execute_skip_next_prev_test_logic( + command_sender=self.derived_bt_device, + test_command=bt_constants.CMD_MEDIA_SKIP_NEXT) + + def test_media_pause_from_phone(self): + """Tests the media pause from AVRCP Target. + + Tests that Playback state of AVRCP Controller will be changed to paused when + AVRCP Target sends the command "pause". + """ + if not self.is_android_bt_target_device: + signals.TestError('The test requires an android bt target device.') + + self.execute_media_play_pause_test_logic( + command_sender=self.pri_device, + test_command=bt_constants.CMD_MEDIA_PAUSE) + + def test_media_play_from_phone(self): + """Tests the media play from AVRCP Target. + + Tests that Playback state of AVRCP Controller will be changed to playing + when AVRCP Target sends the command "play". + """ + if not self.is_android_bt_target_device: + signals.TestError('The test requires an android bt target device.') + + self.execute_media_play_pause_test_logic( + command_sender=self.pri_device, + test_command=bt_constants.CMD_MEDIA_PLAY) + + def test_media_skip_prev_from_phone(self): + """Tests the media skip prev from AVRCP Target. + + Tests that Now Playing track of AVRCP Controller will be changed to the + previous track when AVRCP Target sends the command "skipPrev". + """ + if not self.is_android_bt_target_device: + signals.TestError('The test requires an android bt target device.') + + self.execute_skip_next_prev_test_logic( + command_sender=self.pri_device, + test_command=bt_constants.CMD_MEDIA_SKIP_PREV) + + def test_media_skip_next_from_phone(self): + """Tests the media skip next from AVRCP Target. + + Tests that Now Playing track of AVRCP Controller will be changed to the next + track when AVRCP Target sends the command "skipNext". + """ + if not self.is_android_bt_target_device: + signals.TestError('The test requires an android bt target device.') + + self.execute_skip_next_prev_test_logic( + command_sender=self.pri_device, + test_command=bt_constants.CMD_MEDIA_SKIP_NEXT) + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/connectivity/bluetooth_connection_test.py b/system/blueberry/tests/connectivity/bluetooth_connection_test.py new file mode 100644 index 0000000000..767af38da6 --- /dev/null +++ b/system/blueberry/tests/connectivity/bluetooth_connection_test.py @@ -0,0 +1,96 @@ +"""Tests for Bluetooth connection with Android device and a Bluetooth device.""" + +import time + +from mobly import test_runner +from blueberry.utils import blueberry_base_test + +# Connection state change sleep time in seconds. +CONNECTION_STATE_CHANGE_SLEEP_SEC = 5 + + +class BluetoothConnectionTest(blueberry_base_test.BlueberryBaseTest): + """Test Class for Bluetooth connection testing. + + Attributes: + primary_device: A primary device under test. + derived_bt_device: A Bluetooth device which is used to connected to the + primary device in the test. + """ + + def setup_class(self): + super().setup_class() + self.primary_device = self.android_devices[0] + self.primary_device.init_setup() + + self.derived_bt_device = self.derived_bt_devices[0] + self.derived_bt_device.factory_reset_bluetooth() + self.mac_address = self.derived_bt_device.get_bluetooth_mac_address() + self.derived_bt_device.activate_pairing_mode() + self.primary_device.pair_and_connect_bluetooth(self.mac_address) + + def setup_test(self): + super().setup_test() + # Checks if A2DP and HSP profiles are connected. + self.wait_for_a2dp_and_hsp_connection_state(connected=True) + # Buffer between tests. + time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC) + + def wait_for_a2dp_and_hsp_connection_state(self, connected): + """Asserts that A2DP and HSP connections are in the expected state. + + Args: + connected: bool, True if the expected state is connected else False. + """ + self.primary_device.wait_for_a2dp_connection_state(self.mac_address, + connected) + self.primary_device.wait_for_hsp_connection_state(self.mac_address, + connected) + + def test_disconnect_and_connect(self): + """Test for DUT disconnecting and then connecting to the remote device.""" + self.primary_device.log.info('Disconnecting the device "%s"...' % + self.mac_address) + self.primary_device.disconnect_bluetooth(self.mac_address) + self.wait_for_a2dp_and_hsp_connection_state(connected=False) + # Buffer time for connection state change. + time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC) + self.primary_device.log.info('Connecting the device "%s"...' % + self.mac_address) + self.primary_device.connect_bluetooth(self.mac_address) + self.wait_for_a2dp_and_hsp_connection_state(connected=True) + + def test_reconnect_when_enabling_bluetooth(self): + """Test for DUT reconnecting to the remote device when Bluetooth enabled.""" + self.primary_device.log.info('Turning off Bluetooth...') + self.primary_device.sl4a.bluetoothToggleState(False) + self.primary_device.wait_for_bluetooth_toggle_state(enabled=False) + self.primary_device.wait_for_disconnection_success(self.mac_address) + time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC) + self.primary_device.log.info('Turning on Bluetooth...') + self.primary_device.sl4a.bluetoothToggleState(True) + self.primary_device.wait_for_bluetooth_toggle_state(enabled=True) + self.primary_device.wait_for_connection_success(self.mac_address) + self.wait_for_a2dp_and_hsp_connection_state(connected=True) + + def test_reconnect_when_connected_device_powered_on(self): + """Test for the remote device reconnecting to DUT. + + Tests that DUT can be disconnected when the remoted device is powerd off, + and then reconnected when the remote device is powered on. + """ + self.primary_device.log.info( + 'The connected device "%s" is being powered off...' % self.mac_address) + self.derived_bt_device.power_off() + self.primary_device.wait_for_disconnection_success(self.mac_address) + self.wait_for_a2dp_and_hsp_connection_state(connected=False) + time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC) + self.derived_bt_device.power_on() + self.primary_device.log.info( + 'The connected device "%s" is being powered on...' % self.mac_address) + self.primary_device.wait_for_connection_success(self.mac_address) + self.wait_for_a2dp_and_hsp_connection_state(connected=True) + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/connectivity/bluetooth_latency_test.py b/system/blueberry/tests/connectivity/bluetooth_latency_test.py new file mode 100644 index 0000000000..0facbdc8e7 --- /dev/null +++ b/system/blueberry/tests/connectivity/bluetooth_latency_test.py @@ -0,0 +1,132 @@ +# Lint as: python3 +"""Tests for blueberry.tests.bluetooth.bluetooth_latency.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging +import math +import random +import string +import time + +from mobly import asserts +from mobly import test_runner +from mobly.signals import TestAbortClass +# Internal import +from blueberry.utils import blueberry_base_test +from blueberry.utils import bt_test_utils +from blueberry.utils import metrics_utils +# Internal import + + +class BluetoothLatencyTest(blueberry_base_test.BlueberryBaseTest): + + @retry.logged_retry_on_exception( + retry_intervals=retry.FuzzedExponentialIntervals( + initial_delay_sec=2, factor=5, num_retries=5, max_delay_sec=300)) + def _measure_latency(self): + """Measures the latency of data transfer over RFCOMM. + + Sends data from the client device that is read by the server device. + Calculates the latency of the transfer. + + Returns: + The latency of the transfer milliseconds. + """ + + # Generates a random message to transfer + message = (''.join( + random.choice(string.ascii_letters + string.digits) for _ in range(6))) + start_time = time.time() + write_read_successful = bt_test_utils.write_read_verify_data_sl4a( + self.phone, self.derived_bt_device, message, False) + end_time = time.time() + asserts.assert_true(write_read_successful, 'Failed to send/receive message') + return (end_time - start_time) * 1000 + + def setup_class(self): + """Standard Mobly setup class.""" + super(BluetoothLatencyTest, self).setup_class() + if len(self.android_devices) < 2: + raise TestAbortClass( + 'Not enough android phones detected (need at least two)') + self.phone = self.android_devices[0] + self.phone.init_setup() + self.phone.sl4a_setup() + + # We treat the secondary phone as a derived_bt_device in order for the + # generic script to work with this android phone properly. Data will be sent + # from first phone to the second phone. + self.derived_bt_device = self.android_devices[1] + self.derived_bt_device.init_setup() + self.derived_bt_device.sl4a_setup() + self.set_btsnooplogmode_full(self.phone) + self.set_btsnooplogmode_full(self.derived_bt_device) + + self.metrics = ( + metrics_utils.BluetoothMetricLogger( + metrics_pb2.BluetoothDataTestResult())) + self.metrics.add_primary_device_metrics(self.phone) + self.metrics.add_connected_device_metrics(self.derived_bt_device) + + self.data_transfer_type = metrics_pb2.BluetoothDataTestResult.RFCOMM + self.iterations = int(self.user_params.get('iterations', 300)) + logging.info('Running Bluetooth latency test %s times.', self.iterations) + logging.info('Successfully found required devices.') + + def setup_test(self): + """Setup for bluetooth latency test.""" + logging.info('Setup Test for test_bluetooth_latency') + super(BluetoothLatencyTest, self).setup_test() + asserts.assert_true(self.phone.connect_with_rfcomm(self.derived_bt_device), + 'Failed to establish RFCOMM connection') + + def test_bluetooth_latency(self): + """Tests the latency for a data transfer over RFCOMM.""" + + metrics = {} + latency_list = [] + + for _ in range(self.iterations): + latency_list.append(self._measure_latency()) + + metrics['data_transfer_protocol'] = self.data_transfer_type + metrics['data_latency_min_millis'] = int(min(latency_list)) + metrics['data_latency_max_millis'] = int(max(latency_list)) + metrics['data_latency_avg_millis'] = int( + math.fsum(latency_list) / float(len(latency_list))) + logging.info('Latency: %s', metrics) + + asserts.assert_true(metrics['data_latency_min_millis'] > 0, + 'Minimum latency must be greater than 0!') + self.metrics.add_test_metrics(metrics) + for metric in metrics: + self.record_data({ + 'Test Name': 'test_bluetooth_latency', + 'sponge_properties': { + metric: metrics[metric], + } + }) + + def teardown_class(self): + logging.info('Factory resetting Bluetooth on devices.') + self.phone.sl4a.bluetoothSocketConnStop() + self.derived_bt_device.sl4a.bluetoothSocketConnStop() + self.phone.factory_reset_bluetooth() + self.derived_bt_device.factory_reset_bluetooth() + super(BluetoothLatencyTest, self).teardown_class() + self.record_data({ + 'Test Name': 'test_bluetooth_latency', + 'sponge_properties': { + 'proto_ascii': + self.metrics.proto_message_to_ascii(), + 'primary_device_build': + self.phone.get_device_info()['android_release_id'] + } + }) + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/connectivity/bluetooth_pairing_test.py b/system/blueberry/tests/connectivity/bluetooth_pairing_test.py new file mode 100644 index 0000000000..f3c1770781 --- /dev/null +++ b/system/blueberry/tests/connectivity/bluetooth_pairing_test.py @@ -0,0 +1,81 @@ +"""Tests for blueberry.tests.bluetooth_pairing.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging + +from mobly import test_runner +from blueberry.utils import blueberry_base_test + + +class BluetoothPairingTest(blueberry_base_test.BlueberryBaseTest): + """Test Class for Bluetooth Pairing Test. + + Test will reset the bluetooth settings on the phone and attempt to pair + with the derived_bt_device specified in the configuration file. + """ + + def setup_class(self): + """Standard Mobly setup class.""" + super(BluetoothPairingTest, self).setup_class() + # Adds a use case that derived_bt_device initiates a pairing request to + # primary_device. Enable this case if allow_pairing_reverse is 1. + self.allow_pairing_reverse = int(self.user_params.get( + 'allow_pairing_reverse', 0)) + + if self.android_devices: + self.primary_device = self.android_devices[0] + self.primary_device.init_setup() + self.primary_device.sl4a_setup() + + if len(self.android_devices) > 1 and not self.derived_bt_devices: + # In the case of pairing phone to phone, we need to treat the + # secondary phone as a derived_bt_device in order for the generic script + # to work with this android phone properly. + self.derived_bt_device = self.android_devices[1] + self.derived_bt_device.init_setup() + self.derived_bt_device.sl4a_setup() + else: + self.derived_bt_device = self.derived_bt_devices[0] + self.derived_bt_device.factory_reset_bluetooth() + else: + # In the case of pairing mock to mock, at least 2 derived_bt_device is + # required. The first derived_bt_device is treated as primary_device. + self.primary_device = self.derived_bt_devices[0] + self.primary_device.init_setup() + self.derived_bt_device = self.derived_bt_devices[1] + self.derived_bt_device.factory_reset_bluetooth() + + def setup_test(self): + """Setup for pairing test.""" + logging.info('Setup Test for test_pair_and_bond') + super(BluetoothPairingTest, self).setup_test() + + def test_pair_and_bond(self): + """Test for pairing and bonding a phone with a bluetooth device. + + Initiates pairing from the phone and checks for 20 seconds that + the device is connected. + """ + device_list = [(self.primary_device, self.derived_bt_device)] + if self.allow_pairing_reverse: + device_list.append((self.derived_bt_device, self.primary_device)) + for initiator, receiver in device_list: + # get mac address of device to pair with + mac_address = receiver.get_bluetooth_mac_address() + logging.info('Receiver BT MAC Address: %s', mac_address) + # put device into pairing mode + receiver.activate_pairing_mode() + # initiate pairing from initiator + initiator.set_target(receiver) + initiator.pair_and_connect_bluetooth(mac_address) + if self.allow_pairing_reverse and initiator != self.derived_bt_device: + logging.info('===== Reversing Pairing =====') + # Resets Bluetooth status for two sides. + initiator.factory_reset_bluetooth() + receiver.factory_reset_bluetooth() + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/connectivity/bluetooth_throughput_test.py b/system/blueberry/tests/connectivity/bluetooth_throughput_test.py new file mode 100644 index 0000000000..79ac2e22b9 --- /dev/null +++ b/system/blueberry/tests/connectivity/bluetooth_throughput_test.py @@ -0,0 +1,196 @@ +# Lint as: python3 +"""Tests for blueberry.tests.bluetooth.bluetooth_throughput.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging +import math + +from mobly import asserts +from mobly import test_runner +from mobly.controllers.android_device_lib.jsonrpc_client_base import ApiError +from mobly.signals import TestAbortClass +# Internal import +from blueberry.utils import blueberry_base_test +from blueberry.utils import metrics_utils +# Internal import + + +class BluetoothThroughputTest(blueberry_base_test.BlueberryBaseTest): + + @retry.logged_retry_on_exception( + retry_intervals=retry.FuzzedExponentialIntervals( + initial_delay_sec=2, factor=5, num_retries=5, max_delay_sec=300)) + def _measure_throughput(self, num_of_buffers, buffer_size): + """Measures the throughput of a data transfer. + + Sends data from the client device that is read by the server device. + Calculates the throughput for the transfer. + + Args: + num_of_buffers: An integer value designating the number of buffers + to be sent. + buffer_size: An integer value designating the size of each buffer, + in bytes. + + Returns: + The throughput of the transfer in bytes per second. + """ + + # TODO(user): Need to fix throughput send/receive methods + (self.phone.sl4a + .bluetoothConnectionThroughputSend(num_of_buffers, buffer_size)) + + throughput = (self.derived_bt_device.sl4a + .bluetoothConnectionThroughputRead(num_of_buffers, + buffer_size)) + return throughput + + def _throughput_test(self, buffer_size, test_name): + logging.info('throughput test with buffer_size: %d and testname: %s', + buffer_size, test_name) + metrics = {} + throughput_list = [] + num_of_buffers = 1 + for _ in range(self.iterations): + throughput = self._measure_throughput(num_of_buffers, buffer_size) + logging.info('Throughput: %d bytes-per-sec', throughput) + throughput_list.append(throughput) + + metrics['data_transfer_protocol'] = self.data_transfer_type + metrics['data_packet_size'] = buffer_size + metrics['data_throughput_min_bytes_per_second'] = int( + min(throughput_list)) + metrics['data_throughput_max_bytes_per_second'] = int( + max(throughput_list)) + metrics['data_throughput_avg_bytes_per_second'] = int( + math.fsum(throughput_list) / float(len(throughput_list))) + + logging.info('Throughput at large buffer: %s', metrics) + + asserts.assert_true(metrics['data_throughput_min_bytes_per_second'] > 0, + 'Minimum throughput must be greater than 0!') + + self.metrics.add_test_metrics(metrics) + for metric in metrics: + self.record_data({ + 'Test Name': test_name, + 'sponge_properties': { + metric: metrics[metric], + } + }) + self.record_data({ + 'Test Name': test_name, + 'sponge_properties': { + 'proto_ascii': + self.metrics.proto_message_to_ascii(), + 'primary_device_build': + self.phone.get_device_info()['android_release_id'] + } + }) + + def setup_class(self): + """Standard Mobly setup class.""" + super(BluetoothThroughputTest, self).setup_class() + if len(self.android_devices) < 2: + raise TestAbortClass( + 'Not enough android phones detected (need at least two)') + self.phone = self.android_devices[0] + + # We treat the secondary phone as a derived_bt_device in order for the + # generic script to work with this android phone properly. Data will be sent + # from first phone to the second phone. + self.derived_bt_device = self.android_devices[1] + self.phone.init_setup() + self.derived_bt_device.init_setup() + self.phone.sl4a_setup() + self.derived_bt_device.sl4a_setup() + self.set_btsnooplogmode_full(self.phone) + self.set_btsnooplogmode_full(self.derived_bt_device) + + self.metrics = ( + metrics_utils.BluetoothMetricLogger( + metrics_pb2.BluetoothDataTestResult())) + self.metrics.add_primary_device_metrics(self.phone) + self.metrics.add_connected_device_metrics(self.derived_bt_device) + + self.data_transfer_type = metrics_pb2.BluetoothDataTestResult.RFCOMM + self.iterations = int(self.user_params.get('iterations', 300)) + logging.info('Running Bluetooth throughput test %s times.', self.iterations) + logging.info('Successfully found required devices.') + + def setup_test(self): + """Setup for bluetooth latency test.""" + logging.info('Setup Test for test_bluetooth_throughput') + super(BluetoothThroughputTest, self).setup_test() + asserts.assert_true(self.phone.connect_with_rfcomm(self.derived_bt_device), + 'Failed to establish RFCOMM connection') + + def test_bluetooth_throughput_large_buffer(self): + """Tests the throughput with large buffer size. + + Tests the throughput over a series of data transfers with large buffer size. + """ + large_buffer_size = 300 + test_name = 'test_bluetooth_throughput_large_buffer' + self._throughput_test(large_buffer_size, test_name) + + def test_bluetooth_throughput_medium_buffer(self): + """Tests the throughput with medium buffer size. + + Tests the throughput over a series of data transfers with medium buffer + size. + """ + medium_buffer_size = 100 + test_name = 'test_bluetooth_throughput_medium_buffer' + self._throughput_test(medium_buffer_size, test_name) + + def test_bluetooth_throughput_small_buffer(self): + """Tests the throughput with small buffer size. + + Tests the throughput over a series of data transfers with small buffer size. + """ + small_buffer_size = 10 + test_name = 'test_bluetooth_throughput_small_buffer' + self._throughput_test(small_buffer_size, test_name) + + def test_maximum_buffer_size(self): + """Calculates the maximum allowed buffer size for one packet.""" + current_buffer_size = 300 + throughput = -1 + num_of_buffers = 1 + while True: + logging.info('Trying buffer size %d', current_buffer_size) + try: + throughput = self._measure_throughput( + num_of_buffers, current_buffer_size) + logging.info('The throughput is %d at buffer size of %d', throughput, + current_buffer_size) + except ApiError: + maximum_buffer_size = current_buffer_size - 1 + logging.info('Max buffer size: %d bytes', maximum_buffer_size) + logging.info('Max throughput: %d bytes-per-second', throughput) + self.record_data({ + 'Test Name': 'test_maximum_buffer_size', + 'sponge_properties': { + 'maximum_buffer_size': maximum_buffer_size + } + }) + return True + current_buffer_size += 1 + + def teardown_test(self): + self.phone.sl4a.bluetoothSocketConnStop() + self.derived_bt_device.sl4a.bluetoothSocketConnStop() + + def teardown_class(self): + self.phone.factory_reset_bluetooth() + self.derived_bt_device.factory_reset_bluetooth() + logging.info('Factory resetting Bluetooth on devices.') + super(BluetoothThroughputTest, self).teardown_class() + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/map/bluetooth_map_test.py b/system/blueberry/tests/map/bluetooth_map_test.py new file mode 100644 index 0000000000..11bee4ed4d --- /dev/null +++ b/system/blueberry/tests/map/bluetooth_map_test.py @@ -0,0 +1,148 @@ +"""Tests for blueberry.map.bluetooth_map.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import queue +import time + +from mobly import test_runner +from mobly import signals +from mobly import utils + +from blueberry.controllers import android_bt_target_device +from blueberry.utils import blueberry_base_test + +_SMS_MSG_EVENT = 'SmsReceived' +_MAP_MSG_EVENT = 'MapMessageReceived' + +_EVENT_TIMEOUT_SEC = 180 +_TEXT_LENGTH = 10 +_TEXT_COUNT = 5 + + +class BluetoothMapTest(blueberry_base_test.BlueberryBaseTest): + """Test Class for Bluetooth MAP Test.""" + + def setup_class(self): + """Standard Mobly setup class.""" + super().setup_class() + for device in self.android_devices: + device.init_setup() + device.sl4a_setup() + + # Primary phone which role is Message Server Equipment (MSE). + self.pri_phone = self.android_devices[0] + self.pri_phone.sl4a.smsStartTrackingIncomingSmsMessage() + self.pri_number = self.pri_phone.dimensions['phone_number'] + + # Secondary phone which is used to send SMS messages to primary phone. + self.sec_phone = self.android_devices[1] + + # Bluetooth carkit which role is Message Client Equipment (MCE). + self.derived_bt_device = self.derived_bt_devices[0] + + mac_address = self.derived_bt_device.get_bluetooth_mac_address() + self.derived_bt_device.activate_pairing_mode() + self.pri_phone.pair_and_connect_bluetooth(mac_address) + # Sleep to make the connection to be steady. + time.sleep(5) + + if isinstance( + self.derived_bt_device, android_bt_target_device.AndroidBtTargetDevice): + # Allow sl4a to receive the intent with ACTION_MESSAGE_RECEIVED. + self.derived_bt_device.adb.shell( + 'pm grant com.googlecode.android_scripting ' + 'android.permission.RECEIVE_SMS') + # Connect derived bt device to primary phone via MAP MCE profile. + self.derived_bt_device.add_sec_ad_device(self.pri_phone) + + def teardown_test(self): + """Standard Mobly teardown test. + + Disconnects the MAP connection after a test completes. + """ + super().teardown_test() + self.derived_bt_device.map_disconnect() + + def _wait_for_message_on_mce(self, text): + """Waits for that MCE gets an event with specific message. + + Args: + text: String, Text of the message. + + Raises: + TestFailure: Raised if timed out. + """ + try: + self.derived_bt_device.ed.wait_for_event( + _MAP_MSG_EVENT, lambda e: e['data'] == text, _EVENT_TIMEOUT_SEC) + self.derived_bt_device.log.info( + 'Successfully got the unread message: %s' % text) + except queue.Empty: + raise signals.TestFailure( + 'Timed out after %ds waiting for "%s" event with the message: %s' % + (_EVENT_TIMEOUT_SEC, _MAP_MSG_EVENT, text)) + + def _wait_for_message_on_mse(self, text): + """Waits for that MSE gets an event with specific message. + + This method is used to make sure that MSE has received the test message. + + Args: + text: String, Text of the message. + + Raises: + TestError: Raised if timed out. + """ + try: + self.pri_phone.ed.wait_for_event( + _SMS_MSG_EVENT, lambda e: e['data']['Text'] == text, + _EVENT_TIMEOUT_SEC) + self.pri_phone.log.info( + 'Successfully received the incoming message: %s' % text) + except queue.Empty: + raise signals.TestError( + 'Timed out after %ds waiting for "%s" event with the message: %s' % + (_EVENT_TIMEOUT_SEC, _SMS_MSG_EVENT, text)) + + def _create_message_on_mse(self, text): + """Creates a new incoming message on MSE. + + Args: + text: String, Text of the message. + """ + self.sec_phone.sl4a.smsSendTextMessage(self.pri_number, text, False) + self._wait_for_message_on_mse(text) + + def test_get_existing_unread_messages(self): + """Test for the feature of getting existing unread messages on MCE. + + Tests MCE can list existing messages of MSE. + """ + text_list = [] + # Creates 5 SMS messages on MSE before establishing connection. + for _ in range(_TEXT_COUNT): + text = utils.rand_ascii_str(_TEXT_LENGTH) + self._create_message_on_mse(text) + text_list.append(text) + self.derived_bt_device.map_connect() + # Gets the unread messages of MSE and checks if they are downloaded + # successfully on MCE. + self.derived_bt_device.get_unread_messages() + for text in text_list: + self._wait_for_message_on_mce(text) + + def test_receive_unread_message(self): + """Test for the feature of receiving unread message on MCE. + + Tests MCE can get an unread message when MSE receives an incoming message. + """ + self.derived_bt_device.map_connect() + text = utils.rand_ascii_str(_TEXT_LENGTH) + self._create_message_on_mse(text) + self._wait_for_message_on_mce(text) + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/pan/bluetooth_pan_test.py b/system/blueberry/tests/pan/bluetooth_pan_test.py new file mode 100644 index 0000000000..062da8e480 --- /dev/null +++ b/system/blueberry/tests/pan/bluetooth_pan_test.py @@ -0,0 +1,325 @@ +"""Tests for Bluetooth PAN profile functionalities.""" + +import contextlib +import time + +from mobly import test_runner +from mobly import signals +from mobly.controllers.android_device_lib import jsonrpc_client_base +from blueberry.utils import blueberry_base_test +from blueberry.utils import bt_test_utils + +# Timeout to wait for NAP service connection to be specific state in second. +CONNECTION_TIMEOUT_SECS = 20 + +# Interval time between ping requests in second. +PING_INTERVAL_TIME_SEC = 2 + +# Timeout to wait for ping success in second. +PING_TIMEOUT_SEC = 60 + +# A URL is used to verify internet by ping request. +TEST_URL = 'http://www.google.com' + +# A string representing SIM State is ready. +SIM_STATE_READY = 'READY' + + +class BluetoothPanTest(blueberry_base_test.BlueberryBaseTest): + """Test class for Bluetooth PAN(Personal Area Networking) profile. + + Test internet connection sharing via Bluetooth between two Android devices. + One device which is referred to as NAP(Network Access Point) uses Bluetooth + tethering to share internet connection with another device which is referred + to as PANU(Personal Area Networking User). + """ + + def setup_class(self): + """Standard Mobly setup class.""" + super(BluetoothPanTest, self).setup_class() + # Number of attempts to initiate connection. 5 attempts as default. + self.pan_connect_attempts = self.user_params.get('pan_connect_attempts', 5) + + for device in self.android_devices: + device.init_setup() + device.sl4a_setup() + device.mac_address = device.get_bluetooth_mac_address() + + # Check if the device has inserted a SIM card. + if device.sl4a.telephonyGetSimState() != SIM_STATE_READY: + raise signals.TestError('SIM card is not ready on Device "%s".' % + device.serial) + + self.primary_device = self.android_devices[0] + self.secondary_device = self.android_devices[1] + + def teardown_test(self): + """Standard Mobly teardown test. + + Reset every devices when a test finished. + """ + super(BluetoothPanTest, self).teardown_test() + # Revert debug tags. + for device in self.android_devices: + device.debug_tag = device.serial + device.factory_reset_bluetooth() + + def wait_for_nap_service_connection( + self, + device, + connected_mac_addr, + state_connected=True): + """Waits for NAP service connection to be expected state. + + Args: + device: AndroidDevice, A device is used to check this connection. + connected_mac_addr: String, Bluetooth Mac address is needed to be checked. + state_connected: Bool, NAP service connection is established as expected + if True, else terminated as expected. + + Raises: + TestFailure: Raised if NAP service connection is not expected state. + """ + def is_device_connected(): + """Returns True if connected else False.""" + connected_devices = (device.sl4a. + bluetoothPanGetConnectedDevices()) + # Check if the Bluetooth mac address is in the connected device list. + return connected_mac_addr in [d['address'] for d in connected_devices] + + bt_test_utils.wait_until( + timeout_sec=CONNECTION_TIMEOUT_SECS, + condition_func=is_device_connected, + func_args=[], + expected_value=state_connected, + exception=signals.TestFailure( + 'NAP service connection failed to be %s in %ds.' % + ('established' if state_connected else 'terminated', + CONNECTION_TIMEOUT_SECS))) + + def initiate_nap_service_connection( + self, + initiator_device, + connected_mac_addr): + """Initiates NAP service connection. + + Args: + initiator_device: AndroidDevice, A device intiating connection. + connected_mac_addr: String, Bluetooth Mac address of connected device. + + Raises: + TestFailure: Raised if NAP service connection fails to be established. + """ + count = 0 + for _ in range(self.pan_connect_attempts): + count += 1 + try: + initiator_device.sl4a.bluetoothConnectBonded(connected_mac_addr) + self.wait_for_nap_service_connection( + device=initiator_device, + connected_mac_addr=connected_mac_addr, + state_connected=True) + return + except signals.TestFailure: + if count == self.pan_connect_attempts: + raise signals.TestFailure( + 'NAP service connection still failed to be established ' + 'after retried %d times.' % + self.pan_connect_attempts) + + def terminate_nap_service_connection( + self, + initiator_device, + connected_mac_addr): + """Terminates NAP service connection. + + Args: + initiator_device: AndroidDevice, A device intiating disconnection. + connected_mac_addr: String, Bluetooth Mac address of connected device. + """ + initiator_device.log.info('Terminate NAP service connection.') + initiator_device.sl4a.bluetoothDisconnectConnected(connected_mac_addr) + self.wait_for_nap_service_connection( + device=initiator_device, + connected_mac_addr=connected_mac_addr, + state_connected=False) + + @contextlib.contextmanager + def establish_nap_service_connection(self, nap_device, panu_device): + """Establishes NAP service connection between both Android devices. + + The context is used to form a basic network connection between devices + before executing a test. + + Steps: + 1. Disable Mobile data to avoid internet access on PANU device. + 2. Make sure Mobile data available on NAP device. + 3. Enable Bluetooth from PANU device. + 4. Enable Bluetooth tethering on NAP device. + 5. Initiate a connection from PANU device. + 6. Check if PANU device has internet access via the connection. + + Args: + nap_device: AndroidDevice, A device sharing internet connection via + Bluetooth tethering. + panu_device: AndroidDevice, A device gaining internet access via + Bluetooth tethering. + + Yields: + None, the context just execute a pre procedure for PAN testing. + + Raises: + signals.TestError: raised if a step fails. + signals.TestFailure: raised if PANU device fails to access internet. + """ + nap_device.debug_tag = 'NAP' + panu_device.debug_tag = 'PANU' + try: + # Disable Mobile data to avoid internet access on PANU device. + panu_device.log.info('Disabling Mobile data...') + panu_device.sl4a.setMobileDataEnabled(False) + self.verify_internet( + allow_access=False, + device=panu_device, + exception=signals.TestError( + 'PANU device "%s" still connected to internet when Mobile data ' + 'had been disabled.' % panu_device.serial)) + + # Make sure NAP device has Mobile data for internet sharing. + nap_device.log.info('Enabling Mobile data...') + nap_device.sl4a.setMobileDataEnabled(True) + self.verify_internet( + allow_access=True, + device=nap_device, + exception=signals.TestError( + 'NAP device "%s" did not have internet access when Mobile data ' + 'had been enabled.' % nap_device.serial)) + + # Enable Bluetooth tethering from NAP device. + nap_device.set_bluetooth_tethering(status_enabled=True) + # Wait until Bluetooth tethering stabilizes. This waiting time avoids PANU + # device initiates a connection to NAP device immediately when NAP device + # enables Bluetooth tethering. + time.sleep(5) + + nap_device.activate_pairing_mode() + panu_device.log.info('Pair to NAP device "%s".' % nap_device.serial) + panu_device.pair_and_connect_bluetooth(nap_device.mac_address) + + # Initiate a connection to NAP device. + panu_device.log.info('Initiate a connection to NAP device "%s".' % + nap_device.serial) + self.initiate_nap_service_connection( + initiator_device=panu_device, + connected_mac_addr=nap_device.mac_address) + + # Check if PANU device can access internet via NAP service connection. + self.verify_internet( + allow_access=True, + device=panu_device, + exception=signals.TestFailure( + 'PANU device "%s" failed to access internet via NAP service ' + 'connection.' % panu_device.serial)) + yield + finally: + # Disable Bluetooth tethering from NAP device. + nap_device.set_bluetooth_tethering(status_enabled=False) + panu_device.sl4a.setMobileDataEnabled(True) + + def verify_internet(self, allow_access, device, exception): + """Verifies that internet is in expected state. + + Continuously make ping request to a URL for internet verification. + + Args: + allow_access: Bool, Device can have internet access as expected if True, + else no internet access as expected. + device: AndroidDevice, Device to be check internet state. + exception: Exception, Raised if internet is not in expected state. + """ + device.log.info('Verify that internet %s be used.' % + ('can' if allow_access else 'can not')) + + def http_ping(): + """Returns True if http ping success else False.""" + try: + return bool(device.sl4a.httpPing(TEST_URL)) + except jsonrpc_client_base.ApiError as e: + # ApiError is raised by httpPing() when no internet. + device.log.debug(str(e)) + return False + + bt_test_utils.wait_until( + timeout_sec=PING_TIMEOUT_SEC, + condition_func=http_ping, + func_args=[], + expected_value=allow_access, + exception=exception, + interval_sec=PING_INTERVAL_TIME_SEC) + + def test_gain_internet_and_terminate_nap_connection(self): + """Test that DUT can access internet and terminate NAP service connection. + + In this test case, primary device is PANU and secondary device is NAP. While + a connection has established between both devices, PANU should be able to + use internet and terminate the connection to disable internet access. + + Steps: + 1. Establish NAP service connection between both devices. + 2. Terminal the connection from PANU device. + 3. Verify that PANU device cannot access internet. + """ + with self.establish_nap_service_connection( + nap_device=self.secondary_device, + panu_device=self.primary_device): + + # Terminate the connection from DUT. + self.terminate_nap_service_connection( + initiator_device=self.primary_device, + connected_mac_addr=self.secondary_device.mac_address) + + # Verify that PANU device cannot access internet. + self.verify_internet( + allow_access=False, + device=self.primary_device, + exception=signals.TestFailure( + 'PANU device "%s" can still access internet when it had ' + 'terminated NAP service connection.' % + self.primary_device.serial)) + + def test_share_internet_and_disable_bluetooth_tethering(self): + """Test that DUT can share internet and stop internet sharing. + + In this test case, primary device is NAP and secondary device is PANU. While + a connection has established between both devices, NAP should be able to + share internet and disable Bluetooth thethering to stop internet sharing. + + Steps: + 1. Establish NAP service connection between both devices. + 3. Disable Bluetooth tethering from NAP device. + 4. Verify that PANU device cannot access internet. + """ + with self.establish_nap_service_connection( + nap_device=self.primary_device, + panu_device=self.secondary_device): + + # Disable Bluetooth tethering from DUT and check if the nap connection is + # terminated. + self.primary_device.set_bluetooth_tethering(status_enabled=False) + self.wait_for_nap_service_connection( + device=self.primary_device, + connected_mac_addr=self.secondary_device.mac_address, + state_connected=False) + + # Verify that PANU device cannot access internet. + self.verify_internet( + allow_access=False, + device=self.secondary_device, + exception=signals.TestFailure( + 'PANU device "%s" can still access internet when it had ' + 'terminated NAP service connection.' % + self.secondary_device.serial)) + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/pbap/bluetooth_pbap_test.py b/system/blueberry/tests/pbap/bluetooth_pbap_test.py new file mode 100644 index 0000000000..9c7c032f61 --- /dev/null +++ b/system/blueberry/tests/pbap/bluetooth_pbap_test.py @@ -0,0 +1,401 @@ +"""Tests for blueberry.pbap.bluetooth_pbap.""" + +import os +import random +import time + +from mobly import asserts +from mobly import test_runner +from mobly import signals +from mobly import utils + +from mobly.controllers import android_device + +from blueberry.utils import blueberry_base_test +from blueberry.utils import bt_constants +from blueberry.utils import bt_test_utils + +# The path is used to place the created vcf files. +STORAGE_PATH = '/storage/emulated/0' + +# URI for contacts database. +CONTACTS_URI = 'content://com.android.contacts/data/phones' + +# Number of seconds to wait for contacts and call logs update. +WAITING_TIMEOUT_SEC = 60 + +# Number of contacts and call logs to be tested. +TEST_DATA_COUNT = 1000 + +# Permissions for Contacts app. +PERMISSION_LIST = [ + 'android.permission.READ_CONTACTS', + 'android.permission.WRITE_CONTACTS', +] + + +class BluetoothPbapTest(blueberry_base_test.BlueberryBaseTest): + """Test Class for Bluetooth PBAP Test.""" + + def setup_class(self): + """Standard Mobly setup class.""" + super(BluetoothPbapTest, self).setup_class() + + # Bluetooth carkit which role is Phone Book Client Equipment (PCE). + self.derived_bt_device = self.derived_bt_devices[0] + + # Primary phone which role is Phone Book Server Equipment (PSE). + self.pri_phone = self.android_devices[0] + self.pri_phone.init_setup() + self.pri_phone.sl4a_setup() + self.derived_bt_device.add_sec_ad_device(self.pri_phone) + + # Grant the permissions to Contacts app. + for device in [self.pri_phone, self.derived_bt_device]: + required_permissions = PERMISSION_LIST + # App requires READ_EXTERNAL_STORAGE to read contacts if SDK < 30. + if int(device.build_info['build_version_sdk']) < 30: + required_permissions.append('android.permission.READ_EXTERNAL_STORAGE') + for permission in required_permissions: + device.adb.shell('pm grant com.google.android.contacts %s' % permission) + self.pse_mac_address = self.pri_phone.get_bluetooth_mac_address() + mac_address = self.derived_bt_device.get_bluetooth_mac_address() + self.derived_bt_device.activate_pairing_mode() + self.pri_phone.pair_and_connect_bluetooth(mac_address) + # Sleep until the connection stabilizes. + time.sleep(5) + + # Allow permission access for PBAP profile. + self.pri_phone.sl4a.bluetoothChangeProfileAccessPermission( + mac_address, + bt_constants.BluetoothProfile.PBAP.value, + bt_constants.BluetoothAccessLevel.ACCESS_ALLOWED.value) + + def setup_test(self): + super(BluetoothPbapTest, self).setup_test() + # Make sure PBAP is not connected before running tests. + self._terminate_pbap_connection() + + def _import_vcf_to_pse(self, file_name, expected_contact_count): + """Imports the vcf file to PSE.""" + # Open ImportVcardActivity and click "OK" in the pop-up dialog, then + # PickActivity will be launched and browses the existing vcf files. + self.pri_phone.adb.shell( + 'am start com.google.android.contacts/' + 'com.google.android.apps.contacts.vcard.ImportVCardActivity') + self.pri_phone.aud(text='OK').click() + + # Check if the vcf file appears in the PickActivity. + if not self.pri_phone.aud(text=file_name).exists(): + raise android_device.DeviceError( + self.pri_phone, + 'No file name matches "%s" in PickActivity.' % file_name) + + # TODO(user): Remove the check of code name for S build. + if (self.pri_phone.build_info['build_version_codename'] != 'S' and + int(self.pri_phone.build_info['build_version_sdk']) <= 30): + # Since `adb shell input tap` cannot work in PickActivity before R build, + # send TAB and ENETER Key events to select and import the vcf file. + if self.pri_phone.aud(content_desc='Grid view').exists(): + # Switch Grid mode since ENTER Key event cannot work in List mode on + # git_rvc-d2-release branch. + self.pri_phone.aud(content_desc='Grid view').click() + self.pri_phone.aud.send_key_code('KEYCODE_TAB') + self.pri_phone.aud.send_key_code('KEYCODE_ENTER') + else: + self.pri_phone.aud(text=file_name).click() + self.pri_phone.log.info('Importing "%s"...' % file_name) + current_count = self._wait_and_get_contact_count( + self.pri_phone, expected_contact_count, WAITING_TIMEOUT_SEC) + if current_count != expected_contact_count: + raise android_device.DeviceError( + self.pri_phone, + 'Failed to import %d contact(s) within %ds. Actual count: %d' % + (expected_contact_count, WAITING_TIMEOUT_SEC, current_count)) + self.pri_phone.log.info( + 'Successfully added %d contact(s).' % current_count) + + def _generate_contacts_on_pse(self, + num_of_contacts, + first_name=None, + last_name=None, + phone_number=None): + """Generates contacts to be tested on PSE.""" + vcf_file = bt_test_utils.create_vcf_from_vcard( + output_path=self.pri_phone.log_path, + num_of_contacts=num_of_contacts, + first_name=first_name, + last_name=last_name, + phone_number=phone_number) + self.pri_phone.adb.push([vcf_file, STORAGE_PATH]) + # For R build, since the pushed vcf file probably not found when importing + # contacts, do a media scan to recognize the file. + if int(self.pri_phone.build_info['build_version_sdk']) > 29: + self.pri_phone.adb.shell('content call --uri content://media/ --method ' + 'scan_volume --arg external_primary') + file_name = vcf_file.split('/')[-1] + self._import_vcf_to_pse(file_name, num_of_contacts) + self.pri_phone.adb.shell( + 'rm -rf %s' % os.path.join(STORAGE_PATH, file_name)) + + def _generate_call_logs_on_pse(self, call_log_type, num_of_call_logs): + """Generates call logs to be tested on PSE.""" + self.pri_phone.log.info('Putting %d call log(s) which type are "%s"...' % + (num_of_call_logs, call_log_type)) + for _ in range(num_of_call_logs): + self.pri_phone.sl4a.callLogsPut(dict( + type=call_log_type, + number='8809%d' % random.randrange(int(10e8)), + time=int(1000 * float(self.pri_phone.adb.shell('date +%s.%N'))))) + current_count = self._wait_and_get_call_log_count( + self.pri_phone, + call_log_type, + num_of_call_logs, + WAITING_TIMEOUT_SEC) + if current_count != num_of_call_logs: + raise android_device.DeviceError( + self.pri_phone, + 'Failed to generate %d call log(s) within %ds. ' + 'Actual count: %d, Call log type: %s' % + (num_of_call_logs, WAITING_TIMEOUT_SEC, current_count, call_log_type)) + self.pri_phone.log.info( + 'Successfully added %d call log(s).' % current_count) + + def _wait_and_get_contact_count(self, + device, + expected_contact_count, + timeout_sec): + """Waits for contact update for a period time and returns contact count. + + This method should be used when a device imports some new contacts. It can + wait some time for contact update until expectation or timeout and then + return contact count. + + Args: + device: AndroidDevice, Mobly Android controller class. + expected_contact_count: Int, Number of contacts as expected. + timeout_sec: Int, Number of seconds to wait for contact update. + + Returns: + current_count: Int, number of the existing contacts on the device. + """ + start_time = time.time() + end_time = start_time + timeout_sec + current_count = 0 + while time.time() < end_time: + current_count = device.sl4a.contactsGetCount() + if current_count == expected_contact_count: + break + # Interval between attempts to get contacts. + time.sleep(1) + if current_count != expected_contact_count: + device.log.warning( + 'Failed to get expected contact count: %d. ' + 'Actual contact count: %d.' % + (expected_contact_count, current_count)) + return current_count + + def _wait_and_get_call_log_count(self, + device, + call_log_type, + expected_call_log_count, + timeout_sec): + """Waits for call log update for a period time and returns call log count. + + This method should be used when a device adds some new call logs. It can + wait some time for call log update until expectation or timeout and then + return call log count. + + Args: + device: AndroidDevice, Mobly Android controller class. + call_log_type: String, Type of the call logs. + expected_call_log_count: Int, Number of call logs as expected. + timeout_sec: Int, Number of seconds to wait for call log update. + + Returns: + current_count: Int, number of the existing call logs on the device. + """ + start_time = time.time() + end_time = start_time + timeout_sec + current_count = 0 + while time.time() < end_time: + current_count = len(device.sl4a.callLogsGet(call_log_type)) + if current_count == expected_call_log_count: + break + # Interval between attempts to get call logs. + time.sleep(1) + if current_count != expected_call_log_count: + device.log.warning( + 'Failed to get expected call log count: %d. ' + 'Actual call log count: %d.' % + (expected_call_log_count, current_count)) + return current_count + + def _terminate_pbap_connection(self): + status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus( + self.pse_mac_address) + if status == bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED: + return + self.derived_bt_device.log.info('Disconnecting PBAP...') + self.derived_bt_device.sl4a.bluetoothPbapClientDisconnect( + self.pse_mac_address) + # Buffer for the connection status check. + time.sleep(3) + status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus( + self.pse_mac_address) + if status != bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED: + raise signals.TestError('PBAP connection failed to be terminated.') + self.derived_bt_device.log.info('Successfully disconnected PBAP.') + + def test_download_contacts(self): + """Test for the feature of downloading contacts. + + Tests that PCE can download contacts from PSE. + """ + # Make sure no any contacts exist on the devices. + for device in [self.pri_phone, self.derived_bt_device]: + device.sl4a.contactsEraseAll() + + # Add contacts to PSE. + self._generate_contacts_on_pse(TEST_DATA_COUNT) + + # When PCE is connected to PSE, it will download PSE's contacts. + self.derived_bt_device.pbap_connect() + self.derived_bt_device.log.info('Downloading contacts from PSE...') + current_count = self._wait_and_get_contact_count( + self.derived_bt_device, TEST_DATA_COUNT, WAITING_TIMEOUT_SEC) + self.derived_bt_device.log.info( + 'Successfully downloaded %d contact(s).' % current_count) + + asserts.assert_true( + current_count == TEST_DATA_COUNT, + 'PCE failed to download %d contact(s) within %ds, ' + 'actually downloaded %d contact(s).' % + (TEST_DATA_COUNT, WAITING_TIMEOUT_SEC, current_count)) + + def test_download_call_logs(self): + """Test for the feature of downloading call logs. + + Tests that PCE can download incoming/outgoing/missed call logs from PSE. + """ + # Make sure no any call logs exist on the devices. + for device in [self.pri_phone, self.derived_bt_device]: + device.sl4a.callLogsEraseAll() + + call_log_types = [ + bt_constants.INCOMING_CALL_LOG_TYPE, + bt_constants.OUTGOING_CALL_LOG_TYPE, + bt_constants.MISSED_CALL_LOG_TYPE, + ] + for call_log_type in call_log_types: + # Add call logs to PSE. + self._generate_call_logs_on_pse(call_log_type, TEST_DATA_COUNT) + + # When PCE is connected to PSE, it will download PSE's contacts. + self.derived_bt_device.pbap_connect() + self.derived_bt_device.log.info('Downloading call logs...') + + for call_log_type in call_log_types: + current_count = self._wait_and_get_call_log_count( + self.derived_bt_device, + call_log_type, + TEST_DATA_COUNT, + WAITING_TIMEOUT_SEC) + self.derived_bt_device.log.info( + 'Successfully downloaded %d call log(s) which type are "%s".' % + (current_count, call_log_type)) + + asserts.assert_true( + current_count == TEST_DATA_COUNT, + 'PCE failed to download %d call log(s) which type are "%s" within %ds' + ', actually downloaded %d call log(s).' % + (TEST_DATA_COUNT, call_log_type, WAITING_TIMEOUT_SEC, current_count)) + + def test_show_caller_name(self): + """Test for caller name of the incoming phone call is correct on PCE. + + Tests that caller name matches contact name which is downloaded via PBAP. + """ + # Checks if two android devices exist. + if len(self.android_devices) < 2: + raise signals.TestError('This test requires two Android devices.') + primary_phone = self.pri_phone + secondary_phone = self.android_devices[1] + secondary_phone.init_setup() + for phone in [primary_phone, secondary_phone]: + # Checks if SIM state is loaded for every devices. + if not phone.is_sim_state_loaded(): + raise signals.TestError(f'Please insert a SIM Card to the phone ' + f'"{phone.serial}".') + # Checks if phone_number is provided in the support dimensions. + phone.phone_number = phone.dimensions.get('phone_number') + if not phone.phone_number: + raise signals.TestError(f'Please add "phone_number" to support ' + f'dimensions of the phone "{phone.serial}".') + # Make sure no any contacts exist on the devices. + for device in [primary_phone, self.derived_bt_device]: + device.sl4a.contactsEraseAll() + # Generate a contact name randomly. + first_name = utils.rand_ascii_str(4) + last_name = utils.rand_ascii_str(4) + full_name = f'{first_name} {last_name}' + primary_phone.log.info('Creating a contact "%s"...', full_name) + self._generate_contacts_on_pse( + num_of_contacts=1, + first_name=first_name, + last_name=last_name, + phone_number=secondary_phone.phone_number) + self.derived_bt_device.log.info('Connecting to PSE...') + self.derived_bt_device.pbap_connect() + self.derived_bt_device.log.info('Downloading contacts from PSE...') + current_count = self._wait_and_get_contact_count( + device=self.derived_bt_device, + expected_contact_count=1, + timeout_sec=WAITING_TIMEOUT_SEC) + self.derived_bt_device.log.info('Successfully downloaded %d contact(s).', + current_count) + asserts.assert_equal( + first=current_count, + second=1, + msg=f'Failed to download the contact "{full_name}".') + secondary_phone.sl4a.telecomCallNumber(primary_phone.phone_number) + secondary_phone.log.info('Made a phone call to device "%s".', + primary_phone.serial) + primary_phone.log.info('Waiting for the incoming call from device "%s"...', + secondary_phone.serial) + is_ringing = primary_phone.wait_for_call_state( + bt_constants.CALL_STATE_RINGING, + bt_constants.CALL_STATE_TIMEOUT_SEC) + if not is_ringing: + raise signals.TestError( + f'Timed out after {bt_constants.CALL_STATE_TIMEOUT_SEC}s waiting for ' + f'the incoming call from device "{secondary_phone.serial}".') + try: + self.derived_bt_device.aud.open_notification() + target_node_match_dict = { + 'resource_id': 'android:id/line1', + 'child': { + 'resource_id': 'android:id/title' + } + } + hfp_address = primary_phone.get_bluetooth_mac_address() + target_node = self.derived_bt_device.aud( + sibling=target_node_match_dict, + text=f'Incoming call via HFP {hfp_address}') + caller_name = target_node.get_attribute_value('text') + message = (f'Caller name is incorrect. Actual: {caller_name}, ' + f'Correct: {full_name}') + # Asserts that caller name of the incoming phone call is correct in the + # notification bar. + asserts.assert_equal( + first=caller_name, + second=full_name, + msg=message) + finally: + # Recovery actions. + self.derived_bt_device.aud.close_notification() + secondary_phone.sl4a.telecomEndCall() + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/pbat/bluetooth_acceptance_suite.py b/system/blueberry/tests/pbat/bluetooth_acceptance_suite.py new file mode 100644 index 0000000000..faa1151c77 --- /dev/null +++ b/system/blueberry/tests/pbat/bluetooth_acceptance_suite.py @@ -0,0 +1,71 @@ +"""Suite collecting bluetooth test classes for acceptance testing.""" + +import logging + +from mobly import test_runner_suite + +from blueberry.tests.a2dp import bluetooth_a2dp_test +from blueberry.tests.avrcp import bluetooth_avrcp_test +from blueberry.tests.connectivity import ble_pairing_test +from blueberry.tests.connectivity import bluetooth_pairing_test +from blueberry.tests.hfp import bluetooth_hfp_test +from blueberry.tests.map import bluetooth_map_test +from blueberry.tests.opp import bluetooth_opp_test +from blueberry.tests.pan import bluetooth_pan_test +from blueberry.tests.pbap import bluetooth_pbap_test + +# Test classes for the Bluetooth acceptance suite. +TEST_CLASSES = [ + bluetooth_pairing_test.BluetoothPairingTest, + ble_pairing_test.BlePairingTest, + bluetooth_a2dp_test.BluetoothA2dpTest, + bluetooth_avrcp_test.BluetoothAvrcpTest, + bluetooth_hfp_test.BluetoothHfpTest, + bluetooth_map_test.BluetoothMapTest, + bluetooth_pbap_test.BluetoothPbapTest, + bluetooth_opp_test.BluetoothOppTest, + bluetooth_pan_test.BluetoothPanTest +] + + +class BluetoothAcceptanceSuite(mobly_g3_suite.BaseSuite): + """Bluetooth Acceptance Suite. + + Usage of Test selector: + Add the parameter "acceptance_test_selector" in the Mobly configuration, it's + value is like "test_method_1,test_method_2,...". If this parameter is not + used, all tests will be running. + """ + + def setup_suite(self, config): + selected_tests = None + selector = config.user_params.get('acceptance_test_selector') + if selector: + selected_tests = selector.split(',') + logging.info('Selected tests: %s', ' '.join(selected_tests)) + # Enable all Bluetooth logging in the first test. + first_test_config = config.copy() + first_test_config.user_params.update({ + 'enable_all_bluetooth_logging': 1, + }) + for index, clazz in enumerate(TEST_CLASSES): + if selected_tests: + matched_tests = None + # Gets the same elements between selected_tests and dir(clazz). + matched_tests = list(set(selected_tests) & set(dir(clazz))) + # Adds the test class if it contains the selected tests. + if matched_tests: + self.add_test_class( + clazz=clazz, + config=first_test_config if index == 0 else config, + tests=matched_tests) + logging.info('Added the tests of "%s": %s', clazz.__name__, + ' '.join(matched_tests)) + else: + self.add_test_class( + clazz=clazz, + config=first_test_config if index == 0 else config) + + +if __name__ == '__main__': + mobly_g3_suite.main() diff --git a/system/blueberry/utils/android_bluetooth_decorator.py b/system/blueberry/utils/android_bluetooth_decorator.py new file mode 100644 index 0000000000..c5708971f0 --- /dev/null +++ b/system/blueberry/utils/android_bluetooth_decorator.py @@ -0,0 +1,1729 @@ +# Lint as: python3 +"""AndroidBluetoothDecorator class. + +This decorator is used for giving an AndroidDevice Bluetooth-specific +functionality. +""" + +import datetime +import logging +import os +import queue +import random +import re +import string +import time +from typing import Dict, Any, Text, Optional, Tuple, Sequence, Union +from mobly import asserts +from mobly import logger as mobly_logger +from mobly import signals +from mobly import utils +from mobly.controllers.android_device import AndroidDevice +from mobly.controllers.android_device_lib import adb +from mobly.controllers.android_device_lib import jsonrpc_client_base +from mobly.controllers.android_device_lib.services import sl4a_service +# Internal import +from blueberry.controllers.derived_bt_device import BtDevice +from blueberry.utils import bt_constants +from blueberry.utils import bt_test_utils +from blueberry.utils.bt_constants import AvrcpEvent +from blueberry.utils.bt_constants import BluetoothConnectionPolicy +from blueberry.utils.bt_constants import BluetoothConnectionStatus +from blueberry.utils.bt_constants import BluetoothProfile +from blueberry.utils.bt_constants import CallLogType +from blueberry.utils.bt_constants import CallState + + +# Map for media passthrough commands and the corresponding events. +MEDIA_CMD_MAP = { + bt_constants.CMD_MEDIA_PAUSE: bt_constants.EVENT_PAUSE_RECEIVED, + bt_constants.CMD_MEDIA_PLAY: bt_constants.EVENT_PLAY_RECEIVED, + bt_constants.CMD_MEDIA_SKIP_PREV: bt_constants.EVENT_SKIP_PREV_RECEIVED, + bt_constants.CMD_MEDIA_SKIP_NEXT: bt_constants.EVENT_SKIP_NEXT_RECEIVED +} + +# Timeout for track change and playback state update in second. +MEDIA_UPDATE_TIMEOUT_SEC = 3 + +# Timeout for the event of Media passthrough commands in second. +MEDIA_EVENT_TIMEOUT_SEC = 1 + +BT_CONNECTION_WAITING_TIME_SECONDS = 10 + +ADB_WAITING_TIME_SECONDS = 1 + +# Common timeout for toggle status in seconds. +COMMON_TIMEOUT_SECONDS = 5 + +# Local constant +_DATETIME_FMT = '%m-%d %H:%M:%S.%f' + +# Interval time between ping requests in second. +PING_INTERVAL_TIME_SEC = 2 + +# Timeout to wait for ping success in second. +PING_TIMEOUT_SEC = 60 + +# A URL is used to verify internet by ping request. +TEST_URL = 'http://www.google.com' + + +class DiscoveryError(signals.ControllerError): + """Exception raised for Bluetooth device discovery failures.""" + pass + + +class AndroidBluetoothDecorator(AndroidDevice): + """Decorates an AndroidDevice with Bluetooth-specific functionality.""" + + def __init__(self, ad: AndroidDevice): + self._ad = ad + self._user_params = None + if not self._ad or not isinstance(self._ad, AndroidDevice): + raise TypeError('Must apply AndroidBluetoothDecorator to an ' + 'AndroidDevice') + self.ble_advertise_callback = None + self.regex_logcat_time = re.compile( + r'(?P<datetime>[\d]{2}-[\d]{2} [\d]{2}:[\d]{2}:[\d]{2}.[\d]{3})' + r'[ ]+\d+.*') + self._regex_bt_crash = re.compile( + r'Bluetooth crashed (?P<num_bt_crashes>\d+) times') + + def __getattr__(self, name: Any) -> Any: + return getattr(self._ad, name) + + def _is_device_connected(self, mac_address): + """Wrapper method to help with unit testability of this class.""" + return self._ad.sl4a.bluetoothIsDeviceConnected(mac_address) + + def _is_profile_connected(self, mac_address, profile): + """Checks if the profile is connected.""" + status = None + pri_ad = self._ad + if profile == BluetoothProfile.HEADSET_CLIENT: + status = pri_ad.sl4a.bluetoothHfpClientGetConnectionStatus(mac_address) + elif profile == BluetoothProfile.A2DP_SINK: + status = pri_ad.sl4a.bluetoothA2dpSinkGetConnectionStatus(mac_address) + elif profile == BluetoothProfile.PBAP_CLIENT: + status = pri_ad.sl4a.bluetoothPbapClientGetConnectionStatus(mac_address) + elif profile == BluetoothProfile.MAP_MCE: + connected_devices = self._ad.sl4a.bluetoothMapClientGetConnectedDevices() + return any( + mac_address in device['address'] for device in connected_devices) + else: + pri_ad.log.warning( + 'The connection check for profile %s is not supported ' + 'yet', profile) + return False + return status == BluetoothConnectionStatus.STATE_CONNECTED + + def _get_bluetooth_le_state(self): + """Wrapper method to help with unit testability of this class.""" + return self._ad.sl4a.bluetoothGetLeState + + def _generate_id_by_size(self, size): + """Generate string of random ascii letters and digits. + + Args: + size: required size of string. + + Returns: + String of random chars. + """ + return ''.join( + random.choice(string.ascii_letters + string.digits) + for _ in range(size)) + + def _wait_for_bluetooth_manager_state(self, + state=None, + timeout=10, + threshold=5): + """Waits for Bluetooth normalized state or normalized explicit state. + + Args: + state: expected Bluetooth state + timeout: max timeout threshold + threshold: list len of bt state + Returns: + True if successful, false if unsuccessful. + """ + all_states = [] + start_time = time.time() + while time.time() < start_time + timeout: + all_states.append(self._get_bluetooth_le_state()) + if len(all_states) >= threshold: + # for any normalized state + if state is None: + if len(all_states[-threshold:]) == 1: + logging.info('State normalized %s', all_states[-threshold:]) + return True + else: + # explicit check against normalized state + if state in all_states[-threshold:]: + return True + time.sleep(0.5) + logging.error( + 'Bluetooth state fails to normalize' if state is None else + 'Failed to match bluetooth state, current state {} expected state {}' + .format(self._get_bluetooth_le_state(), state)) + return False + + def init_setup(self) -> None: + """Sets up android device for bluetooth tests.""" + self._ad.services.register('sl4a', sl4a_service.Sl4aService) + self._ad.load_snippet('mbs', 'com.google.android.mobly.snippet.bundled') + self._ad.adb.shell('setenforce 0') + + # Adds 2 seconds waiting time to see it can fix the NullPointerException + # when executing the following sl4a.bluetoothStartPairingHelper method. + time.sleep(2) + self._ad.sl4a.bluetoothStartPairingHelper() + self.factory_reset_bluetooth() + + def sl4a_setup(self) -> None: + """A common setup routine for android device sl4a function. + + Things this method setup: + 1. Set Bluetooth local name to random string of size 4 + 2. Disable BLE background scanning. + """ + + sl4a = self._ad.sl4a + sl4a.bluetoothStartConnectionStateChangeMonitor('') + setup_result = sl4a.bluetoothSetLocalName(self._generate_id_by_size(4)) + if not setup_result: + self.log.error('Failed to set device name.') + return + sl4a.bluetoothDisableBLE() + bonded_devices = sl4a.bluetoothGetBondedDevices() + for b in bonded_devices: + self.log.info('Removing bond for device {}'.format(b['address'])) + sl4a.bluetoothUnbond(b['address']) + + def set_user_params(self, params: Dict[str, Any]) -> None: + self._user_params = params + + def get_user_params(self) -> Dict[str, Any]: + return self._user_params + + def is_sim_state_loaded(self) -> bool: + """Checks if SIM state is loaded. + + Returns: + True if SIM state is loaded else False. + """ + state = self._ad.adb.shell('getprop gsm.sim.state').decode().strip() + return state == 'LOADED' + + def is_package_installed(self, package_name: str) -> bool: + """Checks if a package is installed. + + Args: + package_name: string, a package to be checked. + + Returns: + True if the package is installed else False. + """ + # The package is installed if result is 1, not installed if result is 0. + result = int(self._ad.adb.shell('pm list packages | grep -i %s$ | wc -l' % + package_name)) + return bool(result) + + def connect_with_rfcomm(self, other_ad: AndroidDevice) -> bool: + """Establishes an RFCOMM connection with other android device. + + Connects this android device (as a client) to the other android device + (as a server). + + Args: + other_ad: the Android device accepting the connection from this device. + + Returns: + True if connection was successful, False if unsuccessful. + """ + server_address = other_ad.sl4a.bluetoothGetLocalAddress() + logging.info('Pairing and connecting devices') + if not self._ad.sl4a.bluetoothDiscoverAndBond(server_address): + logging.info('Failed to pair and connect devices') + return False + + # Create RFCOMM connection + logging.info('establishing RFCOMM connection') + return self.orchestrate_rfcomm_connection(other_ad) + + def orchestrate_rfcomm_connection( + self, + other_ad: AndroidDevice, + accept_timeout_ms: int = bt_constants.DEFAULT_RFCOMM_TIMEOUT_MS, + uuid: Optional[Text] = None) -> bool: + """Sets up the RFCOMM connection to another android device. + + It sets up the connection with a Bluetooth Socket connection with other + device. + + Args: + other_ad: the Android device accepting the connection from this device. + accept_timeout_ms: the timeout in ms for the connection. + uuid: universally unique identifier. + + Returns: + True if connection was successful, False if unsuccessful. + """ + if uuid is None: + uuid = bt_constants.BT_RFCOMM_UUIDS['default_uuid'] + other_ad.sl4a.bluetoothStartPairingHelper() + self._ad.sl4a.bluetoothStartPairingHelper() + other_ad.sl4a.bluetoothSocketConnBeginAcceptThreadUuid(uuid, + accept_timeout_ms) + self._ad.sl4a.bluetoothSocketConnBeginConnectThreadUuid( + other_ad.sl4a.bluetoothGetLocalAddress(), uuid) + + end_time = time.time() + bt_constants.BT_DEFAULT_TIMEOUT_SECONDS + test_result = True + + while time.time() < end_time: + number_socket_connections = len( + other_ad.sl4a.bluetoothSocketConnActiveConnections()) + connected = number_socket_connections > 0 + if connected: + test_result = True + other_ad.log.info('Bluetooth socket Client Connection Active') + break + else: + test_result = False + time.sleep(1) + if not test_result: + other_ad.log.error('Failed to establish a Bluetooth socket connection') + return False + return True + + def wait_for_discovery_success( + self, + mac_address: str, + timeout: float = 30) -> float: + """Waits for a device to be discovered by AndroidDevice. + + Args: + mac_address: The Bluetooth mac address of the peripheral device. + timeout: Number of seconds to wait for device discovery. + + Returns: + discovery_time: The time it takes to pair in seconds. + + Raises: + DiscoveryError + """ + start_time = time.time() + try: + self._ad.ed.wait_for_event('Discovery%s' % mac_address, + lambda x: x['data']['Status'], timeout) + discovery_time = time.time() - start_time + return discovery_time + + except queue.Empty: + raise DiscoveryError('Failed to discover device %s after %d seconds' % + (mac_address, timeout)) + + def wait_for_pairing_success( + self, + mac_address: str, + timeout: float = 30) -> float: + """Waits for a device to pair with the AndroidDevice. + + Args: + mac_address: The Bluetooth mac address of the peripheral device. + timeout: Number of seconds to wait for the devices to pair. + + Returns: + pairing_time: The time it takes to pair in seconds. + + Raises: + ControllerError + """ + start_time = time.time() + try: + self._ad.ed.wait_for_event('Bond%s' % mac_address, + lambda x: x['data']['Status'], timeout) + pairing_time = time.time() - start_time + return pairing_time + + except queue.Empty: + raise signals.ControllerError( + 'Failed to bond with device %s after %d seconds' % + (mac_address, timeout)) + + def wait_for_connection_success( + self, + mac_address: str, + timeout: int = 30) -> float: + """Waits for a device to connect with the AndroidDevice. + + Args: + mac_address: The Bluetooth mac address of the peripheral device. + timeout: Number of seconds to wait for the devices to connect. + + Returns: + connection_time: The time it takes to connect in seconds. + + Raises: + ControllerError + """ + start_time = time.time() + end_time = start_time + timeout + while time.time() < end_time: + if self._is_device_connected(mac_address): + connection_time = (time.time() - start_time) + logging.info('Connected device %s in %d seconds', mac_address, + connection_time) + return connection_time + + raise signals.ControllerError( + 'Failed to connect device within %d seconds.' % timeout) + + def factory_reset_bluetooth(self) -> None: + """Factory resets Bluetooth on an AndroidDevice.""" + + logging.info('Factory resetting Bluetooth for AndroidDevice.') + self._ad.sl4a.bluetoothToggleState(True) + paired_devices = self._ad.mbs.btGetPairedDevices() + for device in paired_devices: + self._ad.sl4a.bluetoothUnbond(device['Address']) + self._ad.sl4a.bluetoothFactoryReset() + self._wait_for_bluetooth_manager_state() + self._ad.sl4a.bluetoothToggleState(True) + + def get_device_info(self) -> Dict[str, Any]: + """Gets the configuration info of an AndroidDevice. + + Returns: + dict, A dictionary mapping metric keys to their respective values. + """ + + device_info = { + 'device_class': + 'AndroidDevice', + 'device_model': + self._ad.device_info['model'], + 'hardware_version': + self._ad.adb.getprop('ro.boot.hardware.revision'), + 'software_version': + self._ad.build_info['build_id'], + 'android_build_type': + self._ad.build_info['build_type'], + 'android_build_number': + self._ad.adb.getprop('ro.build.version.incremental'), + 'android_release_id': + self._ad.build_info['build_id'] + } + + return device_info + + def pair_and_connect_bluetooth( + self, + mac_address: str, + attempts: int = 3, + enable_pairing_retry: bool = True) -> Tuple[float, float]: + """Pairs and connects an AndroidDevice with a peripheral Bluetooth device. + + Ensures that an AndroidDevice is paired and connected to a peripheral + device. If the devices are already connected, does nothing. If + the devices are paired but not connected, connects the devices. If the + devices are neither paired nor connected, this method pairs and connects the + devices. + + Suggests to use the retry mechanism on Discovery because it sometimes fail + even if the devices are testing in shielding. In order to avoid the remote + device may not respond a incoming pairing request causing to bonding failure + , it suggests to retry pairing too. + + Args: + mac_address: The Bluetooth mac address of the peripheral device. + attempts: Number of attempts to discover and pair the peripheral device. + enable_pairing_retry: Bool to control whether the retry mechanism is used + on bonding failure, it's enabled if True. + + Returns: + pairing_time: The time, in seconds, it takes to pair the devices. + connection_time: The time, in seconds, it takes to connect the + devices after pairing is completed. + + Raises: + DiscoveryError: Raised if failed to discover the peripheral device. + ControllerError: Raised if failed to bond the peripheral device. + """ + + connected = self._is_device_connected(mac_address) + pairing_time = 0 + connection_time = 0 + if connected: + logging.info('Device %s already paired and connected', mac_address) + return pairing_time, connection_time + + paired_devices = [device['address'] for device in + self._ad.sl4a.bluetoothGetBondedDevices()] + if mac_address in paired_devices: + self._ad.sl4a.bluetoothConnectBonded(mac_address) + return pairing_time, self.wait_for_connection_success(mac_address) + + logging.info('Initiate pairing to the device "%s".', mac_address) + for i in range(attempts): + self._ad.sl4a.bluetoothDiscoverAndBond(mac_address) + try: + self.wait_for_discovery_success(mac_address) + pairing_time = self.wait_for_pairing_success(mac_address) + break + except DiscoveryError: + if i + 1 < attempts: + logging.error( + 'Failed to find the device "%s" on Attempt %d. ' + 'Retrying discovery...', mac_address, i + 1) + continue + raise DiscoveryError('Failed to find the device "%s".' % mac_address) + except signals.ControllerError: + if i + 1 < attempts and enable_pairing_retry: + logging.error( + 'Failed to bond the device "%s" on Attempt %d. ' + 'Retrying pairing...', mac_address, i + 1) + continue + raise signals.ControllerError('Failed to bond the device "%s".' % + mac_address) + + connection_time = self.wait_for_connection_success(mac_address) + return pairing_time, connection_time + + def disconnect_bluetooth( + self, + mac_address: str, + timeout: float = 30) -> float: + """Disconnects Bluetooth between an AndroidDevice and peripheral device. + + Args: + mac_address: The Bluetooth mac address of the peripheral device. + timeout: Number of seconds to wait for the devices to disconnect the + peripheral device. + + Returns: + disconnection_time: The time, in seconds, it takes to disconnect the + peripheral device. + + Raises: + ControllerError: Raised if failed to disconnect the peripheral device. + """ + if not self._is_device_connected(mac_address): + logging.info('Device %s already disconnected', mac_address) + return 0 + + self._ad.sl4a.bluetoothDisconnectConnected(mac_address) + start_time = time.time() + end_time = time.time() + timeout + while time.time() < end_time: + connected = self._is_device_connected(mac_address) + if not connected: + logging.info('Device %s disconnected successfully.', mac_address) + return time.time() - start_time + + raise signals.ControllerError( + 'Failed to disconnect device within %d seconds.' % timeout) + + def connect_bluetooth(self, mac_address: str, timeout: float = 30) -> float: + """Connects Bluetooth between an AndroidDevice and peripheral device. + + Args: + mac_address: The Bluetooth mac address of the peripheral device. + timeout: Number of seconds to wait for the devices to connect the + peripheral device. + + Returns: + connection_time: The time, in seconds, it takes to connect the + peripheral device. + + Raises: + ControllerError: Raised if failed to connect the peripheral device. + """ + if self._is_device_connected(mac_address): + logging.info('Device %s already connected', mac_address) + return 0 + + self._ad.sl4a.bluetoothConnectBonded(mac_address) + connect_time = self.wait_for_connection_success(mac_address) + + return connect_time + + def activate_pairing_mode(self) -> None: + """Activates pairing mode on an AndroidDevice.""" + logging.info('Activating pairing mode on AndroidDevice.') + self._ad.sl4a.bluetoothMakeDiscoverable() + self._ad.sl4a.bluetoothStartPairingHelper() + + def activate_ble_pairing_mode(self) -> None: + """Activates BLE pairing mode on an AndroidDevice.""" + self.ble_advertise_callback = self._ad.sl4a.bleGenBleAdvertiseCallback() + self._ad.sl4a.bleSetAdvertiseDataIncludeDeviceName(True) + # Sets advertise mode to low latency. + self._ad.sl4a.bleSetAdvertiseSettingsAdvertiseMode( + bt_constants.BleAdvertiseSettingsMode.LOW_LATENCY) + self._ad.sl4a.bleSetAdvertiseSettingsIsConnectable(True) + # Sets TX power level to High. + self._ad.sl4a.bleSetAdvertiseSettingsTxPowerLevel( + bt_constants.BleAdvertiseSettingsTxPower.HIGH) + advertise_data = self._ad.sl4a.bleBuildAdvertiseData() + advertise_settings = self._ad.sl4a.bleBuildAdvertiseSettings() + logging.info('Activating BLE pairing mode on AndroidDevice.') + self._ad.sl4a.bleStartBleAdvertising( + self.ble_advertise_callback, advertise_data, advertise_settings) + + def deactivate_ble_pairing_mode(self) -> None: + """Deactivates BLE pairing mode on an AndroidDevice.""" + if not self.ble_advertise_callback: + self._ad.log.debug('BLE pairing mode is not activated.') + return + logging.info('Deactivating BLE pairing mode on AndroidDevice.') + self._ad.sl4a.bleStopBleAdvertising(self.ble_advertise_callback) + self.ble_advertise_callback = None + + def get_bluetooth_mac_address(self) -> str: + """Gets Bluetooth mac address of an AndroidDevice.""" + logging.info('Getting Bluetooth mac address for AndroidDevice.') + mac_address = self._ad.sl4a.bluetoothGetLocalAddress() + logging.info('Bluetooth mac address of AndroidDevice: %s', mac_address) + return mac_address + + def scan_and_get_ble_device_address( + self, + device_name: str, + timeout_sec: float = 30) -> str: + """Searchs a BLE device by BLE scanner and returns it's BLE mac address. + + Args: + device_name: string, the name of BLE device. + timeout_sec: int, number of seconds to wait for finding the advertisement. + + Returns: + String of the BLE mac address. + + Raises: + ControllerError: Raised if failed to get the BLE device address + """ + filter_list = self._ad.sl4a.bleGenFilterList() + scan_settings = self._ad.sl4a.bleBuildScanSetting() + scan_callback = self._ad.sl4a.bleGenScanCallback() + self._ad.sl4a.bleSetScanFilterDeviceName(device_name) + self._ad.sl4a.bleBuildScanFilter(filter_list) + self._ad.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) + try: + event = self._ad.ed.pop_event( + 'BleScan%sonScanResults' % scan_callback, timeout_sec) + except queue.Empty: + raise signals.ControllerError( + 'Timed out %ds after waiting for phone finding BLE device: %s.' % + (timeout_sec, device_name)) + finally: + self._ad.sl4a.bleStopBleScan(scan_callback) + return event['data']['Result']['deviceInfo']['address'] + + def get_device_name(self) -> str: + """Gets Bluetooth device name of an AndroidDevice.""" + logging.info('Getting Bluetooth device name for AndroidDevice.') + device_name = self._ad.sl4a.bluetoothGetLocalName() + logging.info('Bluetooth device name of AndroidDevice: %s', device_name) + return device_name + + def is_bluetooth_sco_on(self) -> bool: + """Checks whether communications use Bluetooth SCO.""" + cmd = 'dumpsys bluetooth_manager | grep "isBluetoothScoOn"' + get_status = self._ad.adb.shell(cmd) + if isinstance(get_status, bytes): + get_status = get_status.decode() + return 'true' in get_status + + def connect_with_profile( + self, + snd_ad_mac_address: str, + profile: BluetoothProfile) -> bool: + """Connects with the profile. + + The connection can only be completed after the bluetooth devices are paired. + To connected with the profile, the bluetooth connection policy is set to + forbidden first and then set to allowed. The paired bluetooth devices will + start to make connection. The connection time could be long. The waitting + time is set to BT_CONNECTION_WAITING_TIME_SECONDS (currently 10 seconds). + + Args: + snd_ad_mac_address: the mac address of the device accepting connection. + profile: the profiles to be set + + Returns: + The profile connection succeed/fail + """ + if profile == BluetoothProfile.MAP_MCE: + self._ad.sl4a.bluetoothMapClientConnect(snd_ad_mac_address) + elif profile == BluetoothProfile.PBAP_CLIENT: + self.set_profile_policy( + snd_ad_mac_address, profile, + BluetoothConnectionPolicy.CONNECTION_POLICY_ALLOWED) + self._ad.sl4a.bluetoothPbapClientConnect(snd_ad_mac_address) + else: + self.set_profile_policy( + snd_ad_mac_address, profile, + BluetoothConnectionPolicy.CONNECTION_POLICY_FORBIDDEN) + self.set_profile_policy( + snd_ad_mac_address, profile, + BluetoothConnectionPolicy.CONNECTION_POLICY_ALLOWED) + self._ad.sl4a.bluetoothConnectBonded(snd_ad_mac_address) + time.sleep(BT_CONNECTION_WAITING_TIME_SECONDS) + is_connected = self._is_profile_connected(snd_ad_mac_address, profile) + self.log.info('The connection between %s and %s for profile %s succeed: %s', + self.serial, snd_ad_mac_address, profile, is_connected) + return is_connected + + def connect_to_snd_with_profile( + self, + snd_ad: AndroidDevice, + profile: BluetoothProfile, + attempts: int = 5) -> bool: + """Connects pri android device to snd android device with profile. + + Args: + snd_ad: android device accepting connection + profile: the profile to be connected + attempts: Number of attempts to try until failure. + + Returns: + Boolean of connecting result + """ + pri_ad = self._ad + curr_attempts = 0 + snd_ad_mac_address = snd_ad.sl4a.bluetoothGetLocalAddress() + if not self.is_bt_paired(snd_ad_mac_address): + self.log.error('Devices %s and %s not paired before connecting', + self.serial, snd_ad.serial) + return False + while curr_attempts < attempts: + curr_attempts += 1 + self.log.info('Connection of profile %s at curr attempt %d (total %d)', + profile, curr_attempts, attempts) + if self.connect_with_profile(snd_ad_mac_address, profile): + self.log.info('Connection between devices %s and %s succeeds at %d try', + pri_ad.serial, snd_ad.serial, curr_attempts) + return True + self.log.error('Connection of profile %s failed after %d attempts', profile, + attempts) + return False + + def is_bt_paired(self, mac_address: str) -> bool: + """Check if the bluetooth device with mac_address is paired to ad. + + Args: + mac_address: the mac address of the bluetooth device for pairing + + Returns: + True if they are paired + """ + bonded_info = self._ad.sl4a.bluetoothGetBondedDevices() + return mac_address in [info['address'] for info in bonded_info] + + def is_a2dp_sink_connected(self, mac_address: str) -> bool: + """Checks if the Android device connects to a A2DP sink device. + + Args: + mac_address: String, Bluetooth MAC address of the A2DP sink device. + + Returns: + True if connected else False. + """ + connected_devices = self._ad.sl4a.bluetoothA2dpGetConnectedDevices() + return mac_address in [d['address'] for d in connected_devices] + + def hfp_connect(self, ag_ad: AndroidDevice) -> bool: + """Hfp connecting hf android device to ag android device. + + The android device should support the Headset Client profile. For example, + the android device with git_master-bds-dev build. + + Args: + ag_ad: Audio Gateway (ag) android device + + Returns: + Boolean of connecting result + """ + return self.connect_to_snd_with_profile(ag_ad, + BluetoothProfile.HEADSET_CLIENT) + + def a2dp_sink_connect(self, src_ad: AndroidDevice) -> bool: + """Connects pri android device to secondary android device. + + The android device should support the A2dp Sink profile. For example, the + android device with git_master-bds-dev build. + + Args: + src_ad: A2dp source android device + + Returns: + Boolean of connecting result + """ + return self.connect_to_snd_with_profile(src_ad, BluetoothProfile.A2DP_SINK) + + def map_connect(self, map_ad: AndroidDevice) -> bool: + """Connects primary device to secondary device via MAP MCE profile. + + The primary device should support the MAP MCE profile. For example, + the android device with git_master-bds-dev build. + + Args: + map_ad: AndroidDevice, a android device supporting MAP profile. + + Returns: + Boolean of connecting result + """ + return self.connect_to_snd_with_profile(map_ad, + BluetoothProfile.MAP_MCE) + + def map_disconnect(self, bluetooth_address: str) -> bool: + """Disconnects a MAP MSE device with specified Bluetooth MAC address. + + Args: + bluetooth_address: a connected device's bluetooth address. + + Returns: + True if the device is disconnected else False. + """ + self._ad.sl4a.bluetoothMapClientDisconnect(bluetooth_address) + return bt_test_utils.wait_until( + timeout_sec=COMMON_TIMEOUT_SECONDS, + condition_func=self._is_profile_connected, + func_args=[bluetooth_address, BluetoothProfile.MAP_MCE], + expected_value=False) + + def pbap_connect(self, pbap_ad: AndroidDevice) -> bool: + """Connects primary device to secondary device via PBAP client profile. + + The primary device should support the PBAP client profile. For example, + the android device with git_master-bds-dev build. + + Args: + pbap_ad: AndroidDevice, a android device supporting PBAP profile. + + Returns: + Boolean of connecting result + """ + return self.connect_to_snd_with_profile(pbap_ad, + BluetoothProfile.PBAP_CLIENT) + + def set_bluetooth_tethering(self, status_enabled: bool) -> None: + """Sets Bluetooth tethering to be specific status. + + Args: + status_enabled: Bool, Bluetooth tethering will be set to enable if True, + else disable. + """ + if self._ad.sl4a.bluetoothPanIsTetheringOn() == status_enabled: + self._ad.log.info('Already %s Bluetooth tethering.' % + ('enabled' if status_enabled else 'disabled')) + return + + self._ad.log.info('%s Bluetooth tethering.' % + ('Enable' if status_enabled else 'Disable')) + self._ad.sl4a.bluetoothPanSetBluetoothTethering(status_enabled) + + bt_test_utils.wait_until( + timeout_sec=COMMON_TIMEOUT_SECONDS, + condition_func=self._ad.sl4a.bluetoothPanIsTetheringOn, + func_args=[], + expected_value=status_enabled, + exception=signals.ControllerError( + 'Failed to %s Bluetooth tethering.' % + ('enable' if status_enabled else 'disable'))) + + def set_profile_policy( + self, + snd_ad_mac_address: str, + profile: BluetoothProfile, + policy: BluetoothConnectionPolicy) -> None: + """Sets policy of the profile car related profiles to OFF. + + This avoids autoconnect being triggered randomly. The use of this function + is encouraged when you're testing individual profiles in isolation. + + Args: + snd_ad_mac_address: the mac address of the device accepting connection. + profile: the profiles to be set + policy: the policy value to be set + """ + pri_ad = self._ad + pri_ad_local_name = pri_ad.sl4a.bluetoothGetLocalName() + pri_ad.log.info('Sets profile %s on %s for %s to policy %s', profile, + pri_ad_local_name, snd_ad_mac_address, policy) + if profile == BluetoothProfile.A2DP: + pri_ad.sl4a.bluetoothA2dpSetPriority(snd_ad_mac_address, policy.value) + elif profile == BluetoothProfile.A2DP_SINK: + pri_ad.sl4a.bluetoothA2dpSinkSetPriority(snd_ad_mac_address, policy.value) + elif profile == BluetoothProfile.HEADSET_CLIENT: + pri_ad.sl4a.bluetoothHfpClientSetPriority(snd_ad_mac_address, + policy.value) + elif profile == BluetoothProfile.PBAP_CLIENT: + pri_ad.sl4a.bluetoothPbapClientSetPriority(snd_ad_mac_address, + policy.value) + elif profile == BluetoothProfile.HID_HOST: + pri_ad.sl4a.bluetoothHidSetPriority(snd_ad_mac_address, policy.value) + else: + pri_ad.log.error('Profile %s not yet supported for policy settings', + profile) + + def set_profiles_policy( + self, + snd_ad: AndroidDevice, + profile_list: Sequence[BluetoothProfile], + policy: BluetoothConnectionPolicy) -> None: + """Sets the policy of said profile(s) on pri_ad for snd_ad. + + Args: + snd_ad: android device accepting connection + profile_list: list of the profiles to be set + policy: the policy to be set + """ + mac_address = snd_ad.sl4a.bluetoothGetLocalAddress() + for profile in profile_list: + self.set_profile_policy(mac_address, profile, policy) + + def set_profiles_policy_off( + self, + snd_ad: AndroidDevice, + profile_list: Sequence[BluetoothProfile]) -> None: + """Sets policy of the profiles to OFF. + + This avoids autoconnect being triggered randomly. The use of this function + is encouraged when you're testing individual profiles in isolation + + Args: + snd_ad: android device accepting connection + profile_list: list of the profiles to be turned off + """ + self.set_profiles_policy( + snd_ad, profile_list, + BluetoothConnectionPolicy.CONNECTION_POLICY_FORBIDDEN) + + def wait_for_call_state( + self, + call_state: Union[int, CallState], + timeout_sec: float, + wait_interval: int = 3) -> bool: + """Waits for call state of the device to be changed. + + Args: + call_state: int, the expected call state. Call state values are: + 0: IDLE + 1: RINGING + 2: OFFHOOK + timeout_sec: int, number of seconds of expiration time + wait_interval: int, number of seconds of waiting in each cycle + + Returns: + True if the call state has been changed else False. + """ + # TODO(user): Force external call to use CallState instead of int + if isinstance(call_state, CallState): + call_state = call_state.value + expiration_time = time.time() + timeout_sec + which_cycle = 1 + while time.time() < expiration_time: + # Waits for the call state change in every cycle. + time.sleep(wait_interval) + self._ad.log.info( + 'in cycle %d of waiting for call state %d', which_cycle, call_state) + if call_state == self._ad.mbs.getTelephonyCallState(): + return True + self._ad.log.info('The call state did not change to %d before timeout', + call_state) + return False + + def play_audio_file_with_google_play_music(self) -> None: + """Plays an audio file on an AndroidDevice with Google Play Music app. + + Returns: + None + """ + try: + self._ad.aud.add_watcher('LOGIN').when(text='SKIP').click(text='SKIP') + self._ad.aud.add_watcher('NETWORK').when(text='Server error').click( + text='OK') + self._ad.aud.add_watcher('MENU').when(text='Settings').click( + text='Listen Now') + except adb_ui.Error: + logging.info('The watcher has been added.') + self._ad.sl4a.appLaunch('com.google.android.music') + if self._ad.aud(text='No Music available').exists(10): + self._ad.reboot() + self._ad.sl4a.appLaunch('com.google.android.music') + self._ad.aud( + resource_id='com.google.android.music:id/li_thumbnail_frame').click() + time.sleep(6) # Wait for audio playback to reach steady state + + def add_call_log( + self, + call_log_type: Union[int, CallLogType], + phone_number: str, + call_time: int) -> None: + """Add call number and time to specified log. + + Args: + call_log_type: int, number of call log type. Call log type values are: + 1: Incoming call + 2: Outgoing call + 3: Missed call + phone_number: string, phone number to be added in call log. + call_time: int, call time to be added in call log. + + Returns: + None + """ + # TODO(user): Force external call to use CallLogType instead of int + if isinstance(call_log_type, CallLogType): + call_log_type = call_log_type.value + new_call_log = {} + new_call_log['type'] = str(call_log_type) + new_call_log['number'] = phone_number + new_call_log['time'] = str(call_time) + self._ad.sl4a.callLogsPut(new_call_log) + + def get_call_volume(self) -> int: + """Gets current call volume of an AndroidDevice when Bluetooth SCO On. + + Returns: + An integer specifying the number of current call volume level. + """ + cmd = 'dumpsys audio | grep "STREAM_BLUETOOTH_SCO" | tail -1' + out = self._ad.adb.shell(cmd).decode() + # TODO(user): Should we handle the case that re.search(...) return None + # below? + pattern = r'(?<=SCO index:)[\d]+' + return int(re.search(pattern, out).group()) + + def make_phone_call( + self, + callee: AndroidDevice, + timeout_sec: float = 30) -> None: + """Make a phone call to callee and check if callee is ringing. + + Args: + callee: AndroidDevice, The callee in the phone call. + timeout_sec: int, number of seconds to wait for the callee ringing. + + Raises: + TestError + """ + self._ad.sl4a.telecomCallNumber(callee.dimensions['phone_number']) + is_ringing = callee.wait_for_call_state(bt_constants.CALL_STATE_RINGING, + timeout_sec) + if not is_ringing: + raise signals.TestError( + 'Timed out after %ds waiting for call state: RINGING' % timeout_sec) + + def wait_for_disconnection_success( + self, + mac_address: str, + timeout: float = 30) -> float: + """Waits for a device to connect with the AndroidDevice. + + Args: + mac_address: The Bluetooth mac address of the peripheral device. + timeout: Number of seconds to wait for the devices to connect. + + Returns: + connection_time: The time it takes to connect in seconds. + + Raises: + ControllerError + """ + start_time = time.time() + end_time = start_time + timeout + while time.time() < end_time: + if not self._ad.sl4a.bluetoothIsDeviceConnected(mac_address): + disconnection_time = (time.time() - start_time) + logging.info('Disconnected device %s in %d seconds', mac_address, + disconnection_time) + return disconnection_time + + raise signals.ControllerError( + 'Failed to disconnect device within %d seconds.' % timeout) + + def first_pair_and_connect_bluetooth(self, bt_device: BtDevice) -> None: + """Pairs and connects an AndroidDevice with a Bluetooth device. + + This method does factory reset bluetooth first and then pairs and connects + the devices. + + Args: + bt_device: The peripheral Bluetooth device or an AndroidDevice. + + Returns: + None + """ + bt_device.factory_reset_bluetooth() + mac_address = bt_device.get_bluetooth_mac_address() + bt_device.activate_pairing_mode() + self.pair_and_connect_bluetooth(mac_address) + + def get_device_time(self) -> str: + """Get device epoch time and transfer to logcat timestamp format. + + Returns: + String of the device time. + """ + return self._ad.adb.shell( + 'date +"%m-%d %H:%M:%S.000"').decode().splitlines()[0] + + def logcat_filter( + self, + start_time: str, + text_filter: str = '') -> str: + """Returns logcat after a given time. + + This method calls from the android_device logcat service file and filters + all logcat line prior to the start_time. + + Args: + start_time: start time in string format of _DATETIME_FMT. + text_filter: only return logcat lines that include this string. + + Returns: + A logcat output. + + Raises: + ValueError Exception if start_time is invalid format. + """ + try: + start_time_conv = datetime.datetime.strptime(start_time, _DATETIME_FMT) + except ValueError as ex: + logging.error('Invalid time format!') + raise ex + logcat_response = '' + with open(self._ad.adb_logcat_file_path, 'r', errors='replace') \ + as logcat_file: + post_start_time = False + for line in logcat_file: + match = self.regex_logcat_time.match(line) + if match: + if (datetime.datetime.strptime( + match.group('datetime'), _DATETIME_FMT) >= start_time_conv): + post_start_time = True + if post_start_time and line.find(text_filter) >= 0: + logcat_response += line + return logcat_response + + def logcat_filter_message( + self, + current_time: str, + text: str = '') -> str: + """DEPRECATED Builds the logcat command. + + This method builds the logcat command to check for a specified log + message after the specified time. If text=None, the logcat returned will be + unfiltered. + + Args: + current_time: time cutoff for grepping for the specified + message, format = ('%m-%d %H:%M:%S.000'). + text: text to search for. + + Returns: + The response of the logcat filter. + """ + return self.logcat_filter(current_time, text) + + def send_media_passthrough_cmd( + self, + command: str, + event_receiver: Optional[AndroidDevice] = None) -> None: + """Sends a media passthrough command. + + Args: + command: string, media passthrough command. + event_receiver: AndroidDevice, a device which starts + BluetoothSL4AAudioSrcMBS. + + Raises: + signals.ControllerError: raised if the event is not received. + """ + self._ad.log.info('Sending Media Passthough: %s' % command) + self._ad.sl4a.bluetoothMediaPassthrough(command) + try: + if not event_receiver: + event_receiver = self._ad + event_receiver.ed.pop_event(MEDIA_CMD_MAP[command], + MEDIA_EVENT_TIMEOUT_SEC) + except queue.Empty: + raise signals.ControllerError( + 'Device "%s" failed to receive the event "%s" ' + 'when the command "%s" was sent.' % + (event_receiver.serial, MEDIA_CMD_MAP[command], command)) + + def pause(self) -> None: + """Sends the AVRCP command "pause".""" + self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PAUSE) + + def play(self) -> None: + """Sends the AVRCP command "play".""" + self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PLAY) + + def track_previous(self) -> None: + """Sends the AVRCP command "skipPrev".""" + self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_SKIP_PREV) + + def track_next(self) -> None: + """Sends the AVRCP command "skipNext".""" + self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_SKIP_NEXT) + + def get_current_track_info(self) -> Dict[str, Any]: + """Returns Dict (Media metadata) representing the current track.""" + return self._ad.sl4a.bluetoothMediaGetCurrentMediaMetaData() + + def get_current_playback_state(self) -> int: + """Returns Integer representing the current playback state.""" + return self._ad.sl4a.bluetoothMediaGetCurrentPlaybackState()['state'] + + def verify_playback_state_changed( + self, + expected_state: str, + exception: Optional[Exception] = None) -> bool: + """Verifies the playback state is changed to be the expected state. + + Args: + expected_state: string, the changed state as expected. + exception: Exception, raised when the state is not changed if needed. + """ + bt_test_utils.wait_until( + timeout_sec=MEDIA_UPDATE_TIMEOUT_SEC, + condition_func=self.get_current_playback_state, + func_args=[], + expected_value=expected_state, + exception=exception) + + def verify_current_track_changed( + self, + expected_track: str, + exception: Optional[Exception] = None) -> bool: + """Verifies the Now playing track is changed to be the expected track. + + Args: + expected_track: string, the changed track as expected. + exception: Exception, raised when the track is not changed if needed. + """ + bt_test_utils.wait_until( + timeout_sec=MEDIA_UPDATE_TIMEOUT_SEC, + condition_func=self.get_current_track_info, + func_args=[], + expected_value=expected_track, + exception=exception) + + def verify_avrcp_event( + self, + event_name: AvrcpEvent, + check_time: str, + timeout_sec: float = 20) -> bool: + """Verifies that an AVRCP event was received by an AndroidDevice. + + Checks logcat to verify that an AVRCP event was received after a given + time. + + Args: + event_name: enum, AVRCP event name. Currently supports play, pause, + track_previous, and track_next. + check_time: string, The earliest desired cutoff time to check the logcat. + Must be in format '%m-%d %H:%M:%S.000'. Use + datetime.datetime.now().strftime('%m-%d %H:%M:%S.%f') to get current time + in this format. + timeout_sec: int, Number of seconds to wait for the specified AVRCP event + be found in logcat. + + Raises: + TestError + + Returns: + True if the event was received. + """ + avrcp_events = [ + 'State:NOT_PLAYING->PLAYING', 'State:PLAYING->NOT_PLAYING', + 'sendMediaKeyEvent: keyEvent=76', 'sendMediaKeyEvent: keyEvent=75' + ] + if event_name.value not in avrcp_events: + raise signals.TestError('An unexpected AVRCP event is specified.') + + end_time = time.time() + timeout_sec + while time.time() < end_time: + if self.logcat_filter_message(check_time, event_name.value): + logging.info('%s event received successfully.', event_name) + return True + time.sleep(1) + logging.error('AndroidDevice failed to receive %s event.', event_name) + logging.info('Logcat:\n%s', self.logcat_filter_message(check_time)) + return False + + def add_google_account(self, retries: int = 5) -> bool: + """Login Google account. + + Args: + retries: int, the number of retries. + + Returns: + True if account is added successfully. + + Raises: + TestError + """ + for _ in range(retries): + output = self._ad.adb.shell( + 'am instrument -w -e account "%s" -e password ' + '"%s" -e sync true -e wait-for-checkin false ' + 'com.google.android.tradefed.account/.AddAccount' % + (self._ad.dimensions['google_account'], + self._ad.dimensions['google_account_password'])).decode() + if 'result=SUCCESS' in output: + logging.info('Google account is added successfully') + time.sleep(3) # Wait for account to steady state + return True + raise signals.TestError('Failed to add google account: %s' % output) + + def remove_google_account(self, retries: int = 5) -> bool: + """Remove Google account. + + Args: + retries: int, the number of retries. + + Returns: + True if account is removed successfully. + + Raises: + TestError + """ + for _ in range(retries): + output = self._ad.adb.shell( + 'am instrument -w com.google.android.tradefed.account/.RemoveAccounts' + ).decode() + if 'result=SUCCESS' in output: + logging.info('Google account is removed successfully') + return True + time.sleep(1) # Buffer between retries. + raise signals.TestError('Failed to remove google account: %s' % output) + + def make_hangouts_voice_call(self, callee: AndroidDevice) -> None: + """Make Hangouts VOIP voice call. + + Args: + callee: Android Device, the android device of callee. + + Returns: + None + """ + try: + self._ad.aud.add_watcher('SETUP').when(text='SKIP').click(text='SKIP') + self._ad.aud.add_watcher('REMINDER').when(text='Got it').click( + text='Got it') + except adb_ui.Error: + # TODO(user): Need to figure out the logic here why use info in + # exception catch block instead of warning/error + logging.info('The watcher has been added.') + self._ad.sl4a.appLaunch('com.google.android.talk') + callee.sl4a.appLaunch('com.google.android.talk') + # Make voice call to callee + try: + # Click the callee icon + self._ad.aud(resource_id='com.google.android.talk:id/avatarView').click() + except adb_ui.Error: + # Press BACK key twice and re-launch Hangouts if it is not in main page + for _ in range(2): + self._ad.aud.send_key_code(4) + self._ad.sl4a.appLaunch('com.google.android.talk') + self._ad.aud(resource_id='com.google.android.talk:id/avatarView').click() + # Click the button to make a voice call + self._ad.aud(content_desc='Call').click() + # Answer by callee + if callee.aud(text='Answer').exists(5): + callee.aud(text='Answer').click() + else: + callee.aud(content_desc='Join voice call').click() + + def hang_up_hangouts_call(self) -> None: + """Hang up Hangouts VOIP voice call. + + Returns: + None + """ + # Click the in call icon to show the end call button + self._ad.aud( + resource_id='com.google.android.talk:id/in_call_main_avatar').click() + # Click the button to hang up call + self._ad.aud(content_desc='Hang up').click() + time.sleep(3) # Wait for VoIP call state to reach idle state + + def detect_and_pull_ssrdump(self, ramdump_type: str = 'ramdump_bt') -> bool: + """Detect and pull RAMDUMP log. + + Args: + ramdump_type: str, the partial of file names to search for in ramdump + files path. 'ramdump_bt' is used for searching Bluetooth ramdump log + files. + + Returns: + True if there is a file with file name matching the ramdump type. + """ + files = self._ad.adb.shell('ls %s' % bt_constants.RAMDUMP_PATH).decode() + if ramdump_type in files: + logging.info('RAMDUMP is found.') + log_name_timestamp = mobly_logger.get_log_file_timestamp() + destination = os.path.join(self._ad.log_path, 'RamdumpLogs', + log_name_timestamp) + utils.create_dir(destination) + self._ad.adb.pull([bt_constants.RAMDUMP_PATH, destination]) + return True + return False + + def get_bt_num_of_crashes(self) -> int: + """Get number of Bluetooth crash times from bluetooth_manager. + + Returns: + Number of Bluetooth crashed times. + """ + out = self._regex_bt_crash.search( + self._ad.adb.shell('dumpsys bluetooth_manager').decode()) + # TODO(user): Need to consider the case "out=None" when miss in + # matching + return int(out.group('num_bt_crashes')) + + def clean_ssrdump(self) -> None: + """Clean RAMDUMP log. + + Returns: + None + """ + self._ad.adb.shell('rm -rf %s/*' % bt_constants.RAMDUMP_PATH) + + def set_target(self, bt_device: BtDevice) -> None: + """Allows for use to get target device object for target interaction.""" + self._target_device = bt_device + + def wait_for_hsp_connection_state(self, + mac_address: str, + connected: bool, + timeout_sec: float = 30) -> bool: + """Waits for HSP connection to be in a expected state on Android device. + + Args: + mac_address: The Bluetooth mac address of the peripheral device. + connected: True if HSP connection state is connected as expected. + timeout_sec: Number of seconds to wait for HSP connection state change. + """ + expected_state = BluetoothConnectionStatus.STATE_DISCONNECTED + if connected: + expected_state = BluetoothConnectionStatus.STATE_CONNECTED + bt_test_utils.wait_until( + timeout_sec=timeout_sec, + condition_func=self._ad.sl4a.bluetoothHspGetConnectionStatus, + func_args=[mac_address], + expected_value=expected_state, + exception=signals.TestError( + 'Failed to %s the device "%s" within %d seconds via HSP.' % + ('connect' if connected else 'disconnect', mac_address, + timeout_sec))) + + def wait_for_bluetooth_toggle_state(self, + enabled: bool = True, + timeout_sec: float = 30) -> bool: + """Waits for Bluetooth to be in an expected state. + + Args: + enabled: True if Bluetooth status is enabled as expected. + timeout_sec: Number of seconds to wait for Bluetooth to be in the expected + state. + """ + bt_test_utils.wait_until( + timeout_sec=timeout_sec, + condition_func=self._ad.mbs.btIsEnabled, + func_args=[], + expected_value=enabled, + exception=signals.TestError( + 'Bluetooth is not %s within %d seconds on the device "%s".' % + ('enabled' if enabled else 'disabled', timeout_sec, + self._ad.serial))) + + def wait_for_a2dp_connection_state(self, + mac_address: str, + connected: bool, + timeout_sec: float = 30) -> bool: + """Waits for A2DP connection to be in a expected state on Android device. + + Args: + mac_address: The Bluetooth mac address of the peripheral device. + connected: True if A2DP connection state is connected as expected. + timeout_sec: Number of seconds to wait for A2DP connection state change. + """ + bt_test_utils.wait_until( + timeout_sec=timeout_sec, + condition_func=self.is_a2dp_sink_connected, + func_args=[mac_address], + expected_value=connected, + exception=signals.TestError( + 'Failed to %s the device "%s" within %d seconds via A2DP.' % + ('connect' if connected else 'disconnect', mac_address, + timeout_sec))) + + def wait_for_nap_service_connection( + self, + connected_mac_addr: str, + state_connected: bool, + exception: Exception) -> bool: + """Waits for NAP service connection to be expected state. + + Args: + connected_mac_addr: String, Bluetooth Mac address is needed to be checked. + state_connected: Bool, NAP service connection is established as expected + if True, else terminated as expected. + exception: Exception, Raised if NAP service connection is not expected + state. + + Raises: + exception: Raised if NAP service connection is not expected state. + """ + def is_device_connected(): + """Returns True if connected else False.""" + connected_devices = self._ad.sl4a.bluetoothPanGetConnectedDevices() + # Check if the Bluetooth mac address is in the connected device list. + return connected_mac_addr in [d['address'] for d in connected_devices] + + bt_test_utils.wait_until( + timeout_sec=bt_constants.NAP_CONNECTION_TIMEOUT_SECS, + condition_func=is_device_connected, + func_args=[], + expected_value=state_connected, + exception=exception) + + def verify_internet(self, + allow_access: bool, + exception: Exception, + test_url: str = TEST_URL, + interval_sec: int = PING_INTERVAL_TIME_SEC, + timeout_sec: float = PING_TIMEOUT_SEC) -> bool: + """Verifies that internet is in expected state. + + Continuously make ping request to a URL for internet verification. + + Args: + allow_access: Bool, Device can have internet access as expected if True, + else no internet access as expected. + exception: Exception, Raised if internet is not in expected state. + test_url: String, A URL is used to verify internet by ping request. + interval_sec: Int, Interval time between ping requests in second. + timeout_sec: Int, Number of seconds to wait for ping success if + allow_access is True else wait for ping failure if allow_access is + False. + + Raises: + exception: Raised if internet is not in expected state. + """ + self._ad.log.info('Verify that internet %s be used.' % + ('can' if allow_access else 'can not')) + + def http_ping(): + """Returns True if http ping success else False.""" + try: + return bool(self._ad.sl4a.httpPing(test_url)) + except jsonrpc_client_base.ApiError as e: + # ApiError is raised by httpPing() when no internet. + self._ad.log.debug(str(e)) + return False + + bt_test_utils.wait_until( + timeout_sec=timeout_sec, + condition_func=http_ping, + func_args=[], + expected_value=allow_access, + exception=exception, + interval_sec=interval_sec) + + def allow_extra_permissions(self) -> None: + """A method to allow extra permissions. + + This method has no any logics. It is used to skip the operation when it is + called if a test is not Wear OS use case. + """ + + def goto_bluetooth_device_details(self) -> None: + """Goes to bluetooth device detail page.""" + self._ad.adb.shell('am force-stop com.android.settings') + self._ad.adb.shell('am start -a android.settings.BLUETOOTH_SETTINGS') + self._ad.aud( + resource_id='com.android.settings:id/settings_button').click() + + def bluetooth_ui_forget_device(self) -> None: + """Clicks the forget device button.""" + self.goto_bluetooth_device_details() + self._ad.aud(resource_id='com.android.settings:id/button1').click() + + def bluetooth_ui_disconnect_device(self) -> None: + """Clicks the disconnect device button.""" + self.goto_bluetooth_device_details() + self._ad.aud(resource_id='com.android.settings:id/button2').click() + + def _find_bt_device_details_ui_switch(self, switch_name: str): + """Returns the UI node for a BT switch. + + Args: + switch_name: each switch button name in bluetooth connect device detail + page. switch name like 'Phone calls', 'Media audio', etc. + + Returns: + adb_ui_device.XML node UI element of the BT each option switch button. + """ + switch_button_name = ('Phone calls', + 'Media audio', + 'Contact sharing', + 'Text Messages' + ) + if switch_name not in switch_button_name: + raise ValueError(f'Unknown switch name {switch_name}.') + self.goto_bluetooth_device_details() + text_node = adb_ui.wait_and_get_xml_node( + self._ad, timeout=10, text=switch_name) + text_grandparent_node = text_node.parentNode.parentNode + switch_node = adb_ui.Selector( + resource_id='android:id/switch_widget').find_node(text_grandparent_node) + return switch_node + + def get_bt_device_details_ui_switch_state(self, switch_name: str) -> bool: + """Gets bluetooth each option switch button state value. + + Args: + switch_name: each switch button name in bluetooth connect device detail + page. + + Returns: + State True or False. + """ + switch_node = self._find_bt_device_details_ui_switch(switch_name) + current_state = switch_node.attributes['checked'].value == 'true' + return current_state + + def set_bt_device_details_ui_switch_state( + self, switch_name: str, + target_state: bool) -> None: + """Sets and checks the BT each option button is the target enable state. + + Args: + switch_name: each switch button name in bluetooth connect device detail + page. + target_state: The desired state expected from the switch. If the state of + the switch already meet expectation, no action will be taken. + """ + if self.get_bt_device_details_ui_switch_state(switch_name) == target_state: + return + switch_node = self._find_bt_device_details_ui_switch(switch_name) + x, y = adb_ui.find_point_in_bounds(switch_node.attributes['bounds'].value) + self._ad.aud.click(x, y) + + def get_bt_quick_setting_switch_state(self) -> bool: + """Gets bluetooth quick settings switch button state value.""" + self._ad.open_notification() + switch_node = self._ad_aud(class_name='android.widget.Switch', index='1') + current_state = switch_node.attributes['content-desc'].value == 'Bluetooth.' + return current_state + + def assert_bt_device_details_state(self, target_state: bool) -> None: + """Asserts the Bluetooth connection state. + + Asserts the BT each option button in device detail, + BT quick setting state and BT manager service from log are at the target + state. + + Args: + target_state: BT each option button, quick setting and bluetooth manager + service target state. + + """ + for switch_name in ['Phone calls', 'Media audio']: + asserts.assert_equal( + self._ad.get_bt_device_details_ui_switch_state(switch_name), + target_state, + f'The BT Media calls switch button state is not {target_state}.') + asserts.assert_equal(self._ad.is_service_running(), target_state, + f'The BT service state is not {target_state}.') + asserts.assert_equal( + self._ad.get_bt_quick_setting_switch_state(), target_state, + f'The BT each switch button state is not {target_state}.') + + def is_service_running( + self, + mac_address: str, + timeout_sec: float) -> bool: + """Checks bluetooth profile state. + + Check bluetooth headset/a2dp profile connection + status from bluetooth manager log. + + Args: + mac_address: The Bluetooth mac address of the peripheral device. + timeout_sec: Number of seconds to wait for the specified message + be found in bluetooth manager log. + + Returns: + True: If pattern match with bluetooth_manager_log. + """ + pattern_headset = (r'\sm\w+e:\sC\w+d') + pattern_a2dp = (r'StateMachine:.*state=Connected') + output_headset = self._ad.adb.shell( + 'dumpsys bluetooth_manager | egrep -A20 "Profile: HeadsetService"' + ).decode() + output_a2dp = self._ad.adb.shell( + 'dumpsys bluetooth_manager | egrep -A30 "Profile: A2dpService"').decode( + ) + service_type = { + 'a2dp': ((pattern_a2dp), (output_a2dp)), + 'headset': ((pattern_headset), (output_headset)) + } + start_time = time.time() + end_time = start_time + timeout_sec + while start_time < end_time: + try: + match = service_type + if match and mac_address in service_type: + return True + except adb.AdbError as e: + logging.exception(e) + time.sleep(ADB_WAITING_TIME_SECONDS) + return False + + def browse_internet(self, url: str = 'www.google.com') -> None: + """Browses internet by Chrome. + + Args: + url: web address. + + Raises: + signals.TestError: raised if it failed to browse internet by Chrome. + """ + browse_url = ( + 'am start -n com.android.chrome/com.google.android.apps.chrome.Main -d' + ' %s' % url + ) + self._ad.adb.shell(browse_url) + self._ad.aud.add_watcher('Welcome').when( + text='Accept & continue').click(text='Accept & continue') + self._ad.aud.add_watcher('sync page').when( + text='No thanks').click(text='No thanks') + if self._ad.aud(text='No internet').exists(): + raise signals.TestError('No connect internet.') + + def connect_wifi_from_other_device_hotspot( + self, wifi_hotspot_device: AndroidDevice) -> None: + """Turns on 2.4G Wifi hotspot from the other android device and connect on the android device. + + Args: + wifi_hotspot_device: Android device, turn on 2.4G Wifi hotspot. + """ + # Turn on 2.4G Wifi hotspot on the secondary phone. + wifi_hotspot_device.sl4a.wifiSetWifiApConfiguration( + bt_constants.WIFI_HOTSPOT_2_4G) + wifi_hotspot_device.sl4a.connectivityStartTethering(0, False) + # Connect the 2.4G Wifi on the primary phone. + self._ad.mbs.wifiEnable() + self._ad.mbs.wifiConnectSimple( + bt_constants.WIFI_HOTSPOT_2_4G['SSID'], + bt_constants.WIFI_HOTSPOT_2_4G['password']) diff --git a/system/blueberry/utils/arduino_base.py b/system/blueberry/utils/arduino_base.py new file mode 100644 index 0000000000..37c9146033 --- /dev/null +++ b/system/blueberry/utils/arduino_base.py @@ -0,0 +1,78 @@ +"""Base class for Blueberry controllers using Arduino board. + +This module uses pyserial library to communicate with Arduino UNO board. + +About Arduino code, please refer to the code of following Arduino project: +Internal link +""" + +import time +from mobly.signals import ControllerError +import serial + + +class ArduinoBase(object): + """Implements an Arduino base class. + + Attributes: + serial: serial object, a serial object which is used to communicate with + Arduino board. + """ + + def __init__(self, config): + """Initializes an Arduino base class.""" + self._verify_config(config) + self.serial = serial.Serial(config['arduino_port'], 9600) + self.serial.timeout = 30 + # Buffer between calling serial.Serial() and serial.Serial.write(). + time.sleep(2) + + def _verify_config(self, config): + """Checks the device config's required config parameters. + + Args: + config: dict, Mobly controller config for ArduinoBass. The config should + include the key "arduino_port" whose value is a string representing + Arduino board name. e.g. /dev/ttyACM0. + """ + if 'arduino_port' not in config: + raise ControllerError('Please provide an Arduino board port for the' + ' ArduinoBase in Mobile Harness config') + + def _send_string_to_arduino(self, tx_string): + """Sends a particular string to communicate with Arduino. + + The method requires that Arduino code can read string which is received from + a python serial object and then send the same string to the serial object. + + An example of Arduino code: + String kRxString = ""; + void setup() { + ... + } + void loop() { + if (Serial.available() > 0) { + kRxString = Serial.readString(); + ... + Serial.write(kRxString.c_str()); + } + } + + Args: + tx_string: string, is used to be sent to Arduino port for making the + controlled device perform action. After Arduino receives the string, it + will send a response which is the same string. + + Returns: + The time it takes for waiting a response, in seconds. + + Raises: + ControllerError: raised if not received a response from Arduino. + """ + self.serial.write(str.encode(tx_string)) + start_time = time.time() + rx_string = self.serial.read_until(tx_string, len(tx_string)).decode() + if rx_string == tx_string: + return time.time() - start_time + raise ControllerError('Timed out after %ds waiting for the string "%s" from' + ' Arduino.' % (self.serial.timeout, tx_string)) diff --git a/system/blueberry/utils/blueberry_base_test.py b/system/blueberry/utils/blueberry_base_test.py new file mode 100644 index 0000000000..abe19cd323 --- /dev/null +++ b/system/blueberry/utils/blueberry_base_test.py @@ -0,0 +1,244 @@ +"""Base test class for Blueberry.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import re + +from mobly import base_test +from mobly import signals +from mobly.controllers import android_device +from mobly.controllers.android_device_lib import adb + +# Internal import +# Internal import +# Internal import +# Internal import +from blueberry.controllers import derived_bt_device +from blueberry.decorators import android_bluetooth_client_decorator +from blueberry.utils.android_bluetooth_decorator import AndroidBluetoothDecorator + + +class BlueberryBaseTest(base_test.BaseTestClass): + """Base test class for all Blueberry tests to inherit from. + + This class assists with device setup for device logging and other pre test + setup required for Bluetooth tests. + """ + + def setup_generated_tests(self): + """Generates multiple the same tests for pilot run. + + This is used to let developers can easily pilot run their tests many times, + help them to check test stability and reliability. If need to use this, + please add a flag called "test_iterations" to TestParams in Mobly test + configuration and its value is number of test methods to be generated. The + naming rule of test method on Sponge is such as the following example: + test_send_file_via_bluetooth_opp_1_of_50 + test_send_file_via_bluetooth_opp_2_of_50 + test_send_file_via_bluetooth_opp_3_of_50 + ... + test_send_file_via_bluetooth_opp_50_of_50 + + Don't use "test_case_selector" when using "test_iterations", and please use + "test_method_selector" to replace it. + """ + test_iterations = int(self.user_params.get('test_iterations', 0)) + if test_iterations < 2: + return + + test_method_selector = self.user_params.get('test_method_selector', 'all') + existing_test_names = self.get_existing_test_names() + + selected_test_names = None + if test_method_selector == 'all': + selected_test_names = existing_test_names + else: + selected_test_names = test_method_selector.split(' ') + # Check if selected test methods exist in the test class. + for test_name in selected_test_names: + if test_name not in existing_test_names: + raise base_test.Error('%s does not have test method "%s".' % + (self.TAG, test_name)) + + for test_name in selected_test_names: + test_method = getattr(self.__class__, test_name) + # List of (<new test name>, <test method>). + test_arg_sets = [('%s_%s_of_%s' % (test_name, i + 1, test_iterations), + test_method) for i in range(test_iterations)] + # pylint: disable=cell-var-from-loop + self.generate_tests( + test_logic=lambda _, test: test(self), + name_func=lambda name, _: name, + arg_sets=test_arg_sets) + + # Delete origin test methods in order to avoid below situation: + # test_send_file_via_bluetooth_opp <-- origin test method + # test_send_file_via_bluetooth_opp_1_of_50 + # test_send_file_via_bluetooth_opp_2_of_50 + for test_name in existing_test_names: + delattr(self.__class__, test_name) + + def setup_class(self): + """Setup class is called before running any tests.""" + super(BlueberryBaseTest, self).setup_class() + self.capture_bugreport_on_fail = int(self.user_params.get( + 'capture_bugreport_on_fail', 0)) + self.ignore_device_setup_failures = int(self.user_params.get( + 'ignore_device_setup_failures', 0)) + self.enable_bluetooth_verbose_logging = int(self.user_params.get( + 'enable_bluetooth_verbose_logging', 0)) + self.enable_hci_snoop_logging = int(self.user_params.get( + 'enable_hci_snoop_logging', 0)) + self.increase_logger_buffers = int(self.user_params.get( + 'increase_logger_buffers', 0)) + self.enable_all_bluetooth_logging = int(self.user_params.get( + 'enable_all_bluetooth_logging', 0)) + + # base test should include the test between primary device with Bluetooth + # peripheral device. + self.android_devices = self.register_controller( + android_device, required=False) + + # In the case of no android_device assigned, at least 2 derived_bt_device + # is required. + if self.android_devices is None: + self.derived_bt_devices = self.register_controller( + module=derived_bt_device, min_number=2) + else: + self.derived_bt_devices = self.register_controller( + module=derived_bt_device, required=False) + + if self.derived_bt_devices is None: + self.derived_bt_devices = [] + else: + for derived_device in self.derived_bt_devices: + derived_device.set_user_params(self.user_params) + derived_device.setup() + + self.android_ui_devices = {} + # Create a dictionary of Android UI Devices + for device in self.android_devices: + self.android_ui_devices[device.serial] = AdbUiDevice(device) + mobly_config = {'aud': self.android_ui_devices[device.serial]} + device.load_config(mobly_config) + need_restart_bluetooth = False + if (self.enable_bluetooth_verbose_logging or + self.enable_all_bluetooth_logging): + if self.set_bt_trc_level_verbose(device): + need_restart_bluetooth = True + if self.enable_hci_snoop_logging or self.enable_all_bluetooth_logging: + if self.set_btsnooplogmode_full(device): + need_restart_bluetooth = True + if self.increase_logger_buffers or self.enable_all_bluetooth_logging: + self.set_logger_buffer_size_16m(device) + + # Restarts Bluetooth to take BT VERBOSE and HCI Snoop logging effect. + if need_restart_bluetooth: + device.log.info('Restarting Bluetooth by airplane mode...') + self.restart_bluetooth_by_airplane_mode(device) + self.android_devices = [AndroidBluetoothDecorator(device) + for device in self.android_devices] + for device in self.android_devices: + device.set_user_params(self.user_params) + + self.client_decorators = self.user_params.get('sync_decorator', []) + if self.client_decorators: + self.client_decorators = self.client_decorators.split(',') + + self.target_decorators = self.user_params.get('target_decorator', []) + if self.target_decorators: + self.target_decorators = self.target_decorators.split(',') + + for decorator in self.client_decorators: + self.android_devices[0] = android_bluetooth_client_decorator.decorate( + self.android_devices[0], decorator) + + for num_devices in range(1, len(self.android_devices)): + for decorator in self.target_decorators: + self.android_devices[ + num_devices] = android_bluetooth_client_decorator.decorate( + self.android_devices[num_devices], decorator) + + def on_fail(self, record): + """This method is called when a test failure.""" + # Capture bugreports on fail if enabled. + if self.capture_bugreport_on_fail: + devices = self.android_devices + # Also capture bugreport of AndroidBtTargetDevice. + for d in self.derived_bt_devices: + if hasattr(d, 'take_bug_report'): + devices = devices + [d] + android_device.take_bug_reports( + devices, + record.test_name, + record.begin_time, + destination=self.current_test_info.output_path) + + def set_logger_buffer_size_16m(self, device): + """Sets all logger sizes per log buffer to 16M.""" + device.log.info('Setting all logger sizes per log buffer to 16M...') + # Logger buffer info: + # https://developer.android.com/studio/command-line/logcat#alternativeBuffers + logger_buffers = ['main', 'system', 'crash', 'radio', 'events', 'kernel'] + for buffer in logger_buffers: # pylint: disable=redefined-builtin + device.adb.shell('logcat -b %s -G 16M' % buffer) + buffer_size = device.adb.shell('logcat -b %s -g' % buffer) + if isinstance(buffer_size, bytes): + buffer_size = buffer_size.decode() + if 'ring buffer is 16' in buffer_size: + device.log.info('Successfully set "%s" buffer size to 16M.' % buffer) + else: + msg = 'Failed to set "%s" buffer size to 16M.' % buffer + if not self.ignore_device_setup_failures: + raise signals.TestError(msg) + device.log.warning(msg) + + def set_bt_trc_level_verbose(self, device): + """Modifies etc/bluetooth/bt_stack.conf to enable Bluetooth VERBOSE log.""" + device.log.info('Enabling Bluetooth VERBOSE logging...') + bt_stack_conf = device.adb.shell('cat etc/bluetooth/bt_stack.conf') + if isinstance(bt_stack_conf, bytes): + bt_stack_conf = bt_stack_conf.decode() + # Check if 19 trace level settings are set to 6(VERBOSE). E.g. TRC_HCI=6. + if len(re.findall('TRC.*=[6]', bt_stack_conf)) == 19: + device.log.info('Bluetooth VERBOSE logging has already enabled.') + return False + # Suggest to use AndroidDeviceSettingsDecorator to disable verity and then + # reboot (b/140277443). + disable_verity_check(device) + device.adb.remount() + try: + device.adb.shell(r'sed -i "s/\(TRC.*=\)2/\16/g;s/#\(LoggingV=--v=\)0/\13' + '/" etc/bluetooth/bt_stack.conf') + device.log.info('Successfully enabled Bluetooth VERBOSE Logging.') + return True + except adb.AdbError: + msg = 'Failed to enable Bluetooth VERBOSE Logging.' + if not self.ignore_device_setup_failures: + raise signals.TestError(msg) + device.log.warning(msg) + return False + + def set_btsnooplogmode_full(self, device): + """Enables bluetooth snoop logging.""" + device.log.info('Enabling Bluetooth HCI Snoop logging...') + device.adb.shell('setprop persist.bluetooth.btsnooplogmode full') + out = device.adb.shell('getprop persist.bluetooth.btsnooplogmode') + if isinstance(out, bytes): + out = out.decode() + # The expected output is "full/n". + if 'full' in out: + device.log.info('Successfully enabled Bluetooth HCI Snoop Logging.') + return True + msg = 'Failed to enable Bluetooth HCI Snoop Logging.' + if not self.ignore_device_setup_failures: + raise signals.TestError(msg) + device.log.warning(msg) + return False + + def restart_bluetooth_by_airplane_mode(self, device): + """Restarts bluetooth by airplane mode.""" + enable_airplane_mode(device, 3) + disable_airplane_mode(device, 3) diff --git a/system/blueberry/utils/bt_audio_utils.py b/system/blueberry/utils/bt_audio_utils.py new file mode 100644 index 0000000000..a4c2e8e95b --- /dev/null +++ b/system/blueberry/utils/bt_audio_utils.py @@ -0,0 +1,229 @@ +# Lint as: python3 +"""Utils for bluetooth audio testing.""" + +import logging as log +import os +import numpy as np +from scipy import signal as scipy_signal +from scipy.io import wavfile +# Internal import +# Internal import + + +def generate_sine_wave_to_device( + device, + pushed_file_path='/sdcard/Music', + frequency=480, + channel=2, + sample_rate=48000, + sample_format=16, + duration_sec=10): + """Generates a fixed frequency sine wave file and push it to the device. + + Generates a sine wave to the Mobly device directory and push it to the device + storage. The output file name format is such as the example: + sine_480hz_2ch_48000rate_16bit_10sec.wav + + Args: + device: AndroidDevice, Mobly Android controller class. + pushed_file_path: string, the wave file path which is pushed to the device + storage. E.g. /sdcard/Music + frequency: int, fixed frequency in Hz. + channel: int, number of channels. + sample_rate: int, sampling rate in Hz. + sample_format: int, sampling format in bit. + duration_sec: int, audio duration in second. + + Returns: + device_storage_path: string, the wave file on the device storage. + mobly_directory_path: string, the wave file on the Mobly device directory. + """ + file_name = 'sine_%dhz_%dch_%drate_%dbit_%dsec.wav' % ( + frequency, channel, sample_rate, sample_format, duration_sec) + mobly_directory_path = os.path.join(device.log_path, file_name) + os.system('%s -n -c %d -r %d -b %d %s synth %d sine %d' % + (audio_processor.AudioProcessor.SOX, channel, sample_rate, + sample_format, mobly_directory_path, duration_sec, frequency)) + device.adb.push([mobly_directory_path, pushed_file_path]) + device_storage_path = os.path.join(pushed_file_path, file_name) + return device_storage_path, mobly_directory_path + + +def measure_audio_mos(recorded_audio_file, reference_audio_file): + """Measures mean opinion score (MOS) of a recorded audio. + + This function uses the module of A/V Analysis Service to measure MOS: + Internal reference + + Args: + recorded_audio_file: string, the recorded audio file to be measured. + reference_audio_file: string, the reference audio file for comparison. + + Returns: + Float which is the mean opinion score of the recorded audio. + """ + results = audio_calculator.AudioAnalyzer().Analyze(reference_audio_file, + recorded_audio_file) + # Returns 0.0 if the results fails to be generated. + if not results: + log.warning('Failed to generate the audio analysis results.') + return 0.0 + return results[0].mos + + +def measure_fundamental_frequency(signal, sample_rate): + """Measures fundamental frequency of a signal. + + Args: + signal: An 1-D array representing the signal data. + sample_rate: int, sample rate of the signal. + + Returns: + Float representing the fundamental frequency. + """ + return sample_rate * (np.argmax(np.abs(np.fft.rfft(signal))) / len(signal)) + + +def measure_rms(signal): + """Measures Root Mean Square (RMS) of a signal. + + Args: + signal: An 1-D array representing the signal data. + + Returns: + Float representing the root mean square. + """ + return np.sqrt(np.mean(np.absolute(signal)**2)) + + +def measure_thdn(signal, sample_rate, q, frequency=None): + """Measures Total Harmonic Distortion + Noise (THD+N) of a signal. + + Args: + signal: An 1-D array representing the signal data. + sample_rate: int, sample rate of the signal. + q: float, quality factor for the notch filter. + frequency: float, fundamental frequency of the signal. All other frequencies + are noise. If not specified, will be calculated using FFT. + + Returns: + Float representing THD+N ratio calculated from the ratio of RMS of pure + harmonics and noise signal to RMS of original signal. + """ + # Normalizes the signal. + signal -= np.mean(signal) + # Gets Blackman-Harris window from the signal. + window = signal * scipy_signal.blackmanharris(len(signal)) + # Finds the fundamental frequency to remove if not specified. + if not frequency: + frequency = measure_fundamental_frequency(window, sample_rate) + # Creates a notch filter to get noise from the signal. + wo = frequency / (sample_rate / 2) + b, a = scipy_signal.iirnotch(wo, q) + noise = scipy_signal.lfilter(b, a, window) + return measure_rms(noise) / measure_rms(window) + + +def measure_audio_thdn_per_window( + audio_file, + thdn_threshold, + step_size, + window_size, + q, + frequency=None): + """Measures Total Harmonic Distortion + Noise (THD+N) of an audio file. + + This function is used to capture audio glitches from a recorded audio file, + and the audio file shall record a fixed frequency sine wave. + + Args: + audio_file: A .wav file to be measured. + thdn_threshold: float, a THD+N threshold used to compare with the measured + THD+N for every windows. If THD+N of a window is greater than the + threshold, will record this to results. + step_size: int, number of samples to move the window by for each analysis. + window_size: int, number of samples to analyze each time. + q: float, quality factor for the notch filter. + frequency: float, fundamental frequency of the signal. All other frequencies + are noise. If not specified, will be calculated using FFT. + + Returns: + List containing each result of channels. Like the following structure: + ``` + [ + [ # result of channel 1 + { + "thd+n": <float>, # THD+N of a window + "start_time": <float>, # start time of a window + "end_time": <float>, # end time of a window + }, + ..., + ], + [...,] # result of channel 2 + ..., + ] + ``` + """ + if step_size <= 0: + raise ValueError('step_size shall be greater than 0.') + if window_size <= 0: + raise ValueError('window_size shall be greater than 0.') + sample_rate, wave_data = wavfile.read(audio_file) + wave_data = wave_data.astype('float64') + # Collects the result for each channels. + results = [] + for signal in wave_data.transpose(): + current_position = 0 + channel_result = [] + while current_position + window_size < len(signal): + window = signal[current_position:current_position + window_size] + thdn = measure_thdn( + signal=window, + sample_rate=sample_rate, + q=q, + frequency=frequency) + start_time = current_position / sample_rate + end_time = (current_position + window_size) / sample_rate + if thdn > thdn_threshold: + channel_result.append({ + 'thd+n': thdn, + 'start_time': start_time, + 'end_time': end_time + }) + current_position += step_size + results.append(channel_result) + return results + + +def trim_audio(audio_file: str, + duration_sec: float, + start_time_sec: float = 0.0) -> str: + """Trims an audio file with a specific start time and duration. + + Generates a output file and its name is such as below format: + `<input file name>_<start time sec>-<duration sec>.<input file type>` + + Args: + audio_file: string, an audio file to be trimed. + duration_sec: float, the duration of the output file in seconds. + start_time_sec: float, the start time of the audio file to be trimmed in + seconds. Default value is 0.0 second if not specified. + + Returns: + String, the output file of the same path of the origin file. + """ + file_path, file_name = os.path.split(audio_file) + file_name, file_ext = os.path.splitext(file_name) + output_file_name = '%s_%s-%s%s' % ( + file_name, + start_time_sec, + (start_time_sec + duration_sec), + file_ext) + output_file = os.path.join(file_path, output_file_name) + processor = audio_processor.AudioProcessor() + processor.TrimAudio( + input_file=audio_file, + output_file=output_file, + duration=duration_sec, + start=start_time_sec) + return output_file diff --git a/system/blueberry/utils/bt_constants.py b/system/blueberry/utils/bt_constants.py new file mode 100644 index 0000000000..6a94e83edd --- /dev/null +++ b/system/blueberry/utils/bt_constants.py @@ -0,0 +1,191 @@ +# Lint as: python3 +"""Constants used for bluetooth test.""" + +import enum + + +### Generic Constants Begin ### +BT_DEFAULT_TIMEOUT_SECONDS = 15 +DEFAULT_RFCOMM_TIMEOUT_MS = 10000 +CALL_STATE_IDLE = 0 +CALL_STATE_RINGING = 1 +CALL_STATE_OFFHOOK = 2 +CALL_STATE_TIMEOUT_SEC = 30 +NAP_CONNECTION_TIMEOUT_SECS = 20 + +# Call log types. +INCOMING_CALL_LOG_TYPE = '1' +OUTGOING_CALL_LOG_TYPE = '2' +MISSED_CALL_LOG_TYPE = '3' + +# Passthrough Commands sent to the RPC Server. +CMD_MEDIA_PLAY = 'play' +CMD_MEDIA_PAUSE = 'pause' +CMD_MEDIA_SKIP_NEXT = 'skipNext' +CMD_MEDIA_SKIP_PREV = 'skipPrev' + +# Events dispatched from the RPC Server. +EVENT_PLAY_RECEIVED = 'playReceived' +EVENT_PAUSE_RECEIVED = 'pauseReceived' +EVENT_SKIP_NEXT_RECEIVED = 'skipNextReceived' +EVENT_SKIP_PREV_RECEIVED = 'skipPrevReceived' + +# A playback state indicating the media session is currently paused. +STATE_PAUSED = 2 +STATE_PLAYING = 3 + +# File path +RAMDUMP_PATH = 'data/vendor/ssrdump' + +# UiAutoHelper package name. +UIAUTO_HELPER_PACKAGE_NAME = 'com.google.android.uiautohelper' + +# Test Runner for Android instrumentation test. +ANDROIDX_TEST_RUNNER = 'androidx.test.runner.AndroidJUnitRunner' + +# Wifi hotspot setting +WIFI_HOTSPOT_2_4G = {'SSID': 'pqmBT', 'password': 'password', 'apBand': 0} + + +class AvrcpEvent(enum.Enum): + """Enumeration of AVRCP event types.""" + PLAY = 'State:NOT_PLAYING->PLAYING' + PAUSE = 'State:PLAYING->NOT_PLAYING' + TRACK_PREVIOUS = 'sendMediaKeyEvent: keyEvent=76' + TRACK_NEXT = 'sendMediaKeyEvent: keyEvent=75' + +# Bluetooth RFCOMM UUIDs as defined by the SIG +BT_RFCOMM_UUIDS = { + 'default_uuid': '457807c0-4897-11df-9879-0800200c9a66', + 'base_uuid': '00000000-0000-1000-8000-00805F9B34FB', + 'sdp': '00000001-0000-1000-8000-00805F9B34FB', + 'udp': '00000002-0000-1000-8000-00805F9B34FB', + 'rfcomm': '00000003-0000-1000-8000-00805F9B34FB', + 'tcp': '00000004-0000-1000-8000-00805F9B34FB', + 'tcs_bin': '00000005-0000-1000-8000-00805F9B34FB', + 'tcs_at': '00000006-0000-1000-8000-00805F9B34FB', + 'att': '00000007-0000-1000-8000-00805F9B34FB', + 'obex': '00000008-0000-1000-8000-00805F9B34FB', + 'ip': '00000009-0000-1000-8000-00805F9B34FB', + 'ftp': '0000000A-0000-1000-8000-00805F9B34FB', + 'http': '0000000C-0000-1000-8000-00805F9B34FB', + 'wsp': '0000000E-0000-1000-8000-00805F9B34FB', + 'bnep': '0000000F-0000-1000-8000-00805F9B34FB', + 'upnp': '00000010-0000-1000-8000-00805F9B34FB', + 'hidp': '00000011-0000-1000-8000-00805F9B34FB', + 'hardcopy_control_channel': '00000012-0000-1000-8000-00805F9B34FB', + 'hardcopy_data_channel': '00000014-0000-1000-8000-00805F9B34FB', + 'hardcopy_notification': '00000016-0000-1000-8000-00805F9B34FB', + 'avctp': '00000017-0000-1000-8000-00805F9B34FB', + 'avdtp': '00000019-0000-1000-8000-00805F9B34FB', + 'cmtp': '0000001B-0000-1000-8000-00805F9B34FB', + 'mcap_control_channel': '0000001E-0000-1000-8000-00805F9B34FB', + 'mcap_data_channel': '0000001F-0000-1000-8000-00805F9B34FB', + 'l2cap': '00000100-0000-1000-8000-00805F9B34FB' +} + + +class BluetoothAccessLevel(enum.IntEnum): + """Enum class for bluetooth profile access levels.""" + ACCESS_ALLOWED = 1 + ACCESS_DENIED = 2 + + +class BluetoothProfile(enum.IntEnum): + """Enum class for bluetooth profile types. + + Should be kept in sync with + //frameworks/base/core/java/android/bluetooth/BluetoothProfile.java + """ + + HEADSET = 1 + A2DP = 2 + HEALTH = 3 + HID_HOST = 4 + PAN = 5 + PBAP = 6 + GATT = 7 + GATT_SERVER = 8 + MAP = 9 + SAP = 10 + A2DP_SINK = 11 + AVRCP_CONTROLLER = 12 + AVRCP = 13 + HEADSET_CLIENT = 16 + PBAP_CLIENT = 17 + MAP_MCE = 18 + HID_DEVICE = 19 + OPP = 20 + HEARING_AID = 21 + + +class BluetoothConnectionPolicy(enum.IntEnum): + """Enum class for bluetooth bluetooth connection policy. + + bluetooth connection policy as defined in + //frameworks/base/core/java/android/bluetooth/BluetoothProfile.java + """ + CONNECTION_POLICY_UNKNOWN = -1 + CONNECTION_POLICY_FORBIDDEN = 0 + CONNECTION_POLICY_ALLOWED = 100 + + +class BluetoothConnectionStatus(enum.IntEnum): + """Enum class for bluetooth connection status. + + Bluetooth connection status as defined in + //frameworks/base/core/java/android/bluetooth/BluetoothProfile.java + """ + STATE_DISCONNECTED = 0 + STATE_CONNECTING = 1 + STATE_CONNECTED = 2 + STATE_DISCONNECTING = 3 + + +class BluetoothPriorityLevel(enum.IntEnum): + """Enum class for bluetooth priority level. + + Priority levels as defined in + //frameworks/base/core/java/android/bluetooth/BluetoothProfile.java + """ + + PRIORITY_AUTO_CONNECT = 1000 + PRIORITY_ON = 100 + PRIORITY_OFF = 0 + PRIORITY_UNDEFINED = -1 + + +class BleAdvertiseSettingsMode(enum.IntEnum): + """Enum class for BLE advertise settings mode.""" + LOW_POWER = 0 + BALANCED = 1 + LOW_LATENCY = 2 + + +class BleAdvertiseSettingsTxPower(enum.IntEnum): + """Enum class for BLE advertise settings tx power.""" + ULTRA_LOW = 0 + LOW = 1 + MEDIUM = 2 + HIGH = 3 + + +class LogType(enum.Enum): + """Enumeration of device log type.""" + DEFAULT_VALUE = 'GENERIC' + BLUETOOTH_DEVICE_SIMULATOR = 'BDS' + ICLEVER_HB01 = 'GENERIC' + + +class CallState(enum.IntEnum): + """Enum class for phone call state.""" + IDLE = 0 + RINGING = 1 + OFFHOOK = 2 + + +class CallLogType(enum.IntEnum): + """Enum class for phone call log type.""" + INCOMING_CALL = 1 + OUTGOING_CALL = 2 + MISSED_CALL = 3 diff --git a/system/blueberry/utils/bt_test_utils.py b/system/blueberry/utils/bt_test_utils.py new file mode 100644 index 0000000000..e9d6ec0422 --- /dev/null +++ b/system/blueberry/utils/bt_test_utils.py @@ -0,0 +1,200 @@ +# Lint as: python3 +"""Utils for blue tooth tests. + +Partly ported from acts/framework/acts/test_utils/bt/bt_test_utils.py +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging as log +import os +import random +import string +import time +import wave + + +def convert_pcm_to_wav(pcm_file_path, wave_file_path, audio_params): + """Converts raw pcm data into wave file. + + Args: + pcm_file_path: File path of origin pcm file. + wave_file_path: File path of converted wave file. + audio_params: A dict with audio configuration. + """ + with open(pcm_file_path, 'rb') as pcm_file: + frames = pcm_file.read() + write_record_file(wave_file_path, audio_params, frames) + + +def create_vcf_from_vcard(output_path: str, + num_of_contacts: int, + first_name: str = None, + last_name: str = None, + phone_number: int = None) -> str: + """Creates a vcf file from vCard. + + Args: + output_path: Path of the output vcf file. + num_of_contacts: Number of contacts to be generated. + first_name: First name of the contacts. + last_name: Last name of the contacts. + phone_number: Phone number of the contacts. + + Returns: + vcf_file_path: Path of the output vcf file. E.g. + "/<output_path>/contacts_<time>.vcf". + """ + file_name = f'contacts_{int(time.time())}.vcf' + vcf_file_path = os.path.join(output_path, file_name) + with open(vcf_file_path, 'w+') as f: + for i in range(num_of_contacts): + lines = [] + if first_name is None: + first_name = 'Person' + vcard_last_name = last_name + if last_name is None: + vcard_last_name = i + vcard_phone_number = phone_number + if phone_number is None: + vcard_phone_number = random.randrange(int(10e10)) + lines.append('BEGIN:VCARD\n') + lines.append('VERSION:2.1\n') + lines.append(f'N:{vcard_last_name};{first_name};;;\n') + lines.append(f'FN:{first_name} {vcard_last_name}\n') + lines.append(f'TEL;CELL:{vcard_phone_number}\n') + lines.append(f'EMAIL;PREF:{first_name}{vcard_last_name}@gmail.com\n') + lines.append('END:VCARD\n') + f.write(''.join(lines)) + return vcf_file_path + + +def generate_id_by_size(size, + chars=(string.ascii_lowercase + string.ascii_uppercase + + string.digits)): + """Generate random ascii characters of input size and input char types. + + Args: + size: Input size of string. + chars: (Optional) Chars to use in generating a random string. + + Returns: + String of random input chars at the input size. + """ + return ''.join(random.choice(chars) for _ in range(size)) + + +def get_duration_seconds(wav_file_path): + """Get duration of most recently recorded file. + + Args: + wav_file_path: path of the wave file. + + Returns: + duration (float): duration of recorded file in seconds. + """ + f = wave.open(wav_file_path, 'r') + frames = f.getnframes() + rate = f.getframerate() + duration = (frames / float(rate)) + f.close() + return duration + + +def wait_until(timeout_sec, + condition_func, + func_args, + expected_value, + exception=None, + interval_sec=0.5): + """Waits until a function returns a expected value or timeout is reached. + + Example usage: + ``` + def is_bluetooth_enabled(device) -> bool: + do something and return something... + + # Waits and checks if Bluetooth is turned on. + bt_test_utils.wait_until( + timeout_sec=10, + condition_func=is_bluetooth_enabled, + func_args=[dut], + expected_value=True, + exception=signals.TestFailure('Failed to turn on Bluetooth.'), + interval_sec=1) + ``` + + Args: + timeout_sec: float, max waiting time in seconds. + condition_func: function, when the condiction function returns the expected + value, the waiting mechanism will be interrupted. + func_args: tuple or list, the arguments for the condition function. + expected_value: a expected value that the condition function returns. + exception: Exception, an exception will be raised when timed out if needed. + interval_sec: float, interval time between calls of the condition function + in seconds. + + Returns: + True if the function returns the expected value else False. + """ + start_time = time.time() + end_time = start_time + timeout_sec + while time.time() < end_time: + if condition_func(*func_args) == expected_value: + return True + time.sleep(interval_sec) + args_string = ', '.join(list(map(str, func_args))) + log.warning('Timed out after %.1fs waiting for "%s(%s)" to be "%s".', + timeout_sec, condition_func.__name__, args_string, expected_value) + if exception: + raise exception + return False + + +def write_read_verify_data_sl4a(client_ad, server_ad, msg, binary=False): + """Verify that the client wrote data to the server Android device correctly. + + Args: + client_ad: the Android device to perform the write. + server_ad: the Android device to read the data written. + msg: the message to write. + binary: if the msg arg is binary or not. + + Returns: + True if the data written matches the data read, false if not. + """ + client_ad.log.info('Write message %s.', msg) + if binary: + client_ad.sl4a.bluetoothSocketConnWriteBinary(msg) + else: + client_ad.sl4a.bluetoothSocketConnWrite(msg) + server_ad.log.info('Read message %s.', msg) + if binary: + read_msg = server_ad.sl4a.bluetoothSocketConnReadBinary().rstrip('\r\n') + else: + read_msg = server_ad.sl4a.bluetoothSocketConnRead() + log.info('Verify message.') + if msg != read_msg: + log.error('Mismatch! Read: %s, Expected: %s', read_msg, msg) + return False + log.info('Matched! Read: %s, Expected: %s', read_msg, msg) + return True + + +def write_record_file(file_name, audio_params, frames): + """Writes the recorded audio into the file. + + Args: + file_name: The file name for writing the recorded audio. + audio_params: A dict with audio configuration. + frames: Recorded audio frames. + """ + log.debug('writing frame to %s', file_name) + wf = wave.open(file_name, 'wb') + wf.setnchannels(audio_params['channel']) + wf.setsampwidth(audio_params.get('sample_width', 1)) + wf.setframerate(audio_params['sample_rate']) + wf.writeframes(frames) + wf.close() diff --git a/system/blueberry/utils/command_line_runner/run_bluetooth_tests.py b/system/blueberry/utils/command_line_runner/run_bluetooth_tests.py new file mode 100644 index 0000000000..3a186b8785 --- /dev/null +++ b/system/blueberry/utils/command_line_runner/run_bluetooth_tests.py @@ -0,0 +1,248 @@ +"""Command-line test runner script for running Bluetooth tests. + +This module allows users to initiate Bluetooth test targets and run them against +specified DUTs (devices-under-test) using a simple command line interface. +""" + +from __future__ import absolute_import +from __future__ import division + +from __future__ import print_function + +import base64 + +from absl import app +from absl import flags +from absl import logging + +# Internal import +# Internal import +# Internal import +# Internal import +# Internal import + +FLAGS = flags.FLAGS + +flags.DEFINE_multi_string('bt_test', None, 'Bluetooth test to run.') +flags.DEFINE_multi_string('bt_dut', None, + 'Bluetooth device to allocate for tests.') + + +# Valid config keys for the --bt_test command line flag. +BT_TEST_CONFIG_KEYS = {'target'} + +# Valid config keys for the --bt_dut (device-under-test) command line flag. +BT_DUT_CONFIG_KEYS = {'hardware'} + +TEST_FLAGS = ('--notest_loasd --test_output=streamed ' + '--test_arg=--param_dut_config=%s') + + +class Error(Exception): + """Base class for module exceptions.""" + pass + + +class TestConfigError(Error): + """Raised when --bt_test config flags are specified incorrectly.""" + pass + + +class DutConfigError(Error): + """Raised when --bt_dut config flags are specified incorrectly.""" + pass + + +def validate_bt_test_flags(flag_value): + """Validates the format of specified --bt_test flags. + + Args: + flag_value: string, the config flag value for a given --bt_test flag. + + Returns: + bool, True if --bt_test flags have been specified correctly. + """ + if not flag_value: + logging.error('No tests specified! Please specify at least one ' + 'test using the --bt_test flag.') + return False + for test in flag_value: + config_args = test.split(',') + for config_arg in config_args: + if config_arg.split('=')[0] not in BT_TEST_CONFIG_KEYS: + logging.error('--bt_test config key "%s" is invalid!', + config_arg.split('=')[0]) + return False + return True + + +def validate_bt_dut_flags(flag_value): + """Validates the format of specified --bt_dut flags. + + Args: + flag_value: string, the config flag value for a given --bt_dut flag. + + Returns: + bool, True if --bt_dut flags have been specified correctly. + """ + if not flag_value: + logging.error('No DUTs specified! Please specify at least one ' + 'DUT using the --bt_dut flag.') + return False + for dut in flag_value: + config_args = dut.split(',') + for config_arg in config_args: + if config_arg.split('=')[0] not in BT_DUT_CONFIG_KEYS: + logging.error('--bt_dut config key "%s" is invalid!', + config_arg.split('=')[0]) + return False + return True + +flags.register_validator( + 'bt_test', validate_bt_test_flags, + ('Invalid --bt_test configuration specified!' + ' Valid configuration fields include: %s') + % BT_TEST_CONFIG_KEYS) + + +flags.register_validator( + 'bt_dut', validate_bt_dut_flags, + ('Invalid --bt_dut configuration specified!' + ' Valid configuration fields include: %s') + % BT_DUT_CONFIG_KEYS) + + +def parse_flag_value(flag_value): + """Parses a config flag value string into a dict. + + Example input: 'target=//tests:bluetooth_pairing_test' + Example output: {'target': '//tests:bluetooth_pairing_test'} + + Args: + flag_value: string, the config flag value for a given flag. + + Returns: + dict, A dict object representation of a config flag value. + """ + config_dict = {} + config_args = flag_value.split(',') + for config_arg in config_args: + config_dict[config_arg.split('=')[0]] = config_arg.split('=')[1] + return config_dict + + +def get_device_type(gateway_stub, dut_config_dict): + """Determines a device type based on a device query. + + Args: + gateway_stub: An RPC2 stub object. + dut_config_dict: dict, A dict of device config args. + + Returns: + string, The MobileHarness device type. + + Raises: + DutConfigError: If --bt_dut flag(s) are incorrectly specified. + """ + device_query_filter = device_query_pb2.DeviceQueryFilter() + device_query_filter.type_regex.append('AndroidRealDevice') + for dut_config_key in dut_config_dict: + dimension_filter = device_query_filter.dimension_filter.add() + dimension_filter.name = dut_config_key + dimension_filter.value_regex = dut_config_dict[dut_config_key] + request = gateway_service_pb2.QueryDeviceRequest( + device_query_filter=device_query_filter) + response = gateway_stub.QueryDevice(request) + if response.device_query_result.device_info: + return 'AndroidRealDevice' + + device_query_filter.ClearField('type_regex') + device_query_filter.type_regex.append('TestbedDevice') + request = gateway_service_pb2.QueryDeviceRequest( + device_query_filter=device_query_filter) + response = gateway_stub.QueryDevice(request) + if response.device_query_result.device_info: + return 'TestbedDevice' + + raise DutConfigError('Invalid --bt_dut config specified: %s' % + dut_config_dict) + + +def generate_dut_configs(gateway_stub): + """Generates a unicode string specifying the desired DUT configurations. + + Args: + gateway_stub: An RPC2 stub object. + + Returns: + string, Unicode string specifying DUT configurations. + + Raises: + DutConfigError: If --bt_dut flag(s) are incorrectly specified. + """ + dut_list = job_config_pb2.JobConfig().DeviceList() + dut_config_dict_list = [parse_flag_value(value) for value in FLAGS.bt_dut] + + for dut_config_dict in dut_config_dict_list: + dut_config_dict['pool'] = 'bluetooth-iop' + dut = job_config_pb2.JobConfig().SubDeviceSpec() + if 'hardware' not in dut_config_dict: + raise DutConfigError('Must specify hardware name for bt_dut: %s' % + dut_config_dict) + dut.type = get_device_type(gateway_stub, dut_config_dict) + for config_key in dut_config_dict: + dut.dimensions.content[config_key] = dut_config_dict[config_key] + dut_list.sub_device_spec.append(dut) + logging.info(base64.b64encode(dut_list.SerializeToString()).decode('utf-8')) + return base64.b64encode(dut_list.SerializeToString()).decode('utf-8') + + +def generate_blaze_targets(session_config, gateway_stub): + """Generates and appends blaze test targets to a MobileHarness session. + + Args: + session_config: The SessionConfig object to append blaze test targets to. + gateway_stub: An RPC2 stub object. + + Raises: + TestConfigError: If --bt_test flag(s) are incorrectly specified. + """ + test_config_dict_list = [parse_flag_value(value) for value in FLAGS.bt_test] + + for test_config_dict in test_config_dict_list: + target = setting_pb2.BlazeTarget() + if 'target' not in test_config_dict: + raise TestConfigError('Must specify a target for bt_test: %s' % + test_config_dict) + target.target_name = test_config_dict['target'] + target.test_flags = TEST_FLAGS % generate_dut_configs(gateway_stub) + session_config.blaze_target.append(target) + + +def run_session(): + """Runs a configured test session. + + Returns: + A RunSessionResponse object. + """ + session_config = setting_pb2.SessionConfig() + channel = rpcutil.GetNewChannel('blade:mobileharness-gateway') + gateway_stub = gateway_service_pb2.GatewayService.NewRPC2Stub(channel=channel) + generate_blaze_targets(session_config, gateway_stub) + request = gateway_service_pb2.RunSessionRequest() + request.session_config.CopyFrom(session_config) + response = gateway_stub.RunSession(request) + logging.info('Sponge link: %s', response.sponge) + logging.info('Session ID: %s', response.session_id) + return response + + +def main(argv): + logging.use_python_logging() + del argv + run_session() + +if __name__ == '__main__': + flags.mark_flag_as_required('bt_test') + flags.mark_flag_as_required('bt_dut') + app.run(main) diff --git a/system/blueberry/utils/metrics_utils.py b/system/blueberry/utils/metrics_utils.py new file mode 100644 index 0000000000..b62b9e921c --- /dev/null +++ b/system/blueberry/utils/metrics_utils.py @@ -0,0 +1,111 @@ +"""Metrics reporting module for Blueberry using protobuf. + +Internal reference +""" + +from __future__ import absolute_import +from __future__ import division + +from __future__ import print_function + +import base64 +import logging +import time + +# Internal import + + +class BluetoothMetricLogger(object): + """A class used for gathering metrics from tests and devices. + + This class provides methods to allow test writers to easily export metrics + from their tests as protobuf messages. + + Attributes: + _metrics: The Bluetooth test proto message to add metrics to. + """ + + def __init__(self, bluetooth_test_proto_message): + self._metrics = bluetooth_test_proto_message + self._start_time = int(time.time()) + + def add_primary_device_metrics(self, device): + """Adds primary device metrics to the test proto message. + + Args: + device: The Bluetooth device object to gather device metrics from. + """ + device_message = self._metrics.configuration_data.primary_device + message_fields = device_message.DESCRIPTOR.fields_by_name.keys() + try: + device_metrics_dict = device.get_device_info() + except AttributeError: + logging.info( + 'Must implement get_device_info method for this controller in order to upload device metrics.' + ) + return + + for metric in device_metrics_dict: + if metric in message_fields: + setattr(device_message, metric, device_metrics_dict[metric]) + else: + logging.info('%s is not a valid metric field.', metric) + + def add_connected_device_metrics(self, device): + """Adds connected device metrics to the test proto message. + + Args: + device: The Bluetooth device object to gather device metrics from. + """ + device_message = self._metrics.configuration_data.connected_device + message_fields = device_message.DESCRIPTOR.fields_by_name.keys() + try: + device_metrics_dict = device.get_device_info() + except AttributeError: + logging.info( + 'Must implement get_device_info method for this controller in order to upload device metrics.' + ) + return + + for metric in device_metrics_dict: + if metric in message_fields: + setattr(device_message, metric, device_metrics_dict[metric]) + else: + logging.warning('%s is not a valid metric field.', metric) + + def add_test_metrics(self, test_metrics_dict): + """Adds test metrics to the test proto message. + + Args: + test_metrics_dict: A dictionary of metrics to add to the test proto + message. Metric will only be added if the key exists as a field in the + test proto message. + """ + if hasattr(self._metrics, 'configuration_data'): + self._metrics.configuration_data.test_date_time = self._start_time + message_fields = self._metrics.DESCRIPTOR.fields_by_name.keys() + for metric in test_metrics_dict: + if metric in message_fields: + metric_value = test_metrics_dict[metric] + if isinstance(metric_value, (list, tuple)): + getattr(self._metrics, metric).extend(metric_value) + else: + setattr(self._metrics, metric, metric_value) + else: + logging.warning('%s is not a valid metric field.', metric) + + def proto_message_to_base64(self): + """Converts a proto message to a base64 string. + + Returns: + string, Message formatted as a base64 string. + """ + return base64.b64encode(self._metrics.SerializeToString()).decode('utf-8') + + def proto_message_to_ascii(self): + """Converts a proto message to an ASCII string. + + Returns: + string, Message formatted as an ASCII string. Useful for debugging. + """ + return text_format.MessageToString(self._metrics) |