diff options
author | 2021-12-14 15:48:00 -0800 | |
---|---|---|
committer | 2021-12-15 02:39:37 +0000 | |
commit | 7849fe5e7abcab932580813e0a39a360daed7e44 (patch) | |
tree | fe05742bdc2691b4473b277d7300205b9f686a64 | |
parent | 7efccf61ee8e7cd6b8908bb6db0a932c7e69efb8 (diff) |
Project import generated by Copybara.
Migrate basic coarse level e2e Blueberry tests.
Test: Manual test
PiperOrigin-RevId: 416413362
Change-Id: I97989892034864836a39c758936fb31d6caea763
40 files changed, 3497 insertions, 811 deletions
diff --git a/system/blueberry/controllers/android_bt_target_device.py b/system/blueberry/controllers/android_bt_target_device.py index 9e7aebf752..d1b14412e9 100644 --- a/system/blueberry/controllers/android_bt_target_device.py +++ b/system/blueberry/controllers/android_bt_target_device.py @@ -22,23 +22,27 @@ The config for this derived_bt_target_device in mobileharness is: import logging import os import time +from typing import Any, Dict, Optional from mobly import asserts -from mobly.controllers.android_device import AndroidDevice -from mobly.signals import ControllerError +from mobly import signals +from mobly.controllers import android_device + # Internal import +from blueberry.utils import android_bluetooth_decorator from blueberry.utils import bt_constants -from blueberry.utils.android_bluetooth_decorator import AndroidBluetoothDecorator -import blueberry.utils.bt_test_utils as btutils +from blueberry.utils import bt_test_utils as btutils + +_CONNECTION_STATE = bt_constants.BluetoothConnectionStatus -ADB_FILE = "rec.pcm" -ADB_PATH = "/sdcard/Music/" -WAVE_FILE_TEMPLATE = "recorded_audio_%s.wav" +ADB_FILE = 'rec.pcm' +ADB_PATH = '/sdcard/Music/' +WAVE_FILE_TEMPLATE = 'recorded_audio_%s.wav' DEFAULT_WAIT_TIME = 3.0 # A MediaBrowserService implemented in the SL4A app to intercept Media keys and # commands. -BLUETOOTH_SL4A_AUDIO_SRC_MBS = "BluetoothSL4AAudioSrcMBS" +BLUETOOTH_SL4A_AUDIO_SRC_MBS = 'BluetoothSL4AAudioSrcMBS' A2DP_HFP_PROFILES = [ bt_constants.BluetoothProfile.A2DP_SINK, @@ -53,19 +57,21 @@ class AndroidBtTargetDevice(object): hfp and a2dp sink device. """ - def __init__(self, config): + def __init__(self, config: Dict[str, Any]) -> None: """Initializes an android hfp device.""" - logging.info("Initializes the android hfp device") + logging.info('Initializes the android hfp device') + self.config = config self.pri_ad = None self.sec_ad = None - self.serial = config.get("device_id", None) - self.audio_params = config.get("audio_params", None) + self.serial = config.get('device_id', None) + self.audio_params = config.get('audio_params', None) if self.serial: # self._ad for accessing the device at the end of the test - self._ad = AndroidDevice(self.serial) + self._ad = android_device.AndroidDevice(self.serial) self.aud = adb_ui_device.AdbUiDevice(self._ad) - self.pri_ad = AndroidBluetoothDecorator(self._ad) + self.pri_ad = android_bluetooth_decorator.AndroidBluetoothDecorator( + self._ad) self.pri_ad.init_setup() self.pri_ad.sl4a_setup() self.sl4a = self._ad.services.sl4a @@ -75,58 +81,58 @@ class AndroidBtTargetDevice(object): self._initialize_audio_params() self.avrcp_ready = False - def __getattr__(self, name): + def __getattr__(self, name: str) -> Any: return getattr(self.pri_ad, name) - def _disable_profiles(self): + def _disable_profiles(self) -> None: if self.sec_ad is None: - raise MissingBtClientDeviceError("Please provide sec_ad forsetting" - "profiles") + raise MissingBtClientDeviceError('Please provide sec_ad forsetting' + 'profiles') self.set_profiles_policy_off(self.sec_ad, A2DP_HFP_PROFILES) - def _initialize_audio_params(self): - self.audio_capture_path = os.path.join(self._ad.log_path, "audio_capture") + def _initialize_audio_params(self) -> None: + self.audio_capture_path = os.path.join(self._ad.log_path, 'audio_capture') os.makedirs(self.audio_capture_path) self.adb_path = os.path.join(ADB_PATH, ADB_FILE) self.wave_file_template = os.path.join(self.audio_capture_path, WAVE_FILE_TEMPLATE) self.wave_file_number = 0 - def _verify_pri_ad(self): + def _verify_pri_ad(self) -> None: if not self.pri_ad: - raise ControllerError("No be target device") + raise signals.ControllerError('No be target device') - def clean_up(self): + def clean_up(self) -> None: """Resets Bluetooth and stops all services when the device is destroyed.""" self.deactivate_ble_pairing_mode() self.factory_reset_bluetooth() self._ad.services.stop_all() - def a2dp_sink_connect(self): + def a2dp_sink_connect(self) -> bool: """Establishes the hft connection between self.pri_ad and self.sec_ad.""" self._verify_pri_ad() connected = self.pri_ad.a2dp_sink_connect(self.sec_ad) asserts.assert_true( - connected, "The a2dp sink connection between {} and {} failed".format( + connected, 'The a2dp sink connection between {} and {} failed'.format( self.serial, self.sec_ad.serial)) - self.log.info("The a2dp sink connection between %s and %s succeeded", + self.log.info('The a2dp sink connection between %s and %s succeeded', self.serial, self.sec_ad.serial) return True - def activate_pairing_mode(self): + def activate_pairing_mode(self) -> None: """Makes the android hfp device discoverable over Bluetooth.""" - self.log.info("Activating the pairing mode of the android target device") + self.log.info('Activating the pairing mode of the android target device') self.pri_ad.activate_pairing_mode() - def activate_ble_pairing_mode(self): + def activate_ble_pairing_mode(self) -> None: """Activates BLE pairing mode on an AndroidBtTargetDevice.""" self.pri_ad.activate_ble_pairing_mode() - def deactivate_ble_pairing_mode(self): + def deactivate_ble_pairing_mode(self) -> None: """Deactivates BLE pairing mode on an AndroidBtTargetDevice.""" self.pri_ad.deactivate_ble_pairing_mode() - def add_pri_ad_device(self, pri_ad): + def add_pri_ad_device(self, pri_ad: android_device.AndroidDevice) -> None: """Adds primary android device as bt target device. The primary android device should have been initialized with @@ -142,12 +148,12 @@ class AndroidBtTargetDevice(object): self.log = self.pri_ad.log self.serial = self.pri_ad.serial self.log.info( - "Adds primary android device with id %s for the bluetooth" - "connection", pri_ad.serial) + 'Adds primary android device with id %s for the bluetooth' + 'connection', pri_ad.serial) if self.audio_params: self._initialize_audio_params() - def add_sec_ad_device(self, sec_ad): + def add_sec_ad_device(self, sec_ad: android_device.AndroidDevice) -> None: """Adds second android device for bluetooth connection. The second android device should have sl4a service acitvated. @@ -156,65 +162,65 @@ class AndroidBtTargetDevice(object): sec_ad: the second android device for bluetooth connection. """ self.log.info( - "Adds second android device with id %s for the bluetooth" - "connection", sec_ad.serial) + 'Adds second android device with id %s for the bluetooth' + 'connection', sec_ad.serial) self.sec_ad = sec_ad self.sec_ad_mac_address = self.sec_ad.sl4a.bluetoothGetLocalAddress() - def answer_phone_call(self): + def answer_phone_call(self) -> bool: """Answers an incoming phone call.""" if not self.is_hfp_connected(): self.hfp_connect() # Make sure the device is in ringing state. if not self.wait_for_call_state( bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC): - raise ControllerError( - "Timed out after %ds waiting for the device %s to be ringing state " - "before anwsering the incoming phone call." % + raise signals.ControllerError( + 'Timed out after %ds waiting for the device %s to be ringing state ' + 'before anwsering the incoming phone call.' % (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial)) - self.log.info("Answers the incoming phone call from hf phone %s for %s", + self.log.info('Answers the incoming phone call from hf phone %s for %s', self.mac_address, self.sec_ad_mac_address) return self.sl4a.bluetoothHfpClientAcceptCall(self.sec_ad_mac_address) - def call_volume_down(self): + def call_volume_down(self) -> None: """Lowers the volume.""" current_volume = self.mbs.getVoiceCallVolume() if current_volume > 0: change_volume = current_volume - 1 - self.log.debug("Set voice call volume from %d to %d." % + self.log.debug('Set voice call volume from %d to %d.' % (current_volume, change_volume)) self.mbs.setVoiceCallVolume(change_volume) - def call_volume_up(self): + def call_volume_up(self) -> None: """Raises the volume.""" current_volume = self.mbs.getVoiceCallVolume() if current_volume < self.mbs.getVoiceCallMaxVolume(): change_volume = current_volume + 1 - self.log.debug("Set voice call volume from %d to %d." % + self.log.debug('Set voice call volume from %d to %d.' % (current_volume, change_volume)) self.mbs.setVoiceCallVolume(change_volume) - def disconnect_all(self): + def disconnect_all(self) -> None: self._disable_profiles() - def factory_reset_bluetooth(self): + def factory_reset_bluetooth(self) -> None: """Factory resets Bluetooth on the android hfp device.""" - self.log.info("Factory resets Bluetooth on the android target device") + self.log.info('Factory resets Bluetooth on the android target device') self.pri_ad.factory_reset_bluetooth() - def get_bluetooth_mac_address(self): + def get_bluetooth_mac_address(self) -> str: """Gets Bluetooth mac address of this android_bt_device.""" - self.log.info("Getting Bluetooth mac address for AndroidBtTargetDevice.") + self.log.info('Getting Bluetooth mac address for AndroidBtTargetDevice.') mac_address = self.sl4a.bluetoothGetLocalAddress() - self.log.info("Bluetooth mac address of AndroidBtTargetDevice: %s", + self.log.info('Bluetooth mac address of AndroidBtTargetDevice: %s', mac_address) return mac_address - def get_audio_params(self): + def get_audio_params(self) -> Optional[Dict[str, str]]: """Gets audio params from the android_bt_target_device.""" return self.audio_params - def get_new_wave_file_path(self): + def get_new_wave_file_path(self) -> str: """Gets a new wave file path for the audio capture.""" wave_file_path = self.wave_file_template % self.wave_file_number while os.path.exists(wave_file_path): @@ -226,27 +232,27 @@ class AndroidBtTargetDevice(object): """Gets unread messages from the connected device (MSE).""" self.sl4a.mapGetUnreadMessages(self.sec_ad_mac_address) - def hangup_phone_call(self): + def hangup_phone_call(self) -> bool: """Hangs up an ongoing phone call.""" if not self.is_hfp_connected(): self.hfp_connect() - self.log.info("Hangs up the phone call from hf phone %s for %s", + self.log.info('Hangs up the phone call from hf phone %s for %s', self.mac_address, self.sec_ad_mac_address) return self.sl4a.bluetoothHfpClientTerminateAllCalls( self.sec_ad_mac_address) - def hfp_connect(self): + def hfp_connect(self) -> bool: """Establishes the hft connection between self.pri_ad and self.sec_ad.""" self._verify_pri_ad() connected = self.pri_ad.hfp_connect(self.sec_ad) asserts.assert_true( - connected, "The hfp connection between {} and {} failed".format( + connected, 'The hfp connection between {} and {} failed'.format( self.serial, self.sec_ad.serial)) - self.log.info("The hfp connection between %s and %s succeed", self.serial, + self.log.info('The hfp connection between %s and %s succeed', self.serial, self.sec_ad.serial) return connected - def init_ambs_for_avrcp(self): + def init_ambs_for_avrcp(self) -> bool: """Initializes media browser service for avrcp. This is required to be done before running any of the passthrough @@ -271,7 +277,7 @@ class AndroidBtTargetDevice(object): if not self.is_a2dp_sink_connected(): self.a2dp_sink_connect() - self.sec_ad.log.info("Starting AvrcpMediaBrowserService") + self.sec_ad.log.info('Starting AvrcpMediaBrowserService') self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStart() time.sleep(DEFAULT_WAIT_TIME) @@ -279,9 +285,9 @@ class AndroidBtTargetDevice(object): # Check if the media session "BluetoothSL4AAudioSrcMBS" is active on sec_ad. active_sessions = self.sec_ad.sl4a.bluetoothMediaGetActiveMediaSessions() if BLUETOOTH_SL4A_AUDIO_SRC_MBS not in active_sessions: - raise ControllerError("Failed to start AvrcpMediaBrowserService.") + raise signals.ControllerError('Failed to start AvrcpMediaBrowserService.') - self.log.info("Connecting to A2dp media browser service") + self.log.info('Connecting to A2dp media browser service') self.sl4a.bluetoothMediaConnectToCarMBS() # TODO(user) Wait for an event back instead of sleep @@ -289,52 +295,52 @@ class AndroidBtTargetDevice(object): self.avrcp_ready = True return self.avrcp_ready - def is_avrcp_ready(self): + def is_avrcp_ready(self) -> bool: """Checks if the pri_ad and sec_ad are ready for avrcp.""" self._verify_pri_ad() if self.avrcp_ready: return True active_sessions = self.sl4a.bluetoothMediaGetActiveMediaSessions() if not active_sessions: - self.log.info("The device is not avrcp ready") + self.log.info('The device is not avrcp ready') self.avrcp_ready = False else: - self.log.info("The device is avrcp ready") + self.log.info('The device is avrcp ready') self.avrcp_ready = True return self.avrcp_ready - def is_hfp_connected(self): + def is_hfp_connected(self) -> _CONNECTION_STATE: """Checks if the pri_ad and sec_ad are hfp connected.""" self._verify_pri_ad() if self.sec_ad is None: - raise MissingBtClientDeviceError("The sec_ad was not added") + raise MissingBtClientDeviceError('The sec_ad was not added') return self.sl4a.bluetoothHfpClientGetConnectionStatus( self.sec_ad_mac_address) - def is_a2dp_sink_connected(self): + def is_a2dp_sink_connected(self) -> _CONNECTION_STATE: """Checks if the pri_ad and sec_ad are hfp connected.""" self._verify_pri_ad() if self.sec_ad is None: - raise MissingBtClientDeviceError("The sec_ad was not added") + raise MissingBtClientDeviceError('The sec_ad was not added') return self.sl4a.bluetoothA2dpSinkGetConnectionStatus( self.sec_ad_mac_address) - def last_number_dial(self): + def last_number_dial(self) -> None: """Redials last outgoing phone number.""" if not self.is_hfp_connected(): self.hfp_connect() - self.log.info("Redials last number from hf phone %s for %s", + self.log.info('Redials last number from hf phone %s for %s', self.mac_address, self.sec_ad_mac_address) self.sl4a.bluetoothHfpClientDial(self.sec_ad_mac_address, None) - def map_connect(self): + def map_connect(self) -> None: """Establishes the map connection between self.pri_ad and self.sec_ad.""" self._verify_pri_ad() connected = self.pri_ad.map_connect(self.sec_ad) asserts.assert_true( - connected, "The map connection between {} and {} failed".format( + connected, 'The map connection between {} and {} failed'.format( self.serial, self.sec_ad.serial)) - self.log.info("The map connection between %s and %s succeed", self.serial, + self.log.info('The map connection between %s and %s succeed', self.serial, self.sec_ad.serial) def map_disconnect(self) -> None: @@ -349,87 +355,91 @@ class AndroidBtTargetDevice(object): 'Failed to terminate the MAP connection with the device "%s".' % self.sec_ad_mac_address) - def pbap_connect(self): + def pbap_connect(self) -> None: """Establishes the pbap connection between self.pri_ad and self.sec_ad.""" connected = self.pri_ad.pbap_connect(self.sec_ad) asserts.assert_true( - connected, "The pbap connection between {} and {} failed".format( + connected, 'The pbap connection between {} and {} failed'.format( self.serial, self.sec_ad.serial)) - self.log.info("The pbap connection between %s and %s succeed", self.serial, + self.log.info('The pbap connection between %s and %s succeed', self.serial, self.sec_ad.serial) - def pause(self): + def pause(self) -> None: """Sends Avrcp pause command.""" self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PAUSE, self.sec_ad) - def play(self): + def play(self) -> None: """Sends Avrcp play command.""" self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PLAY, self.sec_ad) - def power_on(self): + def power_on(self) -> bool: """Turns the Bluetooth on the android bt garget device.""" - self.log.info("Turns on the bluetooth") + self.log.info('Turns on the bluetooth') return self.sl4a.bluetoothToggleState(True) - def power_off(self): + def power_off(self) -> bool: """Turns the Bluetooth off the android bt garget device.""" - self.log.info("Turns off the bluetooth") + self.log.info('Turns off the bluetooth') return self.sl4a.bluetoothToggleState(False) - def route_call_audio(self, connect=False): + def route_call_audio(self, connect: bool = False) -> None: """Routes call audio during a call.""" if not self.is_hfp_connected(): self.hfp_connect() self.log.info( - "Routes call audio during a call from hf phone %s for %s " - "audio connection %s after routing", self.mac_address, + 'Routes call audio during a call from hf phone %s for %s ' + 'audio connection %s after routing', self.mac_address, self.sec_ad_mac_address, connect) if connect: self.sl4a.bluetoothHfpClientConnectAudio(self.sec_ad_mac_address) else: self.sl4a.bluetoothHfpClientDisconnectAudio(self.sec_ad_mac_address) - def reject_phone_call(self): + def reject_phone_call(self) -> bool: """Rejects an incoming phone call.""" if not self.is_hfp_connected(): self.hfp_connect() # Make sure the device is in ringing state. if not self.wait_for_call_state( bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC): - raise ControllerError( - "Timed out after %ds waiting for the device %s to be ringing state " - "before rejecting the incoming phone call." % + raise signals.ControllerError( + 'Timed out after %ds waiting for the device %s to be ringing state ' + 'before rejecting the incoming phone call.' % (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial)) - self.log.info("Rejects the incoming phone call from hf phone %s for %s", + self.log.info('Rejects the incoming phone call from hf phone %s for %s', self.mac_address, self.sec_ad_mac_address) return self.sl4a.bluetoothHfpClientRejectCall(self.sec_ad_mac_address) - def set_audio_params(self, audio_params): + def set_audio_params(self, audio_params: Optional[Dict[str, str]]) -> None: """Sets audio params to the android_bt_target_device.""" self.audio_params = audio_params - def track_previous(self): + def track_previous(self) -> None: """Sends Avrcp skip prev command.""" self.send_media_passthrough_cmd( bt_constants.CMD_MEDIA_SKIP_PREV, self.sec_ad) - def track_next(self): + def track_next(self) -> None: """Sends Avrcp skip next command.""" self.send_media_passthrough_cmd( bt_constants.CMD_MEDIA_SKIP_NEXT, self.sec_ad) - def start_audio_capture(self): - """Starts the audio capture over adb.""" - if self.audio_params is None: - raise MissingAudioParamsError("Missing audio params for captureing audio") + def start_audio_capture(self, duration_sec: int = 20) -> None: + """Starts the audio capture over adb. + + Args: + duration_sec: int, Number of seconds to record audio, 20 secs as default. + """ + if 'duration' in self.audio_params.keys(): + duration_sec = self.audio_params['duration'] if not self.is_a2dp_sink_connected(): self.a2dp_sink_connect() - cmd = "ap2f --usage 1 --start --duration {} --target {}".format( - self.audio_params["duration"], self.adb_path) - self.log.info("Starts capturing audio with adb shell command %s", cmd) + cmd = 'ap2f --usage 1 --start --duration {} --target {}'.format( + duration_sec, self.adb_path) + self.log.info('Starts capturing audio with adb shell command %s', cmd) self.adb.shell(cmd) - def stop_audio_capture(self): + def stop_audio_capture(self) -> str: """Stops the audio capture and stores it in wave file. Returns: @@ -439,68 +449,68 @@ class AndroidBtTargetDevice(object): MissingAudioParamsError: when self.audio_params is None """ if self.audio_params is None: - raise MissingAudioParamsError("Missing audio params for captureing audio") + raise MissingAudioParamsError('Missing audio params for capturing audio') if not self.is_a2dp_sink_connected(): self.a2dp_sink_connect() adb_pull_args = [self.adb_path, self.audio_capture_path] - self.log.info("start adb -s %s pull %s", self.serial, adb_pull_args) + self.log.info('start adb -s %s pull %s', self.serial, adb_pull_args) self._ad.adb.pull(adb_pull_args) pcm_file_path = os.path.join(self.audio_capture_path, ADB_FILE) - self.log.info("delete the recored file %s", self.adb_path) - self._ad.adb.shell("rm {}".format(self.adb_path)) + self.log.info('delete the recored file %s', self.adb_path) + self._ad.adb.shell('rm {}'.format(self.adb_path)) wave_file_path = self.get_new_wave_file_path() - self.log.info("convert pcm file %s to wav file %s", pcm_file_path, + self.log.info('convert pcm file %s to wav file %s', pcm_file_path, wave_file_path) btutils.convert_pcm_to_wav(pcm_file_path, wave_file_path, self.audio_params) return wave_file_path - def stop_all_services(self): + def stop_all_services(self) -> None: """Stops all services for the pri_ad device.""" - self.log.info("Stops all services on the android bt target device") + self.log.info('Stops all services on the android bt target device') self._ad.services.stop_all() - def stop_ambs_for_avrcp(self): + def stop_ambs_for_avrcp(self) -> None: """Stops media browser service for avrcp.""" if self.is_avrcp_ready(): - self.log.info("Stops avrcp connection") + self.log.info('Stops avrcp connection') self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStop() self.avrcp_ready = False - def stop_voice_dial(self): + def stop_voice_dial(self) -> None: """Stops voice dial.""" if not self.is_hfp_connected(): self.hfp_connect() - self.log.info("Stops voice dial from hf phone %s for %s", self.mac_address, + self.log.info('Stops voice dial from hf phone %s for %s', self.mac_address, self.sec_ad_mac_address) if self.is_hfp_connected(): self.sl4a.bluetoothHfpClientStopVoiceRecognition( self.sec_ad_mac_address) def take_bug_report(self, - test_name=None, - begin_time=None, - timeout=300, - destination=None): + test_name: Optional[str] = None, + begin_time: Optional[int] = None, + timeout: float = 300, + destination: Optional[str] = None) -> None: """Wrapper method to capture bugreport on the android bt target device.""" self._ad.take_bug_report(test_name, begin_time, timeout, destination) - def voice_dial(self): + def voice_dial(self) -> None: """Triggers voice dial.""" if not self.is_hfp_connected(): self.hfp_connect() - self.log.info("Triggers voice dial from hf phone %s for %s", + self.log.info('Triggers voice dial from hf phone %s for %s', self.mac_address, self.sec_ad_mac_address) if self.is_hfp_connected(): self.sl4a.bluetoothHfpClientStartVoiceRecognition( self.sec_ad_mac_address) - def log_type(self): + def log_type(self) -> str: """Gets the log type of Android bt target device. Returns: A string, the log type of Android bt target device. """ - return bt_constants.LogType.BLUETOOTH_DEVICE_SIMULATOR + return bt_constants.LogType.BLUETOOTH_DEVICE_SIMULATOR.value class BluetoothProfileConnectionError(Exception): diff --git a/system/blueberry/controllers/bt_stub.py b/system/blueberry/controllers/bt_stub.py index 0cd024e23b..718c28da53 100644 --- a/system/blueberry/controllers/bt_stub.py +++ b/system/blueberry/controllers/bt_stub.py @@ -9,6 +9,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +from typing import Any, List + import six @@ -20,29 +22,28 @@ class BtStub(object): """ # Connection Commands - def power_off(self): + def power_off(self) -> None: """Prompt the user to power off the Bluetooth device.""" - six.moves.input("Power Off Bluetooth device, then press enter.") + six.moves.input('Power Off Bluetooth device, then press enter.') - def power_on(self): + def power_on(self) -> None: """Prompt the user to power on the Bluetooth device.""" - six.moves.input("Power ON Bluetooth device, then press enter.") + six.moves.input('Power ON Bluetooth device, then press enter.') - def activate_pairing_mode(self): + def activate_pairing_mode(self) -> None: """Prompt the user to put the Bluetooth device into pairing mode.""" - six.moves.input("Put Bluetooth device into pairing mode," - "then press enter.") + six.moves.input('Put Bluetooth device into pairing mode, then press enter.') - def get_bluetooth_mac_address(self): + def get_bluetooth_mac_address(self) -> str: """Prompt the user to input the Bluetooth MAC address for this device. Returns: mac_address (str): the string received from user input. """ - mac_address = six.moves.input("Enter BT MAC address, then press enter.") + mac_address = six.moves.input('Enter BT MAC address, then press enter.') return mac_address - def set_device_name(self, device_name): + def set_device_name(self, device_name: str) -> None: """Prompt the user to set the device name (Carkit Only). Args: @@ -50,79 +51,79 @@ class BtStub(object): Returns: None """ - six.moves.input("Device name is: %s", device_name) + six.moves.input(f'Device name is: {device_name}') - def factory_reset_bluetooth(self): + def factory_reset_bluetooth(self) -> None: """Prompt the user to factory reset Bluetooth on the device.""" - six.moves.input("Factory reset Bluetooth on the Bluetooth device, " - "then press enter.") + six.moves.input('Factory reset Bluetooth on the Bluetooth device, ' + 'then press enter.') # A2DP: Bluetooth stereo streaming protocol methods. - def is_audio_playing(self): + def is_audio_playing(self) -> bool: """Prompt the user to indicate if the audio is playing. Returns: A Bool, true is audio is playing, false if not. """ - audio_playing = six.moves.input("Indicate if audio is playing: " - "true/false.") + audio_playing = six.moves.input('Indicate if audio is playing: ' + 'true/false.') return bool(audio_playing) # AVRCP Commands - def volume_up(self): + def volume_up(self) -> None: """Prompt the user to raise the volume on the Bluetooth device.""" - six.moves.input("Press the Volume Up Button on the Bluetooth device, " - "then press enter.") + six.moves.input('Press the Volume Up Button on the Bluetooth device, ' + 'then press enter.') - def volume_down(self): + def volume_down(self) -> None: """Prompt the user to lower the volume on the Bluetooth device.""" - six.moves.input("Press the Volume Down Button on the Bluetooth device, " - "then press enter.") + six.moves.input('Press the Volume Down Button on the Bluetooth device, ' + 'then press enter.') - def track_next(self): + def track_next(self) -> None: """Prompt the user to skip the track on the Bluetooth device.""" - six.moves.input("Press the Skip Track Button on the Bluetooth device, " - "then press enter.") + six.moves.input('Press the Skip Track Button on the Bluetooth device, ' + 'then press enter.') - def track_previous(self): + def track_previous(self) -> None: """Prompt the user to rewind the track on the Bluetooth device.""" - six.moves.input("Press the Rewind Track Button on the Bluetooth device, " - "then press enter.") + six.moves.input('Press the Rewind Track Button on the Bluetooth device, ' + 'then press enter.') - def play(self): + def play(self) -> None: """Prompt the user to press play on the Bluetooth device.""" - six.moves.input("Press the Play Button on the Bluetooth device, " - "then press enter.") + six.moves.input('Press the Play Button on the Bluetooth device, ' + 'then press enter.') - def pause(self): + def pause(self) -> None: """Prompt the user to press pause on the Bluetooth device.""" - six.moves.input("Press the Pause Button on the Bluetooth device, " - "then press enter.") + six.moves.input('Press the Pause Button on the Bluetooth device, ' + 'then press enter.') - def repeat(self): + def repeat(self) -> None: """Prompt the user to set the repeat option on the device.""" - six.moves.input("Press the Repeat Button on the Bluetooth device, " - "then press enter.") + six.moves.input('Press the Repeat Button on the Bluetooth device, ' + 'then press enter.') - def fast_forward(self): + def fast_forward(self) -> None: """Prompt the user to press the fast forward option/button on the device. Returns: None """ - six.moves.input("Press the Fast Forward Button on the Bluetooth device, " - "then press enter.") + six.moves.input('Press the Fast Forward Button on the Bluetooth device, ' + 'then press enter.') - def rewind(self): + def rewind(self) -> None: """Prompt the user to press Rewind option on the device. Returns: None """ - six.moves.input("Press the Rewind option on the Bluetooth device, " - "then press enter.") + six.moves.input('Press the Rewind option on the Bluetooth device, ' + 'then press enter.') # TODO(user): browse_media_content may need more work in terms of input # params and value(s) returned - def browse_media_content(self, directory=""): + def browse_media_content(self, directory: str = '') -> List[Any]: """Prompt the user to enter to the paired device media folders. Args: @@ -131,10 +132,10 @@ class BtStub(object): Returns: List - empty """ - six.moves.input("Navigate to directory: %s", directory) + six.moves.input(f'Navigate to directory: {directory}') return [] - def delete_song(self, file_path=""): + def delete_song(self, file_path: str = '') -> None: """Prompt the user to delete a song. Args: @@ -142,45 +143,45 @@ class BtStub(object): Returns: None """ - six.moves.input("Delete a song %s", file_path) + six.moves.input(f'Delete a song {file_path}') - def shuffle_song(self): + def shuffle_song(self) -> None: """Prompt the user to shuffle a playlist. Returns: None """ - six.moves.input("Shuffle a playlist") + six.moves.input('Shuffle a playlist') # HFP (Hands Free Phone protocol) Commands - def call_volume_up(self): + def call_volume_up(self) -> None: """Prompt the user to press the volume up button on an active call. Returns: None """ - six.moves.input("Press the volume up button for an active call.") + six.moves.input('Press the volume up button for an active call.') - def call_volume_down(self): + def call_volume_down(self) -> None: """Prompt the user to press the volume down button on an active call. Returns: None """ - six.moves.input("Press the volume down button for an active call.") + six.moves.input('Press the volume down button for an active call.') - def answer_phone_call(self): + def answer_phone_call(self) -> None: """Prompt the user to press the button to answer a phone call.. Returns: None """ - six.moves.input("Press the button to answer the phone call.") + six.moves.input('Press the button to answer the phone call.') - def hangup_phone_call(self): + def hangup_phone_call(self) -> None: """Prompt the user to press the button to hang up on an active phone call. Returns: None """ - six.moves.input("Press the button to hang up on the phone call.") + six.moves.input('Press the button to hang up on the phone call.') - def call_contact(self, name): + def call_contact(self, name: str) -> None: """Prompt the user to select a contact from the phonebook and call. Args: @@ -188,9 +189,9 @@ class BtStub(object): Returns: None """ - six.moves.input("Select contact, %s, to call.", name) + six.moves.input(f'Select contact, {name}, to call.') - def call_number(self, phone_number): + def call_number(self, phone_number: str) -> None: """Prompt the user to dial a phone number and call. Args: @@ -198,9 +199,9 @@ class BtStub(object): Returns: None """ - six.moves.input("Dial phone number and initiate a call. %s", phone_number) + six.moves.input(f'Dial phone number and initiate a call. {phone_number}') - def swap_call(self): + def swap_call(self) -> None: """Prompt the user to push the button to swap. Function swaps between the primary and secondary calls. One call will @@ -208,9 +209,9 @@ class BtStub(object): Returns: None """ - six.moves.input("Press the button to swap calls.") + six.moves.input('Press the button to swap calls.') - def merge_call(self): + def merge_call(self) -> None: """Prompt the user to push the button to merge calls. Merges calls between the primary and secondary calls into a conference @@ -218,77 +219,77 @@ class BtStub(object): Returns: None """ - six.moves.input("Press the button to merge calls into a conference call.") + six.moves.input('Press the button to merge calls into a conference call.') - def hold_call(self): + def hold_call(self) -> None: """Prompt the user to put the primary call on hold. Primary call will be on hold, while the secondary call becomes active. Returns: None """ - six.moves.input("Press the hold button to put primary call on hold.") + six.moves.input('Press the hold button to put primary call on hold.') - def mute_call(self): + def mute_call(self) -> None: """Prompt the user to mute the ongoing active call. Returns: None """ - six.moves.input("Press Mute button on active call.") + six.moves.input('Press Mute button on active call.') - def unmute_call(self): + def unmute_call(self) -> None: """Prompt the user to unmute the ongoing active call. Returns: None """ - six.moves.input("Press the Unmute button on an active call.") + six.moves.input('Press the Unmute button on an active call.') - def reject_phone_call(self): + def reject_phone_call(self) -> None: """Prompt the user to reject an incoming call. Returns: None """ - six.moves.input("Press the Reject button to reject an incoming call.") + six.moves.input('Press the Reject button to reject an incoming call.') - def answer_voip_call(self): + def answer_voip_call(self) -> None: """Prompt the user to press the button to answer a VOIP call. Returns: None """ - six.moves.input("Press the Answer button on an incoming VOIP phone call.") + six.moves.input('Press the Answer button on an incoming VOIP phone call.') - def hangup_voip_call(self): + def hangup_voip_call(self) -> None: """Prompt the user to press the button to hangup on the active VOIP call. Returns: None """ - six.moves.input("Press the hangup button on the active VOIP call.") + six.moves.input('Press the hangup button on the active VOIP call.') - def reject_voip_call(self): + def reject_voip_call(self) -> None: """Prompt the user to press the Reject button on the incoming VOIP call. Returns: None """ - six.moves.input("Press the Reject button on the incoming VOIP call.") + six.moves.input('Press the Reject button on the incoming VOIP call.') - def voice_dial(self): + def voice_dial(self) -> None: """Prompt user to initiate a voice dial from the phone. Returns: None """ - six.moves.input("Initiate a voice dial.") + six.moves.input('Initiate a voice dial.') - def last_number_dial(self): + def last_number_dial(self) -> None: """Prompt user to iniate a call to the last number dialed. Returns: None """ - six.moves.input("Initiate a call to the last number dialed.") + six.moves.input('Initiate a call to the last number dialed.') # TODO(user): does this method need a input parameter? - def route_call_audio(self): + def route_call_audio(self) -> None: """Prompt user to route a call from AG to HF, and vice versa. Returns: None """ - six.moves.input("Reroute call audio.") + six.moves.input('Reroute call audio.') diff --git a/system/blueberry/controllers/derived_bt_device.py b/system/blueberry/controllers/derived_bt_device.py index 37fafc1c64..29408cd402 100644 --- a/system/blueberry/controllers/derived_bt_device.py +++ b/system/blueberry/controllers/derived_bt_device.py @@ -27,15 +27,18 @@ from __future__ import print_function import importlib import logging +from typing import Any, Dict, List, Sequence + import yaml -MOBLY_CONTROLLER_CONFIG_NAME = "DerivedBtDevice" -MOBLY_CONTROLLER_CONFIG_MODULE_KEY = "ModuleName" -MOBLY_CONTROLLER_CONFIG_CLASS_KEY = "ClassName" -MOBLY_CONTROLLER_CONFIG_PARAMS_KEY = "Params" + +MOBLY_CONTROLLER_CONFIG_NAME = 'DerivedBtDevice' +MOBLY_CONTROLLER_CONFIG_MODULE_KEY = 'ModuleName' +MOBLY_CONTROLLER_CONFIG_CLASS_KEY = 'ClassName' +MOBLY_CONTROLLER_CONFIG_PARAMS_KEY = 'Params' -def create(configs): +def create(configs: List[Dict[str, Any]]) -> List[Any]: """Creates DerivedBtDevice controller objects. For each config dict in configs: @@ -55,25 +58,24 @@ def create(configs): return [_create_bt_device_class(config) for config in configs] -def _create_bt_device_class(config): +def _create_bt_device_class(config: Dict[str, Any]) -> Any: """Created new device class from associated device controller from config.""" module = importlib.import_module( - "blueberry.controllers.%s" % + 'blueberry.controllers.%s' % config[MOBLY_CONTROLLER_CONFIG_MODULE_KEY]) - logging.info("Creating DerivedBtDevice from %r", config) + logging.info('Creating DerivedBtDevice from %r', config) cls = getattr(module, config[MOBLY_CONTROLLER_CONFIG_CLASS_KEY]) - params = yaml.safe_load("%s" % + params = yaml.safe_load('%s' % config.get(MOBLY_CONTROLLER_CONFIG_PARAMS_KEY, {})) new_class = type(MOBLY_CONTROLLER_CONFIG_NAME, (cls, BtDevice), params) - new_class_inst = new_class(**params) - return new_class_inst + return new_class(**params) -def destroy(derived_bt_devices): +def destroy(derived_bt_devices: Sequence[Any])-> None: """Cleans up DerivedBtDevice objects.""" for device in derived_bt_devices: # Execute cleanup if the controller class has the method "clean_up". - if hasattr(device, "clean_up"): + if hasattr(device, 'clean_up'): device.clean_up() del derived_bt_devices @@ -84,14 +86,14 @@ class BtDevice(object): Provides additional necessary functionality for use within blueberry. """ - def __init__(self): + def __init__(self) -> None: """Initializes a derived bt base class.""" self._user_params = {} - def setup(self): + def setup(self) -> None: """For devices that need extra setup.""" - def set_user_params(self, params): + def set_user_params(self, params: Dict[str, str]) -> None: """Intended for passing mobly user_params into a derived device class. Args: @@ -99,7 +101,7 @@ class BtDevice(object): """ self._user_params = params - def get_user_params(self): + def get_user_params(self) -> Dict[str, str]: """Return saved user_params. Returns: @@ -114,3 +116,7 @@ class BtDevice(object): def activate_pairing_mode(self) -> None: """Activates pairing mode on an AndroidDevice.""" raise NotImplementedError + + def get_bluetooth_mac_address(self) -> None: + """Get bluetooth mac address of an BT Device.""" + pass diff --git a/system/blueberry/controllers/grpc_bt_sync_mock.py b/system/blueberry/controllers/grpc_bt_sync_mock.py index d3e00e8e78..226bbf283c 100644 --- a/system/blueberry/controllers/grpc_bt_sync_mock.py +++ b/system/blueberry/controllers/grpc_bt_sync_mock.py @@ -15,7 +15,9 @@ Example MH testbed config for Hostside: dimensions: device: GrpcBtSyncStub """ + import subprocess +from typing import Any, Dict, Optional, Tuple from absl import flags from absl import logging @@ -25,6 +27,7 @@ import grpc from blueberry.grpc.proto import blueberry_device_controller_pb2 from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc + FLAGS = flags.FLAGS flags.DEFINE_string('server', 'dns:///[::1]:10000', 'server address') @@ -32,19 +35,22 @@ flags.DEFINE_string('server', 'dns:///[::1]:10000', 'server address') class GrpcBtSyncMock(object): """Generic GRPC device controller.""" - def __init__(self, config): + def __init__(self, config: Dict[str, str]) -> None: """Initialize GRPC object.""" super(GrpcBtSyncMock, self).__init__() + self.config = config self.mac_address = config['mac_address'] - def __del__(self): + def __del__(self) -> None: + # pytype: disable=attribute-error self.server_proc.terminate() del self.channel_creds del self.channel del self.stub - def setup(self): + def setup(self) -> None: """Setup the gRPC server that the sync mock will respond to.""" + # pytype: disable=attribute-error server_path = self.get_user_params()['mh_files']['grpc_server'][0] logging.info('Start gRPC server: %s', server_path) self.server_proc = subprocess.Popen([server_path], @@ -60,17 +66,19 @@ class GrpcBtSyncMock(object): self.stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub( self.channel) - def init_setup(self): + def init_setup(self) -> None: logging.info('init setup TO BE IMPLEMENTED') - def set_target(self, bt_device): + def set_target(self, bt_device: Any) -> None: self._target_device = bt_device - def pair_and_connect_bluetooth(self, target_mac_address): + def pair_and_connect_bluetooth( + self, target_mac_address: str) -> Optional[Tuple[float, float]]: """Pair and connect to a peripheral Bluetooth device.""" request = blueberry_device_controller_pb2.TargetMacAddress( mac_address=target_mac_address) try: + # pytype: disable=attribute-error response = self.stub.PairAndConnectBluetooth(request) logging.info('pair and connect bluetooth response: %s', response) if response.error: diff --git a/system/blueberry/controllers/grpc_bt_target_mock.py b/system/blueberry/controllers/grpc_bt_target_mock.py index 85f6b516ec..6b4e53338e 100644 --- a/system/blueberry/controllers/grpc_bt_target_mock.py +++ b/system/blueberry/controllers/grpc_bt_target_mock.py @@ -15,7 +15,9 @@ Example MH testbed config for Hostside: dimensions: device: GrpcBtTargetStub """ + import subprocess +from typing import Dict from absl import flags from absl import logging @@ -25,25 +27,29 @@ import grpc from blueberry.grpc.proto import blueberry_device_controller_pb2 from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc + FLAGS = flags.FLAGS class GrpcBtTargetMock(object): """BT Mock Target for testing the GRPC interface.""" - def __init__(self, config): + def __init__(self, config: Dict[str, str]) -> None: """Initialize GRPC object.""" super(GrpcBtTargetMock, self).__init__() + self.config = config self.mac_address = config['mac_address'] - def __del__(self): + def __del__(self) -> None: + # pytype: disable=attribute-error self.server_proc.terminate() del self.channel_creds del self.channel del self.stub - def setup(self): + def setup(self) -> None: """Setup the gRPC server that the target mock will respond to.""" + # pytype: disable=attribute-error server_path = self.get_user_params()['mh_files']['grpc_server'][0] logging.info('Start gRPC server: %s', server_path) self.server_proc = subprocess.Popen([server_path], @@ -59,10 +65,12 @@ class GrpcBtTargetMock(object): self.stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub( self.channel) - def activate_pairing_mode(self): + def activate_pairing_mode(self) -> int: + """Activates Bluetooth pairing mode.""" logging.info('activate pairing mode TO BE IMPLEMENTED') request = blueberry_device_controller_pb2.DiscoverableMode(mode=True) try: + # pytype: disable=attribute-error response = self.stub.SetDiscoverableMode(request) logging.info('set discoverageble response: %s', response) return 0 @@ -70,9 +78,9 @@ class GrpcBtTargetMock(object): print(rpc_error) return -1 - def factory_reset_bluetooth(self): + def factory_reset_bluetooth(self) -> None: logging.info('factory reset TO BE IMPLEMENTED') - def get_bluetooth_mac_address(self): + def get_bluetooth_mac_address(self) -> str: logging.info('mac_address: %s', self.mac_address) return self.mac_address diff --git a/system/blueberry/decorators/android_bluetooth_client_decorator.py b/system/blueberry/decorators/android_bluetooth_client_decorator.py index f9834ea00f..00156f91bd 100644 --- a/system/blueberry/decorators/android_bluetooth_client_decorator.py +++ b/system/blueberry/decorators/android_bluetooth_client_decorator.py @@ -6,7 +6,6 @@ the blueberry/decorators directory. from __future__ import absolute_import from __future__ import division - from __future__ import print_function import importlib diff --git a/system/blueberry/decorators/android_bluetooth_client_test_decorator.py b/system/blueberry/decorators/android_bluetooth_client_test_decorator.py index 769579e6fd..47516a5d55 100644 --- a/system/blueberry/decorators/android_bluetooth_client_test_decorator.py +++ b/system/blueberry/decorators/android_bluetooth_client_test_decorator.py @@ -3,7 +3,6 @@ from __future__ import absolute_import from __future__ import division - from __future__ import print_function from mobly.controllers.android_device import AndroidDevice diff --git a/system/blueberry/decorators/fitbit_app_decorator.py b/system/blueberry/decorators/fitbit_app_decorator.py new file mode 100644 index 0000000000..f21e3c2d4b --- /dev/null +++ b/system/blueberry/decorators/fitbit_app_decorator.py @@ -0,0 +1,250 @@ +"""Android Device decorator to control functionality of the Fitbit companion App.""" +import logging +import time +from typing import Any, Dict, Tuple + +import immutabledict # pylint: disable=no-name-in-module,import-error +from mobly import asserts +from mobly import signals +from mobly.controllers import android_device + +from blueberry.controllers import derived_bt_device +# Internal import +# Internal import +from blueberry.utils.ui_pages import fitbit_companion # pylint: disable=no-name-in-module,import-error +from blueberry.utils.ui_pages.fitbit_companion import account_pages # pylint: disable=no-name-in-module,import-error +from blueberry.utils.ui_pages.fitbit_companion import context # pylint: disable=no-name-in-module,import-error +from blueberry.utils.ui_pages.fitbit_companion import other_pages # pylint: disable=no-name-in-module,import-error +from blueberry.utils.ui_pages.fitbit_companion import pairing_pages # pylint: disable=no-name-in-module,import-error + +_FITBIT_PACKAGE_NAME = 'com.fitbit.FitbitMobile' +_LOG_PREFIX_MESSAGE = 'Fitbit Companion App' +_DEBUG_PREFIX_TEMPLATE = f'[{_LOG_PREFIX_MESSAGE}|{{tag}}] {{msg}}' +_MODEL_TO_PRODUCT_NAME_MAPPING = immutabledict.immutabledict({ + 'Buzz': 'Luxe', +}) +_INVALID_PAIRING_CODE_MESSAGE = "Sorry, this code isn't valid." +_MAX_PAIRING_RETRIES = 10 + + +class FitbitAppDecorator: + """Decorates Android Device with the Fitbit Companion App's operations. + + Attributes: + ui_context: The UI context of Fitbit companion App. + """ + + def __init__(self, ad: android_device.AndroidDevice): # pylint: disable=super-init-not-called + self._ad = ad + self._target_device = None + self.ui_context = fitbit_companion.get_context( + self._ad, do_go_home=False, safe_get=True) + + if not apk_utils.is_apk_installed(self._ad, _FITBIT_PACKAGE_NAME): + # Fitbit App is not installed, install it now. + self.ui_context.log.info('Installing Fitbit App...') + fitbit_companion.go_google_play_page(self.ui_context) + self.ui_context.expect_page(other_pages.GooglePlayPage) + self.ui_context.regr_page_call(other_pages.GoogleSmartLockSavePage, 'no') + self.ui_context.page.install() + self.ui_context.expect_page(other_pages.LoginPage) + fitbit_app_account = self._ad._user_params.get('fitbit_app_account', + 'test') + self.ui_context.log.info('Login Fitbit App with account=%s...', + fitbit_app_account) + self.ui_context.page.login( + fitbit_app_account, + self._ad._user_params.get('fitbit_app_password', 'test')) + self.ui_context.expect_page(context.HomePage) + + def __getattr__(self, name): + return getattr(self._ad, name) + + def set_target(self, bt_device: derived_bt_device.BtDevice) -> None: + """Allows for use to get target device object for target interaction. + + Args: + bt_device: The testing target. + """ + self._target_device = bt_device + + def pair_and_connect_bluetooth(self, mac_address: str) -> None: + """Pairs and connects Android device with Fitbit device. + + Args: + mac_address: MAC address of the Fitbit device to be paired with. + + Raises: + signals.TestError: Fail in pairing and connection process. + AssertionError: Fail in evaluation after pairing. + """ + log = FitbitCompanionAppLoggerAdapter(logging.getLogger(), + {'tag': mac_address}) + fitbit_device = self._target_device + target_device_mac_address = fitbit_device.get_bluetooth_mac_address() + if target_device_mac_address != mac_address: + raise ValueError( + (f'Target BT device has MAC address={target_device_mac_address}', + f'which is different than given MAC address={mac_address} !')) + + self.ui_context.regr_page_call(other_pages.LinkConfirmPage, 'ok') + self.ui_context.regr_page_call(other_pages.PurchaseFail, 'ok') + self.ui_context.regr_page_call(pairing_pages.PremiumPage, 'done') + self.ui_context.regr_page_call(other_pages.PurchaseFail, 'ok') + self.ui_context.regr_page_call(other_pages.LocationPermissionSync, 'enable') + self.ui_context.regr_page_call(pairing_pages.UpdateDevicePage, + 'update_later') + + log.debug('Start the pair-pin subscription...') + try: + fitbit_device._device.bt.pair_pin_start() + except fitbit_tracker_cli.CliError as err: + if err and 'Already subscribed on pubsub' in err.output[0]: + log.warning('Fitbit device already subscribed on pubsub!') + else: + raise err + + log.info('Entering account page...') + self.ui_context.go_page(account_pages.AccountPage) + + log.info('Removed all paired device(s) before testing...') + removed_count = fitbit_companion.remove_all_paired_devices(self.ui_context) + + log.info('Total %d device(s) being removed!', removed_count) + + fitbit_prod_name = _MODEL_TO_PRODUCT_NAME_MAPPING[fitbit_device.model] + log.info('Pairing with %s...', fitbit_prod_name) + + def _eval_existence_of_fitbit_product_name(node, name=fitbit_prod_name): + return name in node.text + + self.ui_context.page.add_device() + self.ui_context.expect_page( + pairing_pages.ChooseTrackerPage, + node_eval=_eval_existence_of_fitbit_product_name) + self.ui_context.page.select_device(fitbit_prod_name) + self.ui_context.page.confirm() + + log.info('Accept pairing privacy requirement...') + self.ui_context.expect_page(pairing_pages.PairPrivacyConfirmPage) + self.ui_context.page.accept() + self.ui_context.expect_page(pairing_pages.ConfirmChargePage) + self.ui_context.page.next() + if self.ui_context.is_page(other_pages.LocationDisabledPage): + # Optional page when you are required to enable location + # permission for Fitbit device. + log.info('Enabling location permission...') + self.ui_context.page.enable() + self.ui_context.expect_page(other_pages.SettingLocation) + self.ui_context.page.set(True) + self.ui_context.page.back() + + # TODO(user): Move pairing logic into fitbit_companion package while + # it may be used in many places. + self.ui_context.expect_page(pairing_pages.Pairing4DigitPage, wait_sec=150) + pins = fitbit_device._device.bt.pair_pin_show() + log.info('Pairing pins=%s...', pins) + self.ui_context.page.input_pins(pins) + pair_retry = 0 + while (self.ui_context.is_page(pairing_pages.Pairing4DigitPage) and + self.ui_context.page.get_node_by_func( + lambda n: _INVALID_PAIRING_CODE_MESSAGE in n.text) is not None): + pair_retry += 1 + if pair_retry >= _MAX_PAIRING_RETRIES: + raise signals.TestError( + f'Failed in pairing pins matching after {pair_retry} tries!') + pins = fitbit_device._device.bt.pair_pin_show() + log.warning('Retrying on pairing pins=%s...', pins) + self.ui_context.page.input_pins(pins) + time.sleep(1) + + pair_retry = 0 + while True: + self.ui_context.expect_pages([ + pairing_pages.PairRetryPage, + pairing_pages.PairAndLinkPage, + pairing_pages.PairingIntroPage, + pairing_pages.PairingConfirmPage, + pairing_pages.PairingIntroPage, + pairing_pages.CancelPairPage, + pairing_pages.CancelPair2Page, + other_pages.AllowNotification, + ], + wait_sec=90) + if self.ui_context.is_page(pairing_pages.PairingConfirmPage): + log.info('Accept pairing confirm page...') + self.ui_context.page.confirm() + elif self.ui_context.is_page(pairing_pages.PairRetryPage): + log.warning('Skip pair retry page...') + self.ui_context.back() + elif self.ui_context.is_page(pairing_pages.PairAndLinkPage): + log.warning('Skip pair and link page...') + self.ui_context.page.cancel() + elif self.ui_context.is_page(pairing_pages.CancelPairPage): + log.warning('Skip cancel pair page...') + self.ui_context.page.yes() + elif self.ui_context.is_page(other_pages.AllowNotification): + log.warning('Allow notification page...') + self.ui_context.page.allow() + elif self.ui_context.is_page(pairing_pages.PairingIntroPage): + log.info('Passing through Fitbit introduction pages...') + break + + pair_retry += 1 + if pair_retry >= _MAX_PAIRING_RETRIES: + raise signals.TestError( + f'Failed in pairing process after {pair_retry} tries!') + + self.ui_context.expect_page(pairing_pages.PairingIntroPage) + while self.ui_context.is_page(pairing_pages.PairingIntroPage): + self.ui_context.page.next() + + self.ui_context.expect_pages([ + pairing_pages.PremiumPage, other_pages.PurchaseFail, + account_pages.AccountPage + ]) + + if self.ui_context.is_page(pairing_pages.PremiumPage): + # Preminum page is optional. + self.ui_context.page.done() + elif self.ui_context.is_page(other_pages.PurchaseFail): + # Optional page observed during manual pairing experiment. + self.ui_context.page.ok() + + log.info('Completed pairing process and start evaluation process...') + if self.ui_context.is_page(account_pages.AccountPage): + paired_device_nodes = self.ui_context.page.get_paired_devices() + asserts.assert_true( + len(paired_device_nodes) == 1, + f'Unexpected paired device nodes={paired_device_nodes}', + ) + asserts.assert_true( + paired_device_nodes[0].text == fitbit_prod_name, + f'Unexpected paired device nodes={paired_device_nodes}', + ) + else: + raise signals.TestError('Failed in evaluation of Fitbit pairing result!') + + log.info('Stop the pair-pin subscription...') + fitbit_device._device.bt.pair_pin_stop() + log.info('Pairing and connection with %s(%s) is all done!', + fitbit_prod_name, mac_address) + + +class FitbitCompanionAppLoggerAdapter(logging.LoggerAdapter): + """A wrapper class that adds a prefix to each log line. + + Usage: + .. code-block:: python + my_log = FitbitCompanionAppLoggerAdapter(logging.getLogger(), { + 'tag': <custom tag> + }) + + Then each log line added by my_log will have a prefix + '[Fitbit Companion App|<tag>]' + """ + + def process(self, msg: str, kwargs: Dict[Any, + Any]) -> Tuple[str, Dict[Any, Any]]: + new_msg = _DEBUG_PREFIX_TEMPLATE.format(tag=self.extra['tag'], msg=msg) + return (new_msg, kwargs) diff --git a/system/blueberry/grpc/proto/blueberry_device_controller.proto b/system/blueberry/grpc/proto/blueberry_device_controller.proto index d3f39063aa..1bef4cc64f 100644 --- a/system/blueberry/grpc/proto/blueberry_device_controller.proto +++ b/system/blueberry/grpc/proto/blueberry_device_controller.proto @@ -3,7 +3,6 @@ syntax = "proto3"; package wearables.qa.blueberry.grpc; option java_multiple_files = true; -option jspb_use_correct_proto2_semantics = true; message DiscoverableMode { bool mode = 1; // True to set discoverable on, False to set discoverable off. diff --git a/system/blueberry/tests/a2dp/bluetooth_a2dp_test.py b/system/blueberry/tests/a2dp/bluetooth_a2dp_test.py deleted file mode 100644 index 9b578be27a..0000000000 --- a/system/blueberry/tests/a2dp/bluetooth_a2dp_test.py +++ /dev/null @@ -1,221 +0,0 @@ -# Lint as: python3 -"""Tests for Bluetooth A2DP audio streamming.""" - -import time - -from mobly import asserts -from mobly import test_runner -from mobly import signals - -from blueberry.controllers import android_bt_target_device -from blueberry.utils import blueberry_base_test -from blueberry.utils import bt_audio_utils -from blueberry.utils import bt_constants -from blueberry.utils import bt_test_utils - -# Number of seconds for A2DP audio check. -A2DP_AUDIO_CHECK_TIMEOUT_SEC = 3 - -# Number of seconds for duration of reference audio. -REFERENCE_AUDIO_DURATION_SEC = 1.0 - -# Default threshold of the mean opinion score (MOS). -MOS_THRESHOLD = 4.5 - -# The audio parameters for creating a sine wave. -AUDIO_PARAMS = { - 'file_path': '/sdcard/Music', - 'frequency': 480, - 'channel': 2, - 'sample_rate': 48000, - 'sample_format': 16, - 'duration_sec': 60 -} - - -class BluetoothA2dpTest(blueberry_base_test.BlueberryBaseTest): - """Test class for Bluetooth A2DP test. - - This test uses a fixed frequency sine wave to be the reference audio, makes a - DUT play this audio and then starts audio capture from a connected Bluetooth - sink device, measures mean opinion score (MOS) of the recorded audio and - compares the MOS with a threshold to detemine the test result. - """ - - def setup_class(self): - """Standard Mobly setup class.""" - super(BluetoothA2dpTest, self).setup_class() - - # Threshold of the MOS to determine a test result. Default value is 4.5. - self.threshold = float(self.user_params.get('mos_threshold', MOS_THRESHOLD)) - - self.phone = self.android_devices[0] - self.phone.init_setup() - self.phone.sl4a_setup() - - # Generates a sine wave to be reference audio in comparison, and push it to - # the phone storage. - self.audio_file_on_device, self.audio_file_on_host = ( - bt_audio_utils.generate_sine_wave_to_device( - self.phone, - AUDIO_PARAMS['file_path'], - AUDIO_PARAMS['frequency'], - AUDIO_PARAMS['channel'], - AUDIO_PARAMS['sample_rate'], - AUDIO_PARAMS['sample_format'], - AUDIO_PARAMS['duration_sec']) - ) - - # Trims the audio to 1 second duration for reference. - self.reference_audio_file = bt_audio_utils.trim_audio( - audio_file=self.audio_file_on_host, - duration_sec=REFERENCE_AUDIO_DURATION_SEC) - - self.derived_bt_device = self.derived_bt_devices[0] - self.derived_bt_device.factory_reset_bluetooth() - self.derived_bt_device.activate_pairing_mode() - self.mac_address = self.derived_bt_device.get_bluetooth_mac_address() - self.phone.pair_and_connect_bluetooth(self.mac_address) - - # Sleep until the connection stabilizes. - time.sleep(3) - - # Adds the phone to be the secondary device in the android-to-android case. - if isinstance(self.derived_bt_device, - android_bt_target_device.AndroidBtTargetDevice): - self.derived_bt_device.add_sec_ad_device(self.phone) - - def assert_a2dp_expected_status(self, is_playing, fail_msg): - """Asserts that A2DP audio is in the expected status. - - Args: - is_playing: bool, True if A2DP audio is playing as expected. - fail_msg: string, a message explaining the details of test failure. - """ - bt_test_utils.wait_until( - timeout_sec=A2DP_AUDIO_CHECK_TIMEOUT_SEC, - condition_func=self.phone.mbs.btIsA2dpPlaying, - func_args=[self.mac_address], - expected_value=is_playing, - exception=signals.TestFailure(fail_msg)) - - def test_play_a2dp_audio(self): - """Test for playing A2DP audio through Bluetooth.""" - - # Plays media audio from the phone. - audio_file_url = 'file://' + self.audio_file_on_device - if not self.phone.sl4a.mediaPlayOpen(audio_file_url): - raise signals.TestError( - 'Failed to open and play "%s" on the phone "%s".' % - (self.audio_file_on_device, self.phone.serial)) - self.phone.sl4a.mediaPlayStart() - - # Starts audio capture for Bluetooth audio stream. - self.derived_bt_device.start_audio_capture() - - # Stops audio capture and generates an recorded audio file. - recorded_audio_file = self.derived_bt_device.stop_audio_capture() - self.phone.sl4a.mediaPlayStop() - self.phone.sl4a.mediaPlayClose() - - # Measures MOS for the recorded audio. - mos = bt_audio_utils.measure_audio_mos(recorded_audio_file, - self.reference_audio_file) - - # Asserts that the measured MOS should be more than the threshold. - asserts.assert_true( - mos >= self.threshold, - 'MOS of the recorded audio "%.3f" is lower than the threshold "%.3f".' % - (mos, self.threshold)) - - def test_resume_a2dp_audio_after_phone_call_ended(self): - """Test for resuming A2DP audio after a phone call ended. - - Tests that A2DP audio can be paused when receiving a incoming phone call, - and resumed after this phone call ended. - """ - # Checks if two android device exist. - if len(self.android_devices) < 2: - raise signals.TestError('This test requires two android devices.') - pri_phone = self.phone - sec_phone = self.android_devices[1] - sec_phone.init_setup() - pri_number = pri_phone.dimensions.get('phone_number') - if not pri_number: - raise signals.TestError('Please set the dimension "phone_number" to the ' - 'primary phone.') - sec_number = sec_phone.dimensions.get('phone_number') - if not sec_number: - raise signals.TestError('Please set the dimension "phone_number" to the ' - 'secondary phone.') - - # Plays media audio from the phone. - audio_file_url = 'file://' + self.audio_file_on_device - if not self.phone.sl4a.mediaPlayOpen(audio_file_url): - raise signals.TestError( - 'Failed to open and play "%s" on the phone "%s".' % - (self.audio_file_on_device, self.phone.serial)) - self.phone.sl4a.mediaPlayStart() - - # Checks if A2DP audio is playing. - self.assert_a2dp_expected_status( - is_playing=True, - fail_msg='A2DP audio is not playing.') - - try: - # Makes a incoming phone call. - sec_phone.sl4a.telecomCallNumber(pri_number) - sec_phone.log.info('Made a phone call to device "%s".' % pri_phone.serial) - pri_phone.log.info('Waiting for the incoming call from device "%s"...' - % sec_phone.serial) - - is_ringing = pri_phone.wait_for_call_state( - bt_constants.CALL_STATE_RINGING, - bt_constants.CALL_STATE_TIMEOUT_SEC) - if not is_ringing: - raise signals.TestError( - 'Timed out after %ds waiting for the incoming call from device ' - '"%s".' % (bt_constants.CALL_STATE_TIMEOUT_SEC, sec_phone.serial)) - - # Checks if A2DP audio is paused. - self.assert_a2dp_expected_status( - is_playing=False, - fail_msg='A2DP audio is not paused when receiving a phone call.') - finally: - # Ends the incoming phone call. - sec_phone.sl4a.telecomEndCall() - sec_phone.log.info('Ended the phone call.') - is_idle = pri_phone.wait_for_call_state( - bt_constants.CALL_STATE_IDLE, - bt_constants.CALL_STATE_TIMEOUT_SEC) - if not is_idle: - raise signals.TestError( - 'Timed out after %ds waiting for the phone call to be ended.' % - bt_constants.CALL_STATE_TIMEOUT_SEC) - - # Checks if A2DP audio is resumed. - self.assert_a2dp_expected_status( - is_playing=True, - fail_msg='A2DP audio is not resumed when the phone call is ended.') - - # Starts audio capture for Bluetooth audio stream. - self.derived_bt_device.start_audio_capture() - - # Stops audio capture and generates an recorded audio file. - recorded_audio_file = self.derived_bt_device.stop_audio_capture() - pri_phone.sl4a.mediaPlayStop() - pri_phone.sl4a.mediaPlayClose() - - # Measures MOS for the recorded audio. - mos = bt_audio_utils.measure_audio_mos(recorded_audio_file, - self.reference_audio_file) - - # Asserts that the measured MOS should be more than the threshold. - asserts.assert_true( - mos >= self.threshold, - 'MOS of the recorded audio "%.3f" is lower than the threshold "%.3f".' % - (mos, self.threshold)) - - -if __name__ == '__main__': - test_runner.main() diff --git a/system/blueberry/tests/avrcp/bluetooth_avrcp_test.py b/system/blueberry/tests/avrcp/bluetooth_avrcp_test.py index 484fcd9587..78fd76ecf9 100644 --- a/system/blueberry/tests/avrcp/bluetooth_avrcp_test.py +++ b/system/blueberry/tests/avrcp/bluetooth_avrcp_test.py @@ -32,6 +32,13 @@ class BluetoothAvrcpTest(blueberry_base_test.BlueberryBaseTest): 4. track_next() """ + def __init__(self, configs): + super().__init__(configs) + self.derived_bt_device = None + self.pri_device = None + self.is_android_bt_target_device = None + self.tracks = None + def setup_class(self): """Standard Mobly setup class.""" super(BluetoothAvrcpTest, self).setup_class() @@ -45,6 +52,7 @@ class BluetoothAvrcpTest(blueberry_base_test.BlueberryBaseTest): if len(self.android_devices) > 1 and not self.derived_bt_devices: self.derived_bt_device = self.android_devices[1] + self.derived_bt_devices.append(self.derived_bt_device) else: self.derived_bt_device = self.derived_bt_devices[0] @@ -114,9 +122,14 @@ class BluetoothAvrcpTest(blueberry_base_test.BlueberryBaseTest): def teardown_test(self): """Teardown test for bluetooth avrcp media play test.""" super(BluetoothAvrcpTest, self).teardown_test() + # Adds 1 second waiting time to fix the NullPointerException when executing + # the following sl4a.bluetoothMediaHandleMediaCommandOnPhone method. + time.sleep(1) # Sets Playback state to Paused after a test method finishes. self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone( bt_constants.CMD_MEDIA_PAUSE) + # Buffer between tests. + time.sleep(1) def wait_for_media_info_sync(self): """Waits for sync Media information between two sides. diff --git a/system/blueberry/tests/connectivity/bluetooth_connection_test.py b/system/blueberry/tests/connectivity/bluetooth_connection_test.py index 767af38da6..3a601b457a 100644 --- a/system/blueberry/tests/connectivity/bluetooth_connection_test.py +++ b/system/blueberry/tests/connectivity/bluetooth_connection_test.py @@ -3,6 +3,8 @@ import time from mobly import test_runner +from mobly import signals +from blueberry.utils import asserts from blueberry.utils import blueberry_base_test # Connection state change sleep time in seconds. @@ -32,33 +34,34 @@ class BluetoothConnectionTest(blueberry_base_test.BlueberryBaseTest): def setup_test(self): super().setup_test() # Checks if A2DP and HSP profiles are connected. - self.wait_for_a2dp_and_hsp_connection_state(connected=True) + self.assert_a2dp_and_hsp_connection_state(connected=True) # Buffer between tests. time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC) - def wait_for_a2dp_and_hsp_connection_state(self, connected): + def assert_a2dp_and_hsp_connection_state(self, connected): """Asserts that A2DP and HSP connections are in the expected state. Args: connected: bool, True if the expected state is connected else False. """ - self.primary_device.wait_for_a2dp_connection_state(self.mac_address, - connected) - self.primary_device.wait_for_hsp_connection_state(self.mac_address, - connected) + with asserts.assert_not_raises(signals.TestError): + self.primary_device.wait_for_a2dp_connection_state(self.mac_address, + connected) + self.primary_device.wait_for_hsp_connection_state(self.mac_address, + connected) def test_disconnect_and_connect(self): """Test for DUT disconnecting and then connecting to the remote device.""" self.primary_device.log.info('Disconnecting the device "%s"...' % self.mac_address) self.primary_device.disconnect_bluetooth(self.mac_address) - self.wait_for_a2dp_and_hsp_connection_state(connected=False) + self.assert_a2dp_and_hsp_connection_state(connected=False) # Buffer time for connection state change. time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC) self.primary_device.log.info('Connecting the device "%s"...' % self.mac_address) self.primary_device.connect_bluetooth(self.mac_address) - self.wait_for_a2dp_and_hsp_connection_state(connected=True) + self.assert_a2dp_and_hsp_connection_state(connected=True) def test_reconnect_when_enabling_bluetooth(self): """Test for DUT reconnecting to the remote device when Bluetooth enabled.""" @@ -71,7 +74,7 @@ class BluetoothConnectionTest(blueberry_base_test.BlueberryBaseTest): self.primary_device.sl4a.bluetoothToggleState(True) self.primary_device.wait_for_bluetooth_toggle_state(enabled=True) self.primary_device.wait_for_connection_success(self.mac_address) - self.wait_for_a2dp_and_hsp_connection_state(connected=True) + self.assert_a2dp_and_hsp_connection_state(connected=True) def test_reconnect_when_connected_device_powered_on(self): """Test for the remote device reconnecting to DUT. @@ -83,13 +86,13 @@ class BluetoothConnectionTest(blueberry_base_test.BlueberryBaseTest): 'The connected device "%s" is being powered off...' % self.mac_address) self.derived_bt_device.power_off() self.primary_device.wait_for_disconnection_success(self.mac_address) - self.wait_for_a2dp_and_hsp_connection_state(connected=False) + self.assert_a2dp_and_hsp_connection_state(connected=False) time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC) self.derived_bt_device.power_on() self.primary_device.log.info( 'The connected device "%s" is being powered on...' % self.mac_address) self.primary_device.wait_for_connection_success(self.mac_address) - self.wait_for_a2dp_and_hsp_connection_state(connected=True) + self.assert_a2dp_and_hsp_connection_state(connected=True) if __name__ == '__main__': diff --git a/system/blueberry/tests/connectivity/bluetooth_pairing_test.py b/system/blueberry/tests/connectivity/bluetooth_pairing_test.py index f3c1770781..6bbc0efe5c 100644 --- a/system/blueberry/tests/connectivity/bluetooth_pairing_test.py +++ b/system/blueberry/tests/connectivity/bluetooth_pairing_test.py @@ -6,6 +6,8 @@ from __future__ import print_function import logging from mobly import test_runner +from mobly import signals +from blueberry.utils import asserts from blueberry.utils import blueberry_base_test @@ -34,6 +36,7 @@ class BluetoothPairingTest(blueberry_base_test.BlueberryBaseTest): # secondary phone as a derived_bt_device in order for the generic script # to work with this android phone properly. self.derived_bt_device = self.android_devices[1] + self.derived_bt_devices.append(self.derived_bt_device) self.derived_bt_device.init_setup() self.derived_bt_device.sl4a_setup() else: @@ -69,7 +72,8 @@ class BluetoothPairingTest(blueberry_base_test.BlueberryBaseTest): receiver.activate_pairing_mode() # initiate pairing from initiator initiator.set_target(receiver) - initiator.pair_and_connect_bluetooth(mac_address) + with asserts.assert_not_raises(signals.ControllerError): + initiator.pair_and_connect_bluetooth(mac_address) if self.allow_pairing_reverse and initiator != self.derived_bt_device: logging.info('===== Reversing Pairing =====') # Resets Bluetooth status for two sides. diff --git a/system/blueberry/tests/map/bluetooth_map_test.py b/system/blueberry/tests/map/bluetooth_map_test.py index 11bee4ed4d..3dc38f57ec 100644 --- a/system/blueberry/tests/map/bluetooth_map_test.py +++ b/system/blueberry/tests/map/bluetooth_map_test.py @@ -24,6 +24,13 @@ _TEXT_COUNT = 5 class BluetoothMapTest(blueberry_base_test.BlueberryBaseTest): """Test Class for Bluetooth MAP Test.""" + def __init__(self, configs): + super().__init__(configs) + self.derived_bt_device = None + self.pri_phone = None + self.pri_number = None + self.sec_phone = None + def setup_class(self): """Standard Mobly setup class.""" super().setup_class() diff --git a/system/blueberry/tests/pan/bluetooth_pan_test.py b/system/blueberry/tests/pan/bluetooth_pan_test.py index 062da8e480..43ed1e3cad 100644 --- a/system/blueberry/tests/pan/bluetooth_pan_test.py +++ b/system/blueberry/tests/pan/bluetooth_pan_test.py @@ -34,6 +34,10 @@ class BluetoothPanTest(blueberry_base_test.BlueberryBaseTest): to as PANU(Personal Area Networking User). """ + def __init__(self, configs): + super().__init__(configs) + self.pan_connect_attempts = None + def setup_class(self): """Standard Mobly setup class.""" super(BluetoothPanTest, self).setup_class() diff --git a/system/blueberry/tests/pbap/bluetooth_pbap_test.py b/system/blueberry/tests/pbap/bluetooth_pbap_test.py index 9c7c032f61..d0184c48f8 100644 --- a/system/blueberry/tests/pbap/bluetooth_pbap_test.py +++ b/system/blueberry/tests/pbap/bluetooth_pbap_test.py @@ -11,7 +11,7 @@ from mobly import utils from mobly.controllers import android_device -from blueberry.utils import blueberry_base_test +from blueberry.utils import blueberry_ui_base_test from blueberry.utils import bt_constants from blueberry.utils import bt_test_utils @@ -34,9 +34,15 @@ PERMISSION_LIST = [ ] -class BluetoothPbapTest(blueberry_base_test.BlueberryBaseTest): +class BluetoothPbapTest(blueberry_ui_base_test.BlueberryUiBaseTest): """Test Class for Bluetooth PBAP Test.""" + def __init__(self, configs): + super().__init__(configs) + self.derived_bt_device = None + self.pri_phone = None + self.pse_mac_address = None + def setup_class(self): """Standard Mobly setup class.""" super(BluetoothPbapTest, self).setup_class() @@ -372,26 +378,19 @@ class BluetoothPbapTest(blueberry_base_test.BlueberryBaseTest): f'the incoming call from device "{secondary_phone.serial}".') try: self.derived_bt_device.aud.open_notification() - target_node_match_dict = { - 'resource_id': 'android:id/line1', - 'child': { - 'resource_id': 'android:id/title' - } - } hfp_address = primary_phone.get_bluetooth_mac_address() - target_node = self.derived_bt_device.aud( - sibling=target_node_match_dict, - text=f'Incoming call via HFP {hfp_address}') - caller_name = target_node.get_attribute_value('text') - message = (f'Caller name is incorrect. Actual: {caller_name}, ' - f'Correct: {full_name}') + if not self.derived_bt_device.aud( + text=f'Incoming call via HFP {hfp_address}').exists(): + raise signals.TestError('The incoming call was not received from ' + 'the Handsfree device side.') # Asserts that caller name of the incoming phone call is correct in the # notification bar. - asserts.assert_equal( - first=caller_name, - second=full_name, - msg=message) + asserts.assert_true( + self.derived_bt_device.aud(text=full_name).exists(), + f'Caller name is incorrect. Expectation: "{full_name}"') finally: + # Takes a screenshot for debugging. + self.derived_bt_device.take_screenshot(self.derived_bt_device.log_path) # Recovery actions. self.derived_bt_device.aud.close_notification() secondary_phone.sl4a.telecomEndCall() diff --git a/system/blueberry/tests/pbat/bluetooth_acceptance_suite.py b/system/blueberry/tests/pbat/bluetooth_acceptance_suite.py index faa1151c77..cf28ef65cb 100644 --- a/system/blueberry/tests/pbat/bluetooth_acceptance_suite.py +++ b/system/blueberry/tests/pbat/bluetooth_acceptance_suite.py @@ -46,7 +46,7 @@ class BluetoothAcceptanceSuite(mobly_g3_suite.BaseSuite): # Enable all Bluetooth logging in the first test. first_test_config = config.copy() first_test_config.user_params.update({ - 'enable_all_bluetooth_logging': 1, + 'enable_hci_snoop_logging': 1, }) for index, clazz in enumerate(TEST_CLASSES): if selected_tests: diff --git a/system/blueberry/tests/pbat/bluetooth_device_setup.py b/system/blueberry/tests/pbat/bluetooth_device_setup.py new file mode 100644 index 0000000000..fe49b7ea3c --- /dev/null +++ b/system/blueberry/tests/pbat/bluetooth_device_setup.py @@ -0,0 +1,32 @@ +"""Setup for Android Bluetooth device.""" + +from mobly import test_runner +from mobly.controllers import android_device +from mobly.controllers.android_device_lib.services import sl4a_service +from blueberry.utils import blueberry_base_test + + +class BluetoothDeviceSetup(blueberry_base_test.BlueberryBaseTest): + """A class for Bluetooth device setup. + + This is not a test, just used to do device quick setup for building a testbed. + """ + + def test_setup_device(self): + """Setup a Bluetooth device. + + Executes logging setup and checks if MBS and SL4A can be used. + """ + device = self.android_devices[0] + # Setup logging + self.set_bt_trc_level_verbose(device) + self.set_btsnooplogmode_full(device) + self.set_logger_buffer_size_16m(device) + device.reboot() + # Loads MBS and SL4A to make sure they work fine. + device.load_snippet('mbs', android_device.MBS_PACKAGE) + device.services.register('sl4a', sl4a_service.Sl4aService) + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/tests/triangle/connection_switching_test.py b/system/blueberry/tests/triangle/connection_switching_test.py new file mode 100644 index 0000000000..3071788a23 --- /dev/null +++ b/system/blueberry/tests/triangle/connection_switching_test.py @@ -0,0 +1,74 @@ +"""Tests for Connection switching feature of Triangle.""" + +import logging +import time + +from mobly import test_runner +from blueberry.utils import triangle_base_test as base_test +from blueberry.utils import triangle_constants + + +class ConnectionSwitchingTest(base_test.TriangleBaseTest): + """Connection Switching Test.""" + + def setup_class(self): + """Executes Connection switching setups. + + Pairs Phone to headset and Watch, then pairs and connect Watch to Headset, + let Watch be last connected device of Headset. + """ + super().setup_class() + self.headset.power_on() + self.pair_and_connect_phone_to_headset() + self.pair_and_connect_phone_to_watch() + self.pair_and_connect_watch_to_headset() + + def setup_test(self): + """Makes sure that Headset is connected to Watch instead of Phone.""" + super().setup_test() + self.phone.disconnect_bluetooth(self.headset.mac_address) + self.watch.connect_bluetooth(self.headset.mac_address) + self.assert_headset_a2dp_connection(connected=False, device=self.phone) + self.assert_headset_hsp_connection(connected=False, device=self.phone) + + def test_trigger_connection_switching_when_headset_powered_on(self): + """Test for triggering connection switching when Headset is powered on. + + Steps: + 1. Power off Headset. + 2. Wait 1 minute. + 3. Power on Headset, and then it will be reconnect. + + Verifications: + The Headset connection is switched from Watch to Phone. + """ + logging.info('Power off Headset and wait 1 minute.') + self.headset.power_off() + time.sleep(triangle_constants.WAITING_TIME_SEC) + logging.info('Power on Headset.') + self.headset.power_on() + self.assert_headset_a2dp_connection(connected=True, device=self.phone) + self.assert_headset_hsp_connection(connected=True, device=self.phone) + + def test_trigger_connection_switching_when_phone_tethered_watch(self): + """Test for triggering connection switching when Phone is tethered to Watch. + + Steps: + 1. Disable Bluetooth on Phone. + 2. Wait 1 minute. + 3. Enable Bluetooth on Phone, and then Phone will be tethered to Watch. + + Verifications: + The Headset connection is switched from Watch to Phone. + """ + self.phone.log.info('Disable Bluetooth and wait 1 minute.') + self.phone.mbs.btDisable() + time.sleep(triangle_constants.WAITING_TIME_SEC) + self.phone.log.info('Enable Bluetooth.') + self.phone.mbs.btEnable() + self.assert_headset_a2dp_connection(connected=True, device=self.phone) + self.assert_headset_hsp_connection(connected=True, device=self.phone) + + +if __name__ == '__main__': + test_runner.main() diff --git a/system/blueberry/utils/android_bluetooth_decorator.py b/system/blueberry/utils/android_bluetooth_decorator.py index c5708971f0..b258d6b645 100644 --- a/system/blueberry/utils/android_bluetooth_decorator.py +++ b/system/blueberry/utils/android_bluetooth_decorator.py @@ -1,4 +1,3 @@ -# Lint as: python3 """AndroidBluetoothDecorator class. This decorator is used for giving an AndroidDevice Bluetooth-specific @@ -13,25 +12,19 @@ import random import re import string import time -from typing import Dict, Any, Text, Optional, Tuple, Sequence, Union -from mobly import asserts +from typing import Dict, Any, Text, Optional, Tuple, Sequence, Union, List + from mobly import logger as mobly_logger from mobly import signals from mobly import utils -from mobly.controllers.android_device import AndroidDevice +from mobly.controllers import android_device from mobly.controllers.android_device_lib import adb from mobly.controllers.android_device_lib import jsonrpc_client_base from mobly.controllers.android_device_lib.services import sl4a_service -# Internal import -from blueberry.controllers.derived_bt_device import BtDevice + +from blueberry.controllers import derived_bt_device from blueberry.utils import bt_constants from blueberry.utils import bt_test_utils -from blueberry.utils.bt_constants import AvrcpEvent -from blueberry.utils.bt_constants import BluetoothConnectionPolicy -from blueberry.utils.bt_constants import BluetoothConnectionStatus -from blueberry.utils.bt_constants import BluetoothProfile -from blueberry.utils.bt_constants import CallLogType -from blueberry.utils.bt_constants import CallState # Map for media passthrough commands and the corresponding events. @@ -67,19 +60,32 @@ PING_TIMEOUT_SEC = 60 # A URL is used to verify internet by ping request. TEST_URL = 'http://www.google.com' +# Timeout to wait for device boot success in second. +WAIT_FOR_DEVICE_TIMEOUT_SEC = 180 + + +class DeviceBootError(signals.ControllerError): + """Exception raised for Android device boot failures.""" + pass + + +class Error(Exception): + """Raised when an operation in this module fails.""" + pass + class DiscoveryError(signals.ControllerError): """Exception raised for Bluetooth device discovery failures.""" pass -class AndroidBluetoothDecorator(AndroidDevice): +class AndroidBluetoothDecorator(android_device.AndroidDevice): """Decorates an AndroidDevice with Bluetooth-specific functionality.""" - def __init__(self, ad: AndroidDevice): + def __init__(self, ad: android_device.AndroidDevice): self._ad = ad self._user_params = None - if not self._ad or not isinstance(self._ad, AndroidDevice): + if not self._ad or not isinstance(self._ad, android_device.AndroidDevice): raise TypeError('Must apply AndroidBluetoothDecorator to an ' 'AndroidDevice') self.ble_advertise_callback = None @@ -100,13 +106,13 @@ class AndroidBluetoothDecorator(AndroidDevice): """Checks if the profile is connected.""" status = None pri_ad = self._ad - if profile == BluetoothProfile.HEADSET_CLIENT: + if profile == bt_constants.BluetoothProfile.HEADSET_CLIENT: status = pri_ad.sl4a.bluetoothHfpClientGetConnectionStatus(mac_address) - elif profile == BluetoothProfile.A2DP_SINK: + elif profile == bt_constants.BluetoothProfile.A2DP_SINK: status = pri_ad.sl4a.bluetoothA2dpSinkGetConnectionStatus(mac_address) - elif profile == BluetoothProfile.PBAP_CLIENT: + elif profile == bt_constants.BluetoothProfile.PBAP_CLIENT: status = pri_ad.sl4a.bluetoothPbapClientGetConnectionStatus(mac_address) - elif profile == BluetoothProfile.MAP_MCE: + elif profile == bt_constants.BluetoothProfile.MAP_MCE: connected_devices = self._ad.sl4a.bluetoothMapClientGetConnectedDevices() return any( mac_address in device['address'] for device in connected_devices) @@ -115,7 +121,7 @@ class AndroidBluetoothDecorator(AndroidDevice): 'The connection check for profile %s is not supported ' 'yet', profile) return False - return status == BluetoothConnectionStatus.STATE_CONNECTED + return status == bt_constants.BluetoothConnectionStatus.STATE_CONNECTED def _get_bluetooth_le_state(self): """Wrapper method to help with unit testability of this class.""" @@ -229,7 +235,7 @@ class AndroidBluetoothDecorator(AndroidDevice): package_name)) return bool(result) - def connect_with_rfcomm(self, other_ad: AndroidDevice) -> bool: + def connect_with_rfcomm(self, other_ad: android_device.AndroidDevice) -> bool: """Establishes an RFCOMM connection with other android device. Connects this android device (as a client) to the other android device @@ -253,7 +259,7 @@ class AndroidBluetoothDecorator(AndroidDevice): def orchestrate_rfcomm_connection( self, - other_ad: AndroidDevice, + other_ad: android_device.AndroidDevice, accept_timeout_ms: int = bt_constants.DEFAULT_RFCOMM_TIMEOUT_MS, uuid: Optional[Text] = None) -> bool: """Sets up the RFCOMM connection to another android device. @@ -313,14 +319,22 @@ class AndroidBluetoothDecorator(AndroidDevice): Raises: DiscoveryError """ + device_start_time = self.get_device_time() start_time = time.time() + event_name = f'Discovery{mac_address}' try: - self._ad.ed.wait_for_event('Discovery%s' % mac_address, + self._ad.ed.wait_for_event(event_name, lambda x: x['data']['Status'], timeout) discovery_time = time.time() - start_time return discovery_time except queue.Empty: + # TODO(user): Remove this check when this bug is fixed. + if self.logcat_filter(device_start_time, event_name): + self._ad.log.info( + 'Actually the event "%s" was posted within %d seconds.', + event_name, timeout) + return timeout raise DiscoveryError('Failed to discover device %s after %d seconds' % (mac_address, timeout)) @@ -390,7 +404,7 @@ class AndroidBluetoothDecorator(AndroidDevice): self._ad.sl4a.bluetoothUnbond(device['Address']) self._ad.sl4a.bluetoothFactoryReset() self._wait_for_bluetooth_manager_state() - self._ad.sl4a.bluetoothToggleState(True) + self.wait_for_bluetooth_toggle_state(True) def get_device_info(self) -> Dict[str, Any]: """Gets the configuration info of an AndroidDevice. @@ -639,7 +653,7 @@ class AndroidBluetoothDecorator(AndroidDevice): def connect_with_profile( self, snd_ad_mac_address: str, - profile: BluetoothProfile) -> bool: + profile: bt_constants.BluetoothProfile) -> bool: """Connects with the profile. The connection can only be completed after the bluetooth devices are paired. @@ -655,20 +669,20 @@ class AndroidBluetoothDecorator(AndroidDevice): Returns: The profile connection succeed/fail """ - if profile == BluetoothProfile.MAP_MCE: + if profile == bt_constants.BluetoothProfile.MAP_MCE: self._ad.sl4a.bluetoothMapClientConnect(snd_ad_mac_address) - elif profile == BluetoothProfile.PBAP_CLIENT: + elif profile == bt_constants.BluetoothProfile.PBAP_CLIENT: self.set_profile_policy( snd_ad_mac_address, profile, - BluetoothConnectionPolicy.CONNECTION_POLICY_ALLOWED) + bt_constants.BluetoothConnectionPolicy.CONNECTION_POLICY_ALLOWED) self._ad.sl4a.bluetoothPbapClientConnect(snd_ad_mac_address) else: self.set_profile_policy( snd_ad_mac_address, profile, - BluetoothConnectionPolicy.CONNECTION_POLICY_FORBIDDEN) + bt_constants.BluetoothConnectionPolicy.CONNECTION_POLICY_FORBIDDEN) self.set_profile_policy( snd_ad_mac_address, profile, - BluetoothConnectionPolicy.CONNECTION_POLICY_ALLOWED) + bt_constants.BluetoothConnectionPolicy.CONNECTION_POLICY_ALLOWED) self._ad.sl4a.bluetoothConnectBonded(snd_ad_mac_address) time.sleep(BT_CONNECTION_WAITING_TIME_SECONDS) is_connected = self._is_profile_connected(snd_ad_mac_address, profile) @@ -678,8 +692,8 @@ class AndroidBluetoothDecorator(AndroidDevice): def connect_to_snd_with_profile( self, - snd_ad: AndroidDevice, - profile: BluetoothProfile, + snd_ad: android_device.AndroidDevice, + profile: bt_constants.BluetoothProfile, attempts: int = 5) -> bool: """Connects pri android device to snd android device with profile. @@ -734,7 +748,7 @@ class AndroidBluetoothDecorator(AndroidDevice): connected_devices = self._ad.sl4a.bluetoothA2dpGetConnectedDevices() return mac_address in [d['address'] for d in connected_devices] - def hfp_connect(self, ag_ad: AndroidDevice) -> bool: + def hfp_connect(self, ag_ad: android_device.AndroidDevice) -> bool: """Hfp connecting hf android device to ag android device. The android device should support the Headset Client profile. For example, @@ -746,10 +760,10 @@ class AndroidBluetoothDecorator(AndroidDevice): Returns: Boolean of connecting result """ - return self.connect_to_snd_with_profile(ag_ad, - BluetoothProfile.HEADSET_CLIENT) + return self.connect_to_snd_with_profile( + ag_ad, bt_constants.BluetoothProfile.HEADSET_CLIENT) - def a2dp_sink_connect(self, src_ad: AndroidDevice) -> bool: + def a2dp_sink_connect(self, src_ad: android_device.AndroidDevice) -> bool: """Connects pri android device to secondary android device. The android device should support the A2dp Sink profile. For example, the @@ -761,9 +775,10 @@ class AndroidBluetoothDecorator(AndroidDevice): Returns: Boolean of connecting result """ - return self.connect_to_snd_with_profile(src_ad, BluetoothProfile.A2DP_SINK) + return self.connect_to_snd_with_profile( + src_ad, bt_constants.BluetoothProfile.A2DP_SINK) - def map_connect(self, map_ad: AndroidDevice) -> bool: + def map_connect(self, map_ad: android_device.AndroidDevice) -> bool: """Connects primary device to secondary device via MAP MCE profile. The primary device should support the MAP MCE profile. For example, @@ -775,8 +790,8 @@ class AndroidBluetoothDecorator(AndroidDevice): Returns: Boolean of connecting result """ - return self.connect_to_snd_with_profile(map_ad, - BluetoothProfile.MAP_MCE) + return self.connect_to_snd_with_profile( + map_ad, bt_constants.BluetoothProfile.MAP_MCE) def map_disconnect(self, bluetooth_address: str) -> bool: """Disconnects a MAP MSE device with specified Bluetooth MAC address. @@ -791,10 +806,10 @@ class AndroidBluetoothDecorator(AndroidDevice): return bt_test_utils.wait_until( timeout_sec=COMMON_TIMEOUT_SECONDS, condition_func=self._is_profile_connected, - func_args=[bluetooth_address, BluetoothProfile.MAP_MCE], + func_args=[bluetooth_address, bt_constants.BluetoothProfile.MAP_MCE], expected_value=False) - def pbap_connect(self, pbap_ad: AndroidDevice) -> bool: + def pbap_connect(self, pbap_ad: android_device.AndroidDevice) -> bool: """Connects primary device to secondary device via PBAP client profile. The primary device should support the PBAP client profile. For example, @@ -806,8 +821,8 @@ class AndroidBluetoothDecorator(AndroidDevice): Returns: Boolean of connecting result """ - return self.connect_to_snd_with_profile(pbap_ad, - BluetoothProfile.PBAP_CLIENT) + return self.connect_to_snd_with_profile( + pbap_ad, bt_constants.BluetoothProfile.PBAP_CLIENT) def set_bluetooth_tethering(self, status_enabled: bool) -> None: """Sets Bluetooth tethering to be specific status. @@ -837,8 +852,8 @@ class AndroidBluetoothDecorator(AndroidDevice): def set_profile_policy( self, snd_ad_mac_address: str, - profile: BluetoothProfile, - policy: BluetoothConnectionPolicy) -> None: + profile: bt_constants.BluetoothProfile, + policy: bt_constants.BluetoothConnectionPolicy) -> None: """Sets policy of the profile car related profiles to OFF. This avoids autoconnect being triggered randomly. The use of this function @@ -853,17 +868,17 @@ class AndroidBluetoothDecorator(AndroidDevice): pri_ad_local_name = pri_ad.sl4a.bluetoothGetLocalName() pri_ad.log.info('Sets profile %s on %s for %s to policy %s', profile, pri_ad_local_name, snd_ad_mac_address, policy) - if profile == BluetoothProfile.A2DP: + if profile == bt_constants.BluetoothProfile.A2DP: pri_ad.sl4a.bluetoothA2dpSetPriority(snd_ad_mac_address, policy.value) - elif profile == BluetoothProfile.A2DP_SINK: + elif profile == bt_constants.BluetoothProfile.A2DP_SINK: pri_ad.sl4a.bluetoothA2dpSinkSetPriority(snd_ad_mac_address, policy.value) - elif profile == BluetoothProfile.HEADSET_CLIENT: + elif profile == bt_constants.BluetoothProfile.HEADSET_CLIENT: pri_ad.sl4a.bluetoothHfpClientSetPriority(snd_ad_mac_address, policy.value) - elif profile == BluetoothProfile.PBAP_CLIENT: + elif profile == bt_constants.BluetoothProfile.PBAP_CLIENT: pri_ad.sl4a.bluetoothPbapClientSetPriority(snd_ad_mac_address, policy.value) - elif profile == BluetoothProfile.HID_HOST: + elif profile == bt_constants.BluetoothProfile.HID_HOST: pri_ad.sl4a.bluetoothHidSetPriority(snd_ad_mac_address, policy.value) else: pri_ad.log.error('Profile %s not yet supported for policy settings', @@ -871,9 +886,9 @@ class AndroidBluetoothDecorator(AndroidDevice): def set_profiles_policy( self, - snd_ad: AndroidDevice, - profile_list: Sequence[BluetoothProfile], - policy: BluetoothConnectionPolicy) -> None: + snd_ad: android_device.AndroidDevice, + profile_list: Sequence[bt_constants.BluetoothProfile], + policy: bt_constants.BluetoothConnectionPolicy) -> None: """Sets the policy of said profile(s) on pri_ad for snd_ad. Args: @@ -887,8 +902,8 @@ class AndroidBluetoothDecorator(AndroidDevice): def set_profiles_policy_off( self, - snd_ad: AndroidDevice, - profile_list: Sequence[BluetoothProfile]) -> None: + snd_ad: android_device.AndroidDevice, + profile_list: Sequence[bt_constants.BluetoothProfile]) -> None: """Sets policy of the profiles to OFF. This avoids autoconnect being triggered randomly. The use of this function @@ -900,11 +915,11 @@ class AndroidBluetoothDecorator(AndroidDevice): """ self.set_profiles_policy( snd_ad, profile_list, - BluetoothConnectionPolicy.CONNECTION_POLICY_FORBIDDEN) + bt_constants.BluetoothConnectionPolicy.CONNECTION_POLICY_FORBIDDEN) def wait_for_call_state( self, - call_state: Union[int, CallState], + call_state: Union[int, bt_constants.CallState], timeout_sec: float, wait_interval: int = 3) -> bool: """Waits for call state of the device to be changed. @@ -921,7 +936,7 @@ class AndroidBluetoothDecorator(AndroidDevice): True if the call state has been changed else False. """ # TODO(user): Force external call to use CallState instead of int - if isinstance(call_state, CallState): + if isinstance(call_state, bt_constants.CallState): call_state = call_state.value expiration_time = time.time() + timeout_sec which_cycle = 1 @@ -936,31 +951,9 @@ class AndroidBluetoothDecorator(AndroidDevice): call_state) return False - def play_audio_file_with_google_play_music(self) -> None: - """Plays an audio file on an AndroidDevice with Google Play Music app. - - Returns: - None - """ - try: - self._ad.aud.add_watcher('LOGIN').when(text='SKIP').click(text='SKIP') - self._ad.aud.add_watcher('NETWORK').when(text='Server error').click( - text='OK') - self._ad.aud.add_watcher('MENU').when(text='Settings').click( - text='Listen Now') - except adb_ui.Error: - logging.info('The watcher has been added.') - self._ad.sl4a.appLaunch('com.google.android.music') - if self._ad.aud(text='No Music available').exists(10): - self._ad.reboot() - self._ad.sl4a.appLaunch('com.google.android.music') - self._ad.aud( - resource_id='com.google.android.music:id/li_thumbnail_frame').click() - time.sleep(6) # Wait for audio playback to reach steady state - def add_call_log( self, - call_log_type: Union[int, CallLogType], + call_log_type: Union[int, bt_constants.CallLogType], phone_number: str, call_time: int) -> None: """Add call number and time to specified log. @@ -977,7 +970,7 @@ class AndroidBluetoothDecorator(AndroidDevice): None """ # TODO(user): Force external call to use CallLogType instead of int - if isinstance(call_log_type, CallLogType): + if isinstance(call_log_type, bt_constants.CallLogType): call_log_type = call_log_type.value new_call_log = {} new_call_log['type'] = str(call_log_type) @@ -990,17 +983,21 @@ class AndroidBluetoothDecorator(AndroidDevice): Returns: An integer specifying the number of current call volume level. + + Raises: + Error: If the pattern search failed. """ cmd = 'dumpsys audio | grep "STREAM_BLUETOOTH_SCO" | tail -1' out = self._ad.adb.shell(cmd).decode() - # TODO(user): Should we handle the case that re.search(...) return None - # below? - pattern = r'(?<=SCO index:)[\d]+' - return int(re.search(pattern, out).group()) + pattern = r'(?<=SCO index:)\d+' + result = re.search(pattern, out) + if result is None: + raise Error(f'Pattern "{pattern}" search failed, dump output: {out}') + return int(result.group()) def make_phone_call( self, - callee: AndroidDevice, + callee: android_device.AndroidDevice, timeout_sec: float = 30) -> None: """Make a phone call to callee and check if callee is ringing. @@ -1046,14 +1043,16 @@ class AndroidBluetoothDecorator(AndroidDevice): raise signals.ControllerError( 'Failed to disconnect device within %d seconds.' % timeout) - def first_pair_and_connect_bluetooth(self, bt_device: BtDevice) -> None: + def first_pair_and_connect_bluetooth( + self, bt_device: Any) -> None: """Pairs and connects an AndroidDevice with a Bluetooth device. This method does factory reset bluetooth first and then pairs and connects the devices. Args: - bt_device: The peripheral Bluetooth device or an AndroidDevice. + bt_device: A device object which implements basic Bluetooth function + related methods. Returns: None @@ -1133,7 +1132,7 @@ class AndroidBluetoothDecorator(AndroidDevice): def send_media_passthrough_cmd( self, command: str, - event_receiver: Optional[AndroidDevice] = None) -> None: + event_receiver: Optional[android_device.AndroidDevice] = None) -> None: """Sends a media passthrough command. Args: @@ -1144,11 +1143,13 @@ class AndroidBluetoothDecorator(AndroidDevice): Raises: signals.ControllerError: raised if the event is not received. """ + if event_receiver is None: + event_receiver = self._ad self._ad.log.info('Sending Media Passthough: %s' % command) self._ad.sl4a.bluetoothMediaPassthrough(command) + if not event_receiver: + event_receiver = self._ad try: - if not event_receiver: - event_receiver = self._ad event_receiver.ed.pop_event(MEDIA_CMD_MAP[command], MEDIA_EVENT_TIMEOUT_SEC) except queue.Empty: @@ -1196,7 +1197,8 @@ class AndroidBluetoothDecorator(AndroidDevice): condition_func=self.get_current_playback_state, func_args=[], expected_value=expected_state, - exception=exception) + exception=exception, + interval_sec=1) def verify_current_track_changed( self, @@ -1213,11 +1215,12 @@ class AndroidBluetoothDecorator(AndroidDevice): condition_func=self.get_current_track_info, func_args=[], expected_value=expected_track, - exception=exception) + exception=exception, + interval_sec=1) def verify_avrcp_event( self, - event_name: AvrcpEvent, + event_name: bt_constants.AvrcpEvent, check_time: str, timeout_sec: float = 20) -> bool: """Verifies that an AVRCP event was received by an AndroidDevice. @@ -1305,56 +1308,6 @@ class AndroidBluetoothDecorator(AndroidDevice): time.sleep(1) # Buffer between retries. raise signals.TestError('Failed to remove google account: %s' % output) - def make_hangouts_voice_call(self, callee: AndroidDevice) -> None: - """Make Hangouts VOIP voice call. - - Args: - callee: Android Device, the android device of callee. - - Returns: - None - """ - try: - self._ad.aud.add_watcher('SETUP').when(text='SKIP').click(text='SKIP') - self._ad.aud.add_watcher('REMINDER').when(text='Got it').click( - text='Got it') - except adb_ui.Error: - # TODO(user): Need to figure out the logic here why use info in - # exception catch block instead of warning/error - logging.info('The watcher has been added.') - self._ad.sl4a.appLaunch('com.google.android.talk') - callee.sl4a.appLaunch('com.google.android.talk') - # Make voice call to callee - try: - # Click the callee icon - self._ad.aud(resource_id='com.google.android.talk:id/avatarView').click() - except adb_ui.Error: - # Press BACK key twice and re-launch Hangouts if it is not in main page - for _ in range(2): - self._ad.aud.send_key_code(4) - self._ad.sl4a.appLaunch('com.google.android.talk') - self._ad.aud(resource_id='com.google.android.talk:id/avatarView').click() - # Click the button to make a voice call - self._ad.aud(content_desc='Call').click() - # Answer by callee - if callee.aud(text='Answer').exists(5): - callee.aud(text='Answer').click() - else: - callee.aud(content_desc='Join voice call').click() - - def hang_up_hangouts_call(self) -> None: - """Hang up Hangouts VOIP voice call. - - Returns: - None - """ - # Click the in call icon to show the end call button - self._ad.aud( - resource_id='com.google.android.talk:id/in_call_main_avatar').click() - # Click the button to hang up call - self._ad.aud(content_desc='Hang up').click() - time.sleep(3) # Wait for VoIP call state to reach idle state - def detect_and_pull_ssrdump(self, ramdump_type: str = 'ramdump_bt') -> bool: """Detect and pull RAMDUMP log. @@ -1397,33 +1350,37 @@ class AndroidBluetoothDecorator(AndroidDevice): """ self._ad.adb.shell('rm -rf %s/*' % bt_constants.RAMDUMP_PATH) - def set_target(self, bt_device: BtDevice) -> None: + def set_target(self, bt_device: derived_bt_device.BtDevice) -> None: """Allows for use to get target device object for target interaction.""" self._target_device = bt_device def wait_for_hsp_connection_state(self, mac_address: str, connected: bool, + raise_error: bool = True, timeout_sec: float = 30) -> bool: """Waits for HSP connection to be in a expected state on Android device. Args: mac_address: The Bluetooth mac address of the peripheral device. connected: True if HSP connection state is connected as expected. + raise_error: Error will be raised if True. timeout_sec: Number of seconds to wait for HSP connection state change. + + Returns: + True if HSP connection state is the expected state. """ - expected_state = BluetoothConnectionStatus.STATE_DISCONNECTED + expected_state = bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED if connected: - expected_state = BluetoothConnectionStatus.STATE_CONNECTED - bt_test_utils.wait_until( + expected_state = bt_constants.BluetoothConnectionStatus.STATE_CONNECTED + msg = ('Failed to %s the device "%s" within %d seconds via HSP.' % + ('connect' if connected else 'disconnect', mac_address, timeout_sec)) + return bt_test_utils.wait_until( timeout_sec=timeout_sec, condition_func=self._ad.sl4a.bluetoothHspGetConnectionStatus, func_args=[mac_address], expected_value=expected_state, - exception=signals.TestError( - 'Failed to %s the device "%s" within %d seconds via HSP.' % - ('connect' if connected else 'disconnect', mac_address, - timeout_sec))) + exception=signals.TestError(msg) if raise_error else None) def wait_for_bluetooth_toggle_state(self, enabled: bool = True, @@ -1448,23 +1405,27 @@ class AndroidBluetoothDecorator(AndroidDevice): def wait_for_a2dp_connection_state(self, mac_address: str, connected: bool, + raise_error: bool = True, timeout_sec: float = 30) -> bool: """Waits for A2DP connection to be in a expected state on Android device. Args: mac_address: The Bluetooth mac address of the peripheral device. connected: True if A2DP connection state is connected as expected. + raise_error: Error will be raised if True. timeout_sec: Number of seconds to wait for A2DP connection state change. + + Returns: + True if A2DP connection state is in the expected state. """ - bt_test_utils.wait_until( + msg = ('Failed to %s the device "%s" within %d seconds via A2DP.' % + ('connect' if connected else 'disconnect', mac_address, timeout_sec)) + return bt_test_utils.wait_until( timeout_sec=timeout_sec, condition_func=self.is_a2dp_sink_connected, func_args=[mac_address], expected_value=connected, - exception=signals.TestError( - 'Failed to %s the device "%s" within %d seconds via A2DP.' % - ('connect' if connected else 'disconnect', mac_address, - timeout_sec))) + exception=signals.TestError(msg) if raise_error else None) def wait_for_nap_service_connection( self, @@ -1546,109 +1507,6 @@ class AndroidBluetoothDecorator(AndroidDevice): called if a test is not Wear OS use case. """ - def goto_bluetooth_device_details(self) -> None: - """Goes to bluetooth device detail page.""" - self._ad.adb.shell('am force-stop com.android.settings') - self._ad.adb.shell('am start -a android.settings.BLUETOOTH_SETTINGS') - self._ad.aud( - resource_id='com.android.settings:id/settings_button').click() - - def bluetooth_ui_forget_device(self) -> None: - """Clicks the forget device button.""" - self.goto_bluetooth_device_details() - self._ad.aud(resource_id='com.android.settings:id/button1').click() - - def bluetooth_ui_disconnect_device(self) -> None: - """Clicks the disconnect device button.""" - self.goto_bluetooth_device_details() - self._ad.aud(resource_id='com.android.settings:id/button2').click() - - def _find_bt_device_details_ui_switch(self, switch_name: str): - """Returns the UI node for a BT switch. - - Args: - switch_name: each switch button name in bluetooth connect device detail - page. switch name like 'Phone calls', 'Media audio', etc. - - Returns: - adb_ui_device.XML node UI element of the BT each option switch button. - """ - switch_button_name = ('Phone calls', - 'Media audio', - 'Contact sharing', - 'Text Messages' - ) - if switch_name not in switch_button_name: - raise ValueError(f'Unknown switch name {switch_name}.') - self.goto_bluetooth_device_details() - text_node = adb_ui.wait_and_get_xml_node( - self._ad, timeout=10, text=switch_name) - text_grandparent_node = text_node.parentNode.parentNode - switch_node = adb_ui.Selector( - resource_id='android:id/switch_widget').find_node(text_grandparent_node) - return switch_node - - def get_bt_device_details_ui_switch_state(self, switch_name: str) -> bool: - """Gets bluetooth each option switch button state value. - - Args: - switch_name: each switch button name in bluetooth connect device detail - page. - - Returns: - State True or False. - """ - switch_node = self._find_bt_device_details_ui_switch(switch_name) - current_state = switch_node.attributes['checked'].value == 'true' - return current_state - - def set_bt_device_details_ui_switch_state( - self, switch_name: str, - target_state: bool) -> None: - """Sets and checks the BT each option button is the target enable state. - - Args: - switch_name: each switch button name in bluetooth connect device detail - page. - target_state: The desired state expected from the switch. If the state of - the switch already meet expectation, no action will be taken. - """ - if self.get_bt_device_details_ui_switch_state(switch_name) == target_state: - return - switch_node = self._find_bt_device_details_ui_switch(switch_name) - x, y = adb_ui.find_point_in_bounds(switch_node.attributes['bounds'].value) - self._ad.aud.click(x, y) - - def get_bt_quick_setting_switch_state(self) -> bool: - """Gets bluetooth quick settings switch button state value.""" - self._ad.open_notification() - switch_node = self._ad_aud(class_name='android.widget.Switch', index='1') - current_state = switch_node.attributes['content-desc'].value == 'Bluetooth.' - return current_state - - def assert_bt_device_details_state(self, target_state: bool) -> None: - """Asserts the Bluetooth connection state. - - Asserts the BT each option button in device detail, - BT quick setting state and BT manager service from log are at the target - state. - - Args: - target_state: BT each option button, quick setting and bluetooth manager - service target state. - - """ - for switch_name in ['Phone calls', 'Media audio']: - asserts.assert_equal( - self._ad.get_bt_device_details_ui_switch_state(switch_name), - target_state, - f'The BT Media calls switch button state is not {target_state}.') - asserts.assert_equal(self._ad.is_service_running(), target_state, - f'The BT service state is not {target_state}.') - asserts.assert_equal( - self._ad.get_bt_quick_setting_switch_state(), target_state, - f'The BT each switch button state is not {target_state}.') - def is_service_running( self, mac_address: str, @@ -1690,40 +1548,163 @@ class AndroidBluetoothDecorator(AndroidDevice): time.sleep(ADB_WAITING_TIME_SECONDS) return False - def browse_internet(self, url: str = 'www.google.com') -> None: - """Browses internet by Chrome. - - Args: - url: web address. - - Raises: - signals.TestError: raised if it failed to browse internet by Chrome. - """ - browse_url = ( - 'am start -n com.android.chrome/com.google.android.apps.chrome.Main -d' - ' %s' % url - ) - self._ad.adb.shell(browse_url) - self._ad.aud.add_watcher('Welcome').when( - text='Accept & continue').click(text='Accept & continue') - self._ad.aud.add_watcher('sync page').when( - text='No thanks').click(text='No thanks') - if self._ad.aud(text='No internet').exists(): - raise signals.TestError('No connect internet.') - def connect_wifi_from_other_device_hotspot( - self, wifi_hotspot_device: AndroidDevice) -> None: + self, wifi_hotspot_device: android_device.AndroidDevice) -> None: """Turns on 2.4G Wifi hotspot from the other android device and connect on the android device. Args: wifi_hotspot_device: Android device, turn on 2.4G Wifi hotspot. """ + wifi_hotspot_2_4g_config = bt_constants.WIFI_HOTSPOT_2_4G.copy() + if int(wifi_hotspot_device.build_info['build_version_sdk']) > 29: + wifi_hotspot_2_4g_config['apBand'] = 1 # Turn on 2.4G Wifi hotspot on the secondary phone. wifi_hotspot_device.sl4a.wifiSetWifiApConfiguration( - bt_constants.WIFI_HOTSPOT_2_4G) + wifi_hotspot_2_4g_config) wifi_hotspot_device.sl4a.connectivityStartTethering(0, False) # Connect the 2.4G Wifi on the primary phone. self._ad.mbs.wifiEnable() self._ad.mbs.wifiConnectSimple( - bt_constants.WIFI_HOTSPOT_2_4G['SSID'], - bt_constants.WIFI_HOTSPOT_2_4G['password']) + wifi_hotspot_2_4g_config['SSID'], + wifi_hotspot_2_4g_config['password']) + + def get_paired_device_supported_codecs(self, mac_address: str) -> List[str]: + """Gets the supported A2DP codecs of the paired Bluetooth device. + + Gets the supported A2DP codecs of the paired Bluetooth device from bluetooth + manager log. + + Args: + mac_address: The Bluetooth mac address of the paired Bluetooth device. + + Returns: + A list of the A2DP codecs that the paired Bluetooth device supports. + """ + if not self.is_bt_paired(mac_address): + raise signals.TestError( + f'Devices {self.serial} and {mac_address} are not paired.') + cmd = (f'dumpsys bluetooth_manager | ' + f'egrep -A12 "A2dpStateMachine for {mac_address}" | ' + f'egrep -A5 "mCodecsSelectableCapabilities"') + paired_device_selectable_codecs = self._ad.adb.shell(cmd).decode() + pattern = 'codecName:(.*),mCodecType' + return re.findall(pattern, paired_device_selectable_codecs) + + def get_current_a2dp_codec(self) -> bt_constants.BluetoothA2dpCodec: + """Gets current A2DP codec type. + + Returns: + A number representing the current A2DP codec type. + Codec type values are: + 0: SBC + 1: AAC + 2: aptX + 3: aptX HD + 4: LDAC + """ + codec_type = self._ad.sl4a.bluetoothA2dpGetCurrentCodecConfig()['codecType'] + return bt_constants.BluetoothA2dpCodec(codec_type) + + def is_variable_bit_rate_enabled(self) -> bool: + """Checks if Variable Bit Rate (VBR) support is enabled for A2DP AAC codec. + + Returns: + True if Variable Bit Rate support is enabled else False. + """ + return bt_constants.TRUE in self._ad.adb.getprop( + bt_constants.AAC_VBR_SUPPORTED_PROPERTY) + + def toggle_variable_bit_rate(self, enabled: bool = True) -> bool: + """Toggles Variable Bit Rate (VBR) support status for A2DP AAC codec. + + After calling this method, the android device needs to restart Bluetooth for + taking effect. + + If Variable Bit Rate support status is disabled, the android device will use + Constant Bit Rate (CBR). + + Args: + enabled: Enable Variable Bit Rate support if True. + + Returns: + True if the status is changed successfully else False. + """ + self._ad.adb.shell( + f'su root setprop {bt_constants.AAC_VBR_SUPPORTED_PROPERTY} ' + f'{bt_constants.TRUE if enabled else bt_constants.FALSE}') + return enabled == self.is_variable_bit_rate_enabled() + + def pair_and_connect_ble_device( + self, peripheral_ble_device: android_device.AndroidDevice) -> None: + """Pairs Android phone with BLE device. + + Initiates pairing from the phone and checks if it is bonded and connected to + the BLE device. + + Args: + peripheral_ble_device: An android device. AndroidDevice instance to pair + and connect with. + + Raises: + signals.ControllerError: raised if it failed to connect BLE device. + """ + peripheral_ble_device.activate_ble_pairing_mode() + mac_address = self.scan_and_get_ble_device_address( + peripheral_ble_device.get_device_name()) + self.pair_and_connect_bluetooth(mac_address) + + def toggle_charging(self, enabled: bool) -> None: + """Toggles charging on the device. + + Args: + enabled: Enable charging if True. + """ + set_value = '0' if enabled else '1' + config_file = bt_constants.CHARGING_CONTROL_CONFIG_DICT[ + self._ad.build_info['hardware']] + self._ad.adb.shell(f'echo {set_value} > {config_file}') + + def enable_airplane_mode(self, wait_secs=1) -> None: + """Enables airplane mode on device. + + Args: + wait_secs: float, the amount of time to wait after sending the airplane + mode broadcast. + Returns: + None + """ + self._ad.adb.shell(['settings', 'put', 'global', 'airplane_mode_on', '1']) + self._ad.adb.shell([ + 'am', 'broadcast', '-a', 'android.intent.action.AIRPLANE_MODE', '--ez', + 'state', 'true' + ]) + time.sleep(wait_secs) + + def disable_airplane_mode(self, wait_secs=1) -> None: + """Disables airplane mode on device. + + Args: + wait_secs: float, the amount of time to wait after sending the airplane + mode broadcast. + Returns: + None + """ + self._ad.adb.shell(['settings', 'put', 'global', 'airplane_mode_on', '0']) + self._ad.adb.shell([ + 'am', 'broadcast', '-a', 'android.intent.action.AIRPLANE_MODE', '--ez', + 'state', 'false' + ]) + time.sleep(wait_secs) + + def disable_verity_check(self) -> None: + """Disables Android dm verity check. + + Returns: + None + """ + if 'verity is already disabled' in str(self._ad.adb.disable_verity()): + return + self._ad.reboot() + self._ad.root_adb() + self._ad.wait_for_boot_completion() + self._ad.adb.remount() diff --git a/system/blueberry/utils/arduino_base.py b/system/blueberry/utils/arduino_base.py index 37c9146033..e84f2e6892 100644 --- a/system/blueberry/utils/arduino_base.py +++ b/system/blueberry/utils/arduino_base.py @@ -7,6 +7,7 @@ Internal link """ import time +from typing import Dict from mobly.signals import ControllerError import serial @@ -15,13 +16,15 @@ class ArduinoBase(object): """Implements an Arduino base class. Attributes: + config: A device configuration. serial: serial object, a serial object which is used to communicate with Arduino board. """ - def __init__(self, config): + def __init__(self, config: Dict[str, str]): """Initializes an Arduino base class.""" self._verify_config(config) + self.config = config self.serial = serial.Serial(config['arduino_port'], 9600) self.serial.timeout = 30 # Buffer between calling serial.Serial() and serial.Serial.write(). diff --git a/system/blueberry/utils/asserts.py b/system/blueberry/utils/asserts.py new file mode 100644 index 0000000000..e0a3530b42 --- /dev/null +++ b/system/blueberry/utils/asserts.py @@ -0,0 +1,37 @@ +"""Assertions for Blueberry package.""" + +import contextlib +from typing import Iterator, Type + +from mobly import signals + + +@contextlib.contextmanager +def assert_not_raises( + exception: Type[Exception] = Exception) -> Iterator[None]: + """Asserts that the exception is not raised. + + This assertion function is used to catch a specified exception + (or any exceptions) and raise signal.TestFailure instead. + + Usage: + ``` + with asserts.assert_not_raises(signals.TestError): + foo() + ``` + + Args: + exception: Exception to be catched. If not specify, catch any exceptions as + default. + + Yields: + A context which may raise the exception. + + Raises: + signals.TestFailure: raised when the exception is catched in the context. + """ + + try: + yield + except exception as e: # pylint: disable=broad-except + raise signals.TestFailure(e) diff --git a/system/blueberry/utils/blueberry_base_test.py b/system/blueberry/utils/blueberry_base_test.py index abe19cd323..1874f2a049 100644 --- a/system/blueberry/utils/blueberry_base_test.py +++ b/system/blueberry/utils/blueberry_base_test.py @@ -1,23 +1,18 @@ """Base test class for Blueberry.""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - +import importlib import re +from typing import Union from mobly import base_test +from mobly import records from mobly import signals from mobly.controllers import android_device from mobly.controllers.android_device_lib import adb -# Internal import -# Internal import -# Internal import -# Internal import from blueberry.controllers import derived_bt_device from blueberry.decorators import android_bluetooth_client_decorator -from blueberry.utils.android_bluetooth_decorator import AndroidBluetoothDecorator +from blueberry.utils import android_bluetooth_decorator class BlueberryBaseTest(base_test.BaseTestClass): @@ -27,6 +22,15 @@ class BlueberryBaseTest(base_test.BaseTestClass): setup required for Bluetooth tests. """ + def __init__(self, configs): + super().__init__(configs) + self._upload_test_report = None + self.capture_bugreport_on_fail = None + self.android_devices = None + self.derived_bt_devices = None + self.ignore_device_setup_failures = None + self._test_metrics = [] + def setup_generated_tests(self): """Generates multiple the same tests for pilot run. @@ -83,6 +87,11 @@ class BlueberryBaseTest(base_test.BaseTestClass): def setup_class(self): """Setup class is called before running any tests.""" super(BlueberryBaseTest, self).setup_class() + self._upload_test_report = int(self.user_params.get( + 'upload_test_report', 0)) + # Inits Spanner Utils if need to upload the test reports to Spanner. + if self._upload_test_report: + self._init_spanner_utils() self.capture_bugreport_on_fail = int(self.user_params.get( 'capture_bugreport_on_fail', 0)) self.ignore_device_setup_failures = int(self.user_params.get( @@ -117,12 +126,14 @@ class BlueberryBaseTest(base_test.BaseTestClass): derived_device.set_user_params(self.user_params) derived_device.setup() - self.android_ui_devices = {} - # Create a dictionary of Android UI Devices + self.android_devices = [ + android_bluetooth_decorator.AndroidBluetoothDecorator(device) + for device in self.android_devices + ] + for device in self.android_devices: + device.set_user_params(self.user_params) + for device in self.android_devices: - self.android_ui_devices[device.serial] = AdbUiDevice(device) - mobly_config = {'aud': self.android_ui_devices[device.serial]} - device.load_config(mobly_config) need_restart_bluetooth = False if (self.enable_bluetooth_verbose_logging or self.enable_all_bluetooth_logging): @@ -138,10 +149,6 @@ class BlueberryBaseTest(base_test.BaseTestClass): if need_restart_bluetooth: device.log.info('Restarting Bluetooth by airplane mode...') self.restart_bluetooth_by_airplane_mode(device) - self.android_devices = [AndroidBluetoothDecorator(device) - for device in self.android_devices] - for device in self.android_devices: - device.set_user_params(self.user_params) self.client_decorators = self.user_params.get('sync_decorator', []) if self.client_decorators: @@ -161,8 +168,16 @@ class BlueberryBaseTest(base_test.BaseTestClass): num_devices] = android_bluetooth_client_decorator.decorate( self.android_devices[num_devices], decorator) + def on_pass(self, record): + """This method is called when a test passed.""" + if self._upload_test_report: + self._upload_test_report_to_spanner(record.result) + def on_fail(self, record): """This method is called when a test failure.""" + if self._upload_test_report: + self._upload_test_report_to_spanner(record.result) + # Capture bugreports on fail if enabled. if self.capture_bugreport_on_fail: devices = self.android_devices @@ -176,6 +191,50 @@ class BlueberryBaseTest(base_test.BaseTestClass): record.begin_time, destination=self.current_test_info.output_path) + def _init_spanner_utils(self) -> None: + """Imports spanner_utils and creates SpannerUtils object.""" + spanner_utils_module = importlib.import_module( + 'blueberry.utils.spanner_utils') + self._spanner_utils = spanner_utils_module.SpannerUtils( + test_class_name=self.__class__.__name__, + mh_sponge_link=self.user_params['mh_sponge_link']) + + def _upload_test_report_to_spanner( + self, + result: records.TestResultEnums) -> None: + """Uploads the test report to Spanner. + + Args: + result: Result of this test. + """ + self._spanner_utils.create_test_report_proto( + current_test_info=self.current_test_info, + primary_device=self.android_devices[0], + companion_devices=self.derived_bt_devices, + test_metrics=self._test_metrics) + self._test_metrics.clear() + test_report = self._spanner_utils.write_test_report_proto(result=result) + # Shows the test report on Sponge properties for debugging. + self.record_data({ + 'Test Name': self.current_test_info.name, + 'sponge_properties': {'test_report': test_report}, + }) + + def record_test_metric( + self, + metric_name: str, + metric_value: Union[int, float]) -> None: + """Records a test metric to Spanner. + + Args: + metric_name: Name of the metric. + metric_value: Value of the metric. + """ + if not self._upload_test_report: + return + self._test_metrics.append( + self._spanner_utils.create_metric_proto(metric_name, metric_value)) + def set_logger_buffer_size_16m(self, device): """Sets all logger sizes per log buffer to 16M.""" device.log.info('Setting all logger sizes per log buffer to 16M...') @@ -207,7 +266,7 @@ class BlueberryBaseTest(base_test.BaseTestClass): return False # Suggest to use AndroidDeviceSettingsDecorator to disable verity and then # reboot (b/140277443). - disable_verity_check(device) + device.disable_verity_check() device.adb.remount() try: device.adb.shell(r'sed -i "s/\(TRC.*=\)2/\16/g;s/#\(LoggingV=--v=\)0/\13' @@ -240,5 +299,5 @@ class BlueberryBaseTest(base_test.BaseTestClass): def restart_bluetooth_by_airplane_mode(self, device): """Restarts bluetooth by airplane mode.""" - enable_airplane_mode(device, 3) - disable_airplane_mode(device, 3) + device.enable_airplane_mode(3) + device.disable_airplane_mode(3) diff --git a/system/blueberry/utils/bt_audio_utils.py b/system/blueberry/utils/bt_audio_utils.py index a4c2e8e95b..44dbd1adef 100644 --- a/system/blueberry/utils/bt_audio_utils.py +++ b/system/blueberry/utils/bt_audio_utils.py @@ -2,13 +2,20 @@ """Utils for bluetooth audio testing.""" import logging as log +import math import os +from typing import Optional import numpy as np from scipy import signal as scipy_signal from scipy.io import wavfile # Internal import # Internal import +# Dict keys of the THD+N analysis result +_THDN_KEY = 'thd+n' +_START_TIME_KEY = 'start_time' +_END_TIME_KEY = 'end_time' + def generate_sine_wave_to_device( device, @@ -129,7 +136,7 @@ def measure_audio_thdn_per_window( thdn_threshold, step_size, window_size, - q, + q=1.0, frequency=None): """Measures Total Harmonic Distortion + Noise (THD+N) of an audio file. @@ -170,12 +177,16 @@ def measure_audio_thdn_per_window( raise ValueError('window_size shall be greater than 0.') sample_rate, wave_data = wavfile.read(audio_file) wave_data = wave_data.astype('float64') + if len(wave_data.shape) == 1: + channel_signals = (wave_data,) + else: + channel_signals = wave_data.transpose() # Collects the result for each channels. results = [] - for signal in wave_data.transpose(): + for signal in channel_signals: current_position = 0 channel_result = [] - while current_position + window_size < len(signal): + while current_position + window_size <= len(signal): window = signal[current_position:current_position + window_size] thdn = measure_thdn( signal=window, @@ -186,15 +197,65 @@ def measure_audio_thdn_per_window( end_time = (current_position + window_size) / sample_rate if thdn > thdn_threshold: channel_result.append({ - 'thd+n': thdn, - 'start_time': start_time, - 'end_time': end_time + _THDN_KEY: thdn, + _START_TIME_KEY: start_time, + _END_TIME_KEY: end_time }) current_position += step_size results.append(channel_result) return results +def get_audio_maximum_thdn( + audio_file: str, + step_size: int, + window_size: int, + q: float = 1.0, + frequency: Optional[float] = None) -> float: + """Gets maximum THD+N from each audio sample with specified window size. + + Args: + audio_file: A .wav file to be measured. + step_size: Number of samples to move the window by for each analysis. + window_size: Number of samples to analyze each time. + q: Quality factor for the notch filter. + frequency: Fundamental frequency of the signal. All other frequencies + are noise. If not specified, will be calculated using FFT. + + Returns: + Float representing the maximum THD+N for the audio file. + """ + # Gets all analysis results. + total_results = measure_audio_thdn_per_window( + audio_file=audio_file, + thdn_threshold=-1.0, + step_size=step_size, + window_size=window_size, + q=q, + frequency=frequency) + + max_thdn_result = {_THDN_KEY: -1.0, _START_TIME_KEY: -1, _END_TIME_KEY: -1} + for channel_results in total_results: + for result in channel_results: + if result[_THDN_KEY] > max_thdn_result[_THDN_KEY]: + max_thdn_result = result + + log.info('Maximum THD+N result: %s', max_thdn_result) + return max_thdn_result[_THDN_KEY] + + +def convert_thdn_percent_to_decibels(thdn_percent: float) -> float: + """Converts THD+N percentage to decibels (dB). + + Args: + thdn_percent: THD+N in percent. E.g. 0.001. + + Returns: + THD+N in decibels. + """ + return math.log(thdn_percent / 100, 10) * 20 + + def trim_audio(audio_file: str, duration_sec: float, start_time_sec: float = 0.0) -> str: diff --git a/system/blueberry/utils/bt_constants.py b/system/blueberry/utils/bt_constants.py index 6a94e83edd..e65e039ee7 100644 --- a/system/blueberry/utils/bt_constants.py +++ b/system/blueberry/utils/bt_constants.py @@ -46,6 +46,18 @@ ANDROIDX_TEST_RUNNER = 'androidx.test.runner.AndroidJUnitRunner' # Wifi hotspot setting WIFI_HOTSPOT_2_4G = {'SSID': 'pqmBT', 'password': 'password', 'apBand': 0} +# Strings representing boolean of device properties. +TRUE = 'true' +FALSE = 'false' + +# String representing a property of AAC VBR support for Android device. +AAC_VBR_SUPPORTED_PROPERTY = 'persist.bluetooth.a2dp_aac.vbr_supported' + +# Dict containing charging control config for devices. +CHARGING_CONTROL_CONFIG_DICT = { +# Internal codename +} + class AvrcpEvent(enum.Enum): """Enumeration of AVRCP event types.""" @@ -189,3 +201,12 @@ class CallLogType(enum.IntEnum): INCOMING_CALL = 1 OUTGOING_CALL = 2 MISSED_CALL = 3 + + +class BluetoothA2dpCodec(enum.IntEnum): + """Enum class for Bluetooth A2DP codec type.""" + SBC = 0 + AAC = 1 + APTX = 2 + APTX_HD = 3 + LDAC = 4 diff --git a/system/blueberry/utils/bt_test_utils.py b/system/blueberry/utils/bt_test_utils.py index e9d6ec0422..bfd184d8e0 100644 --- a/system/blueberry/utils/bt_test_utils.py +++ b/system/blueberry/utils/bt_test_utils.py @@ -13,6 +13,7 @@ import os import random import string import time +from typing import Optional import wave @@ -31,9 +32,9 @@ def convert_pcm_to_wav(pcm_file_path, wave_file_path, audio_params): def create_vcf_from_vcard(output_path: str, num_of_contacts: int, - first_name: str = None, - last_name: str = None, - phone_number: int = None) -> str: + first_name: Optional[str] = None, + last_name: Optional[str] = None, + phone_number: Optional[int] = None) -> str: """Creates a vcf file from vCard. Args: diff --git a/system/blueberry/utils/command_line_runner/run_bluetooth_tests.py b/system/blueberry/utils/command_line_runner/run_bluetooth_tests.py index 3a186b8785..709fb69be2 100644 --- a/system/blueberry/utils/command_line_runner/run_bluetooth_tests.py +++ b/system/blueberry/utils/command_line_runner/run_bluetooth_tests.py @@ -6,7 +6,6 @@ specified DUTs (devices-under-test) using a simple command line interface. from __future__ import absolute_import from __future__ import division - from __future__ import print_function import base64 diff --git a/system/blueberry/utils/metrics_utils.py b/system/blueberry/utils/metrics_utils.py index b62b9e921c..6acde98acc 100644 --- a/system/blueberry/utils/metrics_utils.py +++ b/system/blueberry/utils/metrics_utils.py @@ -5,7 +5,6 @@ Internal reference from __future__ import absolute_import from __future__ import division - from __future__ import print_function import base64 diff --git a/system/blueberry/utils/triangle_constants.py b/system/blueberry/utils/triangle_constants.py new file mode 100644 index 0000000000..1149121e2a --- /dev/null +++ b/system/blueberry/utils/triangle_constants.py @@ -0,0 +1,98 @@ +"""Constants for Triangle testing.""" + +# Internal import + +FLAG_TYPE = phenotype_utils.FlagTypes + +# Waiting time to trigger connection switching. +WAITING_TIME_SEC = 60 + +NEARBY_PACKAGE = 'com.google.android.gms.nearby' +WEARABLE_PACKAGE = 'com.google.android.gms.wearable' + +SET_SCREEN_OFF_TIMEOUT_HALF_HOUR = ( + 'settings put system screen_off_timeout 1800000') + +# Phenotype flags +CONNECTION_SWITCHING_FLAGS = ( + { + 'name': 'FastPairFeature__enable_triangle_audio_switch', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + }, + { + 'name': 'FastPairFeature__enable_wearable_service', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + }, + { + 'name': 'fast_pair_enable_api_for_wear_os', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + }, + { + 'name': 'FastPairFeature__enable_task_scheduler_service', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + }, + { + 'name': 'fast_pair_manual_connect_affect_duration_millis', + 'type': FLAG_TYPE.LONG, + 'value': '60000' + } +) + +PHONE_PHENOTYPE_FLAGS = { + NEARBY_PACKAGE: + ( + { + 'name': 'fast_pair_half_sheet_wear_os', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + }, + { + 'name': 'default_debug_mode_enabled', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + } + ) + CONNECTION_SWITCHING_FLAGS + , + WEARABLE_PACKAGE: + ( + { + 'name': 'enable_fast_pair_account_key_processing_for_phone', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + }, + ) +} + +WATCH_PHENOTYPE_FLAGS = { + NEARBY_PACKAGE: ( + { + 'name': 'DiscovererFeature__support_wear_os', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + }, + { + 'name': 'fast_pair_enable_wear_os_fastpair_seeker', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + }, + { + 'name': 'default_device_notification_enabled', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + }, + { + 'name': 'fast_pair_enable_wearable_peripheral_api', + 'type': FLAG_TYPE.BOOLEAN, + 'value': 'true' + }, + { + 'name': 'fast_pair_footprints_access_strategy', + 'type': FLAG_TYPE.STRING, + 'value': 'geller' + } + ) + CONNECTION_SWITCHING_FLAGS +} diff --git a/system/blueberry/utils/ui_pages/errors.py b/system/blueberry/utils/ui_pages/errors.py new file mode 100644 index 0000000000..24145128bf --- /dev/null +++ b/system/blueberry/utils/ui_pages/errors.py @@ -0,0 +1,45 @@ +"""Module for errors thrown from ui_pages.""" + +from typing import List, Optional +from xml.dom import minidom +from blueberry.utils.ui_pages import ui_node + + +class Error(Exception): + pass + + +class ContextError(Error): + """Context related error.""" + + def __init__(self, ctx, msg): + new_msg = f'{ctx.ad}: {msg}' + super().__init__(new_msg) + + +class UIError(Error): + """UI page related error.""" + pass + + +class UnknownPageError(Error): + """UI page error for unknown XML content. + + Attributes: + ui_xml: Parsed XML object. + clickable_nodes: List of UINode with attribute `clickable="true"` + enabled_nodes: List of UINode with attribute `enabled="true"` + all_nodes: List of all UINode + """ + + def __init__(self, + ui_xml: minidom.Document, + clickable_nodes: Optional[List[ui_node.UINode]] = None, + enabled_nodes: Optional[List[ui_node.UINode]] = None, + all_nodes: Optional[List[ui_node.UINode]] = None): + new_msg = f'Unknown ui_xml:\n{ui_xml.toxml()}\n' + self.ui_xml = ui_xml + self.enabled_nodes = enabled_nodes + self.clickable_nodes = clickable_nodes + self.all_nodes = all_nodes + super().__init__(new_msg) diff --git a/system/blueberry/utils/ui_pages/fitbit_companion/__init__.py b/system/blueberry/utils/ui_pages/fitbit_companion/__init__.py new file mode 100644 index 0000000000..dbe78a4059 --- /dev/null +++ b/system/blueberry/utils/ui_pages/fitbit_companion/__init__.py @@ -0,0 +1,108 @@ +"""Gets context of Fitbit Companion App.""" + +from mobly.controllers import android_device + +from blueberry.utils.ui_pages import ui_core +from blueberry.utils.ui_pages.fitbit_companion import account_pages +from blueberry.utils.ui_pages.fitbit_companion import constants +from blueberry.utils.ui_pages.fitbit_companion import context +from blueberry.utils.ui_pages.fitbit_companion import other_pages +from blueberry.utils.ui_pages.fitbit_companion import pairing_pages + + +def get_context(ad: android_device.AndroidDevice, + safe_get: bool = False, + do_go_home: bool = True) -> context.Context: + """Gets context of Fitbit Companion App. + + Args: + ad: The Android device where the UI pages are derived from. + safe_get: If True, use `safe_get_page` to get the page; otherwise, use + `get_page`. + do_go_home: If False the context object will stay in the App's current page. + + Returns: + Context of Fitbit Companion App. + """ + ctx = context.Context(ad, safe_get=safe_get, do_go_home=do_go_home) + ctx.known_pages.extend(( + other_pages.LoginPage, + other_pages.GooglePlayPage, + other_pages.AllowLocationPermissionConfirmPopup, + other_pages.AllowLocationPermissionPopup, + other_pages.LocationPermissionSync, + other_pages.PurchaseFail, + other_pages.AllowNotification, + other_pages.SettingLocation, + other_pages.LocationDisabledPage, + other_pages.LinkConfirmPage, + account_pages.AccountPage, + account_pages.PairedDeviceDetailPage, + account_pages.UnpairConfirmPage, + pairing_pages.PurchasePage, + pairing_pages.PairRetryPage, + pairing_pages.Pairing4DigitPage, + pairing_pages.PairingConfirmPage, + pairing_pages.PairingIntroPage, + pairing_pages.PairAndLinkPage, + pairing_pages.PremiumPage, + pairing_pages.PairPrivacyConfirmPage, + pairing_pages.CancelPairPage, + pairing_pages.CancelPair2Page, + pairing_pages.ConfirmReplaceSmartWatchPage, + pairing_pages.ConfirmChargePage, + pairing_pages.ChooseTrackerPage, + pairing_pages.ConfirmDevicePage, + pairing_pages.SkipInfoPage, + pairing_pages.UpdateDevicePage, + )) + + return ctx + + +def go_google_play_page(ctx: context.Context) -> None: + """Goes to Google play page of Fitbit companion app. + + This function will leverage adb shell command to launch Fitbit app's + Google play page by searching the package of it. Then it will confirm + the result by checking the expected page as `GooglePlayPage` by `ctx`. + + Args: + ctx: Context object of Fitbit Companion App. + + Raises: + errors.ContextError: Fail to reach target page. + """ + ctx.ad.adb.shell( + 'am start -a android.intent.action.VIEW -d market://details?id=com.fitbit.FitbitMobile' + ) + ctx.expect_page(other_pages.GooglePlayPage) + + +def remove_all_paired_devices(ctx: context.Context) -> int: + """Removes all paired devices. + + Args: + ctx: Context object of Fitbit Companion App. + + Returns: + The number of paired device being removed. + + Raises: + errors.ContextError: Fail to reach target page. + AssertionError: Fail in evaluation after pairing. + """ + removed_count = 0 + ctx.go_page(account_pages.AccountPage) + paired_devices = ctx.page.get_paired_devices() + while paired_devices: + ctx.page.click(paired_devices[0]) + ctx.expect_page(account_pages.PairedDeviceDetailPage) + ctx.page.unpair() + ctx.expect_page(account_pages.UnpairConfirmPage) + ctx.page.confirm() + ctx.expect_page(account_pages.AccountPage) + removed_count += 1 + paired_devices = ctx.page.get_paired_devices() + + return removed_count diff --git a/system/blueberry/utils/ui_pages/fitbit_companion/account_pages.py b/system/blueberry/utils/ui_pages/fitbit_companion/account_pages.py new file mode 100644 index 0000000000..b2e14d80dd --- /dev/null +++ b/system/blueberry/utils/ui_pages/fitbit_companion/account_pages.py @@ -0,0 +1,82 @@ +"""Account pages associated with Fitbit Companion App testing.""" +from typing import List + +from blueberry.utils.ui_pages import ui_core +from blueberry.utils.ui_pages import ui_node +from blueberry.utils.ui_pages.fitbit_companion import constants + + +class AccountPage(ui_core.UIPage): + """Fitbit Companion App's account page.""" + + ACTIVITY = f'{constants.PKG_NAME}/com.fitbit.settings.ui.AccountActivity' + PAGE_RID = f'{constants.PKG_NAME_ID}/account_recycler' + NODE_UER_AVATOR_RID = f'{constants.PKG_NAME_ID}/userAvatar' + NODE_BACK_CONTENT_DESC = 'Navigate up' + NODE_ADD_DEV_RID = f'{constants.PKG_NAME_ID}/add_device_img' + NODE_DEVICE_RID = f'{constants.PKG_NAME_ID}/device_name' + + def add_device(self) -> ui_core.UIPage: + """Goes to page to add new device. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get the target node. + """ + return self.click_node_by_rid(self.NODE_ADD_DEV_RID) + + def back(self) -> ui_core.UIPage: + """Goes back to Fitbit Companion App's home page. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + return self.click_node_by_content_desc(self.NODE_BACK_CONTENT_DESC) + + def get_paired_devices(self) -> List[ui_node.UINode]: + """Gets all paired devices. + + Returns: + The list of node representing the paired device. + """ + return self.get_all_nodes_by_rid(self.NODE_DEVICE_RID) + + +class PairedDeviceDetailPage(ui_core.UIPage): + """Fitbit Companion App's page of paired device.""" + + PAGE_RID = f'{constants.PKG_NAME_ID}/unpair' + + def unpair(self) -> ui_core.UIPage: + """Unpairs device. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to find the target node. + """ + # TODO(user): We need to consider the situation while device + # sync now which cause unpair to fail. + return self.click_node_by_rid(self.PAGE_RID) + + +class UnpairConfirmPage(ui_core.UIPage): + """Fitbit Companion App's page to confirm the action of unpairing.""" + + PAGE_TEXT = 'UNPAIR' + + def confirm(self) -> ui_core.UIPage: + """Confirms the action of unpairing. + + Returns: + The transformed page. + """ + self.click_node_by_text(self.PAGE_TEXT) + self.ctx.expect_page(AccountPage) + return self.ctx.page diff --git a/system/blueberry/utils/ui_pages/fitbit_companion/constants.py b/system/blueberry/utils/ui_pages/fitbit_companion/constants.py new file mode 100644 index 0000000000..01cf50b185 --- /dev/null +++ b/system/blueberry/utils/ui_pages/fitbit_companion/constants.py @@ -0,0 +1,11 @@ +"""Constants used in Fitbit companion app pages.""" + +# Fitbit package name +PKG_NAME = 'com.fitbit.FitbitMobile' + +# Fitbit Main Page Activity +# com.fitbit.FitbitMobile/com.fitbit.home.ui.HomeActivity +MAIN_PAGE_ACTIVITY = f'{PKG_NAME}/com.fitbit.FirstActivity' + +# Fitbit package ID +PKG_NAME_ID = f'{PKG_NAME}:id' diff --git a/system/blueberry/utils/ui_pages/fitbit_companion/context.py b/system/blueberry/utils/ui_pages/fitbit_companion/context.py new file mode 100644 index 0000000000..20d294d7b9 --- /dev/null +++ b/system/blueberry/utils/ui_pages/fitbit_companion/context.py @@ -0,0 +1,52 @@ +"""Context of Fitbit Companion App.""" +from mobly.controllers import android_device + +from blueberry.utils.ui_pages import ui_core +from blueberry.utils.ui_pages.fitbit_companion import constants + + +class Context(ui_core.Context): + """Context of Fitbit Companion App. + + Attributes: + ad: The Android device where the UI pages are derived from. + safe_get: If True, use `safe_get_page` to get the page; + otherwise, use `get_page`. + do_go_home: If False the context object will stay in the + App's current page. + """ + + def __init__(self, ad: android_device.AndroidDevice, + safe_get: bool = False, + do_go_home: bool = True) -> None: + super().__init__(ad, [HomePage], safe_get=safe_get, + do_go_home=do_go_home) + + def go_home_page(self) -> ui_core.UIPage: + """Goes to Fitbit companion App's home page. + + Returns: + The home page object. + + Raises: + errors.ContextError: Fail to reach target page. + """ + return self.go_page(HomePage) + + +class HomePage(ui_core.UIPage): + """Fitbit Companion App's home page.""" + + ACTIVITY = f'{constants.PKG_NAME}/com.fitbit.home.ui.HomeActivity' + PAGE_RID = f'{constants.PKG_NAME_ID}/userAvatar' + + def go_account_page(self) -> ui_core.UIPage: + """Goes to Fitbit companion App's account page. + + Returns: + The account page object. + + Raises: + errors.UIError: Fail to get target node. + """ + return self.click_node_by_rid(self.PAGE_RID) diff --git a/system/blueberry/utils/ui_pages/fitbit_companion/other_pages.py b/system/blueberry/utils/ui_pages/fitbit_companion/other_pages.py new file mode 100644 index 0000000000..83d9170f15 --- /dev/null +++ b/system/blueberry/utils/ui_pages/fitbit_companion/other_pages.py @@ -0,0 +1,359 @@ +"""Other pages associated with Fitbit Companion App testing.""" +import shlex + +from typing import List, Optional +from xml.dom import minidom + +from blueberry.utils.ui_pages import errors +from blueberry.utils.ui_pages import ui_core +from blueberry.utils.ui_pages import ui_node +from blueberry.utils.ui_pages.fitbit_companion import constants + +# Typing hint alias. +NodeList = List[ui_node.UINode] + + +class LoginInputPage(ui_core.UIPage): + """Fitbit Companion App's login input page.""" + + PAGE_TEXT = 'Forgot password?' + _NODE_EMAIL_INPUT_TEXT = f'{constants.PKG_NAME_ID}/login_email' + _NODE_PASSWORD_INPUT_RID = f'{constants.PKG_NAME_ID}/login_password' + _NODE_LOGIN_BUTTON_RID = f'{constants.PKG_NAME_ID}/login_button' + + def input(self, account: str, password: str) -> ui_core.UIPage: + """Inputs login credentials. + + Args: + account: The account is used to login Fitbit App. + password: The password is used to login Fitbit App. + + Returns: + The transformed page. + """ + self.click_node_by_rid(self._NODE_EMAIL_INPUT_TEXT) + self.ctx.ad.adb.shell(shlex.split(f'input text "{account}"')) + self.click_node_by_rid(self._NODE_PASSWORD_INPUT_RID) + self.ctx.ad.adb.shell(shlex.split(f'input text "{password}"')) + return self.click_node_by_rid(self._NODE_LOGIN_BUTTON_RID) + + +class GoogleSmartLockSavePage(ui_core.UIPage): + """Google SmartLock popup to save input credentials.""" + + PAGE_TEXT = 'Save password to Google?' + _NODE_NO_RID = 'android:id/autofill_save_no' + _NODE_YES_RID = 'android:id/autofill_save_yes' + + def yes(self) -> ui_core.UIPage: + """Saves the input credentials in Google SmartLock. + + Returns: + The transformed page. + """ + return self.click_node_by_rid(self._NODE_YES_RID) + + def no(self) -> ui_core.UIPage: + """Skips the request to save input credentials. + + Returns: + The transformed page. + """ + return self.click_node_by_rid(self._NODE_NO_RID) + + +class GoogleSmartLockPage(ui_core.UIPage): + """Google SmartLock popup from login page.""" + + PAGE_TEXT = 'Google Smartlock' + _NODE_YES_TEXT = 'YES' + _NODE_NO_TEXT = 'NO' + + def yes(self) -> ui_core.UIPage: + """Logins by GoogleSmartLock. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self._NODE_YES_TEXT) + + def no(self) -> ui_core.UIPage: + """Skips GoogleSmartLock. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self._NODE_NO_TEXT) + + +class LoginPage(ui_core.UIPage): + """Fitbit Companion App's login page.""" + + PAGE_TEXT = 'Log in' + + def login(self, + account: str, + password: str, + allow_google_smartlock: bool = True) -> ui_core.UIPage: + """Logins the Fitbit Companion App. + + Args: + account: Login account + password: Login password + allow_google_smartlock: True to allow Google SmartLock feature. + + Returns: + The transformed page. + """ + self.click_node_by_text(self.PAGE_TEXT) + self.ctx.expect_pages([GoogleSmartLockPage, LoginInputPage]) + if self.ctx.is_page(GoogleSmartLockPage): + if allow_google_smartlock: + return self.ctx.page.yes() + else: + self.ctx.page.no() + + self.ctx.expect_page(LoginInputPage) + return self.ctx.page.input(account, password) + + +class GooglePlayPage(ui_core.UIPage): + """Fitbit Companion App's GooglePlay page.""" + + _FITBIT_COMPANION_NAME_TEXT = 'Fitbit, Inc.' + _NODE_RESOURCE_NAME_RID = 'com.android.vending:id/0_resource_name_obfuscated' + _NODE_UNINSTALL_BUTTON_TEXT = 'Uninstall' + _NODE_OPEN_BUTTON_TEXT = 'Open' + _NODE_INSTALL_BUTTON_TEXT = 'Install' + + @classmethod + def from_xml(cls, ctx: ui_core.Context, ui_xml: minidom.Document, + clickable_nodes: NodeList, + enabled_nodes: NodeList, + all_nodes: NodeList) -> Optional[ui_core.UIPage]: + """Instantiates page object from XML object. + + Args: + ctx: Page context object. + ui_xml: Parsed XML object. + clickable_nodes: Clickable node list from page. + enabled_nodes: Enabled node list from page. + all_nodes: All node from page. + + Returns: + UI page object iff the given XML object can be parsed. + """ + for node in enabled_nodes: + if (node.text == cls._FITBIT_COMPANION_NAME_TEXT and + node.resource_id == cls._NODE_RESOURCE_NAME_RID): + return cls(ctx, ui_xml, clickable_nodes, enabled_nodes, all_nodes) + + def open(self) -> ui_core.UIPage: + """Opens the Fitbit Companion App. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self._NODE_OPEN_BUTTON_TEXT) + + def install(self, open_app: bool = True) -> ui_core.UIPage: + """Installs the Fitbit Companion App. + + Args: + open_app: True to open application after installation. + + Returns: + The transformed page. + """ + if self.get_node_by_text(self._NODE_OPEN_BUTTON_TEXT) is None: + # The app is not installed yet. + self.click_node_by_text(self._NODE_INSTALL_BUTTON_TEXT) + self.ctx.expect_page( + self.__class__, + wait_sec=120, + node_eval=lambda node: node.text == self._NODE_OPEN_BUTTON_TEXT) + + if open_app: + return self.ctx.page.open() + else: + return self.ctx.page + + +class AllowLocationPermissionConfirmPopup(ui_core.UIPage): + """Page to confirm the location permission request.""" + + PAGE_RE_TEXT = 'This app wants to access your location' + _ALLOW_BUTTON_RESOURCE_ID = 'com.android.permissioncontroller:id/permission_no_upgrade_button' + + def allow(self) -> ui_core.UIPage: + """Allows the request.""" + return self.click_node_by_rid(self._ALLOW_BUTTON_RESOURCE_ID) + + +class AllowLocationPermissionPopup(ui_core.UIPage): + """Page to allow location permission.""" + + PAGE_TEXT = 'While using the app' + + def next(self) -> ui_core.UIPage: + """Allows the permission.""" + return self.click_node_by_text(self.PAGE_TEXT) + + +class LocationPermissionSync(ui_core.UIPage): + """Page to require location permission required by Fitbit Companion App.""" + + PAGE_TEXT = 'Location permission required to sync' + _EXIT_IMAGE_CLASS = 'android.widget.ImageButton' + _LOCATION_PERMISSION_CHECK_BOX_TEXT = 'Location Permission' + _UPDATE_BUTTON_TEXT = 'Update Settings' + + def enable(self) -> ui_core.UIPage: + """Enables location permission to app.""" + self.ctx.enable_registered_page_call = False + self.click_node_by_text(self._LOCATION_PERMISSION_CHECK_BOX_TEXT) + self.ctx.get_page() + self.ctx.expect_page(AllowLocationPermissionPopup) + self.ctx.page.next() + self.ctx.expect_page(AllowLocationPermissionConfirmPopup) + self.ctx.page.allow() + self.click_node_by_text(self._UPDATE_BUTTON_TEXT) + self.ctx.enable_registered_page_call = True + return self.click_node_by_class(self._EXIT_IMAGE_CLASS) + + +class PurchaseFail(ui_core.UIPage): + """Fitbit Companion App's page to show failure of purchase.""" + + PAGE_TEXT = 'Purchase failed!' + _NODE_OK_TEXT = 'OK' + + def ok(self) -> ui_core.UIPage: + """Confirms the failure. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self._NODE_OK_TEXT) + + +class AllowNotification(ui_core.UIPage): + """Fitbit Companion App's page to allow access of notification.""" + + PAGE_TEXT = 'Allow notification access for Fitbit?' + _NODE_ALLOW_TEXT = 'Allow' + _NODE_DENY_TEXT = 'Deny' + + def allow(self) -> ui_core.UIPage: + """Allows the request. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self._NODE_ALLOW_TEXT) + + def deny(self) -> ui_core.UIPage: + """Denies the request. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self._NODE_DENY_TEXT) + + +class SettingLocation(ui_core.UIPage): + """Android location setting page.""" + + _NODE_SWITCH_RID = 'com.android.settings:id/switch_widget' + _NODE_SWITCH_TEXT_RID = 'com.android.settings:id/switch_text' + + @classmethod + def from_xml(cls, ctx: ui_core.Context, ui_xml: minidom.Document, + clickable_nodes: NodeList, + enabled_nodes: NodeList, + all_nodes: NodeList) -> Optional[ui_core.UIPage]: + """Instantiates page object from XML object. + + Args: + ctx: Page context object. + ui_xml: Parsed XML object. + clickable_nodes: Clickable node list from page. + enabled_nodes: Enabled node list from page. + all_nodes: All node from page. + + Returns: + UI page object iff the given XML object can be parsed. + """ + for node in enabled_nodes: + if (node.text == 'Use location' and + node.resource_id == cls._NODE_SWITCH_TEXT_RID): + return cls(ctx, ui_xml, clickable_nodes, enabled_nodes, all_nodes) + + def back(self) -> ui_core.UIPage: + """Backs to previous page. + + Returns: + The transformed page. + """ + self.click_node_by_content_desc('Navigate up') + return self.swipe_left() + + @property + def enabled(self) -> bool: + """Checks the location setting. + + Returns: + True iff the location is enabled. + + Raises: + errors.UIError: Fail to find the target node. + """ + node = self.get_node_by_rid(self._NODE_SWITCH_RID) + if not node: + raise errors.UIError('Fail to get switch node!') + + return node.attributes['checked'].value == 'true' + + def set(self, value: bool) -> ui_core.UIPage: + """Toggles the switch to enable/disable location. + + Args: + value: True to turn on setting; False to turn off setting. + + Returns: + The transformed page. + """ + if value != self.enabled: + return self.click_node_by_rid(self._NODE_SWITCH_RID) + + return self + + +class LocationDisabledPage(ui_core.UIPage): + """Popup page for notification as location is disabled.""" + + PAGE_RE_TEXT = 'ENABLE LOCATIONS' + _NODE_ENABLE_TEXT = 'ENABLE LOCATIONS' + + def enable(self) -> ui_core.UIPage: + """Enables the location setting. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self._NODE_ENABLE_TEXT) + + +class LinkConfirmPage(ui_core.UIPage): + """Popup page to confirm the process of pairing.""" + + PAGE_RID = 'com.android.companiondevicemanager:id/buttons' + _NODE_PAIR_BTN_RID = 'com.android.companiondevicemanager:id/button_pair' + + def ok(self) -> ui_core.UIPage: + """Confirms pairing process. + + Returns: + The transformed page. + """ + return self.click_node_by_rid(self._NODE_PAIR_BTN_RID) diff --git a/system/blueberry/utils/ui_pages/fitbit_companion/pairing_pages.py b/system/blueberry/utils/ui_pages/fitbit_companion/pairing_pages.py new file mode 100644 index 0000000000..5c7cf08fda --- /dev/null +++ b/system/blueberry/utils/ui_pages/fitbit_companion/pairing_pages.py @@ -0,0 +1,393 @@ +"""Pages associated with pairing process of Fitbit tracker device.""" +import shlex +from typing import List, Optional +from xml.dom import minidom + +from blueberry.utils.ui_pages import errors +from blueberry.utils.ui_pages import ui_core +from blueberry.utils.ui_pages import ui_node +from blueberry.utils.ui_pages.fitbit_companion import constants + +# Alias for typing convenience. +_NodeList = List[ui_node.UINode] + + +class PairRetryPage(ui_core.UIPage): + """Fitbit Companion App's page for retry of pairing.""" + + PAGE_RE_TEXT = 'TRY AGAIN' + + def retry(self) -> ui_core.UIPage: + """Clicks button to retry pairing. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to find the target node. + """ + return self.click_node_by_text('TRY AGAIN') + + +class Pairing4DigitPage(ui_core.UIPage): + """Fitbit Companion App's page to enter 4 digit pins for pairing.""" + + PAGE_RID = f'{constants.PKG_NAME_ID}/digits' + NODE_DIGIT_RIDS = (f'{constants.PKG_NAME_ID}/digit0' + f'{constants.PKG_NAME_ID}/digit1' + f'{constants.PKG_NAME_ID}/digit2' + f'{constants.PKG_NAME_ID}/digit3') + + def input_pins(self, pins: str) -> ui_core.UIPage: + """Inputs 4 digit pins required in pairing process. + + Args: + pins: 4 digit pins (e.g.: "1234") + + Returns: + The transformed page. + + Raises: + ValueError: Input pins is not valid. + """ + if len(pins) != 4: + raise ValueError(f'4 digits required here! (input={pins})') + + for digit in pins: + self.ctx.ad.adb.shell(shlex.split(f'input text "{digit}"')) + + return self.ctx.page + + +class PairingConfirmPage(ui_core.UIPage): + """Fitbit Companion App's page to confirm pairing.""" + + NODE_ALLOW_ACCESS_TEXT = 'Allow access to your contacts and call history' + NODE_PAIR_TEXT = 'Pair' + + @classmethod + def from_xml(cls, ctx: ui_core.Context, ui_xml: minidom.Document, + clickable_nodes: _NodeList, enabled_nodes: _NodeList, + all_nodes: _NodeList) -> Optional[ui_core.UIPage]: + """Instantiates page object from XML object. + + Args: + ctx: Page context object. + ui_xml: Parsed XML object. + clickable_nodes: Clickable node list from page. + enabled_nodes: Enabled node list from page. + all_nodes: All node from page. + + Returns: + UI page object iff the given XML object can be parsed. + """ + for node in enabled_nodes: + if (node.text == cls.NODE_PAIR_TEXT and + node.resource_id == 'android:id/button1'): + return cls(ctx, ui_xml, clickable_nodes, enabled_nodes, all_nodes) + + def confirm(self) -> ui_core.UIPage: + """Confirms the action of pairing. + + Returns: + The transformed page. + """ + self.click_node_by_text(self.NODE_ALLOW_ACCESS_TEXT) + + return self.click_node_by_text(self.NODE_PAIR_TEXT) + + +class PairingIntroPage(ui_core.UIPage): + """Fitbit Companion App's pages for introduction of product usage.""" + + NODE_TITLE_TEXT_SET = frozenset([ + 'All set!', + 'Double tap to wake', + 'Firmly double-tap', + 'How to go back', + 'Swipe down', + 'Swipe left or right', + 'Swipe to navigate', + 'Swipe up', + 'Try it on', + 'Wear & care tips', + ]) + NODE_NEXT_BTN_RID = f'{constants.PKG_NAME_ID}/btn_next' + + @classmethod + def from_xml(cls, ctx: ui_core.Context, ui_xml: minidom.Document, + clickable_nodes: _NodeList, enabled_nodes: _NodeList, + all_nodes: _NodeList) -> Optional[ui_core.UIPage]: + """Instantiates page object from XML object. + + The appending punctuation '.' of the text will be ignored during comparison. + + Args: + ctx: Page context object. + ui_xml: Parsed XML object. + clickable_nodes: Clickable node list from page. + enabled_nodes: Enabled node list from page. + all_nodes: All node from page. + + Returns: + UI page object iff the given XML object can be parsed. + """ + for node in enabled_nodes: + node_text = node.text[:-1] if node.text.endswith('.') else node.text + if node_text in cls.NODE_TITLE_TEXT_SET: + return cls(ctx, ui_xml, clickable_nodes, enabled_nodes, all_nodes) + + def next(self): + """Moves to next page.""" + return self.click_node_by_rid(self.NODE_NEXT_BTN_RID) + + +class PairAndLinkPage(ui_core.UIPage): + """Fitbit Companion App's landing page for pairing and linking.""" + + PAGE_TEXT = 'Bluetooth Pairing and Linking' + NODE_CANCEL_TEXT = 'Cancel' + + def cancel(self) -> ui_core.UIPage: + """Cancel pairing process. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self.NODE_CANCEL_TEXT) + + +class PremiumPage(ui_core.UIPage): + """Fitbit Companion App's page for Premium information.""" + + PAGE_TEXT = 'See all Premium features' + NODE_EXIT_IMG_BTN_CLASS = 'android.widget.ImageButton' + + def done(self): + """Completes pairing process. + + Returns: + The transformed page. + """ + return self.click_node_by_class(self.NODE_EXIT_IMG_BTN_CLASS) + + +class PairPrivacyConfirmPage(ui_core.UIPage): + """Fitbit Companion App's page to confirm the privacy befoe pairing.""" + + PAGE_RID = f'{constants.PKG_NAME_ID}/gdpr_scroll_view' + _NODE_ACCEPT_BTN_TEXT = 'ACCEPT' + _SWIPE_RETRY = 5 + + def accept(self) -> ui_core.UIPage: + """Accepts the privacy policy. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + self.swipe_down() + for i in range(self._SWIPE_RETRY): + node = self.get_node_by_text(self._NODE_ACCEPT_BTN_TEXT) + if node is None: + raise errors.UIError( + f'Fail to find the node with text={self._NODE_ACCEPT_BTN_TEXT}') + + atr_obj = node.attributes.get('enabled') + if atr_obj is not None and atr_obj.value == 'true': + return self.click_node_by_text(self._NODE_ACCEPT_BTN_TEXT) + + self.log.debug('swipe down to browse the privacy info...%d', i + 1) + self.swipe_down() + self.ctx.get_page() + + raise errors.UIError( + 'Fail to wait for the enabled button to confirm the privacy!') + + +class CancelPairPage(ui_core.UIPage): + """Fitbit Companion App's page to confirm the cancel of pairing.""" + + PAGE_TEXT = ('Canceling this process may result in poor connectivity with ' + 'your Fitbit device.') + NODE_YES_TEXT = 'YES' + NODE_NO_TEXT = 'NO' + + def yes(self) -> ui_core.UIPage: + """Cancels the pairing process. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + return self.click_node_by_text(self.NODE_YES_TEXT) + + def no(self) -> ui_core.UIPage: + """Continues the pairing process. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + return self.click_node_by_text(self.NODE_NO_TEXT) + + +class CancelPair2Page(ui_core.UIPage): + """Fitbit Companion App's page to confirm the cancel of pairing.""" + + PAGE_TEXT = ( + 'Are you sure you want to cancel pairing?' + ' You can set up your Fitbit Device later on the Devices screen.') + + _NODE_YES_TEXT = 'CANCEL' + + def yes(self) -> ui_core.UIPage: + """Cancels the pairing process. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + return self.click_node_by_text(self._NODE_YES_TEXT) + + +class ConfirmReplaceSmartWatchPage(ui_core.UIPage): + """Fitbit Companion App's page to confirm the replacement of tracker device. + + When you already have one paired tracker device and you try to pair a + new one, this page will show up. + """ + NODE_SWITCH_BTN_TEXT = 'SWITCH TO' + PAGE_TEXT = 'Switching?' + + def confirm(self) -> ui_core.UIPage: + """Confirms the switching. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + + def _search_switch_btn_node(node: ui_node.UINode) -> bool: + if node.text.startswith(self.NODE_SWITCH_BTN_TEXT): + return True + + return False + + node = self.get_node_by_func(_search_switch_btn_node) + if node is None: + raise errors.UIError( + 'Failed to confirm the switching of new tracker device!') + + return self.click(node) + + +class ConfirmChargePage(ui_core.UIPage): + """Fitbit Companion App's page to confirm the charge condition.""" + + PAGE_RE_TEXT = 'Let your device charge during setup' + + def next(self) -> ui_core.UIPage: + """Forwards to pairing page. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + return self.click_node_by_text('NEXT') + + +class ChooseTrackerPage(ui_core.UIPage): + """Fitbit Companion App's page to select device model for pairing.""" + + ACTIVITY = f'{constants.PKG_NAME}/com.fitbit.device.ui.setup.choose.ChooseTrackerActivity' + PAGE_RID = f'{constants.PKG_NAME_ID}/choose_tracker_title_container' + + def select_device(self, name: str) -> ui_core.UIPage: + """Selects tracker device. + + Args: + name: The name of device. (e.g.: 'Buzz') + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + return self.click_node_by_text(name) + + +class ConfirmDevicePage(ui_core.UIPage): + """Fitbit Companion App's page to confirm the selected tracker device.""" + PAGE_TEXT = 'SET UP' + ACTIVITY = f'{constants.PKG_NAME}/com.fitbit.device.ui.setup.choose.ConfirmDeviceActivity' + + def confirm(self) -> ui_core.UIPage: + """Confirms the selection. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + return self.click_node_by_text(self.PAGE_TEXT) + + +class SkipInfoPage(ui_core.UIPage): + """Fitbit Companion App's page to skip the 'not working' page.""" + + PAGE_TEXT = 'Skip Information Screens' + NODE_SKIP_TEXT = 'SKIP' + NODE_CONTINUE_TEXT = 'CONTINUE' + + def skip(self) -> ui_core.UIPage: + """Skips the information screens. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self.NODE_SKIP_TEXT) + + +class UpdateDevicePage(ui_core.UIPage): + """Fitbit Companion App's page to update device.""" + + PAGE_TEXT = 'INSTALL UPDATE NOW' + NODE_UPDATE_LATER_BTN_TEXT = 'UPDATE LATER' + + def update_later(self) -> ui_core.UIPage: + """Cancels the update. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self.NODE_UPDATE_LATER_BTN_TEXT) + + +class PurchasePage(ui_core.UIPage): + """Fitbit Companion App's page to purchase merchandise.""" + + PAGE_RE_TEXT = 'Protect Your New Device' + NODE_SKIP_BTN_TEXT = 'NOT NOW' + + def skip(self) -> ui_core.UIPage: + """Skips the purchase action. + + Returns: + The transformed page. + """ + return self.click_node_by_text(self.NODE_SKIP_BTN_TEXT) diff --git a/system/blueberry/utils/ui_pages/test_utils.py b/system/blueberry/utils/ui_pages/test_utils.py new file mode 100644 index 0000000000..c718fa379a --- /dev/null +++ b/system/blueberry/utils/ui_pages/test_utils.py @@ -0,0 +1,39 @@ +"""Utility used for unit test.""" +from typing import Dict, Sequence + + +class MockNode: + """Mock node class.""" + + def __init__(self, attrs: Dict[str, str], + child_attrs: Sequence[Dict[str, str]] = ({},), + is_child: bool = False): + self.attributes = attrs + self.childs = [ + MockNode(attrs, is_child=True) for attrs in child_attrs if attrs] + + self.is_child = is_child + if 'bounds' not in self.attributes: + self.attributes['bounds'] = '[0,0][384,384]' + + def __str__(self): + xml_str_elements = [] + if not self.is_child: + xml_str_elements.append( + "<?xml version='1.0' encoding='UTF-8' standalone='yes' ?>") + xml_str_elements.append('<hierarchy rotation="0">') + + xml_str_elements.append('<node index="0"') + for attr_key, attr_val in self.attributes.items(): + xml_str_elements.append(f' {attr_key}="{attr_val}"') + xml_str_elements.append('>') + + for child in self.childs: + xml_str_elements.append(str(child)) + + xml_str_elements.append('</node>') + + if not self.is_child: + xml_str_elements.append('</hierarchy>') + + return ''.join(xml_str_elements) diff --git a/system/blueberry/utils/ui_pages/ui_core.py b/system/blueberry/utils/ui_pages/ui_core.py new file mode 100644 index 0000000000..d3e260177b --- /dev/null +++ b/system/blueberry/utils/ui_pages/ui_core.py @@ -0,0 +1,942 @@ +"""Core components of ui_pages.""" +from __future__ import annotations + +import abc +import itertools +import logging +import os +import re +import shlex +import time +from typing import Any, Callable, Dict, Generator, Iterable, List, Sequence, NamedTuple, Optional, Tuple, Type +from xml.dom import minidom + +from mobly.controllers import android_device +from mobly.controllers.android_device_lib import adb + +# Internal import +from blueberry.utils.ui_pages import errors +from blueberry.utils.ui_pages import ui_node +from blueberry.utils.ui_pages import utils + +# Return type of otpional UINode. +OptUINode = Optional[ui_node.UINode] + +# Return type of node generator. +NodeGenerator = Generator[ui_node.UINode, None, None] + +# Waiting time of expecting page. +_EXPECT_PAGE_WAIT_TIME_IN_SECOND = 20 + +# Function to evaluate UINode. +NodeEvaluator = Optional[Callable[[ui_node.UINode], bool]] + +# Number of retries in retrieving UI xml file. +_RETRIES_NUM_OF_ADB_COMMAND = 5 + + +# Dataclass for return of UI parsing result. +class ParsedUI(NamedTuple): + ui_xml: minidom.Document + clickable_nodes: List[ui_node.UINode] + enabled_nodes: List[ui_node.UINode] + all_nodes: List[ui_node.UINode] + + +class Context(abc.ABC): + """Context of UI page. + + Attributes: + ad: The Android device where the UI pages are derived from. + page: The last obtained UI page object. + regr_page_calls: Key as page class; value as registered method of page class + to call when meeting it. + root_node: The root node of parsed XML file dumped from adb. + known_pages: List of UIPage objects used to represent the current page. + log: The logger object. + safe_get: The method `safe_get_page` will be used to get page iff True. + Otherwise the method `strict_get_page` will be used instead as default. + enable_registered_page_call: True to enable the phrase of executing + registered page action(s). It is used to avoid infinite loop situations. + """ + + def __init__(self, + ad: android_device.AndroidDevice, + known_pages: List[Type[UIPage]], + do_go_home: bool = True, + safe_get: bool = False) -> None: + self.log = logging.getLogger(self.__class__.__name__) + self.ad = ad + self.page = None + self.regr_page_calls = {} + self.root_node = None + self.known_pages = known_pages + self.safe_get = safe_get + self.enable_registered_page_call = True + self.log.debug('safe_get=%s; do_go_home=%s', self.safe_get, do_go_home) + if do_go_home: + self.unlock_screen() + self.go_home_page() + else: + self.get_page() + + def regr_page_call(self, page_class: Type[UIPage], method_name: str) -> None: + """Registers a page call. + + This method is used to register a fixed method call on registered + page class. Whenever the current page is of regisered page class, + the registered method name will be called automatically. + + Args: + page_class: The page class to register for callback. + method_name: Name of method from `page_class` to be registered. + """ + self.regr_page_calls[page_class] = method_name + + def get_regr_page_call(self, page_obj: UIPage) -> Optional[str]: + """Gets the registered method name. + + We use method `self.regr_page_call` to register the subclass of UIPage + as specific page and its method name. Then this method is used to + retrieve the registered method name according to the input page object. + + Args: + page_obj: The page object to search for registered method name. + + Returns: + The registered method name of given page object iff registered. + Otherwise, None is returned. + """ + for page_class, method_name in self.regr_page_calls.items(): + if isinstance(page_obj, page_class): + return method_name + + return None + + @abc.abstractmethod + def go_home_page(self) -> UIPage: + """Goes to home page. + + This is a abtract method to be implemented in subclass. + Different App will have different home page and this method + is implemented in the context of each App. + + Returns: + The home page object. + """ + pass + + def go_page(self, page_class: Type[UIPage]) -> UIPage: + """Goes to target page. + + Args: + page_class: The class of target page to go to. + + Returns: + The corresponding UIPage of given page class. + + Raises: + errors.ContextError: Fail to reach target page. + """ + if self.is_page(page_class): + return self.page + + self.ad.adb.shell(f'am start -n {page_class.ACTIVITY}') + self.get_page() + self.expect_page(page_class) + + return self.page + + def send_keycode(self, keycode: str) -> None: + """Sends keycode. + + Args: + keycode: Key code to be sent. e.g.: "BACK" + """ + self.ad.adb.shell(f'input keyevent KEYCODE_{keycode}') + + def back(self) -> UIPage: + """Sends keycode 'BACK'. + + Returns: + The transformed page object. + """ + self.send_keycode('BACK') + return self.get_page() + + def send_keycode_number_pad(self, number: str) -> None: + """Sends keycode of number pad. + + Args: + number: The number pad to be sent. + """ + self.send_keycode(f'NUMPAD_{number}') + + def get_my_current_focus_app(self) -> str: + """Gets the current focus application. + + Returns: + The current focus app activity name iff it works. + Otherwise, empty string is returned. + """ + output = self.ad.adb.shell( + 'dumpsys activity | grep -E mFocusedApp').decode() + if any([ + not output, 'not found' in output, "Can't find" in output, + 'mFocusedApp=null' in output + ]): + self.log.warning( + 'Fail to obtain the current app activity with output: %s', output) + return '' + + # The output may look like: + # ActivityRecord{... FitbitMobile/com.fitbit.home.ui.HomeActivity t93} + # and we want to extract 'FitbitMobile/com.fitbit.home.ui.HomeActivity' + result = output.split(' ')[-2] + self.log.debug('Current focus app activity is %s', result) + return result + + def unlock_screen(self) -> UIPage: + """Unlocks the screen. + + This method will assume that the device is not protected + by password under testing. + + Returns: + The page object after unlock. + """ + # Bring device to SLEEP so that unlock process can start fresh. + self.send_keycode('SLEEP') + time.sleep(1) + self.send_keycode('WAKEUP') + self.get_page() + self.page.swipe_down() + return self.page + + def is_page(self, page_class: Type[UIPage]) -> bool: + """Checks the current page is of expected page. + + Args: + page_class: The class of expected page. + + Returns: + True iff the current page is of expected page. + """ + return isinstance(self.page, page_class) + + @retry.logged_retry_on_exception( + retry_value=(adb.Error, errors.ContextError), + retry_intervals=retry.FuzzedExponentialIntervals( + initial_delay_sec=1, + num_retries=_RETRIES_NUM_OF_ADB_COMMAND, + factor=1.1)) + def get_ui_xml(self, xml_out_dir: str = '/tmp/') -> minidom.Document: + """Gets the XML object of current UI. + + Args: + xml_out_dir: The host directory path to store the dumped UI XML file. + + Returns: + The parsed XML object of current UI page. + + Raises: + errors.ContextError: Fail to dump UI xml from adb. + """ + # Clean exist dump xml file in host if any to avoid + # parsing the previous dumped xml file. + dump_xml_name = 'window_dump.xml' + xml_path = os.path.join(xml_out_dir, dump_xml_name) + if os.path.isfile(xml_path): + os.remove(xml_path) + + dump_xml_path = f'/sdcard/{dump_xml_name}' + self.ad.adb.shell( + f"test -f {dump_xml_path} && rm {dump_xml_path} || echo 'no dump xml'") + self.ad.adb.shell('uiautomator dump') + self.ad.adb.pull(shlex.split(f'{dump_xml_path} {xml_out_dir}')) + + if not os.path.isfile(xml_path): + raise errors.ContextError(self, f'Fail to dump UI xml to {xml_path}!') + + return minidom.parse(xml_path) + + def parse_ui(self, xml_path: Optional[str] = None) -> ParsedUI: + """Parses the current UI page. + + Args: + xml_path: Target XML file path. If this argument is given, this method + will parse this XML file instead of dumping XML file through adb. + + Returns: + Parsed tuple as [ + <UI XML object>, + <List of clickable nodes>, + <List of enabled nodes>, + ] + + Raises: + errors.ContextError: Fail to dump UI XML from adb. + """ + if xml_path and os.path.isfile(xml_path): + ui_xml = minidom.parse(xml_path) + else: + ui_xml = self.get_ui_xml() + + root = ui_xml.documentElement + self.root_node = ui_node.UINode(root.childNodes[0]) + + clickable_nodes = [] + enabled_nodes = [] + all_nodes = [] + + # TODO(user): Avoid the usage of nested functions. + def _get_node_attribute(node: ui_node.UINode, name: str) -> Optional[str]: + """Gets the attribute of give node by name if exist.""" + attribute = node.attributes.get(name) + if attribute: + return attribute.value + else: + return None + + def _search_node(node: ui_node.UINode) -> None: + """Searches node(s) with desired attribute name to be true by DFS. + + Args: + node: The current node to process. + """ + rid = node.resource_id.strip() + clz = node.clz.strip() + + if rid or clz: + if _get_node_attribute(node, 'clickable') == 'true': + clickable_nodes.append(node) + if _get_node_attribute(node, 'enabled') == 'true': + enabled_nodes.append(node) + + all_nodes.append(node) + + for child_node in node.child_nodes: + _search_node(child_node) + + # TODO(user): Store nodes information in a dataclass. + _search_node(self.root_node) + return ParsedUI(ui_xml, clickable_nodes, enabled_nodes, all_nodes) + + def expect_pages(self, + page_class_list: Sequence[Type[UIPage]], + wait_sec: int = _EXPECT_PAGE_WAIT_TIME_IN_SECOND, + node_eval: NodeEvaluator = None) -> None: + """Waits for expected pages for certain time. + + Args: + page_class_list: The list of expected page class. + wait_sec: The waiting time. + node_eval: Function to search the node. Here it is used to confirm that + the target page contain the desired node. + + Raises: + errors.ContextError: Fail to reach expected page. + """ + end_time = time.monotonic() + wait_sec + while time.monotonic() < end_time: + page = self.safe_get_page() + if any((isinstance(page, page_class) for page_class in page_class_list)): + if node_eval is not None and page.get_node_by_func(node_eval) is None: + continue + + return + + raise errors.ContextError( + self, + f'Fail to reach page(s): {page_class_list} (current page={page})!') + + def expect_page(self, + page_class: Type[UIPage], + wait_sec: int = _EXPECT_PAGE_WAIT_TIME_IN_SECOND, + node_eval: NodeEvaluator = None) -> None: + """Waits for expected page for certain time.""" + self.expect_pages([page_class], wait_sec=wait_sec, node_eval=node_eval) + + def get_page(self, + wait_sec: int = 1, + xml_path: Optional[str] = None) -> UIPage: + if self.safe_get: + return self.safe_get_page(wait_sec=wait_sec, xml_path=xml_path) + else: + return self.strict_get_page(wait_sec=wait_sec, xml_path=xml_path) + + def safe_get_page(self, + wait_sec: int = 1, + xml_path: Optional[str] = None) -> UIPage: + """Gets the represented UIPage object of current UI page safely. + + Args: + wait_sec: Wait in second before actions. + xml_path: Target XML file path. If this argument is given, this method + will parse this XML file instead of dumping XML file from adb. + + Returns: + The focused UIPage object will be returned iff the XML object can + be obtained and recognized. Otherwise, NonePage is returned. + """ + try: + self.strict_get_page(wait_sec=wait_sec, xml_path=xml_path) + except errors.UnknownPageError as err: + self.ad.log.warning(str(err)) + self.page = NonePage( + self, + ui_xml=err.ui_xml, + clickable_nodes=err.clickable_nodes, + enabled_nodes=err.enabled_nodes, + all_nodes=err.all_nodes) + + return self.page + + def strict_get_page(self, + wait_sec: int = 1, + xml_path: Optional[str] = None) -> Optional[UIPage]: + """Gets the represented UIPage object of current UI page. + + This method will use adb command to dump UI XML file into local and use + the content of the dumped XML file to decide the proper page class and + instantiate it to self.page + + Args: + wait_sec: Wait in second before actions. + xml_path: Target XML file path. If this argument is given, this method + will parse this XML file instead of dumping XML file from adb. + + Returns: + The focused UIPage object will be returned iff the XML object can + be obtained and recognized. + + Raises: + errors.UnknownPageError: Fail to recognize the content of + current UI page. + errors.ContextError: Fail to dump UI xml file. + """ + time.sleep(wait_sec) + ui_xml, clickable_nodes, enabled_nodes, all_nodes = self.parse_ui(xml_path) + + for page_class in self.known_pages: + page_obj = page_class.from_xml(self, ui_xml, clickable_nodes, + enabled_nodes, all_nodes) + + if page_obj: + if self.page is not None and isinstance(page_obj, self.page.__class__): + self.log.debug('Refreshing page %s...', self.page) + self.page.refresh(page_obj) + else: + self.page = page_obj + + if self.enable_registered_page_call: + regr_method_name = self.get_regr_page_call(self.page) + if regr_method_name: + return getattr(self.page, regr_method_name)() + + return self.page + + raise errors.UnknownPageError(ui_xml, clickable_nodes, enabled_nodes, + all_nodes) + + def get_display_size(self) -> Tuple[int, int]: + """Gets the display size of the device. + + Returns: + tuple(width, height) of the display size. + + Raises: + errors.ContextError: Obtained unexpected output of + display size from adb. + """ + # e.g.: Physical size: 384x384 + output = self.ad.adb.shell(shlex.split('wm size')).decode() + size_items = output.rsplit(' ', 1)[-1].split('x') + if len(size_items) == 2: + return (int(size_items[0]), int(size_items[1])) + + raise errors.ContextError(self, f'Illegal output of display size: {output}') + + +class UIPage: + """Object to represent the current UI page. + + Attributes: + ctx: The context object to hold the current page. + ui_xml: Parsed XML object. + clickable_nodes: List of UINode with attribute `clickable="true"` + enabled_nodes: List of UINode with attribute `enabled="true"` + all_nodes: List of all UINode + log: Logger object. + """ + + # Defined in subclass + ACTIVITY = None + + # Defined in subclass + PAGE_RID = None + + # Defined in subclass + PAGE_TEXT = None + + # Defined in subclass + PAGE_RE_TEXT = None + + # Defined in subclass + PAGE_TITLE = None + + def __init__(self, ctx: Context, ui_xml: Optional[minidom.Document], + clickable_nodes: List[ui_node.UINode], + enabled_nodes: List[ui_node.UINode], + all_nodes: List[ui_node.UINode]) -> None: + self.ctx = ctx + self.ui_xml = ui_xml + self.clickable_nodes = clickable_nodes + self.enabled_nodes = enabled_nodes + self.all_nodes = all_nodes + self.log = logging.getLogger(self.__class__.__name__) + + @classmethod + def from_xml(cls, ctx: Context, ui_xml: minidom.Document, + clickable_nodes: List[ui_node.UINode], + enabled_nodes: List[ui_node.UINode], + all_nodes: List[ui_node.UINode]) -> Optional[UIPage]: + """Instantiates page object from XML object. + + Args: + ctx: Page context object. + ui_xml: Parsed XML object. + clickable_nodes: Clickable node list from page. + enabled_nodes: Enabled node list from page. + all_nodes: All node list from the page. + + Returns: + UI page object iff the given XML object can be parsed. + + Raises: + errors.UIError: The page class doesn't provide signature + for matching. + """ + if cls.PAGE_RID is not None: + for node in enabled_nodes + clickable_nodes: + if node.resource_id == cls.PAGE_RID: + return cls(ctx, ui_xml, clickable_nodes, enabled_nodes, all_nodes) + elif cls.PAGE_TEXT is not None: + for node in enabled_nodes + clickable_nodes: + if node.text == cls.PAGE_TEXT: + return cls(ctx, ui_xml, clickable_nodes, enabled_nodes, all_nodes) + elif cls.PAGE_RE_TEXT is not None: + for node in enabled_nodes + clickable_nodes: + if re.search(cls.PAGE_RE_TEXT, node.text): + return cls(ctx, ui_xml, clickable_nodes, enabled_nodes, all_nodes) + elif cls.PAGE_TITLE is not None: + for node in enabled_nodes + clickable_nodes: + if all([ + node.resource_id == 'android:id/title', node.text == cls.PAGE_TITLE + ]): + return cls(ctx, ui_xml, clickable_nodes, enabled_nodes, all_nodes) + else: + raise errors.UIError(f'Illegal UI Page class: {cls}') + + def refresh(self, new_page: UIPage) -> UIPage: + """Refreshes current page with obtained latest page. + + Args: + new_page: The page with latest data for current page to be refreshed. + + Returns: + The current refreshed UI page object. + """ + self.ui_xml = new_page.ui_xml + self.clickable_nodes = new_page.clickable_nodes + self.enabled_nodes = new_page.enabled_nodes + return self + + def _get_node_search_space( + self, from_all: bool = False) -> Iterable[ui_node.UINode]: + """Gets the search space of node.""" + if from_all: + return self.all_nodes + else: + return itertools.chain(self.clickable_nodes, self.enabled_nodes) + + def get_node_by_content_desc(self, + content_desc: str, + from_all: bool = False) -> OptUINode: + """Gets the first node with desired content description. + + Args: + content_desc: Content description used for search. + from_all: True to search from all nodes; False to search only the + clickable or enabled nodes. + + Returns: + Return the first node found with expected content description + iff it exists. Otherwise, None is returned. + """ + # TODO(user): Redesign APIs for intuitive usage. + for node in self._get_node_search_space(from_all): + if node.content_desc == content_desc: + return node + + return None + + def get_node_by_func(self, + func: Callable[[ui_node.UINode], bool], + from_all: bool = False) -> OptUINode: + """Gets the first node found by given function. + + Args: + func: The function to search target node. + from_all: True to search from all nodes; False to search only the + clickable or enabled nodes. + + Returns: + The node found by given function. + """ + for node in self._get_node_search_space(from_all): + if func(node): + return node + + return None + + def get_node_by_text(self, text: str, from_all: bool = False) -> OptUINode: + """Gets the first node with desired text. + + Args: + text: Text used for search. + from_all: True to search from all nodes; False to search only the + clickable or enabled nodes. + + Returns: + Return the first node found with expected text iff it exists. + Otherwise, None is returned. + """ + for node in self._get_node_search_space(from_all): + if node.text == text: + return node + + return None + + def _yield_node_by_rid(self, + rid: str, + from_all: bool = False) -> NodeGenerator: + """Generates node with desired resource id.""" + for node in self._get_node_search_space(from_all): + if ('resource-id' in node.attributes and + node.attributes['resource-id'].value == rid): + yield node + + def get_all_nodes_by_rid(self, + rid: str, + from_all: bool = False) -> List[ui_node.UINode]: + """Gets all nodes with desired resource id. + + Args: + rid: Resource id used for search. + from_all: True to search from all nodes; False to search only the + clickable or enabled nodes. + + Returns: + The list of nodes found with expected resource id. + """ + found_node_set = set(self._yield_node_by_rid(rid, from_all)) + return list(found_node_set) + + def get_node_by_rid(self, + rid: str, + from_all: bool = False) -> Optional[ui_node.UINode]: + """Gets the first node with desired resource id. + + Args: + rid: Resource id used for search. + from_all: True to search from all nodes; False to search only the + clickable or enabled nodes. + + Returns: + Return the first node found with expected resource id iff it exists. + Otherwise, None. + """ + try: + return next(self._yield_node_by_rid(rid, from_all)) + except StopIteration: + return None + + def get_node_by_class(self, + class_name: str, + from_all: bool = False) -> Optional[ui_node.UINode]: + """Gets the first node with desired class. + + Args: + class_name: Name of class as attribute. + from_all: True to search from all nodes; False to search only the + clickable or enabled nodes. + + Returns: + Return the first node found with desired class iff it exists. + Otherwise, None. + """ + for node in self._get_node_search_space(from_all): + if node.clz == class_name: + return node + + return None + + def get_node_by_attrs(self, + attrs: Dict[str, Any], + from_all: bool = False) -> OptUINode: + """Gets the first node with the given attributes. + + Args: + attrs: Attributes used to search target node. + from_all: True to search from all nodes; False to search only the + clickable or enabled nodes. + + Returns: + Return the first UI node with expected attributes iff it exists. + Otherwise, None is returned. + """ + for node in self._get_node_search_space(from_all): + if node.match_attrs(attrs): + return node + + return None + + @utils.dr_wakeup_before_op + def swipe(self, + start_x: int, + start_y: int, + end_x: int, + end_y: int, + duration_ms: int, + swipes: int = 1) -> UIPage: + """Performs the swipe from one coordinate to another coordinate. + + Args: + start_x: The starting X-axis coordinate. + start_y: The starting Y-axis coordinate. + end_x: The ending X-axis coordinate. + end_y: The ending Y-axis coordinate. + duration_ms: The millisecond of duration to drag. + swipes: How many swipe to carry on. + + Returns: + The transformed UI page. + """ + for _ in range(swipes): + self.ctx.ad.adb.shell( + shlex.split( + f'input swipe {start_x} {start_y} {end_x} {end_y} {duration_ms}')) + + return self.ctx.get_page() + + def swipe_left(self, + duration_ms: int = 1000, + x_start: float = 0.2, + x_end: float = 0.9, + swipes: int = 1) -> UIPage: + """Performs the swipe left action. + + Args: + duration_ms: Number of milliseconds to swipe from start point to end + point. + x_start: The range of width as start position + x_end: The range of width as end position + swipes: Round to conduct the swipe action. + + Returns: + The transformed UI page. + """ + width, height = self.ctx.get_display_size() + self.log.info('Page size=(%d, %d)', width, height) + return self.swipe( + width * x_start, + height * 0.5, + width * x_end, + height * 0.5, + duration_ms=duration_ms, + swipes=swipes) + + def swipe_right(self, + duration_ms: int = 1000, + x_start: float = 0.9, + x_end: float = 0.2, + swipes: int = 1) -> UIPage: + """Performs the swipe right action. + + Args: + duration_ms: Number of milliseconds to swipe from start point to end + point. + x_start: The range of width as start position + x_end: The range of width as end position + swipes: Round to conduct the swipe action. + + Returns: + The transformed UI page. + """ + width, height = self.ctx.get_display_size() + return self.swipe( + width * x_start, + height * 0.5, + width * x_end, + height * 0.5, + duration_ms=duration_ms, + swipes=swipes) + + def swipe_down(self, duration_ms: int = 1000, swipes: int = 1) -> UIPage: + """Performs the swipe down action. + + Args: + duration_ms: Number of milliseconds to swipe from start point to end + point. + swipes: Round to conduct the swipe action. + + Returns: + The transformed UI page. + """ + width, height = self.ctx.get_display_size() + return self.swipe( + width * 0.5, + height * 0.7, + width * 0.5, + height * 0.2, + duration_ms=duration_ms, + swipes=swipes) + + def swipe_up(self, duration_ms: int = 1000, swipes: int = 1) -> UIPage: + """Performs the swipe up action. + + Args: + duration_ms: Number of milliseconds to swipe from start point to end + point. + swipes: Round to conduct the swipe action. + + Returns: + The transformed UI page. + """ + width, height = self.ctx.get_display_size() + return self.swipe( + width * 0.5, + height * 0.2, + width * 0.5, + height * 0.7, + duration_ms=duration_ms, + swipes=swipes) + + def click_on(self, x: int, y: int) -> None: + """Clicks on the given X/Y coordinates. + + Args: + x: X-axis coordinate + y: Y-axis coordinate + + Raises: + acts.controllers.adb.AdbError: If the adb shell command + failed to execute. + """ + self.ctx.ad.adb.shell(shlex.split(f'input tap {x} {y}')) + + @utils.dr_wakeup_before_op + def click(self, + node: ui_node.UINode, + do_get_page: bool = True) -> Optional[UIPage]: + """Clicks the given UI node. + + Args: + node: Node to click on. + do_get_page: Gets the latest page after clicking iff True. + + Returns: + The transformed UI page is returned iff `do_get_page` is True. + Otherwise, None is returned. + """ + self.click_on(node.x, node.y) + if do_get_page: + return self.ctx.get_page() + + def click_node_by_rid(self, + node_rid: str, + do_get_page: bool = True) -> UIPage: + """Clicks on node its resource id. + + Args: + node_rid: Resource ID of node to search and click on. + do_get_page: Gets the latest page after clicking iff True. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + node = self.get_node_by_rid(node_rid) + if node is None: + raise errors.UIError(f'Fail to find the node with resource id={node_rid}') + + return self.click(node, do_get_page) + + def click_node_by_text(self, text: str, do_get_page: bool = True) -> UIPage: + """Clicks on node by its text. + + Args: + text: Text of node to search and click on. + do_get_page: Gets the latest page after clicking iff True. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + node = self.get_node_by_text(text) + if node is None: + raise errors.UIError(f'Fail to find the node with text={text}') + + return self.click(node, do_get_page) + + def click_node_by_content_desc(self, + text: str, + do_get_page: bool = True) -> UIPage: + """Clicks on node by its content description. + + Args: + text: Content description of node to search and click on. + do_get_page: Gets the latest page after clicking iff True. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + node = self.get_node_by_content_desc(text) + if node is None: + raise errors.UIError( + f'Fail to find the node with content description={text}') + + return self.click(node, do_get_page) + + def click_node_by_class(self, + class_value: str, + do_get_page: bool = True) -> UIPage: + """Clicks on node by its class attribute value. + + Args: + class_value: Value of class attribute. + do_get_page: Gets the latest page after clicking iff True. + + Returns: + The transformed page. + + Raises: + errors.UIError: Fail to get target node. + """ + node = self.get_node_by_class(class_value) + if node is None: + raise errors.UIError(f'Fail to find the node with class={class_value}') + + return self.click(node, do_get_page) + + +class NonePage(UIPage): + """None page to handle the context when we fail to dump UI xml file.""" diff --git a/system/blueberry/utils/ui_pages/ui_node.py b/system/blueberry/utils/ui_pages/ui_node.py new file mode 100644 index 0000000000..fbb787c3ff --- /dev/null +++ b/system/blueberry/utils/ui_pages/ui_node.py @@ -0,0 +1,171 @@ +"""UI Node is used to compose the UI pages.""" +from __future__ import annotations + +import collections +from typing import Any, Dict, List, Optional +from xml.dom import minidom + +# Internal import + + +class UINode: + """UI Node to hold element of UI page. + + If both x and y axis are given in constructor, this node will use (x, y) + as coordinates. Otherwise, the attribute `bounds` of node will be used to + calculate the coordinates. + + Attributes: + node: XML node element. + x: x point of UI page. + y: y point of UI page. + """ + + STR_FORMAT = "RID='{rid}'/CLASS='{clz}'/TEXT='{txt}'/CD='{ctx}'" + PREFIX_SEARCH_IN = 'c:' + + def __init__(self, node: minidom.Element, + x: Optional[int] = None, y: Optional[int] = None) -> None: + self.node = node + if x and y: + self.x = x + self.y = y + else: + self.x, self.y = adb_ui.find_point_in_bounds( + self.attributes['bounds'].value) + + def __hash__(self) -> int: + return id(self.node) + + @property + def clz(self) -> str: + """Returns the class of node.""" + return self.attributes['class'].value + + @property + def text(self) -> str: + """Gets text of node. + + Returns: + The text of node. + """ + return self.attributes['text'].value + + @property + def content_desc(self) -> str: + """Gets content description of node. + + Returns: + The content description of node. + """ + return self.attributes['content-desc'].value + + @property + def resource_id(self) -> str: + """Gets resource id of node. + + Returns: + The resource id of node. + """ + return self.attributes['resource-id'].value + + @property + def attributes(self) -> Dict[str, Any]: + """Gets attributes of node. + + Returns: + The attributes of node. + """ + if hasattr(self.node, 'attributes'): + return collections.defaultdict( + lambda: None, + getattr(self.node, 'attributes')) + else: + return collections.defaultdict(lambda: None) + + @property + def child_nodes(self) -> List[UINode]: + """Gets child node(s) of current node. + + Returns: + The child nodes of current node if any. + """ + return [UINode(n) for n in self.node.childNodes] + + def match_attrs_by_kwargs(self, **kwargs) -> bool: + """Matches given attribute key/value pair with current node. + + Args: + **kwargs: Key/value pair as attribute key/value. + e.g.: resource_id='abc' + + Returns: + True iff the given attributes match current node. + """ + if 'clz' in kwargs: + kwargs['class'] = kwargs['clz'] + del kwargs['clz'] + + return self.match_attrs(kwargs) + + def match_attrs(self, attrs: Dict[str, Any]) -> bool: + """Matches given attributes with current node. + + This method is used to compare the given `attrs` with attributes of + current node. Only the keys given in `attrs` will be compared. e.g.: + ``` + # ui_node has attributes {'name': 'john', 'id': '1234'} + >>> ui_node.match_attrs({'name': 'john'}) + True + + >>> ui_node.match_attrs({'name': 'ken'}) + False + ``` + + If you don't want exact match and want to check if an attribute value + contain specific substring, you can leverage special prefix + `PREFIX_SEARCH_IN` to tell this method to use `in` instead of `==` for + comparison. e.g.: + ``` + # ui_node has attributes {'name': 'john', 'id': '1234'} + >>> ui_node.match_attrs({'name': ui_node.PREFIX_SEARCH_IN + 'oh'}) + True + + >>> ui_node.match_attrs({'name': 'oh'}) + False + ``` + + Args: + attrs: Attributes to compare with. + + Returns: + True iff the given attributes match current node. + """ + for k, v in attrs.items(): + if k not in self.attributes: + return False + + if v and v.startswith(self.PREFIX_SEARCH_IN): + v = v[len(self.PREFIX_SEARCH_IN):] + if not v or v not in self.attributes[k].value: + return False + elif v != self.attributes[k].value: + return False + + return True + + def __str__(self) -> str: + """The string representation of this object. + + Returns: + The string representation including below information: + - resource id + - class + - text + - content description. + """ + rid = self.resource_id.strip() + clz = self.clz.strip() + txt = self.text.strip() + ctx = self.content_desc.strip() + return f"RID='{rid}'/CLASS='{clz}'/TEXT='{txt}'/CD='{ctx}'" diff --git a/system/blueberry/utils/ui_pages/utils.py b/system/blueberry/utils/ui_pages/utils.py new file mode 100644 index 0000000000..f73d198520 --- /dev/null +++ b/system/blueberry/utils/ui_pages/utils.py @@ -0,0 +1,30 @@ +"""Utilities used by ui_pages module.""" +from typing import Any, Callable, Dict + + +def dr_wakeup_before_op(op: Callable[..., Any]) -> Callable[..., Any]: + """Sends keycode 'KEYCODE_WAKEUP' before conducting UI function. + + Args: + op: UI function (click, swipe etc.) + + Returns: + Wrapped UI function. + """ + def _wrapper(*args: Any, **kargs: Dict[str, Any]) -> Callable[..., Any]: + """Wrapper of UI function. + + Args: + *args: Argument list passed into UI function. + **kargs: key/value argument passed into UI function. + + Returns: + The returned result by calling the wrapped UI operation method. + """ + + ui_page_self = args[0] + ui_page_self.ctx.ad.adb.shell( + 'input keyevent KEYCODE_WAKEUP') + return op(*args, **kargs) + + return _wrapper |