diff options
| author | 2019-12-10 16:41:59 -0800 | |
|---|---|---|
| committer | 2019-12-11 12:29:31 -0800 | |
| commit | 8198497fcfbe9811bc9a6c538e09f853561290a8 (patch) | |
| tree | 29bb8fa7df694f8f218d9d11f6240541e7944ddc | |
| parent | cb11407c93016592a335105fa726c2310a4cea87 (diff) | |
Python: format all Python files in system/bt using YAPF
python3 yapf -p -i $(git ls-tree --name-only -r aosp/master | grep "\.py")
Bug: 146016811
Test: run Python tests
Change-Id: Ic5fe6a21151d1abc8eb013f8c8070ba8238a5249
24 files changed, 2681 insertions, 2336 deletions
diff --git a/system/build/toolchain/clang/get_clang_suffix.py b/system/build/toolchain/clang/get_clang_suffix.py index dedacdd51d..fbfe16c959 100644 --- a/system/build/toolchain/clang/get_clang_suffix.py +++ b/system/build/toolchain/clang/get_clang_suffix.py @@ -3,41 +3,43 @@ import subprocess import re import sys + def which(cmd): - for p in os.environ["PATH"].split(os.pathsep): - clang_path = os.path.join(p, cmd) - if os.path.exists(clang_path): - return clang_path - return None + for p in os.environ["PATH"].split(os.pathsep): + clang_path = os.path.join(p, cmd) + if os.path.exists(clang_path): + return clang_path + return None + -CLANG_VERSION_REGEX=".*version\s*([0-9]*\.[0-9]*)\.*" +CLANG_VERSION_REGEX = ".*version\s*([0-9]*\.[0-9]*)\.*" clang_path = which("clang++") clang_version_major = 0 clang_version_minor = 0 if clang_path: - clang_version_out = subprocess.Popen([clang_path, "--version"], - stdout=subprocess.PIPE).communicate()[0] - clang_version_match = re.search(CLANG_VERSION_REGEX, clang_version_out) - clang_version_str = clang_version_match.group(1) - clang_version_array = clang_version_str.split('.') - clang_version_major = int(clang_version_array[0]) - clang_version_minor = int(clang_version_array[1]) + clang_version_out = subprocess.Popen( + [clang_path, "--version"], stdout=subprocess.PIPE).communicate()[0] + clang_version_match = re.search(CLANG_VERSION_REGEX, clang_version_out) + clang_version_str = clang_version_match.group(1) + clang_version_array = clang_version_str.split('.') + clang_version_major = int(clang_version_array[0]) + clang_version_minor = int(clang_version_array[1]) if clang_version_major >= 3 and clang_version_minor >= 5: - print "" + print "" else: - # Loop in support clang version only - clang_version_major = 3 - clang_version_minor = 9 - while clang_version_major >= 3 and clang_version_minor >= 5: - clang_version_str = "%d.%d" % (clang_version_major, clang_version_minor) - clang_path = which("clang++-" + clang_version_str) - if clang_path: - print clang_version_str - sys.exit(0) - clang_version_minor -= 1 - if clang_version_minor < 0: - clang_version_minor = 9 - clang_version_major -= 1 - print "None" + # Loop in support clang version only + clang_version_major = 3 + clang_version_minor = 9 + while clang_version_major >= 3 and clang_version_minor >= 5: + clang_version_str = "%d.%d" % (clang_version_major, clang_version_minor) + clang_path = which("clang++-" + clang_version_str) + if clang_path: + print clang_version_str + sys.exit(0) + clang_version_minor -= 1 + if clang_version_minor < 0: + clang_version_minor = 9 + clang_version_major -= 1 + print "None" diff --git a/system/gd/cert/bluetooth_packets_python3_setup.py b/system/gd/cert/bluetooth_packets_python3_setup.py index 33c8293fde..7cca0196ae 100644 --- a/system/gd/cert/bluetooth_packets_python3_setup.py +++ b/system/gd/cert/bluetooth_packets_python3_setup.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - # Usage: # 1. Run envsetup and lunch first in an Android checkout # 2. Make target bluetooth_packets_python3 that will generate C++ sources for the @@ -25,7 +24,6 @@ # 4. Install: # python3 bluetooth_packets_python3_setup.py install --user - import os import glob from setuptools import setup, Extension @@ -34,10 +32,13 @@ ANDROID_BUILD_TOP = os.getenv("ANDROID_BUILD_TOP") PYBIND11_INCLUDE_DIR = os.path.join(ANDROID_BUILD_TOP, "external/python/pybind11/include") GD_DIR = os.path.join(ANDROID_BUILD_TOP, "packages/modules/Bluetooth/system/gd") -BT_PACKETS_GEN_DIR = os.path.join(ANDROID_BUILD_TOP, - "out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothGeneratedPackets_h/gen") -BT_PACKETS_PY3_GEN_DIR = os.path.join(ANDROID_BUILD_TOP, - "out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothGeneratedPackets_python3_cc/gen") +BT_PACKETS_GEN_DIR = os.path.join( + ANDROID_BUILD_TOP, + "out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothGeneratedPackets_h/gen") +BT_PACKETS_PY3_GEN_DIR = os.path.join( + ANDROID_BUILD_TOP, + "out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothGeneratedPackets_python3_cc/gen" +) BT_PACKETS_BASE_SRCS = [ os.path.join(GD_DIR, "l2cap/fcs.cc"), @@ -57,19 +58,19 @@ BT_PACKETS_PY3_SRCs = \ + glob.glob(os.path.join(BT_PACKETS_PY3_GEN_DIR, "l2cap", "*.cc")) \ + glob.glob(os.path.join(BT_PACKETS_PY3_GEN_DIR, "security", "*.cc")) -bluetooth_packets_python3_module = Extension('bluetooth_packets_python3', - sources=BT_PACKETS_BASE_SRCS + BT_PACKETS_PY3_SRCs, - include_dirs=[GD_DIR, - BT_PACKETS_GEN_DIR, - BT_PACKETS_PY3_GEN_DIR, - PYBIND11_INCLUDE_DIR], - extra_compile_args=['-std=c++17'] - ) +bluetooth_packets_python3_module = Extension( + 'bluetooth_packets_python3', + sources=BT_PACKETS_BASE_SRCS + BT_PACKETS_PY3_SRCs, + include_dirs=[ + GD_DIR, BT_PACKETS_GEN_DIR, BT_PACKETS_PY3_GEN_DIR, PYBIND11_INCLUDE_DIR + ], + extra_compile_args=['-std=c++17']) -setup(name='bluetooth_packets_python3', - version='1.0', - author="Android Open Source Project", - description="""Bluetooth Packet Library""", - ext_modules=[bluetooth_packets_python3_module], - py_modules=["bluetooth_packets_python3"], - ) +setup( + name='bluetooth_packets_python3', + version='1.0', + author="Android Open Source Project", + description="""Bluetooth Packet Library""", + ext_modules=[bluetooth_packets_python3_module], + py_modules=["bluetooth_packets_python3"], +) diff --git a/system/gd/cert/event_asserts.py b/system/gd/cert/event_asserts.py index a479c55299..1ce118967f 100644 --- a/system/gd/cert/event_asserts.py +++ b/system/gd/cert/event_asserts.py @@ -24,6 +24,7 @@ from google.protobuf import text_format from cert.event_callback_stream import EventCallbackStream + class EventAsserts(object): """ A class that handles various asserts with respect to a gRPC unary stream @@ -46,7 +47,7 @@ class EventAsserts(object): raise ValueError("event_callback_stream cannot be None") self.event_callback_stream = event_callback_stream self.event_queue = SimpleQueue() - self.callback = lambda event : self.event_queue.put(event) + self.callback = lambda event: self.event_queue.put(event) self.event_callback_stream.register_callback(self.callback) def __del__(self): @@ -62,15 +63,16 @@ class EventAsserts(object): logging.debug("assert_none") try: event = self.event_queue.get(timeout=timeout.seconds) - asserts.assert_equal(event, None, - msg=( - "Expected None, but got %s" % text_format.MessageToString( - event, as_one_line=True))) + asserts.assert_equal( + event, + None, + msg=("Expected None, but got %s" % text_format.MessageToString( + event, as_one_line=True))) except Empty: return - def assert_none_matching(self, match_fn, - timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): + def assert_none_matching( + self, match_fn, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): """ Assert no events where match_fn(event) is True happen within timeout period @@ -88,23 +90,24 @@ class EventAsserts(object): logging.debug("Waiting for event iteration %d" % iter_count) try: time_before = datetime.now() - current_event = self.event_queue.get( - timeout=timeout_seconds) + current_event = self.event_queue.get(timeout=timeout_seconds) time_elapsed = datetime.now() - time_before timeout_seconds -= time_elapsed.seconds if match_fn(current_event): event = current_event except Empty: continue - logging.debug( - "Done waiting for event, got %s" % text_format.MessageToString( - event, as_one_line=True)) - asserts.assert_equal(event, None, - msg=( - "Expected None matching, but got %s" % text_format.MessageToString( - event, as_one_line=True))) - - def assert_event_occurs(self, match_fn, at_least_times=1, + logging.debug("Done waiting for event, got %s" % + text_format.MessageToString(event, as_one_line=True)) + asserts.assert_equal( + event, + None, + msg=("Expected None matching, but got %s" % + text_format.MessageToString(event, as_one_line=True))) + + def assert_event_occurs(self, + match_fn, + at_least_times=1, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): """ Assert at least |at_least_times| instances of events happen where @@ -125,22 +128,25 @@ class EventAsserts(object): logging.debug("Waiting for event iteration %d" % iter_count) try: time_before = datetime.now() - current_event = self.event_queue.get( - timeout=timeout_seconds) + current_event = self.event_queue.get(timeout=timeout_seconds) time_elapsed = datetime.now() - time_before timeout_seconds -= time_elapsed.seconds if match_fn(current_event): event.append(current_event) except Empty: continue - logging.debug( - "Done waiting for event, got %s" % text_format.MessageToString( - event, as_one_line=True)) - asserts.assert_true(len(event) == at_least_times, - msg=("Expected at least %d events, but got %d" % at_least_times, len(event))) - - def assert_event_occurs_at_most(self, match_fn, at_most_times, - timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): + logging.debug("Done waiting for event, got %s" % + text_format.MessageToString(event, as_one_line=True)) + asserts.assert_true( + len(event) == at_least_times, + msg=("Expected at least %d events, but got %d" % at_least_times, + len(event))) + + def assert_event_occurs_at_most( + self, + match_fn, + at_most_times, + timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)): """ Assert at most |at_most_times| instances of events happen where match_fn(event) returns True within timeout period @@ -160,16 +166,16 @@ class EventAsserts(object): logging.debug("Waiting for event iteration %d" % iter_count) try: time_before = datetime.now() - current_event = self.event_queue.get( - timeout=timeout_seconds) + current_event = self.event_queue.get(timeout=timeout_seconds) time_elapsed = datetime.now() - time_before timeout_seconds -= time_elapsed.seconds if match_fn(current_event): event.append(current_event) except Empty: continue - logging.debug( - "Done waiting for event, got %s" % text_format.MessageToString( - event, as_one_line=True)) - asserts.assert_true(len(event) <= at_most_times, - msg=("Expected at most %d events, but got %d" % at_most_times, len(event)))
\ No newline at end of file + logging.debug("Done waiting for event, got %s" % + text_format.MessageToString(event, as_one_line=True)) + asserts.assert_true( + len(event) <= at_most_times, + msg=("Expected at most %d events, but got %d" % at_most_times, + len(event))) diff --git a/system/gd/cert/gd_base_test.py b/system/gd/cert/gd_base_test.py index a103ecccb2..3c5186af0e 100644 --- a/system/gd/cert/gd_base_test.py +++ b/system/gd/cert/gd_base_test.py @@ -25,12 +25,17 @@ import subprocess ANDROID_BUILD_TOP = os.environ.get('ANDROID_BUILD_TOP') -sys.path.append(ANDROID_BUILD_TOP + '/out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothFacadeAndCertGeneratedStub_py/gen') +sys.path.append( + ANDROID_BUILD_TOP + + '/out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothFacadeAndCertGeneratedStub_py/gen' +) ANDROID_HOST_OUT = os.environ.get('ANDROID_HOST_OUT') ROOTCANAL = ANDROID_HOST_OUT + "/nativetest64/root-canal/root-canal" + class GdBaseTestClass(BaseTestClass): + def __init__(self, configs): BaseTestClass.__init__(self, configs) @@ -41,7 +46,8 @@ class GdBaseTestClass(BaseTestClass): self.rootcanal_running = False if 'rootcanal' in self.controller_configs: self.rootcanal_running = True - rootcanal_logpath = os.path.join(log_path_base, 'rootcanal_logs.txt') + rootcanal_logpath = os.path.join(log_path_base, + 'rootcanal_logs.txt') self.rootcanal_logs = open(rootcanal_logpath, 'w') rootcanal_config = self.controller_configs['rootcanal'] rootcanal_hci_port = str(rootcanal_config.get("hci_port", "6402")) @@ -55,19 +61,16 @@ class GdBaseTestClass(BaseTestClass): cwd=ANDROID_BUILD_TOP, env=os.environ.copy(), stdout=self.rootcanal_logs, - stderr=self.rootcanal_logs - ) + stderr=self.rootcanal_logs) for gd_device in gd_devices: gd_device["rootcanal_port"] = rootcanal_hci_port for gd_cert_device in gd_cert_devices: gd_cert_device["rootcanal_port"] = rootcanal_hci_port self.register_controller( - importlib.import_module('cert.gd_device'), - builtin=True) + importlib.import_module('cert.gd_device'), builtin=True) self.register_controller( - importlib.import_module('cert.gd_cert_device'), - builtin=True) + importlib.import_module('cert.gd_cert_device'), builtin=True) def teardown_class(self): if self.rootcanal_running: @@ -76,6 +79,6 @@ class GdBaseTestClass(BaseTestClass): self.rootcanal_logs.close() if rootcanal_return_code != 0 and\ rootcanal_return_code != -signal.SIGINT: - logging.error("rootcanal stopped with code: %d" % - rootcanal_return_code) + logging.error( + "rootcanal stopped with code: %d" % rootcanal_return_code) return False diff --git a/system/gd/cert/gd_cert_device.py b/system/gd/cert/gd_cert_device.py index 45f05397cc..b8b23480e1 100644 --- a/system/gd/cert/gd_cert_device.py +++ b/system/gd/cert/gd_cert_device.py @@ -25,6 +25,7 @@ from l2cap.classic.cert import api_pb2_grpc as l2cap_cert_pb2_grpc ACTS_CONTROLLER_CONFIG_NAME = "GdCertDevice" ACTS_CONTROLLER_REFERENCE_NAME = "gd_cert_devices" + def create(configs): if not configs: raise GdDeviceConfigError("Configuration is empty") @@ -52,20 +53,25 @@ def get_instances_with_configs(configs): resolved_cmd = [] for entry in config["cmd"]: resolved_cmd.append(replace_vars(entry, config)) - devices.append(GdCertDevice(config["grpc_port"], - config["grpc_root_server_port"], - config["signal_port"], - resolved_cmd, config["label"])) + devices.append( + GdCertDevice(config["grpc_port"], config["grpc_root_server_port"], + config["signal_port"], resolved_cmd, config["label"])) return devices + class GdCertDevice(GdDeviceBase): - def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd, label): + + def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd, + label): super().__init__(grpc_port, grpc_root_server_port, signal_port, cmd, label, ACTS_CONTROLLER_CONFIG_NAME) # Cert stubs - self.rootservice = cert_rootservice_pb2_grpc.RootCertStub(self.grpc_root_server_channel) + self.rootservice = cert_rootservice_pb2_grpc.RootCertStub( + self.grpc_root_server_channel) self.hal = hal_cert_pb2_grpc.HciHalCertStub(self.grpc_channel) - self.controller_read_only_property = cert_rootservice_pb2_grpc.ReadOnlyPropertyStub(self.grpc_channel) + self.controller_read_only_property = cert_rootservice_pb2_grpc.ReadOnlyPropertyStub( + self.grpc_channel) self.hci = hci_cert_pb2_grpc.AclManagerCertStub(self.grpc_channel) - self.l2cap = l2cap_cert_pb2_grpc.L2capClassicModuleCertStub(self.grpc_channel) + self.l2cap = l2cap_cert_pb2_grpc.L2capClassicModuleCertStub( + self.grpc_channel) diff --git a/system/gd/cert/gd_device.py b/system/gd/cert/gd_device.py index a02c33846c..9f8b6dea3a 100644 --- a/system/gd/cert/gd_device.py +++ b/system/gd/cert/gd_device.py @@ -26,6 +26,7 @@ from l2cap.classic import facade_pb2_grpc as l2cap_facade_pb2_grpc ACTS_CONTROLLER_CONFIG_NAME = "GdDevice" ACTS_CONTROLLER_REFERENCE_NAME = "gd_devices" + def create(configs): if not configs: raise GdDeviceConfigError("Configuration is empty") @@ -53,23 +54,29 @@ def get_instances_with_configs(configs): resolved_cmd = [] for entry in config["cmd"]: resolved_cmd.append(replace_vars(entry, config)) - devices.append(GdDevice(config["grpc_port"], - config["grpc_root_server_port"], - config["signal_port"], - resolved_cmd, config["label"])) + devices.append( + GdDevice(config["grpc_port"], config["grpc_root_server_port"], + config["signal_port"], resolved_cmd, config["label"])) return devices + class GdDevice(GdDeviceBase): - def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd, label): + + def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd, + label): super().__init__(grpc_port, grpc_root_server_port, signal_port, cmd, label, ACTS_CONTROLLER_CONFIG_NAME) # Facade stubs - self.rootservice = facade_rootservice_pb2_grpc.RootFacadeStub(self.grpc_root_server_channel) + self.rootservice = facade_rootservice_pb2_grpc.RootFacadeStub( + self.grpc_root_server_channel) self.hal = hal_facade_pb2_grpc.HciHalFacadeStub(self.grpc_channel) - self.controller_read_only_property = facade_rootservice_pb2_grpc.ReadOnlyPropertyStub(self.grpc_channel) + self.controller_read_only_property = facade_rootservice_pb2_grpc.ReadOnlyPropertyStub( + self.grpc_channel) self.hci = hci_facade_pb2_grpc.AclManagerFacadeStub(self.grpc_channel) - self.hci_classic_security = hci_facade_pb2_grpc.ClassicSecurityManagerFacadeStub(self.grpc_channel) - self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub(self.grpc_channel) - self.hci_le_advertising_manager = le_advertising_manager_facade_pb2_grpc.LeAdvertisingManagerFacadeStub(self.grpc_channel) - + self.hci_classic_security = hci_facade_pb2_grpc.ClassicSecurityManagerFacadeStub( + self.grpc_channel) + self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub( + self.grpc_channel) + self.hci_le_advertising_manager = le_advertising_manager_facade_pb2_grpc.LeAdvertisingManagerFacadeStub( + self.grpc_channel) diff --git a/system/gd/cert/gd_device_base.py b/system/gd/cert/gd_device_base.py index df5f9f23a3..7110212608 100644 --- a/system/gd/cert/gd_device_base.py +++ b/system/gd/cert/gd_device_base.py @@ -32,6 +32,7 @@ ANDROID_BUILD_TOP = os.environ.get('ANDROID_BUILD_TOP') ANDROID_HOST_OUT = os.environ.get('ANDROID_HOST_OUT') WAIT_CHANNEL_READY_TIMEOUT = 10 + def replace_vars(string, config): serial_number = config.get("serial_number") if serial_number is None: @@ -46,7 +47,9 @@ def replace_vars(string, config): .replace("$(signal_port)", config.get("signal_port")) \ .replace("$(serial_number)", serial_number) + class GdDeviceBase: + def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd, label, type_identifier): self.label = label if label is not None else grpc_port @@ -55,7 +58,7 @@ class GdDeviceBase: self.log = tracelogger.TraceLogger( GdDeviceBaseLoggerAdapter(logging.getLogger(), { 'device': label, - 'type_identifier' : type_identifier + 'type_identifier': type_identifier })) backing_process_logpath = os.path.join( @@ -64,12 +67,13 @@ class GdDeviceBase: cmd_str = json.dumps(cmd) if "--btsnoop=" not in cmd_str: - btsnoop_path = os.path.join(log_path_base, '%s_btsnoop_hci.log' % label) + btsnoop_path = os.path.join(log_path_base, + '%s_btsnoop_hci.log' % label) cmd.append("--btsnoop=" + btsnoop_path) tester_signal_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - tester_signal_socket.setsockopt( - socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + tester_signal_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, + 1) socket_address = ('localhost', int(signal_port)) tester_signal_socket.bind(socket_address) tester_signal_socket.listen(1) @@ -83,7 +87,8 @@ class GdDeviceBase: tester_signal_socket.accept() tester_signal_socket.close() - self.grpc_root_server_channel = grpc.insecure_channel("localhost:" + grpc_root_server_port) + self.grpc_root_server_channel = grpc.insecure_channel( + "localhost:" + grpc_root_server_port) self.grpc_port = int(grpc_port) self.grpc_channel = grpc.insecure_channel("localhost:" + grpc_port) @@ -101,21 +106,22 @@ class GdDeviceBase: def wait_channel_ready(self): future = grpc.channel_ready_future(self.grpc_channel) try: - future.result(timeout = WAIT_CHANNEL_READY_TIMEOUT) + future.result(timeout=WAIT_CHANNEL_READY_TIMEOUT) except grpc.FutureTimeoutError: - logging.error("wait channel ready timeout") - + logging.error("wait channel ready timeout") class GdDeviceBaseLoggerAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): - msg = "[%s|%s] %s" % (self.extra["type_identifier"], self.extra["device"], msg) + msg = "[%s|%s] %s" % (self.extra["type_identifier"], + self.extra["device"], msg) return (msg, kwargs) + class GdDeviceConfigError(Exception): """Raised when GdDevice configs are malformatted.""" class GdDeviceError(error.ActsError): """Raised when there is an error in GdDevice.""" - diff --git a/system/gd/cert/pts_base_test.py b/system/gd/cert/pts_base_test.py index 1249e1ff44..c849e150e9 100644 --- a/system/gd/cert/pts_base_test.py +++ b/system/gd/cert/pts_base_test.py @@ -25,15 +25,18 @@ import subprocess ANDROID_BUILD_TOP = os.environ.get('ANDROID_BUILD_TOP') -sys.path.append(ANDROID_BUILD_TOP + '/out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothFacadeAndCertGeneratedStub_py/gen') +sys.path.append( + ANDROID_BUILD_TOP + + '/out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothFacadeAndCertGeneratedStub_py/gen' +) class PTSBaseTestClass(BaseTestClass): + def __init__(self, configs): BaseTestClass.__init__(self, configs) gd_devices = self.controller_configs.get("GdDevice") self.register_controller( - importlib.import_module('cert.gd_device'), - builtin=True) + importlib.import_module('cert.gd_device'), builtin=True) diff --git a/system/gd/hal/cert/simple_hal_test.py b/system/gd/hal/cert/simple_hal_test.py index edfa78dda8..44038ccfd1 100644 --- a/system/gd/hal/cert/simple_hal_test.py +++ b/system/gd/hal/cert/simple_hal_test.py @@ -45,15 +45,11 @@ class SimpleHalTest(GdBaseTestClass): self.device_under_test.rootservice.StartStack( facade_rootservice_pb2.StartStackRequest( module_under_test=facade_rootservice_pb2.BluetoothModule.Value( - 'HAL'), - ) - ) + 'HAL'),)) self.cert_device.rootservice.StartStack( cert_rootservice_pb2.StartStackRequest( module_to_test=cert_rootservice_pb2.BluetoothModule.Value( - 'HAL'), - ) - ) + 'HAL'),)) self.device_under_test.wait_channel_ready() self.cert_device.wait_channel_ready() @@ -63,114 +59,99 @@ class SimpleHalTest(GdBaseTestClass): def teardown_test(self): self.device_under_test.rootservice.StopStack( - facade_rootservice_pb2.StopStackRequest() - ) + facade_rootservice_pb2.StopStackRequest()) self.cert_device.rootservice.StopStack( - cert_rootservice_pb2.StopStackRequest() - ) + cert_rootservice_pb2.StopStackRequest()) def test_none_event(self): - with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream: + with EventCallbackStream( + self.device_under_test.hal.FetchHciEvent( + empty_pb2.Empty())) as hci_event_stream: hci_event_asserts = EventAsserts(hci_event_stream) hci_event_asserts.assert_none(timeout=timedelta(seconds=1)) def test_example(self): response = self.device_under_test.hal.SetLoopbackMode( - hal_facade_pb2.LoopbackModeSettings(enable=True) - ) + hal_facade_pb2.LoopbackModeSettings(enable=True)) def test_fetch_hci_event(self): self.device_under_test.hal.SetLoopbackMode( - hal_facade_pb2.LoopbackModeSettings(enable=True) - ) + hal_facade_pb2.LoopbackModeSettings(enable=True)) - with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream: + with EventCallbackStream( + self.device_under_test.hal.FetchHciEvent( + empty_pb2.Empty())) as hci_event_stream: hci_event_asserts = EventAsserts(hci_event_stream) self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x01\x04\x053\x8b\x9e0\x01' - ) - ) + payload=b'\x01\x04\x053\x8b\x9e0\x01')) hci_event_asserts.assert_event_occurs( - lambda packet: packet.payload == b'\x19\x08\x01\x04\x053\x8b\x9e0\x01') + lambda packet: packet.payload == b'\x19\x08\x01\x04\x053\x8b\x9e0\x01' + ) def test_inquiry_from_dut(self): - with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream: + with EventCallbackStream( + self.device_under_test.hal.FetchHciEvent( + empty_pb2.Empty())) as hci_event_stream: hci_event_asserts = EventAsserts(hci_event_stream) self.cert_device.hal.SetScanMode( - hal_cert_pb2.ScanModeSettings(mode=3) - ) + hal_cert_pb2.ScanModeSettings(mode=3)) self.device_under_test.hal.SetInquiry( - hal_facade_pb2.InquirySettings(length=0x30, num_responses=0xff) - ) + hal_facade_pb2.InquirySettings(length=0x30, num_responses=0xff)) hci_event_asserts.assert_event_occurs( lambda packet: b'\x02\x0f' in packet.payload # Expecting an HCI Event (code 0x02, length 0x0f) ) def test_le_ad_scan_cert_advertises(self): - with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream: + with EventCallbackStream( + self.device_under_test.hal.FetchHciEvent( + empty_pb2.Empty())) as hci_event_stream: hci_event_asserts = EventAsserts(hci_event_stream) # Set the LE Address to 0D:05:04:03:02:01 self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D' - ) - ) + payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D')) # Set the LE Scan parameters (active, 40ms, 20ms, Random, self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x0B\x20\x07\x01\x40\x00\x20\x00\x01\x00' - ) - ) + payload=b'\x0B\x20\x07\x01\x40\x00\x20\x00\x01\x00')) # Enable Scanning (Disable duplicate filtering) self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x0C\x20\x02\x01\x00' - ) - ) + payload=b'\x0C\x20\x02\x01\x00')) # Set the LE Address to 0C:05:04:03:02:01 self.cert_device.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C' - ) - ) + payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C')) # Set LE Advertising parameters self.cert_device.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x06\x20\x0F\x00\x02\x00\x03\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00' - ) - ) + payload= + b'\x06\x20\x0F\x00\x02\x00\x03\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00' + )) # Set LE Advertising data self.cert_device.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x08\x20\x20\x0C\x0A\x09Im_A_Cert\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' - ) - ) + payload= + b'\x08\x20\x20\x0C\x0A\x09Im_A_Cert\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + )) # Enable Advertising self.cert_device.hal.SendHciCommand( - hal_facade_pb2.HciCommandPacket( - payload=b'\x0A\x20\x01\x01' - ) - ) + hal_facade_pb2.HciCommandPacket(payload=b'\x0A\x20\x01\x01')) hci_event_asserts.assert_event_occurs( lambda packet: b'Im_A_Cert' in packet.payload # Expecting an HCI Event (code 0x3e, length 0x13, subevent 0x01 ) ) # Disable Advertising self.cert_device.hal.SendHciCommand( - hal_facade_pb2.HciCommandPacket( - payload=b'\x0A\x20\x01\x00' - ) - ) + hal_facade_pb2.HciCommandPacket(payload=b'\x0A\x20\x01\x00')) # Disable Scanning self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x0C\x20\x02\x00\x00' - ) - ) + payload=b'\x0C\x20\x02\x00\x00')) def test_le_connection_dut_advertises(self): with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream, \ @@ -186,64 +167,50 @@ class SimpleHalTest(GdBaseTestClass): # Set the CERT LE Address to 0C:05:04:03:02:01 self.cert_device.hal.SendHciCommand( hal_cert_pb2.HciCommandPacket( - payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C' - ) - ) + payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C')) # Direct connect to 0D:05:04:03:02:01 self.cert_device.hal.SendHciCommand( hal_cert_pb2.HciCommandPacket( - payload=b'\x0D\x20\x19\x11\x01\x22\x02\x00\x01\x01\x02\x03\x04\x05\x0D\x01\x06\x00\x70\x0C\x40\x00\x03\x07\x01\x00\x02\x00' - ) - ) + payload= + b'\x0D\x20\x19\x11\x01\x22\x02\x00\x01\x01\x02\x03\x04\x05\x0D\x01\x06\x00\x70\x0C\x40\x00\x03\x07\x01\x00\x02\x00' + )) # Set the LE Address to 0D:05:04:03:02:01 self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D' - ) - ) + payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D')) # Set LE Advertising parameters self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x06\x20\x0F\x80\x00\x00\x04\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00' - ) - ) + payload= + b'\x06\x20\x0F\x80\x00\x00\x04\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00' + )) # Set LE Advertising data self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x08\x20\x20\x0C\x0B\x09Im_The_DUT\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' - ) - ) + payload= + b'\x08\x20\x20\x0C\x0B\x09Im_The_DUT\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + )) # Enable Advertising self.device_under_test.hal.SendHciCommand( - hal_facade_pb2.HciCommandPacket( - payload=b'\x0A\x20\x01\x01' - ) - ) + hal_facade_pb2.HciCommandPacket(payload=b'\x0A\x20\x01\x01')) # LeConnectionComplete TODO: Extract the handle cert_hci_event_asserts.assert_event_occurs( - lambda packet: b'\x3e\x13\x01\x00' in packet.payload - ) + lambda packet: b'\x3e\x13\x01\x00' in packet.payload) # LeConnectionComplete TODO: Extract the handle hci_event_asserts.assert_event_occurs( - lambda packet: b'\x3e\x13\x01\x00' in packet.payload - ) + lambda packet: b'\x3e\x13\x01\x00' in packet.payload) # Send ACL Data self.device_under_test.hal.SendHciAcl( hal_facade_pb2.HciAclPacket( - payload=b'\xfe\x0e\x0b\x00SomeAclData' - ) - ) + payload=b'\xfe\x0e\x0b\x00SomeAclData')) # Send ACL Data self.cert_device.hal.SendHciAcl( hal_facade_pb2.HciAclPacket( - payload=b'\xfe\x0e\x0f\x00SomeMoreAclData' - ) - ) + payload=b'\xfe\x0e\x0f\x00SomeMoreAclData')) cert_acl_data_asserts.assert_event_occurs( - lambda packet: b'\xfe\x0e\x0b\x00SomeAclData' in packet.payload - ) + lambda packet: b'\xfe\x0e\x0b\x00SomeAclData' in packet.payload) acl_data_asserts.assert_event_occurs( lambda packet: b'\xfe\x0e\x0f\x00SomeMoreAclData' in packet.payload ) @@ -257,51 +224,40 @@ class SimpleHalTest(GdBaseTestClass): # Set the LE Address to 0D:05:04:03:02:01 self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D' - ) - ) + payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D')) # Add the cert device to the white list (Random 0C:05:04:03:02:01) self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x11\x20\x07\x01\x01\x02\x03\x04\x05\x0C' - ) - ) + payload=b'\x11\x20\x07\x01\x01\x02\x03\x04\x05\x0C')) # Connect using the white list self.device_under_test.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x0D\x20\x19\x11\x01\x22\x02\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x01\x06\x00\x70\x0C\x40\x00\x03\x07\x01\x00\x02\x00' - ) - ) + payload= + b'\x0D\x20\x19\x11\x01\x22\x02\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x01\x06\x00\x70\x0C\x40\x00\x03\x07\x01\x00\x02\x00' + )) # Set the LE Address to 0C:05:04:03:02:01 self.cert_device.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C' - ) - ) + payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C')) # Set LE Advertising parameters self.cert_device.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x06\x20\x0F\x00\x02\x00\x03\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00' - ) - ) + payload= + b'\x06\x20\x0F\x00\x02\x00\x03\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00' + )) # Set LE Advertising data self.cert_device.hal.SendHciCommand( hal_facade_pb2.HciCommandPacket( - payload=b'\x08\x20\x20\x0C\x0A\x09Im_A_Cert\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' - ) - ) + payload= + b'\x08\x20\x20\x0C\x0A\x09Im_A_Cert\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + )) # Enable Advertising self.cert_device.hal.SendHciCommand( - hal_facade_pb2.HciCommandPacket( - payload=b'\x0A\x20\x01\x01' - ) - ) + hal_facade_pb2.HciCommandPacket(payload=b'\x0A\x20\x01\x01')) # LeConnectionComplete cert_hci_event_asserts.assert_event_occurs( - lambda packet: b'\x3e\x13\x01\x00' in packet.payload - ) + lambda packet: b'\x3e\x13\x01\x00' in packet.payload) # LeConnectionComplete hci_event_asserts.assert_event_occurs( - lambda packet: b'\x3e\x13\x01\x00' in packet.payload - ) + lambda packet: b'\x3e\x13\x01\x00' in packet.payload) diff --git a/system/gd/hci/cert/simple_hci_test.py b/system/gd/hci/cert/simple_hci_test.py index 9f90947fbb..0f0d000599 100644 --- a/system/gd/hci/cert/simple_hci_test.py +++ b/system/gd/hci/cert/simple_hci_test.py @@ -32,6 +32,7 @@ from hci import facade_pb2 as hci_facade_pb2 from hci.cert import api_pb2 as hci_cert_pb2 from hci.cert import api_pb2_grpc as hci_cert_pb2_grpc + class SimpleHciTest(GdBaseTestClass): def setup_test(self): @@ -40,28 +41,26 @@ class SimpleHciTest(GdBaseTestClass): self.device_under_test.rootservice.StartStack( facade_rootservice_pb2.StartStackRequest( - module_under_test=facade_rootservice_pb2.BluetoothModule.Value('HCI'), - ) - ) + module_under_test=facade_rootservice_pb2.BluetoothModule.Value( + 'HCI'),)) self.cert_device.rootservice.StartStack( cert_rootservice_pb2.StartStackRequest( - module_to_test=cert_rootservice_pb2.BluetoothModule.Value('HCI'), - ) - ) + module_to_test=cert_rootservice_pb2.BluetoothModule.Value( + 'HCI'),)) self.device_under_test.wait_channel_ready() self.cert_device.wait_channel_ready() self.device_under_test.hci.SetPageScanMode( - hci_facade_pb2.PageScanMode(enabled=True) - ) + hci_facade_pb2.PageScanMode(enabled=True)) self.cert_device.hci.SetPageScanMode( - hci_cert_pb2.PageScanMode(enabled=True) - ) + hci_cert_pb2.PageScanMode(enabled=True)) - dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address + dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress( + empty_pb2.Empty()).address self.device_under_test.address = dut_address - cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address + cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress( + empty_pb2.Empty()).address self.cert_device.address = cert_address self.dut_address = common_pb2.BluetoothAddress( @@ -71,35 +70,38 @@ class SimpleHciTest(GdBaseTestClass): def teardown_test(self): self.device_under_test.rootservice.StopStack( - facade_rootservice_pb2.StopStackRequest() - ) + facade_rootservice_pb2.StopStackRequest()) self.cert_device.rootservice.StopStack( - cert_rootservice_pb2.StopStackRequest() - ) + cert_rootservice_pb2.StopStackRequest()) def test_none_event(self): - with EventCallbackStream(self.device_under_test.hci.FetchConnectionComplete(empty_pb2.Empty())) as connection_complete_stream: - connection_complete_asserts = EventAsserts(connection_complete_stream) + with EventCallbackStream( + self.device_under_test.hci.FetchConnectionComplete( + empty_pb2.Empty())) as connection_complete_stream: + connection_complete_asserts = EventAsserts( + connection_complete_stream) connection_complete_asserts.assert_none() def _connect_from_dut(self): logging.debug("_connect_from_dut") policy = hci_cert_pb2.IncomingConnectionPolicy( - remote=self.dut_address, - accepted=True - ) + remote=self.dut_address, accepted=True) self.cert_device.hci.SetIncomingConnectionPolicy(policy) - with EventCallbackStream(self.device_under_test.hci.FetchConnectionComplete(empty_pb2.Empty())) as dut_connection_complete_stream: - dut_connection_complete_asserts = EventAsserts(dut_connection_complete_stream) + with EventCallbackStream( + self.device_under_test.hci.FetchConnectionComplete( + empty_pb2.Empty())) as dut_connection_complete_stream: + dut_connection_complete_asserts = EventAsserts( + dut_connection_complete_stream) self.device_under_test.hci.Connect(self.cert_address) dut_connection_complete_asserts.assert_event_occurs( - lambda event: self._get_handle(event) - ) + lambda event: self._get_handle(event)) def _disconnect_from_dut(self): logging.debug("_disconnect_from_dut") - with EventCallbackStream(self.device_under_test.hci.FetchDisconnection(empty_pb2.Empty())) as dut_disconnection_stream: + with EventCallbackStream( + self.device_under_test.hci.FetchDisconnection( + empty_pb2.Empty())) as dut_disconnection_stream: dut_disconnection_asserts = EventAsserts(dut_disconnection_stream) self.device_under_test.hci.Disconnect(self.cert_address) dut_disconnection_asserts.assert_event_occurs( @@ -114,15 +116,17 @@ class SimpleHciTest(GdBaseTestClass): def test_connect_disconnect_send_acl(self): self._connect_from_dut() - with EventCallbackStream(self.cert_device.hci.FetchAclData(empty_pb2.Empty())) as cert_acl_stream: + with EventCallbackStream( + self.cert_device.hci.FetchAclData( + empty_pb2.Empty())) as cert_acl_stream: cert_acl_asserts = EventAsserts(cert_acl_stream) - acl_data = hci_facade_pb2.AclData(remote=self.cert_address, payload=b'123') + acl_data = hci_facade_pb2.AclData( + remote=self.cert_address, payload=b'123') self.device_under_test.hci.SendAclData(acl_data) self.device_under_test.hci.SendAclData(acl_data) self.device_under_test.hci.SendAclData(acl_data) cert_acl_asserts.assert_event_occurs( - lambda packet : b'123' in packet.payload - and packet.remote == self.dut_address + lambda packet: b'123' in packet.payload and packet.remote == self.dut_address ) self._disconnect_from_dut() @@ -130,67 +134,74 @@ class SimpleHciTest(GdBaseTestClass): def test_connect_disconnect_receive_acl(self): self._connect_from_dut() - with EventCallbackStream(self.device_under_test.hci.FetchAclData(empty_pb2.Empty())) as dut_acl_stream: + with EventCallbackStream( + self.device_under_test.hci.FetchAclData( + empty_pb2.Empty())) as dut_acl_stream: dut_acl_asserts = EventAsserts(dut_acl_stream) - acl_data = hci_cert_pb2.AclData(remote=self.dut_address, payload=b'123') + acl_data = hci_cert_pb2.AclData( + remote=self.dut_address, payload=b'123') self.cert_device.hci.SendAclData(acl_data) self.cert_device.hci.SendAclData(acl_data) self.cert_device.hci.SendAclData(acl_data) dut_acl_asserts.assert_event_occurs( - lambda packet : b'123' in packet.payload - and packet.remote == self.cert_address + lambda packet: b'123' in packet.payload and packet.remote == self.cert_address ) self._disconnect_from_dut() def test_reject_connection_request(self): - with EventCallbackStream(self.device_under_test.hci.FetchConnectionFailed(empty_pb2.Empty())) as dut_connection_failed_stream: - dut_connection_failed_asserts = EventAsserts(dut_connection_failed_stream) + with EventCallbackStream( + self.device_under_test.hci.FetchConnectionFailed( + empty_pb2.Empty())) as dut_connection_failed_stream: + dut_connection_failed_asserts = EventAsserts( + dut_connection_failed_stream) self.device_under_test.hci.Connect(self.cert_address) dut_connection_failed_asserts.assert_event_occurs( - lambda event : event.remote == self.cert_address - ) + lambda event: event.remote == self.cert_address) def test_send_classic_security_command(self): self._connect_from_dut() - with EventCallbackStream(self.device_under_test.hci_classic_security.FetchCommandCompleteEvent(empty_pb2.Empty())) as dut_command_complete_stream: - dut_command_complete_asserts = EventAsserts(dut_command_complete_stream) + with EventCallbackStream( + self.device_under_test.hci_classic_security. + FetchCommandCompleteEvent( + empty_pb2.Empty())) as dut_command_complete_stream: + dut_command_complete_asserts = EventAsserts( + dut_command_complete_stream) - self.device_under_test.hci.AuthenticationRequested(self.cert_address) + self.device_under_test.hci.AuthenticationRequested( + self.cert_address) # Link request - self.device_under_test.hci_classic_security.LinkKeyRequestNegativeReply(self.cert_address) + self.device_under_test.hci_classic_security.LinkKeyRequestNegativeReply( + self.cert_address) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x040c - ) + lambda event: event.command_opcode == 0x040c) # Pin code request message = hci_facade_pb2.PinCodeRequestReplyMessage( remote=self.cert_address, len=4, - pin_code=bytes("1234", encoding = "ASCII") - ) - self.device_under_test.hci_classic_security.PinCodeRequestReply(message) + pin_code=bytes("1234", encoding="ASCII")) + self.device_under_test.hci_classic_security.PinCodeRequestReply( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x040d - ) - self.device_under_test.hci_classic_security.PinCodeRequestNegativeReply(self.cert_address) + lambda event: event.command_opcode == 0x040d) + self.device_under_test.hci_classic_security.PinCodeRequestNegativeReply( + self.cert_address) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x040e - ) + lambda event: event.command_opcode == 0x040e) # IO capability request message = hci_facade_pb2.IoCapabilityRequestReplyMessage( remote=self.cert_address, io_capability=0, oob_present=0, - authentication_requirements=0 - ) - self.device_under_test.hci_classic_security.IoCapabilityRequestReply(message) + authentication_requirements=0) + self.device_under_test.hci_classic_security.IoCapabilityRequestReply( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x042b - ) + lambda event: event.command_opcode == 0x042b) # message = hci_facade_pb2.IoCapabilityRequestNegativeReplyMessage( # remote=self.cert_address, @@ -200,135 +211,136 @@ class SimpleHciTest(GdBaseTestClass): # self.device_under_test.hci_classic_security.IoCapabilityRequestNegativeReply(message) # User confirm request - self.device_under_test.hci_classic_security.UserConfirmationRequestReply(self.cert_address) + self.device_under_test.hci_classic_security.UserConfirmationRequestReply( + self.cert_address) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x042c - ) + lambda event: event.command_opcode == 0x042c) message = hci_facade_pb2.LinkKeyRequestReplyMessage( remote=self.cert_address, - link_key=bytes("4C68384139F574D836BCF34E9DFB01BF", encoding = "ASCII") - ) - self.device_under_test.hci_classic_security.LinkKeyRequestReply(message) + link_key=bytes( + "4C68384139F574D836BCF34E9DFB01BF", encoding="ASCII")) + self.device_under_test.hci_classic_security.LinkKeyRequestReply( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x040b - ) + lambda event: event.command_opcode == 0x040b) - self.device_under_test.hci_classic_security.UserConfirmationRequestNegativeReply(self.cert_address) + self.device_under_test.hci_classic_security.UserConfirmationRequestNegativeReply( + self.cert_address) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x042d - ) + lambda event: event.command_opcode == 0x042d) # User passkey request message = hci_facade_pb2.UserPasskeyRequestReplyMessage( remote=self.cert_address, passkey=999999, ) - self.device_under_test.hci_classic_security.UserPasskeyRequestReply(message) + self.device_under_test.hci_classic_security.UserPasskeyRequestReply( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x042e - ) + lambda event: event.command_opcode == 0x042e) - self.device_under_test.hci_classic_security.UserPasskeyRequestNegativeReply(self.cert_address) + self.device_under_test.hci_classic_security.UserPasskeyRequestNegativeReply( + self.cert_address) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x042f - ) + lambda event: event.command_opcode == 0x042f) # Remote OOB data request message = hci_facade_pb2.RemoteOobDataRequestReplyMessage( remote=self.cert_address, - c=b'\x19\x20\x21\x22\x23\x24\x25\x26\x19\x20\x21\x22\x23\x24\x25\x26', - r=b'\x30\x31\x32\x33\x34\x35\x36\x37\x30\x31\x32\x33\x34\x35\x36\x37', + c= + b'\x19\x20\x21\x22\x23\x24\x25\x26\x19\x20\x21\x22\x23\x24\x25\x26', + r= + b'\x30\x31\x32\x33\x34\x35\x36\x37\x30\x31\x32\x33\x34\x35\x36\x37', ) - self.device_under_test.hci_classic_security.RemoteOobDataRequestReply(message) + self.device_under_test.hci_classic_security.RemoteOobDataRequestReply( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x0430 - ) - self.device_under_test.hci_classic_security.RemoteOobDataRequestNegativeReply(self.cert_address) + lambda event: event.command_opcode == 0x0430) + self.device_under_test.hci_classic_security.RemoteOobDataRequestNegativeReply( + self.cert_address) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x0433 - ) + lambda event: event.command_opcode == 0x0433) # Read/Write/Delete link key message = hci_facade_pb2.ReadStoredLinkKeyMessage( remote=self.cert_address, - read_all_flag = 0, + read_all_flag=0, ) - self.device_under_test.hci_classic_security.ReadStoredLinkKey(message) + self.device_under_test.hci_classic_security.ReadStoredLinkKey( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x0c0d - ) + lambda event: event.command_opcode == 0x0c0d) message = hci_facade_pb2.WriteStoredLinkKeyMessage( num_keys_to_write=1, remote=self.cert_address, - link_keys=bytes("4C68384139F574D836BCF34E9DFB01BF", encoding = "ASCII"), + link_keys=bytes( + "4C68384139F574D836BCF34E9DFB01BF", encoding="ASCII"), ) - self.device_under_test.hci_classic_security.WriteStoredLinkKey(message) + self.device_under_test.hci_classic_security.WriteStoredLinkKey( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x0c11 - ) + lambda event: event.command_opcode == 0x0c11) message = hci_facade_pb2.DeleteStoredLinkKeyMessage( remote=self.cert_address, - delete_all_flag = 0, + delete_all_flag=0, ) - self.device_under_test.hci_classic_security.DeleteStoredLinkKey(message) + self.device_under_test.hci_classic_security.DeleteStoredLinkKey( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x0c12 - ) + lambda event: event.command_opcode == 0x0c12) # Refresh Encryption Key message = hci_facade_pb2.RefreshEncryptionKeyMessage( - connection_handle=self.connection_handle, - ) - self.device_under_test.hci_classic_security.RefreshEncryptionKey(message) + connection_handle=self.connection_handle,) + self.device_under_test.hci_classic_security.RefreshEncryptionKey( + message) # Read/Write Simple Pairing Mode - self.device_under_test.hci_classic_security.ReadSimplePairingMode(empty_pb2.Empty()) + self.device_under_test.hci_classic_security.ReadSimplePairingMode( + empty_pb2.Empty()) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x0c55 - ) + lambda event: event.command_opcode == 0x0c55) message = hci_facade_pb2.WriteSimplePairingModeMessage( - simple_pairing_mode=1, - ) - self.device_under_test.hci_classic_security.WriteSimplePairingMode(message) + simple_pairing_mode=1,) + self.device_under_test.hci_classic_security.WriteSimplePairingMode( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x0c56 - ) + lambda event: event.command_opcode == 0x0c56) # Read local oob data - self.device_under_test.hci_classic_security.ReadLocalOobData(empty_pb2.Empty()) + self.device_under_test.hci_classic_security.ReadLocalOobData( + empty_pb2.Empty()) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x0c57 - ) + lambda event: event.command_opcode == 0x0c57) # Send keypress notification message = hci_facade_pb2.SendKeypressNotificationMessage( remote=self.cert_address, notification_type=1, ) - self.device_under_test.hci_classic_security.SendKeypressNotification(message) + self.device_under_test.hci_classic_security.SendKeypressNotification( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x0c60 - ) + lambda event: event.command_opcode == 0x0c60) # Read local oob extended data - self.device_under_test.hci_classic_security.ReadLocalOobExtendedData(empty_pb2.Empty()) + self.device_under_test.hci_classic_security.ReadLocalOobExtendedData( + empty_pb2.Empty()) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x0c7d - ) + lambda event: event.command_opcode == 0x0c7d) # Read Encryption key size message = hci_facade_pb2.ReadEncryptionKeySizeMessage( - connection_handle=self.connection_handle, - ) - self.device_under_test.hci_classic_security.ReadEncryptionKeySize(message) + connection_handle=self.connection_handle,) + self.device_under_test.hci_classic_security.ReadEncryptionKeySize( + message) dut_command_complete_asserts.assert_event_occurs( - lambda event: event.command_opcode == 0x1408 - ) + lambda event: event.command_opcode == 0x1408) self._disconnect_from_dut() @@ -340,5 +352,6 @@ class SimpleHciTest(GdBaseTestClass): def test_classic_connection_management_command(self): self._connect_from_dut() - self.device_under_test.hci.TestClassicConnectionManagementCommands(self.cert_address) - self._disconnect_from_dut()
\ No newline at end of file + self.device_under_test.hci.TestClassicConnectionManagementCommands( + self.cert_address) + self._disconnect_from_dut() diff --git a/system/gd/l2cap/classic/cert/pts_l2cap_test.py b/system/gd/l2cap/classic/cert/pts_l2cap_test.py index 2b2fe6446a..f4d026a6d3 100644 --- a/system/gd/l2cap/classic/cert/pts_l2cap_test.py +++ b/system/gd/l2cap/classic/cert/pts_l2cap_test.py @@ -26,18 +26,19 @@ from google.protobuf import empty_pb2 class PTSL2capTest(PTSBaseTestClass): + def setup_test(self): self.device_under_test = self.gd_devices[0] self.device_under_test.rootservice.StartStack( facade_rootservice_pb2.StartStackRequest( - module_under_test=facade_rootservice_pb2.BluetoothModule.Value('L2CAP'), - ) - ) + module_under_test=facade_rootservice_pb2.BluetoothModule.Value( + 'L2CAP'),)) self.device_under_test.wait_channel_ready() - dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address + dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress( + empty_pb2.Empty()).address pts_address = self.controller_configs.get('pts_address').lower() self.device_under_test.address = dut_address @@ -48,26 +49,28 @@ class PTSL2capTest(PTSBaseTestClass): def teardown_test(self): self.device_under_test.rootservice.StopStack( - facade_rootservice_pb2.StopStackRequest() - ) + facade_rootservice_pb2.StopStackRequest()) def _dut_connection_stream(self): - return EventCallbackStream(self.device_under_test.l2cap.FetchConnectionComplete(empty_pb2.Empty())) + return EventCallbackStream( + self.device_under_test.l2cap.FetchConnectionComplete( + empty_pb2.Empty())) def _dut_connection_close_stream(self): - return EventCallbackStream(self.device_under_test.l2cap.FetchConnectionClose(empty_pb2.Empty())) + return EventCallbackStream( + self.device_under_test.l2cap.FetchConnectionClose( + empty_pb2.Empty())) def _assert_connection_complete(self, due_connection_asserts, timeout=30): due_connection_asserts.assert_event_occurs( - lambda device : device.remote.address == self.pts_address.address, - timeout=timedelta(seconds=timeout) - ) + lambda device: device.remote.address == self.pts_address.address, + timeout=timedelta(seconds=timeout)) - def _assert_connection_close(self, due_connection_close_asserts, timeout=30): + def _assert_connection_close(self, due_connection_close_asserts, + timeout=30): due_connection_close_asserts.assert_event_occurs( - lambda device : device.remote.address == self.pts_address.address, - timeout=timedelta(seconds=timeout) - ) + lambda device: device.remote.address == self.pts_address.address, + timeout=timedelta(seconds=timeout)) def test_L2CAP_COS_CED_BV_01_C(self): """ @@ -78,13 +81,17 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.pts_address, psm=psm)) + self.device_under_test.l2cap.OpenChannel( + l2cap_facade_pb2.OpenChannelRequest( + remote=self.pts_address, psm=psm)) self._assert_connection_complete(due_connection_asserts) - self.device_under_test.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=psm)) + self.device_under_test.l2cap.CloseChannel( + l2cap_facade_pb2.CloseChannelRequest(psm=psm)) self._assert_connection_close(due_connection_close_asserts) def test_L2CAP_COS_CED_BV_03_C(self): @@ -95,14 +102,20 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest( - psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.BASIC)) self._assert_connection_complete(due_connection_asserts) - self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'*34)) + self.device_under_test.l2cap.SendDynamicChannelPacket( + l2cap_facade_pb2.DynamicChannelPacket( + psm=psm, payload=b'abc' * 34)) self._assert_connection_close(due_connection_close_asserts) def test_L2CAP_COS_CED_BV_04_C(self): @@ -113,14 +126,19 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest( - psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.BASIC)) self._assert_connection_complete(due_connection_asserts) time.sleep(2) - self.device_under_test.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=psm)) + self.device_under_test.l2cap.CloseChannel( + l2cap_facade_pb2.CloseChannelRequest(psm=psm)) self._assert_connection_close(due_connection_close_asserts) def test_L2CAP_COS_CED_BV_05_C(self): @@ -131,11 +149,15 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest( - psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.BASIC)) self._assert_connection_complete(due_connection_asserts) self._assert_connection_close(due_connection_close_asserts) @@ -147,11 +169,15 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest( - psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.BASIC)) self._assert_connection_complete(due_connection_asserts) self._assert_connection_close(due_connection_close_asserts) @@ -163,11 +189,15 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest( - psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.BASIC)) time.sleep(120) @@ -179,11 +209,15 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest( - psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.BASIC)) self._assert_connection_complete(due_connection_asserts) self._assert_connection_close(due_connection_close_asserts) @@ -195,11 +229,15 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest( - psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.BASIC)) self._assert_connection_complete(due_connection_asserts) self._assert_connection_close(due_connection_close_asserts) @@ -211,11 +249,15 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest( - psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.BASIC)) self._assert_connection_complete(due_connection_asserts) time.sleep(5) @@ -228,11 +270,15 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest( - psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.BASIC)) self._assert_connection_close(due_connection_close_asserts) def test_L2CAP_COS_CFD_BV_08_C(self): @@ -244,15 +290,18 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.pts_address, psm=psm)) + self.device_under_test.l2cap.OpenChannel( + l2cap_facade_pb2.OpenChannelRequest( + remote=self.pts_address, psm=psm)) self._assert_connection_complete(due_connection_asserts) - self.device_under_test.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=psm)) + self.device_under_test.l2cap.CloseChannel( + l2cap_facade_pb2.CloseChannelRequest(psm=psm)) self._assert_connection_close(due_connection_close_asserts) - def test_L2CAP_ERM_BI_01_C(self): """ L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted] @@ -262,10 +311,14 @@ class PTSL2capTest(PTSBaseTestClass): with self._dut_connection_stream() as dut_connection_stream, \ self._dut_connection_close_stream() as dut_connection_close_stream: due_connection_asserts = EventAsserts(dut_connection_stream) - due_connection_close_asserts = EventAsserts(dut_connection_close_stream) + due_connection_close_asserts = EventAsserts( + dut_connection_close_stream) psm = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest( - psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.ERTM)) self._assert_connection_complete(due_connection_asserts) self._pending_connection_close(timeout=60) diff --git a/system/gd/l2cap/classic/cert/simple_l2cap_test.py b/system/gd/l2cap/classic/cert/simple_l2cap_test.py index b14f5abc2e..cdc0e752c8 100644 --- a/system/gd/l2cap/classic/cert/simple_l2cap_test.py +++ b/system/gd/l2cap/classic/cert/simple_l2cap_test.py @@ -36,57 +36,69 @@ ASYNC_OP_TIME_SECONDS = 1 # TODO: Use events to synchronize events instead def is_connection_request(log): return log.HasField("connection_request") + def is_connection_response(log): return log.HasField("connection_response") + def is_configuration_request(log): return log.HasField("configuration_request") + def is_configuration_response(log): return log.HasField("configuration_response") + def is_disconnection_request(log): return log.HasField("disconnection_request") + def is_disconnection_response(log): return log.HasField("disconnection_response") + def is_echo_response(log): return log.HasField("echo_response") + def is_information_request(log): return log.HasField("information_request") + def is_information_response(log): return log.HasField("information_response") + def is_command_reject(log): return log.HasField("command_reject") + def basic_frame_to_enhanced_information_frame(information_payload): return information_payload[2:] + class SimpleL2capTest(GdBaseTestClass): + def setup_test(self): self.device_under_test = self.gd_devices[0] self.cert_device = self.gd_cert_devices[0] self.device_under_test.rootservice.StartStack( facade_rootservice_pb2.StartStackRequest( - module_under_test=facade_rootservice_pb2.BluetoothModule.Value('L2CAP'), - ) - ) + module_under_test=facade_rootservice_pb2.BluetoothModule.Value( + 'L2CAP'),)) self.cert_device.rootservice.StartStack( cert_rootservice_pb2.StartStackRequest( - module_to_test=cert_rootservice_pb2.BluetoothModule.Value('L2CAP'), - ) - ) + module_to_test=cert_rootservice_pb2.BluetoothModule.Value( + 'L2CAP'),)) self.device_under_test.wait_channel_ready() self.cert_device.wait_channel_ready() - dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address + dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress( + empty_pb2.Empty()).address self.device_under_test.address = dut_address - cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address + cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress( + empty_pb2.Empty()).address self.cert_device.address = cert_address self.dut_address = common_pb2.BluetoothAddress( @@ -100,97 +112,133 @@ class SimpleL2capTest(GdBaseTestClass): def teardown_test(self): self.device_under_test.rootservice.StopStack( - facade_rootservice_pb2.StopStackRequest() - ) + facade_rootservice_pb2.StopStackRequest()) self.cert_device.rootservice.StopStack( - cert_rootservice_pb2.StopStackRequest() - ) + cert_rootservice_pb2.StopStackRequest()) def _register_callbacks(self, event_callback_stream): + def handle_connection_request(log): log = log.connection_request - self.cert_device.l2cap.SendConnectionResponse(l2cap_cert_pb2.ConnectionResponse(dcid=self.next_scid,scid=log.scid, - signal_id=log.signal_id)) + self.cert_device.l2cap.SendConnectionResponse( + l2cap_cert_pb2.ConnectionResponse( + dcid=self.next_scid, scid=log.scid, + signal_id=log.signal_id)) self.scid_dcid_map[self.next_scid] = log.scid self.next_scid += 1 - self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest( - dcid=log.scid, - signal_id=log.signal_id+1, - retransmission_config=l2cap_cert_pb2.ChannelRetransmissionFlowControlConfig( - mode=self.retransmission_mode - ))) + self.cert_device.l2cap.SendConfigurationRequest( + l2cap_cert_pb2.ConfigurationRequest( + dcid=log.scid, + signal_id=log.signal_id + 1, + retransmission_config=l2cap_cert_pb2. + ChannelRetransmissionFlowControlConfig( + mode=self.retransmission_mode))) + self.handle_connection_request = handle_connection_request - event_callback_stream.register_callback(self.handle_connection_request, matcher_fn=is_connection_request) + event_callback_stream.register_callback( + self.handle_connection_request, matcher_fn=is_connection_request) def handle_connection_response(log): log = log.connection_response self.scid_dcid_map[log.scid] = log.dcid - self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest( - dcid=log.dcid, - signal_id=log.signal_id+1, - retransmission_config=l2cap_cert_pb2.ChannelRetransmissionFlowControlConfig( - mode=self.retransmission_mode - ))) + self.cert_device.l2cap.SendConfigurationRequest( + l2cap_cert_pb2.ConfigurationRequest( + dcid=log.dcid, + signal_id=log.signal_id + 1, + retransmission_config=l2cap_cert_pb2. + ChannelRetransmissionFlowControlConfig( + mode=self.retransmission_mode))) + self.handle_connection_request = handle_connection_request - event_callback_stream.register_callback(self.handle_connection_response, matcher_fn=is_connection_response) + event_callback_stream.register_callback( + self.handle_connection_response, matcher_fn=is_connection_response) def handle_configuration_request(log): log = log.configuration_request if log.dcid not in self.scid_dcid_map: return dcid = self.scid_dcid_map[log.dcid] - self.cert_device.l2cap.SendConfigurationResponse(l2cap_cert_pb2.ConfigurationResponse( - scid=dcid, - signal_id=log.signal_id, - retransmission_config=l2cap_cert_pb2.ChannelRetransmissionFlowControlConfig(mode=self.retransmission_mode) - )) + self.cert_device.l2cap.SendConfigurationResponse( + l2cap_cert_pb2.ConfigurationResponse( + scid=dcid, + signal_id=log.signal_id, + retransmission_config=l2cap_cert_pb2. + ChannelRetransmissionFlowControlConfig( + mode=self.retransmission_mode))) + self.handle_configuration_request = handle_configuration_request - event_callback_stream.register_callback(self.handle_configuration_request, matcher_fn=is_configuration_request) + event_callback_stream.register_callback( + self.handle_configuration_request, + matcher_fn=is_configuration_request) def handle_disconnection_request(log): log = log.disconnection_request - self.cert_device.l2cap.SendDisconnectionResponse(l2cap_cert_pb2.DisconnectionResponse(dcid=log.dcid,scid=log.scid, - signal_id=log.signal_id)) + self.cert_device.l2cap.SendDisconnectionResponse( + l2cap_cert_pb2.DisconnectionResponse( + dcid=log.dcid, scid=log.scid, signal_id=log.signal_id)) + self.handle_disconnection_request = handle_disconnection_request - event_callback_stream.register_callback(self.handle_disconnection_request, matcher_fn=is_disconnection_request) + event_callback_stream.register_callback( + self.handle_disconnection_request, + matcher_fn=is_disconnection_request) def handle_information_request(log): log = log.information_request - self.cert_device.l2cap.SendInformationResponse(l2cap_cert_pb2.InformationResponse(type=log.type, - signal_id=log.signal_id)) + self.cert_device.l2cap.SendInformationResponse( + l2cap_cert_pb2.InformationResponse( + type=log.type, signal_id=log.signal_id)) + self.handle_information_request = handle_information_request - event_callback_stream.register_callback(self.handle_information_request, matcher_fn=is_information_request) + event_callback_stream.register_callback( + self.handle_information_request, matcher_fn=is_information_request) self.event_dump = [] + def dump_log(log): self.event_dump.append(log) + self.dump_log = dump_log event_callback_stream.register_callback(self.dump_log) def _setup_link(self, event_asserts): - self.cert_device.l2cap.SetupLink(l2cap_cert_pb2.SetupLinkRequest(remote=self.dut_address)) - event_asserts.assert_event_occurs(lambda log : log.HasField("link_up") and log.remote == self.dut_address) + self.cert_device.l2cap.SetupLink( + l2cap_cert_pb2.SetupLinkRequest(remote=self.dut_address)) + event_asserts.assert_event_occurs( + lambda log: log.HasField("link_up") and log.remote == self.dut_address + ) def _open_channel(self, event_asserts, scid=0x0101, psm=0x33): - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm)) - self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) - event_asserts.assert_event_occurs(lambda log : is_configuration_response(log) and scid == log.scid) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm)) + self.cert_device.l2cap.SendConnectionRequest( + l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) + event_asserts.assert_event_occurs( + lambda log: is_configuration_response(log) and scid == log.scid) def test_connect(self): - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) self._open_channel(l2cap_event_asserts, scid=0x0101) def test_connect_and_send_data_ertm_no_segmentation(self): - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM - self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2)) - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x33, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)) + self.device_under_test.l2cap.RegisterChannel( + l2cap_facade_pb2.RegisterChannelRequest(channel=2)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=0x33, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.ERTM)) self._setup_link(l2cap_event_asserts) scid = 0x0101 @@ -199,42 +247,68 @@ class SimpleL2capTest(GdBaseTestClass): def on_data_received(log): packet = log.data_packet if (packet.channel == scid): - self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=1, s=0)) - l2cap_log_stream.register_callback(on_data_received, matcher_fn=lambda log : log.HasField("data_packet")) + self.cert_device.l2cap.SendSFrame( + l2cap_cert_pb2.SFrame( + channel=self.scid_dcid_map[scid], req_seq=1, s=0)) + + l2cap_log_stream.register_callback( + on_data_received, + matcher_fn=lambda log: log.HasField("data_packet")) - self.device_under_test.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123")) + self.device_under_test.l2cap.SendL2capPacket( + l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123")) l2cap_event_asserts.assert_event_occurs( lambda log : log.HasField("data_packet") and \ log.data_packet.channel == 2 and \ basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b"123") - self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'*34)) - self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=1, tx_seq=0, sar=0, information=b"abcd")) + self.device_under_test.l2cap.SendDynamicChannelPacket( + l2cap_facade_pb2.DynamicChannelPacket( + psm=0x33, payload=b'abc' * 34)) + self.cert_device.l2cap.SendIFrame( + l2cap_cert_pb2.IFrame( + channel=self.scid_dcid_map[scid], + req_seq=1, + tx_seq=0, + sar=0, + information=b"abcd")) l2cap_event_asserts.assert_event_occurs( lambda log : log.HasField("data_packet") and \ log.data_packet.channel == scid and \ basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b"abc"*34) def test_connect_and_send_data(self): - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) - self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2)) - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x33)) + self.device_under_test.l2cap.RegisterChannel( + l2cap_facade_pb2.RegisterChannelRequest(channel=2)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x33)) self._setup_link(l2cap_event_asserts) scid = 0x0101 self._open_channel(l2cap_event_asserts, scid=scid) - self.device_under_test.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123")) - l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == 2 and log.data_packet.payload == b"123") + self.device_under_test.l2cap.SendL2capPacket( + l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123")) + l2cap_event_asserts.assert_event_occurs( + lambda log: log.HasField("data_packet") and log.data_packet.channel == 2 and log.data_packet.payload == b"123" + ) - self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) - l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b"abc") + self.device_under_test.l2cap.SendDynamicChannelPacket( + l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) + l2cap_event_asserts.assert_event_occurs( + lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b"abc" + ) def test_open_two_channels(self): - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) @@ -245,21 +319,29 @@ class SimpleL2capTest(GdBaseTestClass): """ L2CAP/COS/CED/BV-07-C """ - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) - scid=0x0101 + scid = 0x0101 self._open_channel(l2cap_event_asserts, scid=scid, psm=0x1) dcid = self.scid_dcid_map[scid] - self.cert_device.l2cap.SendDisconnectionRequest(l2cap_cert_pb2.DisconnectionRequest(scid=scid, dcid=dcid, signal_id=2)) - l2cap_event_asserts.assert_event_occurs(lambda log : is_disconnection_response(log) and log.disconnection_response.scid == scid and log.disconnection_response.dcid == dcid) + self.cert_device.l2cap.SendDisconnectionRequest( + l2cap_cert_pb2.DisconnectionRequest( + scid=scid, dcid=dcid, signal_id=2)) + l2cap_event_asserts.assert_event_occurs( + lambda log: is_disconnection_response(log) and log.disconnection_response.scid == scid and log.disconnection_response.dcid == dcid + ) def test_disconnect_on_timeout(self): """ L2CAP/COS/CED/BV-08-C """ - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) @@ -267,12 +349,16 @@ class SimpleL2capTest(GdBaseTestClass): psm = 1 self._open_channel(l2cap_event_asserts, scid=0x0101, psm=0x1) - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm)) # Don't send configuration response back - l2cap_log_stream.unregister_callback(self.handle_configuration_request, matcher_fn=is_configuration_request) + l2cap_log_stream.unregister_callback( + self.handle_configuration_request, + matcher_fn=is_configuration_request) - self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) + self.cert_device.l2cap.SendConnectionRequest( + l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) l2cap_event_asserts.assert_none_matching(is_configuration_response) def test_basic_operation_request_connection(self): @@ -281,66 +367,88 @@ class SimpleL2capTest(GdBaseTestClass): Verify that the IUT is able to request the connection establishment for an L2CAP data channel and initiate the configuration procedure. """ - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) psm = 1 # TODO: Use another test case - self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.cert_address, psm=psm)) - l2cap_event_asserts.assert_event_occurs(lambda log : is_connection_request(log) and log.connection_request.psm == psm) + self.device_under_test.l2cap.OpenChannel( + l2cap_facade_pb2.OpenChannelRequest( + remote=self.cert_address, psm=psm)) + l2cap_event_asserts.assert_event_occurs( + lambda log: is_connection_request(log) and log.connection_request.psm == psm + ) def test_respond_to_echo_request(self): """ L2CAP/COS/ECH/BV-01-C [Respond to Echo Request] Verify that the IUT responds to an echo request. """ - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) # TODO: Replace with constructed packets when PDL is available echo_request_packet = b"\x08\x01\x00\x00" - self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=1, payload=echo_request_packet)) - l2cap_event_asserts.assert_event_occurs(lambda log : is_echo_response(log) and log.echo_response.signal_id == 0x01) + self.cert_device.l2cap.SendL2capPacket( + l2cap_facade_pb2.L2capPacket( + channel=1, payload=echo_request_packet)) + l2cap_event_asserts.assert_event_occurs( + lambda log: is_echo_response(log) and log.echo_response.signal_id == 0x01 + ) def test_reject_unknown_command(self): """ L2CAP/COS/CED/BI-01-C """ - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) # TODO: Replace with constructed packets when PDL is available invalid_command_packet = b"\xff\x01\x00\x00" - self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=1, payload=invalid_command_packet)) + self.cert_device.l2cap.SendL2capPacket( + l2cap_facade_pb2.L2capPacket( + channel=1, payload=invalid_command_packet)) # command_reject_packet = b"\x01\x01\x02\x00\x00\x00" - l2cap_event_asserts.assert_event_occurs(lambda log : is_command_reject(log) and log.command_reject.signal_id == 0x01) + l2cap_event_asserts.assert_event_occurs( + lambda log: is_command_reject(log) and log.command_reject.signal_id == 0x01 + ) def test_query_for_1_2_features(self): """ L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features] """ - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) signal_id = 3 self.cert_device.l2cap.SendInformationRequest( l2cap_cert_pb2.InformationRequest( - type=l2cap_cert_pb2.InformationRequestType.FIXED_CHANNELS, signal_id=signal_id)) + type=l2cap_cert_pb2.InformationRequestType.FIXED_CHANNELS, + signal_id=signal_id)) l2cap_event_asserts.assert_event_occurs( lambda log : is_information_response(log) and \ log.information_response.signal_id == signal_id and \ log.information_response.type == l2cap_cert_pb2.InformationRequestType.FIXED_CHANNELS) - def test_extended_feature_info_response_ertm(self): """ L2CAP/EXF/BV-01-C [Extended Features Information Response for Enhanced Retransmission Mode] """ - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream) @@ -349,9 +457,12 @@ class SimpleL2capTest(GdBaseTestClass): signal_id = 3 self.cert_device.l2cap.SendInformationRequest( l2cap_cert_pb2.InformationRequest( - type=l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES, signal_id=signal_id)) + type=l2cap_cert_pb2.InformationRequestType. + EXTENDED_FEATURES, + signal_id=signal_id)) - l2cap_event_asserts_alt.assert_event_occurs_at_most_times(is_information_response, 1) + l2cap_event_asserts_alt.assert_event_occurs_at_most_times( + is_information_response, 1) expected_log_type = l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES expected_mask = 1 << 3 @@ -365,54 +476,88 @@ class SimpleL2capTest(GdBaseTestClass): """ L2CAP/ERM/BV-01-C [Transmit I-frames] """ - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM - self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2)) - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x33, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)) + self.device_under_test.l2cap.RegisterChannel( + l2cap_facade_pb2.RegisterChannelRequest(channel=2)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=0x33, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.ERTM)) self._setup_link(l2cap_event_asserts) scid = 0x0101 self._open_channel(l2cap_event_asserts, scid=scid) - self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) - self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=1, s=0)) - self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) - self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=2, s=0)) - self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) - self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=3, s=0)) - l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc') - l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc') - l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc') - l2cap_event_asserts_alt.assert_event_occurs_at_most(lambda log : log.HasField("data_packet"), 3) + self.device_under_test.l2cap.SendDynamicChannelPacket( + l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) + self.cert_device.l2cap.SendSFrame( + l2cap_cert_pb2.SFrame( + channel=self.scid_dcid_map[scid], req_seq=1, s=0)) + self.device_under_test.l2cap.SendDynamicChannelPacket( + l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) + self.cert_device.l2cap.SendSFrame( + l2cap_cert_pb2.SFrame( + channel=self.scid_dcid_map[scid], req_seq=2, s=0)) + self.device_under_test.l2cap.SendDynamicChannelPacket( + l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) + self.cert_device.l2cap.SendSFrame( + l2cap_cert_pb2.SFrame( + channel=self.scid_dcid_map[scid], req_seq=3, s=0)) + l2cap_event_asserts.assert_event_occurs( + lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc' + ) + l2cap_event_asserts.assert_event_occurs( + lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc' + ) + l2cap_event_asserts.assert_event_occurs( + lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc' + ) + l2cap_event_asserts_alt.assert_event_occurs_at_most( + lambda log: log.HasField("data_packet"), 3) def test_s_frame_transmissions_exceed_max_transmit(self): """ L2CAP/ERM/BV-11-C [S-Frame Transmissions Exceed MaxTransmit] """ - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM - self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2)) - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x33, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)) + self.device_under_test.l2cap.RegisterChannel( + l2cap_facade_pb2.RegisterChannelRequest(channel=2)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=0x33, + retransmission_mode=l2cap_facade_pb2. + RetransmissionFlowControlMode.ERTM)) self._setup_link(l2cap_event_asserts) scid = 0x0101 self._open_channel(l2cap_event_asserts, scid=scid) - self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) + self.device_under_test.l2cap.SendDynamicChannelPacket( + l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) # Retransmission timer = 1, 1 * monitor timer = 2, so total timeout is 3 time.sleep(4) l2cap_event_asserts.assert_event_occurs(lambda log : is_disconnection_request(log) and \ log.disconnection_request.dcid == scid and \ log.disconnection_request.scid == self.scid_dcid_map[scid]) - l2cap_event_asserts_alt.assert_event_occurs_at_most(lambda log : is_disconnection_request(log), 1) + l2cap_event_asserts_alt.assert_event_occurs_at_most( + lambda log: is_disconnection_request(log), 1) def test_sent_rej_lost(self): """ L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted] """ - with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream: + with EventCallbackStream( + self.cert_device.l2cap.FetchL2capLog( + empty_pb2.Empty())) as l2cap_log_stream: l2cap_event_asserts = EventAsserts(l2cap_log_stream) self._register_callbacks(l2cap_log_stream) self._setup_link(l2cap_event_asserts) @@ -422,23 +567,36 @@ class SimpleL2capTest(GdBaseTestClass): psm = 1 mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM self.tx_window = 1 - self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode)) - self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) + self.device_under_test.l2cap.SetDynamicChannel( + l2cap_facade_pb2.SetEnableDynamicChannelRequest( + psm=psm, retransmission_mode=mode)) + self.cert_device.l2cap.SendConnectionRequest( + l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm)) + + l2cap_log_stream.unregister_callback( + self.handle_Connection_response, + matcher_fn=is_configuration_response) - l2cap_log_stream.unregister_callback(self.handle_Connection_response, matcher_fn=is_configuration_response) def handle_connection_response(log): log = log.connection_response self.scid_dcid_map[log.scid] = log.dcid - self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest( - dcid= self.scid_dcid_map[scid], - signal_id=signal_id + 1, - retransmission_config=l2cap_cert_pb2.ChannelRetransmissionFlowControlConfig( - mode=mode - ))) - l2cap_log_stream.unregister_callback(handle_connection_response, matcher_fn=is_connection_response) - l2cap_log_stream.register_callback(handle_connection_response, matcher_fn=is_connection_response) - - l2cap_log_stream.unregister_callback(self.handle_configuration_request, matcher_fn=is_configuration_request) + self.cert_device.l2cap.SendConfigurationRequest( + l2cap_cert_pb2.ConfigurationRequest( + dcid=self.scid_dcid_map[scid], + signal_id=signal_id + 1, + retransmission_config=l2cap_cert_pb2. + ChannelRetransmissionFlowControlConfig(mode=mode))) + l2cap_log_stream.unregister_callback( + handle_connection_response, + matcher_fn=is_connection_response) + + l2cap_log_stream.register_callback( + handle_connection_response, matcher_fn=is_connection_response) + + l2cap_log_stream.unregister_callback( + self.handle_configuration_request, + matcher_fn=is_configuration_request) + def handle_configuration_request(log): log = log.configuration_request if log.dcid not in self.scid_dcid_map: @@ -446,21 +604,50 @@ class SimpleL2capTest(GdBaseTestClass): dcid = self.scid_dcid_map[log.dcid] if log.HasField("retransmission_config"): self.tx_window = log.retransmission_config.tx_window - self.cert_device.l2cap.SendConfigurationResponse(l2cap_cert_pb2.ConfigurationResponse( - scid=dcid, - signal_id=log.signal_id, + self.cert_device.l2cap.SendConfigurationResponse( + l2cap_cert_pb2.ConfigurationResponse( + scid=dcid, + signal_id=log.signal_id, )) - l2cap_log_stream.unregister_callback(handle_configuration_request, matcher_fn=is_configuration_request) - l2cap_log_stream.register_callback(handle_configuration_request, matcher_fn=is_configuration_request) - - self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=0, sar=0)) - self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=(self.tx_window - 1), sar=0)) - l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01') + l2cap_log_stream.unregister_callback( + handle_configuration_request, + matcher_fn=is_configuration_request) + + l2cap_log_stream.register_callback( + handle_configuration_request, + matcher_fn=is_configuration_request) + + self.cert_device.l2cap.SendIFrame( + l2cap_cert_pb2.IFrame( + channel=self.scid_dcid_map[scid], + req_seq=0, + tx_seq=0, + sar=0)) + self.cert_device.l2cap.SendIFrame( + l2cap_cert_pb2.IFrame( + channel=self.scid_dcid_map[scid], + req_seq=0, + tx_seq=(self.tx_window - 1), + sar=0)) + l2cap_event_asserts.assert_event_occurs( + lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01' + ) - self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=0, p=1, s=0)) - l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x81\x01') + self.cert_device.l2cap.SendSFrame( + l2cap_cert_pb2.SFrame( + channel=self.scid_dcid_map[scid], req_seq=0, p=1, s=0)) + l2cap_event_asserts.assert_event_occurs( + lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x81\x01' + ) for i in range(1, self.tx_window): - self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=(i), sar=0)) + self.cert_device.l2cap.SendIFrame( + l2cap_cert_pb2.IFrame( + channel=self.scid_dcid_map[scid], + req_seq=0, + tx_seq=(i), + sar=0)) time.sleep(0.1) - l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x01\x0a')
\ No newline at end of file + l2cap_event_asserts.assert_event_occurs( + lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x01\x0a' + ) diff --git a/system/test/gen_coverage.py b/system/test/gen_coverage.py index 926c3755bd..f2658dcb6f 100755 --- a/system/test/gen_coverage.py +++ b/system/test/gen_coverage.py @@ -25,7 +25,6 @@ import sys import webbrowser from run_host_unit_tests import * - """ This script is used to generate code coverage results host supported libraries. The script by default will generate an html report that summarizes the coverage @@ -63,23 +62,28 @@ COVERAGE_TESTS = [ "covered_files": [ "packages/modules/Bluetooth/system/profile/avrcp", ], - }, { + }, + { "test_name": "bluetooth_test_sdp", "covered_files": [ "packages/modules/Bluetooth/system/profile/sdp", ], - }, { - "test_name": "test-vendor_test_host", + }, + { + "test_name": + "test-vendor_test_host", "covered_files": [ "packages/modules/Bluetooth/system/vendor_libs/test_vendor_lib/include", "packages/modules/Bluetooth/system/vendor_libs/test_vendor_lib/src", ], - }, { + }, + { "test_name": "rootcanal-packets_test_host", "covered_files": [ "packages/modules/Bluetooth/system/vendor_libs/test_vendor_lib/packets", ], - }, { + }, + { "test_name": "bluetooth_test_common", "covered_files": [ "packages/modules/Bluetooth/system/common", @@ -93,320 +97,342 @@ LLVM_DIR = 'prebuilts/clang/host/linux-x86/clang-r353983b/bin' LLVM_MERGE = LLVM_DIR + '/llvm-profdata' LLVM_COV = LLVM_DIR + '/llvm-cov' + def write_root_html_head(f): - # Write the header part of the root html file. This was pulled from the - # page source of one of the generated html files. - f.write("<!doctype html><html><head>" \ - "<meta name='viewport' content='width=device-width,initial-scale=1'><met" \ - "a charset='UTF-8'><link rel='stylesheet' type='text/css' href='style.cs" \ - "s'></head><body><h2>Coverage Report</h2><h4>Created: " + - str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M')) + - "</h4><p>Click <a href='http://clang.llvm.org/docs/SourceBasedCodeCovera" \ - "ge.html#interpreting-reports'>here</a> for information about interpreti" \ - "ng this report.</p><div class='centered'><table><tr><td class='column-e" \ - "ntry-bold'>Filename</td><td class='column-entry-bold'>Function Coverage" \ - "</td><td class='column-entry-bold'>Instantiation Coverage</td><td class" \ - "='column-entry-bold'>Line Coverage</td><td class='column-entry-bold'>Re" \ - "gion Coverage</td></tr>" - ) + # Write the header part of the root html file. This was pulled from the + # page source of one of the generated html files. + f.write("<!doctype html><html><head>" \ + "<meta name='viewport' content='width=device-width,initial-scale=1'><met" \ + "a charset='UTF-8'><link rel='stylesheet' type='text/css' href='style.cs" \ + "s'></head><body><h2>Coverage Report</h2><h4>Created: " + + str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M')) + + "</h4><p>Click <a href='http://clang.llvm.org/docs/SourceBasedCodeCovera" \ + "ge.html#interpreting-reports'>here</a> for information about interpreti" \ + "ng this report.</p><div class='centered'><table><tr><td class='column-e" \ + "ntry-bold'>Filename</td><td class='column-entry-bold'>Function Coverage" \ + "</td><td class='column-entry-bold'>Instantiation Coverage</td><td class" \ + "='column-entry-bold'>Line Coverage</td><td class='column-entry-bold'>Re" \ + "gion Coverage</td></tr>" + ) def write_root_html_column(f, covered, count): - percent = covered * 100.0 / count - value = "%.2f%% (%d/%d) " % (percent, covered, count) - color = 'column-entry-yellow' - if percent == 100: - color = 'column-entry-green' - if percent < 80.0: - color = 'column-entry-red' - f.write("<td class=\'" + color + "\'><pre>" + value + "</pre></td>") + percent = covered * 100.0 / count + value = "%.2f%% (%d/%d) " % (percent, covered, count) + color = 'column-entry-yellow' + if percent == 100: + color = 'column-entry-green' + if percent < 80.0: + color = 'column-entry-red' + f.write("<td class=\'" + color + "\'><pre>" + value + "</pre></td>") def write_root_html_rows(f, tests): - totals = { - "functions":{ - "covered": 0, - "count": 0 - }, - "instantiations":{ - "covered": 0, - "count": 0 - }, - "lines":{ - "covered": 0, - "count": 0 - }, - "regions":{ - "covered": 0, - "count": 0 - } - } - - # Write the tests with their coverage summaries. - for test in tests: - test_name = test['test_name'] - covered_files = test['covered_files'] - json_results = generate_coverage_json(test) - test_totals = json_results['data'][0]['totals'] - - f.write("<tr class='light-row'><td><pre><a href=\'" + - os.path.join(test_name, "index.html") + "\'>" + test_name + - "</a></pre></td>") + totals = { + "functions": { + "covered": 0, + "count": 0 + }, + "instantiations": { + "covered": 0, + "count": 0 + }, + "lines": { + "covered": 0, + "count": 0 + }, + "regions": { + "covered": 0, + "count": 0 + } + } + + # Write the tests with their coverage summaries. + for test in tests: + test_name = test['test_name'] + covered_files = test['covered_files'] + json_results = generate_coverage_json(test) + test_totals = json_results['data'][0]['totals'] + + f.write("<tr class='light-row'><td><pre><a href=\'" + + os.path.join(test_name, "index.html") + "\'>" + test_name + + "</a></pre></td>") + for field_name in ['functions', 'instantiations', 'lines', 'regions']: + field = test_totals[field_name] + totals[field_name]['covered'] += field['covered'] + totals[field_name]['count'] += field['count'] + write_root_html_column(f, field['covered'], field['count']) + f.write("</tr>") + + #Write the totals row. + f.write("<tr class='light-row-bold'><td><pre>Totals</a></pre></td>") for field_name in ['functions', 'instantiations', 'lines', 'regions']: - field = test_totals[field_name] - totals[field_name]['covered'] += field['covered'] - totals[field_name]['count'] += field['count'] - write_root_html_column(f, field['covered'], field['count']) - f.write("</tr>"); - - #Write the totals row. - f.write("<tr class='light-row-bold'><td><pre>Totals</a></pre></td>") - for field_name in ['functions', 'instantiations', 'lines', 'regions']: - field = totals[field_name] - write_root_html_column(f, field['covered'], field['count']) - f.write("</tr>"); + field = totals[field_name] + write_root_html_column(f, field['covered'], field['count']) + f.write("</tr>") def write_root_html_tail(f): - # Pulled from the generated html coverage report. - f.write("</table></div><h5>Generated by llvm-cov -- llvm version 7.0.2svn<" \ - "/h5></body></html>") + # Pulled from the generated html coverage report. + f.write("</table></div><h5>Generated by llvm-cov -- llvm version 7.0.2svn<" \ + "/h5></body></html>") def generate_root_html(tests): - # Copy the css file from one of the coverage reports. - source_file = os.path.join(os.path.join(WORKING_DIR, tests[0]['test_name']), "style.css") - dest_file = os.path.join(WORKING_DIR, "style.css") - shutil.copy2(source_file, dest_file) + # Copy the css file from one of the coverage reports. + source_file = os.path.join( + os.path.join(WORKING_DIR, tests[0]['test_name']), "style.css") + dest_file = os.path.join(WORKING_DIR, "style.css") + shutil.copy2(source_file, dest_file) - # Write the root index.html file that sumarizes all the tests. - f = open(os.path.join(WORKING_DIR, "index.html"), "w") - write_root_html_head(f) - write_root_html_rows(f, tests) - write_root_html_tail(f) + # Write the root index.html file that sumarizes all the tests. + f = open(os.path.join(WORKING_DIR, "index.html"), "w") + write_root_html_head(f) + write_root_html_rows(f, tests) + write_root_html_tail(f) def get_profraw_for_test(test_name): - test_root = get_native_test_root_or_die() - test_cmd = os.path.join(os.path.join(test_root, test_name), test_name) - if not os.path.isfile(test_cmd): - logging.error('The test ' + test_name + ' does not exist, please compile first') - sys.exit(1) - - profraw_file_name = test_name + ".profraw" - profraw_path = os.path.join(WORKING_DIR, os.path.join(test_name, profraw_file_name)) - llvm_env_var = "LLVM_PROFILE_FILE=\"" + profraw_path + "\"" - - test_cmd = llvm_env_var + " " + test_cmd - logging.info('Generating profraw data for ' + test_name) - logging.debug('cmd: ' + test_cmd) - if subprocess.call(test_cmd, shell=True) != 0: - logging.error('Test ' + test_name + ' failed. Please fix the test before generating coverage.') - sys.exit(1) - - if not os.path.isfile(profraw_path): - logging.error('Generating the profraw file failed. Did you remember to add the proper compiler flags to your build?') - sys.exit(1) - - return profraw_file_name + test_root = get_native_test_root_or_die() + test_cmd = os.path.join(os.path.join(test_root, test_name), test_name) + if not os.path.isfile(test_cmd): + logging.error('The test ' + test_name + + ' does not exist, please compile first') + sys.exit(1) + + profraw_file_name = test_name + ".profraw" + profraw_path = os.path.join(WORKING_DIR, + os.path.join(test_name, profraw_file_name)) + llvm_env_var = "LLVM_PROFILE_FILE=\"" + profraw_path + "\"" + + test_cmd = llvm_env_var + " " + test_cmd + logging.info('Generating profraw data for ' + test_name) + logging.debug('cmd: ' + test_cmd) + if subprocess.call(test_cmd, shell=True) != 0: + logging.error( + 'Test ' + test_name + + ' failed. Please fix the test before generating coverage.') + sys.exit(1) + + if not os.path.isfile(profraw_path): + logging.error( + 'Generating the profraw file failed. Did you remember to add the proper compiler flags to your build?' + ) + sys.exit(1) + + return profraw_file_name def merge_profraw_data(test_name): - cmd = [] - cmd.append(os.path.join(get_android_root_or_die(), LLVM_MERGE + " merge ")) + cmd = [] + cmd.append(os.path.join(get_android_root_or_die(), LLVM_MERGE + " merge ")) - test_working_dir = os.path.join(WORKING_DIR, test_name); - cmd.append(os.path.join(test_working_dir, test_name + ".profraw")) - profdata_file = os.path.join(test_working_dir, test_name + ".profdata") + test_working_dir = os.path.join(WORKING_DIR, test_name) + cmd.append(os.path.join(test_working_dir, test_name + ".profraw")) + profdata_file = os.path.join(test_working_dir, test_name + ".profdata") - cmd.append('-o ' + profdata_file) - logging.info('Combining profraw files into profdata for ' + test_name) - logging.debug('cmd: ' + " ".join(cmd)) - if subprocess.call(" ".join(cmd), shell=True) != 0: - logging.error('Failed to merge profraw files for ' + test_name) - sys.exit(1) + cmd.append('-o ' + profdata_file) + logging.info('Combining profraw files into profdata for ' + test_name) + logging.debug('cmd: ' + " ".join(cmd)) + if subprocess.call(" ".join(cmd), shell=True) != 0: + logging.error('Failed to merge profraw files for ' + test_name) + sys.exit(1) def generate_coverage_html(test): - COVERAGE_ROOT = '/proc/self/cwd' - - test_name = test['test_name'] - file_list = test['covered_files'] - - test_working_dir = os.path.join(WORKING_DIR, test_name) - test_profdata_file = os.path.join(test_working_dir, test_name + ".profdata") - - cmd = [ - os.path.join(get_android_root_or_die(), LLVM_COV), - "show", - "-format=html", - "-summary-only", - "-show-line-counts-or-regions", - "-show-instantiation-summary", - "-instr-profile=" + test_profdata_file, - "-path-equivalence=\"" + COVERAGE_ROOT + "\",\"" + - get_android_root_or_die() + "\"", - "-output-dir=" + test_working_dir - ] + COVERAGE_ROOT = '/proc/self/cwd' + + test_name = test['test_name'] + file_list = test['covered_files'] - # We have to have one object file not as an argument otherwise we can't specify source files. - test_cmd = os.path.join(os.path.join(get_native_test_root_or_die(), test_name), test_name) - cmd.append(test_cmd) + test_working_dir = os.path.join(WORKING_DIR, test_name) + test_profdata_file = os.path.join(test_working_dir, test_name + ".profdata") - # Filter out the specific files we want coverage for - for filename in file_list: - cmd.append(os.path.join(get_android_root_or_die(), filename)) + cmd = [ + os.path.join(get_android_root_or_die(), LLVM_COV), "show", + "-format=html", "-summary-only", "-show-line-counts-or-regions", + "-show-instantiation-summary", "-instr-profile=" + test_profdata_file, + "-path-equivalence=\"" + COVERAGE_ROOT + "\",\"" + + get_android_root_or_die() + "\"", "-output-dir=" + test_working_dir + ] - logging.info('Generating coverage report for ' + test['test_name']) - logging.debug('cmd: ' + " ".join(cmd)) - if subprocess.call(" ".join(cmd), shell=True) != 0: - logging.error('Failed to generate coverage for ' + test['test_name']) - sys.exit(1) + # We have to have one object file not as an argument otherwise we can't specify source files. + test_cmd = os.path.join( + os.path.join(get_native_test_root_or_die(), test_name), test_name) + cmd.append(test_cmd) + # Filter out the specific files we want coverage for + for filename in file_list: + cmd.append(os.path.join(get_android_root_or_die(), filename)) -def generate_coverage_json(test): - COVERAGE_ROOT = '/proc/self/cwd' - test_name = test['test_name'] - file_list = test['covered_files'] + logging.info('Generating coverage report for ' + test['test_name']) + logging.debug('cmd: ' + " ".join(cmd)) + if subprocess.call(" ".join(cmd), shell=True) != 0: + logging.error('Failed to generate coverage for ' + test['test_name']) + sys.exit(1) - test_working_dir = os.path.join(WORKING_DIR, test_name) - test_profdata_file = os.path.join(test_working_dir, test_name + ".profdata") - cmd = [ - os.path.join(get_android_root_or_die(), LLVM_COV), - "export", - "-summary-only", - "-show-region-summary", - "-instr-profile=" + test_profdata_file, - "-path-equivalence=\"" + COVERAGE_ROOT + "\",\"" + get_android_root_or_die() + "\"", - ] +def generate_coverage_json(test): + COVERAGE_ROOT = '/proc/self/cwd' + test_name = test['test_name'] + file_list = test['covered_files'] + + test_working_dir = os.path.join(WORKING_DIR, test_name) + test_profdata_file = os.path.join(test_working_dir, test_name + ".profdata") + + cmd = [ + os.path.join(get_android_root_or_die(), LLVM_COV), + "export", + "-summary-only", + "-show-region-summary", + "-instr-profile=" + test_profdata_file, + "-path-equivalence=\"" + COVERAGE_ROOT + "\",\"" + + get_android_root_or_die() + "\"", + ] - test_cmd = os.path.join(os.path.join(get_native_test_root_or_die(), test_name), test_name) - cmd.append(test_cmd) + test_cmd = os.path.join( + os.path.join(get_native_test_root_or_die(), test_name), test_name) + cmd.append(test_cmd) - # Filter out the specific files we want coverage for - for filename in file_list: - cmd.append(os.path.join(get_android_root_or_die(), filename)) + # Filter out the specific files we want coverage for + for filename in file_list: + cmd.append(os.path.join(get_android_root_or_die(), filename)) - logging.info('Generating coverage json for ' + test['test_name']) - logging.debug('cmd: ' + " ".join(cmd)) + logging.info('Generating coverage json for ' + test['test_name']) + logging.debug('cmd: ' + " ".join(cmd)) - json_str = subprocess.check_output(" ".join(cmd), shell=True) - return json.loads(json_str) + json_str = subprocess.check_output(" ".join(cmd), shell=True) + return json.loads(json_str) def write_json_summary(test): - test_name = test['test_name'] - test_working_dir = os.path.join(WORKING_DIR, test_name) - test_json_summary_file = os.path.join(test_working_dir, test_name + '.json') - logging.debug('Writing json summary file: ' + test_json_summary_file) - json_file = open(test_json_summary_file, 'w') - json.dump(generate_coverage_json(test), json_file) - json_file.close() + test_name = test['test_name'] + test_working_dir = os.path.join(WORKING_DIR, test_name) + test_json_summary_file = os.path.join(test_working_dir, test_name + '.json') + logging.debug('Writing json summary file: ' + test_json_summary_file) + json_file = open(test_json_summary_file, 'w') + json.dump(generate_coverage_json(test), json_file) + json_file.close() def list_tests(): - for test in COVERAGE_TESTS: - print "Test Name: " + test['test_name'] - print "Covered Files: " - for covered_file in test['covered_files']: - print " " + covered_file - print + for test in COVERAGE_TESTS: + print "Test Name: " + test['test_name'] + print "Covered Files: " + for covered_file in test['covered_files']: + print " " + covered_file + print def main(): - parser = argparse.ArgumentParser(description='Generate code coverage for enabled tests.') - parser.add_argument( - '-l', '--list-tests', - action='store_true', - dest='list_tests', - help='List all the available tests to be run as well as covered files.') - parser.add_argument( - '-a', '--all', - action='store_true', - help='Runs all available tests and prints their outputs. If no tests ' \ - 'are specified via the -t option all tests will be run.') - parser.add_argument( - '-t', '--test', - dest='tests', - action='append', - type=str, - metavar='TESTNAME', - default=[], - help='Specifies a test to be run. Multiple tests can be specified by ' \ - 'using this option multiple times. ' \ - 'Example: \"gen_coverage.py -t test1 -t test2\"') - parser.add_argument( - '-o', '--output', - type=str, - metavar='DIRECTORY', - default='/tmp/coverage', - help='Specifies the directory to store all files. The directory will be ' \ - 'created if it does not exist. Default is \"/tmp/coverage\"') - parser.add_argument( - '-s', '--skip-html', - dest='skip_html', - action='store_true', - help='Skip opening up the results of the coverage report in a browser.') - parser.add_argument( - '-j', '--json-file', - dest='json_file', - action='store_true', - help='Write out summary results to json file in test directory.') - - logging.basicConfig(stream=sys.stderr, level=logging.DEBUG, format='%(levelname)s %(message)s') - logging.addLevelName(logging.DEBUG, "[\033[1;34m%s\033[0m]" % logging.getLevelName(logging.DEBUG)) - logging.addLevelName(logging.INFO, "[\033[1;34m%s\033[0m]" % logging.getLevelName(logging.INFO)) - logging.addLevelName(logging.WARNING, "[\033[1;31m%s\033[0m]" % logging.getLevelName(logging.WARNING)) - logging.addLevelName(logging.ERROR, "[\033[1;31m%s\033[0m]" % logging.getLevelName(logging.ERROR)) - - args = parser.parse_args() - logging.debug("Args: " + str(args)) - - # Set the working directory - global WORKING_DIR - WORKING_DIR = os.path.abspath(args.output) - logging.debug("Working Dir: " + WORKING_DIR) - - # Print out the list of tests then exit - if args.list_tests: - list_tests() - sys.exit(0) - - # Check to see if a test was specified and if so only generate coverage for - # that test. - if len(args.tests) == 0: - args.all = True - - tests_to_run = [] - for test in COVERAGE_TESTS: - if args.all or test['test_name'] in args.tests: - tests_to_run.append(test) - if test['test_name'] in args.tests: - args.tests.remove(test['test_name']) - - # Error if a test was specified but doesn't exist. - if len(args.tests) != 0: - for test_name in args.tests: - logging.error('\"' + test_name + '\" was not found in the list of available tests.') - sys.exit(1) - - # Generate the info for the tests - for test in tests_to_run: - logging.info('Getting coverage for ' + test['test_name']) - get_profraw_for_test(test['test_name']) - merge_profraw_data(test['test_name']) - if args.json_file: - write_json_summary(test) - generate_coverage_html(test) - - # Generate the root index.html page that sumarizes all of the coverage reports. - generate_root_html(tests_to_run) - - # Open the results in a browser. - if not args.skip_html: - webbrowser.open('file://' + os.path.join(WORKING_DIR, 'index.html')) + parser = argparse.ArgumentParser( + description='Generate code coverage for enabled tests.') + parser.add_argument( + '-l', + '--list-tests', + action='store_true', + dest='list_tests', + help='List all the available tests to be run as well as covered files.') + parser.add_argument( + '-a', '--all', + action='store_true', + help='Runs all available tests and prints their outputs. If no tests ' \ + 'are specified via the -t option all tests will be run.') + parser.add_argument( + '-t', '--test', + dest='tests', + action='append', + type=str, + metavar='TESTNAME', + default=[], + help='Specifies a test to be run. Multiple tests can be specified by ' \ + 'using this option multiple times. ' \ + 'Example: \"gen_coverage.py -t test1 -t test2\"') + parser.add_argument( + '-o', '--output', + type=str, + metavar='DIRECTORY', + default='/tmp/coverage', + help='Specifies the directory to store all files. The directory will be ' \ + 'created if it does not exist. Default is \"/tmp/coverage\"') + parser.add_argument( + '-s', + '--skip-html', + dest='skip_html', + action='store_true', + help='Skip opening up the results of the coverage report in a browser.') + parser.add_argument( + '-j', + '--json-file', + dest='json_file', + action='store_true', + help='Write out summary results to json file in test directory.') + + logging.basicConfig( + stream=sys.stderr, + level=logging.DEBUG, + format='%(levelname)s %(message)s') + logging.addLevelName( + logging.DEBUG, + "[\033[1;34m%s\033[0m]" % logging.getLevelName(logging.DEBUG)) + logging.addLevelName( + logging.INFO, + "[\033[1;34m%s\033[0m]" % logging.getLevelName(logging.INFO)) + logging.addLevelName( + logging.WARNING, + "[\033[1;31m%s\033[0m]" % logging.getLevelName(logging.WARNING)) + logging.addLevelName( + logging.ERROR, + "[\033[1;31m%s\033[0m]" % logging.getLevelName(logging.ERROR)) + + args = parser.parse_args() + logging.debug("Args: " + str(args)) + + # Set the working directory + global WORKING_DIR + WORKING_DIR = os.path.abspath(args.output) + logging.debug("Working Dir: " + WORKING_DIR) + + # Print out the list of tests then exit + if args.list_tests: + list_tests() + sys.exit(0) + + # Check to see if a test was specified and if so only generate coverage for + # that test. + if len(args.tests) == 0: + args.all = True + + tests_to_run = [] + for test in COVERAGE_TESTS: + if args.all or test['test_name'] in args.tests: + tests_to_run.append(test) + if test['test_name'] in args.tests: + args.tests.remove(test['test_name']) + + # Error if a test was specified but doesn't exist. + if len(args.tests) != 0: + for test_name in args.tests: + logging.error('\"' + test_name + + '\" was not found in the list of available tests.') + sys.exit(1) + + # Generate the info for the tests + for test in tests_to_run: + logging.info('Getting coverage for ' + test['test_name']) + get_profraw_for_test(test['test_name']) + merge_profraw_data(test['test_name']) + if args.json_file: + write_json_summary(test) + generate_coverage_html(test) + + # Generate the root index.html page that sumarizes all of the coverage reports. + generate_root_html(tests_to_run) + + # Open the results in a browser. + if not args.skip_html: + webbrowser.open('file://' + os.path.join(WORKING_DIR, 'index.html')) if __name__ == '__main__': - main() + main() diff --git a/system/test/run_host_unit_tests.py b/system/test/run_host_unit_tests.py index 06074e51ee..4b9e4bf457 100755 --- a/system/test/run_host_unit_tests.py +++ b/system/test/run_host_unit_tests.py @@ -21,201 +21,201 @@ import argparse # Registered host based unit tests # Must have 'host_supported: true' HOST_TESTS = [ - 'bluetooth_test_common', - 'bluetoothtbd_test', - 'net_test_avrcp', - 'net_test_btcore', - 'net_test_types', - 'net_test_btpackets', + 'bluetooth_test_common', + 'bluetoothtbd_test', + 'net_test_avrcp', + 'net_test_btcore', + 'net_test_types', + 'net_test_btpackets', ] SOONG_UI_BASH = 'build/soong/soong_ui.bash' + def str2bool(argument, default=False): - """ Convert a string to a booleen value. """ - argument = str(argument) - if argument.lower() in ['0', 'f', 'false', 'off', 'no', 'n']: - return False - elif argument.lower() in ['1', 't', 'true', 'on', 'yes', 'y']: - return True - return default + """ Convert a string to a booleen value. """ + argument = str(argument) + if argument.lower() in ['0', 'f', 'false', 'off', 'no', 'n']: + return False + elif argument.lower() in ['1', 't', 'true', 'on', 'yes', 'y']: + return True + return default def check_dir_exists(dir, dirname): - if not os.path.isdir(dir): - print "Couldn't find %s (%s)!" % (dirname, dir) - sys.exit(0) + if not os.path.isdir(dir): + print "Couldn't find %s (%s)!" % (dirname, dir) + sys.exit(0) def get_output_from_command(cmd): - try: - return subprocess.check_output(cmd).strip() - except subprocess.CalledProcessError as e: - print 'Failed to call {cmd}, return code {code}'.format(cmd=cmd, - code=e.returncode) - print e - return None + try: + return subprocess.check_output(cmd).strip() + except subprocess.CalledProcessError as e: + print 'Failed to call {cmd}, return code {code}'.format( + cmd=cmd, code=e.returncode) + print e + return None def get_android_root_or_die(): - value = os.environ.get('ANDROID_BUILD_TOP') - if not value: - # Try to find build/soong/soong_ui.bash upwards until root directory - current_path = os.path.abspath(os.getcwd()) - while current_path and os.path.isdir(current_path): - soong_ui_bash_path = os.path.join(current_path, SOONG_UI_BASH) - if os.path.isfile(soong_ui_bash_path): - # Use value returned from Soong UI instead in case definition to TOP - # changes in the future - value = get_output_from_command((soong_ui_bash_path, - '--dumpvar-mode', - '--abs', - 'TOP')) - break - parent_path = os.path.abspath(os.path.join(current_path, os.pardir)) - if parent_path == current_path: - current_path = None - else: - current_path = parent_path + value = os.environ.get('ANDROID_BUILD_TOP') if not value: - print 'Cannot determine ANDROID_BUILD_TOP' - sys.exit(1) - check_dir_exists(value, '$ANDROID_BUILD_TOP') - return value + # Try to find build/soong/soong_ui.bash upwards until root directory + current_path = os.path.abspath(os.getcwd()) + while current_path and os.path.isdir(current_path): + soong_ui_bash_path = os.path.join(current_path, SOONG_UI_BASH) + if os.path.isfile(soong_ui_bash_path): + # Use value returned from Soong UI instead in case definition to TOP + # changes in the future + value = get_output_from_command( + (soong_ui_bash_path, '--dumpvar-mode', '--abs', 'TOP')) + break + parent_path = os.path.abspath(os.path.join(current_path, os.pardir)) + if parent_path == current_path: + current_path = None + else: + current_path = parent_path + if not value: + print 'Cannot determine ANDROID_BUILD_TOP' + sys.exit(1) + check_dir_exists(value, '$ANDROID_BUILD_TOP') + return value def get_android_host_out_or_die(): - value = os.environ.get('ANDROID_HOST_OUT') - if not value: - ANDROID_BUILD_TOP = get_android_root_or_die() - value = get_output_from_command((os.path.join(ANDROID_BUILD_TOP, - SOONG_UI_BASH), - '--dumpvar-mode', - '--abs', - 'HOST_OUT')) + value = os.environ.get('ANDROID_HOST_OUT') if not value: - print 'Cannot determine ANDROID_HOST_OUT' - sys.exit(1) - check_dir_exists(value, '$ANDROID_HOST_OUT') - return value + ANDROID_BUILD_TOP = get_android_root_or_die() + value = get_output_from_command( + (os.path.join(ANDROID_BUILD_TOP, SOONG_UI_BASH), '--dumpvar-mode', + '--abs', 'HOST_OUT')) + if not value: + print 'Cannot determine ANDROID_HOST_OUT' + sys.exit(1) + check_dir_exists(value, '$ANDROID_HOST_OUT') + return value def get_android_dist_dir_or_die(): - # Check if $DIST_DIR is predefined as environment variable - value = os.environ.get('DIST_DIR') - if not value: - # If not use the default path - ANDROID_BUILD_TOP = get_android_root_or_die() - value = os.path.join(os.path.join(ANDROID_BUILD_TOP, 'out'), 'dist') - if not os.path.isdir(value): - if os.path.exists(value): - print '%s is not a directory!' % (value) - sys.exit(1) - os.makedirs(value) - return value + # Check if $DIST_DIR is predefined as environment variable + value = os.environ.get('DIST_DIR') + if not value: + # If not use the default path + ANDROID_BUILD_TOP = get_android_root_or_die() + value = os.path.join(os.path.join(ANDROID_BUILD_TOP, 'out'), 'dist') + if not os.path.isdir(value): + if os.path.exists(value): + print '%s is not a directory!' % (value) + sys.exit(1) + os.makedirs(value) + return value def get_native_test_root_or_die(): - android_host_out = get_android_host_out_or_die() - test_root = os.path.join(android_host_out, 'nativetest64') - if not os.path.isdir(test_root): - test_root = os.path.join(android_host_out, 'nativetest') + android_host_out = get_android_host_out_or_die() + test_root = os.path.join(android_host_out, 'nativetest64') if not os.path.isdir(test_root): - print 'Neither nativetest64 nor nativetest directory exist,' \ - ' please compile first' - sys.exit(1) - return test_root + test_root = os.path.join(android_host_out, 'nativetest') + if not os.path.isdir(test_root): + print 'Neither nativetest64 nor nativetest directory exist,' \ + ' please compile first' + sys.exit(1) + return test_root def get_test_cmd_or_die(test_root, test_name, enable_xml, test_filter): - test_path = os.path.join(os.path.join(test_root, test_name), test_name) - if not os.path.isfile(test_path): - print 'Cannot find: ' + test_path - sys.exit(1) - cmd = [test_path] - if enable_xml: - dist_dir = get_android_dist_dir_or_die() - log_output_path = os.path.join(dist_dir, 'gtest/{0}_test_details.xml' - .format(test_name)) - cmd.append('--gtest_output=xml:{0}'.format(log_output_path)) - if test_filter: - cmd.append('--gtest_filter=%s' % test_filter) - return cmd + test_path = os.path.join(os.path.join(test_root, test_name), test_name) + if not os.path.isfile(test_path): + print 'Cannot find: ' + test_path + sys.exit(1) + cmd = [test_path] + if enable_xml: + dist_dir = get_android_dist_dir_or_die() + log_output_path = os.path.join( + dist_dir, 'gtest/{0}_test_details.xml'.format(test_name)) + cmd.append('--gtest_output=xml:{0}'.format(log_output_path)) + if test_filter: + cmd.append('--gtest_filter=%s' % test_filter) + return cmd # path is relative to Android build top def build_target(target, num_tasks): - ANDROID_BUILD_TOP = get_android_root_or_die() - build_cmd = [SOONG_UI_BASH, '--make-mode'] - if num_tasks > 1: - build_cmd.append('-j' + str(num_tasks)) - build_cmd.append(target) - p = subprocess.Popen(build_cmd, cwd=ANDROID_BUILD_TOP, env=os.environ.copy()) - return_code = p.wait() - if return_code != 0: - print 'BUILD FAILED, return code: {0}'.format(str(return_code)) - sys.exit(1) - return + ANDROID_BUILD_TOP = get_android_root_or_die() + build_cmd = [SOONG_UI_BASH, '--make-mode'] + if num_tasks > 1: + build_cmd.append('-j' + str(num_tasks)) + build_cmd.append(target) + p = subprocess.Popen( + build_cmd, cwd=ANDROID_BUILD_TOP, env=os.environ.copy()) + return_code = p.wait() + if return_code != 0: + print 'BUILD FAILED, return code: {0}'.format(str(return_code)) + sys.exit(1) + return def main(): - """ run_host_unit_tests.py - Run registered host based unit tests + """ run_host_unit_tests.py - Run registered host based unit tests """ - parser = argparse.ArgumentParser(description='Run host based unit tests.') - parser.add_argument( - '--enable_xml', - type=str2bool, - dest='enable_xml', - nargs='?', - const=True, - default=False, - help= - 'Whether to output structured XML log output in out/dist/gtest directory') - parser.add_argument( - '-j', - type=int, - nargs='?', - dest='num_tasks', - const=-1, - default=-1, - help='Number of tasks to run at the same time') - parser.add_argument( - 'rest', - nargs=argparse.REMAINDER, - help='-- args, other gtest arguments for each individual test') - args = parser.parse_args() - - build_target('MODULES-IN-system-bt', args.num_tasks) - TEST_ROOT = get_native_test_root_or_die() - test_results = [] - for test in HOST_TESTS: - test_cmd = get_test_cmd_or_die(TEST_ROOT, test, args.enable_xml, args.rest) - if subprocess.call(test_cmd) != 0: - test_results.append(False) - else: - test_results.append(True) - if not all(test_results): - failures = [i for i, x in enumerate(test_results) if not x] - for index in failures: - print 'TEST FAILLED: ' + HOST_TESTS[index] - sys.exit(0) - print 'TEST PASSED ' + str(len(test_results)) + ' tests were run' + parser = argparse.ArgumentParser(description='Run host based unit tests.') + parser.add_argument( + '--enable_xml', + type=str2bool, + dest='enable_xml', + nargs='?', + const=True, + default=False, + help= + 'Whether to output structured XML log output in out/dist/gtest directory' + ) + parser.add_argument( + '-j', + type=int, + nargs='?', + dest='num_tasks', + const=-1, + default=-1, + help='Number of tasks to run at the same time') + parser.add_argument( + 'rest', + nargs=argparse.REMAINDER, + help='-- args, other gtest arguments for each individual test') + args = parser.parse_args() + + build_target('MODULES-IN-system-bt', args.num_tasks) + TEST_ROOT = get_native_test_root_or_die() + test_results = [] + for test in HOST_TESTS: + test_cmd = get_test_cmd_or_die(TEST_ROOT, test, args.enable_xml, + args.rest) + if subprocess.call(test_cmd) != 0: + test_results.append(False) + else: + test_results.append(True) + if not all(test_results): + failures = [i for i, x in enumerate(test_results) if not x] + for index in failures: + print 'TEST FAILLED: ' + HOST_TESTS[index] + sys.exit(0) + print 'TEST PASSED ' + str(len(test_results)) + ' tests were run' - dist_dir = get_android_dist_dir_or_die() - log_output_path = os.path.join(dist_dir, 'gtest/coverage') - cmd_path = os.path.join(get_android_root_or_die(), 'packages/modules/Bluetooth/system/test') - print cmd_path - cmd = [ - os.path.join(cmd_path, 'gen_coverage.py'), - '--skip-html', - '--json-file', - '-o', - log_output_path, - ] - subprocess.call(cmd) + dist_dir = get_android_dist_dir_or_die() + log_output_path = os.path.join(dist_dir, 'gtest/coverage') + cmd_path = os.path.join(get_android_root_or_die(), 'packages/modules/Bluetooth/system/test') + print cmd_path + cmd = [ + os.path.join(cmd_path, 'gen_coverage.py'), + '--skip-html', + '--json-file', + '-o', + log_output_path, + ] + subprocess.call(cmd) - sys.exit(0) + sys.exit(0) if __name__ == '__main__': - main() + main() diff --git a/system/tools/scripts/btsnoop_live.py b/system/tools/scripts/btsnoop_live.py index 4e145ffea3..00b45fdda1 100644 --- a/system/tools/scripts/btsnoop_live.py +++ b/system/tools/scripts/btsnoop_live.py @@ -67,19 +67,17 @@ from ctypes import byref, c_bool, c_longlong, CDLL from _ctypes import FreeLibrary from datetime import datetime - # Update below to right path corresponding to your machine, FTS version and OS used. FTS_INI_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\' FTS_DLL_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\Executables\\Core\\' - iniName = 'liveimport.ini' if (platform.architecture()[0] == '32bit'): dllName = 'LiveImportAPI.dll' else: dllName = 'LiveImportAPI_x64.dll' -launchFtsCmd = '\"'+FTS_DLL_PATH + 'fts.exe\"' + ' \"/ComProbe Protocol Analysis System=Generic\"' + ' \"/oemkey=Virtual\"' +launchFtsCmd = '\"' + FTS_DLL_PATH + 'fts.exe\"' + ' \"/ComProbe Protocol Analysis System=Generic\"' + ' \"/oemkey=Virtual\"' # Unix Epoch delta since 01/01/1970 FILETIME_EPOCH_DELTA = 116444736000000000 @@ -96,7 +94,8 @@ def get_file_time(): Obtain current time in file time format for display """ date_time = datetime.now() - file_time = FILETIME_EPOCH_DELTA + (timegm(date_time.timetuple()) * HUNDREDS_OF_NANOSECONDS) + file_time = FILETIME_EPOCH_DELTA + ( + timegm(date_time.timetuple()) * HUNDREDS_OF_NANOSECONDS) file_time = file_time + (date_time.microsecond * 10) return file_time @@ -135,6 +134,7 @@ def get_configuration_string(): else: return None + def check_live_import_connection(live_import): """ Launch FTS app in Virtual Sniffing Mode @@ -146,7 +146,7 @@ def check_live_import_connection(live_import): status = live_import.IsAppReady(byref(is_connection_running)) if (is_connection_running.value == True): - print ("FTS is already launched, Start capture if not already started") + print("FTS is already launched, Start capture if not already started") return True print("Launching FTS Virtual Sniffing") @@ -159,14 +159,14 @@ def check_live_import_connection(live_import): while (is_connection_running.value == False and count < 12): status = live_import.IsAppReady(byref(is_connection_running)) if (status < 0): - print("Live Import Internal Error %d" %(status)) + print("Live Import Internal Error %d" % (status)) return False if (is_connection_running.value == False): print("Waiting for 5 sec.. Open FTS Virtual Sniffing") time.sleep(5) count += 1 if (is_connection_running.value == True): - print ("FTS is ready to receive the data, Start capture now") + print("FTS is ready to receive the data, Start capture now") return True else: print("FTS Virtual Sniffing didn't start until 1 min.. exiting") @@ -184,13 +184,13 @@ def init_live_import(conn_str, config_str): return None if live_import is None: - print("Error: Path to LiveImportAPI.dll is incorrect.. exiting"); + print("Error: Path to LiveImportAPI.dll is incorrect.. exiting") return None print(dllName + " loaded successfully") - result = live_import.InitializeLiveImport(conn_str.encode('ascii', 'ignore'), - config_str.encode('ascii', 'ignore'), - byref(success)) + result = live_import.InitializeLiveImport( + conn_str.encode('ascii', 'ignore'), config_str.encode( + 'ascii', 'ignore'), byref(success)) if (result < 0): print("Live Import Init failed") return None @@ -244,7 +244,7 @@ def main(): while True: try: - snoop_hdr = btsnoop_sock.recv(SNOOP_HDR) + snoop_hdr = btsnoop_sock.recv(SNOOP_HDR) if snoop_hdr is not None: try: olen, ilen, flags = struct.unpack(">LLL", snoop_hdr[0:12]) @@ -261,7 +261,8 @@ def main(): if data_frag is not None: snoop_data += data_frag - print ("Bytes received %d Olen %d ilen %d flags %d" % (len(snoop_data), olen, ilen, flags)) + print("Bytes received %d Olen %d ilen %d flags %d" % + (len(snoop_data), olen, ilen, flags)) packet_type = struct.unpack(">B", snoop_data[0:1])[0] if packet_type == 1: drf = 1 @@ -282,7 +283,9 @@ def main(): drf = 8 isend = 1 - result = live_import.SendFrame(olen-1, olen-1, snoop_data[1:olen], drf, isend, timestamp) + result = live_import.SendFrame(olen - 1, olen - 1, + snoop_data[1:olen], drf, isend, + timestamp) if (result < 0): print("Send frame failed") except KeyboardInterrupt: @@ -294,4 +297,3 @@ def main(): if __name__ == '__main__': main() - diff --git a/system/tools/scripts/btsnooz.py b/system/tools/scripts/btsnooz.py index 6e0e11200b..a29e718d7b 100755 --- a/system/tools/scripts/btsnooz.py +++ b/system/tools/scripts/btsnooz.py @@ -21,14 +21,12 @@ where the file_header and record_header are modified versions of the btsnoop headers. """ - import base64 import fileinput import struct import sys import zlib - # Enumeration of the values the 'type' field can take in a btsnooz # header. These values come from the Bluetooth stack's internal # representation of packet types. @@ -41,127 +39,135 @@ TYPE_OUT_SCO = 0x22 def type_to_direction(type): - """ + """ Returns the inbound/outbound direction of a packet given its type. 0 = sent packet 1 = received packet """ - if type in [TYPE_IN_EVT, TYPE_IN_ACL, TYPE_IN_SCO]: - return 1 - return 0 + if type in [TYPE_IN_EVT, TYPE_IN_ACL, TYPE_IN_SCO]: + return 1 + return 0 def type_to_hci(type): - """ + """ Returns the HCI type of a packet given its btsnooz type. """ - if type == TYPE_OUT_CMD: - return '\x01' - if type == TYPE_IN_ACL or type == TYPE_OUT_ACL: - return '\x02' - if type == TYPE_IN_SCO or type == TYPE_OUT_SCO: - return '\x03' - if type == TYPE_IN_EVT: - return '\x04' + if type == TYPE_OUT_CMD: + return '\x01' + if type == TYPE_IN_ACL or type == TYPE_OUT_ACL: + return '\x02' + if type == TYPE_IN_SCO or type == TYPE_OUT_SCO: + return '\x03' + if type == TYPE_IN_EVT: + return '\x04' def decode_snooz(snooz): - """ + """ Decodes all known versions of a btsnooz file into a btsnoop file. """ - version, last_timestamp_ms = struct.unpack_from('=bQ', snooz) + version, last_timestamp_ms = struct.unpack_from('=bQ', snooz) - if version != 1 and version != 2: - sys.stderr.write('Unsupported btsnooz version: %s\n' % version) - exit(1) + if version != 1 and version != 2: + sys.stderr.write('Unsupported btsnooz version: %s\n' % version) + exit(1) - # Oddly, the file header (9 bytes) is not compressed, but the rest is. - decompressed = zlib.decompress(snooz[9:]) + # Oddly, the file header (9 bytes) is not compressed, but the rest is. + decompressed = zlib.decompress(snooz[9:]) - sys.stdout.write('btsnoop\x00\x00\x00\x00\x01\x00\x00\x03\xea') + sys.stdout.write('btsnoop\x00\x00\x00\x00\x01\x00\x00\x03\xea') - if version == 1: - decode_snooz_v1(decompressed, last_timestamp_ms) - elif version == 2: - decode_snooz_v2(decompressed, last_timestamp_ms) + if version == 1: + decode_snooz_v1(decompressed, last_timestamp_ms) + elif version == 2: + decode_snooz_v2(decompressed, last_timestamp_ms) def decode_snooz_v1(decompressed, last_timestamp_ms): - """ + """ Decodes btsnooz v1 files into a btsnoop file. """ - # An unfortunate consequence of the file format design: we have to do a - # pass of the entire file to determine the timestamp of the first packet. - first_timestamp_ms = last_timestamp_ms + 0x00dcddb30f2f8000 - offset = 0 - while offset < len(decompressed): - length, delta_time_ms, type = struct.unpack_from('=HIb', decompressed, offset) - offset += 7 + length - 1 - first_timestamp_ms -= delta_time_ms - - # Second pass does the actual writing out to stdout. - offset = 0 - while offset < len(decompressed): - length, delta_time_ms, type = struct.unpack_from('=HIb', decompressed, offset) - first_timestamp_ms += delta_time_ms - offset += 7 - sys.stdout.write(struct.pack('>II', length, length)) - sys.stdout.write(struct.pack('>II', type_to_direction(type), 0)) - sys.stdout.write(struct.pack('>II', (first_timestamp_ms >> 32), (first_timestamp_ms & 0xFFFFFFFF))) - sys.stdout.write(type_to_hci(type)) - sys.stdout.write(decompressed[offset : offset + length - 1]) - offset += length - 1 + # An unfortunate consequence of the file format design: we have to do a + # pass of the entire file to determine the timestamp of the first packet. + first_timestamp_ms = last_timestamp_ms + 0x00dcddb30f2f8000 + offset = 0 + while offset < len(decompressed): + length, delta_time_ms, type = struct.unpack_from( + '=HIb', decompressed, offset) + offset += 7 + length - 1 + first_timestamp_ms -= delta_time_ms + + # Second pass does the actual writing out to stdout. + offset = 0 + while offset < len(decompressed): + length, delta_time_ms, type = struct.unpack_from( + '=HIb', decompressed, offset) + first_timestamp_ms += delta_time_ms + offset += 7 + sys.stdout.write(struct.pack('>II', length, length)) + sys.stdout.write(struct.pack('>II', type_to_direction(type), 0)) + sys.stdout.write( + struct.pack('>II', (first_timestamp_ms >> 32), + (first_timestamp_ms & 0xFFFFFFFF))) + sys.stdout.write(type_to_hci(type)) + sys.stdout.write(decompressed[offset:offset + length - 1]) + offset += length - 1 def decode_snooz_v2(decompressed, last_timestamp_ms): - """ + """ Decodes btsnooz v2 files into a btsnoop file. """ - # An unfortunate consequence of the file format design: we have to do a - # pass of the entire file to determine the timestamp of the first packet. - first_timestamp_ms = last_timestamp_ms + 0x00dcddb30f2f8000 - offset = 0 - while offset < len(decompressed): - length, packet_length, delta_time_ms, snooz_type = struct.unpack_from('=HHIb', decompressed, offset) - offset += 9 + length - 1 - first_timestamp_ms -= delta_time_ms - - # Second pass does the actual writing out to stdout. - offset = 0 - while offset < len(decompressed): - length, packet_length, delta_time_ms, snooz_type = struct.unpack_from('=HHIb', decompressed, offset) - first_timestamp_ms += delta_time_ms - offset += 9 - sys.stdout.write(struct.pack('>II', packet_length, length)) - sys.stdout.write(struct.pack('>II', type_to_direction(snooz_type), 0)) - sys.stdout.write(struct.pack('>II', (first_timestamp_ms >> 32), (first_timestamp_ms & 0xFFFFFFFF))) - sys.stdout.write(type_to_hci(snooz_type)) - sys.stdout.write(decompressed[offset : offset + length - 1]) - offset += length - 1 + # An unfortunate consequence of the file format design: we have to do a + # pass of the entire file to determine the timestamp of the first packet. + first_timestamp_ms = last_timestamp_ms + 0x00dcddb30f2f8000 + offset = 0 + while offset < len(decompressed): + length, packet_length, delta_time_ms, snooz_type = struct.unpack_from( + '=HHIb', decompressed, offset) + offset += 9 + length - 1 + first_timestamp_ms -= delta_time_ms + + # Second pass does the actual writing out to stdout. + offset = 0 + while offset < len(decompressed): + length, packet_length, delta_time_ms, snooz_type = struct.unpack_from( + '=HHIb', decompressed, offset) + first_timestamp_ms += delta_time_ms + offset += 9 + sys.stdout.write(struct.pack('>II', packet_length, length)) + sys.stdout.write(struct.pack('>II', type_to_direction(snooz_type), 0)) + sys.stdout.write( + struct.pack('>II', (first_timestamp_ms >> 32), + (first_timestamp_ms & 0xFFFFFFFF))) + sys.stdout.write(type_to_hci(snooz_type)) + sys.stdout.write(decompressed[offset:offset + length - 1]) + offset += length - 1 def main(): - if len(sys.argv) > 2: - sys.stderr.write('Usage: %s [bugreport]\n' % sys.argv[0]) - exit(1) + if len(sys.argv) > 2: + sys.stderr.write('Usage: %s [bugreport]\n' % sys.argv[0]) + exit(1) - iterator = fileinput.input() - found = False - base64_string = "" - for line in iterator: - if found: - if line.find('--- END:BTSNOOP_LOG_SUMMARY') != -1: - decode_snooz(base64.standard_b64decode(base64_string)) - sys.exit(0) - base64_string += line.strip() + iterator = fileinput.input() + found = False + base64_string = "" + for line in iterator: + if found: + if line.find('--- END:BTSNOOP_LOG_SUMMARY') != -1: + decode_snooz(base64.standard_b64decode(base64_string)) + sys.exit(0) + base64_string += line.strip() - if line.find('--- BEGIN:BTSNOOP_LOG_SUMMARY') != -1: - found = True + if line.find('--- BEGIN:BTSNOOP_LOG_SUMMARY') != -1: + found = True - if not found: - sys.stderr.write('No btsnooz section found in bugreport.\n') - sys.exit(1) + if not found: + sys.stderr.write('No btsnooz section found in bugreport.\n') + sys.exit(1) if __name__ == '__main__': - main() + main() diff --git a/system/tools/scripts/dump_hearingaid_audio.py b/system/tools/scripts/dump_hearingaid_audio.py index ca2eed2027..81cd78d98e 100755 --- a/system/tools/scripts/dump_hearingaid_audio.py +++ b/system/tools/scripts/dump_hearingaid_audio.py @@ -54,18 +54,15 @@ DEBUG_DATA = "DEBUG_DATA" AUDIO_DATA_B = "AUDIO_DATA_B" # Debug packet header struct -header_list_str = ["Event Processed", - "Number Packet Nacked By Slave", - "Number Packet Nacked By Master"] +header_list_str = [ + "Event Processed", "Number Packet Nacked By Slave", + "Number Packet Nacked By Master" +] # Debug frame information structs -data_list_str = ["Event Number", - "Overrun", - "Underrun", - "Skips", - "Rendered Audio Frame", - "First PDU Option", - "Second PDU Option", - "Third PDU Option"] +data_list_str = [ + "Event Number", "Overrun", "Underrun", "Skips", "Rendered Audio Frame", + "First PDU Option", "Second PDU Option", "Third PDU Option" +] AUDIO_CONTROL_POINT_UUID = "f0d4de7e4a88476c9d9f1937b0996cc0" SEC_CONVERT = 1000000 @@ -86,172 +83,184 @@ audio_data = {} # Parse Hearing Aid Packet #----------------------------------------------------------------------- + def parse_acl_ha_debug_buffer(data, result): - """This function extracts HA debug buffer""" - if len(data) < 5: - return - - version, data = unpack_data(data, 1) - update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], DEBUG_VERSION, str(version)) - - debug_str = result[TIMESTAMP_TIME_FORMAT]; - for p in range(3): - byte_data, data = unpack_data(data, 1) - debug_str = debug_str + ", " + header_list_str[p] + "=" + str(byte_data).rjust(3) - - if full_debug: - debug_str = debug_str + "\n" + "|".join(data_list_str) + "\n" - while True: - if len(data) < 7: - break - base = 0 - data_list_content = [] - for counter in range(6): - p = base + counter + """This function extracts HA debug buffer""" + if len(data) < 5: + return + + version, data = unpack_data(data, 1) + update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], + DEBUG_VERSION, str(version)) + + debug_str = result[TIMESTAMP_TIME_FORMAT] + for p in range(3): byte_data, data = unpack_data(data, 1) - if p == 1: - data_list_content.append(str(byte_data & 0x03).rjust(len(data_list_str[p]))) - data_list_content.append(str((byte_data >> 2) & 0x03).rjust(len(data_list_str[p + 1]))) - data_list_content.append(str((byte_data >> 4) & 0x0f).rjust(len(data_list_str[p + 2]))) - base = 2 - else: - data_list_content.append(str(byte_data).rjust(len(data_list_str[p]))) - debug_str = debug_str + "|".join(data_list_content) + "\n" + debug_str = debug_str + ", " + header_list_str[p] + "=" + str( + byte_data).rjust(3) + + if full_debug: + debug_str = debug_str + "\n" + "|".join(data_list_str) + "\n" + while True: + if len(data) < 7: + break + base = 0 + data_list_content = [] + for counter in range(6): + p = base + counter + byte_data, data = unpack_data(data, 1) + if p == 1: + data_list_content.append( + str(byte_data & 0x03).rjust(len(data_list_str[p]))) + data_list_content.append( + str((byte_data >> 2) & 0x03).rjust( + len(data_list_str[p + 1]))) + data_list_content.append( + str((byte_data >> 4) & 0x0f).rjust( + len(data_list_str[p + 2]))) + base = 2 + else: + data_list_content.append( + str(byte_data).rjust(len(data_list_str[p]))) + debug_str = debug_str + "|".join(data_list_content) + "\n" + + update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], DEBUG_DATA, + debug_str) - update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], DEBUG_DATA, debug_str) def parse_acl_ha_audio_data(data, result): - """This function extracts HA audio data.""" - if len(data) < 2: - return - # Remove audio packet number - audio_data_b = data[1:] - update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], - AUDIO_DATA_B, audio_data_b) + """This function extracts HA audio data.""" + if len(data) < 2: + return + # Remove audio packet number + audio_data_b = data[1:] + update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], + AUDIO_DATA_B, audio_data_b) def parse_acl_ha_audio_type(data, result): - """This function parses HA audio control cmd audio type.""" - audio_type, data = unpack_data(data, 1) - if audio_type is None: - return - elif audio_type == 0x01: - audio_type = "Ringtone" - elif audio_type == 0x02: - audio_type = "Phonecall" - elif audio_type == 0x03: - audio_type = "Media" - else: - audio_type = "Unknown" - update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], - AUDIO_TYPE, audio_type) + """This function parses HA audio control cmd audio type.""" + audio_type, data = unpack_data(data, 1) + if audio_type is None: + return + elif audio_type == 0x01: + audio_type = "Ringtone" + elif audio_type == 0x02: + audio_type = "Phonecall" + elif audio_type == 0x03: + audio_type = "Media" + else: + audio_type = "Unknown" + update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], AUDIO_TYPE, + audio_type) def parse_acl_ha_codec(data, result): - """This function parses HA audio control cmd codec and sample rate.""" - codec, data = unpack_data(data, 1) - if codec == 0x01: - codec = "G722" - sample_rate = "16KHZ" - elif codec == 0x02: - codec = "G722" - sample_rate = "24KHZ" - else: - codec = "Unknown" - sample_rate = "Unknown" - update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], - CODEC, codec) - update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], - SAMPLE_RATE, sample_rate) - parse_acl_ha_audio_type(data, result) + """This function parses HA audio control cmd codec and sample rate.""" + codec, data = unpack_data(data, 1) + if codec == 0x01: + codec = "G722" + sample_rate = "16KHZ" + elif codec == 0x02: + codec = "G722" + sample_rate = "24KHZ" + else: + codec = "Unknown" + sample_rate = "Unknown" + update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], CODEC, + codec) + update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], SAMPLE_RATE, + sample_rate) + parse_acl_ha_audio_type(data, result) def parse_acl_ha_audio_control_cmd(data, result): - """This function parses HA audio control cmd is start/stop.""" - control_cmd, data = unpack_data(data, 1) - if control_cmd == 0x01: - update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], - START, True) - update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], - TIMESTAMP_STR_FORMAT, result[TIMESTAMP_STR_FORMAT]) - parse_acl_ha_codec(data, result) - elif control_cmd == 0x02: - update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], - START, False) + """This function parses HA audio control cmd is start/stop.""" + control_cmd, data = unpack_data(data, 1) + if control_cmd == 0x01: + update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], START, + True) + update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], + TIMESTAMP_STR_FORMAT, result[TIMESTAMP_STR_FORMAT]) + parse_acl_ha_codec(data, result) + elif control_cmd == 0x02: + update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], START, + False) #----------------------------------------------------------------------- # Parse ACL Packet #----------------------------------------------------------------------- + def parse_acl_att_long_uuid(data, result): - """This function parses ATT long UUID to get attr_handle.""" - # len (1 byte) + start_attr_handle (2 bytes) + properties (1 byte) + - # attr_handle (2 bytes) + long_uuid (16 bytes) = 22 bytes - if len(data) < 22: - return - # skip unpack len, start_attr_handle, properties. - data = data[4:] - attr_handle, data = unpack_data(data, 2) - long_uuid_list = [] - for p in range(0, 16): - long_uuid_list.append("{0:02x}".format(struct.unpack(">B", data[p])[0])) - long_uuid_list.reverse() - long_uuid = "".join(long_uuid_list) - # Check long_uuid is AUDIO_CONTROL_POINT uuid to get the attr_handle. - if long_uuid == AUDIO_CONTROL_POINT_UUID: - update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], - AUDIO_CONTROL_ATTR_HANDLE, attr_handle) + """This function parses ATT long UUID to get attr_handle.""" + # len (1 byte) + start_attr_handle (2 bytes) + properties (1 byte) + + # attr_handle (2 bytes) + long_uuid (16 bytes) = 22 bytes + if len(data) < 22: + return + # skip unpack len, start_attr_handle, properties. + data = data[4:] + attr_handle, data = unpack_data(data, 2) + long_uuid_list = [] + for p in range(0, 16): + long_uuid_list.append("{0:02x}".format(struct.unpack(">B", data[p])[0])) + long_uuid_list.reverse() + long_uuid = "".join(long_uuid_list) + # Check long_uuid is AUDIO_CONTROL_POINT uuid to get the attr_handle. + if long_uuid == AUDIO_CONTROL_POINT_UUID: + update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], + AUDIO_CONTROL_ATTR_HANDLE, attr_handle) def parse_acl_opcode(data, result): - """This function parses acl data opcode.""" - # opcode (1 byte) = 1 bytes - if len(data) < 1: - return - opcode, data = unpack_data(data, 1) - # Check opcode is 0x12 (write request) and attr_handle is - # audio_control_attr_handle for check it is HA audio control cmd. - if result[IS_SENT] and opcode == 0x12: - if len(data) < 2: - return - attr_handle, data = unpack_data(data, 2) - if attr_handle == \ - get_audio_control_attr_handle(result[CONNECTION_HANDLE]): - parse_acl_ha_audio_control_cmd(data, result) - # Check opcode is 0x09 (read response) to parse ATT long UUID. - elif not result[IS_SENT] and opcode == 0x09: - parse_acl_att_long_uuid(data, result) + """This function parses acl data opcode.""" + # opcode (1 byte) = 1 bytes + if len(data) < 1: + return + opcode, data = unpack_data(data, 1) + # Check opcode is 0x12 (write request) and attr_handle is + # audio_control_attr_handle for check it is HA audio control cmd. + if result[IS_SENT] and opcode == 0x12: + if len(data) < 2: + return + attr_handle, data = unpack_data(data, 2) + if attr_handle == \ + get_audio_control_attr_handle(result[CONNECTION_HANDLE]): + parse_acl_ha_audio_control_cmd(data, result) + # Check opcode is 0x09 (read response) to parse ATT long UUID. + elif not result[IS_SENT] and opcode == 0x09: + parse_acl_att_long_uuid(data, result) def parse_acl_handle(data, result): - """This function parses acl data handle.""" - # connection_handle (2 bytes) + total_len (2 bytes) + pdu (2 bytes) - # + channel_id (2 bytes) = 8 bytes - if len(data) < 8: - return - connection_handle, data = unpack_data(data, 2) - connection_handle = connection_handle & 0x0FFF - # skip unpack total_len - data = data[2:] - pdu, data = unpack_data(data, 2) - channel_id, data = unpack_data(data, 2) - - # Check ATT packet or "Coc Data Packet" to get ATT information and audio - # data. - if connection_handle <= 0x0EFF: - if channel_id <= 0x003F: - result[CONNECTION_HANDLE] = connection_handle - parse_acl_opcode(data, result) - elif channel_id >= 0x0040 and channel_id <= 0x007F: - result[CONNECTION_HANDLE] = connection_handle - sdu, data = unpack_data(data, 2) - if pdu - 2 == sdu: - if result[IS_SENT]: - parse_acl_ha_audio_data(data, result) - else: - if simple_debug: - parse_acl_ha_debug_buffer(data, result) + """This function parses acl data handle.""" + # connection_handle (2 bytes) + total_len (2 bytes) + pdu (2 bytes) + # + channel_id (2 bytes) = 8 bytes + if len(data) < 8: + return + connection_handle, data = unpack_data(data, 2) + connection_handle = connection_handle & 0x0FFF + # skip unpack total_len + data = data[2:] + pdu, data = unpack_data(data, 2) + channel_id, data = unpack_data(data, 2) + + # Check ATT packet or "Coc Data Packet" to get ATT information and audio + # data. + if connection_handle <= 0x0EFF: + if channel_id <= 0x003F: + result[CONNECTION_HANDLE] = connection_handle + parse_acl_opcode(data, result) + elif channel_id >= 0x0040 and channel_id <= 0x007F: + result[CONNECTION_HANDLE] = connection_handle + sdu, data = unpack_data(data, 2) + if pdu - 2 == sdu: + if result[IS_SENT]: + parse_acl_ha_audio_data(data, result) + else: + if simple_debug: + parse_acl_ha_debug_buffer(data, result) #======================================================================= @@ -260,48 +269,48 @@ def parse_acl_handle(data, result): def parse_hci_evt_peer_address(data, result): - """This function parses peer address from hci event.""" - peer_address_list = [] - address_empty_list = ["00", "00", "00", "00", "00", "00"] - for n in range(0, 3): - if len(data) < 6: - return - for p in range(0, 6): - peer_address_list.append("{0:02x}".format(struct.unpack(">B", - data[p])[0])) - # Check the address is empty or not. - if peer_address_list == address_empty_list: - del peer_address_list[:] - data = data[6:] - else: - break - peer_address_list.reverse() - peer_address = "_".join(peer_address_list) - update_audio_data("", "", PEER_ADDRESS, peer_address) - update_audio_data(PEER_ADDRESS, peer_address, CONNECTION_HANDLE, - result[CONNECTION_HANDLE]) + """This function parses peer address from hci event.""" + peer_address_list = [] + address_empty_list = ["00", "00", "00", "00", "00", "00"] + for n in range(0, 3): + if len(data) < 6: + return + for p in range(0, 6): + peer_address_list.append("{0:02x}".format( + struct.unpack(">B", data[p])[0])) + # Check the address is empty or not. + if peer_address_list == address_empty_list: + del peer_address_list[:] + data = data[6:] + else: + break + peer_address_list.reverse() + peer_address = "_".join(peer_address_list) + update_audio_data("", "", PEER_ADDRESS, peer_address) + update_audio_data(PEER_ADDRESS, peer_address, CONNECTION_HANDLE, + result[CONNECTION_HANDLE]) def parse_hci_evt_code(data, result): - """This function parses hci event content.""" - # hci_evt (1 byte) + param_total_len (1 byte) + sub_event (1 byte) - # + status (1 byte) + connection_handle (2 bytes) + role (1 byte) - # + address_type (1 byte) = 8 bytes - if len(data) < 8: - return - hci_evt, data = unpack_data(data, 1) - # skip unpack param_total_len. - data = data[1:] - sub_event, data = unpack_data(data, 1) - status, data = unpack_data(data, 1) - connection_handle, data = unpack_data(data, 2) - connection_handle = connection_handle & 0x0FFF - # skip unpack role, address_type. - data = data[2:] - # We will directly check it is LE Enhanced Connection Complete or not - # for get Connection Handle and Address. - if not result[IS_SENT] and hci_evt == 0x3E and sub_event == 0x0A \ - and status == 0x00 and connection_handle <= 0x0EFF: + """This function parses hci event content.""" + # hci_evt (1 byte) + param_total_len (1 byte) + sub_event (1 byte) + # + status (1 byte) + connection_handle (2 bytes) + role (1 byte) + # + address_type (1 byte) = 8 bytes + if len(data) < 8: + return + hci_evt, data = unpack_data(data, 1) + # skip unpack param_total_len. + data = data[1:] + sub_event, data = unpack_data(data, 1) + status, data = unpack_data(data, 1) + connection_handle, data = unpack_data(data, 2) + connection_handle = connection_handle & 0x0FFF + # skip unpack role, address_type. + data = data[2:] + # We will directly check it is LE Enhanced Connection Complete or not + # for get Connection Handle and Address. + if not result[IS_SENT] and hci_evt == 0x3E and sub_event == 0x0A \ + and status == 0x00 and connection_handle <= 0x0EFF: result[CONNECTION_HANDLE] = connection_handle parse_hci_evt_peer_address(data, result) @@ -312,41 +321,42 @@ def parse_hci_evt_code(data, result): def parse_packet_data(data, result): - """This function parses packet type.""" - packet_type, data = unpack_data(data, 1) - if packet_type == 0x02: - # Try to check HearingAid audio control packet and data packet. - parse_acl_handle(data, result) - elif packet_type == 0x04: - # Try to check HearingAid connection successful packet. - parse_hci_evt_code(data, result) + """This function parses packet type.""" + packet_type, data = unpack_data(data, 1) + if packet_type == 0x02: + # Try to check HearingAid audio control packet and data packet. + parse_acl_handle(data, result) + elif packet_type == 0x04: + # Try to check HearingAid connection successful packet. + parse_hci_evt_code(data, result) def parse_packet(btsnoop_file): - """This function parses packet len, timestamp.""" - packet_result = {} - - # ori_len (4 bytes) + include_len (4 bytes) + packet_flag (4 bytes) - # + drop (4 bytes) + timestamp (8 bytes) = 24 bytes - packet_header = btsnoop_file.read(24) - if len(packet_header) != 24: - return False - - ori_len, include_len, packet_flag, drop, timestamp = \ - struct.unpack(">IIIIq", packet_header) - - if ori_len == include_len: - packet_data = btsnoop_file.read(ori_len) - if len(packet_data) != ori_len: - return False - if packet_flag != 2 and drop == 0: - packet_result[IS_SENT] = (packet_flag == 0) - packet_result[TIMESTAMP_STR_FORMAT], packet_result[TIMESTAMP_TIME_FORMAT] = convert_time_str(timestamp) - parse_packet_data(packet_data, packet_result) - else: - return False + """This function parses packet len, timestamp.""" + packet_result = {} + + # ori_len (4 bytes) + include_len (4 bytes) + packet_flag (4 bytes) + # + drop (4 bytes) + timestamp (8 bytes) = 24 bytes + packet_header = btsnoop_file.read(24) + if len(packet_header) != 24: + return False + + ori_len, include_len, packet_flag, drop, timestamp = \ + struct.unpack(">IIIIq", packet_header) + + if ori_len == include_len: + packet_data = btsnoop_file.read(ori_len) + if len(packet_data) != ori_len: + return False + if packet_flag != 2 and drop == 0: + packet_result[IS_SENT] = (packet_flag == 0) + packet_result[TIMESTAMP_STR_FORMAT], packet_result[ + TIMESTAMP_TIME_FORMAT] = convert_time_str(timestamp) + parse_packet_data(packet_data, packet_result) + else: + return False - return True + return True #======================================================================= @@ -355,49 +365,54 @@ def parse_packet(btsnoop_file): def dump_audio_data(data): - """This function dumps audio data into file.""" - file_type = "." + data[CODEC] - file_name_list = [] - file_name_list.append(data[PEER_ADDRESS]) - file_name_list.append(data[TIMESTAMP_STR_FORMAT]) - file_name_list.append(data[AUDIO_TYPE]) - file_name_list.append(data[SAMPLE_RATE]) - if folder is not None: - if not os.path.exists(folder): - os.makedirs(folder) - audio_file_name = os.path.join(folder, "-".join(file_name_list) + file_type) - if data.has_key(DEBUG_VERSION): - file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-" - debug_file_name = os.path.join(folder, file_prefix + "-".join(file_name_list) + ".txt") - else: - audio_file_name = "-".join(file_name_list) + file_type - if data.has_key(DEBUG_VERSION): - file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-" - debug_file_name = file_prefix + "-".join(file_name_list) + ".txt" - - sys.stdout.write("Start to dump Audio File : %s\n" % audio_file_name) - if data.has_key(AUDIO_DATA_B): - with open(audio_file_name, "wb+") as audio_file: - audio_file.write(data[AUDIO_DATA_B]) - sys.stdout.write("Finished to dump Audio File: %s\n\n" % audio_file_name) - else: - sys.stdout.write("Fail to dump Audio File: %s\n" % audio_file_name) - sys.stdout.write("There isn't any Hearing Aid audio data.\n\n") - - if simple_debug: - sys.stdout.write("Start to dump audio %s Debug File\n" % audio_file_name) - if data.has_key(DEBUG_DATA): - with open(debug_file_name, "wb+") as debug_file: - debug_file.write(data[DEBUG_DATA]) - sys.stdout.write("Finished to dump Debug File: %s\n\n" % debug_file_name) + """This function dumps audio data into file.""" + file_type = "." + data[CODEC] + file_name_list = [] + file_name_list.append(data[PEER_ADDRESS]) + file_name_list.append(data[TIMESTAMP_STR_FORMAT]) + file_name_list.append(data[AUDIO_TYPE]) + file_name_list.append(data[SAMPLE_RATE]) + if folder is not None: + if not os.path.exists(folder): + os.makedirs(folder) + audio_file_name = os.path.join(folder, + "-".join(file_name_list) + file_type) + if data.has_key(DEBUG_VERSION): + file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-" + debug_file_name = os.path.join( + folder, file_prefix + "-".join(file_name_list) + ".txt") else: - sys.stdout.write("Fail to dump audio %s Debug File\n" % audio_file_name) - sys.stdout.write("There isn't any Hearing Aid debug data.\n\n") - + audio_file_name = "-".join(file_name_list) + file_type + if data.has_key(DEBUG_VERSION): + file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-" + debug_file_name = file_prefix + "-".join(file_name_list) + ".txt" + + sys.stdout.write("Start to dump Audio File : %s\n" % audio_file_name) + if data.has_key(AUDIO_DATA_B): + with open(audio_file_name, "wb+") as audio_file: + audio_file.write(data[AUDIO_DATA_B]) + sys.stdout.write( + "Finished to dump Audio File: %s\n\n" % audio_file_name) + else: + sys.stdout.write("Fail to dump Audio File: %s\n" % audio_file_name) + sys.stdout.write("There isn't any Hearing Aid audio data.\n\n") + + if simple_debug: + sys.stdout.write( + "Start to dump audio %s Debug File\n" % audio_file_name) + if data.has_key(DEBUG_DATA): + with open(debug_file_name, "wb+") as debug_file: + debug_file.write(data[DEBUG_DATA]) + sys.stdout.write( + "Finished to dump Debug File: %s\n\n" % debug_file_name) + else: + sys.stdout.write( + "Fail to dump audio %s Debug File\n" % audio_file_name) + sys.stdout.write("There isn't any Hearing Aid debug data.\n\n") def update_audio_data(relate_key, relate_value, key, value): - """ + """ This function records the dump audio file related information. audio_data = { PEER_ADDRESS:{ @@ -428,44 +443,44 @@ def update_audio_data(relate_key, relate_value, key, value): } } """ - if key == PEER_ADDRESS: - if audio_data.has_key(value): - # Dump audio data and clear previous data. - update_audio_data(key, value, START, False) - # Extra clear CONNECTION_HANDLE due to new connection create. - if audio_data[value].has_key(CONNECTION_HANDLE): - audio_data[value].pop(CONNECTION_HANDLE, "") - else: - device_audio_data = {key: value} - temp_audio_data = {value: device_audio_data} - audio_data.update(temp_audio_data) - else: - for i in audio_data: - if audio_data[i].has_key(relate_key) \ - and audio_data[i][relate_key] == relate_value: - if key == START: - if audio_data[i].has_key(key) and audio_data[i][key]: - dump_audio_data(audio_data[i]) - # Clear data except PEER_ADDRESS, CONNECTION_HANDLE and - # AUDIO_CONTROL_ATTR_HANDLE. - audio_data[i].pop(key, "") - audio_data[i].pop(TIMESTAMP_STR_FORMAT, "") - audio_data[i].pop(CODEC, "") - audio_data[i].pop(SAMPLE_RATE, "") - audio_data[i].pop(AUDIO_TYPE, "") - audio_data[i].pop(DEBUG_VERSION, "") - audio_data[i].pop(DEBUG_DATA, "") - audio_data[i].pop(AUDIO_DATA_B, "") - elif key == AUDIO_DATA_B or key == DEBUG_DATA: - if audio_data[i].has_key(START) and audio_data[i][START]: - if audio_data[i].has_key(key): - ori_data = audio_data[i].pop(key, "") - value = ori_data + value - else: - # Audio doesn't start, don't record. - return + if key == PEER_ADDRESS: + if audio_data.has_key(value): + # Dump audio data and clear previous data. + update_audio_data(key, value, START, False) + # Extra clear CONNECTION_HANDLE due to new connection create. + if audio_data[value].has_key(CONNECTION_HANDLE): + audio_data[value].pop(CONNECTION_HANDLE, "") + else: device_audio_data = {key: value} - audio_data[i].update(device_audio_data) + temp_audio_data = {value: device_audio_data} + audio_data.update(temp_audio_data) + else: + for i in audio_data: + if audio_data[i].has_key(relate_key) \ + and audio_data[i][relate_key] == relate_value: + if key == START: + if audio_data[i].has_key(key) and audio_data[i][key]: + dump_audio_data(audio_data[i]) + # Clear data except PEER_ADDRESS, CONNECTION_HANDLE and + # AUDIO_CONTROL_ATTR_HANDLE. + audio_data[i].pop(key, "") + audio_data[i].pop(TIMESTAMP_STR_FORMAT, "") + audio_data[i].pop(CODEC, "") + audio_data[i].pop(SAMPLE_RATE, "") + audio_data[i].pop(AUDIO_TYPE, "") + audio_data[i].pop(DEBUG_VERSION, "") + audio_data[i].pop(DEBUG_DATA, "") + audio_data[i].pop(AUDIO_DATA_B, "") + elif key == AUDIO_DATA_B or key == DEBUG_DATA: + if audio_data[i].has_key(START) and audio_data[i][START]: + if audio_data[i].has_key(key): + ori_data = audio_data[i].pop(key, "") + value = ori_data + value + else: + # Audio doesn't start, don't record. + return + device_audio_data = {key: value} + audio_data[i].update(device_audio_data) #======================================================================= @@ -474,159 +489,188 @@ def update_audio_data(relate_key, relate_value, key, value): def get_audio_control_attr_handle(connection_handle): - """This function gets audio_control_attr_handle.""" - # If force_audio_control_attr_handle is set, will use it first. - if force_audio_control_attr_handle is not None: - return force_audio_control_attr_handle + """This function gets audio_control_attr_handle.""" + # If force_audio_control_attr_handle is set, will use it first. + if force_audio_control_attr_handle is not None: + return force_audio_control_attr_handle - # Try to check the audio_control_attr_handle is record into audio_data. - for i in audio_data: - if audio_data[i].has_key(CONNECTION_HANDLE) \ - and audio_data[i][CONNECTION_HANDLE] == connection_handle: - if audio_data[i].has_key(AUDIO_CONTROL_ATTR_HANDLE): - return audio_data[i][AUDIO_CONTROL_ATTR_HANDLE] + # Try to check the audio_control_attr_handle is record into audio_data. + for i in audio_data: + if audio_data[i].has_key(CONNECTION_HANDLE) \ + and audio_data[i][CONNECTION_HANDLE] == connection_handle: + if audio_data[i].has_key(AUDIO_CONTROL_ATTR_HANDLE): + return audio_data[i][AUDIO_CONTROL_ATTR_HANDLE] - # Return default attr_handle if audio_data doesn't record it. - return default_audio_control_attr_handle + # Return default attr_handle if audio_data doesn't record it. + return default_audio_control_attr_handle def unpack_data(data, byte): - """This function unpacks data.""" - if byte == 1: - value = struct.unpack(">B", data[0])[0] - elif byte == 2: - value = struct.unpack(">H", data[1]+data[0])[0] - else: - value = "" - data = data[byte:] - return value, data + """This function unpacks data.""" + if byte == 1: + value = struct.unpack(">B", data[0])[0] + elif byte == 2: + value = struct.unpack(">H", data[1] + data[0])[0] + else: + value = "" + data = data[byte:] + return value, data def convert_time_str(timestamp): - """This function converts time to string format.""" - really_timestamp = float(timestamp) / SEC_CONVERT - local_timestamp = time.localtime(really_timestamp) - dt = really_timestamp - long(really_timestamp) - ms_str = "{0:06}".format(int(round(dt * 1000000))) + """This function converts time to string format.""" + really_timestamp = float(timestamp) / SEC_CONVERT + local_timestamp = time.localtime(really_timestamp) + dt = really_timestamp - long(really_timestamp) + ms_str = "{0:06}".format(int(round(dt * 1000000))) - str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp) - full_str_format = str_format + "_" + ms_str + str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp) + full_str_format = str_format + "_" + ms_str - time_format = time.strftime("%m-%d %H:%M:%S", local_timestamp) - full_time_format = time_format + "." + ms_str - return full_str_format, full_time_format + time_format = time.strftime("%m-%d %H:%M:%S", local_timestamp) + full_time_format = time_format + "." + ms_str + return full_str_format, full_time_format def set_config(): - """This function is for set config by flag and check the argv is correct.""" - argv_parser = argparse.ArgumentParser( - description="Extracts Hearing Aid audio data from BTSNOOP.") - argv_parser.add_argument("BTSNOOP", help="BLUETOOTH BTSNOOP file.") - argv_parser.add_argument("-f", "--folder", help="select output folder.", - dest="folder") - argv_parser.add_argument("-c1", "--connection-handle1", - help="set a fake connection handle 1 to capture \ - audio dump.", dest="connection_handle1", type=int) - argv_parser.add_argument("-c2", "--connection-handle2", - help="set a fake connection handle 2 to capture \ - audio dump.", dest="connection_handle2", type=int) - argv_parser.add_argument("-ns", "--no-start", help="No audio 'Start' cmd is \ + """This function is for set config by flag and check the argv is correct.""" + argv_parser = argparse.ArgumentParser( + description="Extracts Hearing Aid audio data from BTSNOOP.") + argv_parser.add_argument("BTSNOOP", help="BLUETOOTH BTSNOOP file.") + argv_parser.add_argument( + "-f", "--folder", help="select output folder.", dest="folder") + argv_parser.add_argument( + "-c1", + "--connection-handle1", + help="set a fake connection handle 1 to capture \ + audio dump.", + dest="connection_handle1", + type=int) + argv_parser.add_argument( + "-c2", + "--connection-handle2", + help="set a fake connection handle 2 to capture \ + audio dump.", + dest="connection_handle2", + type=int) + argv_parser.add_argument( + "-ns", + "--no-start", + help="No audio 'Start' cmd is \ needed before extracting audio data.", - dest="no_start", default="False") - argv_parser.add_argument("-dc", "--default-codec", help="set a default \ - codec.", dest="codec", default="G722") - argv_parser.add_argument("-a", "--attr-handle", - help="force to select audio control attr handle.", - dest="audio_control_attr_handle", type=int) - argv_parser.add_argument("-d", "--debug", - help="dump full debug buffer content.", - dest="full_debug", default="False") - argv_parser.add_argument("-sd", "--simple-debug", - help="dump debug buffer header content.", - dest="simple_debug", default="False") - arg = argv_parser.parse_args() - - if arg.folder is not None: - global folder - folder = arg.folder - - if arg.connection_handle1 is not None and arg.connection_handle2 is not None \ - and arg.connection_handle1 == arg.connection_handle2: + dest="no_start", + default="False") + argv_parser.add_argument( + "-dc", + "--default-codec", + help="set a default \ + codec.", + dest="codec", + default="G722") + argv_parser.add_argument( + "-a", + "--attr-handle", + help="force to select audio control attr handle.", + dest="audio_control_attr_handle", + type=int) + argv_parser.add_argument( + "-d", + "--debug", + help="dump full debug buffer content.", + dest="full_debug", + default="False") + argv_parser.add_argument( + "-sd", + "--simple-debug", + help="dump debug buffer header content.", + dest="simple_debug", + default="False") + arg = argv_parser.parse_args() + + if arg.folder is not None: + global folder + folder = arg.folder + + if arg.connection_handle1 is not None and arg.connection_handle2 is not None \ + and arg.connection_handle1 == arg.connection_handle2: argv_parser.error("connection_handle1 can't be same with \ connection_handle2") exit(1) - if not (arg.no_start.lower() == "true" or arg.no_start.lower() == "false"): - argv_parser.error("-ns/--no-start arg is invalid, it should be true/false.") - exit(1) - - if arg.connection_handle1 is not None: - fake_name = "ConnectionHandle" + str(arg.connection_handle1) - update_audio_data("", "", PEER_ADDRESS, fake_name) - update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE, - arg.connection_handle1) - if arg.no_start.lower() == "true": - update_audio_data(PEER_ADDRESS, fake_name, START, True) - update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT, "Unknown") - update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec) - update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown") - update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown") - - if arg.connection_handle2 is not None: - fake_name = "ConnectionHandle" + str(arg.connection_handle2) - update_audio_data("", "", PEER_ADDRESS, fake_name) - update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE, - arg.connection_handle2) - if arg.no_start.lower() == "true": - update_audio_data(PEER_ADDRESS, fake_name, START, True) - update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT, "Unknown") - update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec) - update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown") - update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown") - - if arg.audio_control_attr_handle is not None: - global force_audio_control_attr_handle - force_audio_control_attr_handle = arg.audio_control_attr_handle - - global full_debug - global simple_debug - if arg.full_debug.lower() == "true": - full_debug = True - simple_debug = True - elif arg.simple_debug.lower() == "true": - simple_debug = True - - if os.path.isfile(arg.BTSNOOP): - return arg.BTSNOOP - else: - argv_parser.error("BTSNOOP file not found: %s" % arg.BTSNOOP) - exit(1) - - -def main(): - btsnoop_file_name = set_config() - - with open(btsnoop_file_name, "rb") as btsnoop_file: - identification = btsnoop_file.read(8) - if identification != "btsnoop\0": - sys.stderr.write( - "Check identification fail. It is not correct btsnoop file.") - exit(1) + if not (arg.no_start.lower() == "true" or arg.no_start.lower() == "false"): + argv_parser.error( + "-ns/--no-start arg is invalid, it should be true/false.") + exit(1) - ver, data_link = struct.unpack(">II", btsnoop_file.read(4 + 4)) - if (ver != 1) or (data_link != 1002): - sys.stderr.write( - "Check ver or dataLink fail. It is not correct btsnoop file.") - exit(1) + if arg.connection_handle1 is not None: + fake_name = "ConnectionHandle" + str(arg.connection_handle1) + update_audio_data("", "", PEER_ADDRESS, fake_name) + update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE, + arg.connection_handle1) + if arg.no_start.lower() == "true": + update_audio_data(PEER_ADDRESS, fake_name, START, True) + update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT, + "Unknown") + update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec) + update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown") + update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown") + + if arg.connection_handle2 is not None: + fake_name = "ConnectionHandle" + str(arg.connection_handle2) + update_audio_data("", "", PEER_ADDRESS, fake_name) + update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE, + arg.connection_handle2) + if arg.no_start.lower() == "true": + update_audio_data(PEER_ADDRESS, fake_name, START, True) + update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT, + "Unknown") + update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec) + update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown") + update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown") + + if arg.audio_control_attr_handle is not None: + global force_audio_control_attr_handle + force_audio_control_attr_handle = arg.audio_control_attr_handle + + global full_debug + global simple_debug + if arg.full_debug.lower() == "true": + full_debug = True + simple_debug = True + elif arg.simple_debug.lower() == "true": + simple_debug = True + + if os.path.isfile(arg.BTSNOOP): + return arg.BTSNOOP + else: + argv_parser.error("BTSNOOP file not found: %s" % arg.BTSNOOP) + exit(1) - while True: - if not parse_packet(btsnoop_file): - break - for i in audio_data: - if audio_data[i].get(START, False): - dump_audio_data(audio_data[i]) +def main(): + btsnoop_file_name = set_config() + + with open(btsnoop_file_name, "rb") as btsnoop_file: + identification = btsnoop_file.read(8) + if identification != "btsnoop\0": + sys.stderr.write( + "Check identification fail. It is not correct btsnoop file.") + exit(1) + + ver, data_link = struct.unpack(">II", btsnoop_file.read(4 + 4)) + if (ver != 1) or (data_link != 1002): + sys.stderr.write( + "Check ver or dataLink fail. It is not correct btsnoop file.") + exit(1) + + while True: + if not parse_packet(btsnoop_file): + break + + for i in audio_data: + if audio_data[i].get(START, False): + dump_audio_data(audio_data[i]) if __name__ == "__main__": - main() + main() diff --git a/system/tools/scripts/dump_metrics_ascii.py b/system/tools/scripts/dump_metrics_ascii.py index c24d29995f..20d413285b 100755 --- a/system/tools/scripts/dump_metrics_ascii.py +++ b/system/tools/scripts/dump_metrics_ascii.py @@ -23,6 +23,7 @@ from distutils.spawn import find_executable import google.protobuf.text_format as text_format from importlib import import_module + def compile_proto(proto_path, output_dir): """Invoke Protocol Compiler to generate python from given source .proto.""" # Find compiler path @@ -48,15 +49,15 @@ def compile_proto(proto_path, output_dir): if not os.path.exists(output_dir): os.mkdirs(output_dir) elif not os.path.isdir(output_dir): - logging.error("Output path is not a valid directory: %s" % - (output_dir)) + logging.error("Output path is not a valid directory: %s" % (output_dir)) return None input_dir = os.path.dirname(proto_path) output_filename = os.path.basename(proto_path).replace('.proto', '_pb2.py') output_path = os.path.join(output_dir, output_filename) protoc_command = [ - protoc, '-I=%s' % (input_dir), '--python_out=%s' % (output_dir), - proto_path + protoc, + '-I=%s' % (input_dir), + '--python_out=%s' % (output_dir), proto_path ] if subprocess.call(protoc_command, stderr=subprocess.STDOUT) != 0: logging.error("Fail to compile proto") @@ -81,8 +82,8 @@ def compile_import_proto(output_dir, proto_path): try: output_module = import_module(output_module_name) except ImportError: - logging.error("Cannot import generated py-proto %s" % - (output_module_name)) + logging.error( + "Cannot import generated py-proto %s" % (output_module_name)) return output_module @@ -94,26 +95,31 @@ def parse_proto_to_ascii(binary_proto_msg): """ return text_format.MessageToString(binary_proto_msg) + def dump_metrics(): os.system('adb wait-for-device') - p = subprocess.Popen("adb shell dumpsys bluetooth_manager --proto-bin", - shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + p = subprocess.Popen( + "adb shell dumpsys bluetooth_manager --proto-bin", + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, stdin=subprocess.PIPE) return p.communicate() + def get_bluetooth_metrics(proto_native_str_64, bluetooth_proto_module): bluetooth_log = bluetooth_proto_module.BluetoothLog() proto_native_str = base64.b64decode(proto_native_str_64) bluetooth_log.MergeFromString(proto_native_str) return bluetooth_log + def main(): root = logging.getLogger() root.setLevel(logging.DEBUG) log_handler = logging.StreamHandler(sys.stderr) log_handler.setLevel(logging.DEBUG) - formatter = logging.Formatter( - "%(asctime)s %(levelname)s %(message)s") + formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") log_handler.setFormatter(formatter) root.addHandler(log_handler) if len(sys.argv) < 2: @@ -125,7 +131,7 @@ def main(): logging.info("Requires Protobuf compiler, protoc, version >=3.0.0") sys.exit(0) bluetooth_proto_module = compile_import_proto(tempfile.gettempdir(), - sys.argv[1]) + sys.argv[1]) if not bluetooth_proto_module: logging.error("Cannot compile " + sys.argv[1]) sys.exit(1) @@ -136,5 +142,6 @@ def main(): bluetooth_log_ascii = parse_proto_to_ascii(bluetooth_log) print(bluetooth_log_ascii) + if __name__ == "__main__": main() diff --git a/system/vendor_libs/test_vendor_lib/scripts/hci_socket.py b/system/vendor_libs/test_vendor_lib/scripts/hci_socket.py index 0c425af935..6833eeb8f5 100644 --- a/system/vendor_libs/test_vendor_lib/scripts/hci_socket.py +++ b/system/vendor_libs/test_vendor_lib/scripts/hci_socket.py @@ -73,31 +73,31 @@ from scapy.all import * class HCI_Cmd_Create_Connection(Packet): - name = 'Create Connection' - fields_desc = [ - LEMACField('addr', None), - LEShortField('packet_type', 8), - ByteEnumField('page_scan_repetition_mode', 0, { - 0: 'R0', - 1: 'R1', - 2: 'R2' - }), - ByteEnumField('rsvd', 0, {0: 'Reserved'}), - LEShortField('clock_offset', 0), - ByteEnumField('allow_role_switch', 1, { - 0: 'false', - 1: 'true' - }), - ] + name = 'Create Connection' + fields_desc = [ + LEMACField('addr', None), + LEShortField('packet_type', 8), + ByteEnumField('page_scan_repetition_mode', 0, { + 0: 'R0', + 1: 'R1', + 2: 'R2' + }), + ByteEnumField('rsvd', 0, {0: 'Reserved'}), + LEShortField('clock_offset', 0), + ByteEnumField('allow_role_switch', 1, { + 0: 'false', + 1: 'true' + }), + ] class HCI_Cmd_Inquiry(Packet): - name = 'Inquiry' - fields_desc = [ - X3BytesField('LAP', 0x9e8b0), - ByteField('length', 1), - ByteField('max_responses', 0), - ] + name = 'Inquiry' + fields_desc = [ + X3BytesField('LAP', 0x9e8b0), + ByteField('length', 1), + ByteField('max_responses', 0), + ] bind_layers(HCI_Command_Hdr, HCI_Cmd_Inquiry, opcode=0x0401) @@ -105,96 +105,96 @@ bind_layers(HCI_Command_Hdr, HCI_Cmd_Create_Connection, opcode=0x0405) class HCI_Event_Inquiry_Result(Packet): - name = 'Inquiry Result' - fields_desc = [ - ByteField('num_responses', 0), - LEMACField('addr', None), - ByteEnumField('page_scan_repetition_mode', 0, { - 0: 'R0', - 1: 'R1', - 2: 'R2' - }), - LEShortEnumField('rsvd', 0, {0: 'Reserved'}), - X3BytesField('class_of_device', 0), - LEShortField('clock_offset', 0), - ] + name = 'Inquiry Result' + fields_desc = [ + ByteField('num_responses', 0), + LEMACField('addr', None), + ByteEnumField('page_scan_repetition_mode', 0, { + 0: 'R0', + 1: 'R1', + 2: 'R2' + }), + LEShortEnumField('rsvd', 0, {0: 'Reserved'}), + X3BytesField('class_of_device', 0), + LEShortField('clock_offset', 0), + ] class HCI_Event_Connection_Complete(Packet): - name = 'Connection Complete' - fields_desc = [ - ByteField('status', 0), - LEShortField('handle', 0xffff), - LEMACField('addr', None), - ByteField('link_type', 1), - ByteField('encryption_mode', 0), - ] + name = 'Connection Complete' + fields_desc = [ + ByteField('status', 0), + LEShortField('handle', 0xffff), + LEMACField('addr', None), + ByteField('link_type', 1), + ByteField('encryption_mode', 0), + ] class HCI_Event_Remote_Name_Request_Complete(Packet): - name = 'Remote Name Request Complete' - fields_desc = [ - ByteField('status', 0), - LEMACField('addr', None), - ] + name = 'Remote Name Request Complete' + fields_desc = [ + ByteField('status', 0), + LEMACField('addr', None), + ] class HCI_Event_Read_Remote_Supported_Features_Complete(Packet): - name = 'Read Remote Supported Features Complete' - fields_desc = [ - ByteField('status', 0), - LEShortField('handle', 0xffff), - XLELongField('features', 0x0123456789abcdef), - ] + name = 'Read Remote Supported Features Complete' + fields_desc = [ + ByteField('status', 0), + LEShortField('handle', 0xffff), + XLELongField('features', 0x0123456789abcdef), + ] class HCI_Event_Read_Remote_Version_Information_Complete(Packet): - name = 'Read Remote Version Information Complete' - fields_desc = [ - ByteField('status', 0), - LEShortField('handle', 0xffff), - ByteField('version', 0), - LEShortField('manufacturer_name', 0), - LEShortField('subversion', 0), - ] + name = 'Read Remote Version Information Complete' + fields_desc = [ + ByteField('status', 0), + LEShortField('handle', 0xffff), + ByteField('version', 0), + LEShortField('manufacturer_name', 0), + LEShortField('subversion', 0), + ] class HCI_Event_Read_Clock_Offset_Complete(Packet): - name = 'Read Clock Offset Complete' - fields_desc = [ - ByteField('status', 0), - LEShortField('handle', 0xffff), - LEShortField('offset', 0xffff), - ] + name = 'Read Clock Offset Complete' + fields_desc = [ + ByteField('status', 0), + LEShortField('handle', 0xffff), + LEShortField('offset', 0xffff), + ] class HCI_Event_Read_Remote_Extended_Features_Complete(Packet): - name = 'Read Remote Supported Features Complete' - fields_desc = [ - ByteField('status', 0), - LEShortField('handle', 0xffff), - ByteField('page_number', 0), - ByteField('max_page_number', 0), - XLELongField('features', 0x0123456789abcdef), - ] + name = 'Read Remote Supported Features Complete' + fields_desc = [ + ByteField('status', 0), + LEShortField('handle', 0xffff), + ByteField('page_number', 0), + ByteField('max_page_number', 0), + XLELongField('features', 0x0123456789abcdef), + ] class HCI_Event_Extended_Inquiry_Result(Packet): - name = 'Extended Inquiry Result' - fields_desc = [ - ByteField('num_responses', 1), - LEMACField('addr', None), - ByteEnumField('page_scan_repetition_mode', 0, { - 0: 'R0', - 1: 'R1', - 2: 'R2' - }), - ByteEnumField('rsvd', 0, {0: 'Reserved'}), - X3BytesField('class_of_device', 0), - LEShortField('clock_offset', 0), - SignedByteField('rssi', -20), - PacketListField('extended_inquiry_response', [], EIR_Hdr, 1), - ] + name = 'Extended Inquiry Result' + fields_desc = [ + ByteField('num_responses', 1), + LEMACField('addr', None), + ByteEnumField('page_scan_repetition_mode', 0, { + 0: 'R0', + 1: 'R1', + 2: 'R2' + }), + ByteEnumField('rsvd', 0, {0: 'Reserved'}), + X3BytesField('class_of_device', 0), + LEShortField('clock_offset', 0), + SignedByteField('rssi', -20), + PacketListField('extended_inquiry_response', [], EIR_Hdr, 1), + ] bind_layers(HCI_Event_Hdr, HCI_Event_Inquiry_Result, code=0x02) @@ -214,228 +214,236 @@ bind_layers(HCI_Event_Hdr, HCI_Event_Extended_Inquiry_Result, code=0x2f) class HCISocket(SuperSocket): - """Simple wrapper class for a socket object. + """Simple wrapper class for a socket object. Attributes: socket: The underlying socket created for the specified address and port. """ - def __init__(self, port): - self.done_ = False - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', port)) - self.ins = self.outs = s - self.packets_ = queue.Queue() - self.rx_thread_ = threading.Thread(target=self.rx_thread_body) - self.rx_thread_.start() - - def rx_bytes(self, size): - while not self.done_: - raw_bytes = b'' - while len(raw_bytes) < size and not self.done_: - more_raw_bytes = self.ins.recv(min(size - len(raw_bytes), 2048)) - if more_raw_bytes: - raw_bytes += more_raw_bytes - return raw_bytes - - def rx_thread_body(self): - while not self.done_: - payload_length = 0 - # Read the type - type_byte = self.rx_bytes(1) - if not type_byte: - continue - # Read the Header - header = b'' - if type_byte == b'\x01': # Command - header = self.rx_bytes(3) - if not header: - continue - payload_length = header[2] - elif type_byte == b'\x02': # ACL - header = self.rx_bytes(4) - if not header: - continue - payload_length = header[3] << 8 - payload_length |= header[2] - elif type_byte == b'\x03': # SCO - header = self.rx_bytes(3) - if not header: - continue - payload_length = header[2] - elif type_byte == b'\x04': # Event - header = self.rx_bytes(2) - if not header: - continue - payload_length = header[1] - else: + def __init__(self, port): + self.done_ = False + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('localhost', port)) + self.ins = self.outs = s + self.packets_ = queue.Queue() + self.rx_thread_ = threading.Thread(target=self.rx_thread_body) + self.rx_thread_.start() + + def rx_bytes(self, size): + while not self.done_: + raw_bytes = b'' + while len(raw_bytes) < size and not self.done_: + more_raw_bytes = self.ins.recv(min(size - len(raw_bytes), 2048)) + if more_raw_bytes: + raw_bytes += more_raw_bytes + return raw_bytes + + def rx_thread_body(self): + while not self.done_: + payload_length = 0 + # Read the type + type_byte = self.rx_bytes(1) + if not type_byte: + continue + # Read the Header + header = b'' + if type_byte == b'\x01': # Command + header = self.rx_bytes(3) + if not header: + continue + payload_length = header[2] + elif type_byte == b'\x02': # ACL + header = self.rx_bytes(4) + if not header: + continue + payload_length = header[3] << 8 + payload_length |= header[2] + elif type_byte == b'\x03': # SCO + header = self.rx_bytes(3) + if not header: + continue + payload_length = header[2] + elif type_byte == b'\x04': # Event + header = self.rx_bytes(2) + if not header: + continue + payload_length = header[1] + else: + self.done_ = True + print('Rx: type_byte ' + hex(type_byte[0])) + # Read the Payload + payload = self.rx_bytes( + payload_length) if payload_length != 0 else b'' + packet_bytes = type_byte + header + payload + packet = HCI_Hdr(packet_bytes) + print('Rx: ' + packet.__repr__()) + self.packets_.put(packet) + + def get_packet(self): + if self.packets_.empty(): + return False + return self.packets_.get() + + def tell_rx_thread_to_quit(self): self.done_ = True - print('Rx: type_byte ' + hex(type_byte[0])) - # Read the Payload - payload = self.rx_bytes(payload_length) if payload_length != 0 else b'' - packet_bytes = type_byte + header + payload - packet = HCI_Hdr(packet_bytes) - print('Rx: ' + packet.__repr__()) - self.packets_.put(packet) - - def get_packet(self): - if self.packets_.empty(): - return False - return self.packets_.get() - - def tell_rx_thread_to_quit(self): - self.done_ = True - self.rx_thread_.join() + self.rx_thread_.join() class HCIShell(cmd.Cmd): - """Shell for sending binary data to a port. + """Shell for sending binary data to a port. """ - def __init__(self, hci): - cmd.Cmd.__init__(self) - self._hci = hci + def __init__(self, hci): + cmd.Cmd.__init__(self) + self._hci = hci - def do_send(self, args): - """Arguments: dev_type_str Add a new device of type dev_type_str. + def do_send(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str. """ - self._hci.send_binary(args.split()) + self._hci.send_binary(args.split()) - def do_connect(self, args): - """Arguments: bluetooth_address xx:xx:xx:xx:xx:xx, timeout (seconds) + def do_connect(self, args): + """Arguments: bluetooth_address xx:xx:xx:xx:xx:xx, timeout (seconds) """ - split_args = args.split() - address = split_args[0] if len(split_args) > 0 else 'NULL' - timeout = int(split_args[1]) if len(split_args) > 1 else 2 - num_responses = 0 - connect = HCI_Hdr(type='Command') / HCI_Command_Hdr( - opcode=0x0405) / HCI_Cmd_Create_Connection(addr=address) - self._hci.send(connect) - status = None - while status == None: - response = self._hci.get_packet() - if response == False: - continue - if response[HCI_Hdr].type == HCI_Hdr( - type='Event' - ).type and response[HCI_Event_Hdr].code == 0xf and response[ - HCI_Event_Command_Status].opcode == connect[HCI_Command_Hdr].opcode: - status = response[HCI_Event_Command_Status].status - if status != HCI_Event_Command_Status(status='pending').status: - print('Connection failed with status = ' + str(status)) - return - - handle = None - while handle == None: - connection_complete = self._hci.get_packet() - if connection_complete == False: - continue - if (connection_complete[HCI_Hdr].type == HCI_Hdr(type='Event').type) and ( - connection_complete[HCI_Event_Hdr].code == 0x3): - status = connection_complete[HCI_Event_Connection_Complete].status - if status != 0: - print('Connection complete with failed status = ' + str(status)) - return - handle = connection_complete[HCI_Event_Connection_Complete].handle - print('Connection established with handle ' + str(handle)) - connection_complete.show() - hexdump(connection_complete) - - l2cap_done = False - while l2cap_done == None: - l2cap_req = self._hci.get_packet() - if l2cap_req == False: - continue - if (l2cap_req[HCI_Hdr].type == HCI_Hdr(type='ACL Data').type) and ( - l2cap_req[L2CAP_Hdr].cid == L2CAP_Hdr(cid='control').cid) and ( - l2cap_req[L2CAP_CmdHdr].code == L2CAP_CmdHdr(code='info_req').code - ) and (l2cap_req[L2CAP_InfoReq].type == L2CAP_InfoReq( - type='FEAT_MASK').type): - print('Send Features packet' + HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr( - handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) / - L2CAP_Hdr(len=12, cid='control') / - L2CAP_CmdHdr(code='info_resp', id=146, len=8) / L2CAP_InfoResp( - type=l2cap_req[L2CAP_InfoResp].type, - result='success', - data=b'\xb8\x00\x00\x00').__repr__()) - self._hci.send( - HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr( - handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) / - L2CAP_Hdr(len=12, cid='control') / L2CAP_CmdHdr( - code='info_resp', id=146, len=8) / L2CAP_InfoResp( - type=l2cap_req[L2CAP_InfoResp].type, - result='success', - data=b'\xb8\x00\x00\x00')) - - def do_le_scan(self, args): - """Arguments: enable (0 or 1), filter duplicates (0 or 1) Print the scan responses from reachable devices + split_args = args.split() + address = split_args[0] if len(split_args) > 0 else 'NULL' + timeout = int(split_args[1]) if len(split_args) > 1 else 2 + num_responses = 0 + connect = HCI_Hdr(type='Command') / HCI_Command_Hdr( + opcode=0x0405) / HCI_Cmd_Create_Connection(addr=address) + self._hci.send(connect) + status = None + while status == None: + response = self._hci.get_packet() + if response == False: + continue + if response[HCI_Hdr].type == HCI_Hdr( + type='Event' + ).type and response[HCI_Event_Hdr].code == 0xf and response[HCI_Event_Command_Status].opcode == connect[HCI_Command_Hdr].opcode: + status = response[HCI_Event_Command_Status].status + if status != HCI_Event_Command_Status(status='pending').status: + print('Connection failed with status = ' + str(status)) + return + + handle = None + while handle == None: + connection_complete = self._hci.get_packet() + if connection_complete == False: + continue + if (connection_complete[HCI_Hdr].type == HCI_Hdr(type='Event').type + ) and (connection_complete[HCI_Event_Hdr].code == 0x3): + status = connection_complete[ + HCI_Event_Connection_Complete].status + if status != 0: + print('Connection complete with failed status = ' + + str(status)) + return + handle = connection_complete[ + HCI_Event_Connection_Complete].handle + print('Connection established with handle ' + str(handle)) + connection_complete.show() + hexdump(connection_complete) + + l2cap_done = False + while l2cap_done == None: + l2cap_req = self._hci.get_packet() + if l2cap_req == False: + continue + if (l2cap_req[HCI_Hdr].type == HCI_Hdr(type='ACL Data').type) and ( + l2cap_req[L2CAP_Hdr].cid == L2CAP_Hdr(cid='control').cid + ) and (l2cap_req[L2CAP_CmdHdr].code == L2CAP_CmdHdr(code='info_req') + .code) and (l2cap_req[L2CAP_InfoReq].type == L2CAP_InfoReq( + type='FEAT_MASK').type): + print('Send Features packet' + HCI_Hdr( + type='ACL Data' + ) / HCI_ACL_Hdr( + handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) / + L2CAP_Hdr(len=12, cid='control') / L2CAP_CmdHdr( + code='info_resp', id=146, len=8) / L2CAP_InfoResp( + type=l2cap_req[L2CAP_InfoResp].type, + result='success', + data=b'\xb8\x00\x00\x00').__repr__()) + self._hci.send( + HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr( + handle=l2cap_req[HCI_ACL_Hdr].handle, + PB=0, + BC=2, + len=16) / + L2CAP_Hdr(len=12, cid='control') / L2CAP_CmdHdr( + code='info_resp', id=146, len=8) / L2CAP_InfoResp( + type=l2cap_req[L2CAP_InfoResp].type, + result='success', + data=b'\xb8\x00\x00\x00')) + + def do_le_scan(self, args): + """Arguments: enable (0 or 1), filter duplicates (0 or 1) Print the scan responses from reachable devices """ - split_args = args.split() - enable = int(split_args[0]) if len(split_args) > 0 else 1 - filter_dups = int(split_args[1]) if len(split_args) > 1 else 1 - set_scan_enable = HCI_Hdr(type=1) / HCI_Command_Hdr( - opcode=0x200c) / HCI_Cmd_LE_Set_Scan_Enable( - enable=enable, filter_dups=filter_dups) - print('Tx: ' + set_scan_enable.__repr__()) - self._hci.send(set_scan_enable) - - def do_scan(self, args): - """Arguments: timeout (seconds), max_results Print the scan responses from reachable devices + split_args = args.split() + enable = int(split_args[0]) if len(split_args) > 0 else 1 + filter_dups = int(split_args[1]) if len(split_args) > 1 else 1 + set_scan_enable = HCI_Hdr(type=1) / HCI_Command_Hdr( + opcode=0x200c) / HCI_Cmd_LE_Set_Scan_Enable( + enable=enable, filter_dups=filter_dups) + print('Tx: ' + set_scan_enable.__repr__()) + self._hci.send(set_scan_enable) + + def do_scan(self, args): + """Arguments: timeout (seconds), max_results Print the scan responses from reachable devices """ - split_args = args.split() - scan_time = int(split_args[0]) if len(split_args) > 0 else 0 - max_responses = int(split_args[1]) if len(split_args) > 1 else 0 - num_responses = 0 - inquiry = HCI_Hdr(type='Command') / HCI_Command_Hdr( - opcode=0x0401) / HCI_Cmd_Inquiry( - length=scan_time, max_responses=max_responses) - print('Tx: ' + inquiry.__repr__()) - self._hci.send(inquiry) - - def do_quit(self, args): - """Arguments: None. + split_args = args.split() + scan_time = int(split_args[0]) if len(split_args) > 0 else 0 + max_responses = int(split_args[1]) if len(split_args) > 1 else 0 + num_responses = 0 + inquiry = HCI_Hdr(type='Command') / HCI_Command_Hdr( + opcode=0x0401) / HCI_Cmd_Inquiry( + length=scan_time, max_responses=max_responses) + print('Tx: ' + inquiry.__repr__()) + self._hci.send(inquiry) + + def do_quit(self, args): + """Arguments: None. Exits. """ - self._hci.tell_rx_thread_to_quit() - self._hci.close() - print('Goodbye.') - return True + self._hci.tell_rx_thread_to_quit() + self._hci.close() + print('Goodbye.') + return True - def do_help(self, args): - """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. """ - if (len(args) == 0): - cmd.Cmd.do_help(self, args) + if (len(args) == 0): + cmd.Cmd.do_help(self, args) def main(argv): - if len(argv) != 2: - print('Usage: python hci_socket.py [port]') - return - try: - port = int(argv[1]) - except ValueError: - print('Error parsing port.') - else: + if len(argv) != 2: + print('Usage: python hci_socket.py [port]') + return try: - hci = HCISocket(port) - except socket.error as e: - print('Error connecting to socket: %s' % e) - except: - print('Error creating (check arguments).') + port = int(argv[1]) + except ValueError: + print('Error parsing port.') else: - hci_shell = HCIShell(hci) - hci_shell.prompt = '$ ' - hci_shell.cmdloop('Welcome to the RootCanal HCI Console \n' + - 'Type \'help\' for more information.') + try: + hci = HCISocket(port) + except socket.error as e: + print('Error connecting to socket: %s' % e) + except: + print('Error creating (check arguments).') + else: + hci_shell = HCIShell(hci) + hci_shell.prompt = '$ ' + hci_shell.cmdloop('Welcome to the RootCanal HCI Console \n' + + 'Type \'help\' for more information.') if __name__ == '__main__': - main(sys.argv) + main(sys.argv) diff --git a/system/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py b/system/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py index c7b304520c..e9779cb6db 100644 --- a/system/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py +++ b/system/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py @@ -69,124 +69,132 @@ import string import struct import sys + class LinkLayerSocket(object): - """Simple wrapper class for a socket object. + """Simple wrapper class for a socket object. Attributes: socket: The underlying socket created for the specified address and port. """ - def __init__(self, port): - print('port = ' + port) - self.done_ = False - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.connect(('localhost', port)) - # Should it be a non-blocking socket? - # self._socket.setblocking(0) - self.packets_ = queue.Queue() - self.rx_thread_ = threading.Thread(target=self.rx_thread_body) - self.rx_thread_.start() - - def rx_bytes(self, size): - while not self.done_: - raw_bytes = b'' - while len(raw_bytes) < size and not self.done_: - more_raw_bytes = self._socket.recv(min(size - len(raw_bytes), 2048)) - if more_raw_bytes: - raw_bytes += more_raw_bytes - return raw_bytes - - def rx_thread_body(self): - while not self.done_: - payload_length = 0 - # Read the size (4B), the type (1B), and the addresses (2*6B) - header = self.rx_bytes(17) - if not header: - continue - payload_length = header[0] - payload_length |= header[1] << 8 - payload_length |= header[2] << 16 - payload_length |= header[3] << 24 - print('Rx: type_byte ' + hex(header[4])) - print('Rx: from ' + hex(header[5]) + ':' + hex(header[6]) + ':' + hex(header[7]) + ':' + hex(header[8]) + ':' + hex(header[9]) + ':' + hex(header[10])) - print('Rx: to ' + hex(header[11]) + ':' + hex(header[12]) + ':' + hex(header[13]) + ':' + hex(header[14]) + ':' + hex(header[15]) + ':' + hex(header[16])) - # Read the Payload - payload = self.rx_bytes(payload_length) if payload_length != 0 else b'' - packet_bytes = header + payload - self.packets_.put(packet_bytes) - - def get_packet(self): - if self.packets_.empty(): - return False - return self.packets_.get() - - def send_binary(self, args): - joined_args = ''.join(arg for arg in args) - print(joined_args) - packet = binascii.a2b_hex(joined_args) - if self._done: - return - self._connection.send(packet) - - def tell_rx_thread_to_quit(self): - self.done_ = True - self.rx_thread_.join() + def __init__(self, port): + print('port = ' + port) + self.done_ = False + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect(('localhost', port)) + # Should it be a non-blocking socket? + # self._socket.setblocking(0) + self.packets_ = queue.Queue() + self.rx_thread_ = threading.Thread(target=self.rx_thread_body) + self.rx_thread_.start() + + def rx_bytes(self, size): + while not self.done_: + raw_bytes = b'' + while len(raw_bytes) < size and not self.done_: + more_raw_bytes = self._socket.recv( + min(size - len(raw_bytes), 2048)) + if more_raw_bytes: + raw_bytes += more_raw_bytes + return raw_bytes + + def rx_thread_body(self): + while not self.done_: + payload_length = 0 + # Read the size (4B), the type (1B), and the addresses (2*6B) + header = self.rx_bytes(17) + if not header: + continue + payload_length = header[0] + payload_length |= header[1] << 8 + payload_length |= header[2] << 16 + payload_length |= header[3] << 24 + print('Rx: type_byte ' + hex(header[4])) + print('Rx: from ' + hex(header[5]) + ':' + hex(header[6]) + ':' + + hex(header[7]) + ':' + hex(header[8]) + ':' + hex(header[9]) + + ':' + hex(header[10])) + print('Rx: to ' + hex(header[11]) + ':' + hex(header[12]) + ':' + + hex(header[13]) + ':' + hex(header[14]) + ':' + + hex(header[15]) + ':' + hex(header[16])) + # Read the Payload + payload = self.rx_bytes( + payload_length) if payload_length != 0 else b'' + packet_bytes = header + payload + self.packets_.put(packet_bytes) + + def get_packet(self): + if self.packets_.empty(): + return False + return self.packets_.get() + + def send_binary(self, args): + joined_args = ''.join(arg for arg in args) + print(joined_args) + packet = binascii.a2b_hex(joined_args) + if self._done: + return + self._connection.send(packet) + + def tell_rx_thread_to_quit(self): + self.done_ = True + self.rx_thread_.join() class LinkLayerShell(cmd.Cmd): - """Shell for sending binary data to a port. + """Shell for sending binary data to a port. """ - def __init__(self, link_layer): - cmd.Cmd.__init__(self) - self._link_layer = link_layer + def __init__(self, link_layer): + cmd.Cmd.__init__(self) + self._link_layer = link_layer - def do_send(self, args): - """Arguments: binary representation of a packet. + def do_send(self, args): + """Arguments: binary representation of a packet. """ - self._link_layer.send_binary(args.split()) + self._link_layer.send_binary(args.split()) - def do_quit(self, args): - """Arguments: None. + def do_quit(self, args): + """Arguments: None. Exits. """ - self._link_layer.tell_rx_thread_to_quit() - self._link_layer.close() - print('Goodbye.') - return True + self._link_layer.tell_rx_thread_to_quit() + self._link_layer.close() + print('Goodbye.') + return True - def do_help(self, args): - """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. """ - if (len(args) == 0): - cmd.Cmd.do_help(self, args) + if (len(args) == 0): + cmd.Cmd.do_help(self, args) def main(argv): - if len(argv) != 2: - print('Usage: python link_layer_socket.py [port]') - return - try: - port = int(argv[1]) - except ValueError: - print('Error parsing port.') - else: + if len(argv) != 2: + print('Usage: python link_layer_socket.py [port]') + return try: - link_layer = LinkLayerSocket(port) - except socket.error as e: - print('Error connecting to socket: %s' % e) - except: - print('Error creating (check arguments).') + port = int(argv[1]) + except ValueError: + print('Error parsing port.') else: - link_layer_shell = LinkLayerShell(link_layer) - link_layer_shell.prompt = '$ ' - link_layer_shell.cmdloop('Welcome to the RootCanal LinkLayer Console \n' + - 'Type \'help\' for more information.') + try: + link_layer = LinkLayerSocket(port) + except socket.error as e: + print('Error connecting to socket: %s' % e) + except: + print('Error creating (check arguments).') + else: + link_layer_shell = LinkLayerShell(link_layer) + link_layer_shell.prompt = '$ ' + link_layer_shell.cmdloop( + 'Welcome to the RootCanal LinkLayer Console \n' + + 'Type \'help\' for more information.') if __name__ == '__main__': - main(sys.argv) + main(sys.argv) diff --git a/system/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py b/system/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py index cadeab3c76..b7b0670f8a 100644 --- a/system/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py +++ b/system/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py @@ -75,146 +75,146 @@ DEVICE_ADDRESS_LENGTH = 6 # Used to generate fake device names and addresses during discovery. def generate_random_name(): - return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ - string.digits) for _ in range(DEVICE_NAME_LENGTH)) + return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ + string.digits) for _ in range(DEVICE_NAME_LENGTH)) def generate_random_address(): - return ''.join(random.SystemRandom().choice(string.digits) for _ in \ - range(DEVICE_ADDRESS_LENGTH)) + return ''.join(random.SystemRandom().choice(string.digits) for _ in \ + range(DEVICE_ADDRESS_LENGTH)) class Connection(object): - """Simple wrapper class for a socket object. + """Simple wrapper class for a socket object. Attributes: socket: The underlying socket created for the specified address and port. """ - def __init__(self, port): - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.connect(('localhost', port)) - self._socket.setblocking(0) + def __init__(self, port): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect(('localhost', port)) + self._socket.setblocking(0) - def close(self): - self._socket.close() + def close(self): + self._socket.close() - def send(self, data): - self._socket.sendall(data) + def send(self, data): + self._socket.sendall(data) - def receive(self, size): - return self._socket.recv(size) + def receive(self, size): + return self._socket.recv(size) class RawPort(object): - """Checks outgoing commands and sends them once verified. + """Checks outgoing commands and sends them once verified. Attributes: connection: The connection to the HCI port. """ - def __init__(self, port): - self._connection = Connection(port) - self._closed = False - - def close(self): - self._connection.close() - self._closed = True - - def send_binary(self, args): - joined_args = ''.join(arg for arg in args) - print(joined_args) - packet = binascii.a2b_hex(joined_args) - if self._closed: - return - self._connection.send(packet) - received = self.receive_response() - received_bytes = bytearray(received) - print(raw(received_bytes)) - - def receive_response(self): - if self._closed: - return - size_chars = self._connection.receive(4) - if not size_chars: - print('Debug: No response') - return False - size_bytes = bytearray(size_chars) - response_size = 0 - for i in range(0, len(size_chars) - 1): - response_size |= ord(size_chars[i]) << (8 * i) - response = self._connection.receive(response_size) - return response - - def lint_command(self, name, args, name_size, args_size): - assert name_size == len(name) and args_size == len(args) - try: - name.encode('utf-8') - for arg in args: - arg.encode('utf-8') - except UnicodeError: - print('Unrecognized characters.') - raise - if name_size > 255 or args_size > 255: - raise ValueError # Size must be encodable in one octet. - for arg in args: - if len(arg) > 255: - raise ValueError # Size must be encodable in one octet. + def __init__(self, port): + self._connection = Connection(port) + self._closed = False + + def close(self): + self._connection.close() + self._closed = True + + def send_binary(self, args): + joined_args = ''.join(arg for arg in args) + print(joined_args) + packet = binascii.a2b_hex(joined_args) + if self._closed: + return + self._connection.send(packet) + received = self.receive_response() + received_bytes = bytearray(received) + print(raw(received_bytes)) + + def receive_response(self): + if self._closed: + return + size_chars = self._connection.receive(4) + if not size_chars: + print('Debug: No response') + return False + size_bytes = bytearray(size_chars) + response_size = 0 + for i in range(0, len(size_chars) - 1): + response_size |= ord(size_chars[i]) << (8 * i) + response = self._connection.receive(response_size) + return response + + def lint_command(self, name, args, name_size, args_size): + assert name_size == len(name) and args_size == len(args) + try: + name.encode('utf-8') + for arg in args: + arg.encode('utf-8') + except UnicodeError: + print('Unrecognized characters.') + raise + if name_size > 255 or args_size > 255: + raise ValueError # Size must be encodable in one octet. + for arg in args: + if len(arg) > 255: + raise ValueError # Size must be encodable in one octet. class RawPortShell(cmd.Cmd): - """Shell for sending binary data to a port. + """Shell for sending binary data to a port. """ - def __init__(self, raw_port): - cmd.Cmd.__init__(self) - self._raw_port = raw_port + def __init__(self, raw_port): + cmd.Cmd.__init__(self) + self._raw_port = raw_port - def do_send(self, args): - """Arguments: dev_type_str Add a new device of type dev_type_str. + def do_send(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str. """ - self._raw_port.send_binary(args.split()) + self._raw_port.send_binary(args.split()) - def do_quit(self, args): - """Arguments: None. + def do_quit(self, args): + """Arguments: None. Exits. """ - self._raw_port.close() - print('Goodbye.') - return True + self._raw_port.close() + print('Goodbye.') + return True - def do_help(self, args): - """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. """ - if (len(args) == 0): - cmd.Cmd.do_help(self, args) + if (len(args) == 0): + cmd.Cmd.do_help(self, args) def main(argv): - if len(argv) != 2: - print('Usage: python raw_port.py [port]') - return - try: - port = int(argv[1]) - except ValueError: - print('Error parsing port.') - else: + if len(argv) != 2: + print('Usage: python raw_port.py [port]') + return try: - raw_port = RawPort(port) - except (socket.error, e): - print('Error connecting to socket: %s' % e) - except: - print('Error creating (check arguments).') + port = int(argv[1]) + except ValueError: + print('Error parsing port.') else: - raw_port_shell = RawPortShell(raw_port) - raw_port_shell.prompt = '$ ' - raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + - 'Type \'help\' for more information.') + try: + raw_port = RawPort(port) + except (socket.error, e): + print('Error connecting to socket: %s' % e) + except: + print('Error creating (check arguments).') + else: + raw_port_shell = RawPortShell(raw_port) + raw_port_shell.prompt = '$ ' + raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + + 'Type \'help\' for more information.') if __name__ == '__main__': - main(sys.argv) + main(sys.argv) diff --git a/system/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py b/system/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py index 982aee2dc9..89552d9eae 100644 --- a/system/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py +++ b/system/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py @@ -61,137 +61,137 @@ DEVICE_ADDRESS_LENGTH = 6 # Used to generate fake device names and addresses during discovery. def generate_random_name(): - return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ - string.digits) for _ in range(DEVICE_NAME_LENGTH)) + return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ + string.digits) for _ in range(DEVICE_NAME_LENGTH)) def generate_random_address(): - return ''.join(random.SystemRandom().choice(string.digits) for _ in \ - range(DEVICE_ADDRESS_LENGTH)) + return ''.join(random.SystemRandom().choice(string.digits) for _ in \ + range(DEVICE_ADDRESS_LENGTH)) class Connection(object): - """Simple wrapper class for a socket object. + """Simple wrapper class for a socket object. Attributes: socket: The underlying socket created for the specified address and port. """ - def __init__(self, port): - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.connect(('localhost', port)) - self._socket.setblocking(0) + def __init__(self, port): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect(('localhost', port)) + self._socket.setblocking(0) - def close(self): - self._socket.close() + def close(self): + self._socket.close() - def send(self, data): - self._socket.sendall(data) + def send(self, data): + self._socket.sendall(data) - def receive(self, size): - return self._socket.recv(size) + def receive(self, size): + return self._socket.recv(size) class RawPort(object): - """Checks outgoing commands and sends them once verified. + """Checks outgoing commands and sends them once verified. Attributes: connection: The connection to the HCI port. """ - def __init__(self, port): - self._connection = Connection(port) - self._closed = False - - def close(self): - self._connection.close() - self._closed = True - - def send_binary(self, args): - joined_args = ''.join(arg for arg in args) - print(joined_args) - packet = binascii.a2b_hex(joined_args) - if self._closed: - return - self._connection.send(packet) - - def receive_response(self): - if self._closed: - return - size_chars = self._connection.receive(4) - if not size_chars: - print('Debug: No response') - return False - size_bytes = bytearray(size_chars) - response_size = 0 - for i in range(0, len(size_chars) - 1): - response_size |= ord(size_chars[i]) << (8 * i) - response = self._connection.receive(response_size) - return response - - def lint_command(self, name, args, name_size, args_size): - assert name_size == len(name) and args_size == len(args) - try: - name.encode('utf-8') - for arg in args: - arg.encode('utf-8') - except UnicodeError: - print('Unrecognized characters.') - raise - if name_size > 255 or args_size > 255: - raise ValueError # Size must be encodable in one octet. - for arg in args: - if len(arg) > 255: - raise ValueError # Size must be encodable in one octet. + def __init__(self, port): + self._connection = Connection(port) + self._closed = False + + def close(self): + self._connection.close() + self._closed = True + + def send_binary(self, args): + joined_args = ''.join(arg for arg in args) + print(joined_args) + packet = binascii.a2b_hex(joined_args) + if self._closed: + return + self._connection.send(packet) + + def receive_response(self): + if self._closed: + return + size_chars = self._connection.receive(4) + if not size_chars: + print('Debug: No response') + return False + size_bytes = bytearray(size_chars) + response_size = 0 + for i in range(0, len(size_chars) - 1): + response_size |= ord(size_chars[i]) << (8 * i) + response = self._connection.receive(response_size) + return response + + def lint_command(self, name, args, name_size, args_size): + assert name_size == len(name) and args_size == len(args) + try: + name.encode('utf-8') + for arg in args: + arg.encode('utf-8') + except UnicodeError: + print('Unrecognized characters.') + raise + if name_size > 255 or args_size > 255: + raise ValueError # Size must be encodable in one octet. + for arg in args: + if len(arg) > 255: + raise ValueError # Size must be encodable in one octet. class RawPortShell(cmd.Cmd): - """Shell for sending binary data to a port.""" + """Shell for sending binary data to a port.""" - def __init__(self, raw_port): - cmd.Cmd.__init__(self) - self._raw_port = raw_port + def __init__(self, raw_port): + cmd.Cmd.__init__(self) + self._raw_port = raw_port - def do_send(self, args): - """Arguments: dev_type_str Add a new device of type dev_type_str.""" - self._raw_port.send_binary(args.split()) + def do_send(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str.""" + self._raw_port.send_binary(args.split()) - def do_quit(self, args): - """Arguments: None. + def do_quit(self, args): + """Arguments: None. Exits. """ - self._raw_port.close() - print('Goodbye.') - return True + self._raw_port.close() + print('Goodbye.') + return True - def do_help(self, args): - """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.""" - if (len(args) == 0): - cmd.Cmd.do_help(self, args) + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.""" + if (len(args) == 0): + cmd.Cmd.do_help(self, args) def main(argv): - if len(argv) != 2: - print('Usage: python raw_port.py [port]') - return - try: - port = int(argv[1]) - except ValueError: - print('Error parsing port.') - else: + if len(argv) != 2: + print('Usage: python raw_port.py [port]') + return try: - raw_port = RawPort(port) - except (socket.error, e): - print('Error connecting to socket: %s' % e) - except: - print('Error creating (check arguments).') + port = int(argv[1]) + except ValueError: + print('Error parsing port.') else: - raw_port_shell = RawPortShell(raw_port) - raw_port_shell.prompt = '$ ' - raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + - 'Type \'help\' for more information.') + try: + raw_port = RawPort(port) + except (socket.error, e): + print('Error connecting to socket: %s' % e) + except: + print('Error creating (check arguments).') + else: + raw_port_shell = RawPortShell(raw_port) + raw_port_shell.prompt = '$ ' + raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + + 'Type \'help\' for more information.') if __name__ == '__main__': - main(sys.argv) + main(sys.argv) diff --git a/system/vendor_libs/test_vendor_lib/scripts/simple_stack.py b/system/vendor_libs/test_vendor_lib/scripts/simple_stack.py index 95d973a2fb..52472e9e7d 100644 --- a/system/vendor_libs/test_vendor_lib/scripts/simple_stack.py +++ b/system/vendor_libs/test_vendor_lib/scripts/simple_stack.py @@ -72,167 +72,167 @@ from scapy.all import * class HCI_Cmd_Connect(Packet): - name = "Connect" - fields_desc = [ - ByteEnumField("filter", 0, {0: "address"}), - LEShortField("packet_type", 8), - ByteEnumField("page_scan_repetition_mode", 0, { - 0: "R0", - 1: "R1", - 2: "R2" - }), - ByteEnumField("page_scan_repetition_mode", 0, {0: "Reserved"}), - LEShortField("clock_offset", 0), - ByteEnumField("allow_role_switch", 0, { - 0: "false", - 1: "true" - }), - ] + name = "Connect" + fields_desc = [ + ByteEnumField("filter", 0, {0: "address"}), + LEShortField("packet_type", 8), + ByteEnumField("page_scan_repetition_mode", 0, { + 0: "R0", + 1: "R1", + 2: "R2" + }), + ByteEnumField("page_scan_repetition_mode", 0, {0: "Reserved"}), + LEShortField("clock_offset", 0), + ByteEnumField("allow_role_switch", 0, { + 0: "false", + 1: "true" + }), + ] class HCI_Cmd_Inquiry(Packet): - name = "Inquiry" - fields_desc = [ - XByteField("LAP0", 0), - XByteField("LAP1", 0x8B), - XByteField("LAP2", 0x9E), - ByteField("length", 1), - ByteField("max_responses", 0), - ] + name = "Inquiry" + fields_desc = [ + XByteField("LAP0", 0), + XByteField("LAP1", 0x8B), + XByteField("LAP2", 0x9E), + ByteField("length", 1), + ByteField("max_responses", 0), + ] """ END SCAPY stuff""" class Connection(object): - """Simple wrapper class for a socket object. + """Simple wrapper class for a socket object. Attributes: socket: The underlying socket created for the specified address and port. """ - def __init__(self, port): - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.connect(("localhost", port)) + def __init__(self, port): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect(("localhost", port)) - def close(self): - self._socket.close() + def close(self): + self._socket.close() - def send(self, data): - self._socket.sendall(data) + def send(self, data): + self._socket.sendall(data) - def receive(self, size): - return self._socket.recv(size) + def receive(self, size): + return self._socket.recv(size) class RawPort(object): - """Converts outgoing packets to binary and sends them. + """Converts outgoing packets to binary and sends them. Attributes: connection: The connection to the HCI port. """ - def __init__(self, port): - self._connection = Connection(port) - - def close(self): - self._connection.close() - - def send_binary(self, args): - joined_args = "".join(arg for arg in args) - print(joined_args) - packet = binascii.a2b_hex(joined_args) - self._connection.send(packet) - - def receive_response(self): - ready_to_read, ready_to_write, in_error = \ - select( - [ self._connection._socket ], - [ ], - [ self._connection._socket ], - 1.5) - if len(in_error) > 0: - print("Error") - return False - if len(ready_to_read) > 0: - print("Ready to Read") - type_str = self._connection.receive(512) - print(len(type_str)) - print(type_str) - return type_str - print("Returning false at the end") - return False + def __init__(self, port): + self._connection = Connection(port) + + def close(self): + self._connection.close() + + def send_binary(self, args): + joined_args = "".join(arg for arg in args) + print(joined_args) + packet = binascii.a2b_hex(joined_args) + self._connection.send(packet) + + def receive_response(self): + ready_to_read, ready_to_write, in_error = \ + select( + [ self._connection._socket ], + [ ], + [ self._connection._socket ], + 1.5) + if len(in_error) > 0: + print("Error") + return False + if len(ready_to_read) > 0: + print("Ready to Read") + type_str = self._connection.receive(512) + print(len(type_str)) + print(type_str) + return type_str + print("Returning false at the end") + return False class RawPortShell(cmd.Cmd): - """Shell for sending binary data to a port. + """Shell for sending binary data to a port. """ - def __init__(self, raw_port): - cmd.Cmd.__init__(self) - self._raw_port = raw_port + def __init__(self, raw_port): + cmd.Cmd.__init__(self) + self._raw_port = raw_port - def do_send(self, args): - """Arguments: dev_type_str Add a new device of type dev_type_str. + def do_send(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str. """ - self._raw_port.send_binary(args.split()) + self._raw_port.send_binary(args.split()) - def do_scan(self, args): - """Arguments: timeout (seconds) Print the scan responses from reachable devices + def do_scan(self, args): + """Arguments: timeout (seconds) Print the scan responses from reachable devices """ - self._raw_port.send_binary(args.split()) + self._raw_port.send_binary(args.split()) - def do_quit(self, args): - """Arguments: None. + def do_quit(self, args): + """Arguments: None. Exits. """ - self._raw_port.close() - print("Goodbye.") - return True + self._raw_port.close() + print("Goodbye.") + return True - def do_help(self, args): - """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. """ - if (len(args) == 0): - cmd.Cmd.do_help(self, args) + if (len(args) == 0): + cmd.Cmd.do_help(self, args) - def postcmd(self, stop, line): - """Called after each command stop : whether we will stop after this command line : the previous input line Return True to stop, False to continue + def postcmd(self, stop, line): + """Called after each command stop : whether we will stop after this command line : the previous input line Return True to stop, False to continue """ - if stop: - return True - response = self._raw_port.receive_response() - print(response) - return False + if stop: + return True + response = self._raw_port.receive_response() + print(response) + return False def main(argv): - if len(argv) != 2: - print("Usage: python raw_port.py [port]") - return - try: - port = int(argv[1]) - except ValueError: - print("Error parsing port.") - else: + if len(argv) != 2: + print("Usage: python raw_port.py [port]") + return try: - raw_port = RawPort(port) - except (socket.error, e): - print("Error connecting to socket: %s" % e) - except: - print("Error creating (check arguments).") + port = int(argv[1]) + except ValueError: + print("Error parsing port.") else: - raw_port_shell = RawPortShell(raw_port) - raw_port_shell.prompt = "$ " - raw_port_shell.cmdloop("Welcome to the RootCanal Console \n" + - 'Type \'help\' for more information.') + try: + raw_port = RawPort(port) + except (socket.error, e): + print("Error connecting to socket: %s" % e) + except: + print("Error creating (check arguments).") + else: + raw_port_shell = RawPortShell(raw_port) + raw_port_shell.prompt = "$ " + raw_port_shell.cmdloop("Welcome to the RootCanal Console \n" + + 'Type \'help\' for more information.') if __name__ == "__main__": - main(sys.argv) + main(sys.argv) diff --git a/system/vendor_libs/test_vendor_lib/scripts/test_channel.py b/system/vendor_libs/test_vendor_lib/scripts/test_channel.py index 447130b537..79b19b1bbb 100644 --- a/system/vendor_libs/test_vendor_lib/scripts/test_channel.py +++ b/system/vendor_libs/test_vendor_lib/scripts/test_channel.py @@ -49,97 +49,98 @@ DEVICE_ADDRESS_LENGTH = 6 # Used to generate fake device names and addresses during discovery. def generate_random_name(): - return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ - string.digits) for _ in range(DEVICE_NAME_LENGTH)) + return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ + string.digits) for _ in range(DEVICE_NAME_LENGTH)) def generate_random_address(): - return ''.join(random.SystemRandom().choice(string.digits) for _ in \ - range(DEVICE_ADDRESS_LENGTH)) + return ''.join(random.SystemRandom().choice(string.digits) for _ in \ + range(DEVICE_ADDRESS_LENGTH)) class Connection(object): - """Simple wrapper class for a socket object. + """Simple wrapper class for a socket object. Attributes: socket: The underlying socket created for the specified address and port. """ - def __init__(self, port): - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.connect(('localhost', port)) + def __init__(self, port): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect(('localhost', port)) - def close(self): - self._socket.close() + def close(self): + self._socket.close() - def send(self, data): - self._socket.sendall(data) + def send(self, data): + self._socket.sendall(data) - def receive(self, size): - return self._socket.recv(size) + def receive(self, size): + return self._socket.recv(size) class TestChannel(object): - """Checks outgoing commands and sends them once verified. + """Checks outgoing commands and sends them once verified. Attributes: connection: The connection to the test vendor library that commands are sent on. """ - def __init__(self, port): - self._connection = Connection(port) - self._closed = False - - def close(self): - self._connection.close() - self._closed = True - - def send_command(self, name, args): - name_size = len(name) - args_size = len(args) - self.lint_command(name, args, name_size, args_size) - encoded_name = chr(name_size) + name - encoded_args = chr(args_size) + ''.join(chr(len(arg)) + arg for arg in args) - command = encoded_name + encoded_args - if self._closed: - return - self._connection.send(command) - if name != 'CLOSE_TEST_CHANNEL': - print self.receive_response() - - def receive_response(self): - if self._closed: - return - size_chars = self._connection.receive(4) - size_bytes = bytearray(size_chars) - if not size_chars: - print 'No response, assuming that the connection is broken' - return False - response_size = 0 - for i in range(0, len(size_chars) - 1): - response_size |= ord(size_chars[i]) << (8 * i) - response = self._connection.receive(response_size) - return response - - def lint_command(self, name, args, name_size, args_size): - assert name_size == len(name) and args_size == len(args) - try: - name.encode('utf-8') - for arg in args: - arg.encode('utf-8') - except UnicodeError: - print 'Unrecognized characters.' - raise - if name_size > 255 or args_size > 255: - raise ValueError # Size must be encodable in one octet. - for arg in args: - if len(arg) > 255: - raise ValueError # Size must be encodable in one octet. + def __init__(self, port): + self._connection = Connection(port) + self._closed = False + + def close(self): + self._connection.close() + self._closed = True + + def send_command(self, name, args): + name_size = len(name) + args_size = len(args) + self.lint_command(name, args, name_size, args_size) + encoded_name = chr(name_size) + name + encoded_args = chr(args_size) + ''.join( + chr(len(arg)) + arg for arg in args) + command = encoded_name + encoded_args + if self._closed: + return + self._connection.send(command) + if name != 'CLOSE_TEST_CHANNEL': + print self.receive_response() + + def receive_response(self): + if self._closed: + return + size_chars = self._connection.receive(4) + size_bytes = bytearray(size_chars) + if not size_chars: + print 'No response, assuming that the connection is broken' + return False + response_size = 0 + for i in range(0, len(size_chars) - 1): + response_size |= ord(size_chars[i]) << (8 * i) + response = self._connection.receive(response_size) + return response + + def lint_command(self, name, args, name_size, args_size): + assert name_size == len(name) and args_size == len(args) + try: + name.encode('utf-8') + for arg in args: + arg.encode('utf-8') + except UnicodeError: + print 'Unrecognized characters.' + raise + if name_size > 255 or args_size > 255: + raise ValueError # Size must be encodable in one octet. + for arg in args: + if len(arg) > 255: + raise ValueError # Size must be encodable in one octet. class TestChannelShell(cmd.Cmd): - """Shell for sending test channel data to controller. + """Shell for sending test channel data to controller. Manages the test channel to the controller and defines a set of commands the user can send to the controller as well. These commands are processed parallel @@ -150,138 +151,138 @@ class TestChannelShell(cmd.Cmd): test_channel: The communication channel to send data to the controller. """ - def __init__(self, test_channel): - cmd.Cmd.__init__(self) - self._test_channel = test_channel + def __init__(self, test_channel): + cmd.Cmd.__init__(self) + self._test_channel = test_channel - def do_add(self, args): - """Arguments: dev_type_str Add a new device of type dev_type_str. + def do_add(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str. """ - self._test_channel.send_command('add', args.split()) + self._test_channel.send_command('add', args.split()) - def do_del(self, args): - """Arguments: device index Delete the device with the specified index. + def do_del(self, args): + """Arguments: device index Delete the device with the specified index. """ - self._test_channel.send_command('del', args.split()) + self._test_channel.send_command('del', args.split()) - def do_add_phy(self, args): - """Arguments: dev_type_str Add a new device of type dev_type_str. + def do_add_phy(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str. """ - self._test_channel.send_command('add_phy', args.split()) + self._test_channel.send_command('add_phy', args.split()) - def do_del_phy(self, args): - """Arguments: phy index Delete the phy with the specified index. + def do_del_phy(self, args): + """Arguments: phy index Delete the phy with the specified index. """ - self._test_channel.send_command('del_phy', args.split()) + self._test_channel.send_command('del_phy', args.split()) - def do_add_device_to_phy(self, args): - """Arguments: device index phy index Add a new device of type dev_type_str. + def do_add_device_to_phy(self, args): + """Arguments: device index phy index Add a new device of type dev_type_str. """ - self._test_channel.send_command('add_device_to_phy', args.split()) + self._test_channel.send_command('add_device_to_phy', args.split()) - def do_del_device_from_phy(self, args): - """Arguments: phy index Delete the phy with the specified index. + def do_del_device_from_phy(self, args): + """Arguments: phy index Delete the phy with the specified index. """ - self._test_channel.send_command('del_device_from_phy', args.split()) + self._test_channel.send_command('del_device_from_phy', args.split()) - def do_add_remote(self, args): - """Arguments: dev_type_str Connect to a remote device at arg1@arg2. + def do_add_remote(self, args): + """Arguments: dev_type_str Connect to a remote device at arg1@arg2. """ - self._test_channel.send_command('add_remote', args.split()) + self._test_channel.send_command('add_remote', args.split()) - def do_get(self, args): - """Arguments: dev_num attr_str Get the value of the attribute attr_str from device dev_num. + def do_get(self, args): + """Arguments: dev_num attr_str Get the value of the attribute attr_str from device dev_num. """ - self._test_channel.send_command('get', args.split()) + self._test_channel.send_command('get', args.split()) - def do_set(self, args): - """Arguments: dev_num attr_str val Set the value of the attribute attr_str from device dev_num equal to val. + def do_set(self, args): + """Arguments: dev_num attr_str val Set the value of the attribute attr_str from device dev_num equal to val. """ - self._test_channel.send_command('set', args.split()) + self._test_channel.send_command('set', args.split()) - def do_set_device_address(self, args): - """Arguments: dev_num addr Set the address of device dev_num equal to addr. + def do_set_device_address(self, args): + """Arguments: dev_num addr Set the address of device dev_num equal to addr. """ - self._test_channel.send_command('set_device_address', args.split()) + self._test_channel.send_command('set_device_address', args.split()) - def do_list(self, args): - """Arguments: [dev_num [attr]] List the devices from the controller, optionally filtered by device and attr. + def do_list(self, args): + """Arguments: [dev_num [attr]] List the devices from the controller, optionally filtered by device and attr. """ - self._test_channel.send_command('list', args.split()) + self._test_channel.send_command('list', args.split()) - def do_quit(self, args): - """Arguments: None. + def do_quit(self, args): + """Arguments: None. Exits the test channel. """ - self._test_channel.send_command('CLOSE_TEST_CHANNEL', []) - self._test_channel.close() - print 'Goodbye.' - return True + self._test_channel.send_command('CLOSE_TEST_CHANNEL', []) + self._test_channel.close() + print 'Goodbye.' + return True - def do_help(self, args): - """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. """ - if (len(args) == 0): - cmd.Cmd.do_help(self, args) - else: - self._test_channel.send_command('help', args.split()) + if (len(args) == 0): + cmd.Cmd.do_help(self, args) + else: + self._test_channel.send_command('help', args.split()) - def preloop(self): - """Clear out the buffer + def preloop(self): + """Clear out the buffer """ - response = self._test_channel.receive_response() - - #def postcmd(self, stop, line): - #""" - #Called after each command - #stop : whether we will stop after this command - #line : the previous input line - #Return True to stop, False to continue - #""" - #if stop: - #return True - #response = self._test_channel.receive_response() - #if not response: - #return True - #print response - #return False + response = self._test_channel.receive_response() + + #def postcmd(self, stop, line): + #""" + #Called after each command + #stop : whether we will stop after this command + #line : the previous input line + #Return True to stop, False to continue + #""" + #if stop: + #return True + #response = self._test_channel.receive_response() + #if not response: + #return True + #print response + #return False def main(argv): - if len(argv) != 2: - print 'Usage: python test_channel.py [port]' - return - try: - port = int(argv[1]) - except ValueError: - print 'Error parsing port.' - else: + if len(argv) != 2: + print 'Usage: python test_channel.py [port]' + return try: - test_channel = TestChannel(port) - except socket.error, e: - print 'Error connecting to socket: %s' % e - except: - print 'Error creating test channel (check argument).' + port = int(argv[1]) + except ValueError: + print 'Error parsing port.' else: - test_channel_shell = TestChannelShell(test_channel) - test_channel_shell.prompt = '$ ' - test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' + - 'Type \'help\' for more information.') + try: + test_channel = TestChannel(port) + except socket.error, e: + print 'Error connecting to socket: %s' % e + except: + print 'Error creating test channel (check argument).' + else: + test_channel_shell = TestChannelShell(test_channel) + test_channel_shell.prompt = '$ ' + test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' + + 'Type \'help\' for more information.') if __name__ == '__main__': - main(sys.argv) + main(sys.argv) |