summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Pixel and Ecosystems Test <noreply@google.com> 2021-05-21 00:59:23 +0000
committer Jizheng Chu <jizhengchu@google.com> 2021-05-24 18:05:04 +0000
commit02cfdb090aae0997ad619b4cb5b5a106dc0061a7 (patch)
tree51fb94ce8dbbb6344bb1c701d7444895ffa406f6
parent85ec1138e4d94f266f8dcdb30e7854ffe992afba (diff)
Project import generated by Copybara.
Tag: #compatibility Bug: 189117116 Test: NA PiperOrigin-RevId: 374987840 Change-Id: I1b512359e184ddc47145aa9339ddf3c75e28a2dc
-rw-r--r--system/blueberry/controllers/android_bt_target_device.py515
-rw-r--r--system/blueberry/controllers/bt_stub.py294
-rw-r--r--system/blueberry/controllers/derived_bt_device.py116
-rw-r--r--system/blueberry/controllers/grpc_bt_sync_mock.py81
-rw-r--r--system/blueberry/controllers/grpc_bt_target_mock.py78
-rw-r--r--system/blueberry/decorators/android_bluetooth_client_decorator.py48
-rw-r--r--system/blueberry/decorators/android_bluetooth_client_test_decorator.py25
-rw-r--r--system/blueberry/grpc/blueberry_device_controller.py40
-rw-r--r--system/blueberry/grpc/blueberry_device_controller_service.py37
-rw-r--r--system/blueberry/grpc/blueberry_test_client.py46
-rw-r--r--system/blueberry/grpc/proto/blueberry_device_controller.proto36
-rw-r--r--system/blueberry/sample_testbed.yaml10
-rw-r--r--system/blueberry/tests/a2dp/bluetooth_a2dp_test.py221
-rw-r--r--system/blueberry/tests/audio_capture/bluetooth_audio_capture_test.py93
-rw-r--r--system/blueberry/tests/avrcp/bluetooth_avrcp_test.py349
-rw-r--r--system/blueberry/tests/connectivity/bluetooth_connection_test.py96
-rw-r--r--system/blueberry/tests/connectivity/bluetooth_latency_test.py132
-rw-r--r--system/blueberry/tests/connectivity/bluetooth_pairing_test.py81
-rw-r--r--system/blueberry/tests/connectivity/bluetooth_throughput_test.py196
-rw-r--r--system/blueberry/tests/map/bluetooth_map_test.py148
-rw-r--r--system/blueberry/tests/pan/bluetooth_pan_test.py325
-rw-r--r--system/blueberry/tests/pbap/bluetooth_pbap_test.py401
-rw-r--r--system/blueberry/tests/pbat/bluetooth_acceptance_suite.py71
-rw-r--r--system/blueberry/utils/android_bluetooth_decorator.py1729
-rw-r--r--system/blueberry/utils/arduino_base.py78
-rw-r--r--system/blueberry/utils/blueberry_base_test.py244
-rw-r--r--system/blueberry/utils/bt_audio_utils.py229
-rw-r--r--system/blueberry/utils/bt_constants.py191
-rw-r--r--system/blueberry/utils/bt_test_utils.py200
-rw-r--r--system/blueberry/utils/command_line_runner/run_bluetooth_tests.py248
-rw-r--r--system/blueberry/utils/metrics_utils.py111
31 files changed, 6469 insertions, 0 deletions
diff --git a/system/blueberry/controllers/android_bt_target_device.py b/system/blueberry/controllers/android_bt_target_device.py
new file mode 100644
index 0000000000..9e7aebf752
--- /dev/null
+++ b/system/blueberry/controllers/android_bt_target_device.py
@@ -0,0 +1,515 @@
+"""Controller class for an android bt device with git_master-bds-dev build.
+
+The config for this derived_bt_target_device in mobileharness is:
+- name: android_bt_target_device
+ devices:
+ - type: MiscTestbedSubDevice
+ dimensions:
+ mobly_type: DerivedBtDevice
+ properties:
+ ModuleName: android_bt_target_device
+ ClassName: AndroidBtTargetDevice
+ Params:
+ config:
+ device_id: phone_serial_number
+ audio_params:
+ channel: 2
+ duration: 50
+ music_file: "music.wav"
+ sample_rate: 44100
+"""
+
+import logging
+import os
+import time
+
+from mobly import asserts
+from mobly.controllers.android_device import AndroidDevice
+from mobly.signals import ControllerError
+# Internal import
+from blueberry.utils import bt_constants
+from blueberry.utils.android_bluetooth_decorator import AndroidBluetoothDecorator
+import blueberry.utils.bt_test_utils as btutils
+
+ADB_FILE = "rec.pcm"
+ADB_PATH = "/sdcard/Music/"
+WAVE_FILE_TEMPLATE = "recorded_audio_%s.wav"
+DEFAULT_WAIT_TIME = 3.0
+
+# A MediaBrowserService implemented in the SL4A app to intercept Media keys and
+# commands.
+BLUETOOTH_SL4A_AUDIO_SRC_MBS = "BluetoothSL4AAudioSrcMBS"
+
+A2DP_HFP_PROFILES = [
+ bt_constants.BluetoothProfile.A2DP_SINK,
+ bt_constants.BluetoothProfile.HEADSET_CLIENT
+]
+
+
+class AndroidBtTargetDevice(object):
+ """Implements an android device as a hfp and a2dp sink device.
+
+ With git_master-bds-dev build, the android device can act as a bluetooth
+ hfp and a2dp sink device.
+ """
+
+ def __init__(self, config):
+ """Initializes an android hfp device."""
+ logging.info("Initializes the android hfp device")
+ self.pri_ad = None
+ self.sec_ad = None
+ self.serial = config.get("device_id", None)
+ self.audio_params = config.get("audio_params", None)
+
+ if self.serial:
+ # self._ad for accessing the device at the end of the test
+ self._ad = AndroidDevice(self.serial)
+ self.aud = adb_ui_device.AdbUiDevice(self._ad)
+ self.pri_ad = AndroidBluetoothDecorator(self._ad)
+ self.pri_ad.init_setup()
+ self.pri_ad.sl4a_setup()
+ self.sl4a = self._ad.services.sl4a
+ self.mac_address = self.sl4a.bluetoothGetLocalAddress()
+
+ if self.audio_params:
+ self._initialize_audio_params()
+ self.avrcp_ready = False
+
+ def __getattr__(self, name):
+ return getattr(self.pri_ad, name)
+
+ def _disable_profiles(self):
+ if self.sec_ad is None:
+ raise MissingBtClientDeviceError("Please provide sec_ad forsetting"
+ "profiles")
+ self.set_profiles_policy_off(self.sec_ad, A2DP_HFP_PROFILES)
+
+ def _initialize_audio_params(self):
+ self.audio_capture_path = os.path.join(self._ad.log_path, "audio_capture")
+ os.makedirs(self.audio_capture_path)
+ self.adb_path = os.path.join(ADB_PATH, ADB_FILE)
+ self.wave_file_template = os.path.join(self.audio_capture_path,
+ WAVE_FILE_TEMPLATE)
+ self.wave_file_number = 0
+
+ def _verify_pri_ad(self):
+ if not self.pri_ad:
+ raise ControllerError("No be target device")
+
+ def clean_up(self):
+ """Resets Bluetooth and stops all services when the device is destroyed."""
+ self.deactivate_ble_pairing_mode()
+ self.factory_reset_bluetooth()
+ self._ad.services.stop_all()
+
+ def a2dp_sink_connect(self):
+ """Establishes the hft connection between self.pri_ad and self.sec_ad."""
+ self._verify_pri_ad()
+ connected = self.pri_ad.a2dp_sink_connect(self.sec_ad)
+ asserts.assert_true(
+ connected, "The a2dp sink connection between {} and {} failed".format(
+ self.serial, self.sec_ad.serial))
+ self.log.info("The a2dp sink connection between %s and %s succeeded",
+ self.serial, self.sec_ad.serial)
+ return True
+
+ def activate_pairing_mode(self):
+ """Makes the android hfp device discoverable over Bluetooth."""
+ self.log.info("Activating the pairing mode of the android target device")
+ self.pri_ad.activate_pairing_mode()
+
+ def activate_ble_pairing_mode(self):
+ """Activates BLE pairing mode on an AndroidBtTargetDevice."""
+ self.pri_ad.activate_ble_pairing_mode()
+
+ def deactivate_ble_pairing_mode(self):
+ """Deactivates BLE pairing mode on an AndroidBtTargetDevice."""
+ self.pri_ad.deactivate_ble_pairing_mode()
+
+ def add_pri_ad_device(self, pri_ad):
+ """Adds primary android device as bt target device.
+
+ The primary android device should have been initialized with
+ android_bluetooth_decorator.
+
+ Args:
+ pri_ad: the primary android device as bt target device.
+ """
+ self._ad = pri_ad
+ self.pri_ad = pri_ad
+ self.sl4a = self._ad.services.sl4a
+ self.mac_address = self.sl4a.bluetoothGetLocalAddress()
+ self.log = self.pri_ad.log
+ self.serial = self.pri_ad.serial
+ self.log.info(
+ "Adds primary android device with id %s for the bluetooth"
+ "connection", pri_ad.serial)
+ if self.audio_params:
+ self._initialize_audio_params()
+
+ def add_sec_ad_device(self, sec_ad):
+ """Adds second android device for bluetooth connection.
+
+ The second android device should have sl4a service acitvated.
+
+ Args:
+ sec_ad: the second android device for bluetooth connection.
+ """
+ self.log.info(
+ "Adds second android device with id %s for the bluetooth"
+ "connection", sec_ad.serial)
+ self.sec_ad = sec_ad
+ self.sec_ad_mac_address = self.sec_ad.sl4a.bluetoothGetLocalAddress()
+
+ def answer_phone_call(self):
+ """Answers an incoming phone call."""
+ if not self.is_hfp_connected():
+ self.hfp_connect()
+ # Make sure the device is in ringing state.
+ if not self.wait_for_call_state(
+ bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC):
+ raise ControllerError(
+ "Timed out after %ds waiting for the device %s to be ringing state "
+ "before anwsering the incoming phone call." %
+ (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial))
+ self.log.info("Answers the incoming phone call from hf phone %s for %s",
+ self.mac_address, self.sec_ad_mac_address)
+ return self.sl4a.bluetoothHfpClientAcceptCall(self.sec_ad_mac_address)
+
+ def call_volume_down(self):
+ """Lowers the volume."""
+ current_volume = self.mbs.getVoiceCallVolume()
+ if current_volume > 0:
+ change_volume = current_volume - 1
+ self.log.debug("Set voice call volume from %d to %d." %
+ (current_volume, change_volume))
+ self.mbs.setVoiceCallVolume(change_volume)
+
+ def call_volume_up(self):
+ """Raises the volume."""
+ current_volume = self.mbs.getVoiceCallVolume()
+ if current_volume < self.mbs.getVoiceCallMaxVolume():
+ change_volume = current_volume + 1
+ self.log.debug("Set voice call volume from %d to %d." %
+ (current_volume, change_volume))
+ self.mbs.setVoiceCallVolume(change_volume)
+
+ def disconnect_all(self):
+ self._disable_profiles()
+
+ def factory_reset_bluetooth(self):
+ """Factory resets Bluetooth on the android hfp device."""
+ self.log.info("Factory resets Bluetooth on the android target device")
+ self.pri_ad.factory_reset_bluetooth()
+
+ def get_bluetooth_mac_address(self):
+ """Gets Bluetooth mac address of this android_bt_device."""
+ self.log.info("Getting Bluetooth mac address for AndroidBtTargetDevice.")
+ mac_address = self.sl4a.bluetoothGetLocalAddress()
+ self.log.info("Bluetooth mac address of AndroidBtTargetDevice: %s",
+ mac_address)
+ return mac_address
+
+ def get_audio_params(self):
+ """Gets audio params from the android_bt_target_device."""
+ return self.audio_params
+
+ def get_new_wave_file_path(self):
+ """Gets a new wave file path for the audio capture."""
+ wave_file_path = self.wave_file_template % self.wave_file_number
+ while os.path.exists(wave_file_path):
+ self.wave_file_number += 1
+ wave_file_path = self.wave_file_template % self.wave_file_number
+ return wave_file_path
+
+ def get_unread_messages(self) -> None:
+ """Gets unread messages from the connected device (MSE)."""
+ self.sl4a.mapGetUnreadMessages(self.sec_ad_mac_address)
+
+ def hangup_phone_call(self):
+ """Hangs up an ongoing phone call."""
+ if not self.is_hfp_connected():
+ self.hfp_connect()
+ self.log.info("Hangs up the phone call from hf phone %s for %s",
+ self.mac_address, self.sec_ad_mac_address)
+ return self.sl4a.bluetoothHfpClientTerminateAllCalls(
+ self.sec_ad_mac_address)
+
+ def hfp_connect(self):
+ """Establishes the hft connection between self.pri_ad and self.sec_ad."""
+ self._verify_pri_ad()
+ connected = self.pri_ad.hfp_connect(self.sec_ad)
+ asserts.assert_true(
+ connected, "The hfp connection between {} and {} failed".format(
+ self.serial, self.sec_ad.serial))
+ self.log.info("The hfp connection between %s and %s succeed", self.serial,
+ self.sec_ad.serial)
+ return connected
+
+ def init_ambs_for_avrcp(self):
+ """Initializes media browser service for avrcp.
+
+ This is required to be done before running any of the passthrough
+ commands.
+
+ Steps:
+ 1. Starts up the AvrcpMediaBrowserService on the A2dp source phone. This
+ MediaBrowserService is part of the SL4A app.
+ 2. Switch the playback state to be paused.
+ 3. Connects a MediaBrowser to the A2dp sink's A2dpMediaBrowserService.
+
+ Returns:
+ True: if it is avrcp ready after the initialization.
+ False: if it is still not avrcp ready after the initialization.
+
+ Raises:
+ Signals.ControllerError: raise if AvrcpMediaBrowserService on the A2dp
+ source phone fails to be started.
+ """
+ if self.is_avrcp_ready():
+ return True
+ if not self.is_a2dp_sink_connected():
+ self.a2dp_sink_connect()
+
+ self.sec_ad.log.info("Starting AvrcpMediaBrowserService")
+ self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStart()
+
+ time.sleep(DEFAULT_WAIT_TIME)
+
+ # Check if the media session "BluetoothSL4AAudioSrcMBS" is active on sec_ad.
+ active_sessions = self.sec_ad.sl4a.bluetoothMediaGetActiveMediaSessions()
+ if BLUETOOTH_SL4A_AUDIO_SRC_MBS not in active_sessions:
+ raise ControllerError("Failed to start AvrcpMediaBrowserService.")
+
+ self.log.info("Connecting to A2dp media browser service")
+ self.sl4a.bluetoothMediaConnectToCarMBS()
+
+ # TODO(user) Wait for an event back instead of sleep
+ time.sleep(DEFAULT_WAIT_TIME)
+ self.avrcp_ready = True
+ return self.avrcp_ready
+
+ def is_avrcp_ready(self):
+ """Checks if the pri_ad and sec_ad are ready for avrcp."""
+ self._verify_pri_ad()
+ if self.avrcp_ready:
+ return True
+ active_sessions = self.sl4a.bluetoothMediaGetActiveMediaSessions()
+ if not active_sessions:
+ self.log.info("The device is not avrcp ready")
+ self.avrcp_ready = False
+ else:
+ self.log.info("The device is avrcp ready")
+ self.avrcp_ready = True
+ return self.avrcp_ready
+
+ def is_hfp_connected(self):
+ """Checks if the pri_ad and sec_ad are hfp connected."""
+ self._verify_pri_ad()
+ if self.sec_ad is None:
+ raise MissingBtClientDeviceError("The sec_ad was not added")
+ return self.sl4a.bluetoothHfpClientGetConnectionStatus(
+ self.sec_ad_mac_address)
+
+ def is_a2dp_sink_connected(self):
+ """Checks if the pri_ad and sec_ad are hfp connected."""
+ self._verify_pri_ad()
+ if self.sec_ad is None:
+ raise MissingBtClientDeviceError("The sec_ad was not added")
+ return self.sl4a.bluetoothA2dpSinkGetConnectionStatus(
+ self.sec_ad_mac_address)
+
+ def last_number_dial(self):
+ """Redials last outgoing phone number."""
+ if not self.is_hfp_connected():
+ self.hfp_connect()
+ self.log.info("Redials last number from hf phone %s for %s",
+ self.mac_address, self.sec_ad_mac_address)
+ self.sl4a.bluetoothHfpClientDial(self.sec_ad_mac_address, None)
+
+ def map_connect(self):
+ """Establishes the map connection between self.pri_ad and self.sec_ad."""
+ self._verify_pri_ad()
+ connected = self.pri_ad.map_connect(self.sec_ad)
+ asserts.assert_true(
+ connected, "The map connection between {} and {} failed".format(
+ self.serial, self.sec_ad.serial))
+ self.log.info("The map connection between %s and %s succeed", self.serial,
+ self.sec_ad.serial)
+
+ def map_disconnect(self) -> None:
+ """Initiates a map disconnection to the connected device.
+
+ Raises:
+ BluetoothProfileConnectionError: raised if failed to disconnect.
+ """
+ self._verify_pri_ad()
+ if not self.pri_ad.map_disconnect(self.sec_ad_mac_address):
+ raise BluetoothProfileConnectionError(
+ 'Failed to terminate the MAP connection with the device "%s".' %
+ self.sec_ad_mac_address)
+
+ def pbap_connect(self):
+ """Establishes the pbap connection between self.pri_ad and self.sec_ad."""
+ connected = self.pri_ad.pbap_connect(self.sec_ad)
+ asserts.assert_true(
+ connected, "The pbap connection between {} and {} failed".format(
+ self.serial, self.sec_ad.serial))
+ self.log.info("The pbap connection between %s and %s succeed", self.serial,
+ self.sec_ad.serial)
+
+ def pause(self):
+ """Sends Avrcp pause command."""
+ self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PAUSE, self.sec_ad)
+
+ def play(self):
+ """Sends Avrcp play command."""
+ self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PLAY, self.sec_ad)
+
+ def power_on(self):
+ """Turns the Bluetooth on the android bt garget device."""
+ self.log.info("Turns on the bluetooth")
+ return self.sl4a.bluetoothToggleState(True)
+
+ def power_off(self):
+ """Turns the Bluetooth off the android bt garget device."""
+ self.log.info("Turns off the bluetooth")
+ return self.sl4a.bluetoothToggleState(False)
+
+ def route_call_audio(self, connect=False):
+ """Routes call audio during a call."""
+ if not self.is_hfp_connected():
+ self.hfp_connect()
+ self.log.info(
+ "Routes call audio during a call from hf phone %s for %s "
+ "audio connection %s after routing", self.mac_address,
+ self.sec_ad_mac_address, connect)
+ if connect:
+ self.sl4a.bluetoothHfpClientConnectAudio(self.sec_ad_mac_address)
+ else:
+ self.sl4a.bluetoothHfpClientDisconnectAudio(self.sec_ad_mac_address)
+
+ def reject_phone_call(self):
+ """Rejects an incoming phone call."""
+ if not self.is_hfp_connected():
+ self.hfp_connect()
+ # Make sure the device is in ringing state.
+ if not self.wait_for_call_state(
+ bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC):
+ raise ControllerError(
+ "Timed out after %ds waiting for the device %s to be ringing state "
+ "before rejecting the incoming phone call." %
+ (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial))
+ self.log.info("Rejects the incoming phone call from hf phone %s for %s",
+ self.mac_address, self.sec_ad_mac_address)
+ return self.sl4a.bluetoothHfpClientRejectCall(self.sec_ad_mac_address)
+
+ def set_audio_params(self, audio_params):
+ """Sets audio params to the android_bt_target_device."""
+ self.audio_params = audio_params
+
+ def track_previous(self):
+ """Sends Avrcp skip prev command."""
+ self.send_media_passthrough_cmd(
+ bt_constants.CMD_MEDIA_SKIP_PREV, self.sec_ad)
+
+ def track_next(self):
+ """Sends Avrcp skip next command."""
+ self.send_media_passthrough_cmd(
+ bt_constants.CMD_MEDIA_SKIP_NEXT, self.sec_ad)
+
+ def start_audio_capture(self):
+ """Starts the audio capture over adb."""
+ if self.audio_params is None:
+ raise MissingAudioParamsError("Missing audio params for captureing audio")
+ if not self.is_a2dp_sink_connected():
+ self.a2dp_sink_connect()
+ cmd = "ap2f --usage 1 --start --duration {} --target {}".format(
+ self.audio_params["duration"], self.adb_path)
+ self.log.info("Starts capturing audio with adb shell command %s", cmd)
+ self.adb.shell(cmd)
+
+ def stop_audio_capture(self):
+ """Stops the audio capture and stores it in wave file.
+
+ Returns:
+ File name of the recorded file.
+
+ Raises:
+ MissingAudioParamsError: when self.audio_params is None
+ """
+ if self.audio_params is None:
+ raise MissingAudioParamsError("Missing audio params for captureing audio")
+ if not self.is_a2dp_sink_connected():
+ self.a2dp_sink_connect()
+ adb_pull_args = [self.adb_path, self.audio_capture_path]
+ self.log.info("start adb -s %s pull %s", self.serial, adb_pull_args)
+ self._ad.adb.pull(adb_pull_args)
+ pcm_file_path = os.path.join(self.audio_capture_path, ADB_FILE)
+ self.log.info("delete the recored file %s", self.adb_path)
+ self._ad.adb.shell("rm {}".format(self.adb_path))
+ wave_file_path = self.get_new_wave_file_path()
+ self.log.info("convert pcm file %s to wav file %s", pcm_file_path,
+ wave_file_path)
+ btutils.convert_pcm_to_wav(pcm_file_path, wave_file_path, self.audio_params)
+ return wave_file_path
+
+ def stop_all_services(self):
+ """Stops all services for the pri_ad device."""
+ self.log.info("Stops all services on the android bt target device")
+ self._ad.services.stop_all()
+
+ def stop_ambs_for_avrcp(self):
+ """Stops media browser service for avrcp."""
+ if self.is_avrcp_ready():
+ self.log.info("Stops avrcp connection")
+ self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStop()
+ self.avrcp_ready = False
+
+ def stop_voice_dial(self):
+ """Stops voice dial."""
+ if not self.is_hfp_connected():
+ self.hfp_connect()
+ self.log.info("Stops voice dial from hf phone %s for %s", self.mac_address,
+ self.sec_ad_mac_address)
+ if self.is_hfp_connected():
+ self.sl4a.bluetoothHfpClientStopVoiceRecognition(
+ self.sec_ad_mac_address)
+
+ def take_bug_report(self,
+ test_name=None,
+ begin_time=None,
+ timeout=300,
+ destination=None):
+ """Wrapper method to capture bugreport on the android bt target device."""
+ self._ad.take_bug_report(test_name, begin_time, timeout, destination)
+
+ def voice_dial(self):
+ """Triggers voice dial."""
+ if not self.is_hfp_connected():
+ self.hfp_connect()
+ self.log.info("Triggers voice dial from hf phone %s for %s",
+ self.mac_address, self.sec_ad_mac_address)
+ if self.is_hfp_connected():
+ self.sl4a.bluetoothHfpClientStartVoiceRecognition(
+ self.sec_ad_mac_address)
+
+ def log_type(self):
+ """Gets the log type of Android bt target device.
+
+ Returns:
+ A string, the log type of Android bt target device.
+ """
+ return bt_constants.LogType.BLUETOOTH_DEVICE_SIMULATOR
+
+
+class BluetoothProfileConnectionError(Exception):
+ """Error for Bluetooth Profile connection problems."""
+
+
+class MissingBtClientDeviceError(Exception):
+ """Error for missing required bluetooth client device."""
+
+
+class MissingAudioParamsError(Exception):
+ """Error for missing the audio params."""
diff --git a/system/blueberry/controllers/bt_stub.py b/system/blueberry/controllers/bt_stub.py
new file mode 100644
index 0000000000..0cd024e23b
--- /dev/null
+++ b/system/blueberry/controllers/bt_stub.py
@@ -0,0 +1,294 @@
+"""Bluetooth stub class.
+
+This controller offers no direct control to any device. It simply prompts the
+user to perform a certain action on the device it is standing in for. For use
+in test scripts where no controller for the DUT exists.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import six
+
+
+class BtStub(object):
+ """Stub for when controller class does not exist for a Bluetooth device.
+
+ This class will simulate semi-automation by prompting user to manually
+ perform actions on the Bluetooth device.
+ """
+
+ # Connection Commands
+ def power_off(self):
+ """Prompt the user to power off the Bluetooth device."""
+ six.moves.input("Power Off Bluetooth device, then press enter.")
+
+ def power_on(self):
+ """Prompt the user to power on the Bluetooth device."""
+ six.moves.input("Power ON Bluetooth device, then press enter.")
+
+ def activate_pairing_mode(self):
+ """Prompt the user to put the Bluetooth device into pairing mode."""
+ six.moves.input("Put Bluetooth device into pairing mode,"
+ "then press enter.")
+
+ def get_bluetooth_mac_address(self):
+ """Prompt the user to input the Bluetooth MAC address for this device.
+
+ Returns:
+ mac_address (str): the string received from user input.
+ """
+ mac_address = six.moves.input("Enter BT MAC address, then press enter.")
+ return mac_address
+
+ def set_device_name(self, device_name):
+ """Prompt the user to set the device name (Carkit Only).
+
+ Args:
+ device_name: String of device name to be set.
+
+ Returns: None
+ """
+ six.moves.input("Device name is: %s", device_name)
+
+ def factory_reset_bluetooth(self):
+ """Prompt the user to factory reset Bluetooth on the device."""
+ six.moves.input("Factory reset Bluetooth on the Bluetooth device, "
+ "then press enter.")
+
+ # A2DP: Bluetooth stereo streaming protocol methods.
+ def is_audio_playing(self):
+ """Prompt the user to indicate if the audio is playing.
+
+ Returns:
+ A Bool, true is audio is playing, false if not.
+ """
+ audio_playing = six.moves.input("Indicate if audio is playing: "
+ "true/false.")
+ return bool(audio_playing)
+
+ # AVRCP Commands
+ def volume_up(self):
+ """Prompt the user to raise the volume on the Bluetooth device."""
+ six.moves.input("Press the Volume Up Button on the Bluetooth device, "
+ "then press enter.")
+
+ def volume_down(self):
+ """Prompt the user to lower the volume on the Bluetooth device."""
+ six.moves.input("Press the Volume Down Button on the Bluetooth device, "
+ "then press enter.")
+
+ def track_next(self):
+ """Prompt the user to skip the track on the Bluetooth device."""
+ six.moves.input("Press the Skip Track Button on the Bluetooth device, "
+ "then press enter.")
+
+ def track_previous(self):
+ """Prompt the user to rewind the track on the Bluetooth device."""
+ six.moves.input("Press the Rewind Track Button on the Bluetooth device, "
+ "then press enter.")
+
+ def play(self):
+ """Prompt the user to press play on the Bluetooth device."""
+ six.moves.input("Press the Play Button on the Bluetooth device, "
+ "then press enter.")
+
+ def pause(self):
+ """Prompt the user to press pause on the Bluetooth device."""
+ six.moves.input("Press the Pause Button on the Bluetooth device, "
+ "then press enter.")
+
+ def repeat(self):
+ """Prompt the user to set the repeat option on the device."""
+ six.moves.input("Press the Repeat Button on the Bluetooth device, "
+ "then press enter.")
+
+ def fast_forward(self):
+ """Prompt the user to press the fast forward option/button on the device.
+
+ Returns: None
+ """
+ six.moves.input("Press the Fast Forward Button on the Bluetooth device, "
+ "then press enter.")
+
+ def rewind(self):
+ """Prompt the user to press Rewind option on the device.
+
+ Returns: None
+ """
+ six.moves.input("Press the Rewind option on the Bluetooth device, "
+ "then press enter.")
+
+ # TODO(user): browse_media_content may need more work in terms of input
+ # params and value(s) returned
+ def browse_media_content(self, directory=""):
+ """Prompt the user to enter to the paired device media folders.
+
+ Args:
+ directory: A path to the directory to browse to.
+
+ Returns:
+ List - empty
+ """
+ six.moves.input("Navigate to directory: %s", directory)
+ return []
+
+ def delete_song(self, file_path=""):
+ """Prompt the user to delete a song.
+
+ Args:
+ file_path (optional): A file path to the song to be deleted.
+
+ Returns: None
+ """
+ six.moves.input("Delete a song %s", file_path)
+
+ def shuffle_song(self):
+ """Prompt the user to shuffle a playlist.
+
+ Returns: None
+ """
+ six.moves.input("Shuffle a playlist")
+
+ # HFP (Hands Free Phone protocol) Commands
+ def call_volume_up(self):
+ """Prompt the user to press the volume up button on an active call.
+
+ Returns: None
+ """
+ six.moves.input("Press the volume up button for an active call.")
+
+ def call_volume_down(self):
+ """Prompt the user to press the volume down button on an active call.
+
+ Returns: None
+ """
+ six.moves.input("Press the volume down button for an active call.")
+
+ def answer_phone_call(self):
+ """Prompt the user to press the button to answer a phone call..
+
+ Returns: None
+ """
+ six.moves.input("Press the button to answer the phone call.")
+
+ def hangup_phone_call(self):
+ """Prompt the user to press the button to hang up on an active phone call.
+
+ Returns: None
+ """
+ six.moves.input("Press the button to hang up on the phone call.")
+
+ def call_contact(self, name):
+ """Prompt the user to select a contact from the phonebook and call.
+
+ Args:
+ name: string name of contact to call
+
+ Returns: None
+ """
+ six.moves.input("Select contact, %s, to call.", name)
+
+ def call_number(self, phone_number):
+ """Prompt the user to dial a phone number and call.
+
+ Args:
+ phone_number: string of phone number to dial and call
+
+ Returns: None
+ """
+ six.moves.input("Dial phone number and initiate a call. %s", phone_number)
+
+ def swap_call(self):
+ """Prompt the user to push the button to swap.
+
+ Function swaps between the primary and secondary calls. One call will
+ be active and the other will be on hold.
+
+ Returns: None
+ """
+ six.moves.input("Press the button to swap calls.")
+
+ def merge_call(self):
+ """Prompt the user to push the button to merge calls.
+
+ Merges calls between the primary and secondary calls into a conference
+ call.
+
+ Returns: None
+ """
+ six.moves.input("Press the button to merge calls into a conference call.")
+
+ def hold_call(self):
+ """Prompt the user to put the primary call on hold.
+
+ Primary call will be on hold, while the secondary call becomes active.
+
+ Returns: None
+ """
+ six.moves.input("Press the hold button to put primary call on hold.")
+
+ def mute_call(self):
+ """Prompt the user to mute the ongoing active call.
+
+ Returns: None
+ """
+ six.moves.input("Press Mute button on active call.")
+
+ def unmute_call(self):
+ """Prompt the user to unmute the ongoing active call.
+
+ Returns: None
+ """
+ six.moves.input("Press the Unmute button on an active call.")
+
+ def reject_phone_call(self):
+ """Prompt the user to reject an incoming call.
+
+ Returns: None
+ """
+ six.moves.input("Press the Reject button to reject an incoming call.")
+
+ def answer_voip_call(self):
+ """Prompt the user to press the button to answer a VOIP call.
+
+ Returns: None
+ """
+ six.moves.input("Press the Answer button on an incoming VOIP phone call.")
+
+ def hangup_voip_call(self):
+ """Prompt the user to press the button to hangup on the active VOIP call.
+
+ Returns: None
+ """
+ six.moves.input("Press the hangup button on the active VOIP call.")
+
+ def reject_voip_call(self):
+ """Prompt the user to press the Reject button on the incoming VOIP call.
+
+ Returns: None
+ """
+ six.moves.input("Press the Reject button on the incoming VOIP call.")
+
+ def voice_dial(self):
+ """Prompt user to initiate a voice dial from the phone.
+
+ Returns: None
+ """
+ six.moves.input("Initiate a voice dial.")
+
+ def last_number_dial(self):
+ """Prompt user to iniate a call to the last number dialed.
+
+ Returns: None
+ """
+ six.moves.input("Initiate a call to the last number dialed.")
+
+ # TODO(user): does this method need a input parameter?
+ def route_call_audio(self):
+ """Prompt user to route a call from AG to HF, and vice versa.
+
+ Returns: None
+ """
+ six.moves.input("Reroute call audio.")
diff --git a/system/blueberry/controllers/derived_bt_device.py b/system/blueberry/controllers/derived_bt_device.py
new file mode 100644
index 0000000000..37fafc1c64
--- /dev/null
+++ b/system/blueberry/controllers/derived_bt_device.py
@@ -0,0 +1,116 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Controller class for a Bluetooth Device.
+
+This controller will instantiate derived classes from BtDevice and the
+module/class specified via strings in configs dictionary.
+
+The idea is to allow vendors to run blueberry tests with their controller class
+through this controller module, eliminating the need to edit the test classes
+themselves.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import importlib
+import logging
+import yaml
+
+MOBLY_CONTROLLER_CONFIG_NAME = "DerivedBtDevice"
+MOBLY_CONTROLLER_CONFIG_MODULE_KEY = "ModuleName"
+MOBLY_CONTROLLER_CONFIG_CLASS_KEY = "ClassName"
+MOBLY_CONTROLLER_CONFIG_PARAMS_KEY = "Params"
+
+
+def create(configs):
+ """Creates DerivedBtDevice controller objects.
+
+ For each config dict in configs:
+ Import desired controller class from config, compose DerivedBtDevice class
+ from that class and BtDevice, instantiate with params from config.
+
+ Args:
+ configs (list): A list of dicts, each representing a configuration for a
+ Bluetooth device. Each dict should be of the format:
+ {"ModuleName": <name of module in blueberry.controllers>,
+ "ClassName": <name of class to derive controller from>,
+ "Params": <kwargs in dict form to instantiate class with>}
+
+ Returns:
+ A list with DerivedBtDevice objects.
+ """
+ return [_create_bt_device_class(config) for config in configs]
+
+
+def _create_bt_device_class(config):
+ """Created new device class from associated device controller from config."""
+ module = importlib.import_module(
+ "blueberry.controllers.%s" %
+ config[MOBLY_CONTROLLER_CONFIG_MODULE_KEY])
+ logging.info("Creating DerivedBtDevice from %r", config)
+ cls = getattr(module, config[MOBLY_CONTROLLER_CONFIG_CLASS_KEY])
+ params = yaml.safe_load("%s" %
+ config.get(MOBLY_CONTROLLER_CONFIG_PARAMS_KEY, {}))
+ new_class = type(MOBLY_CONTROLLER_CONFIG_NAME, (cls, BtDevice), params)
+ new_class_inst = new_class(**params)
+ return new_class_inst
+
+
+def destroy(derived_bt_devices):
+ """Cleans up DerivedBtDevice objects."""
+ for device in derived_bt_devices:
+ # Execute cleanup if the controller class has the method "clean_up".
+ if hasattr(device, "clean_up"):
+ device.clean_up()
+ del derived_bt_devices
+
+
+class BtDevice(object):
+ """Base class for all Bluetooth Devices.
+
+ Provides additional necessary functionality for use within blueberry.
+ """
+
+ def __init__(self):
+ """Initializes a derived bt base class."""
+ self._user_params = {}
+
+ def setup(self):
+ """For devices that need extra setup."""
+
+ def set_user_params(self, params):
+ """Intended for passing mobly user_params into a derived device class.
+
+ Args:
+ params: Mobly user params.
+ """
+ self._user_params = params
+
+ def get_user_params(self):
+ """Return saved user_params.
+
+ Returns:
+ user_params.
+ """
+ return self._user_params
+
+ def factory_reset_bluetooth(self) -> None:
+ """Factory resets Bluetooth on an BT Device."""
+ raise NotImplementedError
+
+ def activate_pairing_mode(self) -> None:
+ """Activates pairing mode on an AndroidDevice."""
+ raise NotImplementedError
diff --git a/system/blueberry/controllers/grpc_bt_sync_mock.py b/system/blueberry/controllers/grpc_bt_sync_mock.py
new file mode 100644
index 0000000000..d3e00e8e78
--- /dev/null
+++ b/system/blueberry/controllers/grpc_bt_sync_mock.py
@@ -0,0 +1,81 @@
+"""A generic gRPC mock device controller.
+
+Example MH testbed config for Hostside:
+- name: GrpcBtSyncStub-1
+ devices:
+ - type: MiscTestbedSubDevice
+ dimensions:
+ mobly_type: DerivedBtDevice
+ properties:
+ ModuleName: grpc_bt_sync_mock
+ ClassName: GrpcBtSyncMock
+ Params:
+ config:
+ mac_address: FE:ED:BE:EF:CA:FE
+ dimensions:
+ device: GrpcBtSyncStub
+"""
+import subprocess
+
+from absl import flags
+from absl import logging
+import grpc
+
+# Internal import
+from blueberry.grpc.proto import blueberry_device_controller_pb2
+from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string('server', 'dns:///[::1]:10000', 'server address')
+
+
+class GrpcBtSyncMock(object):
+ """Generic GRPC device controller."""
+
+ def __init__(self, config):
+ """Initialize GRPC object."""
+ super(GrpcBtSyncMock, self).__init__()
+ self.mac_address = config['mac_address']
+
+ def __del__(self):
+ self.server_proc.terminate()
+ del self.channel_creds
+ del self.channel
+ del self.stub
+
+ def setup(self):
+ """Setup the gRPC server that the sync mock will respond to."""
+ server_path = self.get_user_params()['mh_files']['grpc_server'][0]
+ logging.info('Start gRPC server: %s', server_path)
+ self.server_proc = subprocess.Popen([server_path],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=True,
+ bufsize=0)
+
+ self.channel_creds = loas2.loas2_channel_credentials()
+ self.channel = grpc.secure_channel(FLAGS.server, self.channel_creds)
+ grpc.channel_ready_future(self.channel).result()
+ self.stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub(
+ self.channel)
+
+ def init_setup(self):
+ logging.info('init setup TO BE IMPLEMENTED')
+
+ def set_target(self, bt_device):
+ self._target_device = bt_device
+
+ def pair_and_connect_bluetooth(self, target_mac_address):
+ """Pair and connect to a peripheral Bluetooth device."""
+ request = blueberry_device_controller_pb2.TargetMacAddress(
+ mac_address=target_mac_address)
+ try:
+ response = self.stub.PairAndConnectBluetooth(request)
+ logging.info('pair and connect bluetooth response: %s', response)
+ if response.error:
+ print('error handler TO BE IMPLEMENTED')
+ else:
+ return response.pairing_time_sec, response.connection_time_sec
+ except grpc.RpcError as rpc_error:
+ print(rpc_error)
diff --git a/system/blueberry/controllers/grpc_bt_target_mock.py b/system/blueberry/controllers/grpc_bt_target_mock.py
new file mode 100644
index 0000000000..85f6b516ec
--- /dev/null
+++ b/system/blueberry/controllers/grpc_bt_target_mock.py
@@ -0,0 +1,78 @@
+"""gRPC mock target for testing purposes.
+
+Example MH testbed config for Hostside:
+- name: GrpcBtTargetStub-1
+ devices:
+ - type: MiscTestbedSubDevice
+ dimensions:
+ mobly_type: DerivedBtDevice
+ properties:
+ ModuleName: grpc_bt_target_mock
+ ClassName: GrpcBtTargetMock
+ Params:
+ config:
+ mac_address: FE:ED:BE:EF:CA:FE
+ dimensions:
+ device: GrpcBtTargetStub
+"""
+import subprocess
+
+from absl import flags
+from absl import logging
+import grpc
+
+# Internal import
+from blueberry.grpc.proto import blueberry_device_controller_pb2
+from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc
+
+FLAGS = flags.FLAGS
+
+
+class GrpcBtTargetMock(object):
+ """BT Mock Target for testing the GRPC interface."""
+
+ def __init__(self, config):
+ """Initialize GRPC object."""
+ super(GrpcBtTargetMock, self).__init__()
+ self.mac_address = config['mac_address']
+
+ def __del__(self):
+ self.server_proc.terminate()
+ del self.channel_creds
+ del self.channel
+ del self.stub
+
+ def setup(self):
+ """Setup the gRPC server that the target mock will respond to."""
+ server_path = self.get_user_params()['mh_files']['grpc_server'][0]
+ logging.info('Start gRPC server: %s', server_path)
+ self.server_proc = subprocess.Popen([server_path],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=True,
+ bufsize=0)
+
+ self.channel_creds = loas2.loas2_channel_credentials()
+ self.channel = grpc.secure_channel(FLAGS.server, self.channel_creds)
+ grpc.channel_ready_future(self.channel).result()
+ self.stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub(
+ self.channel)
+
+ def activate_pairing_mode(self):
+ logging.info('activate pairing mode TO BE IMPLEMENTED')
+ request = blueberry_device_controller_pb2.DiscoverableMode(mode=True)
+ try:
+ response = self.stub.SetDiscoverableMode(request)
+ logging.info('set discoverageble response: %s', response)
+ return 0
+ except grpc.RpcError as rpc_error:
+ print(rpc_error)
+ return -1
+
+ def factory_reset_bluetooth(self):
+ logging.info('factory reset TO BE IMPLEMENTED')
+
+ def get_bluetooth_mac_address(self):
+ logging.info('mac_address: %s', self.mac_address)
+ return self.mac_address
diff --git a/system/blueberry/decorators/android_bluetooth_client_decorator.py b/system/blueberry/decorators/android_bluetooth_client_decorator.py
new file mode 100644
index 0000000000..f9834ea00f
--- /dev/null
+++ b/system/blueberry/decorators/android_bluetooth_client_decorator.py
@@ -0,0 +1,48 @@
+"""A Bluetooth Client Decorator util for an Android Device.
+
+This utility allows the user to decorate an device with a custom decorator from
+the blueberry/decorators directory.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+from __future__ import print_function
+
+import importlib
+import re
+from mobly.controllers.android_device import AndroidDevice
+
+
+def decorate(ad, decorator):
+ """Utility to decorate an AndroidDevice.
+
+ Args:
+ ad: Device, must be of type AndroidDevice.
+ decorator: String, class name of the decorator to use.
+ Returns:
+ AndroidDevice object.
+ """
+
+ if not isinstance(ad, AndroidDevice):
+ raise TypeError('Must apply AndroidBluetoothClientDecorator to an '
+ 'AndroidDevice')
+ decorator_module = camel_to_snake(decorator)
+ module = importlib.import_module(
+ 'blueberry.decorators.%s' % decorator_module)
+ cls = getattr(module, decorator)
+ ad = cls(ad)
+
+ return ad
+
+
+def camel_to_snake(cls_name):
+ """Utility to convert a class name from camel case to snake case.
+
+ Args:
+ cls_name: string
+ Returns:
+ string
+ """
+ s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', cls_name)
+ return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
diff --git a/system/blueberry/decorators/android_bluetooth_client_test_decorator.py b/system/blueberry/decorators/android_bluetooth_client_test_decorator.py
new file mode 100644
index 0000000000..769579e6fd
--- /dev/null
+++ b/system/blueberry/decorators/android_bluetooth_client_test_decorator.py
@@ -0,0 +1,25 @@
+"""An example Bluetooth Client Decorator.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+from __future__ import print_function
+
+from mobly.controllers.android_device import AndroidDevice
+
+
+class AndroidBluetoothClientTestDecorator(AndroidDevice):
+ """A class used to test Blueberry's BT Client Profile decoration."""
+
+ def __init__(self, ad):
+ self._ad = ad
+ if not isinstance(self._ad, AndroidDevice):
+ raise TypeError('Must apply AndroidBluetoothClientTestDecorator to an '
+ 'AndroidDevice')
+
+ def __getattr__(self, name):
+ return getattr(self._ad, name)
+
+ def test_decoration(self):
+ return 'I make this device fancy!'
diff --git a/system/blueberry/grpc/blueberry_device_controller.py b/system/blueberry/grpc/blueberry_device_controller.py
new file mode 100644
index 0000000000..c99a204f0d
--- /dev/null
+++ b/system/blueberry/grpc/blueberry_device_controller.py
@@ -0,0 +1,40 @@
+"""Blueberry gRPC device controller.
+
+This is a server to act as a mock device for testing the Blueberry gRPC
+interface.
+"""
+
+from concurrent import futures
+from absl import app
+from absl import flags
+
+import grpc
+
+# Internal import
+from blueberry.grpc import blueberry_device_controller_service
+from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc
+
+
+_HOST = '[::]'
+
+FLAGS = flags.FLAGS
+flags.DEFINE_integer('port', 10000, 'port to listen on')
+flags.DEFINE_integer('threads', 10, 'number of worker threads in thread pool')
+
+
+def main(unused_argv):
+ server = grpc.server(
+ futures.ThreadPoolExecutor(max_workers=FLAGS.threads),
+ ports=(FLAGS.port,)) # pytype: disable=wrong-keyword-args
+ servicer = (
+ blueberry_device_controller_service.BlueberryDeviceControllerServicer())
+ blueberry_device_controller_pb2_grpc.add_BlueberryDeviceControllerServicer_to_server(
+ servicer, server)
+ server_creds = loas2.loas2_server_credentials()
+ server.add_secure_port(f'{_HOST}:{FLAGS.port}', server_creds)
+ server.start()
+ server.wait_for_termination()
+
+
+if __name__ == '__main__':
+ app.run(main)
diff --git a/system/blueberry/grpc/blueberry_device_controller_service.py b/system/blueberry/grpc/blueberry_device_controller_service.py
new file mode 100644
index 0000000000..a17d7db25b
--- /dev/null
+++ b/system/blueberry/grpc/blueberry_device_controller_service.py
@@ -0,0 +1,37 @@
+"""Blueberry gRPC Mock Service.
+
+This is simple mock service that is used to verify the implementation of the
+Blueberry gRPC device controller interface.
+"""
+
+from blueberry.grpc.proto import blueberry_device_controller_pb2
+from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc
+
+
+class BlueberryDeviceControllerServicer(
+ blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerServicer):
+ """A BlueberryTest gRPC server."""
+
+ def __init__(self, *args, **kwargs):
+ super(BlueberryDeviceControllerServicer, self).__init__(*args, **kwargs)
+ self._error = "testing 123"
+
+ def SetDiscoverableMode(self, request, servicer_context):
+ """Sets the device's discoverable mode.
+
+ Args:
+ request: a blueberry_test_server_pb2.DiscoverableMode object containing
+ the "mode" to set the device to.
+ servicer_context: A grpc.ServicerContext for use during service of the
+ RPC.
+
+ Returns:
+ A blueberry_test_server_pb2.DiscoverableResult
+ """
+ return blueberry_device_controller_pb2.DiscoverableResult(
+ result=True,
+ error=self._error)
+
+ def PairAndConnectBluetooth(self, request, servicer_context):
+ return blueberry_device_controller_pb2.PairAndConnectBluetoothResult(
+ pairing_time_sec=0.1, connection_time_sec=0.2, error=None)
diff --git a/system/blueberry/grpc/blueberry_test_client.py b/system/blueberry/grpc/blueberry_test_client.py
new file mode 100644
index 0000000000..1fe8eb1b1c
--- /dev/null
+++ b/system/blueberry/grpc/blueberry_test_client.py
@@ -0,0 +1,46 @@
+"""Blueberry Test Client.
+
+Simple gRPC client to test the Blueberry Mock server.
+"""
+
+from absl import app
+from absl import flags
+
+import grpc
+
+# Internal import
+from blueberry.grpc.proto import blueberry_device_controller_pb2
+from blueberry.grpc.proto import blueberry_device_controller_pb2_grpc
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string('server', 'dns:///[::1]:10000', 'server address')
+
+
+def _UpdateDiscoveryMode(stub, request):
+ try:
+ print('try SetDiscoverableMode')
+ response = stub.SetDiscoverableMode(request)
+ print('complete response')
+ print(response)
+ return 0
+ except grpc.RpcError as rpc_error:
+ print(rpc_error)
+ return -1
+
+
+def main(unused_argv):
+ channel_creds = loas2.loas2_channel_credentials()
+ with grpc.secure_channel(FLAGS.server, channel_creds) as channel:
+ grpc.channel_ready_future(channel).result()
+ stub = blueberry_device_controller_pb2_grpc.BlueberryDeviceControllerStub(
+ channel)
+
+ print('request grpc')
+ request = blueberry_device_controller_pb2.DiscoverableMode(
+ mode=True)
+ print('Call _UpdateDiscoveryMode')
+ return _UpdateDiscoveryMode(stub, request)
+
+
+if __name__ == '__main__':
+ app.run(main)
diff --git a/system/blueberry/grpc/proto/blueberry_device_controller.proto b/system/blueberry/grpc/proto/blueberry_device_controller.proto
new file mode 100644
index 0000000000..d3f39063aa
--- /dev/null
+++ b/system/blueberry/grpc/proto/blueberry_device_controller.proto
@@ -0,0 +1,36 @@
+syntax = "proto3";
+
+package wearables.qa.blueberry.grpc;
+
+option java_multiple_files = true;
+option jspb_use_correct_proto2_semantics = true;
+
+message DiscoverableMode {
+ bool mode = 1; // True to set discoverable on, False to set discoverable off.
+}
+
+message DiscoverableResult {
+ bool result = 1; // True if successful, False if unsuccessful.
+ string error = 2; // Error message if unsuccessful.
+}
+
+message TargetMacAddress {
+ string mac_address = 1; // Mac Address of target device.
+}
+
+message PairAndConnectBluetoothResult {
+ double pairing_time_sec =
+ 1; // The time it takes in seconds to pair the devices.
+ double connection_time_sec =
+ 2; // The time it takes in seconds to connect the devices.
+ string error = 3; // Error message if unsuccessful.
+}
+
+service BlueberryDeviceController {
+ // Returns the result from a request to set device to discoverable.
+ rpc SetDiscoverableMode(DiscoverableMode) returns (DiscoverableResult) {}
+
+ // Returns the result from a request to connect to a target device.
+ rpc PairAndConnectBluetooth(TargetMacAddress)
+ returns (PairAndConnectBluetoothResult) {}
+}
diff --git a/system/blueberry/sample_testbed.yaml b/system/blueberry/sample_testbed.yaml
new file mode 100644
index 0000000000..99da384bde
--- /dev/null
+++ b/system/blueberry/sample_testbed.yaml
@@ -0,0 +1,10 @@
+TestBeds:
+ - Name: SampleTestBed
+ Controllers:
+ AndroidDevice:
+ - serial: 94GAZ00A5C
+ phone_number: 12341234
+ DerivedBtDevice:
+ - ModuleName: iclever_hb01
+ ClassName: IcleverHb01
+ Params: '{"config":{"mac_address":"C4:45:67:02:94:F0","fifo_id":"AH06IVJP","arduino_port":"1"}}'
diff --git a/system/blueberry/tests/a2dp/bluetooth_a2dp_test.py b/system/blueberry/tests/a2dp/bluetooth_a2dp_test.py
new file mode 100644
index 0000000000..9b578be27a
--- /dev/null
+++ b/system/blueberry/tests/a2dp/bluetooth_a2dp_test.py
@@ -0,0 +1,221 @@
+# Lint as: python3
+"""Tests for Bluetooth A2DP audio streamming."""
+
+import time
+
+from mobly import asserts
+from mobly import test_runner
+from mobly import signals
+
+from blueberry.controllers import android_bt_target_device
+from blueberry.utils import blueberry_base_test
+from blueberry.utils import bt_audio_utils
+from blueberry.utils import bt_constants
+from blueberry.utils import bt_test_utils
+
+# Number of seconds for A2DP audio check.
+A2DP_AUDIO_CHECK_TIMEOUT_SEC = 3
+
+# Number of seconds for duration of reference audio.
+REFERENCE_AUDIO_DURATION_SEC = 1.0
+
+# Default threshold of the mean opinion score (MOS).
+MOS_THRESHOLD = 4.5
+
+# The audio parameters for creating a sine wave.
+AUDIO_PARAMS = {
+ 'file_path': '/sdcard/Music',
+ 'frequency': 480,
+ 'channel': 2,
+ 'sample_rate': 48000,
+ 'sample_format': 16,
+ 'duration_sec': 60
+}
+
+
+class BluetoothA2dpTest(blueberry_base_test.BlueberryBaseTest):
+ """Test class for Bluetooth A2DP test.
+
+ This test uses a fixed frequency sine wave to be the reference audio, makes a
+ DUT play this audio and then starts audio capture from a connected Bluetooth
+ sink device, measures mean opinion score (MOS) of the recorded audio and
+ compares the MOS with a threshold to detemine the test result.
+ """
+
+ def setup_class(self):
+ """Standard Mobly setup class."""
+ super(BluetoothA2dpTest, self).setup_class()
+
+ # Threshold of the MOS to determine a test result. Default value is 4.5.
+ self.threshold = float(self.user_params.get('mos_threshold', MOS_THRESHOLD))
+
+ self.phone = self.android_devices[0]
+ self.phone.init_setup()
+ self.phone.sl4a_setup()
+
+ # Generates a sine wave to be reference audio in comparison, and push it to
+ # the phone storage.
+ self.audio_file_on_device, self.audio_file_on_host = (
+ bt_audio_utils.generate_sine_wave_to_device(
+ self.phone,
+ AUDIO_PARAMS['file_path'],
+ AUDIO_PARAMS['frequency'],
+ AUDIO_PARAMS['channel'],
+ AUDIO_PARAMS['sample_rate'],
+ AUDIO_PARAMS['sample_format'],
+ AUDIO_PARAMS['duration_sec'])
+ )
+
+ # Trims the audio to 1 second duration for reference.
+ self.reference_audio_file = bt_audio_utils.trim_audio(
+ audio_file=self.audio_file_on_host,
+ duration_sec=REFERENCE_AUDIO_DURATION_SEC)
+
+ self.derived_bt_device = self.derived_bt_devices[0]
+ self.derived_bt_device.factory_reset_bluetooth()
+ self.derived_bt_device.activate_pairing_mode()
+ self.mac_address = self.derived_bt_device.get_bluetooth_mac_address()
+ self.phone.pair_and_connect_bluetooth(self.mac_address)
+
+ # Sleep until the connection stabilizes.
+ time.sleep(3)
+
+ # Adds the phone to be the secondary device in the android-to-android case.
+ if isinstance(self.derived_bt_device,
+ android_bt_target_device.AndroidBtTargetDevice):
+ self.derived_bt_device.add_sec_ad_device(self.phone)
+
+ def assert_a2dp_expected_status(self, is_playing, fail_msg):
+ """Asserts that A2DP audio is in the expected status.
+
+ Args:
+ is_playing: bool, True if A2DP audio is playing as expected.
+ fail_msg: string, a message explaining the details of test failure.
+ """
+ bt_test_utils.wait_until(
+ timeout_sec=A2DP_AUDIO_CHECK_TIMEOUT_SEC,
+ condition_func=self.phone.mbs.btIsA2dpPlaying,
+ func_args=[self.mac_address],
+ expected_value=is_playing,
+ exception=signals.TestFailure(fail_msg))
+
+ def test_play_a2dp_audio(self):
+ """Test for playing A2DP audio through Bluetooth."""
+
+ # Plays media audio from the phone.
+ audio_file_url = 'file://' + self.audio_file_on_device
+ if not self.phone.sl4a.mediaPlayOpen(audio_file_url):
+ raise signals.TestError(
+ 'Failed to open and play "%s" on the phone "%s".' %
+ (self.audio_file_on_device, self.phone.serial))
+ self.phone.sl4a.mediaPlayStart()
+
+ # Starts audio capture for Bluetooth audio stream.
+ self.derived_bt_device.start_audio_capture()
+
+ # Stops audio capture and generates an recorded audio file.
+ recorded_audio_file = self.derived_bt_device.stop_audio_capture()
+ self.phone.sl4a.mediaPlayStop()
+ self.phone.sl4a.mediaPlayClose()
+
+ # Measures MOS for the recorded audio.
+ mos = bt_audio_utils.measure_audio_mos(recorded_audio_file,
+ self.reference_audio_file)
+
+ # Asserts that the measured MOS should be more than the threshold.
+ asserts.assert_true(
+ mos >= self.threshold,
+ 'MOS of the recorded audio "%.3f" is lower than the threshold "%.3f".' %
+ (mos, self.threshold))
+
+ def test_resume_a2dp_audio_after_phone_call_ended(self):
+ """Test for resuming A2DP audio after a phone call ended.
+
+ Tests that A2DP audio can be paused when receiving a incoming phone call,
+ and resumed after this phone call ended.
+ """
+ # Checks if two android device exist.
+ if len(self.android_devices) < 2:
+ raise signals.TestError('This test requires two android devices.')
+ pri_phone = self.phone
+ sec_phone = self.android_devices[1]
+ sec_phone.init_setup()
+ pri_number = pri_phone.dimensions.get('phone_number')
+ if not pri_number:
+ raise signals.TestError('Please set the dimension "phone_number" to the '
+ 'primary phone.')
+ sec_number = sec_phone.dimensions.get('phone_number')
+ if not sec_number:
+ raise signals.TestError('Please set the dimension "phone_number" to the '
+ 'secondary phone.')
+
+ # Plays media audio from the phone.
+ audio_file_url = 'file://' + self.audio_file_on_device
+ if not self.phone.sl4a.mediaPlayOpen(audio_file_url):
+ raise signals.TestError(
+ 'Failed to open and play "%s" on the phone "%s".' %
+ (self.audio_file_on_device, self.phone.serial))
+ self.phone.sl4a.mediaPlayStart()
+
+ # Checks if A2DP audio is playing.
+ self.assert_a2dp_expected_status(
+ is_playing=True,
+ fail_msg='A2DP audio is not playing.')
+
+ try:
+ # Makes a incoming phone call.
+ sec_phone.sl4a.telecomCallNumber(pri_number)
+ sec_phone.log.info('Made a phone call to device "%s".' % pri_phone.serial)
+ pri_phone.log.info('Waiting for the incoming call from device "%s"...'
+ % sec_phone.serial)
+
+ is_ringing = pri_phone.wait_for_call_state(
+ bt_constants.CALL_STATE_RINGING,
+ bt_constants.CALL_STATE_TIMEOUT_SEC)
+ if not is_ringing:
+ raise signals.TestError(
+ 'Timed out after %ds waiting for the incoming call from device '
+ '"%s".' % (bt_constants.CALL_STATE_TIMEOUT_SEC, sec_phone.serial))
+
+ # Checks if A2DP audio is paused.
+ self.assert_a2dp_expected_status(
+ is_playing=False,
+ fail_msg='A2DP audio is not paused when receiving a phone call.')
+ finally:
+ # Ends the incoming phone call.
+ sec_phone.sl4a.telecomEndCall()
+ sec_phone.log.info('Ended the phone call.')
+ is_idle = pri_phone.wait_for_call_state(
+ bt_constants.CALL_STATE_IDLE,
+ bt_constants.CALL_STATE_TIMEOUT_SEC)
+ if not is_idle:
+ raise signals.TestError(
+ 'Timed out after %ds waiting for the phone call to be ended.' %
+ bt_constants.CALL_STATE_TIMEOUT_SEC)
+
+ # Checks if A2DP audio is resumed.
+ self.assert_a2dp_expected_status(
+ is_playing=True,
+ fail_msg='A2DP audio is not resumed when the phone call is ended.')
+
+ # Starts audio capture for Bluetooth audio stream.
+ self.derived_bt_device.start_audio_capture()
+
+ # Stops audio capture and generates an recorded audio file.
+ recorded_audio_file = self.derived_bt_device.stop_audio_capture()
+ pri_phone.sl4a.mediaPlayStop()
+ pri_phone.sl4a.mediaPlayClose()
+
+ # Measures MOS for the recorded audio.
+ mos = bt_audio_utils.measure_audio_mos(recorded_audio_file,
+ self.reference_audio_file)
+
+ # Asserts that the measured MOS should be more than the threshold.
+ asserts.assert_true(
+ mos >= self.threshold,
+ 'MOS of the recorded audio "%.3f" is lower than the threshold "%.3f".' %
+ (mos, self.threshold))
+
+
+if __name__ == '__main__':
+ test_runner.main()
diff --git a/system/blueberry/tests/audio_capture/bluetooth_audio_capture_test.py b/system/blueberry/tests/audio_capture/bluetooth_audio_capture_test.py
new file mode 100644
index 0000000000..74e8705b32
--- /dev/null
+++ b/system/blueberry/tests/audio_capture/bluetooth_audio_capture_test.py
@@ -0,0 +1,93 @@
+# Lint as: python3
+"""Tests for testing audio capture in android bt target controller.
+
+ location of the controller:
+ blueberry.controllers.android_bt_target_device
+ Before the test, the music file should be copied to the location of the
+ pri_phone. The a2dp sink phone should be with the android build that can
+ support a2dp sink profile (for example, the git_master-bds-dev).
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+import time
+
+from mobly import asserts
+from mobly import test_runner
+from blueberry.utils import blueberry_base_test
+from blueberry.utils import bt_test_utils
+
+MUSIC_FILE = '1khz.wav'
+
+
+class BluetoothAudioCaptureTest(blueberry_base_test.BlueberryBaseTest):
+
+ def setup_class(self):
+ """Standard Mobly setup class."""
+ super(BluetoothAudioCaptureTest, self).setup_class()
+ self.derived_bt_device = self.derived_bt_devices[0]
+ for device in self.android_devices:
+ device.init_setup()
+ device.sl4a_setup()
+ self.pri_phone = self.android_devices[0]
+ self.mac_address = self.derived_bt_device.get_bluetooth_mac_address()
+ self.derived_bt_device.activate_pairing_mode()
+ self.pri_phone.sl4a.bluetoothDiscoverAndBond(self.mac_address)
+ self.pri_phone.wait_for_connection_success(self.mac_address)
+ # Gives more time for the pairing between the pri_phone and the
+ # derived_bt_device (the android bt target device)
+ time.sleep(3)
+ self.derived_bt_device.add_sec_ad_device(self.pri_phone)
+ self.derived_bt_device.disconnect_all()
+ self.duration = self.derived_bt_device.audio_params['duration']
+ self.recorded_duration = 0
+
+ def setup_test(self):
+ """Setup for bluetooth latency test."""
+ logging.info('Setup Test for audio capture test')
+ super(BluetoothAudioCaptureTest, self).setup_test()
+ asserts.assert_true(self.derived_bt_device.a2dp_sink_connect(),
+ 'Failed to establish A2dp Sink connection')
+
+ def test_audio_capture(self):
+ """Tests the audio capture for the android bt target device."""
+
+ music_file = self.derived_bt_device.audio_params.get(
+ 'music_file', MUSIC_FILE)
+ music_file = 'file:///sdcard/Music/{}'.format(music_file)
+ self.pri_phone.sl4a.mediaPlayOpen(music_file)
+ self.pri_phone.sl4a.mediaPlaySetLooping()
+ self.pri_phone.sl4a.mediaPlayStart()
+ time.sleep(3)
+ self.pri_phone.log.info(self.pri_phone.sl4a.mediaPlayGetInfo())
+ self.derived_bt_device.start_audio_capture()
+ time.sleep(self.duration)
+ audio_captured = self.derived_bt_device.stop_audio_capture()
+ self.pri_phone.sl4a.mediaPlayStop()
+ self.pri_phone.sl4a.mediaPlayClose()
+ self.derived_bt_device.log.info('Audio play and record stopped')
+ self.recorded_duration = bt_test_utils.get_duration_seconds(audio_captured)
+ self.derived_bt_device.log.info(
+ 'The capture duration is %s s and the recorded duration is %s s',
+ self.duration, self.recorded_duration)
+
+ def teardown_class(self):
+ logging.info('Factory resetting Bluetooth on devices.')
+ self.pri_phone.factory_reset_bluetooth()
+ self.derived_bt_device.factory_reset_bluetooth()
+ super(BluetoothAudioCaptureTest, self).teardown_class()
+ self.derived_bt_device.stop_all_services()
+ self.record_data({
+ 'Test Name': 'test_audio_capture',
+ 'sponge_properties': {
+ 'duration': self.duration,
+ 'recorded duration': self.recorded_duration
+ }
+ })
+
+
+if __name__ == '__main__':
+ test_runner.main()
diff --git a/system/blueberry/tests/avrcp/bluetooth_avrcp_test.py b/system/blueberry/tests/avrcp/bluetooth_avrcp_test.py
new file mode 100644
index 0000000000..484fcd9587
--- /dev/null
+++ b/system/blueberry/tests/avrcp/bluetooth_avrcp_test.py
@@ -0,0 +1,349 @@
+# Lint as: python3
+"""Tests for AVRCP basic functionality."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import time
+
+from mobly import test_runner
+from mobly import signals
+from mobly.controllers.android_device_lib import adb
+from blueberry.controllers import android_bt_target_device
+from blueberry.utils import blueberry_base_test
+from blueberry.utils import bt_constants
+
+# The audio source path of BluetoothMediaPlayback in the SL4A app.
+ANDROID_MEDIA_PATH = '/sdcard/Music/test'
+
+# Timeout for track change and playback state update in second.
+MEDIA_UPDATE_TIMEOUT_SEC = 3
+
+
+class BluetoothAvrcpTest(blueberry_base_test.BlueberryBaseTest):
+ """Test Class for Bluetooth AVRCP.
+
+ This test requires two or more audio files exist in path "/sdcard/Music/test"
+ on the primary device, and device controllers need to have the following APIs:
+ 1. play()
+ 2. pause()
+ 3. track_previous()
+ 4. track_next()
+ """
+
+ def setup_class(self):
+ """Standard Mobly setup class."""
+ super(BluetoothAvrcpTest, self).setup_class()
+
+ for device in self.android_devices:
+ device.init_setup()
+ device.sl4a_setup()
+
+ # The device which role is AVRCP Target (TG).
+ self.pri_device = self.android_devices[0]
+
+ if len(self.android_devices) > 1 and not self.derived_bt_devices:
+ self.derived_bt_device = self.android_devices[1]
+ else:
+ self.derived_bt_device = self.derived_bt_devices[0]
+
+ # Check if the derived bt device is android bt target device.
+ self.is_android_bt_target_device = isinstance(
+ self.derived_bt_device, android_bt_target_device.AndroidBtTargetDevice)
+
+ # Check if the audio files exist on the primary device.
+ try:
+ self.audio_files = self.pri_device.adb.shell(
+ 'ls %s' % ANDROID_MEDIA_PATH).decode().split('\n')[:-1]
+ if len(self.audio_files) < 2:
+ raise signals.TestError(
+ 'Please push two or more audio files to %s on the primary device '
+ '"%s".' % (ANDROID_MEDIA_PATH, self.pri_device.serial))
+ except adb.AdbError as error:
+ if 'No such file or directory' in str(error):
+ raise signals.TestError(
+ 'No directory "%s" found on the primary device "%s".' %
+ (ANDROID_MEDIA_PATH, self.pri_device.serial))
+ raise error
+
+ self.mac_address = self.derived_bt_device.get_bluetooth_mac_address()
+ self.derived_bt_device.activate_pairing_mode()
+ self.pri_device.set_target(self.derived_bt_device)
+ self.pri_device.pair_and_connect_bluetooth(self.mac_address)
+ self.pri_device.allow_extra_permissions()
+ # Gives more time for the pairing between two devices.
+ time.sleep(3)
+
+ if self.is_android_bt_target_device:
+ self.derived_bt_device.add_sec_ad_device(self.pri_device)
+
+ # Starts BluetoothSL4AAudioSrcMBS on the phone.
+ if self.is_android_bt_target_device:
+ self.derived_bt_device.init_ambs_for_avrcp()
+ else:
+ self.pri_device.sl4a.bluetoothMediaPhoneSL4AMBSStart()
+ # Waits for BluetoothSL4AAudioSrcMBS to be active.
+ time.sleep(1)
+ # Changes the playback state to Playing in order to other Media passthrough
+ # commands can work.
+ self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone(
+ bt_constants.CMD_MEDIA_PLAY)
+
+ # Collects media metadata of all tracks.
+ self.tracks = []
+ for _ in range(len(self.audio_files)):
+ self.tracks.append(self.pri_device.get_current_track_info())
+ self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone(
+ bt_constants.CMD_MEDIA_SKIP_NEXT)
+ self.pri_device.log.info('Tracks: %s' % self.tracks)
+
+ # Sets Playback state to Paused as default.
+ self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone(
+ bt_constants.CMD_MEDIA_PAUSE)
+
+ def teardown_class(self):
+ """Teardown class for bluetooth avrcp media play test."""
+ super(BluetoothAvrcpTest, self).teardown_class()
+ # Stops BluetoothSL4AAudioSrcMBS after all test methods finish.
+ if self.is_android_bt_target_device:
+ self.derived_bt_device.stop_ambs_for_avrcp()
+ else:
+ self.pri_device.sl4a.bluetoothMediaPhoneSL4AMBSStop()
+
+ def teardown_test(self):
+ """Teardown test for bluetooth avrcp media play test."""
+ super(BluetoothAvrcpTest, self).teardown_test()
+ # Sets Playback state to Paused after a test method finishes.
+ self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone(
+ bt_constants.CMD_MEDIA_PAUSE)
+
+ def wait_for_media_info_sync(self):
+ """Waits for sync Media information between two sides.
+
+ Waits for sync the current playback state and Now playing track info from
+ the android bt target device to the phone.
+ """
+ # Check if Playback state is sync.
+ expected_state = self.pri_device.get_current_playback_state()
+ self.derived_bt_device.verify_playback_state_changed(
+ expected_state=expected_state,
+ exception=signals.TestError(
+ 'Playback state is not equivalent between two sides. '
+ '"%s" != "%s"' %
+ (self.derived_bt_device.get_current_playback_state(),
+ expected_state)))
+
+ # Check if Now Playing track is sync.
+ expected_track = self.pri_device.get_current_track_info()
+ self.derived_bt_device.verify_current_track_changed(
+ expected_track=expected_track,
+ exception=signals.TestError(
+ 'Now Playing track is not equivalent between two sides. '
+ '"%s" != "%s"' %
+ (self.derived_bt_device.get_current_track_info(), expected_track)))
+
+ def execute_media_play_pause_test_logic(self, command_sender, test_command):
+ """Executes the test logic of the media command "play" or "pause".
+
+ Steps:
+ 1. Correct the playback state if needed.
+ 2. Send a media passthrough command.
+ 3. Verify that the playback state is changed from AVRCP TG and CT.
+
+ Args:
+ command_sender: a device controller sending the command.
+ test_command: string, the media passthrough command for testing, either
+ "play" or "pause".
+
+ Raises:
+ signals.TestError: raised if the test command is invalid.
+ """
+ # Checks if the test command is valid.
+ if test_command not in [bt_constants.CMD_MEDIA_PLAY,
+ bt_constants.CMD_MEDIA_PAUSE]:
+ raise signals.TestError(
+ 'Command "%s" is invalid. The test command should be "%s" or "%s".' %
+ (test_command, bt_constants.CMD_MEDIA_PLAY,
+ bt_constants.CMD_MEDIA_PAUSE))
+
+ # Make sure the playback state is playing if testing the command "pause".
+ if (self.pri_device.get_current_playback_state() !=
+ bt_constants.STATE_PLAYING and
+ test_command == bt_constants.CMD_MEDIA_PAUSE):
+ self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone(
+ bt_constants.CMD_MEDIA_PLAY)
+
+ # Makes sure Media info is the same between two sides.
+ if self.is_android_bt_target_device:
+ self.wait_for_media_info_sync()
+ self.pri_device.log.info(
+ 'Current playback state: %s' %
+ self.pri_device.get_current_playback_state())
+
+ expected_state = None
+ if test_command == bt_constants.CMD_MEDIA_PLAY:
+ command_sender.play()
+ expected_state = bt_constants.STATE_PLAYING
+ elif test_command == bt_constants.CMD_MEDIA_PAUSE:
+ command_sender.pause()
+ expected_state = bt_constants.STATE_PAUSED
+
+ # Verify that the playback state is changed.
+ self.pri_device.log.info('Expected playback state: %s' % expected_state)
+ device_check_list = [self.pri_device]
+ # Check the playback state from the android bt target device.
+ if self.is_android_bt_target_device:
+ device_check_list.append(self.derived_bt_device)
+ for device in device_check_list:
+ device.verify_playback_state_changed(
+ expected_state=expected_state,
+ exception=signals.TestFailure(
+ 'Playback state is not changed to "%s" from the device "%s". '
+ 'Current state: %s' %
+ (expected_state, device.serial,
+ device.get_current_playback_state())))
+
+ def execute_skip_next_prev_test_logic(self, command_sender, test_command):
+ """Executes the test logic of the media command "skipNext" or "skipPrev".
+
+ Steps:
+ 1. Correct the Now Playing track if needed.
+ 2. Send a media passthrough command.
+ 3. Verify that the Now Playing track is changed from AVRCP TG and CT.
+
+ Args:
+ command_sender: a device controller sending the command.
+ test_command: string, the media passthrough command for testing, either
+ "skipNext" or "skipPrev".
+
+ Raises:
+ signals.TestError: raised if the test command is invalid.
+ """
+ # Checks if the test command is valid.
+ if test_command not in [bt_constants.CMD_MEDIA_SKIP_NEXT,
+ bt_constants.CMD_MEDIA_SKIP_PREV]:
+ raise signals.TestError(
+ 'Command "%s" is invalid. The test command should be "%s" or "%s".' %
+ (test_command, bt_constants.CMD_MEDIA_SKIP_NEXT,
+ bt_constants.CMD_MEDIA_SKIP_PREV))
+
+ # Make sure the track index is not 0 if testing the command "skipPrev".
+ if (self.tracks.index(self.pri_device.get_current_track_info()) == 0
+ and test_command == bt_constants.CMD_MEDIA_SKIP_PREV):
+ self.pri_device.sl4a.bluetoothMediaHandleMediaCommandOnPhone(
+ bt_constants.CMD_MEDIA_SKIP_NEXT)
+
+ # Makes sure Media info is the same between two sides.
+ if self.is_android_bt_target_device:
+ self.wait_for_media_info_sync()
+ current_track = self.pri_device.get_current_track_info()
+ current_index = self.tracks.index(current_track)
+ self.pri_device.log.info('Current track: %s' % current_track)
+
+ expected_track = None
+ if test_command == bt_constants.CMD_MEDIA_SKIP_NEXT:
+ command_sender.track_next()
+ # It will return to the first track by skipNext if now playing is the last
+ # track.
+ if current_index + 1 == len(self.tracks):
+ expected_track = self.tracks[0]
+ else:
+ expected_track = self.tracks[current_index + 1]
+ elif test_command == bt_constants.CMD_MEDIA_SKIP_PREV:
+ command_sender.track_previous()
+ expected_track = self.tracks[current_index - 1]
+
+ # Verify that the now playing track is changed.
+ self.pri_device.log.info('Expected track: %s' % expected_track)
+ device_check_list = [self.pri_device]
+ # Check the playback state from the android bt target device.
+ if self.is_android_bt_target_device:
+ device_check_list.append(self.derived_bt_device)
+ for device in device_check_list:
+ device.verify_current_track_changed(
+ expected_track=expected_track,
+ exception=signals.TestFailure(
+ 'Now Playing track is not changed to "%s" from the device "%s". '
+ 'Current track: %s' %
+ (expected_track, device.serial, device.get_current_track_info())))
+
+ def test_media_pause(self):
+ """Tests the media pause from AVRCP Controller."""
+ self.execute_media_play_pause_test_logic(
+ command_sender=self.derived_bt_device,
+ test_command=bt_constants.CMD_MEDIA_PAUSE)
+
+ def test_media_play(self):
+ """Tests the media play from AVRCP Controller."""
+ self.execute_media_play_pause_test_logic(
+ command_sender=self.derived_bt_device,
+ test_command=bt_constants.CMD_MEDIA_PLAY)
+
+ def test_media_skip_prev(self):
+ """Tests the media skip prev from AVRCP Controller."""
+ self.execute_skip_next_prev_test_logic(
+ command_sender=self.derived_bt_device,
+ test_command=bt_constants.CMD_MEDIA_SKIP_PREV)
+
+ def test_media_skip_next(self):
+ """Tests the media skip next from AVRCP Controller."""
+ self.execute_skip_next_prev_test_logic(
+ command_sender=self.derived_bt_device,
+ test_command=bt_constants.CMD_MEDIA_SKIP_NEXT)
+
+ def test_media_pause_from_phone(self):
+ """Tests the media pause from AVRCP Target.
+
+ Tests that Playback state of AVRCP Controller will be changed to paused when
+ AVRCP Target sends the command "pause".
+ """
+ if not self.is_android_bt_target_device:
+ signals.TestError('The test requires an android bt target device.')
+
+ self.execute_media_play_pause_test_logic(
+ command_sender=self.pri_device,
+ test_command=bt_constants.CMD_MEDIA_PAUSE)
+
+ def test_media_play_from_phone(self):
+ """Tests the media play from AVRCP Target.
+
+ Tests that Playback state of AVRCP Controller will be changed to playing
+ when AVRCP Target sends the command "play".
+ """
+ if not self.is_android_bt_target_device:
+ signals.TestError('The test requires an android bt target device.')
+
+ self.execute_media_play_pause_test_logic(
+ command_sender=self.pri_device,
+ test_command=bt_constants.CMD_MEDIA_PLAY)
+
+ def test_media_skip_prev_from_phone(self):
+ """Tests the media skip prev from AVRCP Target.
+
+ Tests that Now Playing track of AVRCP Controller will be changed to the
+ previous track when AVRCP Target sends the command "skipPrev".
+ """
+ if not self.is_android_bt_target_device:
+ signals.TestError('The test requires an android bt target device.')
+
+ self.execute_skip_next_prev_test_logic(
+ command_sender=self.pri_device,
+ test_command=bt_constants.CMD_MEDIA_SKIP_PREV)
+
+ def test_media_skip_next_from_phone(self):
+ """Tests the media skip next from AVRCP Target.
+
+ Tests that Now Playing track of AVRCP Controller will be changed to the next
+ track when AVRCP Target sends the command "skipNext".
+ """
+ if not self.is_android_bt_target_device:
+ signals.TestError('The test requires an android bt target device.')
+
+ self.execute_skip_next_prev_test_logic(
+ command_sender=self.pri_device,
+ test_command=bt_constants.CMD_MEDIA_SKIP_NEXT)
+
+
+if __name__ == '__main__':
+ test_runner.main()
diff --git a/system/blueberry/tests/connectivity/bluetooth_connection_test.py b/system/blueberry/tests/connectivity/bluetooth_connection_test.py
new file mode 100644
index 0000000000..767af38da6
--- /dev/null
+++ b/system/blueberry/tests/connectivity/bluetooth_connection_test.py
@@ -0,0 +1,96 @@
+"""Tests for Bluetooth connection with Android device and a Bluetooth device."""
+
+import time
+
+from mobly import test_runner
+from blueberry.utils import blueberry_base_test
+
+# Connection state change sleep time in seconds.
+CONNECTION_STATE_CHANGE_SLEEP_SEC = 5
+
+
+class BluetoothConnectionTest(blueberry_base_test.BlueberryBaseTest):
+ """Test Class for Bluetooth connection testing.
+
+ Attributes:
+ primary_device: A primary device under test.
+ derived_bt_device: A Bluetooth device which is used to connected to the
+ primary device in the test.
+ """
+
+ def setup_class(self):
+ super().setup_class()
+ self.primary_device = self.android_devices[0]
+ self.primary_device.init_setup()
+
+ self.derived_bt_device = self.derived_bt_devices[0]
+ self.derived_bt_device.factory_reset_bluetooth()
+ self.mac_address = self.derived_bt_device.get_bluetooth_mac_address()
+ self.derived_bt_device.activate_pairing_mode()
+ self.primary_device.pair_and_connect_bluetooth(self.mac_address)
+
+ def setup_test(self):
+ super().setup_test()
+ # Checks if A2DP and HSP profiles are connected.
+ self.wait_for_a2dp_and_hsp_connection_state(connected=True)
+ # Buffer between tests.
+ time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC)
+
+ def wait_for_a2dp_and_hsp_connection_state(self, connected):
+ """Asserts that A2DP and HSP connections are in the expected state.
+
+ Args:
+ connected: bool, True if the expected state is connected else False.
+ """
+ self.primary_device.wait_for_a2dp_connection_state(self.mac_address,
+ connected)
+ self.primary_device.wait_for_hsp_connection_state(self.mac_address,
+ connected)
+
+ def test_disconnect_and_connect(self):
+ """Test for DUT disconnecting and then connecting to the remote device."""
+ self.primary_device.log.info('Disconnecting the device "%s"...' %
+ self.mac_address)
+ self.primary_device.disconnect_bluetooth(self.mac_address)
+ self.wait_for_a2dp_and_hsp_connection_state(connected=False)
+ # Buffer time for connection state change.
+ time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC)
+ self.primary_device.log.info('Connecting the device "%s"...' %
+ self.mac_address)
+ self.primary_device.connect_bluetooth(self.mac_address)
+ self.wait_for_a2dp_and_hsp_connection_state(connected=True)
+
+ def test_reconnect_when_enabling_bluetooth(self):
+ """Test for DUT reconnecting to the remote device when Bluetooth enabled."""
+ self.primary_device.log.info('Turning off Bluetooth...')
+ self.primary_device.sl4a.bluetoothToggleState(False)
+ self.primary_device.wait_for_bluetooth_toggle_state(enabled=False)
+ self.primary_device.wait_for_disconnection_success(self.mac_address)
+ time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC)
+ self.primary_device.log.info('Turning on Bluetooth...')
+ self.primary_device.sl4a.bluetoothToggleState(True)
+ self.primary_device.wait_for_bluetooth_toggle_state(enabled=True)
+ self.primary_device.wait_for_connection_success(self.mac_address)
+ self.wait_for_a2dp_and_hsp_connection_state(connected=True)
+
+ def test_reconnect_when_connected_device_powered_on(self):
+ """Test for the remote device reconnecting to DUT.
+
+ Tests that DUT can be disconnected when the remoted device is powerd off,
+ and then reconnected when the remote device is powered on.
+ """
+ self.primary_device.log.info(
+ 'The connected device "%s" is being powered off...' % self.mac_address)
+ self.derived_bt_device.power_off()
+ self.primary_device.wait_for_disconnection_success(self.mac_address)
+ self.wait_for_a2dp_and_hsp_connection_state(connected=False)
+ time.sleep(CONNECTION_STATE_CHANGE_SLEEP_SEC)
+ self.derived_bt_device.power_on()
+ self.primary_device.log.info(
+ 'The connected device "%s" is being powered on...' % self.mac_address)
+ self.primary_device.wait_for_connection_success(self.mac_address)
+ self.wait_for_a2dp_and_hsp_connection_state(connected=True)
+
+
+if __name__ == '__main__':
+ test_runner.main()
diff --git a/system/blueberry/tests/connectivity/bluetooth_latency_test.py b/system/blueberry/tests/connectivity/bluetooth_latency_test.py
new file mode 100644
index 0000000000..0facbdc8e7
--- /dev/null
+++ b/system/blueberry/tests/connectivity/bluetooth_latency_test.py
@@ -0,0 +1,132 @@
+# Lint as: python3
+"""Tests for blueberry.tests.bluetooth.bluetooth_latency."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+import math
+import random
+import string
+import time
+
+from mobly import asserts
+from mobly import test_runner
+from mobly.signals import TestAbortClass
+# Internal import
+from blueberry.utils import blueberry_base_test
+from blueberry.utils import bt_test_utils
+from blueberry.utils import metrics_utils
+# Internal import
+
+
+class BluetoothLatencyTest(blueberry_base_test.BlueberryBaseTest):
+
+ @retry.logged_retry_on_exception(
+ retry_intervals=retry.FuzzedExponentialIntervals(
+ initial_delay_sec=2, factor=5, num_retries=5, max_delay_sec=300))
+ def _measure_latency(self):
+ """Measures the latency of data transfer over RFCOMM.
+
+ Sends data from the client device that is read by the server device.
+ Calculates the latency of the transfer.
+
+ Returns:
+ The latency of the transfer milliseconds.
+ """
+
+ # Generates a random message to transfer
+ message = (''.join(
+ random.choice(string.ascii_letters + string.digits) for _ in range(6)))
+ start_time = time.time()
+ write_read_successful = bt_test_utils.write_read_verify_data_sl4a(
+ self.phone, self.derived_bt_device, message, False)
+ end_time = time.time()
+ asserts.assert_true(write_read_successful, 'Failed to send/receive message')
+ return (end_time - start_time) * 1000
+
+ def setup_class(self):
+ """Standard Mobly setup class."""
+ super(BluetoothLatencyTest, self).setup_class()
+ if len(self.android_devices) < 2:
+ raise TestAbortClass(
+ 'Not enough android phones detected (need at least two)')
+ self.phone = self.android_devices[0]
+ self.phone.init_setup()
+ self.phone.sl4a_setup()
+
+ # We treat the secondary phone as a derived_bt_device in order for the
+ # generic script to work with this android phone properly. Data will be sent
+ # from first phone to the second phone.
+ self.derived_bt_device = self.android_devices[1]
+ self.derived_bt_device.init_setup()
+ self.derived_bt_device.sl4a_setup()
+ self.set_btsnooplogmode_full(self.phone)
+ self.set_btsnooplogmode_full(self.derived_bt_device)
+
+ self.metrics = (
+ metrics_utils.BluetoothMetricLogger(
+ metrics_pb2.BluetoothDataTestResult()))
+ self.metrics.add_primary_device_metrics(self.phone)
+ self.metrics.add_connected_device_metrics(self.derived_bt_device)
+
+ self.data_transfer_type = metrics_pb2.BluetoothDataTestResult.RFCOMM
+ self.iterations = int(self.user_params.get('iterations', 300))
+ logging.info('Running Bluetooth latency test %s times.', self.iterations)
+ logging.info('Successfully found required devices.')
+
+ def setup_test(self):
+ """Setup for bluetooth latency test."""
+ logging.info('Setup Test for test_bluetooth_latency')
+ super(BluetoothLatencyTest, self).setup_test()
+ asserts.assert_true(self.phone.connect_with_rfcomm(self.derived_bt_device),
+ 'Failed to establish RFCOMM connection')
+
+ def test_bluetooth_latency(self):
+ """Tests the latency for a data transfer over RFCOMM."""
+
+ metrics = {}
+ latency_list = []
+
+ for _ in range(self.iterations):
+ latency_list.append(self._measure_latency())
+
+ metrics['data_transfer_protocol'] = self.data_transfer_type
+ metrics['data_latency_min_millis'] = int(min(latency_list))
+ metrics['data_latency_max_millis'] = int(max(latency_list))
+ metrics['data_latency_avg_millis'] = int(
+ math.fsum(latency_list) / float(len(latency_list)))
+ logging.info('Latency: %s', metrics)
+
+ asserts.assert_true(metrics['data_latency_min_millis'] > 0,
+ 'Minimum latency must be greater than 0!')
+ self.metrics.add_test_metrics(metrics)
+ for metric in metrics:
+ self.record_data({
+ 'Test Name': 'test_bluetooth_latency',
+ 'sponge_properties': {
+ metric: metrics[metric],
+ }
+ })
+
+ def teardown_class(self):
+ logging.info('Factory resetting Bluetooth on devices.')
+ self.phone.sl4a.bluetoothSocketConnStop()
+ self.derived_bt_device.sl4a.bluetoothSocketConnStop()
+ self.phone.factory_reset_bluetooth()
+ self.derived_bt_device.factory_reset_bluetooth()
+ super(BluetoothLatencyTest, self).teardown_class()
+ self.record_data({
+ 'Test Name': 'test_bluetooth_latency',
+ 'sponge_properties': {
+ 'proto_ascii':
+ self.metrics.proto_message_to_ascii(),
+ 'primary_device_build':
+ self.phone.get_device_info()['android_release_id']
+ }
+ })
+
+
+if __name__ == '__main__':
+ test_runner.main()
diff --git a/system/blueberry/tests/connectivity/bluetooth_pairing_test.py b/system/blueberry/tests/connectivity/bluetooth_pairing_test.py
new file mode 100644
index 0000000000..f3c1770781
--- /dev/null
+++ b/system/blueberry/tests/connectivity/bluetooth_pairing_test.py
@@ -0,0 +1,81 @@
+"""Tests for blueberry.tests.bluetooth_pairing."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+
+from mobly import test_runner
+from blueberry.utils import blueberry_base_test
+
+
+class BluetoothPairingTest(blueberry_base_test.BlueberryBaseTest):
+ """Test Class for Bluetooth Pairing Test.
+
+ Test will reset the bluetooth settings on the phone and attempt to pair
+ with the derived_bt_device specified in the configuration file.
+ """
+
+ def setup_class(self):
+ """Standard Mobly setup class."""
+ super(BluetoothPairingTest, self).setup_class()
+ # Adds a use case that derived_bt_device initiates a pairing request to
+ # primary_device. Enable this case if allow_pairing_reverse is 1.
+ self.allow_pairing_reverse = int(self.user_params.get(
+ 'allow_pairing_reverse', 0))
+
+ if self.android_devices:
+ self.primary_device = self.android_devices[0]
+ self.primary_device.init_setup()
+ self.primary_device.sl4a_setup()
+
+ if len(self.android_devices) > 1 and not self.derived_bt_devices:
+ # In the case of pairing phone to phone, we need to treat the
+ # secondary phone as a derived_bt_device in order for the generic script
+ # to work with this android phone properly.
+ self.derived_bt_device = self.android_devices[1]
+ self.derived_bt_device.init_setup()
+ self.derived_bt_device.sl4a_setup()
+ else:
+ self.derived_bt_device = self.derived_bt_devices[0]
+ self.derived_bt_device.factory_reset_bluetooth()
+ else:
+ # In the case of pairing mock to mock, at least 2 derived_bt_device is
+ # required. The first derived_bt_device is treated as primary_device.
+ self.primary_device = self.derived_bt_devices[0]
+ self.primary_device.init_setup()
+ self.derived_bt_device = self.derived_bt_devices[1]
+ self.derived_bt_device.factory_reset_bluetooth()
+
+ def setup_test(self):
+ """Setup for pairing test."""
+ logging.info('Setup Test for test_pair_and_bond')
+ super(BluetoothPairingTest, self).setup_test()
+
+ def test_pair_and_bond(self):
+ """Test for pairing and bonding a phone with a bluetooth device.
+
+ Initiates pairing from the phone and checks for 20 seconds that
+ the device is connected.
+ """
+ device_list = [(self.primary_device, self.derived_bt_device)]
+ if self.allow_pairing_reverse:
+ device_list.append((self.derived_bt_device, self.primary_device))
+ for initiator, receiver in device_list:
+ # get mac address of device to pair with
+ mac_address = receiver.get_bluetooth_mac_address()
+ logging.info('Receiver BT MAC Address: %s', mac_address)
+ # put device into pairing mode
+ receiver.activate_pairing_mode()
+ # initiate pairing from initiator
+ initiator.set_target(receiver)
+ initiator.pair_and_connect_bluetooth(mac_address)
+ if self.allow_pairing_reverse and initiator != self.derived_bt_device:
+ logging.info('===== Reversing Pairing =====')
+ # Resets Bluetooth status for two sides.
+ initiator.factory_reset_bluetooth()
+ receiver.factory_reset_bluetooth()
+
+
+if __name__ == '__main__':
+ test_runner.main()
diff --git a/system/blueberry/tests/connectivity/bluetooth_throughput_test.py b/system/blueberry/tests/connectivity/bluetooth_throughput_test.py
new file mode 100644
index 0000000000..79ac2e22b9
--- /dev/null
+++ b/system/blueberry/tests/connectivity/bluetooth_throughput_test.py
@@ -0,0 +1,196 @@
+# Lint as: python3
+"""Tests for blueberry.tests.bluetooth.bluetooth_throughput."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+import math
+
+from mobly import asserts
+from mobly import test_runner
+from mobly.controllers.android_device_lib.jsonrpc_client_base import ApiError
+from mobly.signals import TestAbortClass
+# Internal import
+from blueberry.utils import blueberry_base_test
+from blueberry.utils import metrics_utils
+# Internal import
+
+
+class BluetoothThroughputTest(blueberry_base_test.BlueberryBaseTest):
+
+ @retry.logged_retry_on_exception(
+ retry_intervals=retry.FuzzedExponentialIntervals(
+ initial_delay_sec=2, factor=5, num_retries=5, max_delay_sec=300))
+ def _measure_throughput(self, num_of_buffers, buffer_size):
+ """Measures the throughput of a data transfer.
+
+ Sends data from the client device that is read by the server device.
+ Calculates the throughput for the transfer.
+
+ Args:
+ num_of_buffers: An integer value designating the number of buffers
+ to be sent.
+ buffer_size: An integer value designating the size of each buffer,
+ in bytes.
+
+ Returns:
+ The throughput of the transfer in bytes per second.
+ """
+
+ # TODO(user): Need to fix throughput send/receive methods
+ (self.phone.sl4a
+ .bluetoothConnectionThroughputSend(num_of_buffers, buffer_size))
+
+ throughput = (self.derived_bt_device.sl4a
+ .bluetoothConnectionThroughputRead(num_of_buffers,
+ buffer_size))
+ return throughput
+
+ def _throughput_test(self, buffer_size, test_name):
+ logging.info('throughput test with buffer_size: %d and testname: %s',
+ buffer_size, test_name)
+ metrics = {}
+ throughput_list = []
+ num_of_buffers = 1
+ for _ in range(self.iterations):
+ throughput = self._measure_throughput(num_of_buffers, buffer_size)
+ logging.info('Throughput: %d bytes-per-sec', throughput)
+ throughput_list.append(throughput)
+
+ metrics['data_transfer_protocol'] = self.data_transfer_type
+ metrics['data_packet_size'] = buffer_size
+ metrics['data_throughput_min_bytes_per_second'] = int(
+ min(throughput_list))
+ metrics['data_throughput_max_bytes_per_second'] = int(
+ max(throughput_list))
+ metrics['data_throughput_avg_bytes_per_second'] = int(
+ math.fsum(throughput_list) / float(len(throughput_list)))
+
+ logging.info('Throughput at large buffer: %s', metrics)
+
+ asserts.assert_true(metrics['data_throughput_min_bytes_per_second'] > 0,
+ 'Minimum throughput must be greater than 0!')
+
+ self.metrics.add_test_metrics(metrics)
+ for metric in metrics:
+ self.record_data({
+ 'Test Name': test_name,
+ 'sponge_properties': {
+ metric: metrics[metric],
+ }
+ })
+ self.record_data({
+ 'Test Name': test_name,
+ 'sponge_properties': {
+ 'proto_ascii':
+ self.metrics.proto_message_to_ascii(),
+ 'primary_device_build':
+ self.phone.get_device_info()['android_release_id']
+ }
+ })
+
+ def setup_class(self):
+ """Standard Mobly setup class."""
+ super(BluetoothThroughputTest, self).setup_class()
+ if len(self.android_devices) < 2:
+ raise TestAbortClass(
+ 'Not enough android phones detected (need at least two)')
+ self.phone = self.android_devices[0]
+
+ # We treat the secondary phone as a derived_bt_device in order for the
+ # generic script to work with this android phone properly. Data will be sent
+ # from first phone to the second phone.
+ self.derived_bt_device = self.android_devices[1]
+ self.phone.init_setup()
+ self.derived_bt_device.init_setup()
+ self.phone.sl4a_setup()
+ self.derived_bt_device.sl4a_setup()
+ self.set_btsnooplogmode_full(self.phone)
+ self.set_btsnooplogmode_full(self.derived_bt_device)
+
+ self.metrics = (
+ metrics_utils.BluetoothMetricLogger(
+ metrics_pb2.BluetoothDataTestResult()))
+ self.metrics.add_primary_device_metrics(self.phone)
+ self.metrics.add_connected_device_metrics(self.derived_bt_device)
+
+ self.data_transfer_type = metrics_pb2.BluetoothDataTestResult.RFCOMM
+ self.iterations = int(self.user_params.get('iterations', 300))
+ logging.info('Running Bluetooth throughput test %s times.', self.iterations)
+ logging.info('Successfully found required devices.')
+
+ def setup_test(self):
+ """Setup for bluetooth latency test."""
+ logging.info('Setup Test for test_bluetooth_throughput')
+ super(BluetoothThroughputTest, self).setup_test()
+ asserts.assert_true(self.phone.connect_with_rfcomm(self.derived_bt_device),
+ 'Failed to establish RFCOMM connection')
+
+ def test_bluetooth_throughput_large_buffer(self):
+ """Tests the throughput with large buffer size.
+
+ Tests the throughput over a series of data transfers with large buffer size.
+ """
+ large_buffer_size = 300
+ test_name = 'test_bluetooth_throughput_large_buffer'
+ self._throughput_test(large_buffer_size, test_name)
+
+ def test_bluetooth_throughput_medium_buffer(self):
+ """Tests the throughput with medium buffer size.
+
+ Tests the throughput over a series of data transfers with medium buffer
+ size.
+ """
+ medium_buffer_size = 100
+ test_name = 'test_bluetooth_throughput_medium_buffer'
+ self._throughput_test(medium_buffer_size, test_name)
+
+ def test_bluetooth_throughput_small_buffer(self):
+ """Tests the throughput with small buffer size.
+
+ Tests the throughput over a series of data transfers with small buffer size.
+ """
+ small_buffer_size = 10
+ test_name = 'test_bluetooth_throughput_small_buffer'
+ self._throughput_test(small_buffer_size, test_name)
+
+ def test_maximum_buffer_size(self):
+ """Calculates the maximum allowed buffer size for one packet."""
+ current_buffer_size = 300
+ throughput = -1
+ num_of_buffers = 1
+ while True:
+ logging.info('Trying buffer size %d', current_buffer_size)
+ try:
+ throughput = self._measure_throughput(
+ num_of_buffers, current_buffer_size)
+ logging.info('The throughput is %d at buffer size of %d', throughput,
+ current_buffer_size)
+ except ApiError:
+ maximum_buffer_size = current_buffer_size - 1
+ logging.info('Max buffer size: %d bytes', maximum_buffer_size)
+ logging.info('Max throughput: %d bytes-per-second', throughput)
+ self.record_data({
+ 'Test Name': 'test_maximum_buffer_size',
+ 'sponge_properties': {
+ 'maximum_buffer_size': maximum_buffer_size
+ }
+ })
+ return True
+ current_buffer_size += 1
+
+ def teardown_test(self):
+ self.phone.sl4a.bluetoothSocketConnStop()
+ self.derived_bt_device.sl4a.bluetoothSocketConnStop()
+
+ def teardown_class(self):
+ self.phone.factory_reset_bluetooth()
+ self.derived_bt_device.factory_reset_bluetooth()
+ logging.info('Factory resetting Bluetooth on devices.')
+ super(BluetoothThroughputTest, self).teardown_class()
+
+
+if __name__ == '__main__':
+ test_runner.main()
diff --git a/system/blueberry/tests/map/bluetooth_map_test.py b/system/blueberry/tests/map/bluetooth_map_test.py
new file mode 100644
index 0000000000..11bee4ed4d
--- /dev/null
+++ b/system/blueberry/tests/map/bluetooth_map_test.py
@@ -0,0 +1,148 @@
+"""Tests for blueberry.map.bluetooth_map."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import queue
+import time
+
+from mobly import test_runner
+from mobly import signals
+from mobly import utils
+
+from blueberry.controllers import android_bt_target_device
+from blueberry.utils import blueberry_base_test
+
+_SMS_MSG_EVENT = 'SmsReceived'
+_MAP_MSG_EVENT = 'MapMessageReceived'
+
+_EVENT_TIMEOUT_SEC = 180
+_TEXT_LENGTH = 10
+_TEXT_COUNT = 5
+
+
+class BluetoothMapTest(blueberry_base_test.BlueberryBaseTest):
+ """Test Class for Bluetooth MAP Test."""
+
+ def setup_class(self):
+ """Standard Mobly setup class."""
+ super().setup_class()
+ for device in self.android_devices:
+ device.init_setup()
+ device.sl4a_setup()
+
+ # Primary phone which role is Message Server Equipment (MSE).
+ self.pri_phone = self.android_devices[0]
+ self.pri_phone.sl4a.smsStartTrackingIncomingSmsMessage()
+ self.pri_number = self.pri_phone.dimensions['phone_number']
+
+ # Secondary phone which is used to send SMS messages to primary phone.
+ self.sec_phone = self.android_devices[1]
+
+ # Bluetooth carkit which role is Message Client Equipment (MCE).
+ self.derived_bt_device = self.derived_bt_devices[0]
+
+ mac_address = self.derived_bt_device.get_bluetooth_mac_address()
+ self.derived_bt_device.activate_pairing_mode()
+ self.pri_phone.pair_and_connect_bluetooth(mac_address)
+ # Sleep to make the connection to be steady.
+ time.sleep(5)
+
+ if isinstance(
+ self.derived_bt_device, android_bt_target_device.AndroidBtTargetDevice):
+ # Allow sl4a to receive the intent with ACTION_MESSAGE_RECEIVED.
+ self.derived_bt_device.adb.shell(
+ 'pm grant com.googlecode.android_scripting '
+ 'android.permission.RECEIVE_SMS')
+ # Connect derived bt device to primary phone via MAP MCE profile.
+ self.derived_bt_device.add_sec_ad_device(self.pri_phone)
+
+ def teardown_test(self):
+ """Standard Mobly teardown test.
+
+ Disconnects the MAP connection after a test completes.
+ """
+ super().teardown_test()
+ self.derived_bt_device.map_disconnect()
+
+ def _wait_for_message_on_mce(self, text):
+ """Waits for that MCE gets an event with specific message.
+
+ Args:
+ text: String, Text of the message.
+
+ Raises:
+ TestFailure: Raised if timed out.
+ """
+ try:
+ self.derived_bt_device.ed.wait_for_event(
+ _MAP_MSG_EVENT, lambda e: e['data'] == text, _EVENT_TIMEOUT_SEC)
+ self.derived_bt_device.log.info(
+ 'Successfully got the unread message: %s' % text)
+ except queue.Empty:
+ raise signals.TestFailure(
+ 'Timed out after %ds waiting for "%s" event with the message: %s' %
+ (_EVENT_TIMEOUT_SEC, _MAP_MSG_EVENT, text))
+
+ def _wait_for_message_on_mse(self, text):
+ """Waits for that MSE gets an event with specific message.
+
+ This method is used to make sure that MSE has received the test message.
+
+ Args:
+ text: String, Text of the message.
+
+ Raises:
+ TestError: Raised if timed out.
+ """
+ try:
+ self.pri_phone.ed.wait_for_event(
+ _SMS_MSG_EVENT, lambda e: e['data']['Text'] == text,
+ _EVENT_TIMEOUT_SEC)
+ self.pri_phone.log.info(
+ 'Successfully received the incoming message: %s' % text)
+ except queue.Empty:
+ raise signals.TestError(
+ 'Timed out after %ds waiting for "%s" event with the message: %s' %
+ (_EVENT_TIMEOUT_SEC, _SMS_MSG_EVENT, text))
+
+ def _create_message_on_mse(self, text):
+ """Creates a new incoming message on MSE.
+
+ Args:
+ text: String, Text of the message.
+ """
+ self.sec_phone.sl4a.smsSendTextMessage(self.pri_number, text, False)
+ self._wait_for_message_on_mse(text)
+
+ def test_get_existing_unread_messages(self):
+ """Test for the feature of getting existing unread messages on MCE.
+
+ Tests MCE can list existing messages of MSE.
+ """
+ text_list = []
+ # Creates 5 SMS messages on MSE before establishing connection.
+ for _ in range(_TEXT_COUNT):
+ text = utils.rand_ascii_str(_TEXT_LENGTH)
+ self._create_message_on_mse(text)
+ text_list.append(text)
+ self.derived_bt_device.map_connect()
+ # Gets the unread messages of MSE and checks if they are downloaded
+ # successfully on MCE.
+ self.derived_bt_device.get_unread_messages()
+ for text in text_list:
+ self._wait_for_message_on_mce(text)
+
+ def test_receive_unread_message(self):
+ """Test for the feature of receiving unread message on MCE.
+
+ Tests MCE can get an unread message when MSE receives an incoming message.
+ """
+ self.derived_bt_device.map_connect()
+ text = utils.rand_ascii_str(_TEXT_LENGTH)
+ self._create_message_on_mse(text)
+ self._wait_for_message_on_mce(text)
+
+
+if __name__ == '__main__':
+ test_runner.main()
diff --git a/system/blueberry/tests/pan/bluetooth_pan_test.py b/system/blueberry/tests/pan/bluetooth_pan_test.py
new file mode 100644
index 0000000000..062da8e480
--- /dev/null
+++ b/system/blueberry/tests/pan/bluetooth_pan_test.py
@@ -0,0 +1,325 @@
+"""Tests for Bluetooth PAN profile functionalities."""
+
+import contextlib
+import time
+
+from mobly import test_runner
+from mobly import signals
+from mobly.controllers.android_device_lib import jsonrpc_client_base
+from blueberry.utils import blueberry_base_test
+from blueberry.utils import bt_test_utils
+
+# Timeout to wait for NAP service connection to be specific state in second.
+CONNECTION_TIMEOUT_SECS = 20
+
+# Interval time between ping requests in second.
+PING_INTERVAL_TIME_SEC = 2
+
+# Timeout to wait for ping success in second.
+PING_TIMEOUT_SEC = 60
+
+# A URL is used to verify internet by ping request.
+TEST_URL = 'http://www.google.com'
+
+# A string representing SIM State is ready.
+SIM_STATE_READY = 'READY'
+
+
+class BluetoothPanTest(blueberry_base_test.BlueberryBaseTest):
+ """Test class for Bluetooth PAN(Personal Area Networking) profile.
+
+ Test internet connection sharing via Bluetooth between two Android devices.
+ One device which is referred to as NAP(Network Access Point) uses Bluetooth
+ tethering to share internet connection with another device which is referred
+ to as PANU(Personal Area Networking User).
+ """
+
+ def setup_class(self):
+ """Standard Mobly setup class."""
+ super(BluetoothPanTest, self).setup_class()
+ # Number of attempts to initiate connection. 5 attempts as default.
+ self.pan_connect_attempts = self.user_params.get('pan_connect_attempts', 5)
+
+ for device in self.android_devices:
+ device.init_setup()
+ device.sl4a_setup()
+ device.mac_address = device.get_bluetooth_mac_address()
+
+ # Check if the device has inserted a SIM card.
+ if device.sl4a.telephonyGetSimState() != SIM_STATE_READY:
+ raise signals.TestError('SIM card is not ready on Device "%s".' %
+ device.serial)
+
+ self.primary_device = self.android_devices[0]
+ self.secondary_device = self.android_devices[1]
+
+ def teardown_test(self):
+ """Standard Mobly teardown test.
+
+ Reset every devices when a test finished.
+ """
+ super(BluetoothPanTest, self).teardown_test()
+ # Revert debug tags.
+ for device in self.android_devices:
+ device.debug_tag = device.serial
+ device.factory_reset_bluetooth()
+
+ def wait_for_nap_service_connection(
+ self,
+ device,
+ connected_mac_addr,
+ state_connected=True):
+ """Waits for NAP service connection to be expected state.
+
+ Args:
+ device: AndroidDevice, A device is used to check this connection.
+ connected_mac_addr: String, Bluetooth Mac address is needed to be checked.
+ state_connected: Bool, NAP service connection is established as expected
+ if True, else terminated as expected.
+
+ Raises:
+ TestFailure: Raised if NAP service connection is not expected state.
+ """
+ def is_device_connected():
+ """Returns True if connected else False."""
+ connected_devices = (device.sl4a.
+ bluetoothPanGetConnectedDevices())
+ # Check if the Bluetooth mac address is in the connected device list.
+ return connected_mac_addr in [d['address'] for d in connected_devices]
+
+ bt_test_utils.wait_until(
+ timeout_sec=CONNECTION_TIMEOUT_SECS,
+ condition_func=is_device_connected,
+ func_args=[],
+ expected_value=state_connected,
+ exception=signals.TestFailure(
+ 'NAP service connection failed to be %s in %ds.' %
+ ('established' if state_connected else 'terminated',
+ CONNECTION_TIMEOUT_SECS)))
+
+ def initiate_nap_service_connection(
+ self,
+ initiator_device,
+ connected_mac_addr):
+ """Initiates NAP service connection.
+
+ Args:
+ initiator_device: AndroidDevice, A device intiating connection.
+ connected_mac_addr: String, Bluetooth Mac address of connected device.
+
+ Raises:
+ TestFailure: Raised if NAP service connection fails to be established.
+ """
+ count = 0
+ for _ in range(self.pan_connect_attempts):
+ count += 1
+ try:
+ initiator_device.sl4a.bluetoothConnectBonded(connected_mac_addr)
+ self.wait_for_nap_service_connection(
+ device=initiator_device,
+ connected_mac_addr=connected_mac_addr,
+ state_connected=True)
+ return
+ except signals.TestFailure:
+ if count == self.pan_connect_attempts:
+ raise signals.TestFailure(
+ 'NAP service connection still failed to be established '
+ 'after retried %d times.' %
+ self.pan_connect_attempts)
+
+ def terminate_nap_service_connection(
+ self,
+ initiator_device,
+ connected_mac_addr):
+ """Terminates NAP service connection.
+
+ Args:
+ initiator_device: AndroidDevice, A device intiating disconnection.
+ connected_mac_addr: String, Bluetooth Mac address of connected device.
+ """
+ initiator_device.log.info('Terminate NAP service connection.')
+ initiator_device.sl4a.bluetoothDisconnectConnected(connected_mac_addr)
+ self.wait_for_nap_service_connection(
+ device=initiator_device,
+ connected_mac_addr=connected_mac_addr,
+ state_connected=False)
+
+ @contextlib.contextmanager
+ def establish_nap_service_connection(self, nap_device, panu_device):
+ """Establishes NAP service connection between both Android devices.
+
+ The context is used to form a basic network connection between devices
+ before executing a test.
+
+ Steps:
+ 1. Disable Mobile data to avoid internet access on PANU device.
+ 2. Make sure Mobile data available on NAP device.
+ 3. Enable Bluetooth from PANU device.
+ 4. Enable Bluetooth tethering on NAP device.
+ 5. Initiate a connection from PANU device.
+ 6. Check if PANU device has internet access via the connection.
+
+ Args:
+ nap_device: AndroidDevice, A device sharing internet connection via
+ Bluetooth tethering.
+ panu_device: AndroidDevice, A device gaining internet access via
+ Bluetooth tethering.
+
+ Yields:
+ None, the context just execute a pre procedure for PAN testing.
+
+ Raises:
+ signals.TestError: raised if a step fails.
+ signals.TestFailure: raised if PANU device fails to access internet.
+ """
+ nap_device.debug_tag = 'NAP'
+ panu_device.debug_tag = 'PANU'
+ try:
+ # Disable Mobile data to avoid internet access on PANU device.
+ panu_device.log.info('Disabling Mobile data...')
+ panu_device.sl4a.setMobileDataEnabled(False)
+ self.verify_internet(
+ allow_access=False,
+ device=panu_device,
+ exception=signals.TestError(
+ 'PANU device "%s" still connected to internet when Mobile data '
+ 'had been disabled.' % panu_device.serial))
+
+ # Make sure NAP device has Mobile data for internet sharing.
+ nap_device.log.info('Enabling Mobile data...')
+ nap_device.sl4a.setMobileDataEnabled(True)
+ self.verify_internet(
+ allow_access=True,
+ device=nap_device,
+ exception=signals.TestError(
+ 'NAP device "%s" did not have internet access when Mobile data '
+ 'had been enabled.' % nap_device.serial))
+
+ # Enable Bluetooth tethering from NAP device.
+ nap_device.set_bluetooth_tethering(status_enabled=True)
+ # Wait until Bluetooth tethering stabilizes. This waiting time avoids PANU
+ # device initiates a connection to NAP device immediately when NAP device
+ # enables Bluetooth tethering.
+ time.sleep(5)
+
+ nap_device.activate_pairing_mode()
+ panu_device.log.info('Pair to NAP device "%s".' % nap_device.serial)
+ panu_device.pair_and_connect_bluetooth(nap_device.mac_address)
+
+ # Initiate a connection to NAP device.
+ panu_device.log.info('Initiate a connection to NAP device "%s".' %
+ nap_device.serial)
+ self.initiate_nap_service_connection(
+ initiator_device=panu_device,
+ connected_mac_addr=nap_device.mac_address)
+
+ # Check if PANU device can access internet via NAP service connection.
+ self.verify_internet(
+ allow_access=True,
+ device=panu_device,
+ exception=signals.TestFailure(
+ 'PANU device "%s" failed to access internet via NAP service '
+ 'connection.' % panu_device.serial))
+ yield
+ finally:
+ # Disable Bluetooth tethering from NAP device.
+ nap_device.set_bluetooth_tethering(status_enabled=False)
+ panu_device.sl4a.setMobileDataEnabled(True)
+
+ def verify_internet(self, allow_access, device, exception):
+ """Verifies that internet is in expected state.
+
+ Continuously make ping request to a URL for internet verification.
+
+ Args:
+ allow_access: Bool, Device can have internet access as expected if True,
+ else no internet access as expected.
+ device: AndroidDevice, Device to be check internet state.
+ exception: Exception, Raised if internet is not in expected state.
+ """
+ device.log.info('Verify that internet %s be used.' %
+ ('can' if allow_access else 'can not'))
+
+ def http_ping():
+ """Returns True if http ping success else False."""
+ try:
+ return bool(device.sl4a.httpPing(TEST_URL))
+ except jsonrpc_client_base.ApiError as e:
+ # ApiError is raised by httpPing() when no internet.
+ device.log.debug(str(e))
+ return False
+
+ bt_test_utils.wait_until(
+ timeout_sec=PING_TIMEOUT_SEC,
+ condition_func=http_ping,
+ func_args=[],
+ expected_value=allow_access,
+ exception=exception,
+ interval_sec=PING_INTERVAL_TIME_SEC)
+
+ def test_gain_internet_and_terminate_nap_connection(self):
+ """Test that DUT can access internet and terminate NAP service connection.
+
+ In this test case, primary device is PANU and secondary device is NAP. While
+ a connection has established between both devices, PANU should be able to
+ use internet and terminate the connection to disable internet access.
+
+ Steps:
+ 1. Establish NAP service connection between both devices.
+ 2. Terminal the connection from PANU device.
+ 3. Verify that PANU device cannot access internet.
+ """
+ with self.establish_nap_service_connection(
+ nap_device=self.secondary_device,
+ panu_device=self.primary_device):
+
+ # Terminate the connection from DUT.
+ self.terminate_nap_service_connection(
+ initiator_device=self.primary_device,
+ connected_mac_addr=self.secondary_device.mac_address)
+
+ # Verify that PANU device cannot access internet.
+ self.verify_internet(
+ allow_access=False,
+ device=self.primary_device,
+ exception=signals.TestFailure(
+ 'PANU device "%s" can still access internet when it had '
+ 'terminated NAP service connection.' %
+ self.primary_device.serial))
+
+ def test_share_internet_and_disable_bluetooth_tethering(self):
+ """Test that DUT can share internet and stop internet sharing.
+
+ In this test case, primary device is NAP and secondary device is PANU. While
+ a connection has established between both devices, NAP should be able to
+ share internet and disable Bluetooth thethering to stop internet sharing.
+
+ Steps:
+ 1. Establish NAP service connection between both devices.
+ 3. Disable Bluetooth tethering from NAP device.
+ 4. Verify that PANU device cannot access internet.
+ """
+ with self.establish_nap_service_connection(
+ nap_device=self.primary_device,
+ panu_device=self.secondary_device):
+
+ # Disable Bluetooth tethering from DUT and check if the nap connection is
+ # terminated.
+ self.primary_device.set_bluetooth_tethering(status_enabled=False)
+ self.wait_for_nap_service_connection(
+ device=self.primary_device,
+ connected_mac_addr=self.secondary_device.mac_address,
+ state_connected=False)
+
+ # Verify that PANU device cannot access internet.
+ self.verify_internet(
+ allow_access=False,
+ device=self.secondary_device,
+ exception=signals.TestFailure(
+ 'PANU device "%s" can still access internet when it had '
+ 'terminated NAP service connection.' %
+ self.secondary_device.serial))
+
+
+if __name__ == '__main__':
+ test_runner.main()
diff --git a/system/blueberry/tests/pbap/bluetooth_pbap_test.py b/system/blueberry/tests/pbap/bluetooth_pbap_test.py
new file mode 100644
index 0000000000..9c7c032f61
--- /dev/null
+++ b/system/blueberry/tests/pbap/bluetooth_pbap_test.py
@@ -0,0 +1,401 @@
+"""Tests for blueberry.pbap.bluetooth_pbap."""
+
+import os
+import random
+import time
+
+from mobly import asserts
+from mobly import test_runner
+from mobly import signals
+from mobly import utils
+
+from mobly.controllers import android_device
+
+from blueberry.utils import blueberry_base_test
+from blueberry.utils import bt_constants
+from blueberry.utils import bt_test_utils
+
+# The path is used to place the created vcf files.
+STORAGE_PATH = '/storage/emulated/0'
+
+# URI for contacts database.
+CONTACTS_URI = 'content://com.android.contacts/data/phones'
+
+# Number of seconds to wait for contacts and call logs update.
+WAITING_TIMEOUT_SEC = 60
+
+# Number of contacts and call logs to be tested.
+TEST_DATA_COUNT = 1000
+
+# Permissions for Contacts app.
+PERMISSION_LIST = [
+ 'android.permission.READ_CONTACTS',
+ 'android.permission.WRITE_CONTACTS',
+]
+
+
+class BluetoothPbapTest(blueberry_base_test.BlueberryBaseTest):
+ """Test Class for Bluetooth PBAP Test."""
+
+ def setup_class(self):
+ """Standard Mobly setup class."""
+ super(BluetoothPbapTest, self).setup_class()
+
+ # Bluetooth carkit which role is Phone Book Client Equipment (PCE).
+ self.derived_bt_device = self.derived_bt_devices[0]
+
+ # Primary phone which role is Phone Book Server Equipment (PSE).
+ self.pri_phone = self.android_devices[0]
+ self.pri_phone.init_setup()
+ self.pri_phone.sl4a_setup()
+ self.derived_bt_device.add_sec_ad_device(self.pri_phone)
+
+ # Grant the permissions to Contacts app.
+ for device in [self.pri_phone, self.derived_bt_device]:
+ required_permissions = PERMISSION_LIST
+ # App requires READ_EXTERNAL_STORAGE to read contacts if SDK < 30.
+ if int(device.build_info['build_version_sdk']) < 30:
+ required_permissions.append('android.permission.READ_EXTERNAL_STORAGE')
+ for permission in required_permissions:
+ device.adb.shell('pm grant com.google.android.contacts %s' % permission)
+ self.pse_mac_address = self.pri_phone.get_bluetooth_mac_address()
+ mac_address = self.derived_bt_device.get_bluetooth_mac_address()
+ self.derived_bt_device.activate_pairing_mode()
+ self.pri_phone.pair_and_connect_bluetooth(mac_address)
+ # Sleep until the connection stabilizes.
+ time.sleep(5)
+
+ # Allow permission access for PBAP profile.
+ self.pri_phone.sl4a.bluetoothChangeProfileAccessPermission(
+ mac_address,
+ bt_constants.BluetoothProfile.PBAP.value,
+ bt_constants.BluetoothAccessLevel.ACCESS_ALLOWED.value)
+
+ def setup_test(self):
+ super(BluetoothPbapTest, self).setup_test()
+ # Make sure PBAP is not connected before running tests.
+ self._terminate_pbap_connection()
+
+ def _import_vcf_to_pse(self, file_name, expected_contact_count):
+ """Imports the vcf file to PSE."""
+ # Open ImportVcardActivity and click "OK" in the pop-up dialog, then
+ # PickActivity will be launched and browses the existing vcf files.
+ self.pri_phone.adb.shell(
+ 'am start com.google.android.contacts/'
+ 'com.google.android.apps.contacts.vcard.ImportVCardActivity')
+ self.pri_phone.aud(text='OK').click()
+
+ # Check if the vcf file appears in the PickActivity.
+ if not self.pri_phone.aud(text=file_name).exists():
+ raise android_device.DeviceError(
+ self.pri_phone,
+ 'No file name matches "%s" in PickActivity.' % file_name)
+
+ # TODO(user): Remove the check of code name for S build.
+ if (self.pri_phone.build_info['build_version_codename'] != 'S' and
+ int(self.pri_phone.build_info['build_version_sdk']) <= 30):
+ # Since `adb shell input tap` cannot work in PickActivity before R build,
+ # send TAB and ENETER Key events to select and import the vcf file.
+ if self.pri_phone.aud(content_desc='Grid view').exists():
+ # Switch Grid mode since ENTER Key event cannot work in List mode on
+ # git_rvc-d2-release branch.
+ self.pri_phone.aud(content_desc='Grid view').click()
+ self.pri_phone.aud.send_key_code('KEYCODE_TAB')
+ self.pri_phone.aud.send_key_code('KEYCODE_ENTER')
+ else:
+ self.pri_phone.aud(text=file_name).click()
+ self.pri_phone.log.info('Importing "%s"...' % file_name)
+ current_count = self._wait_and_get_contact_count(
+ self.pri_phone, expected_contact_count, WAITING_TIMEOUT_SEC)
+ if current_count != expected_contact_count:
+ raise android_device.DeviceError(
+ self.pri_phone,
+ 'Failed to import %d contact(s) within %ds. Actual count: %d' %
+ (expected_contact_count, WAITING_TIMEOUT_SEC, current_count))
+ self.pri_phone.log.info(
+ 'Successfully added %d contact(s).' % current_count)
+
+ def _generate_contacts_on_pse(self,
+ num_of_contacts,
+ first_name=None,
+ last_name=None,
+ phone_number=None):
+ """Generates contacts to be tested on PSE."""
+ vcf_file = bt_test_utils.create_vcf_from_vcard(
+ output_path=self.pri_phone.log_path,
+ num_of_contacts=num_of_contacts,
+ first_name=first_name,
+ last_name=last_name,
+ phone_number=phone_number)
+ self.pri_phone.adb.push([vcf_file, STORAGE_PATH])
+ # For R build, since the pushed vcf file probably not found when importing
+ # contacts, do a media scan to recognize the file.
+ if int(self.pri_phone.build_info['build_version_sdk']) > 29:
+ self.pri_phone.adb.shell('content call --uri content://media/ --method '
+ 'scan_volume --arg external_primary')
+ file_name = vcf_file.split('/')[-1]
+ self._import_vcf_to_pse(file_name, num_of_contacts)
+ self.pri_phone.adb.shell(
+ 'rm -rf %s' % os.path.join(STORAGE_PATH, file_name))
+
+ def _generate_call_logs_on_pse(self, call_log_type, num_of_call_logs):
+ """Generates call logs to be tested on PSE."""
+ self.pri_phone.log.info('Putting %d call log(s) which type are "%s"...' %
+ (num_of_call_logs, call_log_type))
+ for _ in range(num_of_call_logs):
+ self.pri_phone.sl4a.callLogsPut(dict(
+ type=call_log_type,
+ number='8809%d' % random.randrange(int(10e8)),
+ time=int(1000 * float(self.pri_phone.adb.shell('date +%s.%N')))))
+ current_count = self._wait_and_get_call_log_count(
+ self.pri_phone,
+ call_log_type,
+ num_of_call_logs,
+ WAITING_TIMEOUT_SEC)
+ if current_count != num_of_call_logs:
+ raise android_device.DeviceError(
+ self.pri_phone,
+ 'Failed to generate %d call log(s) within %ds. '
+ 'Actual count: %d, Call log type: %s' %
+ (num_of_call_logs, WAITING_TIMEOUT_SEC, current_count, call_log_type))
+ self.pri_phone.log.info(
+ 'Successfully added %d call log(s).' % current_count)
+
+ def _wait_and_get_contact_count(self,
+ device,
+ expected_contact_count,
+ timeout_sec):
+ """Waits for contact update for a period time and returns contact count.
+
+ This method should be used when a device imports some new contacts. It can
+ wait some time for contact update until expectation or timeout and then
+ return contact count.
+
+ Args:
+ device: AndroidDevice, Mobly Android controller class.
+ expected_contact_count: Int, Number of contacts as expected.
+ timeout_sec: Int, Number of seconds to wait for contact update.
+
+ Returns:
+ current_count: Int, number of the existing contacts on the device.
+ """
+ start_time = time.time()
+ end_time = start_time + timeout_sec
+ current_count = 0
+ while time.time() < end_time:
+ current_count = device.sl4a.contactsGetCount()
+ if current_count == expected_contact_count:
+ break
+ # Interval between attempts to get contacts.
+ time.sleep(1)
+ if current_count != expected_contact_count:
+ device.log.warning(
+ 'Failed to get expected contact count: %d. '
+ 'Actual contact count: %d.' %
+ (expected_contact_count, current_count))
+ return current_count
+
+ def _wait_and_get_call_log_count(self,
+ device,
+ call_log_type,
+ expected_call_log_count,
+ timeout_sec):
+ """Waits for call log update for a period time and returns call log count.
+
+ This method should be used when a device adds some new call logs. It can
+ wait some time for call log update until expectation or timeout and then
+ return call log count.
+
+ Args:
+ device: AndroidDevice, Mobly Android controller class.
+ call_log_type: String, Type of the call logs.
+ expected_call_log_count: Int, Number of call logs as expected.
+ timeout_sec: Int, Number of seconds to wait for call log update.
+
+ Returns:
+ current_count: Int, number of the existing call logs on the device.
+ """
+ start_time = time.time()
+ end_time = start_time + timeout_sec
+ current_count = 0
+ while time.time() < end_time:
+ current_count = len(device.sl4a.callLogsGet(call_log_type))
+ if current_count == expected_call_log_count:
+ break
+ # Interval between attempts to get call logs.
+ time.sleep(1)
+ if current_count != expected_call_log_count:
+ device.log.warning(
+ 'Failed to get expected call log count: %d. '
+ 'Actual call log count: %d.' %
+ (expected_call_log_count, current_count))
+ return current_count
+
+ def _terminate_pbap_connection(self):
+ status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus(
+ self.pse_mac_address)
+ if status == bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED:
+ return
+ self.derived_bt_device.log.info('Disconnecting PBAP...')
+ self.derived_bt_device.sl4a.bluetoothPbapClientDisconnect(
+ self.pse_mac_address)
+ # Buffer for the connection status check.
+ time.sleep(3)
+ status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus(
+ self.pse_mac_address)
+ if status != bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED:
+ raise signals.TestError('PBAP connection failed to be terminated.')
+ self.derived_bt_device.log.info('Successfully disconnected PBAP.')
+
+ def test_download_contacts(self):
+ """Test for the feature of downloading contacts.
+
+ Tests that PCE can download contacts from PSE.
+ """
+ # Make sure no any contacts exist on the devices.
+ for device in [self.pri_phone, self.derived_bt_device]:
+ device.sl4a.contactsEraseAll()
+
+ # Add contacts to PSE.
+ self._generate_contacts_on_pse(TEST_DATA_COUNT)
+
+ # When PCE is connected to PSE, it will download PSE's contacts.
+ self.derived_bt_device.pbap_connect()
+ self.derived_bt_device.log.info('Downloading contacts from PSE...')
+ current_count = self._wait_and_get_contact_count(
+ self.derived_bt_device, TEST_DATA_COUNT, WAITING_TIMEOUT_SEC)
+ self.derived_bt_device.log.info(
+ 'Successfully downloaded %d contact(s).' % current_count)
+
+ asserts.assert_true(
+ current_count == TEST_DATA_COUNT,
+ 'PCE failed to download %d contact(s) within %ds, '
+ 'actually downloaded %d contact(s).' %
+ (TEST_DATA_COUNT, WAITING_TIMEOUT_SEC, current_count))
+
+ def test_download_call_logs(self):
+ """Test for the feature of downloading call logs.
+
+ Tests that PCE can download incoming/outgoing/missed call logs from PSE.
+ """
+ # Make sure no any call logs exist on the devices.
+ for device in [self.pri_phone, self.derived_bt_device]:
+ device.sl4a.callLogsEraseAll()
+
+ call_log_types = [
+ bt_constants.INCOMING_CALL_LOG_TYPE,
+ bt_constants.OUTGOING_CALL_LOG_TYPE,
+ bt_constants.MISSED_CALL_LOG_TYPE,
+ ]
+ for call_log_type in call_log_types:
+ # Add call logs to PSE.
+ self._generate_call_logs_on_pse(call_log_type, TEST_DATA_COUNT)
+
+ # When PCE is connected to PSE, it will download PSE's contacts.
+ self.derived_bt_device.pbap_connect()
+ self.derived_bt_device.log.info('Downloading call logs...')
+
+ for call_log_type in call_log_types:
+ current_count = self._wait_and_get_call_log_count(
+ self.derived_bt_device,
+ call_log_type,
+ TEST_DATA_COUNT,
+ WAITING_TIMEOUT_SEC)
+ self.derived_bt_device.log.info(
+ 'Successfully downloaded %d call log(s) which type are "%s".' %
+ (current_count, call_log_type))
+
+ asserts.assert_true(
+ current_count == TEST_DATA_COUNT,
+ 'PCE failed to download %d call log(s) which type are "%s" within %ds'
+ ', actually downloaded %d call log(s).' %
+ (TEST_DATA_COUNT, call_log_type, WAITING_TIMEOUT_SEC, current_count))
+
+ def test_show_caller_name(self):
+ """Test for caller name of the incoming phone call is correct on PCE.
+
+ Tests that caller name matches contact name which is downloaded via PBAP.
+ """
+ # Checks if two android devices exist.
+ if len(self.android_devices) < 2:
+ raise signals.TestError('This test requires two Android devices.')
+ primary_phone = self.pri_phone
+ secondary_phone = self.android_devices[1]
+ secondary_phone.init_setup()
+ for phone in [primary_phone, secondary_phone]:
+ # Checks if SIM state is loaded for every devices.
+ if not phone.is_sim_state_loaded():
+ raise signals.TestError(f'Please insert a SIM Card to the phone '
+ f'"{phone.serial}".')
+ # Checks if phone_number is provided in the support dimensions.
+ phone.phone_number = phone.dimensions.get('phone_number')
+ if not phone.phone_number:
+ raise signals.TestError(f'Please add "phone_number" to support '
+ f'dimensions of the phone "{phone.serial}".')
+ # Make sure no any contacts exist on the devices.
+ for device in [primary_phone, self.derived_bt_device]:
+ device.sl4a.contactsEraseAll()
+ # Generate a contact name randomly.
+ first_name = utils.rand_ascii_str(4)
+ last_name = utils.rand_ascii_str(4)
+ full_name = f'{first_name} {last_name}'
+ primary_phone.log.info('Creating a contact "%s"...', full_name)
+ self._generate_contacts_on_pse(
+ num_of_contacts=1,
+ first_name=first_name,
+ last_name=last_name,
+ phone_number=secondary_phone.phone_number)
+ self.derived_bt_device.log.info('Connecting to PSE...')
+ self.derived_bt_device.pbap_connect()
+ self.derived_bt_device.log.info('Downloading contacts from PSE...')
+ current_count = self._wait_and_get_contact_count(
+ device=self.derived_bt_device,
+ expected_contact_count=1,
+ timeout_sec=WAITING_TIMEOUT_SEC)
+ self.derived_bt_device.log.info('Successfully downloaded %d contact(s).',
+ current_count)
+ asserts.assert_equal(
+ first=current_count,
+ second=1,
+ msg=f'Failed to download the contact "{full_name}".')
+ secondary_phone.sl4a.telecomCallNumber(primary_phone.phone_number)
+ secondary_phone.log.info('Made a phone call to device "%s".',
+ primary_phone.serial)
+ primary_phone.log.info('Waiting for the incoming call from device "%s"...',
+ secondary_phone.serial)
+ is_ringing = primary_phone.wait_for_call_state(
+ bt_constants.CALL_STATE_RINGING,
+ bt_constants.CALL_STATE_TIMEOUT_SEC)
+ if not is_ringing:
+ raise signals.TestError(
+ f'Timed out after {bt_constants.CALL_STATE_TIMEOUT_SEC}s waiting for '
+ f'the incoming call from device "{secondary_phone.serial}".')
+ try:
+ self.derived_bt_device.aud.open_notification()
+ target_node_match_dict = {
+ 'resource_id': 'android:id/line1',
+ 'child': {
+ 'resource_id': 'android:id/title'
+ }
+ }
+ hfp_address = primary_phone.get_bluetooth_mac_address()
+ target_node = self.derived_bt_device.aud(
+ sibling=target_node_match_dict,
+ text=f'Incoming call via HFP {hfp_address}')
+ caller_name = target_node.get_attribute_value('text')
+ message = (f'Caller name is incorrect. Actual: {caller_name}, '
+ f'Correct: {full_name}')
+ # Asserts that caller name of the incoming phone call is correct in the
+ # notification bar.
+ asserts.assert_equal(
+ first=caller_name,
+ second=full_name,
+ msg=message)
+ finally:
+ # Recovery actions.
+ self.derived_bt_device.aud.close_notification()
+ secondary_phone.sl4a.telecomEndCall()
+
+
+if __name__ == '__main__':
+ test_runner.main()
diff --git a/system/blueberry/tests/pbat/bluetooth_acceptance_suite.py b/system/blueberry/tests/pbat/bluetooth_acceptance_suite.py
new file mode 100644
index 0000000000..faa1151c77
--- /dev/null
+++ b/system/blueberry/tests/pbat/bluetooth_acceptance_suite.py
@@ -0,0 +1,71 @@
+"""Suite collecting bluetooth test classes for acceptance testing."""
+
+import logging
+
+from mobly import test_runner_suite
+
+from blueberry.tests.a2dp import bluetooth_a2dp_test
+from blueberry.tests.avrcp import bluetooth_avrcp_test
+from blueberry.tests.connectivity import ble_pairing_test
+from blueberry.tests.connectivity import bluetooth_pairing_test
+from blueberry.tests.hfp import bluetooth_hfp_test
+from blueberry.tests.map import bluetooth_map_test
+from blueberry.tests.opp import bluetooth_opp_test
+from blueberry.tests.pan import bluetooth_pan_test
+from blueberry.tests.pbap import bluetooth_pbap_test
+
+# Test classes for the Bluetooth acceptance suite.
+TEST_CLASSES = [
+ bluetooth_pairing_test.BluetoothPairingTest,
+ ble_pairing_test.BlePairingTest,
+ bluetooth_a2dp_test.BluetoothA2dpTest,
+ bluetooth_avrcp_test.BluetoothAvrcpTest,
+ bluetooth_hfp_test.BluetoothHfpTest,
+ bluetooth_map_test.BluetoothMapTest,
+ bluetooth_pbap_test.BluetoothPbapTest,
+ bluetooth_opp_test.BluetoothOppTest,
+ bluetooth_pan_test.BluetoothPanTest
+]
+
+
+class BluetoothAcceptanceSuite(mobly_g3_suite.BaseSuite):
+ """Bluetooth Acceptance Suite.
+
+ Usage of Test selector:
+ Add the parameter "acceptance_test_selector" in the Mobly configuration, it's
+ value is like "test_method_1,test_method_2,...". If this parameter is not
+ used, all tests will be running.
+ """
+
+ def setup_suite(self, config):
+ selected_tests = None
+ selector = config.user_params.get('acceptance_test_selector')
+ if selector:
+ selected_tests = selector.split(',')
+ logging.info('Selected tests: %s', ' '.join(selected_tests))
+ # Enable all Bluetooth logging in the first test.
+ first_test_config = config.copy()
+ first_test_config.user_params.update({
+ 'enable_all_bluetooth_logging': 1,
+ })
+ for index, clazz in enumerate(TEST_CLASSES):
+ if selected_tests:
+ matched_tests = None
+ # Gets the same elements between selected_tests and dir(clazz).
+ matched_tests = list(set(selected_tests) & set(dir(clazz)))
+ # Adds the test class if it contains the selected tests.
+ if matched_tests:
+ self.add_test_class(
+ clazz=clazz,
+ config=first_test_config if index == 0 else config,
+ tests=matched_tests)
+ logging.info('Added the tests of "%s": %s', clazz.__name__,
+ ' '.join(matched_tests))
+ else:
+ self.add_test_class(
+ clazz=clazz,
+ config=first_test_config if index == 0 else config)
+
+
+if __name__ == '__main__':
+ mobly_g3_suite.main()
diff --git a/system/blueberry/utils/android_bluetooth_decorator.py b/system/blueberry/utils/android_bluetooth_decorator.py
new file mode 100644
index 0000000000..c5708971f0
--- /dev/null
+++ b/system/blueberry/utils/android_bluetooth_decorator.py
@@ -0,0 +1,1729 @@
+# Lint as: python3
+"""AndroidBluetoothDecorator class.
+
+This decorator is used for giving an AndroidDevice Bluetooth-specific
+functionality.
+"""
+
+import datetime
+import logging
+import os
+import queue
+import random
+import re
+import string
+import time
+from typing import Dict, Any, Text, Optional, Tuple, Sequence, Union
+from mobly import asserts
+from mobly import logger as mobly_logger
+from mobly import signals
+from mobly import utils
+from mobly.controllers.android_device import AndroidDevice
+from mobly.controllers.android_device_lib import adb
+from mobly.controllers.android_device_lib import jsonrpc_client_base
+from mobly.controllers.android_device_lib.services import sl4a_service
+# Internal import
+from blueberry.controllers.derived_bt_device import BtDevice
+from blueberry.utils import bt_constants
+from blueberry.utils import bt_test_utils
+from blueberry.utils.bt_constants import AvrcpEvent
+from blueberry.utils.bt_constants import BluetoothConnectionPolicy
+from blueberry.utils.bt_constants import BluetoothConnectionStatus
+from blueberry.utils.bt_constants import BluetoothProfile
+from blueberry.utils.bt_constants import CallLogType
+from blueberry.utils.bt_constants import CallState
+
+
+# Map for media passthrough commands and the corresponding events.
+MEDIA_CMD_MAP = {
+ bt_constants.CMD_MEDIA_PAUSE: bt_constants.EVENT_PAUSE_RECEIVED,
+ bt_constants.CMD_MEDIA_PLAY: bt_constants.EVENT_PLAY_RECEIVED,
+ bt_constants.CMD_MEDIA_SKIP_PREV: bt_constants.EVENT_SKIP_PREV_RECEIVED,
+ bt_constants.CMD_MEDIA_SKIP_NEXT: bt_constants.EVENT_SKIP_NEXT_RECEIVED
+}
+
+# Timeout for track change and playback state update in second.
+MEDIA_UPDATE_TIMEOUT_SEC = 3
+
+# Timeout for the event of Media passthrough commands in second.
+MEDIA_EVENT_TIMEOUT_SEC = 1
+
+BT_CONNECTION_WAITING_TIME_SECONDS = 10
+
+ADB_WAITING_TIME_SECONDS = 1
+
+# Common timeout for toggle status in seconds.
+COMMON_TIMEOUT_SECONDS = 5
+
+# Local constant
+_DATETIME_FMT = '%m-%d %H:%M:%S.%f'
+
+# Interval time between ping requests in second.
+PING_INTERVAL_TIME_SEC = 2
+
+# Timeout to wait for ping success in second.
+PING_TIMEOUT_SEC = 60
+
+# A URL is used to verify internet by ping request.
+TEST_URL = 'http://www.google.com'
+
+
+class DiscoveryError(signals.ControllerError):
+ """Exception raised for Bluetooth device discovery failures."""
+ pass
+
+
+class AndroidBluetoothDecorator(AndroidDevice):
+ """Decorates an AndroidDevice with Bluetooth-specific functionality."""
+
+ def __init__(self, ad: AndroidDevice):
+ self._ad = ad
+ self._user_params = None
+ if not self._ad or not isinstance(self._ad, AndroidDevice):
+ raise TypeError('Must apply AndroidBluetoothDecorator to an '
+ 'AndroidDevice')
+ self.ble_advertise_callback = None
+ self.regex_logcat_time = re.compile(
+ r'(?P<datetime>[\d]{2}-[\d]{2} [\d]{2}:[\d]{2}:[\d]{2}.[\d]{3})'
+ r'[ ]+\d+.*')
+ self._regex_bt_crash = re.compile(
+ r'Bluetooth crashed (?P<num_bt_crashes>\d+) times')
+
+ def __getattr__(self, name: Any) -> Any:
+ return getattr(self._ad, name)
+
+ def _is_device_connected(self, mac_address):
+ """Wrapper method to help with unit testability of this class."""
+ return self._ad.sl4a.bluetoothIsDeviceConnected(mac_address)
+
+ def _is_profile_connected(self, mac_address, profile):
+ """Checks if the profile is connected."""
+ status = None
+ pri_ad = self._ad
+ if profile == BluetoothProfile.HEADSET_CLIENT:
+ status = pri_ad.sl4a.bluetoothHfpClientGetConnectionStatus(mac_address)
+ elif profile == BluetoothProfile.A2DP_SINK:
+ status = pri_ad.sl4a.bluetoothA2dpSinkGetConnectionStatus(mac_address)
+ elif profile == BluetoothProfile.PBAP_CLIENT:
+ status = pri_ad.sl4a.bluetoothPbapClientGetConnectionStatus(mac_address)
+ elif profile == BluetoothProfile.MAP_MCE:
+ connected_devices = self._ad.sl4a.bluetoothMapClientGetConnectedDevices()
+ return any(
+ mac_address in device['address'] for device in connected_devices)
+ else:
+ pri_ad.log.warning(
+ 'The connection check for profile %s is not supported '
+ 'yet', profile)
+ return False
+ return status == BluetoothConnectionStatus.STATE_CONNECTED
+
+ def _get_bluetooth_le_state(self):
+ """Wrapper method to help with unit testability of this class."""
+ return self._ad.sl4a.bluetoothGetLeState
+
+ def _generate_id_by_size(self, size):
+ """Generate string of random ascii letters and digits.
+
+ Args:
+ size: required size of string.
+
+ Returns:
+ String of random chars.
+ """
+ return ''.join(
+ random.choice(string.ascii_letters + string.digits)
+ for _ in range(size))
+
+ def _wait_for_bluetooth_manager_state(self,
+ state=None,
+ timeout=10,
+ threshold=5):
+ """Waits for Bluetooth normalized state or normalized explicit state.
+
+ Args:
+ state: expected Bluetooth state
+ timeout: max timeout threshold
+ threshold: list len of bt state
+ Returns:
+ True if successful, false if unsuccessful.
+ """
+ all_states = []
+ start_time = time.time()
+ while time.time() < start_time + timeout:
+ all_states.append(self._get_bluetooth_le_state())
+ if len(all_states) >= threshold:
+ # for any normalized state
+ if state is None:
+ if len(all_states[-threshold:]) == 1:
+ logging.info('State normalized %s', all_states[-threshold:])
+ return True
+ else:
+ # explicit check against normalized state
+ if state in all_states[-threshold:]:
+ return True
+ time.sleep(0.5)
+ logging.error(
+ 'Bluetooth state fails to normalize' if state is None else
+ 'Failed to match bluetooth state, current state {} expected state {}'
+ .format(self._get_bluetooth_le_state(), state))
+ return False
+
+ def init_setup(self) -> None:
+ """Sets up android device for bluetooth tests."""
+ self._ad.services.register('sl4a', sl4a_service.Sl4aService)
+ self._ad.load_snippet('mbs', 'com.google.android.mobly.snippet.bundled')
+ self._ad.adb.shell('setenforce 0')
+
+ # Adds 2 seconds waiting time to see it can fix the NullPointerException
+ # when executing the following sl4a.bluetoothStartPairingHelper method.
+ time.sleep(2)
+ self._ad.sl4a.bluetoothStartPairingHelper()
+ self.factory_reset_bluetooth()
+
+ def sl4a_setup(self) -> None:
+ """A common setup routine for android device sl4a function.
+
+ Things this method setup:
+ 1. Set Bluetooth local name to random string of size 4
+ 2. Disable BLE background scanning.
+ """
+
+ sl4a = self._ad.sl4a
+ sl4a.bluetoothStartConnectionStateChangeMonitor('')
+ setup_result = sl4a.bluetoothSetLocalName(self._generate_id_by_size(4))
+ if not setup_result:
+ self.log.error('Failed to set device name.')
+ return
+ sl4a.bluetoothDisableBLE()
+ bonded_devices = sl4a.bluetoothGetBondedDevices()
+ for b in bonded_devices:
+ self.log.info('Removing bond for device {}'.format(b['address']))
+ sl4a.bluetoothUnbond(b['address'])
+
+ def set_user_params(self, params: Dict[str, Any]) -> None:
+ self._user_params = params
+
+ def get_user_params(self) -> Dict[str, Any]:
+ return self._user_params
+
+ def is_sim_state_loaded(self) -> bool:
+ """Checks if SIM state is loaded.
+
+ Returns:
+ True if SIM state is loaded else False.
+ """
+ state = self._ad.adb.shell('getprop gsm.sim.state').decode().strip()
+ return state == 'LOADED'
+
+ def is_package_installed(self, package_name: str) -> bool:
+ """Checks if a package is installed.
+
+ Args:
+ package_name: string, a package to be checked.
+
+ Returns:
+ True if the package is installed else False.
+ """
+ # The package is installed if result is 1, not installed if result is 0.
+ result = int(self._ad.adb.shell('pm list packages | grep -i %s$ | wc -l' %
+ package_name))
+ return bool(result)
+
+ def connect_with_rfcomm(self, other_ad: AndroidDevice) -> bool:
+ """Establishes an RFCOMM connection with other android device.
+
+ Connects this android device (as a client) to the other android device
+ (as a server).
+
+ Args:
+ other_ad: the Android device accepting the connection from this device.
+
+ Returns:
+ True if connection was successful, False if unsuccessful.
+ """
+ server_address = other_ad.sl4a.bluetoothGetLocalAddress()
+ logging.info('Pairing and connecting devices')
+ if not self._ad.sl4a.bluetoothDiscoverAndBond(server_address):
+ logging.info('Failed to pair and connect devices')
+ return False
+
+ # Create RFCOMM connection
+ logging.info('establishing RFCOMM connection')
+ return self.orchestrate_rfcomm_connection(other_ad)
+
+ def orchestrate_rfcomm_connection(
+ self,
+ other_ad: AndroidDevice,
+ accept_timeout_ms: int = bt_constants.DEFAULT_RFCOMM_TIMEOUT_MS,
+ uuid: Optional[Text] = None) -> bool:
+ """Sets up the RFCOMM connection to another android device.
+
+ It sets up the connection with a Bluetooth Socket connection with other
+ device.
+
+ Args:
+ other_ad: the Android device accepting the connection from this device.
+ accept_timeout_ms: the timeout in ms for the connection.
+ uuid: universally unique identifier.
+
+ Returns:
+ True if connection was successful, False if unsuccessful.
+ """
+ if uuid is None:
+ uuid = bt_constants.BT_RFCOMM_UUIDS['default_uuid']
+ other_ad.sl4a.bluetoothStartPairingHelper()
+ self._ad.sl4a.bluetoothStartPairingHelper()
+ other_ad.sl4a.bluetoothSocketConnBeginAcceptThreadUuid(uuid,
+ accept_timeout_ms)
+ self._ad.sl4a.bluetoothSocketConnBeginConnectThreadUuid(
+ other_ad.sl4a.bluetoothGetLocalAddress(), uuid)
+
+ end_time = time.time() + bt_constants.BT_DEFAULT_TIMEOUT_SECONDS
+ test_result = True
+
+ while time.time() < end_time:
+ number_socket_connections = len(
+ other_ad.sl4a.bluetoothSocketConnActiveConnections())
+ connected = number_socket_connections > 0
+ if connected:
+ test_result = True
+ other_ad.log.info('Bluetooth socket Client Connection Active')
+ break
+ else:
+ test_result = False
+ time.sleep(1)
+ if not test_result:
+ other_ad.log.error('Failed to establish a Bluetooth socket connection')
+ return False
+ return True
+
+ def wait_for_discovery_success(
+ self,
+ mac_address: str,
+ timeout: float = 30) -> float:
+ """Waits for a device to be discovered by AndroidDevice.
+
+ Args:
+ mac_address: The Bluetooth mac address of the peripheral device.
+ timeout: Number of seconds to wait for device discovery.
+
+ Returns:
+ discovery_time: The time it takes to pair in seconds.
+
+ Raises:
+ DiscoveryError
+ """
+ start_time = time.time()
+ try:
+ self._ad.ed.wait_for_event('Discovery%s' % mac_address,
+ lambda x: x['data']['Status'], timeout)
+ discovery_time = time.time() - start_time
+ return discovery_time
+
+ except queue.Empty:
+ raise DiscoveryError('Failed to discover device %s after %d seconds' %
+ (mac_address, timeout))
+
+ def wait_for_pairing_success(
+ self,
+ mac_address: str,
+ timeout: float = 30) -> float:
+ """Waits for a device to pair with the AndroidDevice.
+
+ Args:
+ mac_address: The Bluetooth mac address of the peripheral device.
+ timeout: Number of seconds to wait for the devices to pair.
+
+ Returns:
+ pairing_time: The time it takes to pair in seconds.
+
+ Raises:
+ ControllerError
+ """
+ start_time = time.time()
+ try:
+ self._ad.ed.wait_for_event('Bond%s' % mac_address,
+ lambda x: x['data']['Status'], timeout)
+ pairing_time = time.time() - start_time
+ return pairing_time
+
+ except queue.Empty:
+ raise signals.ControllerError(
+ 'Failed to bond with device %s after %d seconds' %
+ (mac_address, timeout))
+
+ def wait_for_connection_success(
+ self,
+ mac_address: str,
+ timeout: int = 30) -> float:
+ """Waits for a device to connect with the AndroidDevice.
+
+ Args:
+ mac_address: The Bluetooth mac address of the peripheral device.
+ timeout: Number of seconds to wait for the devices to connect.
+
+ Returns:
+ connection_time: The time it takes to connect in seconds.
+
+ Raises:
+ ControllerError
+ """
+ start_time = time.time()
+ end_time = start_time + timeout
+ while time.time() < end_time:
+ if self._is_device_connected(mac_address):
+ connection_time = (time.time() - start_time)
+ logging.info('Connected device %s in %d seconds', mac_address,
+ connection_time)
+ return connection_time
+
+ raise signals.ControllerError(
+ 'Failed to connect device within %d seconds.' % timeout)
+
+ def factory_reset_bluetooth(self) -> None:
+ """Factory resets Bluetooth on an AndroidDevice."""
+
+ logging.info('Factory resetting Bluetooth for AndroidDevice.')
+ self._ad.sl4a.bluetoothToggleState(True)
+ paired_devices = self._ad.mbs.btGetPairedDevices()
+ for device in paired_devices:
+ self._ad.sl4a.bluetoothUnbond(device['Address'])
+ self._ad.sl4a.bluetoothFactoryReset()
+ self._wait_for_bluetooth_manager_state()
+ self._ad.sl4a.bluetoothToggleState(True)
+
+ def get_device_info(self) -> Dict[str, Any]:
+ """Gets the configuration info of an AndroidDevice.
+
+ Returns:
+ dict, A dictionary mapping metric keys to their respective values.
+ """
+
+ device_info = {
+ 'device_class':
+ 'AndroidDevice',
+ 'device_model':
+ self._ad.device_info['model'],
+ 'hardware_version':
+ self._ad.adb.getprop('ro.boot.hardware.revision'),
+ 'software_version':
+ self._ad.build_info['build_id'],
+ 'android_build_type':
+ self._ad.build_info['build_type'],
+ 'android_build_number':
+ self._ad.adb.getprop('ro.build.version.incremental'),
+ 'android_release_id':
+ self._ad.build_info['build_id']
+ }
+
+ return device_info
+
+ def pair_and_connect_bluetooth(
+ self,
+ mac_address: str,
+ attempts: int = 3,
+ enable_pairing_retry: bool = True) -> Tuple[float, float]:
+ """Pairs and connects an AndroidDevice with a peripheral Bluetooth device.
+
+ Ensures that an AndroidDevice is paired and connected to a peripheral
+ device. If the devices are already connected, does nothing. If
+ the devices are paired but not connected, connects the devices. If the
+ devices are neither paired nor connected, this method pairs and connects the
+ devices.
+
+ Suggests to use the retry mechanism on Discovery because it sometimes fail
+ even if the devices are testing in shielding. In order to avoid the remote
+ device may not respond a incoming pairing request causing to bonding failure
+ , it suggests to retry pairing too.
+
+ Args:
+ mac_address: The Bluetooth mac address of the peripheral device.
+ attempts: Number of attempts to discover and pair the peripheral device.
+ enable_pairing_retry: Bool to control whether the retry mechanism is used
+ on bonding failure, it's enabled if True.
+
+ Returns:
+ pairing_time: The time, in seconds, it takes to pair the devices.
+ connection_time: The time, in seconds, it takes to connect the
+ devices after pairing is completed.
+
+ Raises:
+ DiscoveryError: Raised if failed to discover the peripheral device.
+ ControllerError: Raised if failed to bond the peripheral device.
+ """
+
+ connected = self._is_device_connected(mac_address)
+ pairing_time = 0
+ connection_time = 0
+ if connected:
+ logging.info('Device %s already paired and connected', mac_address)
+ return pairing_time, connection_time
+
+ paired_devices = [device['address'] for device in
+ self._ad.sl4a.bluetoothGetBondedDevices()]
+ if mac_address in paired_devices:
+ self._ad.sl4a.bluetoothConnectBonded(mac_address)
+ return pairing_time, self.wait_for_connection_success(mac_address)
+
+ logging.info('Initiate pairing to the device "%s".', mac_address)
+ for i in range(attempts):
+ self._ad.sl4a.bluetoothDiscoverAndBond(mac_address)
+ try:
+ self.wait_for_discovery_success(mac_address)
+ pairing_time = self.wait_for_pairing_success(mac_address)
+ break
+ except DiscoveryError:
+ if i + 1 < attempts:
+ logging.error(
+ 'Failed to find the device "%s" on Attempt %d. '
+ 'Retrying discovery...', mac_address, i + 1)
+ continue
+ raise DiscoveryError('Failed to find the device "%s".' % mac_address)
+ except signals.ControllerError:
+ if i + 1 < attempts and enable_pairing_retry:
+ logging.error(
+ 'Failed to bond the device "%s" on Attempt %d. '
+ 'Retrying pairing...', mac_address, i + 1)
+ continue
+ raise signals.ControllerError('Failed to bond the device "%s".' %
+ mac_address)
+
+ connection_time = self.wait_for_connection_success(mac_address)
+ return pairing_time, connection_time
+
+ def disconnect_bluetooth(
+ self,
+ mac_address: str,
+ timeout: float = 30) -> float:
+ """Disconnects Bluetooth between an AndroidDevice and peripheral device.
+
+ Args:
+ mac_address: The Bluetooth mac address of the peripheral device.
+ timeout: Number of seconds to wait for the devices to disconnect the
+ peripheral device.
+
+ Returns:
+ disconnection_time: The time, in seconds, it takes to disconnect the
+ peripheral device.
+
+ Raises:
+ ControllerError: Raised if failed to disconnect the peripheral device.
+ """
+ if not self._is_device_connected(mac_address):
+ logging.info('Device %s already disconnected', mac_address)
+ return 0
+
+ self._ad.sl4a.bluetoothDisconnectConnected(mac_address)
+ start_time = time.time()
+ end_time = time.time() + timeout
+ while time.time() < end_time:
+ connected = self._is_device_connected(mac_address)
+ if not connected:
+ logging.info('Device %s disconnected successfully.', mac_address)
+ return time.time() - start_time
+
+ raise signals.ControllerError(
+ 'Failed to disconnect device within %d seconds.' % timeout)
+
+ def connect_bluetooth(self, mac_address: str, timeout: float = 30) -> float:
+ """Connects Bluetooth between an AndroidDevice and peripheral device.
+
+ Args:
+ mac_address: The Bluetooth mac address of the peripheral device.
+ timeout: Number of seconds to wait for the devices to connect the
+ peripheral device.
+
+ Returns:
+ connection_time: The time, in seconds, it takes to connect the
+ peripheral device.
+
+ Raises:
+ ControllerError: Raised if failed to connect the peripheral device.
+ """
+ if self._is_device_connected(mac_address):
+ logging.info('Device %s already connected', mac_address)
+ return 0
+
+ self._ad.sl4a.bluetoothConnectBonded(mac_address)
+ connect_time = self.wait_for_connection_success(mac_address)
+
+ return connect_time
+
+ def activate_pairing_mode(self) -> None:
+ """Activates pairing mode on an AndroidDevice."""
+ logging.info('Activating pairing mode on AndroidDevice.')
+ self._ad.sl4a.bluetoothMakeDiscoverable()
+ self._ad.sl4a.bluetoothStartPairingHelper()
+
+ def activate_ble_pairing_mode(self) -> None:
+ """Activates BLE pairing mode on an AndroidDevice."""
+ self.ble_advertise_callback = self._ad.sl4a.bleGenBleAdvertiseCallback()
+ self._ad.sl4a.bleSetAdvertiseDataIncludeDeviceName(True)
+ # Sets advertise mode to low latency.
+ self._ad.sl4a.bleSetAdvertiseSettingsAdvertiseMode(
+ bt_constants.BleAdvertiseSettingsMode.LOW_LATENCY)
+ self._ad.sl4a.bleSetAdvertiseSettingsIsConnectable(True)
+ # Sets TX power level to High.
+ self._ad.sl4a.bleSetAdvertiseSettingsTxPowerLevel(
+ bt_constants.BleAdvertiseSettingsTxPower.HIGH)
+ advertise_data = self._ad.sl4a.bleBuildAdvertiseData()
+ advertise_settings = self._ad.sl4a.bleBuildAdvertiseSettings()
+ logging.info('Activating BLE pairing mode on AndroidDevice.')
+ self._ad.sl4a.bleStartBleAdvertising(
+ self.ble_advertise_callback, advertise_data, advertise_settings)
+
+ def deactivate_ble_pairing_mode(self) -> None:
+ """Deactivates BLE pairing mode on an AndroidDevice."""
+ if not self.ble_advertise_callback:
+ self._ad.log.debug('BLE pairing mode is not activated.')
+ return
+ logging.info('Deactivating BLE pairing mode on AndroidDevice.')
+ self._ad.sl4a.bleStopBleAdvertising(self.ble_advertise_callback)
+ self.ble_advertise_callback = None
+
+ def get_bluetooth_mac_address(self) -> str:
+ """Gets Bluetooth mac address of an AndroidDevice."""
+ logging.info('Getting Bluetooth mac address for AndroidDevice.')
+ mac_address = self._ad.sl4a.bluetoothGetLocalAddress()
+ logging.info('Bluetooth mac address of AndroidDevice: %s', mac_address)
+ return mac_address
+
+ def scan_and_get_ble_device_address(
+ self,
+ device_name: str,
+ timeout_sec: float = 30) -> str:
+ """Searchs a BLE device by BLE scanner and returns it's BLE mac address.
+
+ Args:
+ device_name: string, the name of BLE device.
+ timeout_sec: int, number of seconds to wait for finding the advertisement.
+
+ Returns:
+ String of the BLE mac address.
+
+ Raises:
+ ControllerError: Raised if failed to get the BLE device address
+ """
+ filter_list = self._ad.sl4a.bleGenFilterList()
+ scan_settings = self._ad.sl4a.bleBuildScanSetting()
+ scan_callback = self._ad.sl4a.bleGenScanCallback()
+ self._ad.sl4a.bleSetScanFilterDeviceName(device_name)
+ self._ad.sl4a.bleBuildScanFilter(filter_list)
+ self._ad.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
+ try:
+ event = self._ad.ed.pop_event(
+ 'BleScan%sonScanResults' % scan_callback, timeout_sec)
+ except queue.Empty:
+ raise signals.ControllerError(
+ 'Timed out %ds after waiting for phone finding BLE device: %s.' %
+ (timeout_sec, device_name))
+ finally:
+ self._ad.sl4a.bleStopBleScan(scan_callback)
+ return event['data']['Result']['deviceInfo']['address']
+
+ def get_device_name(self) -> str:
+ """Gets Bluetooth device name of an AndroidDevice."""
+ logging.info('Getting Bluetooth device name for AndroidDevice.')
+ device_name = self._ad.sl4a.bluetoothGetLocalName()
+ logging.info('Bluetooth device name of AndroidDevice: %s', device_name)
+ return device_name
+
+ def is_bluetooth_sco_on(self) -> bool:
+ """Checks whether communications use Bluetooth SCO."""
+ cmd = 'dumpsys bluetooth_manager | grep "isBluetoothScoOn"'
+ get_status = self._ad.adb.shell(cmd)
+ if isinstance(get_status, bytes):
+ get_status = get_status.decode()
+ return 'true' in get_status
+
+ def connect_with_profile(
+ self,
+ snd_ad_mac_address: str,
+ profile: BluetoothProfile) -> bool:
+ """Connects with the profile.
+
+ The connection can only be completed after the bluetooth devices are paired.
+ To connected with the profile, the bluetooth connection policy is set to
+ forbidden first and then set to allowed. The paired bluetooth devices will
+ start to make connection. The connection time could be long. The waitting
+ time is set to BT_CONNECTION_WAITING_TIME_SECONDS (currently 10 seconds).
+
+ Args:
+ snd_ad_mac_address: the mac address of the device accepting connection.
+ profile: the profiles to be set
+
+ Returns:
+ The profile connection succeed/fail
+ """
+ if profile == BluetoothProfile.MAP_MCE:
+ self._ad.sl4a.bluetoothMapClientConnect(snd_ad_mac_address)
+ elif profile == BluetoothProfile.PBAP_CLIENT:
+ self.set_profile_policy(
+ snd_ad_mac_address, profile,
+ BluetoothConnectionPolicy.CONNECTION_POLICY_ALLOWED)
+ self._ad.sl4a.bluetoothPbapClientConnect(snd_ad_mac_address)
+ else:
+ self.set_profile_policy(
+ snd_ad_mac_address, profile,
+ BluetoothConnectionPolicy.CONNECTION_POLICY_FORBIDDEN)
+ self.set_profile_policy(
+ snd_ad_mac_address, profile,
+ BluetoothConnectionPolicy.CONNECTION_POLICY_ALLOWED)
+ self._ad.sl4a.bluetoothConnectBonded(snd_ad_mac_address)
+ time.sleep(BT_CONNECTION_WAITING_TIME_SECONDS)
+ is_connected = self._is_profile_connected(snd_ad_mac_address, profile)
+ self.log.info('The connection between %s and %s for profile %s succeed: %s',
+ self.serial, snd_ad_mac_address, profile, is_connected)
+ return is_connected
+
+ def connect_to_snd_with_profile(
+ self,
+ snd_ad: AndroidDevice,
+ profile: BluetoothProfile,
+ attempts: int = 5) -> bool:
+ """Connects pri android device to snd android device with profile.
+
+ Args:
+ snd_ad: android device accepting connection
+ profile: the profile to be connected
+ attempts: Number of attempts to try until failure.
+
+ Returns:
+ Boolean of connecting result
+ """
+ pri_ad = self._ad
+ curr_attempts = 0
+ snd_ad_mac_address = snd_ad.sl4a.bluetoothGetLocalAddress()
+ if not self.is_bt_paired(snd_ad_mac_address):
+ self.log.error('Devices %s and %s not paired before connecting',
+ self.serial, snd_ad.serial)
+ return False
+ while curr_attempts < attempts:
+ curr_attempts += 1
+ self.log.info('Connection of profile %s at curr attempt %d (total %d)',
+ profile, curr_attempts, attempts)
+ if self.connect_with_profile(snd_ad_mac_address, profile):
+ self.log.info('Connection between devices %s and %s succeeds at %d try',
+ pri_ad.serial, snd_ad.serial, curr_attempts)
+ return True
+ self.log.error('Connection of profile %s failed after %d attempts', profile,
+ attempts)
+ return False
+
+ def is_bt_paired(self, mac_address: str) -> bool:
+ """Check if the bluetooth device with mac_address is paired to ad.
+
+ Args:
+ mac_address: the mac address of the bluetooth device for pairing
+
+ Returns:
+ True if they are paired
+ """
+ bonded_info = self._ad.sl4a.bluetoothGetBondedDevices()
+ return mac_address in [info['address'] for info in bonded_info]
+
+ def is_a2dp_sink_connected(self, mac_address: str) -> bool:
+ """Checks if the Android device connects to a A2DP sink device.
+
+ Args:
+ mac_address: String, Bluetooth MAC address of the A2DP sink device.
+
+ Returns:
+ True if connected else False.
+ """
+ connected_devices = self._ad.sl4a.bluetoothA2dpGetConnectedDevices()
+ return mac_address in [d['address'] for d in connected_devices]
+
+ def hfp_connect(self, ag_ad: AndroidDevice) -> bool:
+ """Hfp connecting hf android device to ag android device.
+
+ The android device should support the Headset Client profile. For example,
+ the android device with git_master-bds-dev build.
+
+ Args:
+ ag_ad: Audio Gateway (ag) android device
+
+ Returns:
+ Boolean of connecting result
+ """
+ return self.connect_to_snd_with_profile(ag_ad,
+ BluetoothProfile.HEADSET_CLIENT)
+
+ def a2dp_sink_connect(self, src_ad: AndroidDevice) -> bool:
+ """Connects pri android device to secondary android device.
+
+ The android device should support the A2dp Sink profile. For example, the
+ android device with git_master-bds-dev build.
+
+ Args:
+ src_ad: A2dp source android device
+
+ Returns:
+ Boolean of connecting result
+ """
+ return self.connect_to_snd_with_profile(src_ad, BluetoothProfile.A2DP_SINK)
+
+ def map_connect(self, map_ad: AndroidDevice) -> bool:
+ """Connects primary device to secondary device via MAP MCE profile.
+
+ The primary device should support the MAP MCE profile. For example,
+ the android device with git_master-bds-dev build.
+
+ Args:
+ map_ad: AndroidDevice, a android device supporting MAP profile.
+
+ Returns:
+ Boolean of connecting result
+ """
+ return self.connect_to_snd_with_profile(map_ad,
+ BluetoothProfile.MAP_MCE)
+
+ def map_disconnect(self, bluetooth_address: str) -> bool:
+ """Disconnects a MAP MSE device with specified Bluetooth MAC address.
+
+ Args:
+ bluetooth_address: a connected device's bluetooth address.
+
+ Returns:
+ True if the device is disconnected else False.
+ """
+ self._ad.sl4a.bluetoothMapClientDisconnect(bluetooth_address)
+ return bt_test_utils.wait_until(
+ timeout_sec=COMMON_TIMEOUT_SECONDS,
+ condition_func=self._is_profile_connected,
+ func_args=[bluetooth_address, BluetoothProfile.MAP_MCE],
+ expected_value=False)
+
+ def pbap_connect(self, pbap_ad: AndroidDevice) -> bool:
+ """Connects primary device to secondary device via PBAP client profile.
+
+ The primary device should support the PBAP client profile. For example,
+ the android device with git_master-bds-dev build.
+
+ Args:
+ pbap_ad: AndroidDevice, a android device supporting PBAP profile.
+
+ Returns:
+ Boolean of connecting result
+ """
+ return self.connect_to_snd_with_profile(pbap_ad,
+ BluetoothProfile.PBAP_CLIENT)
+
+ def set_bluetooth_tethering(self, status_enabled: bool) -> None:
+ """Sets Bluetooth tethering to be specific status.
+
+ Args:
+ status_enabled: Bool, Bluetooth tethering will be set to enable if True,
+ else disable.
+ """
+ if self._ad.sl4a.bluetoothPanIsTetheringOn() == status_enabled:
+ self._ad.log.info('Already %s Bluetooth tethering.' %
+ ('enabled' if status_enabled else 'disabled'))
+ return
+
+ self._ad.log.info('%s Bluetooth tethering.' %
+ ('Enable' if status_enabled else 'Disable'))
+ self._ad.sl4a.bluetoothPanSetBluetoothTethering(status_enabled)
+
+ bt_test_utils.wait_until(
+ timeout_sec=COMMON_TIMEOUT_SECONDS,
+ condition_func=self._ad.sl4a.bluetoothPanIsTetheringOn,
+ func_args=[],
+ expected_value=status_enabled,
+ exception=signals.ControllerError(
+ 'Failed to %s Bluetooth tethering.' %
+ ('enable' if status_enabled else 'disable')))
+
+ def set_profile_policy(
+ self,
+ snd_ad_mac_address: str,
+ profile: BluetoothProfile,
+ policy: BluetoothConnectionPolicy) -> None:
+ """Sets policy of the profile car related profiles to OFF.
+
+ This avoids autoconnect being triggered randomly. The use of this function
+ is encouraged when you're testing individual profiles in isolation.
+
+ Args:
+ snd_ad_mac_address: the mac address of the device accepting connection.
+ profile: the profiles to be set
+ policy: the policy value to be set
+ """
+ pri_ad = self._ad
+ pri_ad_local_name = pri_ad.sl4a.bluetoothGetLocalName()
+ pri_ad.log.info('Sets profile %s on %s for %s to policy %s', profile,
+ pri_ad_local_name, snd_ad_mac_address, policy)
+ if profile == BluetoothProfile.A2DP:
+ pri_ad.sl4a.bluetoothA2dpSetPriority(snd_ad_mac_address, policy.value)
+ elif profile == BluetoothProfile.A2DP_SINK:
+ pri_ad.sl4a.bluetoothA2dpSinkSetPriority(snd_ad_mac_address, policy.value)
+ elif profile == BluetoothProfile.HEADSET_CLIENT:
+ pri_ad.sl4a.bluetoothHfpClientSetPriority(snd_ad_mac_address,
+ policy.value)
+ elif profile == BluetoothProfile.PBAP_CLIENT:
+ pri_ad.sl4a.bluetoothPbapClientSetPriority(snd_ad_mac_address,
+ policy.value)
+ elif profile == BluetoothProfile.HID_HOST:
+ pri_ad.sl4a.bluetoothHidSetPriority(snd_ad_mac_address, policy.value)
+ else:
+ pri_ad.log.error('Profile %s not yet supported for policy settings',
+ profile)
+
+ def set_profiles_policy(
+ self,
+ snd_ad: AndroidDevice,
+ profile_list: Sequence[BluetoothProfile],
+ policy: BluetoothConnectionPolicy) -> None:
+ """Sets the policy of said profile(s) on pri_ad for snd_ad.
+
+ Args:
+ snd_ad: android device accepting connection
+ profile_list: list of the profiles to be set
+ policy: the policy to be set
+ """
+ mac_address = snd_ad.sl4a.bluetoothGetLocalAddress()
+ for profile in profile_list:
+ self.set_profile_policy(mac_address, profile, policy)
+
+ def set_profiles_policy_off(
+ self,
+ snd_ad: AndroidDevice,
+ profile_list: Sequence[BluetoothProfile]) -> None:
+ """Sets policy of the profiles to OFF.
+
+ This avoids autoconnect being triggered randomly. The use of this function
+ is encouraged when you're testing individual profiles in isolation
+
+ Args:
+ snd_ad: android device accepting connection
+ profile_list: list of the profiles to be turned off
+ """
+ self.set_profiles_policy(
+ snd_ad, profile_list,
+ BluetoothConnectionPolicy.CONNECTION_POLICY_FORBIDDEN)
+
+ def wait_for_call_state(
+ self,
+ call_state: Union[int, CallState],
+ timeout_sec: float,
+ wait_interval: int = 3) -> bool:
+ """Waits for call state of the device to be changed.
+
+ Args:
+ call_state: int, the expected call state. Call state values are:
+ 0: IDLE
+ 1: RINGING
+ 2: OFFHOOK
+ timeout_sec: int, number of seconds of expiration time
+ wait_interval: int, number of seconds of waiting in each cycle
+
+ Returns:
+ True if the call state has been changed else False.
+ """
+ # TODO(user): Force external call to use CallState instead of int
+ if isinstance(call_state, CallState):
+ call_state = call_state.value
+ expiration_time = time.time() + timeout_sec
+ which_cycle = 1
+ while time.time() < expiration_time:
+ # Waits for the call state change in every cycle.
+ time.sleep(wait_interval)
+ self._ad.log.info(
+ 'in cycle %d of waiting for call state %d', which_cycle, call_state)
+ if call_state == self._ad.mbs.getTelephonyCallState():
+ return True
+ self._ad.log.info('The call state did not change to %d before timeout',
+ call_state)
+ return False
+
+ def play_audio_file_with_google_play_music(self) -> None:
+ """Plays an audio file on an AndroidDevice with Google Play Music app.
+
+ Returns:
+ None
+ """
+ try:
+ self._ad.aud.add_watcher('LOGIN').when(text='SKIP').click(text='SKIP')
+ self._ad.aud.add_watcher('NETWORK').when(text='Server error').click(
+ text='OK')
+ self._ad.aud.add_watcher('MENU').when(text='Settings').click(
+ text='Listen Now')
+ except adb_ui.Error:
+ logging.info('The watcher has been added.')
+ self._ad.sl4a.appLaunch('com.google.android.music')
+ if self._ad.aud(text='No Music available').exists(10):
+ self._ad.reboot()
+ self._ad.sl4a.appLaunch('com.google.android.music')
+ self._ad.aud(
+ resource_id='com.google.android.music:id/li_thumbnail_frame').click()
+ time.sleep(6) # Wait for audio playback to reach steady state
+
+ def add_call_log(
+ self,
+ call_log_type: Union[int, CallLogType],
+ phone_number: str,
+ call_time: int) -> None:
+ """Add call number and time to specified log.
+
+ Args:
+ call_log_type: int, number of call log type. Call log type values are:
+ 1: Incoming call
+ 2: Outgoing call
+ 3: Missed call
+ phone_number: string, phone number to be added in call log.
+ call_time: int, call time to be added in call log.
+
+ Returns:
+ None
+ """
+ # TODO(user): Force external call to use CallLogType instead of int
+ if isinstance(call_log_type, CallLogType):
+ call_log_type = call_log_type.value
+ new_call_log = {}
+ new_call_log['type'] = str(call_log_type)
+ new_call_log['number'] = phone_number
+ new_call_log['time'] = str(call_time)
+ self._ad.sl4a.callLogsPut(new_call_log)
+
+ def get_call_volume(self) -> int:
+ """Gets current call volume of an AndroidDevice when Bluetooth SCO On.
+
+ Returns:
+ An integer specifying the number of current call volume level.
+ """
+ cmd = 'dumpsys audio | grep "STREAM_BLUETOOTH_SCO" | tail -1'
+ out = self._ad.adb.shell(cmd).decode()
+ # TODO(user): Should we handle the case that re.search(...) return None
+ # below?
+ pattern = r'(?<=SCO index:)[\d]+'
+ return int(re.search(pattern, out).group())
+
+ def make_phone_call(
+ self,
+ callee: AndroidDevice,
+ timeout_sec: float = 30) -> None:
+ """Make a phone call to callee and check if callee is ringing.
+
+ Args:
+ callee: AndroidDevice, The callee in the phone call.
+ timeout_sec: int, number of seconds to wait for the callee ringing.
+
+ Raises:
+ TestError
+ """
+ self._ad.sl4a.telecomCallNumber(callee.dimensions['phone_number'])
+ is_ringing = callee.wait_for_call_state(bt_constants.CALL_STATE_RINGING,
+ timeout_sec)
+ if not is_ringing:
+ raise signals.TestError(
+ 'Timed out after %ds waiting for call state: RINGING' % timeout_sec)
+
+ def wait_for_disconnection_success(
+ self,
+ mac_address: str,
+ timeout: float = 30) -> float:
+ """Waits for a device to connect with the AndroidDevice.
+
+ Args:
+ mac_address: The Bluetooth mac address of the peripheral device.
+ timeout: Number of seconds to wait for the devices to connect.
+
+ Returns:
+ connection_time: The time it takes to connect in seconds.
+
+ Raises:
+ ControllerError
+ """
+ start_time = time.time()
+ end_time = start_time + timeout
+ while time.time() < end_time:
+ if not self._ad.sl4a.bluetoothIsDeviceConnected(mac_address):
+ disconnection_time = (time.time() - start_time)
+ logging.info('Disconnected device %s in %d seconds', mac_address,
+ disconnection_time)
+ return disconnection_time
+
+ raise signals.ControllerError(
+ 'Failed to disconnect device within %d seconds.' % timeout)
+
+ def first_pair_and_connect_bluetooth(self, bt_device: BtDevice) -> None:
+ """Pairs and connects an AndroidDevice with a Bluetooth device.
+
+ This method does factory reset bluetooth first and then pairs and connects
+ the devices.
+
+ Args:
+ bt_device: The peripheral Bluetooth device or an AndroidDevice.
+
+ Returns:
+ None
+ """
+ bt_device.factory_reset_bluetooth()
+ mac_address = bt_device.get_bluetooth_mac_address()
+ bt_device.activate_pairing_mode()
+ self.pair_and_connect_bluetooth(mac_address)
+
+ def get_device_time(self) -> str:
+ """Get device epoch time and transfer to logcat timestamp format.
+
+ Returns:
+ String of the device time.
+ """
+ return self._ad.adb.shell(
+ 'date +"%m-%d %H:%M:%S.000"').decode().splitlines()[0]
+
+ def logcat_filter(
+ self,
+ start_time: str,
+ text_filter: str = '') -> str:
+ """Returns logcat after a given time.
+
+ This method calls from the android_device logcat service file and filters
+ all logcat line prior to the start_time.
+
+ Args:
+ start_time: start time in string format of _DATETIME_FMT.
+ text_filter: only return logcat lines that include this string.
+
+ Returns:
+ A logcat output.
+
+ Raises:
+ ValueError Exception if start_time is invalid format.
+ """
+ try:
+ start_time_conv = datetime.datetime.strptime(start_time, _DATETIME_FMT)
+ except ValueError as ex:
+ logging.error('Invalid time format!')
+ raise ex
+ logcat_response = ''
+ with open(self._ad.adb_logcat_file_path, 'r', errors='replace') \
+ as logcat_file:
+ post_start_time = False
+ for line in logcat_file:
+ match = self.regex_logcat_time.match(line)
+ if match:
+ if (datetime.datetime.strptime(
+ match.group('datetime'), _DATETIME_FMT) >= start_time_conv):
+ post_start_time = True
+ if post_start_time and line.find(text_filter) >= 0:
+ logcat_response += line
+ return logcat_response
+
+ def logcat_filter_message(
+ self,
+ current_time: str,
+ text: str = '') -> str:
+ """DEPRECATED Builds the logcat command.
+
+ This method builds the logcat command to check for a specified log
+ message after the specified time. If text=None, the logcat returned will be
+ unfiltered.
+
+ Args:
+ current_time: time cutoff for grepping for the specified
+ message, format = ('%m-%d %H:%M:%S.000').
+ text: text to search for.
+
+ Returns:
+ The response of the logcat filter.
+ """
+ return self.logcat_filter(current_time, text)
+
+ def send_media_passthrough_cmd(
+ self,
+ command: str,
+ event_receiver: Optional[AndroidDevice] = None) -> None:
+ """Sends a media passthrough command.
+
+ Args:
+ command: string, media passthrough command.
+ event_receiver: AndroidDevice, a device which starts
+ BluetoothSL4AAudioSrcMBS.
+
+ Raises:
+ signals.ControllerError: raised if the event is not received.
+ """
+ self._ad.log.info('Sending Media Passthough: %s' % command)
+ self._ad.sl4a.bluetoothMediaPassthrough(command)
+ try:
+ if not event_receiver:
+ event_receiver = self._ad
+ event_receiver.ed.pop_event(MEDIA_CMD_MAP[command],
+ MEDIA_EVENT_TIMEOUT_SEC)
+ except queue.Empty:
+ raise signals.ControllerError(
+ 'Device "%s" failed to receive the event "%s" '
+ 'when the command "%s" was sent.' %
+ (event_receiver.serial, MEDIA_CMD_MAP[command], command))
+
+ def pause(self) -> None:
+ """Sends the AVRCP command "pause"."""
+ self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PAUSE)
+
+ def play(self) -> None:
+ """Sends the AVRCP command "play"."""
+ self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PLAY)
+
+ def track_previous(self) -> None:
+ """Sends the AVRCP command "skipPrev"."""
+ self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_SKIP_PREV)
+
+ def track_next(self) -> None:
+ """Sends the AVRCP command "skipNext"."""
+ self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_SKIP_NEXT)
+
+ def get_current_track_info(self) -> Dict[str, Any]:
+ """Returns Dict (Media metadata) representing the current track."""
+ return self._ad.sl4a.bluetoothMediaGetCurrentMediaMetaData()
+
+ def get_current_playback_state(self) -> int:
+ """Returns Integer representing the current playback state."""
+ return self._ad.sl4a.bluetoothMediaGetCurrentPlaybackState()['state']
+
+ def verify_playback_state_changed(
+ self,
+ expected_state: str,
+ exception: Optional[Exception] = None) -> bool:
+ """Verifies the playback state is changed to be the expected state.
+
+ Args:
+ expected_state: string, the changed state as expected.
+ exception: Exception, raised when the state is not changed if needed.
+ """
+ bt_test_utils.wait_until(
+ timeout_sec=MEDIA_UPDATE_TIMEOUT_SEC,
+ condition_func=self.get_current_playback_state,
+ func_args=[],
+ expected_value=expected_state,
+ exception=exception)
+
+ def verify_current_track_changed(
+ self,
+ expected_track: str,
+ exception: Optional[Exception] = None) -> bool:
+ """Verifies the Now playing track is changed to be the expected track.
+
+ Args:
+ expected_track: string, the changed track as expected.
+ exception: Exception, raised when the track is not changed if needed.
+ """
+ bt_test_utils.wait_until(
+ timeout_sec=MEDIA_UPDATE_TIMEOUT_SEC,
+ condition_func=self.get_current_track_info,
+ func_args=[],
+ expected_value=expected_track,
+ exception=exception)
+
+ def verify_avrcp_event(
+ self,
+ event_name: AvrcpEvent,
+ check_time: str,
+ timeout_sec: float = 20) -> bool:
+ """Verifies that an AVRCP event was received by an AndroidDevice.
+
+ Checks logcat to verify that an AVRCP event was received after a given
+ time.
+
+ Args:
+ event_name: enum, AVRCP event name. Currently supports play, pause,
+ track_previous, and track_next.
+ check_time: string, The earliest desired cutoff time to check the logcat.
+ Must be in format '%m-%d %H:%M:%S.000'. Use
+ datetime.datetime.now().strftime('%m-%d %H:%M:%S.%f') to get current time
+ in this format.
+ timeout_sec: int, Number of seconds to wait for the specified AVRCP event
+ be found in logcat.
+
+ Raises:
+ TestError
+
+ Returns:
+ True if the event was received.
+ """
+ avrcp_events = [
+ 'State:NOT_PLAYING->PLAYING', 'State:PLAYING->NOT_PLAYING',
+ 'sendMediaKeyEvent: keyEvent=76', 'sendMediaKeyEvent: keyEvent=75'
+ ]
+ if event_name.value not in avrcp_events:
+ raise signals.TestError('An unexpected AVRCP event is specified.')
+
+ end_time = time.time() + timeout_sec
+ while time.time() < end_time:
+ if self.logcat_filter_message(check_time, event_name.value):
+ logging.info('%s event received successfully.', event_name)
+ return True
+ time.sleep(1)
+ logging.error('AndroidDevice failed to receive %s event.', event_name)
+ logging.info('Logcat:\n%s', self.logcat_filter_message(check_time))
+ return False
+
+ def add_google_account(self, retries: int = 5) -> bool:
+ """Login Google account.
+
+ Args:
+ retries: int, the number of retries.
+
+ Returns:
+ True if account is added successfully.
+
+ Raises:
+ TestError
+ """
+ for _ in range(retries):
+ output = self._ad.adb.shell(
+ 'am instrument -w -e account "%s" -e password '
+ '"%s" -e sync true -e wait-for-checkin false '
+ 'com.google.android.tradefed.account/.AddAccount' %
+ (self._ad.dimensions['google_account'],
+ self._ad.dimensions['google_account_password'])).decode()
+ if 'result=SUCCESS' in output:
+ logging.info('Google account is added successfully')
+ time.sleep(3) # Wait for account to steady state
+ return True
+ raise signals.TestError('Failed to add google account: %s' % output)
+
+ def remove_google_account(self, retries: int = 5) -> bool:
+ """Remove Google account.
+
+ Args:
+ retries: int, the number of retries.
+
+ Returns:
+ True if account is removed successfully.
+
+ Raises:
+ TestError
+ """
+ for _ in range(retries):
+ output = self._ad.adb.shell(
+ 'am instrument -w com.google.android.tradefed.account/.RemoveAccounts'
+ ).decode()
+ if 'result=SUCCESS' in output:
+ logging.info('Google account is removed successfully')
+ return True
+ time.sleep(1) # Buffer between retries.
+ raise signals.TestError('Failed to remove google account: %s' % output)
+
+ def make_hangouts_voice_call(self, callee: AndroidDevice) -> None:
+ """Make Hangouts VOIP voice call.
+
+ Args:
+ callee: Android Device, the android device of callee.
+
+ Returns:
+ None
+ """
+ try:
+ self._ad.aud.add_watcher('SETUP').when(text='SKIP').click(text='SKIP')
+ self._ad.aud.add_watcher('REMINDER').when(text='Got it').click(
+ text='Got it')
+ except adb_ui.Error:
+ # TODO(user): Need to figure out the logic here why use info in
+ # exception catch block instead of warning/error
+ logging.info('The watcher has been added.')
+ self._ad.sl4a.appLaunch('com.google.android.talk')
+ callee.sl4a.appLaunch('com.google.android.talk')
+ # Make voice call to callee
+ try:
+ # Click the callee icon
+ self._ad.aud(resource_id='com.google.android.talk:id/avatarView').click()
+ except adb_ui.Error:
+ # Press BACK key twice and re-launch Hangouts if it is not in main page
+ for _ in range(2):
+ self._ad.aud.send_key_code(4)
+ self._ad.sl4a.appLaunch('com.google.android.talk')
+ self._ad.aud(resource_id='com.google.android.talk:id/avatarView').click()
+ # Click the button to make a voice call
+ self._ad.aud(content_desc='Call').click()
+ # Answer by callee
+ if callee.aud(text='Answer').exists(5):
+ callee.aud(text='Answer').click()
+ else:
+ callee.aud(content_desc='Join voice call').click()
+
+ def hang_up_hangouts_call(self) -> None:
+ """Hang up Hangouts VOIP voice call.
+
+ Returns:
+ None
+ """
+ # Click the in call icon to show the end call button
+ self._ad.aud(
+ resource_id='com.google.android.talk:id/in_call_main_avatar').click()
+ # Click the button to hang up call
+ self._ad.aud(content_desc='Hang up').click()
+ time.sleep(3) # Wait for VoIP call state to reach idle state
+
+ def detect_and_pull_ssrdump(self, ramdump_type: str = 'ramdump_bt') -> bool:
+ """Detect and pull RAMDUMP log.
+
+ Args:
+ ramdump_type: str, the partial of file names to search for in ramdump
+ files path. 'ramdump_bt' is used for searching Bluetooth ramdump log
+ files.
+
+ Returns:
+ True if there is a file with file name matching the ramdump type.
+ """
+ files = self._ad.adb.shell('ls %s' % bt_constants.RAMDUMP_PATH).decode()
+ if ramdump_type in files:
+ logging.info('RAMDUMP is found.')
+ log_name_timestamp = mobly_logger.get_log_file_timestamp()
+ destination = os.path.join(self._ad.log_path, 'RamdumpLogs',
+ log_name_timestamp)
+ utils.create_dir(destination)
+ self._ad.adb.pull([bt_constants.RAMDUMP_PATH, destination])
+ return True
+ return False
+
+ def get_bt_num_of_crashes(self) -> int:
+ """Get number of Bluetooth crash times from bluetooth_manager.
+
+ Returns:
+ Number of Bluetooth crashed times.
+ """
+ out = self._regex_bt_crash.search(
+ self._ad.adb.shell('dumpsys bluetooth_manager').decode())
+ # TODO(user): Need to consider the case "out=None" when miss in
+ # matching
+ return int(out.group('num_bt_crashes'))
+
+ def clean_ssrdump(self) -> None:
+ """Clean RAMDUMP log.
+
+ Returns:
+ None
+ """
+ self._ad.adb.shell('rm -rf %s/*' % bt_constants.RAMDUMP_PATH)
+
+ def set_target(self, bt_device: BtDevice) -> None:
+ """Allows for use to get target device object for target interaction."""
+ self._target_device = bt_device
+
+ def wait_for_hsp_connection_state(self,
+ mac_address: str,
+ connected: bool,
+ timeout_sec: float = 30) -> bool:
+ """Waits for HSP connection to be in a expected state on Android device.
+
+ Args:
+ mac_address: The Bluetooth mac address of the peripheral device.
+ connected: True if HSP connection state is connected as expected.
+ timeout_sec: Number of seconds to wait for HSP connection state change.
+ """
+ expected_state = BluetoothConnectionStatus.STATE_DISCONNECTED
+ if connected:
+ expected_state = BluetoothConnectionStatus.STATE_CONNECTED
+ bt_test_utils.wait_until(
+ timeout_sec=timeout_sec,
+ condition_func=self._ad.sl4a.bluetoothHspGetConnectionStatus,
+ func_args=[mac_address],
+ expected_value=expected_state,
+ exception=signals.TestError(
+ 'Failed to %s the device "%s" within %d seconds via HSP.' %
+ ('connect' if connected else 'disconnect', mac_address,
+ timeout_sec)))
+
+ def wait_for_bluetooth_toggle_state(self,
+ enabled: bool = True,
+ timeout_sec: float = 30) -> bool:
+ """Waits for Bluetooth to be in an expected state.
+
+ Args:
+ enabled: True if Bluetooth status is enabled as expected.
+ timeout_sec: Number of seconds to wait for Bluetooth to be in the expected
+ state.
+ """
+ bt_test_utils.wait_until(
+ timeout_sec=timeout_sec,
+ condition_func=self._ad.mbs.btIsEnabled,
+ func_args=[],
+ expected_value=enabled,
+ exception=signals.TestError(
+ 'Bluetooth is not %s within %d seconds on the device "%s".' %
+ ('enabled' if enabled else 'disabled', timeout_sec,
+ self._ad.serial)))
+
+ def wait_for_a2dp_connection_state(self,
+ mac_address: str,
+ connected: bool,
+ timeout_sec: float = 30) -> bool:
+ """Waits for A2DP connection to be in a expected state on Android device.
+
+ Args:
+ mac_address: The Bluetooth mac address of the peripheral device.
+ connected: True if A2DP connection state is connected as expected.
+ timeout_sec: Number of seconds to wait for A2DP connection state change.
+ """
+ bt_test_utils.wait_until(
+ timeout_sec=timeout_sec,
+ condition_func=self.is_a2dp_sink_connected,
+ func_args=[mac_address],
+ expected_value=connected,
+ exception=signals.TestError(
+ 'Failed to %s the device "%s" within %d seconds via A2DP.' %
+ ('connect' if connected else 'disconnect', mac_address,
+ timeout_sec)))
+
+ def wait_for_nap_service_connection(
+ self,
+ connected_mac_addr: str,
+ state_connected: bool,
+ exception: Exception) -> bool:
+ """Waits for NAP service connection to be expected state.
+
+ Args:
+ connected_mac_addr: String, Bluetooth Mac address is needed to be checked.
+ state_connected: Bool, NAP service connection is established as expected
+ if True, else terminated as expected.
+ exception: Exception, Raised if NAP service connection is not expected
+ state.
+
+ Raises:
+ exception: Raised if NAP service connection is not expected state.
+ """
+ def is_device_connected():
+ """Returns True if connected else False."""
+ connected_devices = self._ad.sl4a.bluetoothPanGetConnectedDevices()
+ # Check if the Bluetooth mac address is in the connected device list.
+ return connected_mac_addr in [d['address'] for d in connected_devices]
+
+ bt_test_utils.wait_until(
+ timeout_sec=bt_constants.NAP_CONNECTION_TIMEOUT_SECS,
+ condition_func=is_device_connected,
+ func_args=[],
+ expected_value=state_connected,
+ exception=exception)
+
+ def verify_internet(self,
+ allow_access: bool,
+ exception: Exception,
+ test_url: str = TEST_URL,
+ interval_sec: int = PING_INTERVAL_TIME_SEC,
+ timeout_sec: float = PING_TIMEOUT_SEC) -> bool:
+ """Verifies that internet is in expected state.
+
+ Continuously make ping request to a URL for internet verification.
+
+ Args:
+ allow_access: Bool, Device can have internet access as expected if True,
+ else no internet access as expected.
+ exception: Exception, Raised if internet is not in expected state.
+ test_url: String, A URL is used to verify internet by ping request.
+ interval_sec: Int, Interval time between ping requests in second.
+ timeout_sec: Int, Number of seconds to wait for ping success if
+ allow_access is True else wait for ping failure if allow_access is
+ False.
+
+ Raises:
+ exception: Raised if internet is not in expected state.
+ """
+ self._ad.log.info('Verify that internet %s be used.' %
+ ('can' if allow_access else 'can not'))
+
+ def http_ping():
+ """Returns True if http ping success else False."""
+ try:
+ return bool(self._ad.sl4a.httpPing(test_url))
+ except jsonrpc_client_base.ApiError as e:
+ # ApiError is raised by httpPing() when no internet.
+ self._ad.log.debug(str(e))
+ return False
+
+ bt_test_utils.wait_until(
+ timeout_sec=timeout_sec,
+ condition_func=http_ping,
+ func_args=[],
+ expected_value=allow_access,
+ exception=exception,
+ interval_sec=interval_sec)
+
+ def allow_extra_permissions(self) -> None:
+ """A method to allow extra permissions.
+
+ This method has no any logics. It is used to skip the operation when it is
+ called if a test is not Wear OS use case.
+ """
+
+ def goto_bluetooth_device_details(self) -> None:
+ """Goes to bluetooth device detail page."""
+ self._ad.adb.shell('am force-stop com.android.settings')
+ self._ad.adb.shell('am start -a android.settings.BLUETOOTH_SETTINGS')
+ self._ad.aud(
+ resource_id='com.android.settings:id/settings_button').click()
+
+ def bluetooth_ui_forget_device(self) -> None:
+ """Clicks the forget device button."""
+ self.goto_bluetooth_device_details()
+ self._ad.aud(resource_id='com.android.settings:id/button1').click()
+
+ def bluetooth_ui_disconnect_device(self) -> None:
+ """Clicks the disconnect device button."""
+ self.goto_bluetooth_device_details()
+ self._ad.aud(resource_id='com.android.settings:id/button2').click()
+
+ def _find_bt_device_details_ui_switch(self, switch_name: str):
+ """Returns the UI node for a BT switch.
+
+ Args:
+ switch_name: each switch button name in bluetooth connect device detail
+ page. switch name like 'Phone calls', 'Media audio', etc.
+
+ Returns:
+ adb_ui_device.XML node UI element of the BT each option switch button.
+ """
+ switch_button_name = ('Phone calls',
+ 'Media audio',
+ 'Contact sharing',
+ 'Text Messages'
+ )
+ if switch_name not in switch_button_name:
+ raise ValueError(f'Unknown switch name {switch_name}.')
+ self.goto_bluetooth_device_details()
+ text_node = adb_ui.wait_and_get_xml_node(
+ self._ad, timeout=10, text=switch_name)
+ text_grandparent_node = text_node.parentNode.parentNode
+ switch_node = adb_ui.Selector(
+ resource_id='android:id/switch_widget').find_node(text_grandparent_node)
+ return switch_node
+
+ def get_bt_device_details_ui_switch_state(self, switch_name: str) -> bool:
+ """Gets bluetooth each option switch button state value.
+
+ Args:
+ switch_name: each switch button name in bluetooth connect device detail
+ page.
+
+ Returns:
+ State True or False.
+ """
+ switch_node = self._find_bt_device_details_ui_switch(switch_name)
+ current_state = switch_node.attributes['checked'].value == 'true'
+ return current_state
+
+ def set_bt_device_details_ui_switch_state(
+ self, switch_name: str,
+ target_state: bool) -> None:
+ """Sets and checks the BT each option button is the target enable state.
+
+ Args:
+ switch_name: each switch button name in bluetooth connect device detail
+ page.
+ target_state: The desired state expected from the switch. If the state of
+ the switch already meet expectation, no action will be taken.
+ """
+ if self.get_bt_device_details_ui_switch_state(switch_name) == target_state:
+ return
+ switch_node = self._find_bt_device_details_ui_switch(switch_name)
+ x, y = adb_ui.find_point_in_bounds(switch_node.attributes['bounds'].value)
+ self._ad.aud.click(x, y)
+
+ def get_bt_quick_setting_switch_state(self) -> bool:
+ """Gets bluetooth quick settings switch button state value."""
+ self._ad.open_notification()
+ switch_node = self._ad_aud(class_name='android.widget.Switch', index='1')
+ current_state = switch_node.attributes['content-desc'].value == 'Bluetooth.'
+ return current_state
+
+ def assert_bt_device_details_state(self, target_state: bool) -> None:
+ """Asserts the Bluetooth connection state.
+
+ Asserts the BT each option button in device detail,
+ BT quick setting state and BT manager service from log are at the target
+ state.
+
+ Args:
+ target_state: BT each option button, quick setting and bluetooth manager
+ service target state.
+
+ """
+ for switch_name in ['Phone calls', 'Media audio']:
+ asserts.assert_equal(
+ self._ad.get_bt_device_details_ui_switch_state(switch_name),
+ target_state,
+ f'The BT Media calls switch button state is not {target_state}.')
+ asserts.assert_equal(self._ad.is_service_running(), target_state,
+ f'The BT service state is not {target_state}.')
+ asserts.assert_equal(
+ self._ad.get_bt_quick_setting_switch_state(), target_state,
+ f'The BT each switch button state is not {target_state}.')
+
+ def is_service_running(
+ self,
+ mac_address: str,
+ timeout_sec: float) -> bool:
+ """Checks bluetooth profile state.
+
+ Check bluetooth headset/a2dp profile connection
+ status from bluetooth manager log.
+
+ Args:
+ mac_address: The Bluetooth mac address of the peripheral device.
+ timeout_sec: Number of seconds to wait for the specified message
+ be found in bluetooth manager log.
+
+ Returns:
+ True: If pattern match with bluetooth_manager_log.
+ """
+ pattern_headset = (r'\sm\w+e:\sC\w+d')
+ pattern_a2dp = (r'StateMachine:.*state=Connected')
+ output_headset = self._ad.adb.shell(
+ 'dumpsys bluetooth_manager | egrep -A20 "Profile: HeadsetService"'
+ ).decode()
+ output_a2dp = self._ad.adb.shell(
+ 'dumpsys bluetooth_manager | egrep -A30 "Profile: A2dpService"').decode(
+ )
+ service_type = {
+ 'a2dp': ((pattern_a2dp), (output_a2dp)),
+ 'headset': ((pattern_headset), (output_headset))
+ }
+ start_time = time.time()
+ end_time = start_time + timeout_sec
+ while start_time < end_time:
+ try:
+ match = service_type
+ if match and mac_address in service_type:
+ return True
+ except adb.AdbError as e:
+ logging.exception(e)
+ time.sleep(ADB_WAITING_TIME_SECONDS)
+ return False
+
+ def browse_internet(self, url: str = 'www.google.com') -> None:
+ """Browses internet by Chrome.
+
+ Args:
+ url: web address.
+
+ Raises:
+ signals.TestError: raised if it failed to browse internet by Chrome.
+ """
+ browse_url = (
+ 'am start -n com.android.chrome/com.google.android.apps.chrome.Main -d'
+ ' %s' % url
+ )
+ self._ad.adb.shell(browse_url)
+ self._ad.aud.add_watcher('Welcome').when(
+ text='Accept & continue').click(text='Accept & continue')
+ self._ad.aud.add_watcher('sync page').when(
+ text='No thanks').click(text='No thanks')
+ if self._ad.aud(text='No internet').exists():
+ raise signals.TestError('No connect internet.')
+
+ def connect_wifi_from_other_device_hotspot(
+ self, wifi_hotspot_device: AndroidDevice) -> None:
+ """Turns on 2.4G Wifi hotspot from the other android device and connect on the android device.
+
+ Args:
+ wifi_hotspot_device: Android device, turn on 2.4G Wifi hotspot.
+ """
+ # Turn on 2.4G Wifi hotspot on the secondary phone.
+ wifi_hotspot_device.sl4a.wifiSetWifiApConfiguration(
+ bt_constants.WIFI_HOTSPOT_2_4G)
+ wifi_hotspot_device.sl4a.connectivityStartTethering(0, False)
+ # Connect the 2.4G Wifi on the primary phone.
+ self._ad.mbs.wifiEnable()
+ self._ad.mbs.wifiConnectSimple(
+ bt_constants.WIFI_HOTSPOT_2_4G['SSID'],
+ bt_constants.WIFI_HOTSPOT_2_4G['password'])
diff --git a/system/blueberry/utils/arduino_base.py b/system/blueberry/utils/arduino_base.py
new file mode 100644
index 0000000000..37c9146033
--- /dev/null
+++ b/system/blueberry/utils/arduino_base.py
@@ -0,0 +1,78 @@
+"""Base class for Blueberry controllers using Arduino board.
+
+This module uses pyserial library to communicate with Arduino UNO board.
+
+About Arduino code, please refer to the code of following Arduino project:
+Internal link
+"""
+
+import time
+from mobly.signals import ControllerError
+import serial
+
+
+class ArduinoBase(object):
+ """Implements an Arduino base class.
+
+ Attributes:
+ serial: serial object, a serial object which is used to communicate with
+ Arduino board.
+ """
+
+ def __init__(self, config):
+ """Initializes an Arduino base class."""
+ self._verify_config(config)
+ self.serial = serial.Serial(config['arduino_port'], 9600)
+ self.serial.timeout = 30
+ # Buffer between calling serial.Serial() and serial.Serial.write().
+ time.sleep(2)
+
+ def _verify_config(self, config):
+ """Checks the device config's required config parameters.
+
+ Args:
+ config: dict, Mobly controller config for ArduinoBass. The config should
+ include the key "arduino_port" whose value is a string representing
+ Arduino board name. e.g. /dev/ttyACM0.
+ """
+ if 'arduino_port' not in config:
+ raise ControllerError('Please provide an Arduino board port for the'
+ ' ArduinoBase in Mobile Harness config')
+
+ def _send_string_to_arduino(self, tx_string):
+ """Sends a particular string to communicate with Arduino.
+
+ The method requires that Arduino code can read string which is received from
+ a python serial object and then send the same string to the serial object.
+
+ An example of Arduino code:
+ String kRxString = "";
+ void setup() {
+ ...
+ }
+ void loop() {
+ if (Serial.available() > 0) {
+ kRxString = Serial.readString();
+ ...
+ Serial.write(kRxString.c_str());
+ }
+ }
+
+ Args:
+ tx_string: string, is used to be sent to Arduino port for making the
+ controlled device perform action. After Arduino receives the string, it
+ will send a response which is the same string.
+
+ Returns:
+ The time it takes for waiting a response, in seconds.
+
+ Raises:
+ ControllerError: raised if not received a response from Arduino.
+ """
+ self.serial.write(str.encode(tx_string))
+ start_time = time.time()
+ rx_string = self.serial.read_until(tx_string, len(tx_string)).decode()
+ if rx_string == tx_string:
+ return time.time() - start_time
+ raise ControllerError('Timed out after %ds waiting for the string "%s" from'
+ ' Arduino.' % (self.serial.timeout, tx_string))
diff --git a/system/blueberry/utils/blueberry_base_test.py b/system/blueberry/utils/blueberry_base_test.py
new file mode 100644
index 0000000000..abe19cd323
--- /dev/null
+++ b/system/blueberry/utils/blueberry_base_test.py
@@ -0,0 +1,244 @@
+"""Base test class for Blueberry."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import re
+
+from mobly import base_test
+from mobly import signals
+from mobly.controllers import android_device
+from mobly.controllers.android_device_lib import adb
+
+# Internal import
+# Internal import
+# Internal import
+# Internal import
+from blueberry.controllers import derived_bt_device
+from blueberry.decorators import android_bluetooth_client_decorator
+from blueberry.utils.android_bluetooth_decorator import AndroidBluetoothDecorator
+
+
+class BlueberryBaseTest(base_test.BaseTestClass):
+ """Base test class for all Blueberry tests to inherit from.
+
+ This class assists with device setup for device logging and other pre test
+ setup required for Bluetooth tests.
+ """
+
+ def setup_generated_tests(self):
+ """Generates multiple the same tests for pilot run.
+
+ This is used to let developers can easily pilot run their tests many times,
+ help them to check test stability and reliability. If need to use this,
+ please add a flag called "test_iterations" to TestParams in Mobly test
+ configuration and its value is number of test methods to be generated. The
+ naming rule of test method on Sponge is such as the following example:
+ test_send_file_via_bluetooth_opp_1_of_50
+ test_send_file_via_bluetooth_opp_2_of_50
+ test_send_file_via_bluetooth_opp_3_of_50
+ ...
+ test_send_file_via_bluetooth_opp_50_of_50
+
+ Don't use "test_case_selector" when using "test_iterations", and please use
+ "test_method_selector" to replace it.
+ """
+ test_iterations = int(self.user_params.get('test_iterations', 0))
+ if test_iterations < 2:
+ return
+
+ test_method_selector = self.user_params.get('test_method_selector', 'all')
+ existing_test_names = self.get_existing_test_names()
+
+ selected_test_names = None
+ if test_method_selector == 'all':
+ selected_test_names = existing_test_names
+ else:
+ selected_test_names = test_method_selector.split(' ')
+ # Check if selected test methods exist in the test class.
+ for test_name in selected_test_names:
+ if test_name not in existing_test_names:
+ raise base_test.Error('%s does not have test method "%s".' %
+ (self.TAG, test_name))
+
+ for test_name in selected_test_names:
+ test_method = getattr(self.__class__, test_name)
+ # List of (<new test name>, <test method>).
+ test_arg_sets = [('%s_%s_of_%s' % (test_name, i + 1, test_iterations),
+ test_method) for i in range(test_iterations)]
+ # pylint: disable=cell-var-from-loop
+ self.generate_tests(
+ test_logic=lambda _, test: test(self),
+ name_func=lambda name, _: name,
+ arg_sets=test_arg_sets)
+
+ # Delete origin test methods in order to avoid below situation:
+ # test_send_file_via_bluetooth_opp <-- origin test method
+ # test_send_file_via_bluetooth_opp_1_of_50
+ # test_send_file_via_bluetooth_opp_2_of_50
+ for test_name in existing_test_names:
+ delattr(self.__class__, test_name)
+
+ def setup_class(self):
+ """Setup class is called before running any tests."""
+ super(BlueberryBaseTest, self).setup_class()
+ self.capture_bugreport_on_fail = int(self.user_params.get(
+ 'capture_bugreport_on_fail', 0))
+ self.ignore_device_setup_failures = int(self.user_params.get(
+ 'ignore_device_setup_failures', 0))
+ self.enable_bluetooth_verbose_logging = int(self.user_params.get(
+ 'enable_bluetooth_verbose_logging', 0))
+ self.enable_hci_snoop_logging = int(self.user_params.get(
+ 'enable_hci_snoop_logging', 0))
+ self.increase_logger_buffers = int(self.user_params.get(
+ 'increase_logger_buffers', 0))
+ self.enable_all_bluetooth_logging = int(self.user_params.get(
+ 'enable_all_bluetooth_logging', 0))
+
+ # base test should include the test between primary device with Bluetooth
+ # peripheral device.
+ self.android_devices = self.register_controller(
+ android_device, required=False)
+
+ # In the case of no android_device assigned, at least 2 derived_bt_device
+ # is required.
+ if self.android_devices is None:
+ self.derived_bt_devices = self.register_controller(
+ module=derived_bt_device, min_number=2)
+ else:
+ self.derived_bt_devices = self.register_controller(
+ module=derived_bt_device, required=False)
+
+ if self.derived_bt_devices is None:
+ self.derived_bt_devices = []
+ else:
+ for derived_device in self.derived_bt_devices:
+ derived_device.set_user_params(self.user_params)
+ derived_device.setup()
+
+ self.android_ui_devices = {}
+ # Create a dictionary of Android UI Devices
+ for device in self.android_devices:
+ self.android_ui_devices[device.serial] = AdbUiDevice(device)
+ mobly_config = {'aud': self.android_ui_devices[device.serial]}
+ device.load_config(mobly_config)
+ need_restart_bluetooth = False
+ if (self.enable_bluetooth_verbose_logging or
+ self.enable_all_bluetooth_logging):
+ if self.set_bt_trc_level_verbose(device):
+ need_restart_bluetooth = True
+ if self.enable_hci_snoop_logging or self.enable_all_bluetooth_logging:
+ if self.set_btsnooplogmode_full(device):
+ need_restart_bluetooth = True
+ if self.increase_logger_buffers or self.enable_all_bluetooth_logging:
+ self.set_logger_buffer_size_16m(device)
+
+ # Restarts Bluetooth to take BT VERBOSE and HCI Snoop logging effect.
+ if need_restart_bluetooth:
+ device.log.info('Restarting Bluetooth by airplane mode...')
+ self.restart_bluetooth_by_airplane_mode(device)
+ self.android_devices = [AndroidBluetoothDecorator(device)
+ for device in self.android_devices]
+ for device in self.android_devices:
+ device.set_user_params(self.user_params)
+
+ self.client_decorators = self.user_params.get('sync_decorator', [])
+ if self.client_decorators:
+ self.client_decorators = self.client_decorators.split(',')
+
+ self.target_decorators = self.user_params.get('target_decorator', [])
+ if self.target_decorators:
+ self.target_decorators = self.target_decorators.split(',')
+
+ for decorator in self.client_decorators:
+ self.android_devices[0] = android_bluetooth_client_decorator.decorate(
+ self.android_devices[0], decorator)
+
+ for num_devices in range(1, len(self.android_devices)):
+ for decorator in self.target_decorators:
+ self.android_devices[
+ num_devices] = android_bluetooth_client_decorator.decorate(
+ self.android_devices[num_devices], decorator)
+
+ def on_fail(self, record):
+ """This method is called when a test failure."""
+ # Capture bugreports on fail if enabled.
+ if self.capture_bugreport_on_fail:
+ devices = self.android_devices
+ # Also capture bugreport of AndroidBtTargetDevice.
+ for d in self.derived_bt_devices:
+ if hasattr(d, 'take_bug_report'):
+ devices = devices + [d]
+ android_device.take_bug_reports(
+ devices,
+ record.test_name,
+ record.begin_time,
+ destination=self.current_test_info.output_path)
+
+ def set_logger_buffer_size_16m(self, device):
+ """Sets all logger sizes per log buffer to 16M."""
+ device.log.info('Setting all logger sizes per log buffer to 16M...')
+ # Logger buffer info:
+ # https://developer.android.com/studio/command-line/logcat#alternativeBuffers
+ logger_buffers = ['main', 'system', 'crash', 'radio', 'events', 'kernel']
+ for buffer in logger_buffers: # pylint: disable=redefined-builtin
+ device.adb.shell('logcat -b %s -G 16M' % buffer)
+ buffer_size = device.adb.shell('logcat -b %s -g' % buffer)
+ if isinstance(buffer_size, bytes):
+ buffer_size = buffer_size.decode()
+ if 'ring buffer is 16' in buffer_size:
+ device.log.info('Successfully set "%s" buffer size to 16M.' % buffer)
+ else:
+ msg = 'Failed to set "%s" buffer size to 16M.' % buffer
+ if not self.ignore_device_setup_failures:
+ raise signals.TestError(msg)
+ device.log.warning(msg)
+
+ def set_bt_trc_level_verbose(self, device):
+ """Modifies etc/bluetooth/bt_stack.conf to enable Bluetooth VERBOSE log."""
+ device.log.info('Enabling Bluetooth VERBOSE logging...')
+ bt_stack_conf = device.adb.shell('cat etc/bluetooth/bt_stack.conf')
+ if isinstance(bt_stack_conf, bytes):
+ bt_stack_conf = bt_stack_conf.decode()
+ # Check if 19 trace level settings are set to 6(VERBOSE). E.g. TRC_HCI=6.
+ if len(re.findall('TRC.*=[6]', bt_stack_conf)) == 19:
+ device.log.info('Bluetooth VERBOSE logging has already enabled.')
+ return False
+ # Suggest to use AndroidDeviceSettingsDecorator to disable verity and then
+ # reboot (b/140277443).
+ disable_verity_check(device)
+ device.adb.remount()
+ try:
+ device.adb.shell(r'sed -i "s/\(TRC.*=\)2/\16/g;s/#\(LoggingV=--v=\)0/\13'
+ '/" etc/bluetooth/bt_stack.conf')
+ device.log.info('Successfully enabled Bluetooth VERBOSE Logging.')
+ return True
+ except adb.AdbError:
+ msg = 'Failed to enable Bluetooth VERBOSE Logging.'
+ if not self.ignore_device_setup_failures:
+ raise signals.TestError(msg)
+ device.log.warning(msg)
+ return False
+
+ def set_btsnooplogmode_full(self, device):
+ """Enables bluetooth snoop logging."""
+ device.log.info('Enabling Bluetooth HCI Snoop logging...')
+ device.adb.shell('setprop persist.bluetooth.btsnooplogmode full')
+ out = device.adb.shell('getprop persist.bluetooth.btsnooplogmode')
+ if isinstance(out, bytes):
+ out = out.decode()
+ # The expected output is "full/n".
+ if 'full' in out:
+ device.log.info('Successfully enabled Bluetooth HCI Snoop Logging.')
+ return True
+ msg = 'Failed to enable Bluetooth HCI Snoop Logging.'
+ if not self.ignore_device_setup_failures:
+ raise signals.TestError(msg)
+ device.log.warning(msg)
+ return False
+
+ def restart_bluetooth_by_airplane_mode(self, device):
+ """Restarts bluetooth by airplane mode."""
+ enable_airplane_mode(device, 3)
+ disable_airplane_mode(device, 3)
diff --git a/system/blueberry/utils/bt_audio_utils.py b/system/blueberry/utils/bt_audio_utils.py
new file mode 100644
index 0000000000..a4c2e8e95b
--- /dev/null
+++ b/system/blueberry/utils/bt_audio_utils.py
@@ -0,0 +1,229 @@
+# Lint as: python3
+"""Utils for bluetooth audio testing."""
+
+import logging as log
+import os
+import numpy as np
+from scipy import signal as scipy_signal
+from scipy.io import wavfile
+# Internal import
+# Internal import
+
+
+def generate_sine_wave_to_device(
+ device,
+ pushed_file_path='/sdcard/Music',
+ frequency=480,
+ channel=2,
+ sample_rate=48000,
+ sample_format=16,
+ duration_sec=10):
+ """Generates a fixed frequency sine wave file and push it to the device.
+
+ Generates a sine wave to the Mobly device directory and push it to the device
+ storage. The output file name format is such as the example:
+ sine_480hz_2ch_48000rate_16bit_10sec.wav
+
+ Args:
+ device: AndroidDevice, Mobly Android controller class.
+ pushed_file_path: string, the wave file path which is pushed to the device
+ storage. E.g. /sdcard/Music
+ frequency: int, fixed frequency in Hz.
+ channel: int, number of channels.
+ sample_rate: int, sampling rate in Hz.
+ sample_format: int, sampling format in bit.
+ duration_sec: int, audio duration in second.
+
+ Returns:
+ device_storage_path: string, the wave file on the device storage.
+ mobly_directory_path: string, the wave file on the Mobly device directory.
+ """
+ file_name = 'sine_%dhz_%dch_%drate_%dbit_%dsec.wav' % (
+ frequency, channel, sample_rate, sample_format, duration_sec)
+ mobly_directory_path = os.path.join(device.log_path, file_name)
+ os.system('%s -n -c %d -r %d -b %d %s synth %d sine %d' %
+ (audio_processor.AudioProcessor.SOX, channel, sample_rate,
+ sample_format, mobly_directory_path, duration_sec, frequency))
+ device.adb.push([mobly_directory_path, pushed_file_path])
+ device_storage_path = os.path.join(pushed_file_path, file_name)
+ return device_storage_path, mobly_directory_path
+
+
+def measure_audio_mos(recorded_audio_file, reference_audio_file):
+ """Measures mean opinion score (MOS) of a recorded audio.
+
+ This function uses the module of A/V Analysis Service to measure MOS:
+ Internal reference
+
+ Args:
+ recorded_audio_file: string, the recorded audio file to be measured.
+ reference_audio_file: string, the reference audio file for comparison.
+
+ Returns:
+ Float which is the mean opinion score of the recorded audio.
+ """
+ results = audio_calculator.AudioAnalyzer().Analyze(reference_audio_file,
+ recorded_audio_file)
+ # Returns 0.0 if the results fails to be generated.
+ if not results:
+ log.warning('Failed to generate the audio analysis results.')
+ return 0.0
+ return results[0].mos
+
+
+def measure_fundamental_frequency(signal, sample_rate):
+ """Measures fundamental frequency of a signal.
+
+ Args:
+ signal: An 1-D array representing the signal data.
+ sample_rate: int, sample rate of the signal.
+
+ Returns:
+ Float representing the fundamental frequency.
+ """
+ return sample_rate * (np.argmax(np.abs(np.fft.rfft(signal))) / len(signal))
+
+
+def measure_rms(signal):
+ """Measures Root Mean Square (RMS) of a signal.
+
+ Args:
+ signal: An 1-D array representing the signal data.
+
+ Returns:
+ Float representing the root mean square.
+ """
+ return np.sqrt(np.mean(np.absolute(signal)**2))
+
+
+def measure_thdn(signal, sample_rate, q, frequency=None):
+ """Measures Total Harmonic Distortion + Noise (THD+N) of a signal.
+
+ Args:
+ signal: An 1-D array representing the signal data.
+ sample_rate: int, sample rate of the signal.
+ q: float, quality factor for the notch filter.
+ frequency: float, fundamental frequency of the signal. All other frequencies
+ are noise. If not specified, will be calculated using FFT.
+
+ Returns:
+ Float representing THD+N ratio calculated from the ratio of RMS of pure
+ harmonics and noise signal to RMS of original signal.
+ """
+ # Normalizes the signal.
+ signal -= np.mean(signal)
+ # Gets Blackman-Harris window from the signal.
+ window = signal * scipy_signal.blackmanharris(len(signal))
+ # Finds the fundamental frequency to remove if not specified.
+ if not frequency:
+ frequency = measure_fundamental_frequency(window, sample_rate)
+ # Creates a notch filter to get noise from the signal.
+ wo = frequency / (sample_rate / 2)
+ b, a = scipy_signal.iirnotch(wo, q)
+ noise = scipy_signal.lfilter(b, a, window)
+ return measure_rms(noise) / measure_rms(window)
+
+
+def measure_audio_thdn_per_window(
+ audio_file,
+ thdn_threshold,
+ step_size,
+ window_size,
+ q,
+ frequency=None):
+ """Measures Total Harmonic Distortion + Noise (THD+N) of an audio file.
+
+ This function is used to capture audio glitches from a recorded audio file,
+ and the audio file shall record a fixed frequency sine wave.
+
+ Args:
+ audio_file: A .wav file to be measured.
+ thdn_threshold: float, a THD+N threshold used to compare with the measured
+ THD+N for every windows. If THD+N of a window is greater than the
+ threshold, will record this to results.
+ step_size: int, number of samples to move the window by for each analysis.
+ window_size: int, number of samples to analyze each time.
+ q: float, quality factor for the notch filter.
+ frequency: float, fundamental frequency of the signal. All other frequencies
+ are noise. If not specified, will be calculated using FFT.
+
+ Returns:
+ List containing each result of channels. Like the following structure:
+ ```
+ [
+ [ # result of channel 1
+ {
+ "thd+n": <float>, # THD+N of a window
+ "start_time": <float>, # start time of a window
+ "end_time": <float>, # end time of a window
+ },
+ ...,
+ ],
+ [...,] # result of channel 2
+ ...,
+ ]
+ ```
+ """
+ if step_size <= 0:
+ raise ValueError('step_size shall be greater than 0.')
+ if window_size <= 0:
+ raise ValueError('window_size shall be greater than 0.')
+ sample_rate, wave_data = wavfile.read(audio_file)
+ wave_data = wave_data.astype('float64')
+ # Collects the result for each channels.
+ results = []
+ for signal in wave_data.transpose():
+ current_position = 0
+ channel_result = []
+ while current_position + window_size < len(signal):
+ window = signal[current_position:current_position + window_size]
+ thdn = measure_thdn(
+ signal=window,
+ sample_rate=sample_rate,
+ q=q,
+ frequency=frequency)
+ start_time = current_position / sample_rate
+ end_time = (current_position + window_size) / sample_rate
+ if thdn > thdn_threshold:
+ channel_result.append({
+ 'thd+n': thdn,
+ 'start_time': start_time,
+ 'end_time': end_time
+ })
+ current_position += step_size
+ results.append(channel_result)
+ return results
+
+
+def trim_audio(audio_file: str,
+ duration_sec: float,
+ start_time_sec: float = 0.0) -> str:
+ """Trims an audio file with a specific start time and duration.
+
+ Generates a output file and its name is such as below format:
+ `<input file name>_<start time sec>-<duration sec>.<input file type>`
+
+ Args:
+ audio_file: string, an audio file to be trimed.
+ duration_sec: float, the duration of the output file in seconds.
+ start_time_sec: float, the start time of the audio file to be trimmed in
+ seconds. Default value is 0.0 second if not specified.
+
+ Returns:
+ String, the output file of the same path of the origin file.
+ """
+ file_path, file_name = os.path.split(audio_file)
+ file_name, file_ext = os.path.splitext(file_name)
+ output_file_name = '%s_%s-%s%s' % (
+ file_name,
+ start_time_sec,
+ (start_time_sec + duration_sec),
+ file_ext)
+ output_file = os.path.join(file_path, output_file_name)
+ processor = audio_processor.AudioProcessor()
+ processor.TrimAudio(
+ input_file=audio_file,
+ output_file=output_file,
+ duration=duration_sec,
+ start=start_time_sec)
+ return output_file
diff --git a/system/blueberry/utils/bt_constants.py b/system/blueberry/utils/bt_constants.py
new file mode 100644
index 0000000000..6a94e83edd
--- /dev/null
+++ b/system/blueberry/utils/bt_constants.py
@@ -0,0 +1,191 @@
+# Lint as: python3
+"""Constants used for bluetooth test."""
+
+import enum
+
+
+### Generic Constants Begin ###
+BT_DEFAULT_TIMEOUT_SECONDS = 15
+DEFAULT_RFCOMM_TIMEOUT_MS = 10000
+CALL_STATE_IDLE = 0
+CALL_STATE_RINGING = 1
+CALL_STATE_OFFHOOK = 2
+CALL_STATE_TIMEOUT_SEC = 30
+NAP_CONNECTION_TIMEOUT_SECS = 20
+
+# Call log types.
+INCOMING_CALL_LOG_TYPE = '1'
+OUTGOING_CALL_LOG_TYPE = '2'
+MISSED_CALL_LOG_TYPE = '3'
+
+# Passthrough Commands sent to the RPC Server.
+CMD_MEDIA_PLAY = 'play'
+CMD_MEDIA_PAUSE = 'pause'
+CMD_MEDIA_SKIP_NEXT = 'skipNext'
+CMD_MEDIA_SKIP_PREV = 'skipPrev'
+
+# Events dispatched from the RPC Server.
+EVENT_PLAY_RECEIVED = 'playReceived'
+EVENT_PAUSE_RECEIVED = 'pauseReceived'
+EVENT_SKIP_NEXT_RECEIVED = 'skipNextReceived'
+EVENT_SKIP_PREV_RECEIVED = 'skipPrevReceived'
+
+# A playback state indicating the media session is currently paused.
+STATE_PAUSED = 2
+STATE_PLAYING = 3
+
+# File path
+RAMDUMP_PATH = 'data/vendor/ssrdump'
+
+# UiAutoHelper package name.
+UIAUTO_HELPER_PACKAGE_NAME = 'com.google.android.uiautohelper'
+
+# Test Runner for Android instrumentation test.
+ANDROIDX_TEST_RUNNER = 'androidx.test.runner.AndroidJUnitRunner'
+
+# Wifi hotspot setting
+WIFI_HOTSPOT_2_4G = {'SSID': 'pqmBT', 'password': 'password', 'apBand': 0}
+
+
+class AvrcpEvent(enum.Enum):
+ """Enumeration of AVRCP event types."""
+ PLAY = 'State:NOT_PLAYING->PLAYING'
+ PAUSE = 'State:PLAYING->NOT_PLAYING'
+ TRACK_PREVIOUS = 'sendMediaKeyEvent: keyEvent=76'
+ TRACK_NEXT = 'sendMediaKeyEvent: keyEvent=75'
+
+# Bluetooth RFCOMM UUIDs as defined by the SIG
+BT_RFCOMM_UUIDS = {
+ 'default_uuid': '457807c0-4897-11df-9879-0800200c9a66',
+ 'base_uuid': '00000000-0000-1000-8000-00805F9B34FB',
+ 'sdp': '00000001-0000-1000-8000-00805F9B34FB',
+ 'udp': '00000002-0000-1000-8000-00805F9B34FB',
+ 'rfcomm': '00000003-0000-1000-8000-00805F9B34FB',
+ 'tcp': '00000004-0000-1000-8000-00805F9B34FB',
+ 'tcs_bin': '00000005-0000-1000-8000-00805F9B34FB',
+ 'tcs_at': '00000006-0000-1000-8000-00805F9B34FB',
+ 'att': '00000007-0000-1000-8000-00805F9B34FB',
+ 'obex': '00000008-0000-1000-8000-00805F9B34FB',
+ 'ip': '00000009-0000-1000-8000-00805F9B34FB',
+ 'ftp': '0000000A-0000-1000-8000-00805F9B34FB',
+ 'http': '0000000C-0000-1000-8000-00805F9B34FB',
+ 'wsp': '0000000E-0000-1000-8000-00805F9B34FB',
+ 'bnep': '0000000F-0000-1000-8000-00805F9B34FB',
+ 'upnp': '00000010-0000-1000-8000-00805F9B34FB',
+ 'hidp': '00000011-0000-1000-8000-00805F9B34FB',
+ 'hardcopy_control_channel': '00000012-0000-1000-8000-00805F9B34FB',
+ 'hardcopy_data_channel': '00000014-0000-1000-8000-00805F9B34FB',
+ 'hardcopy_notification': '00000016-0000-1000-8000-00805F9B34FB',
+ 'avctp': '00000017-0000-1000-8000-00805F9B34FB',
+ 'avdtp': '00000019-0000-1000-8000-00805F9B34FB',
+ 'cmtp': '0000001B-0000-1000-8000-00805F9B34FB',
+ 'mcap_control_channel': '0000001E-0000-1000-8000-00805F9B34FB',
+ 'mcap_data_channel': '0000001F-0000-1000-8000-00805F9B34FB',
+ 'l2cap': '00000100-0000-1000-8000-00805F9B34FB'
+}
+
+
+class BluetoothAccessLevel(enum.IntEnum):
+ """Enum class for bluetooth profile access levels."""
+ ACCESS_ALLOWED = 1
+ ACCESS_DENIED = 2
+
+
+class BluetoothProfile(enum.IntEnum):
+ """Enum class for bluetooth profile types.
+
+ Should be kept in sync with
+ //frameworks/base/core/java/android/bluetooth/BluetoothProfile.java
+ """
+
+ HEADSET = 1
+ A2DP = 2
+ HEALTH = 3
+ HID_HOST = 4
+ PAN = 5
+ PBAP = 6
+ GATT = 7
+ GATT_SERVER = 8
+ MAP = 9
+ SAP = 10
+ A2DP_SINK = 11
+ AVRCP_CONTROLLER = 12
+ AVRCP = 13
+ HEADSET_CLIENT = 16
+ PBAP_CLIENT = 17
+ MAP_MCE = 18
+ HID_DEVICE = 19
+ OPP = 20
+ HEARING_AID = 21
+
+
+class BluetoothConnectionPolicy(enum.IntEnum):
+ """Enum class for bluetooth bluetooth connection policy.
+
+ bluetooth connection policy as defined in
+ //frameworks/base/core/java/android/bluetooth/BluetoothProfile.java
+ """
+ CONNECTION_POLICY_UNKNOWN = -1
+ CONNECTION_POLICY_FORBIDDEN = 0
+ CONNECTION_POLICY_ALLOWED = 100
+
+
+class BluetoothConnectionStatus(enum.IntEnum):
+ """Enum class for bluetooth connection status.
+
+ Bluetooth connection status as defined in
+ //frameworks/base/core/java/android/bluetooth/BluetoothProfile.java
+ """
+ STATE_DISCONNECTED = 0
+ STATE_CONNECTING = 1
+ STATE_CONNECTED = 2
+ STATE_DISCONNECTING = 3
+
+
+class BluetoothPriorityLevel(enum.IntEnum):
+ """Enum class for bluetooth priority level.
+
+ Priority levels as defined in
+ //frameworks/base/core/java/android/bluetooth/BluetoothProfile.java
+ """
+
+ PRIORITY_AUTO_CONNECT = 1000
+ PRIORITY_ON = 100
+ PRIORITY_OFF = 0
+ PRIORITY_UNDEFINED = -1
+
+
+class BleAdvertiseSettingsMode(enum.IntEnum):
+ """Enum class for BLE advertise settings mode."""
+ LOW_POWER = 0
+ BALANCED = 1
+ LOW_LATENCY = 2
+
+
+class BleAdvertiseSettingsTxPower(enum.IntEnum):
+ """Enum class for BLE advertise settings tx power."""
+ ULTRA_LOW = 0
+ LOW = 1
+ MEDIUM = 2
+ HIGH = 3
+
+
+class LogType(enum.Enum):
+ """Enumeration of device log type."""
+ DEFAULT_VALUE = 'GENERIC'
+ BLUETOOTH_DEVICE_SIMULATOR = 'BDS'
+ ICLEVER_HB01 = 'GENERIC'
+
+
+class CallState(enum.IntEnum):
+ """Enum class for phone call state."""
+ IDLE = 0
+ RINGING = 1
+ OFFHOOK = 2
+
+
+class CallLogType(enum.IntEnum):
+ """Enum class for phone call log type."""
+ INCOMING_CALL = 1
+ OUTGOING_CALL = 2
+ MISSED_CALL = 3
diff --git a/system/blueberry/utils/bt_test_utils.py b/system/blueberry/utils/bt_test_utils.py
new file mode 100644
index 0000000000..e9d6ec0422
--- /dev/null
+++ b/system/blueberry/utils/bt_test_utils.py
@@ -0,0 +1,200 @@
+# Lint as: python3
+"""Utils for blue tooth tests.
+
+Partly ported from acts/framework/acts/test_utils/bt/bt_test_utils.py
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging as log
+import os
+import random
+import string
+import time
+import wave
+
+
+def convert_pcm_to_wav(pcm_file_path, wave_file_path, audio_params):
+ """Converts raw pcm data into wave file.
+
+ Args:
+ pcm_file_path: File path of origin pcm file.
+ wave_file_path: File path of converted wave file.
+ audio_params: A dict with audio configuration.
+ """
+ with open(pcm_file_path, 'rb') as pcm_file:
+ frames = pcm_file.read()
+ write_record_file(wave_file_path, audio_params, frames)
+
+
+def create_vcf_from_vcard(output_path: str,
+ num_of_contacts: int,
+ first_name: str = None,
+ last_name: str = None,
+ phone_number: int = None) -> str:
+ """Creates a vcf file from vCard.
+
+ Args:
+ output_path: Path of the output vcf file.
+ num_of_contacts: Number of contacts to be generated.
+ first_name: First name of the contacts.
+ last_name: Last name of the contacts.
+ phone_number: Phone number of the contacts.
+
+ Returns:
+ vcf_file_path: Path of the output vcf file. E.g.
+ "/<output_path>/contacts_<time>.vcf".
+ """
+ file_name = f'contacts_{int(time.time())}.vcf'
+ vcf_file_path = os.path.join(output_path, file_name)
+ with open(vcf_file_path, 'w+') as f:
+ for i in range(num_of_contacts):
+ lines = []
+ if first_name is None:
+ first_name = 'Person'
+ vcard_last_name = last_name
+ if last_name is None:
+ vcard_last_name = i
+ vcard_phone_number = phone_number
+ if phone_number is None:
+ vcard_phone_number = random.randrange(int(10e10))
+ lines.append('BEGIN:VCARD\n')
+ lines.append('VERSION:2.1\n')
+ lines.append(f'N:{vcard_last_name};{first_name};;;\n')
+ lines.append(f'FN:{first_name} {vcard_last_name}\n')
+ lines.append(f'TEL;CELL:{vcard_phone_number}\n')
+ lines.append(f'EMAIL;PREF:{first_name}{vcard_last_name}@gmail.com\n')
+ lines.append('END:VCARD\n')
+ f.write(''.join(lines))
+ return vcf_file_path
+
+
+def generate_id_by_size(size,
+ chars=(string.ascii_lowercase + string.ascii_uppercase +
+ string.digits)):
+ """Generate random ascii characters of input size and input char types.
+
+ Args:
+ size: Input size of string.
+ chars: (Optional) Chars to use in generating a random string.
+
+ Returns:
+ String of random input chars at the input size.
+ """
+ return ''.join(random.choice(chars) for _ in range(size))
+
+
+def get_duration_seconds(wav_file_path):
+ """Get duration of most recently recorded file.
+
+ Args:
+ wav_file_path: path of the wave file.
+
+ Returns:
+ duration (float): duration of recorded file in seconds.
+ """
+ f = wave.open(wav_file_path, 'r')
+ frames = f.getnframes()
+ rate = f.getframerate()
+ duration = (frames / float(rate))
+ f.close()
+ return duration
+
+
+def wait_until(timeout_sec,
+ condition_func,
+ func_args,
+ expected_value,
+ exception=None,
+ interval_sec=0.5):
+ """Waits until a function returns a expected value or timeout is reached.
+
+ Example usage:
+ ```
+ def is_bluetooth_enabled(device) -> bool:
+ do something and return something...
+
+ # Waits and checks if Bluetooth is turned on.
+ bt_test_utils.wait_until(
+ timeout_sec=10,
+ condition_func=is_bluetooth_enabled,
+ func_args=[dut],
+ expected_value=True,
+ exception=signals.TestFailure('Failed to turn on Bluetooth.'),
+ interval_sec=1)
+ ```
+
+ Args:
+ timeout_sec: float, max waiting time in seconds.
+ condition_func: function, when the condiction function returns the expected
+ value, the waiting mechanism will be interrupted.
+ func_args: tuple or list, the arguments for the condition function.
+ expected_value: a expected value that the condition function returns.
+ exception: Exception, an exception will be raised when timed out if needed.
+ interval_sec: float, interval time between calls of the condition function
+ in seconds.
+
+ Returns:
+ True if the function returns the expected value else False.
+ """
+ start_time = time.time()
+ end_time = start_time + timeout_sec
+ while time.time() < end_time:
+ if condition_func(*func_args) == expected_value:
+ return True
+ time.sleep(interval_sec)
+ args_string = ', '.join(list(map(str, func_args)))
+ log.warning('Timed out after %.1fs waiting for "%s(%s)" to be "%s".',
+ timeout_sec, condition_func.__name__, args_string, expected_value)
+ if exception:
+ raise exception
+ return False
+
+
+def write_read_verify_data_sl4a(client_ad, server_ad, msg, binary=False):
+ """Verify that the client wrote data to the server Android device correctly.
+
+ Args:
+ client_ad: the Android device to perform the write.
+ server_ad: the Android device to read the data written.
+ msg: the message to write.
+ binary: if the msg arg is binary or not.
+
+ Returns:
+ True if the data written matches the data read, false if not.
+ """
+ client_ad.log.info('Write message %s.', msg)
+ if binary:
+ client_ad.sl4a.bluetoothSocketConnWriteBinary(msg)
+ else:
+ client_ad.sl4a.bluetoothSocketConnWrite(msg)
+ server_ad.log.info('Read message %s.', msg)
+ if binary:
+ read_msg = server_ad.sl4a.bluetoothSocketConnReadBinary().rstrip('\r\n')
+ else:
+ read_msg = server_ad.sl4a.bluetoothSocketConnRead()
+ log.info('Verify message.')
+ if msg != read_msg:
+ log.error('Mismatch! Read: %s, Expected: %s', read_msg, msg)
+ return False
+ log.info('Matched! Read: %s, Expected: %s', read_msg, msg)
+ return True
+
+
+def write_record_file(file_name, audio_params, frames):
+ """Writes the recorded audio into the file.
+
+ Args:
+ file_name: The file name for writing the recorded audio.
+ audio_params: A dict with audio configuration.
+ frames: Recorded audio frames.
+ """
+ log.debug('writing frame to %s', file_name)
+ wf = wave.open(file_name, 'wb')
+ wf.setnchannels(audio_params['channel'])
+ wf.setsampwidth(audio_params.get('sample_width', 1))
+ wf.setframerate(audio_params['sample_rate'])
+ wf.writeframes(frames)
+ wf.close()
diff --git a/system/blueberry/utils/command_line_runner/run_bluetooth_tests.py b/system/blueberry/utils/command_line_runner/run_bluetooth_tests.py
new file mode 100644
index 0000000000..3a186b8785
--- /dev/null
+++ b/system/blueberry/utils/command_line_runner/run_bluetooth_tests.py
@@ -0,0 +1,248 @@
+"""Command-line test runner script for running Bluetooth tests.
+
+This module allows users to initiate Bluetooth test targets and run them against
+specified DUTs (devices-under-test) using a simple command line interface.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+from __future__ import print_function
+
+import base64
+
+from absl import app
+from absl import flags
+from absl import logging
+
+# Internal import
+# Internal import
+# Internal import
+# Internal import
+# Internal import
+
+FLAGS = flags.FLAGS
+
+flags.DEFINE_multi_string('bt_test', None, 'Bluetooth test to run.')
+flags.DEFINE_multi_string('bt_dut', None,
+ 'Bluetooth device to allocate for tests.')
+
+
+# Valid config keys for the --bt_test command line flag.
+BT_TEST_CONFIG_KEYS = {'target'}
+
+# Valid config keys for the --bt_dut (device-under-test) command line flag.
+BT_DUT_CONFIG_KEYS = {'hardware'}
+
+TEST_FLAGS = ('--notest_loasd --test_output=streamed '
+ '--test_arg=--param_dut_config=%s')
+
+
+class Error(Exception):
+ """Base class for module exceptions."""
+ pass
+
+
+class TestConfigError(Error):
+ """Raised when --bt_test config flags are specified incorrectly."""
+ pass
+
+
+class DutConfigError(Error):
+ """Raised when --bt_dut config flags are specified incorrectly."""
+ pass
+
+
+def validate_bt_test_flags(flag_value):
+ """Validates the format of specified --bt_test flags.
+
+ Args:
+ flag_value: string, the config flag value for a given --bt_test flag.
+
+ Returns:
+ bool, True if --bt_test flags have been specified correctly.
+ """
+ if not flag_value:
+ logging.error('No tests specified! Please specify at least one '
+ 'test using the --bt_test flag.')
+ return False
+ for test in flag_value:
+ config_args = test.split(',')
+ for config_arg in config_args:
+ if config_arg.split('=')[0] not in BT_TEST_CONFIG_KEYS:
+ logging.error('--bt_test config key "%s" is invalid!',
+ config_arg.split('=')[0])
+ return False
+ return True
+
+
+def validate_bt_dut_flags(flag_value):
+ """Validates the format of specified --bt_dut flags.
+
+ Args:
+ flag_value: string, the config flag value for a given --bt_dut flag.
+
+ Returns:
+ bool, True if --bt_dut flags have been specified correctly.
+ """
+ if not flag_value:
+ logging.error('No DUTs specified! Please specify at least one '
+ 'DUT using the --bt_dut flag.')
+ return False
+ for dut in flag_value:
+ config_args = dut.split(',')
+ for config_arg in config_args:
+ if config_arg.split('=')[0] not in BT_DUT_CONFIG_KEYS:
+ logging.error('--bt_dut config key "%s" is invalid!',
+ config_arg.split('=')[0])
+ return False
+ return True
+
+flags.register_validator(
+ 'bt_test', validate_bt_test_flags,
+ ('Invalid --bt_test configuration specified!'
+ ' Valid configuration fields include: %s')
+ % BT_TEST_CONFIG_KEYS)
+
+
+flags.register_validator(
+ 'bt_dut', validate_bt_dut_flags,
+ ('Invalid --bt_dut configuration specified!'
+ ' Valid configuration fields include: %s')
+ % BT_DUT_CONFIG_KEYS)
+
+
+def parse_flag_value(flag_value):
+ """Parses a config flag value string into a dict.
+
+ Example input: 'target=//tests:bluetooth_pairing_test'
+ Example output: {'target': '//tests:bluetooth_pairing_test'}
+
+ Args:
+ flag_value: string, the config flag value for a given flag.
+
+ Returns:
+ dict, A dict object representation of a config flag value.
+ """
+ config_dict = {}
+ config_args = flag_value.split(',')
+ for config_arg in config_args:
+ config_dict[config_arg.split('=')[0]] = config_arg.split('=')[1]
+ return config_dict
+
+
+def get_device_type(gateway_stub, dut_config_dict):
+ """Determines a device type based on a device query.
+
+ Args:
+ gateway_stub: An RPC2 stub object.
+ dut_config_dict: dict, A dict of device config args.
+
+ Returns:
+ string, The MobileHarness device type.
+
+ Raises:
+ DutConfigError: If --bt_dut flag(s) are incorrectly specified.
+ """
+ device_query_filter = device_query_pb2.DeviceQueryFilter()
+ device_query_filter.type_regex.append('AndroidRealDevice')
+ for dut_config_key in dut_config_dict:
+ dimension_filter = device_query_filter.dimension_filter.add()
+ dimension_filter.name = dut_config_key
+ dimension_filter.value_regex = dut_config_dict[dut_config_key]
+ request = gateway_service_pb2.QueryDeviceRequest(
+ device_query_filter=device_query_filter)
+ response = gateway_stub.QueryDevice(request)
+ if response.device_query_result.device_info:
+ return 'AndroidRealDevice'
+
+ device_query_filter.ClearField('type_regex')
+ device_query_filter.type_regex.append('TestbedDevice')
+ request = gateway_service_pb2.QueryDeviceRequest(
+ device_query_filter=device_query_filter)
+ response = gateway_stub.QueryDevice(request)
+ if response.device_query_result.device_info:
+ return 'TestbedDevice'
+
+ raise DutConfigError('Invalid --bt_dut config specified: %s' %
+ dut_config_dict)
+
+
+def generate_dut_configs(gateway_stub):
+ """Generates a unicode string specifying the desired DUT configurations.
+
+ Args:
+ gateway_stub: An RPC2 stub object.
+
+ Returns:
+ string, Unicode string specifying DUT configurations.
+
+ Raises:
+ DutConfigError: If --bt_dut flag(s) are incorrectly specified.
+ """
+ dut_list = job_config_pb2.JobConfig().DeviceList()
+ dut_config_dict_list = [parse_flag_value(value) for value in FLAGS.bt_dut]
+
+ for dut_config_dict in dut_config_dict_list:
+ dut_config_dict['pool'] = 'bluetooth-iop'
+ dut = job_config_pb2.JobConfig().SubDeviceSpec()
+ if 'hardware' not in dut_config_dict:
+ raise DutConfigError('Must specify hardware name for bt_dut: %s' %
+ dut_config_dict)
+ dut.type = get_device_type(gateway_stub, dut_config_dict)
+ for config_key in dut_config_dict:
+ dut.dimensions.content[config_key] = dut_config_dict[config_key]
+ dut_list.sub_device_spec.append(dut)
+ logging.info(base64.b64encode(dut_list.SerializeToString()).decode('utf-8'))
+ return base64.b64encode(dut_list.SerializeToString()).decode('utf-8')
+
+
+def generate_blaze_targets(session_config, gateway_stub):
+ """Generates and appends blaze test targets to a MobileHarness session.
+
+ Args:
+ session_config: The SessionConfig object to append blaze test targets to.
+ gateway_stub: An RPC2 stub object.
+
+ Raises:
+ TestConfigError: If --bt_test flag(s) are incorrectly specified.
+ """
+ test_config_dict_list = [parse_flag_value(value) for value in FLAGS.bt_test]
+
+ for test_config_dict in test_config_dict_list:
+ target = setting_pb2.BlazeTarget()
+ if 'target' not in test_config_dict:
+ raise TestConfigError('Must specify a target for bt_test: %s' %
+ test_config_dict)
+ target.target_name = test_config_dict['target']
+ target.test_flags = TEST_FLAGS % generate_dut_configs(gateway_stub)
+ session_config.blaze_target.append(target)
+
+
+def run_session():
+ """Runs a configured test session.
+
+ Returns:
+ A RunSessionResponse object.
+ """
+ session_config = setting_pb2.SessionConfig()
+ channel = rpcutil.GetNewChannel('blade:mobileharness-gateway')
+ gateway_stub = gateway_service_pb2.GatewayService.NewRPC2Stub(channel=channel)
+ generate_blaze_targets(session_config, gateway_stub)
+ request = gateway_service_pb2.RunSessionRequest()
+ request.session_config.CopyFrom(session_config)
+ response = gateway_stub.RunSession(request)
+ logging.info('Sponge link: %s', response.sponge)
+ logging.info('Session ID: %s', response.session_id)
+ return response
+
+
+def main(argv):
+ logging.use_python_logging()
+ del argv
+ run_session()
+
+if __name__ == '__main__':
+ flags.mark_flag_as_required('bt_test')
+ flags.mark_flag_as_required('bt_dut')
+ app.run(main)
diff --git a/system/blueberry/utils/metrics_utils.py b/system/blueberry/utils/metrics_utils.py
new file mode 100644
index 0000000000..b62b9e921c
--- /dev/null
+++ b/system/blueberry/utils/metrics_utils.py
@@ -0,0 +1,111 @@
+"""Metrics reporting module for Blueberry using protobuf.
+
+Internal reference
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+from __future__ import print_function
+
+import base64
+import logging
+import time
+
+# Internal import
+
+
+class BluetoothMetricLogger(object):
+ """A class used for gathering metrics from tests and devices.
+
+ This class provides methods to allow test writers to easily export metrics
+ from their tests as protobuf messages.
+
+ Attributes:
+ _metrics: The Bluetooth test proto message to add metrics to.
+ """
+
+ def __init__(self, bluetooth_test_proto_message):
+ self._metrics = bluetooth_test_proto_message
+ self._start_time = int(time.time())
+
+ def add_primary_device_metrics(self, device):
+ """Adds primary device metrics to the test proto message.
+
+ Args:
+ device: The Bluetooth device object to gather device metrics from.
+ """
+ device_message = self._metrics.configuration_data.primary_device
+ message_fields = device_message.DESCRIPTOR.fields_by_name.keys()
+ try:
+ device_metrics_dict = device.get_device_info()
+ except AttributeError:
+ logging.info(
+ 'Must implement get_device_info method for this controller in order to upload device metrics.'
+ )
+ return
+
+ for metric in device_metrics_dict:
+ if metric in message_fields:
+ setattr(device_message, metric, device_metrics_dict[metric])
+ else:
+ logging.info('%s is not a valid metric field.', metric)
+
+ def add_connected_device_metrics(self, device):
+ """Adds connected device metrics to the test proto message.
+
+ Args:
+ device: The Bluetooth device object to gather device metrics from.
+ """
+ device_message = self._metrics.configuration_data.connected_device
+ message_fields = device_message.DESCRIPTOR.fields_by_name.keys()
+ try:
+ device_metrics_dict = device.get_device_info()
+ except AttributeError:
+ logging.info(
+ 'Must implement get_device_info method for this controller in order to upload device metrics.'
+ )
+ return
+
+ for metric in device_metrics_dict:
+ if metric in message_fields:
+ setattr(device_message, metric, device_metrics_dict[metric])
+ else:
+ logging.warning('%s is not a valid metric field.', metric)
+
+ def add_test_metrics(self, test_metrics_dict):
+ """Adds test metrics to the test proto message.
+
+ Args:
+ test_metrics_dict: A dictionary of metrics to add to the test proto
+ message. Metric will only be added if the key exists as a field in the
+ test proto message.
+ """
+ if hasattr(self._metrics, 'configuration_data'):
+ self._metrics.configuration_data.test_date_time = self._start_time
+ message_fields = self._metrics.DESCRIPTOR.fields_by_name.keys()
+ for metric in test_metrics_dict:
+ if metric in message_fields:
+ metric_value = test_metrics_dict[metric]
+ if isinstance(metric_value, (list, tuple)):
+ getattr(self._metrics, metric).extend(metric_value)
+ else:
+ setattr(self._metrics, metric, metric_value)
+ else:
+ logging.warning('%s is not a valid metric field.', metric)
+
+ def proto_message_to_base64(self):
+ """Converts a proto message to a base64 string.
+
+ Returns:
+ string, Message formatted as a base64 string.
+ """
+ return base64.b64encode(self._metrics.SerializeToString()).decode('utf-8')
+
+ def proto_message_to_ascii(self):
+ """Converts a proto message to an ASCII string.
+
+ Returns:
+ string, Message formatted as an ASCII string. Useful for debugging.
+ """
+ return text_format.MessageToString(self._metrics)