summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Jack He <siyuanh@google.com> 2019-12-10 16:41:59 -0800
committer Jack He <siyuanh@google.com> 2019-12-11 12:29:31 -0800
commit8198497fcfbe9811bc9a6c538e09f853561290a8 (patch)
tree29bb8fa7df694f8f218d9d11f6240541e7944ddc
parentcb11407c93016592a335105fa726c2310a4cea87 (diff)
Python: format all Python files in system/bt using YAPF
python3 yapf -p -i $(git ls-tree --name-only -r aosp/master | grep "\.py") Bug: 146016811 Test: run Python tests Change-Id: Ic5fe6a21151d1abc8eb013f8c8070ba8238a5249
-rw-r--r--system/build/toolchain/clang/get_clang_suffix.py58
-rw-r--r--system/gd/cert/bluetooth_packets_python3_setup.py43
-rw-r--r--system/gd/cert/event_asserts.py76
-rw-r--r--system/gd/cert/gd_base_test.py23
-rw-r--r--system/gd/cert/gd_cert_device.py22
-rw-r--r--system/gd/cert/gd_device.py29
-rw-r--r--system/gd/cert/gd_device_base.py26
-rw-r--r--system/gd/cert/pts_base_test.py9
-rw-r--r--system/gd/hal/cert/simple_hal_test.py180
-rw-r--r--system/gd/hci/cert/simple_hci_test.py255
-rw-r--r--system/gd/l2cap/classic/cert/pts_l2cap_test.py161
-rw-r--r--system/gd/l2cap/classic/cert/simple_l2cap_test.py447
-rwxr-xr-xsystem/test/gen_coverage.py574
-rwxr-xr-xsystem/test/run_host_unit_tests.py312
-rw-r--r--system/tools/scripts/btsnoop_live.py32
-rwxr-xr-xsystem/tools/scripts/btsnooz.py184
-rwxr-xr-xsystem/tools/scripts/dump_hearingaid_audio.py900
-rwxr-xr-xsystem/tools/scripts/dump_metrics_ascii.py29
-rw-r--r--system/vendor_libs/test_vendor_lib/scripts/hci_socket.py568
-rw-r--r--system/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py190
-rw-r--r--system/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py194
-rw-r--r--system/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py188
-rw-r--r--system/vendor_libs/test_vendor_lib/scripts/simple_stack.py218
-rw-r--r--system/vendor_libs/test_vendor_lib/scripts/test_channel.py299
24 files changed, 2681 insertions, 2336 deletions
diff --git a/system/build/toolchain/clang/get_clang_suffix.py b/system/build/toolchain/clang/get_clang_suffix.py
index dedacdd51d..fbfe16c959 100644
--- a/system/build/toolchain/clang/get_clang_suffix.py
+++ b/system/build/toolchain/clang/get_clang_suffix.py
@@ -3,41 +3,43 @@ import subprocess
import re
import sys
+
def which(cmd):
- for p in os.environ["PATH"].split(os.pathsep):
- clang_path = os.path.join(p, cmd)
- if os.path.exists(clang_path):
- return clang_path
- return None
+ for p in os.environ["PATH"].split(os.pathsep):
+ clang_path = os.path.join(p, cmd)
+ if os.path.exists(clang_path):
+ return clang_path
+ return None
+
-CLANG_VERSION_REGEX=".*version\s*([0-9]*\.[0-9]*)\.*"
+CLANG_VERSION_REGEX = ".*version\s*([0-9]*\.[0-9]*)\.*"
clang_path = which("clang++")
clang_version_major = 0
clang_version_minor = 0
if clang_path:
- clang_version_out = subprocess.Popen([clang_path, "--version"],
- stdout=subprocess.PIPE).communicate()[0]
- clang_version_match = re.search(CLANG_VERSION_REGEX, clang_version_out)
- clang_version_str = clang_version_match.group(1)
- clang_version_array = clang_version_str.split('.')
- clang_version_major = int(clang_version_array[0])
- clang_version_minor = int(clang_version_array[1])
+ clang_version_out = subprocess.Popen(
+ [clang_path, "--version"], stdout=subprocess.PIPE).communicate()[0]
+ clang_version_match = re.search(CLANG_VERSION_REGEX, clang_version_out)
+ clang_version_str = clang_version_match.group(1)
+ clang_version_array = clang_version_str.split('.')
+ clang_version_major = int(clang_version_array[0])
+ clang_version_minor = int(clang_version_array[1])
if clang_version_major >= 3 and clang_version_minor >= 5:
- print ""
+ print ""
else:
- # Loop in support clang version only
- clang_version_major = 3
- clang_version_minor = 9
- while clang_version_major >= 3 and clang_version_minor >= 5:
- clang_version_str = "%d.%d" % (clang_version_major, clang_version_minor)
- clang_path = which("clang++-" + clang_version_str)
- if clang_path:
- print clang_version_str
- sys.exit(0)
- clang_version_minor -= 1
- if clang_version_minor < 0:
- clang_version_minor = 9
- clang_version_major -= 1
- print "None"
+ # Loop in support clang version only
+ clang_version_major = 3
+ clang_version_minor = 9
+ while clang_version_major >= 3 and clang_version_minor >= 5:
+ clang_version_str = "%d.%d" % (clang_version_major, clang_version_minor)
+ clang_path = which("clang++-" + clang_version_str)
+ if clang_path:
+ print clang_version_str
+ sys.exit(0)
+ clang_version_minor -= 1
+ if clang_version_minor < 0:
+ clang_version_minor = 9
+ clang_version_major -= 1
+ print "None"
diff --git a/system/gd/cert/bluetooth_packets_python3_setup.py b/system/gd/cert/bluetooth_packets_python3_setup.py
index 33c8293fde..7cca0196ae 100644
--- a/system/gd/cert/bluetooth_packets_python3_setup.py
+++ b/system/gd/cert/bluetooth_packets_python3_setup.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
# Usage:
# 1. Run envsetup and lunch first in an Android checkout
# 2. Make target bluetooth_packets_python3 that will generate C++ sources for the
@@ -25,7 +24,6 @@
# 4. Install:
# python3 bluetooth_packets_python3_setup.py install --user
-
import os
import glob
from setuptools import setup, Extension
@@ -34,10 +32,13 @@ ANDROID_BUILD_TOP = os.getenv("ANDROID_BUILD_TOP")
PYBIND11_INCLUDE_DIR = os.path.join(ANDROID_BUILD_TOP,
"external/python/pybind11/include")
GD_DIR = os.path.join(ANDROID_BUILD_TOP, "packages/modules/Bluetooth/system/gd")
-BT_PACKETS_GEN_DIR = os.path.join(ANDROID_BUILD_TOP,
- "out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothGeneratedPackets_h/gen")
-BT_PACKETS_PY3_GEN_DIR = os.path.join(ANDROID_BUILD_TOP,
- "out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothGeneratedPackets_python3_cc/gen")
+BT_PACKETS_GEN_DIR = os.path.join(
+ ANDROID_BUILD_TOP,
+ "out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothGeneratedPackets_h/gen")
+BT_PACKETS_PY3_GEN_DIR = os.path.join(
+ ANDROID_BUILD_TOP,
+ "out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothGeneratedPackets_python3_cc/gen"
+)
BT_PACKETS_BASE_SRCS = [
os.path.join(GD_DIR, "l2cap/fcs.cc"),
@@ -57,19 +58,19 @@ BT_PACKETS_PY3_SRCs = \
+ glob.glob(os.path.join(BT_PACKETS_PY3_GEN_DIR, "l2cap", "*.cc")) \
+ glob.glob(os.path.join(BT_PACKETS_PY3_GEN_DIR, "security", "*.cc"))
-bluetooth_packets_python3_module = Extension('bluetooth_packets_python3',
- sources=BT_PACKETS_BASE_SRCS + BT_PACKETS_PY3_SRCs,
- include_dirs=[GD_DIR,
- BT_PACKETS_GEN_DIR,
- BT_PACKETS_PY3_GEN_DIR,
- PYBIND11_INCLUDE_DIR],
- extra_compile_args=['-std=c++17']
- )
+bluetooth_packets_python3_module = Extension(
+ 'bluetooth_packets_python3',
+ sources=BT_PACKETS_BASE_SRCS + BT_PACKETS_PY3_SRCs,
+ include_dirs=[
+ GD_DIR, BT_PACKETS_GEN_DIR, BT_PACKETS_PY3_GEN_DIR, PYBIND11_INCLUDE_DIR
+ ],
+ extra_compile_args=['-std=c++17'])
-setup(name='bluetooth_packets_python3',
- version='1.0',
- author="Android Open Source Project",
- description="""Bluetooth Packet Library""",
- ext_modules=[bluetooth_packets_python3_module],
- py_modules=["bluetooth_packets_python3"],
- )
+setup(
+ name='bluetooth_packets_python3',
+ version='1.0',
+ author="Android Open Source Project",
+ description="""Bluetooth Packet Library""",
+ ext_modules=[bluetooth_packets_python3_module],
+ py_modules=["bluetooth_packets_python3"],
+)
diff --git a/system/gd/cert/event_asserts.py b/system/gd/cert/event_asserts.py
index a479c55299..1ce118967f 100644
--- a/system/gd/cert/event_asserts.py
+++ b/system/gd/cert/event_asserts.py
@@ -24,6 +24,7 @@ from google.protobuf import text_format
from cert.event_callback_stream import EventCallbackStream
+
class EventAsserts(object):
"""
A class that handles various asserts with respect to a gRPC unary stream
@@ -46,7 +47,7 @@ class EventAsserts(object):
raise ValueError("event_callback_stream cannot be None")
self.event_callback_stream = event_callback_stream
self.event_queue = SimpleQueue()
- self.callback = lambda event : self.event_queue.put(event)
+ self.callback = lambda event: self.event_queue.put(event)
self.event_callback_stream.register_callback(self.callback)
def __del__(self):
@@ -62,15 +63,16 @@ class EventAsserts(object):
logging.debug("assert_none")
try:
event = self.event_queue.get(timeout=timeout.seconds)
- asserts.assert_equal(event, None,
- msg=(
- "Expected None, but got %s" % text_format.MessageToString(
- event, as_one_line=True)))
+ asserts.assert_equal(
+ event,
+ None,
+ msg=("Expected None, but got %s" % text_format.MessageToString(
+ event, as_one_line=True)))
except Empty:
return
- def assert_none_matching(self, match_fn,
- timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
+ def assert_none_matching(
+ self, match_fn, timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
"""
Assert no events where match_fn(event) is True happen within timeout
period
@@ -88,23 +90,24 @@ class EventAsserts(object):
logging.debug("Waiting for event iteration %d" % iter_count)
try:
time_before = datetime.now()
- current_event = self.event_queue.get(
- timeout=timeout_seconds)
+ current_event = self.event_queue.get(timeout=timeout_seconds)
time_elapsed = datetime.now() - time_before
timeout_seconds -= time_elapsed.seconds
if match_fn(current_event):
event = current_event
except Empty:
continue
- logging.debug(
- "Done waiting for event, got %s" % text_format.MessageToString(
- event, as_one_line=True))
- asserts.assert_equal(event, None,
- msg=(
- "Expected None matching, but got %s" % text_format.MessageToString(
- event, as_one_line=True)))
-
- def assert_event_occurs(self, match_fn, at_least_times=1,
+ logging.debug("Done waiting for event, got %s" %
+ text_format.MessageToString(event, as_one_line=True))
+ asserts.assert_equal(
+ event,
+ None,
+ msg=("Expected None matching, but got %s" %
+ text_format.MessageToString(event, as_one_line=True)))
+
+ def assert_event_occurs(self,
+ match_fn,
+ at_least_times=1,
timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
"""
Assert at least |at_least_times| instances of events happen where
@@ -125,22 +128,25 @@ class EventAsserts(object):
logging.debug("Waiting for event iteration %d" % iter_count)
try:
time_before = datetime.now()
- current_event = self.event_queue.get(
- timeout=timeout_seconds)
+ current_event = self.event_queue.get(timeout=timeout_seconds)
time_elapsed = datetime.now() - time_before
timeout_seconds -= time_elapsed.seconds
if match_fn(current_event):
event.append(current_event)
except Empty:
continue
- logging.debug(
- "Done waiting for event, got %s" % text_format.MessageToString(
- event, as_one_line=True))
- asserts.assert_true(len(event) == at_least_times,
- msg=("Expected at least %d events, but got %d" % at_least_times, len(event)))
-
- def assert_event_occurs_at_most(self, match_fn, at_most_times,
- timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
+ logging.debug("Done waiting for event, got %s" %
+ text_format.MessageToString(event, as_one_line=True))
+ asserts.assert_true(
+ len(event) == at_least_times,
+ msg=("Expected at least %d events, but got %d" % at_least_times,
+ len(event)))
+
+ def assert_event_occurs_at_most(
+ self,
+ match_fn,
+ at_most_times,
+ timeout=timedelta(seconds=DEFAULT_TIMEOUT_SECONDS)):
"""
Assert at most |at_most_times| instances of events happen where
match_fn(event) returns True within timeout period
@@ -160,16 +166,16 @@ class EventAsserts(object):
logging.debug("Waiting for event iteration %d" % iter_count)
try:
time_before = datetime.now()
- current_event = self.event_queue.get(
- timeout=timeout_seconds)
+ current_event = self.event_queue.get(timeout=timeout_seconds)
time_elapsed = datetime.now() - time_before
timeout_seconds -= time_elapsed.seconds
if match_fn(current_event):
event.append(current_event)
except Empty:
continue
- logging.debug(
- "Done waiting for event, got %s" % text_format.MessageToString(
- event, as_one_line=True))
- asserts.assert_true(len(event) <= at_most_times,
- msg=("Expected at most %d events, but got %d" % at_most_times, len(event))) \ No newline at end of file
+ logging.debug("Done waiting for event, got %s" %
+ text_format.MessageToString(event, as_one_line=True))
+ asserts.assert_true(
+ len(event) <= at_most_times,
+ msg=("Expected at most %d events, but got %d" % at_most_times,
+ len(event)))
diff --git a/system/gd/cert/gd_base_test.py b/system/gd/cert/gd_base_test.py
index a103ecccb2..3c5186af0e 100644
--- a/system/gd/cert/gd_base_test.py
+++ b/system/gd/cert/gd_base_test.py
@@ -25,12 +25,17 @@ import subprocess
ANDROID_BUILD_TOP = os.environ.get('ANDROID_BUILD_TOP')
-sys.path.append(ANDROID_BUILD_TOP + '/out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothFacadeAndCertGeneratedStub_py/gen')
+sys.path.append(
+ ANDROID_BUILD_TOP +
+ '/out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothFacadeAndCertGeneratedStub_py/gen'
+)
ANDROID_HOST_OUT = os.environ.get('ANDROID_HOST_OUT')
ROOTCANAL = ANDROID_HOST_OUT + "/nativetest64/root-canal/root-canal"
+
class GdBaseTestClass(BaseTestClass):
+
def __init__(self, configs):
BaseTestClass.__init__(self, configs)
@@ -41,7 +46,8 @@ class GdBaseTestClass(BaseTestClass):
self.rootcanal_running = False
if 'rootcanal' in self.controller_configs:
self.rootcanal_running = True
- rootcanal_logpath = os.path.join(log_path_base, 'rootcanal_logs.txt')
+ rootcanal_logpath = os.path.join(log_path_base,
+ 'rootcanal_logs.txt')
self.rootcanal_logs = open(rootcanal_logpath, 'w')
rootcanal_config = self.controller_configs['rootcanal']
rootcanal_hci_port = str(rootcanal_config.get("hci_port", "6402"))
@@ -55,19 +61,16 @@ class GdBaseTestClass(BaseTestClass):
cwd=ANDROID_BUILD_TOP,
env=os.environ.copy(),
stdout=self.rootcanal_logs,
- stderr=self.rootcanal_logs
- )
+ stderr=self.rootcanal_logs)
for gd_device in gd_devices:
gd_device["rootcanal_port"] = rootcanal_hci_port
for gd_cert_device in gd_cert_devices:
gd_cert_device["rootcanal_port"] = rootcanal_hci_port
self.register_controller(
- importlib.import_module('cert.gd_device'),
- builtin=True)
+ importlib.import_module('cert.gd_device'), builtin=True)
self.register_controller(
- importlib.import_module('cert.gd_cert_device'),
- builtin=True)
+ importlib.import_module('cert.gd_cert_device'), builtin=True)
def teardown_class(self):
if self.rootcanal_running:
@@ -76,6 +79,6 @@ class GdBaseTestClass(BaseTestClass):
self.rootcanal_logs.close()
if rootcanal_return_code != 0 and\
rootcanal_return_code != -signal.SIGINT:
- logging.error("rootcanal stopped with code: %d" %
- rootcanal_return_code)
+ logging.error(
+ "rootcanal stopped with code: %d" % rootcanal_return_code)
return False
diff --git a/system/gd/cert/gd_cert_device.py b/system/gd/cert/gd_cert_device.py
index 45f05397cc..b8b23480e1 100644
--- a/system/gd/cert/gd_cert_device.py
+++ b/system/gd/cert/gd_cert_device.py
@@ -25,6 +25,7 @@ from l2cap.classic.cert import api_pb2_grpc as l2cap_cert_pb2_grpc
ACTS_CONTROLLER_CONFIG_NAME = "GdCertDevice"
ACTS_CONTROLLER_REFERENCE_NAME = "gd_cert_devices"
+
def create(configs):
if not configs:
raise GdDeviceConfigError("Configuration is empty")
@@ -52,20 +53,25 @@ def get_instances_with_configs(configs):
resolved_cmd = []
for entry in config["cmd"]:
resolved_cmd.append(replace_vars(entry, config))
- devices.append(GdCertDevice(config["grpc_port"],
- config["grpc_root_server_port"],
- config["signal_port"],
- resolved_cmd, config["label"]))
+ devices.append(
+ GdCertDevice(config["grpc_port"], config["grpc_root_server_port"],
+ config["signal_port"], resolved_cmd, config["label"]))
return devices
+
class GdCertDevice(GdDeviceBase):
- def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd, label):
+
+ def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd,
+ label):
super().__init__(grpc_port, grpc_root_server_port, signal_port, cmd,
label, ACTS_CONTROLLER_CONFIG_NAME)
# Cert stubs
- self.rootservice = cert_rootservice_pb2_grpc.RootCertStub(self.grpc_root_server_channel)
+ self.rootservice = cert_rootservice_pb2_grpc.RootCertStub(
+ self.grpc_root_server_channel)
self.hal = hal_cert_pb2_grpc.HciHalCertStub(self.grpc_channel)
- self.controller_read_only_property = cert_rootservice_pb2_grpc.ReadOnlyPropertyStub(self.grpc_channel)
+ self.controller_read_only_property = cert_rootservice_pb2_grpc.ReadOnlyPropertyStub(
+ self.grpc_channel)
self.hci = hci_cert_pb2_grpc.AclManagerCertStub(self.grpc_channel)
- self.l2cap = l2cap_cert_pb2_grpc.L2capClassicModuleCertStub(self.grpc_channel)
+ self.l2cap = l2cap_cert_pb2_grpc.L2capClassicModuleCertStub(
+ self.grpc_channel)
diff --git a/system/gd/cert/gd_device.py b/system/gd/cert/gd_device.py
index a02c33846c..9f8b6dea3a 100644
--- a/system/gd/cert/gd_device.py
+++ b/system/gd/cert/gd_device.py
@@ -26,6 +26,7 @@ from l2cap.classic import facade_pb2_grpc as l2cap_facade_pb2_grpc
ACTS_CONTROLLER_CONFIG_NAME = "GdDevice"
ACTS_CONTROLLER_REFERENCE_NAME = "gd_devices"
+
def create(configs):
if not configs:
raise GdDeviceConfigError("Configuration is empty")
@@ -53,23 +54,29 @@ def get_instances_with_configs(configs):
resolved_cmd = []
for entry in config["cmd"]:
resolved_cmd.append(replace_vars(entry, config))
- devices.append(GdDevice(config["grpc_port"],
- config["grpc_root_server_port"],
- config["signal_port"],
- resolved_cmd, config["label"]))
+ devices.append(
+ GdDevice(config["grpc_port"], config["grpc_root_server_port"],
+ config["signal_port"], resolved_cmd, config["label"]))
return devices
+
class GdDevice(GdDeviceBase):
- def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd, label):
+
+ def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd,
+ label):
super().__init__(grpc_port, grpc_root_server_port, signal_port, cmd,
label, ACTS_CONTROLLER_CONFIG_NAME)
# Facade stubs
- self.rootservice = facade_rootservice_pb2_grpc.RootFacadeStub(self.grpc_root_server_channel)
+ self.rootservice = facade_rootservice_pb2_grpc.RootFacadeStub(
+ self.grpc_root_server_channel)
self.hal = hal_facade_pb2_grpc.HciHalFacadeStub(self.grpc_channel)
- self.controller_read_only_property = facade_rootservice_pb2_grpc.ReadOnlyPropertyStub(self.grpc_channel)
+ self.controller_read_only_property = facade_rootservice_pb2_grpc.ReadOnlyPropertyStub(
+ self.grpc_channel)
self.hci = hci_facade_pb2_grpc.AclManagerFacadeStub(self.grpc_channel)
- self.hci_classic_security = hci_facade_pb2_grpc.ClassicSecurityManagerFacadeStub(self.grpc_channel)
- self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub(self.grpc_channel)
- self.hci_le_advertising_manager = le_advertising_manager_facade_pb2_grpc.LeAdvertisingManagerFacadeStub(self.grpc_channel)
-
+ self.hci_classic_security = hci_facade_pb2_grpc.ClassicSecurityManagerFacadeStub(
+ self.grpc_channel)
+ self.l2cap = l2cap_facade_pb2_grpc.L2capClassicModuleFacadeStub(
+ self.grpc_channel)
+ self.hci_le_advertising_manager = le_advertising_manager_facade_pb2_grpc.LeAdvertisingManagerFacadeStub(
+ self.grpc_channel)
diff --git a/system/gd/cert/gd_device_base.py b/system/gd/cert/gd_device_base.py
index df5f9f23a3..7110212608 100644
--- a/system/gd/cert/gd_device_base.py
+++ b/system/gd/cert/gd_device_base.py
@@ -32,6 +32,7 @@ ANDROID_BUILD_TOP = os.environ.get('ANDROID_BUILD_TOP')
ANDROID_HOST_OUT = os.environ.get('ANDROID_HOST_OUT')
WAIT_CHANNEL_READY_TIMEOUT = 10
+
def replace_vars(string, config):
serial_number = config.get("serial_number")
if serial_number is None:
@@ -46,7 +47,9 @@ def replace_vars(string, config):
.replace("$(signal_port)", config.get("signal_port")) \
.replace("$(serial_number)", serial_number)
+
class GdDeviceBase:
+
def __init__(self, grpc_port, grpc_root_server_port, signal_port, cmd,
label, type_identifier):
self.label = label if label is not None else grpc_port
@@ -55,7 +58,7 @@ class GdDeviceBase:
self.log = tracelogger.TraceLogger(
GdDeviceBaseLoggerAdapter(logging.getLogger(), {
'device': label,
- 'type_identifier' : type_identifier
+ 'type_identifier': type_identifier
}))
backing_process_logpath = os.path.join(
@@ -64,12 +67,13 @@ class GdDeviceBase:
cmd_str = json.dumps(cmd)
if "--btsnoop=" not in cmd_str:
- btsnoop_path = os.path.join(log_path_base, '%s_btsnoop_hci.log' % label)
+ btsnoop_path = os.path.join(log_path_base,
+ '%s_btsnoop_hci.log' % label)
cmd.append("--btsnoop=" + btsnoop_path)
tester_signal_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- tester_signal_socket.setsockopt(
- socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ tester_signal_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
+ 1)
socket_address = ('localhost', int(signal_port))
tester_signal_socket.bind(socket_address)
tester_signal_socket.listen(1)
@@ -83,7 +87,8 @@ class GdDeviceBase:
tester_signal_socket.accept()
tester_signal_socket.close()
- self.grpc_root_server_channel = grpc.insecure_channel("localhost:" + grpc_root_server_port)
+ self.grpc_root_server_channel = grpc.insecure_channel(
+ "localhost:" + grpc_root_server_port)
self.grpc_port = int(grpc_port)
self.grpc_channel = grpc.insecure_channel("localhost:" + grpc_port)
@@ -101,21 +106,22 @@ class GdDeviceBase:
def wait_channel_ready(self):
future = grpc.channel_ready_future(self.grpc_channel)
try:
- future.result(timeout = WAIT_CHANNEL_READY_TIMEOUT)
+ future.result(timeout=WAIT_CHANNEL_READY_TIMEOUT)
except grpc.FutureTimeoutError:
- logging.error("wait channel ready timeout")
-
+ logging.error("wait channel ready timeout")
class GdDeviceBaseLoggerAdapter(logging.LoggerAdapter):
+
def process(self, msg, kwargs):
- msg = "[%s|%s] %s" % (self.extra["type_identifier"], self.extra["device"], msg)
+ msg = "[%s|%s] %s" % (self.extra["type_identifier"],
+ self.extra["device"], msg)
return (msg, kwargs)
+
class GdDeviceConfigError(Exception):
"""Raised when GdDevice configs are malformatted."""
class GdDeviceError(error.ActsError):
"""Raised when there is an error in GdDevice."""
-
diff --git a/system/gd/cert/pts_base_test.py b/system/gd/cert/pts_base_test.py
index 1249e1ff44..c849e150e9 100644
--- a/system/gd/cert/pts_base_test.py
+++ b/system/gd/cert/pts_base_test.py
@@ -25,15 +25,18 @@ import subprocess
ANDROID_BUILD_TOP = os.environ.get('ANDROID_BUILD_TOP')
-sys.path.append(ANDROID_BUILD_TOP + '/out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothFacadeAndCertGeneratedStub_py/gen')
+sys.path.append(
+ ANDROID_BUILD_TOP +
+ '/out/soong/.intermediates/packages/modules/Bluetooth/system/gd/BluetoothFacadeAndCertGeneratedStub_py/gen'
+)
class PTSBaseTestClass(BaseTestClass):
+
def __init__(self, configs):
BaseTestClass.__init__(self, configs)
gd_devices = self.controller_configs.get("GdDevice")
self.register_controller(
- importlib.import_module('cert.gd_device'),
- builtin=True)
+ importlib.import_module('cert.gd_device'), builtin=True)
diff --git a/system/gd/hal/cert/simple_hal_test.py b/system/gd/hal/cert/simple_hal_test.py
index edfa78dda8..44038ccfd1 100644
--- a/system/gd/hal/cert/simple_hal_test.py
+++ b/system/gd/hal/cert/simple_hal_test.py
@@ -45,15 +45,11 @@ class SimpleHalTest(GdBaseTestClass):
self.device_under_test.rootservice.StartStack(
facade_rootservice_pb2.StartStackRequest(
module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
- 'HAL'),
- )
- )
+ 'HAL'),))
self.cert_device.rootservice.StartStack(
cert_rootservice_pb2.StartStackRequest(
module_to_test=cert_rootservice_pb2.BluetoothModule.Value(
- 'HAL'),
- )
- )
+ 'HAL'),))
self.device_under_test.wait_channel_ready()
self.cert_device.wait_channel_ready()
@@ -63,114 +59,99 @@ class SimpleHalTest(GdBaseTestClass):
def teardown_test(self):
self.device_under_test.rootservice.StopStack(
- facade_rootservice_pb2.StopStackRequest()
- )
+ facade_rootservice_pb2.StopStackRequest())
self.cert_device.rootservice.StopStack(
- cert_rootservice_pb2.StopStackRequest()
- )
+ cert_rootservice_pb2.StopStackRequest())
def test_none_event(self):
- with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream:
+ with EventCallbackStream(
+ self.device_under_test.hal.FetchHciEvent(
+ empty_pb2.Empty())) as hci_event_stream:
hci_event_asserts = EventAsserts(hci_event_stream)
hci_event_asserts.assert_none(timeout=timedelta(seconds=1))
def test_example(self):
response = self.device_under_test.hal.SetLoopbackMode(
- hal_facade_pb2.LoopbackModeSettings(enable=True)
- )
+ hal_facade_pb2.LoopbackModeSettings(enable=True))
def test_fetch_hci_event(self):
self.device_under_test.hal.SetLoopbackMode(
- hal_facade_pb2.LoopbackModeSettings(enable=True)
- )
+ hal_facade_pb2.LoopbackModeSettings(enable=True))
- with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream:
+ with EventCallbackStream(
+ self.device_under_test.hal.FetchHciEvent(
+ empty_pb2.Empty())) as hci_event_stream:
hci_event_asserts = EventAsserts(hci_event_stream)
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x01\x04\x053\x8b\x9e0\x01'
- )
- )
+ payload=b'\x01\x04\x053\x8b\x9e0\x01'))
hci_event_asserts.assert_event_occurs(
- lambda packet: packet.payload == b'\x19\x08\x01\x04\x053\x8b\x9e0\x01')
+ lambda packet: packet.payload == b'\x19\x08\x01\x04\x053\x8b\x9e0\x01'
+ )
def test_inquiry_from_dut(self):
- with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream:
+ with EventCallbackStream(
+ self.device_under_test.hal.FetchHciEvent(
+ empty_pb2.Empty())) as hci_event_stream:
hci_event_asserts = EventAsserts(hci_event_stream)
self.cert_device.hal.SetScanMode(
- hal_cert_pb2.ScanModeSettings(mode=3)
- )
+ hal_cert_pb2.ScanModeSettings(mode=3))
self.device_under_test.hal.SetInquiry(
- hal_facade_pb2.InquirySettings(length=0x30, num_responses=0xff)
- )
+ hal_facade_pb2.InquirySettings(length=0x30, num_responses=0xff))
hci_event_asserts.assert_event_occurs(
lambda packet: b'\x02\x0f' in packet.payload
# Expecting an HCI Event (code 0x02, length 0x0f)
)
def test_le_ad_scan_cert_advertises(self):
- with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream:
+ with EventCallbackStream(
+ self.device_under_test.hal.FetchHciEvent(
+ empty_pb2.Empty())) as hci_event_stream:
hci_event_asserts = EventAsserts(hci_event_stream)
# Set the LE Address to 0D:05:04:03:02:01
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D'
- )
- )
+ payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D'))
# Set the LE Scan parameters (active, 40ms, 20ms, Random,
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x0B\x20\x07\x01\x40\x00\x20\x00\x01\x00'
- )
- )
+ payload=b'\x0B\x20\x07\x01\x40\x00\x20\x00\x01\x00'))
# Enable Scanning (Disable duplicate filtering)
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x0C\x20\x02\x01\x00'
- )
- )
+ payload=b'\x0C\x20\x02\x01\x00'))
# Set the LE Address to 0C:05:04:03:02:01
self.cert_device.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C'
- )
- )
+ payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C'))
# Set LE Advertising parameters
self.cert_device.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x06\x20\x0F\x00\x02\x00\x03\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00'
- )
- )
+ payload=
+ b'\x06\x20\x0F\x00\x02\x00\x03\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00'
+ ))
# Set LE Advertising data
self.cert_device.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x08\x20\x20\x0C\x0A\x09Im_A_Cert\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- )
- )
+ payload=
+ b'\x08\x20\x20\x0C\x0A\x09Im_A_Cert\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+ ))
# Enable Advertising
self.cert_device.hal.SendHciCommand(
- hal_facade_pb2.HciCommandPacket(
- payload=b'\x0A\x20\x01\x01'
- )
- )
+ hal_facade_pb2.HciCommandPacket(payload=b'\x0A\x20\x01\x01'))
hci_event_asserts.assert_event_occurs(
lambda packet: b'Im_A_Cert' in packet.payload
# Expecting an HCI Event (code 0x3e, length 0x13, subevent 0x01 )
)
# Disable Advertising
self.cert_device.hal.SendHciCommand(
- hal_facade_pb2.HciCommandPacket(
- payload=b'\x0A\x20\x01\x00'
- )
- )
+ hal_facade_pb2.HciCommandPacket(payload=b'\x0A\x20\x01\x00'))
# Disable Scanning
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x0C\x20\x02\x00\x00'
- )
- )
+ payload=b'\x0C\x20\x02\x00\x00'))
def test_le_connection_dut_advertises(self):
with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream, \
@@ -186,64 +167,50 @@ class SimpleHalTest(GdBaseTestClass):
# Set the CERT LE Address to 0C:05:04:03:02:01
self.cert_device.hal.SendHciCommand(
hal_cert_pb2.HciCommandPacket(
- payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C'
- )
- )
+ payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C'))
# Direct connect to 0D:05:04:03:02:01
self.cert_device.hal.SendHciCommand(
hal_cert_pb2.HciCommandPacket(
- payload=b'\x0D\x20\x19\x11\x01\x22\x02\x00\x01\x01\x02\x03\x04\x05\x0D\x01\x06\x00\x70\x0C\x40\x00\x03\x07\x01\x00\x02\x00'
- )
- )
+ payload=
+ b'\x0D\x20\x19\x11\x01\x22\x02\x00\x01\x01\x02\x03\x04\x05\x0D\x01\x06\x00\x70\x0C\x40\x00\x03\x07\x01\x00\x02\x00'
+ ))
# Set the LE Address to 0D:05:04:03:02:01
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D'
- )
- )
+ payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D'))
# Set LE Advertising parameters
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x06\x20\x0F\x80\x00\x00\x04\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00'
- )
- )
+ payload=
+ b'\x06\x20\x0F\x80\x00\x00\x04\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00'
+ ))
# Set LE Advertising data
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x08\x20\x20\x0C\x0B\x09Im_The_DUT\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- )
- )
+ payload=
+ b'\x08\x20\x20\x0C\x0B\x09Im_The_DUT\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+ ))
# Enable Advertising
self.device_under_test.hal.SendHciCommand(
- hal_facade_pb2.HciCommandPacket(
- payload=b'\x0A\x20\x01\x01'
- )
- )
+ hal_facade_pb2.HciCommandPacket(payload=b'\x0A\x20\x01\x01'))
# LeConnectionComplete TODO: Extract the handle
cert_hci_event_asserts.assert_event_occurs(
- lambda packet: b'\x3e\x13\x01\x00' in packet.payload
- )
+ lambda packet: b'\x3e\x13\x01\x00' in packet.payload)
# LeConnectionComplete TODO: Extract the handle
hci_event_asserts.assert_event_occurs(
- lambda packet: b'\x3e\x13\x01\x00' in packet.payload
- )
+ lambda packet: b'\x3e\x13\x01\x00' in packet.payload)
# Send ACL Data
self.device_under_test.hal.SendHciAcl(
hal_facade_pb2.HciAclPacket(
- payload=b'\xfe\x0e\x0b\x00SomeAclData'
- )
- )
+ payload=b'\xfe\x0e\x0b\x00SomeAclData'))
# Send ACL Data
self.cert_device.hal.SendHciAcl(
hal_facade_pb2.HciAclPacket(
- payload=b'\xfe\x0e\x0f\x00SomeMoreAclData'
- )
- )
+ payload=b'\xfe\x0e\x0f\x00SomeMoreAclData'))
cert_acl_data_asserts.assert_event_occurs(
- lambda packet: b'\xfe\x0e\x0b\x00SomeAclData' in packet.payload
- )
+ lambda packet: b'\xfe\x0e\x0b\x00SomeAclData' in packet.payload)
acl_data_asserts.assert_event_occurs(
lambda packet: b'\xfe\x0e\x0f\x00SomeMoreAclData' in packet.payload
)
@@ -257,51 +224,40 @@ class SimpleHalTest(GdBaseTestClass):
# Set the LE Address to 0D:05:04:03:02:01
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D'
- )
- )
+ payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0D'))
# Add the cert device to the white list (Random 0C:05:04:03:02:01)
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x11\x20\x07\x01\x01\x02\x03\x04\x05\x0C'
- )
- )
+ payload=b'\x11\x20\x07\x01\x01\x02\x03\x04\x05\x0C'))
# Connect using the white list
self.device_under_test.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x0D\x20\x19\x11\x01\x22\x02\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x01\x06\x00\x70\x0C\x40\x00\x03\x07\x01\x00\x02\x00'
- )
- )
+ payload=
+ b'\x0D\x20\x19\x11\x01\x22\x02\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x01\x06\x00\x70\x0C\x40\x00\x03\x07\x01\x00\x02\x00'
+ ))
# Set the LE Address to 0C:05:04:03:02:01
self.cert_device.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C'
- )
- )
+ payload=b'\x05\x20\x06\x01\x02\x03\x04\x05\x0C'))
# Set LE Advertising parameters
self.cert_device.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x06\x20\x0F\x00\x02\x00\x03\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00'
- )
- )
+ payload=
+ b'\x06\x20\x0F\x00\x02\x00\x03\x00\x01\x00\xA1\xA2\xA3\xA4\xA5\xA6\x07\x00'
+ ))
# Set LE Advertising data
self.cert_device.hal.SendHciCommand(
hal_facade_pb2.HciCommandPacket(
- payload=b'\x08\x20\x20\x0C\x0A\x09Im_A_Cert\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- )
- )
+ payload=
+ b'\x08\x20\x20\x0C\x0A\x09Im_A_Cert\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+ ))
# Enable Advertising
self.cert_device.hal.SendHciCommand(
- hal_facade_pb2.HciCommandPacket(
- payload=b'\x0A\x20\x01\x01'
- )
- )
+ hal_facade_pb2.HciCommandPacket(payload=b'\x0A\x20\x01\x01'))
# LeConnectionComplete
cert_hci_event_asserts.assert_event_occurs(
- lambda packet: b'\x3e\x13\x01\x00' in packet.payload
- )
+ lambda packet: b'\x3e\x13\x01\x00' in packet.payload)
# LeConnectionComplete
hci_event_asserts.assert_event_occurs(
- lambda packet: b'\x3e\x13\x01\x00' in packet.payload
- )
+ lambda packet: b'\x3e\x13\x01\x00' in packet.payload)
diff --git a/system/gd/hci/cert/simple_hci_test.py b/system/gd/hci/cert/simple_hci_test.py
index 9f90947fbb..0f0d000599 100644
--- a/system/gd/hci/cert/simple_hci_test.py
+++ b/system/gd/hci/cert/simple_hci_test.py
@@ -32,6 +32,7 @@ from hci import facade_pb2 as hci_facade_pb2
from hci.cert import api_pb2 as hci_cert_pb2
from hci.cert import api_pb2_grpc as hci_cert_pb2_grpc
+
class SimpleHciTest(GdBaseTestClass):
def setup_test(self):
@@ -40,28 +41,26 @@ class SimpleHciTest(GdBaseTestClass):
self.device_under_test.rootservice.StartStack(
facade_rootservice_pb2.StartStackRequest(
- module_under_test=facade_rootservice_pb2.BluetoothModule.Value('HCI'),
- )
- )
+ module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
+ 'HCI'),))
self.cert_device.rootservice.StartStack(
cert_rootservice_pb2.StartStackRequest(
- module_to_test=cert_rootservice_pb2.BluetoothModule.Value('HCI'),
- )
- )
+ module_to_test=cert_rootservice_pb2.BluetoothModule.Value(
+ 'HCI'),))
self.device_under_test.wait_channel_ready()
self.cert_device.wait_channel_ready()
self.device_under_test.hci.SetPageScanMode(
- hci_facade_pb2.PageScanMode(enabled=True)
- )
+ hci_facade_pb2.PageScanMode(enabled=True))
self.cert_device.hci.SetPageScanMode(
- hci_cert_pb2.PageScanMode(enabled=True)
- )
+ hci_cert_pb2.PageScanMode(enabled=True))
- dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address
+ dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(
+ empty_pb2.Empty()).address
self.device_under_test.address = dut_address
- cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address
+ cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress(
+ empty_pb2.Empty()).address
self.cert_device.address = cert_address
self.dut_address = common_pb2.BluetoothAddress(
@@ -71,35 +70,38 @@ class SimpleHciTest(GdBaseTestClass):
def teardown_test(self):
self.device_under_test.rootservice.StopStack(
- facade_rootservice_pb2.StopStackRequest()
- )
+ facade_rootservice_pb2.StopStackRequest())
self.cert_device.rootservice.StopStack(
- cert_rootservice_pb2.StopStackRequest()
- )
+ cert_rootservice_pb2.StopStackRequest())
def test_none_event(self):
- with EventCallbackStream(self.device_under_test.hci.FetchConnectionComplete(empty_pb2.Empty())) as connection_complete_stream:
- connection_complete_asserts = EventAsserts(connection_complete_stream)
+ with EventCallbackStream(
+ self.device_under_test.hci.FetchConnectionComplete(
+ empty_pb2.Empty())) as connection_complete_stream:
+ connection_complete_asserts = EventAsserts(
+ connection_complete_stream)
connection_complete_asserts.assert_none()
def _connect_from_dut(self):
logging.debug("_connect_from_dut")
policy = hci_cert_pb2.IncomingConnectionPolicy(
- remote=self.dut_address,
- accepted=True
- )
+ remote=self.dut_address, accepted=True)
self.cert_device.hci.SetIncomingConnectionPolicy(policy)
- with EventCallbackStream(self.device_under_test.hci.FetchConnectionComplete(empty_pb2.Empty())) as dut_connection_complete_stream:
- dut_connection_complete_asserts = EventAsserts(dut_connection_complete_stream)
+ with EventCallbackStream(
+ self.device_under_test.hci.FetchConnectionComplete(
+ empty_pb2.Empty())) as dut_connection_complete_stream:
+ dut_connection_complete_asserts = EventAsserts(
+ dut_connection_complete_stream)
self.device_under_test.hci.Connect(self.cert_address)
dut_connection_complete_asserts.assert_event_occurs(
- lambda event: self._get_handle(event)
- )
+ lambda event: self._get_handle(event))
def _disconnect_from_dut(self):
logging.debug("_disconnect_from_dut")
- with EventCallbackStream(self.device_under_test.hci.FetchDisconnection(empty_pb2.Empty())) as dut_disconnection_stream:
+ with EventCallbackStream(
+ self.device_under_test.hci.FetchDisconnection(
+ empty_pb2.Empty())) as dut_disconnection_stream:
dut_disconnection_asserts = EventAsserts(dut_disconnection_stream)
self.device_under_test.hci.Disconnect(self.cert_address)
dut_disconnection_asserts.assert_event_occurs(
@@ -114,15 +116,17 @@ class SimpleHciTest(GdBaseTestClass):
def test_connect_disconnect_send_acl(self):
self._connect_from_dut()
- with EventCallbackStream(self.cert_device.hci.FetchAclData(empty_pb2.Empty())) as cert_acl_stream:
+ with EventCallbackStream(
+ self.cert_device.hci.FetchAclData(
+ empty_pb2.Empty())) as cert_acl_stream:
cert_acl_asserts = EventAsserts(cert_acl_stream)
- acl_data = hci_facade_pb2.AclData(remote=self.cert_address, payload=b'123')
+ acl_data = hci_facade_pb2.AclData(
+ remote=self.cert_address, payload=b'123')
self.device_under_test.hci.SendAclData(acl_data)
self.device_under_test.hci.SendAclData(acl_data)
self.device_under_test.hci.SendAclData(acl_data)
cert_acl_asserts.assert_event_occurs(
- lambda packet : b'123' in packet.payload
- and packet.remote == self.dut_address
+ lambda packet: b'123' in packet.payload and packet.remote == self.dut_address
)
self._disconnect_from_dut()
@@ -130,67 +134,74 @@ class SimpleHciTest(GdBaseTestClass):
def test_connect_disconnect_receive_acl(self):
self._connect_from_dut()
- with EventCallbackStream(self.device_under_test.hci.FetchAclData(empty_pb2.Empty())) as dut_acl_stream:
+ with EventCallbackStream(
+ self.device_under_test.hci.FetchAclData(
+ empty_pb2.Empty())) as dut_acl_stream:
dut_acl_asserts = EventAsserts(dut_acl_stream)
- acl_data = hci_cert_pb2.AclData(remote=self.dut_address, payload=b'123')
+ acl_data = hci_cert_pb2.AclData(
+ remote=self.dut_address, payload=b'123')
self.cert_device.hci.SendAclData(acl_data)
self.cert_device.hci.SendAclData(acl_data)
self.cert_device.hci.SendAclData(acl_data)
dut_acl_asserts.assert_event_occurs(
- lambda packet : b'123' in packet.payload
- and packet.remote == self.cert_address
+ lambda packet: b'123' in packet.payload and packet.remote == self.cert_address
)
self._disconnect_from_dut()
def test_reject_connection_request(self):
- with EventCallbackStream(self.device_under_test.hci.FetchConnectionFailed(empty_pb2.Empty())) as dut_connection_failed_stream:
- dut_connection_failed_asserts = EventAsserts(dut_connection_failed_stream)
+ with EventCallbackStream(
+ self.device_under_test.hci.FetchConnectionFailed(
+ empty_pb2.Empty())) as dut_connection_failed_stream:
+ dut_connection_failed_asserts = EventAsserts(
+ dut_connection_failed_stream)
self.device_under_test.hci.Connect(self.cert_address)
dut_connection_failed_asserts.assert_event_occurs(
- lambda event : event.remote == self.cert_address
- )
+ lambda event: event.remote == self.cert_address)
def test_send_classic_security_command(self):
self._connect_from_dut()
- with EventCallbackStream(self.device_under_test.hci_classic_security.FetchCommandCompleteEvent(empty_pb2.Empty())) as dut_command_complete_stream:
- dut_command_complete_asserts = EventAsserts(dut_command_complete_stream)
+ with EventCallbackStream(
+ self.device_under_test.hci_classic_security.
+ FetchCommandCompleteEvent(
+ empty_pb2.Empty())) as dut_command_complete_stream:
+ dut_command_complete_asserts = EventAsserts(
+ dut_command_complete_stream)
- self.device_under_test.hci.AuthenticationRequested(self.cert_address)
+ self.device_under_test.hci.AuthenticationRequested(
+ self.cert_address)
# Link request
- self.device_under_test.hci_classic_security.LinkKeyRequestNegativeReply(self.cert_address)
+ self.device_under_test.hci_classic_security.LinkKeyRequestNegativeReply(
+ self.cert_address)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x040c
- )
+ lambda event: event.command_opcode == 0x040c)
# Pin code request
message = hci_facade_pb2.PinCodeRequestReplyMessage(
remote=self.cert_address,
len=4,
- pin_code=bytes("1234", encoding = "ASCII")
- )
- self.device_under_test.hci_classic_security.PinCodeRequestReply(message)
+ pin_code=bytes("1234", encoding="ASCII"))
+ self.device_under_test.hci_classic_security.PinCodeRequestReply(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x040d
- )
- self.device_under_test.hci_classic_security.PinCodeRequestNegativeReply(self.cert_address)
+ lambda event: event.command_opcode == 0x040d)
+ self.device_under_test.hci_classic_security.PinCodeRequestNegativeReply(
+ self.cert_address)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x040e
- )
+ lambda event: event.command_opcode == 0x040e)
# IO capability request
message = hci_facade_pb2.IoCapabilityRequestReplyMessage(
remote=self.cert_address,
io_capability=0,
oob_present=0,
- authentication_requirements=0
- )
- self.device_under_test.hci_classic_security.IoCapabilityRequestReply(message)
+ authentication_requirements=0)
+ self.device_under_test.hci_classic_security.IoCapabilityRequestReply(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x042b
- )
+ lambda event: event.command_opcode == 0x042b)
# message = hci_facade_pb2.IoCapabilityRequestNegativeReplyMessage(
# remote=self.cert_address,
@@ -200,135 +211,136 @@ class SimpleHciTest(GdBaseTestClass):
# self.device_under_test.hci_classic_security.IoCapabilityRequestNegativeReply(message)
# User confirm request
- self.device_under_test.hci_classic_security.UserConfirmationRequestReply(self.cert_address)
+ self.device_under_test.hci_classic_security.UserConfirmationRequestReply(
+ self.cert_address)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x042c
- )
+ lambda event: event.command_opcode == 0x042c)
message = hci_facade_pb2.LinkKeyRequestReplyMessage(
remote=self.cert_address,
- link_key=bytes("4C68384139F574D836BCF34E9DFB01BF", encoding = "ASCII")
- )
- self.device_under_test.hci_classic_security.LinkKeyRequestReply(message)
+ link_key=bytes(
+ "4C68384139F574D836BCF34E9DFB01BF", encoding="ASCII"))
+ self.device_under_test.hci_classic_security.LinkKeyRequestReply(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x040b
- )
+ lambda event: event.command_opcode == 0x040b)
- self.device_under_test.hci_classic_security.UserConfirmationRequestNegativeReply(self.cert_address)
+ self.device_under_test.hci_classic_security.UserConfirmationRequestNegativeReply(
+ self.cert_address)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x042d
- )
+ lambda event: event.command_opcode == 0x042d)
# User passkey request
message = hci_facade_pb2.UserPasskeyRequestReplyMessage(
remote=self.cert_address,
passkey=999999,
)
- self.device_under_test.hci_classic_security.UserPasskeyRequestReply(message)
+ self.device_under_test.hci_classic_security.UserPasskeyRequestReply(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x042e
- )
+ lambda event: event.command_opcode == 0x042e)
- self.device_under_test.hci_classic_security.UserPasskeyRequestNegativeReply(self.cert_address)
+ self.device_under_test.hci_classic_security.UserPasskeyRequestNegativeReply(
+ self.cert_address)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x042f
- )
+ lambda event: event.command_opcode == 0x042f)
# Remote OOB data request
message = hci_facade_pb2.RemoteOobDataRequestReplyMessage(
remote=self.cert_address,
- c=b'\x19\x20\x21\x22\x23\x24\x25\x26\x19\x20\x21\x22\x23\x24\x25\x26',
- r=b'\x30\x31\x32\x33\x34\x35\x36\x37\x30\x31\x32\x33\x34\x35\x36\x37',
+ c=
+ b'\x19\x20\x21\x22\x23\x24\x25\x26\x19\x20\x21\x22\x23\x24\x25\x26',
+ r=
+ b'\x30\x31\x32\x33\x34\x35\x36\x37\x30\x31\x32\x33\x34\x35\x36\x37',
)
- self.device_under_test.hci_classic_security.RemoteOobDataRequestReply(message)
+ self.device_under_test.hci_classic_security.RemoteOobDataRequestReply(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x0430
- )
- self.device_under_test.hci_classic_security.RemoteOobDataRequestNegativeReply(self.cert_address)
+ lambda event: event.command_opcode == 0x0430)
+ self.device_under_test.hci_classic_security.RemoteOobDataRequestNegativeReply(
+ self.cert_address)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x0433
- )
+ lambda event: event.command_opcode == 0x0433)
# Read/Write/Delete link key
message = hci_facade_pb2.ReadStoredLinkKeyMessage(
remote=self.cert_address,
- read_all_flag = 0,
+ read_all_flag=0,
)
- self.device_under_test.hci_classic_security.ReadStoredLinkKey(message)
+ self.device_under_test.hci_classic_security.ReadStoredLinkKey(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x0c0d
- )
+ lambda event: event.command_opcode == 0x0c0d)
message = hci_facade_pb2.WriteStoredLinkKeyMessage(
num_keys_to_write=1,
remote=self.cert_address,
- link_keys=bytes("4C68384139F574D836BCF34E9DFB01BF", encoding = "ASCII"),
+ link_keys=bytes(
+ "4C68384139F574D836BCF34E9DFB01BF", encoding="ASCII"),
)
- self.device_under_test.hci_classic_security.WriteStoredLinkKey(message)
+ self.device_under_test.hci_classic_security.WriteStoredLinkKey(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x0c11
- )
+ lambda event: event.command_opcode == 0x0c11)
message = hci_facade_pb2.DeleteStoredLinkKeyMessage(
remote=self.cert_address,
- delete_all_flag = 0,
+ delete_all_flag=0,
)
- self.device_under_test.hci_classic_security.DeleteStoredLinkKey(message)
+ self.device_under_test.hci_classic_security.DeleteStoredLinkKey(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x0c12
- )
+ lambda event: event.command_opcode == 0x0c12)
# Refresh Encryption Key
message = hci_facade_pb2.RefreshEncryptionKeyMessage(
- connection_handle=self.connection_handle,
- )
- self.device_under_test.hci_classic_security.RefreshEncryptionKey(message)
+ connection_handle=self.connection_handle,)
+ self.device_under_test.hci_classic_security.RefreshEncryptionKey(
+ message)
# Read/Write Simple Pairing Mode
- self.device_under_test.hci_classic_security.ReadSimplePairingMode(empty_pb2.Empty())
+ self.device_under_test.hci_classic_security.ReadSimplePairingMode(
+ empty_pb2.Empty())
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x0c55
- )
+ lambda event: event.command_opcode == 0x0c55)
message = hci_facade_pb2.WriteSimplePairingModeMessage(
- simple_pairing_mode=1,
- )
- self.device_under_test.hci_classic_security.WriteSimplePairingMode(message)
+ simple_pairing_mode=1,)
+ self.device_under_test.hci_classic_security.WriteSimplePairingMode(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x0c56
- )
+ lambda event: event.command_opcode == 0x0c56)
# Read local oob data
- self.device_under_test.hci_classic_security.ReadLocalOobData(empty_pb2.Empty())
+ self.device_under_test.hci_classic_security.ReadLocalOobData(
+ empty_pb2.Empty())
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x0c57
- )
+ lambda event: event.command_opcode == 0x0c57)
# Send keypress notification
message = hci_facade_pb2.SendKeypressNotificationMessage(
remote=self.cert_address,
notification_type=1,
)
- self.device_under_test.hci_classic_security.SendKeypressNotification(message)
+ self.device_under_test.hci_classic_security.SendKeypressNotification(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x0c60
- )
+ lambda event: event.command_opcode == 0x0c60)
# Read local oob extended data
- self.device_under_test.hci_classic_security.ReadLocalOobExtendedData(empty_pb2.Empty())
+ self.device_under_test.hci_classic_security.ReadLocalOobExtendedData(
+ empty_pb2.Empty())
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x0c7d
- )
+ lambda event: event.command_opcode == 0x0c7d)
# Read Encryption key size
message = hci_facade_pb2.ReadEncryptionKeySizeMessage(
- connection_handle=self.connection_handle,
- )
- self.device_under_test.hci_classic_security.ReadEncryptionKeySize(message)
+ connection_handle=self.connection_handle,)
+ self.device_under_test.hci_classic_security.ReadEncryptionKeySize(
+ message)
dut_command_complete_asserts.assert_event_occurs(
- lambda event: event.command_opcode == 0x1408
- )
+ lambda event: event.command_opcode == 0x1408)
self._disconnect_from_dut()
@@ -340,5 +352,6 @@ class SimpleHciTest(GdBaseTestClass):
def test_classic_connection_management_command(self):
self._connect_from_dut()
- self.device_under_test.hci.TestClassicConnectionManagementCommands(self.cert_address)
- self._disconnect_from_dut() \ No newline at end of file
+ self.device_under_test.hci.TestClassicConnectionManagementCommands(
+ self.cert_address)
+ self._disconnect_from_dut()
diff --git a/system/gd/l2cap/classic/cert/pts_l2cap_test.py b/system/gd/l2cap/classic/cert/pts_l2cap_test.py
index 2b2fe6446a..f4d026a6d3 100644
--- a/system/gd/l2cap/classic/cert/pts_l2cap_test.py
+++ b/system/gd/l2cap/classic/cert/pts_l2cap_test.py
@@ -26,18 +26,19 @@ from google.protobuf import empty_pb2
class PTSL2capTest(PTSBaseTestClass):
+
def setup_test(self):
self.device_under_test = self.gd_devices[0]
self.device_under_test.rootservice.StartStack(
facade_rootservice_pb2.StartStackRequest(
- module_under_test=facade_rootservice_pb2.BluetoothModule.Value('L2CAP'),
- )
- )
+ module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
+ 'L2CAP'),))
self.device_under_test.wait_channel_ready()
- dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address
+ dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(
+ empty_pb2.Empty()).address
pts_address = self.controller_configs.get('pts_address').lower()
self.device_under_test.address = dut_address
@@ -48,26 +49,28 @@ class PTSL2capTest(PTSBaseTestClass):
def teardown_test(self):
self.device_under_test.rootservice.StopStack(
- facade_rootservice_pb2.StopStackRequest()
- )
+ facade_rootservice_pb2.StopStackRequest())
def _dut_connection_stream(self):
- return EventCallbackStream(self.device_under_test.l2cap.FetchConnectionComplete(empty_pb2.Empty()))
+ return EventCallbackStream(
+ self.device_under_test.l2cap.FetchConnectionComplete(
+ empty_pb2.Empty()))
def _dut_connection_close_stream(self):
- return EventCallbackStream(self.device_under_test.l2cap.FetchConnectionClose(empty_pb2.Empty()))
+ return EventCallbackStream(
+ self.device_under_test.l2cap.FetchConnectionClose(
+ empty_pb2.Empty()))
def _assert_connection_complete(self, due_connection_asserts, timeout=30):
due_connection_asserts.assert_event_occurs(
- lambda device : device.remote.address == self.pts_address.address,
- timeout=timedelta(seconds=timeout)
- )
+ lambda device: device.remote.address == self.pts_address.address,
+ timeout=timedelta(seconds=timeout))
- def _assert_connection_close(self, due_connection_close_asserts, timeout=30):
+ def _assert_connection_close(self, due_connection_close_asserts,
+ timeout=30):
due_connection_close_asserts.assert_event_occurs(
- lambda device : device.remote.address == self.pts_address.address,
- timeout=timedelta(seconds=timeout)
- )
+ lambda device: device.remote.address == self.pts_address.address,
+ timeout=timedelta(seconds=timeout))
def test_L2CAP_COS_CED_BV_01_C(self):
"""
@@ -78,13 +81,17 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.pts_address, psm=psm))
+ self.device_under_test.l2cap.OpenChannel(
+ l2cap_facade_pb2.OpenChannelRequest(
+ remote=self.pts_address, psm=psm))
self._assert_connection_complete(due_connection_asserts)
- self.device_under_test.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=psm))
+ self.device_under_test.l2cap.CloseChannel(
+ l2cap_facade_pb2.CloseChannelRequest(psm=psm))
self._assert_connection_close(due_connection_close_asserts)
def test_L2CAP_COS_CED_BV_03_C(self):
@@ -95,14 +102,20 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.BASIC))
self._assert_connection_complete(due_connection_asserts)
- self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'*34))
+ self.device_under_test.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(
+ psm=psm, payload=b'abc' * 34))
self._assert_connection_close(due_connection_close_asserts)
def test_L2CAP_COS_CED_BV_04_C(self):
@@ -113,14 +126,19 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.BASIC))
self._assert_connection_complete(due_connection_asserts)
time.sleep(2)
- self.device_under_test.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=psm))
+ self.device_under_test.l2cap.CloseChannel(
+ l2cap_facade_pb2.CloseChannelRequest(psm=psm))
self._assert_connection_close(due_connection_close_asserts)
def test_L2CAP_COS_CED_BV_05_C(self):
@@ -131,11 +149,15 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.BASIC))
self._assert_connection_complete(due_connection_asserts)
self._assert_connection_close(due_connection_close_asserts)
@@ -147,11 +169,15 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.BASIC))
self._assert_connection_complete(due_connection_asserts)
self._assert_connection_close(due_connection_close_asserts)
@@ -163,11 +189,15 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.BASIC))
time.sleep(120)
@@ -179,11 +209,15 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.BASIC))
self._assert_connection_complete(due_connection_asserts)
self._assert_connection_close(due_connection_close_asserts)
@@ -195,11 +229,15 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.BASIC))
self._assert_connection_complete(due_connection_asserts)
self._assert_connection_close(due_connection_close_asserts)
@@ -211,11 +249,15 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.BASIC))
self._assert_connection_complete(due_connection_asserts)
time.sleep(5)
@@ -228,11 +270,15 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.BASIC))
self._assert_connection_close(due_connection_close_asserts)
def test_L2CAP_COS_CFD_BV_08_C(self):
@@ -244,15 +290,18 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.pts_address, psm=psm))
+ self.device_under_test.l2cap.OpenChannel(
+ l2cap_facade_pb2.OpenChannelRequest(
+ remote=self.pts_address, psm=psm))
self._assert_connection_complete(due_connection_asserts)
- self.device_under_test.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=psm))
+ self.device_under_test.l2cap.CloseChannel(
+ l2cap_facade_pb2.CloseChannelRequest(psm=psm))
self._assert_connection_close(due_connection_close_asserts)
-
def test_L2CAP_ERM_BI_01_C(self):
"""
L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted]
@@ -262,10 +311,14 @@ class PTSL2capTest(PTSBaseTestClass):
with self._dut_connection_stream() as dut_connection_stream, \
self._dut_connection_close_stream() as dut_connection_close_stream:
due_connection_asserts = EventAsserts(dut_connection_stream)
- due_connection_close_asserts = EventAsserts(dut_connection_close_stream)
+ due_connection_close_asserts = EventAsserts(
+ dut_connection_close_stream)
psm = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.ERTM))
self._assert_connection_complete(due_connection_asserts)
self._pending_connection_close(timeout=60)
diff --git a/system/gd/l2cap/classic/cert/simple_l2cap_test.py b/system/gd/l2cap/classic/cert/simple_l2cap_test.py
index b14f5abc2e..cdc0e752c8 100644
--- a/system/gd/l2cap/classic/cert/simple_l2cap_test.py
+++ b/system/gd/l2cap/classic/cert/simple_l2cap_test.py
@@ -36,57 +36,69 @@ ASYNC_OP_TIME_SECONDS = 1 # TODO: Use events to synchronize events instead
def is_connection_request(log):
return log.HasField("connection_request")
+
def is_connection_response(log):
return log.HasField("connection_response")
+
def is_configuration_request(log):
return log.HasField("configuration_request")
+
def is_configuration_response(log):
return log.HasField("configuration_response")
+
def is_disconnection_request(log):
return log.HasField("disconnection_request")
+
def is_disconnection_response(log):
return log.HasField("disconnection_response")
+
def is_echo_response(log):
return log.HasField("echo_response")
+
def is_information_request(log):
return log.HasField("information_request")
+
def is_information_response(log):
return log.HasField("information_response")
+
def is_command_reject(log):
return log.HasField("command_reject")
+
def basic_frame_to_enhanced_information_frame(information_payload):
return information_payload[2:]
+
class SimpleL2capTest(GdBaseTestClass):
+
def setup_test(self):
self.device_under_test = self.gd_devices[0]
self.cert_device = self.gd_cert_devices[0]
self.device_under_test.rootservice.StartStack(
facade_rootservice_pb2.StartStackRequest(
- module_under_test=facade_rootservice_pb2.BluetoothModule.Value('L2CAP'),
- )
- )
+ module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
+ 'L2CAP'),))
self.cert_device.rootservice.StartStack(
cert_rootservice_pb2.StartStackRequest(
- module_to_test=cert_rootservice_pb2.BluetoothModule.Value('L2CAP'),
- )
- )
+ module_to_test=cert_rootservice_pb2.BluetoothModule.Value(
+ 'L2CAP'),))
self.device_under_test.wait_channel_ready()
self.cert_device.wait_channel_ready()
- dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address
+ dut_address = self.device_under_test.controller_read_only_property.ReadLocalAddress(
+ empty_pb2.Empty()).address
self.device_under_test.address = dut_address
- cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress(empty_pb2.Empty()).address
+ cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress(
+ empty_pb2.Empty()).address
self.cert_device.address = cert_address
self.dut_address = common_pb2.BluetoothAddress(
@@ -100,97 +112,133 @@ class SimpleL2capTest(GdBaseTestClass):
def teardown_test(self):
self.device_under_test.rootservice.StopStack(
- facade_rootservice_pb2.StopStackRequest()
- )
+ facade_rootservice_pb2.StopStackRequest())
self.cert_device.rootservice.StopStack(
- cert_rootservice_pb2.StopStackRequest()
- )
+ cert_rootservice_pb2.StopStackRequest())
def _register_callbacks(self, event_callback_stream):
+
def handle_connection_request(log):
log = log.connection_request
- self.cert_device.l2cap.SendConnectionResponse(l2cap_cert_pb2.ConnectionResponse(dcid=self.next_scid,scid=log.scid,
- signal_id=log.signal_id))
+ self.cert_device.l2cap.SendConnectionResponse(
+ l2cap_cert_pb2.ConnectionResponse(
+ dcid=self.next_scid, scid=log.scid,
+ signal_id=log.signal_id))
self.scid_dcid_map[self.next_scid] = log.scid
self.next_scid += 1
- self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest(
- dcid=log.scid,
- signal_id=log.signal_id+1,
- retransmission_config=l2cap_cert_pb2.ChannelRetransmissionFlowControlConfig(
- mode=self.retransmission_mode
- )))
+ self.cert_device.l2cap.SendConfigurationRequest(
+ l2cap_cert_pb2.ConfigurationRequest(
+ dcid=log.scid,
+ signal_id=log.signal_id + 1,
+ retransmission_config=l2cap_cert_pb2.
+ ChannelRetransmissionFlowControlConfig(
+ mode=self.retransmission_mode)))
+
self.handle_connection_request = handle_connection_request
- event_callback_stream.register_callback(self.handle_connection_request, matcher_fn=is_connection_request)
+ event_callback_stream.register_callback(
+ self.handle_connection_request, matcher_fn=is_connection_request)
def handle_connection_response(log):
log = log.connection_response
self.scid_dcid_map[log.scid] = log.dcid
- self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest(
- dcid=log.dcid,
- signal_id=log.signal_id+1,
- retransmission_config=l2cap_cert_pb2.ChannelRetransmissionFlowControlConfig(
- mode=self.retransmission_mode
- )))
+ self.cert_device.l2cap.SendConfigurationRequest(
+ l2cap_cert_pb2.ConfigurationRequest(
+ dcid=log.dcid,
+ signal_id=log.signal_id + 1,
+ retransmission_config=l2cap_cert_pb2.
+ ChannelRetransmissionFlowControlConfig(
+ mode=self.retransmission_mode)))
+
self.handle_connection_request = handle_connection_request
- event_callback_stream.register_callback(self.handle_connection_response, matcher_fn=is_connection_response)
+ event_callback_stream.register_callback(
+ self.handle_connection_response, matcher_fn=is_connection_response)
def handle_configuration_request(log):
log = log.configuration_request
if log.dcid not in self.scid_dcid_map:
return
dcid = self.scid_dcid_map[log.dcid]
- self.cert_device.l2cap.SendConfigurationResponse(l2cap_cert_pb2.ConfigurationResponse(
- scid=dcid,
- signal_id=log.signal_id,
- retransmission_config=l2cap_cert_pb2.ChannelRetransmissionFlowControlConfig(mode=self.retransmission_mode)
- ))
+ self.cert_device.l2cap.SendConfigurationResponse(
+ l2cap_cert_pb2.ConfigurationResponse(
+ scid=dcid,
+ signal_id=log.signal_id,
+ retransmission_config=l2cap_cert_pb2.
+ ChannelRetransmissionFlowControlConfig(
+ mode=self.retransmission_mode)))
+
self.handle_configuration_request = handle_configuration_request
- event_callback_stream.register_callback(self.handle_configuration_request, matcher_fn=is_configuration_request)
+ event_callback_stream.register_callback(
+ self.handle_configuration_request,
+ matcher_fn=is_configuration_request)
def handle_disconnection_request(log):
log = log.disconnection_request
- self.cert_device.l2cap.SendDisconnectionResponse(l2cap_cert_pb2.DisconnectionResponse(dcid=log.dcid,scid=log.scid,
- signal_id=log.signal_id))
+ self.cert_device.l2cap.SendDisconnectionResponse(
+ l2cap_cert_pb2.DisconnectionResponse(
+ dcid=log.dcid, scid=log.scid, signal_id=log.signal_id))
+
self.handle_disconnection_request = handle_disconnection_request
- event_callback_stream.register_callback(self.handle_disconnection_request, matcher_fn=is_disconnection_request)
+ event_callback_stream.register_callback(
+ self.handle_disconnection_request,
+ matcher_fn=is_disconnection_request)
def handle_information_request(log):
log = log.information_request
- self.cert_device.l2cap.SendInformationResponse(l2cap_cert_pb2.InformationResponse(type=log.type,
- signal_id=log.signal_id))
+ self.cert_device.l2cap.SendInformationResponse(
+ l2cap_cert_pb2.InformationResponse(
+ type=log.type, signal_id=log.signal_id))
+
self.handle_information_request = handle_information_request
- event_callback_stream.register_callback(self.handle_information_request, matcher_fn=is_information_request)
+ event_callback_stream.register_callback(
+ self.handle_information_request, matcher_fn=is_information_request)
self.event_dump = []
+
def dump_log(log):
self.event_dump.append(log)
+
self.dump_log = dump_log
event_callback_stream.register_callback(self.dump_log)
def _setup_link(self, event_asserts):
- self.cert_device.l2cap.SetupLink(l2cap_cert_pb2.SetupLinkRequest(remote=self.dut_address))
- event_asserts.assert_event_occurs(lambda log : log.HasField("link_up") and log.remote == self.dut_address)
+ self.cert_device.l2cap.SetupLink(
+ l2cap_cert_pb2.SetupLinkRequest(remote=self.dut_address))
+ event_asserts.assert_event_occurs(
+ lambda log: log.HasField("link_up") and log.remote == self.dut_address
+ )
def _open_channel(self, event_asserts, scid=0x0101, psm=0x33):
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))
- self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
- event_asserts.assert_event_occurs(lambda log : is_configuration_response(log) and scid == log.scid)
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))
+ self.cert_device.l2cap.SendConnectionRequest(
+ l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
+ event_asserts.assert_event_occurs(
+ lambda log: is_configuration_response(log) and scid == log.scid)
def test_connect(self):
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self._setup_link(l2cap_event_asserts)
self._open_channel(l2cap_event_asserts, scid=0x0101)
def test_connect_and_send_data_ertm_no_segmentation(self):
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
- self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2))
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x33, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM))
+ self.device_under_test.l2cap.RegisterChannel(
+ l2cap_facade_pb2.RegisterChannelRequest(channel=2))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=0x33,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.ERTM))
self._setup_link(l2cap_event_asserts)
scid = 0x0101
@@ -199,42 +247,68 @@ class SimpleL2capTest(GdBaseTestClass):
def on_data_received(log):
packet = log.data_packet
if (packet.channel == scid):
- self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=1, s=0))
- l2cap_log_stream.register_callback(on_data_received, matcher_fn=lambda log : log.HasField("data_packet"))
+ self.cert_device.l2cap.SendSFrame(
+ l2cap_cert_pb2.SFrame(
+ channel=self.scid_dcid_map[scid], req_seq=1, s=0))
+
+ l2cap_log_stream.register_callback(
+ on_data_received,
+ matcher_fn=lambda log: log.HasField("data_packet"))
- self.device_under_test.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
+ self.device_under_test.l2cap.SendL2capPacket(
+ l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
l2cap_event_asserts.assert_event_occurs(
lambda log : log.HasField("data_packet") and \
log.data_packet.channel == 2 and \
basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b"123")
- self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'*34))
- self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=1, tx_seq=0, sar=0, information=b"abcd"))
+ self.device_under_test.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(
+ psm=0x33, payload=b'abc' * 34))
+ self.cert_device.l2cap.SendIFrame(
+ l2cap_cert_pb2.IFrame(
+ channel=self.scid_dcid_map[scid],
+ req_seq=1,
+ tx_seq=0,
+ sar=0,
+ information=b"abcd"))
l2cap_event_asserts.assert_event_occurs(
lambda log : log.HasField("data_packet") and \
log.data_packet.channel == scid and \
basic_frame_to_enhanced_information_frame(log.data_packet.payload) == b"abc"*34)
def test_connect_and_send_data(self):
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
- self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2))
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x33))
+ self.device_under_test.l2cap.RegisterChannel(
+ l2cap_facade_pb2.RegisterChannelRequest(channel=2))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x33))
self._setup_link(l2cap_event_asserts)
scid = 0x0101
self._open_channel(l2cap_event_asserts, scid=scid)
- self.device_under_test.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
- l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == 2 and log.data_packet.payload == b"123")
+ self.device_under_test.l2cap.SendL2capPacket(
+ l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: log.HasField("data_packet") and log.data_packet.channel == 2 and log.data_packet.payload == b"123"
+ )
- self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
- l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b"abc")
+ self.device_under_test.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b"abc"
+ )
def test_open_two_channels(self):
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self._setup_link(l2cap_event_asserts)
@@ -245,21 +319,29 @@ class SimpleL2capTest(GdBaseTestClass):
"""
L2CAP/COS/CED/BV-07-C
"""
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self._setup_link(l2cap_event_asserts)
- scid=0x0101
+ scid = 0x0101
self._open_channel(l2cap_event_asserts, scid=scid, psm=0x1)
dcid = self.scid_dcid_map[scid]
- self.cert_device.l2cap.SendDisconnectionRequest(l2cap_cert_pb2.DisconnectionRequest(scid=scid, dcid=dcid, signal_id=2))
- l2cap_event_asserts.assert_event_occurs(lambda log : is_disconnection_response(log) and log.disconnection_response.scid == scid and log.disconnection_response.dcid == dcid)
+ self.cert_device.l2cap.SendDisconnectionRequest(
+ l2cap_cert_pb2.DisconnectionRequest(
+ scid=scid, dcid=dcid, signal_id=2))
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: is_disconnection_response(log) and log.disconnection_response.scid == scid and log.disconnection_response.dcid == dcid
+ )
def test_disconnect_on_timeout(self):
"""
L2CAP/COS/CED/BV-08-C
"""
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self._setup_link(l2cap_event_asserts)
@@ -267,12 +349,16 @@ class SimpleL2capTest(GdBaseTestClass):
psm = 1
self._open_channel(l2cap_event_asserts, scid=0x0101, psm=0x1)
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))
# Don't send configuration response back
- l2cap_log_stream.unregister_callback(self.handle_configuration_request, matcher_fn=is_configuration_request)
+ l2cap_log_stream.unregister_callback(
+ self.handle_configuration_request,
+ matcher_fn=is_configuration_request)
- self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
+ self.cert_device.l2cap.SendConnectionRequest(
+ l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
l2cap_event_asserts.assert_none_matching(is_configuration_response)
def test_basic_operation_request_connection(self):
@@ -281,66 +367,88 @@ class SimpleL2capTest(GdBaseTestClass):
Verify that the IUT is able to request the connection establishment for an L2CAP data channel and
initiate the configuration procedure.
"""
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
psm = 1
# TODO: Use another test case
- self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.cert_address, psm=psm))
- l2cap_event_asserts.assert_event_occurs(lambda log : is_connection_request(log) and log.connection_request.psm == psm)
+ self.device_under_test.l2cap.OpenChannel(
+ l2cap_facade_pb2.OpenChannelRequest(
+ remote=self.cert_address, psm=psm))
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: is_connection_request(log) and log.connection_request.psm == psm
+ )
def test_respond_to_echo_request(self):
"""
L2CAP/COS/ECH/BV-01-C [Respond to Echo Request]
Verify that the IUT responds to an echo request.
"""
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self._setup_link(l2cap_event_asserts)
# TODO: Replace with constructed packets when PDL is available
echo_request_packet = b"\x08\x01\x00\x00"
- self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=1, payload=echo_request_packet))
- l2cap_event_asserts.assert_event_occurs(lambda log : is_echo_response(log) and log.echo_response.signal_id == 0x01)
+ self.cert_device.l2cap.SendL2capPacket(
+ l2cap_facade_pb2.L2capPacket(
+ channel=1, payload=echo_request_packet))
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: is_echo_response(log) and log.echo_response.signal_id == 0x01
+ )
def test_reject_unknown_command(self):
"""
L2CAP/COS/CED/BI-01-C
"""
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self._setup_link(l2cap_event_asserts)
# TODO: Replace with constructed packets when PDL is available
invalid_command_packet = b"\xff\x01\x00\x00"
- self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=1, payload=invalid_command_packet))
+ self.cert_device.l2cap.SendL2capPacket(
+ l2cap_facade_pb2.L2capPacket(
+ channel=1, payload=invalid_command_packet))
# command_reject_packet = b"\x01\x01\x02\x00\x00\x00"
- l2cap_event_asserts.assert_event_occurs(lambda log : is_command_reject(log) and log.command_reject.signal_id == 0x01)
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: is_command_reject(log) and log.command_reject.signal_id == 0x01
+ )
def test_query_for_1_2_features(self):
"""
L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features]
"""
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self._setup_link(l2cap_event_asserts)
signal_id = 3
self.cert_device.l2cap.SendInformationRequest(
l2cap_cert_pb2.InformationRequest(
- type=l2cap_cert_pb2.InformationRequestType.FIXED_CHANNELS, signal_id=signal_id))
+ type=l2cap_cert_pb2.InformationRequestType.FIXED_CHANNELS,
+ signal_id=signal_id))
l2cap_event_asserts.assert_event_occurs(
lambda log : is_information_response(log) and \
log.information_response.signal_id == signal_id and \
log.information_response.type == l2cap_cert_pb2.InformationRequestType.FIXED_CHANNELS)
-
def test_extended_feature_info_response_ertm(self):
"""
L2CAP/EXF/BV-01-C [Extended Features Information Response for Enhanced
Retransmission Mode]
"""
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream)
@@ -349,9 +457,12 @@ class SimpleL2capTest(GdBaseTestClass):
signal_id = 3
self.cert_device.l2cap.SendInformationRequest(
l2cap_cert_pb2.InformationRequest(
- type=l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES, signal_id=signal_id))
+ type=l2cap_cert_pb2.InformationRequestType.
+ EXTENDED_FEATURES,
+ signal_id=signal_id))
- l2cap_event_asserts_alt.assert_event_occurs_at_most_times(is_information_response, 1)
+ l2cap_event_asserts_alt.assert_event_occurs_at_most_times(
+ is_information_response, 1)
expected_log_type = l2cap_cert_pb2.InformationRequestType.EXTENDED_FEATURES
expected_mask = 1 << 3
@@ -365,54 +476,88 @@ class SimpleL2capTest(GdBaseTestClass):
"""
L2CAP/ERM/BV-01-C [Transmit I-frames]
"""
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
- self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2))
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x33, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM))
+ self.device_under_test.l2cap.RegisterChannel(
+ l2cap_facade_pb2.RegisterChannelRequest(channel=2))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=0x33,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.ERTM))
self._setup_link(l2cap_event_asserts)
scid = 0x0101
self._open_channel(l2cap_event_asserts, scid=scid)
- self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
- self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=1, s=0))
- self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
- self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=2, s=0))
- self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
- self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=3, s=0))
- l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc')
- l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc')
- l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc')
- l2cap_event_asserts_alt.assert_event_occurs_at_most(lambda log : log.HasField("data_packet"), 3)
+ self.device_under_test.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+ self.cert_device.l2cap.SendSFrame(
+ l2cap_cert_pb2.SFrame(
+ channel=self.scid_dcid_map[scid], req_seq=1, s=0))
+ self.device_under_test.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+ self.cert_device.l2cap.SendSFrame(
+ l2cap_cert_pb2.SFrame(
+ channel=self.scid_dcid_map[scid], req_seq=2, s=0))
+ self.device_under_test.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+ self.cert_device.l2cap.SendSFrame(
+ l2cap_cert_pb2.SFrame(
+ channel=self.scid_dcid_map[scid], req_seq=3, s=0))
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc'
+ )
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc'
+ )
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: log.HasField("data_packet") and log.data_packet.channel == 33 and log.data_packet.payload == b'abc'
+ )
+ l2cap_event_asserts_alt.assert_event_occurs_at_most(
+ lambda log: log.HasField("data_packet"), 3)
def test_s_frame_transmissions_exceed_max_transmit(self):
"""
L2CAP/ERM/BV-11-C [S-Frame Transmissions Exceed MaxTransmit]
"""
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
l2cap_event_asserts_alt = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self.retransmission_mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
- self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2))
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=0x33, retransmission_mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM))
+ self.device_under_test.l2cap.RegisterChannel(
+ l2cap_facade_pb2.RegisterChannelRequest(channel=2))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=0x33,
+ retransmission_mode=l2cap_facade_pb2.
+ RetransmissionFlowControlMode.ERTM))
self._setup_link(l2cap_event_asserts)
scid = 0x0101
self._open_channel(l2cap_event_asserts, scid=scid)
- self.device_under_test.l2cap.SendDynamicChannelPacket(l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+ self.device_under_test.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
# Retransmission timer = 1, 1 * monitor timer = 2, so total timeout is 3
time.sleep(4)
l2cap_event_asserts.assert_event_occurs(lambda log : is_disconnection_request(log) and \
log.disconnection_request.dcid == scid and \
log.disconnection_request.scid == self.scid_dcid_map[scid])
- l2cap_event_asserts_alt.assert_event_occurs_at_most(lambda log : is_disconnection_request(log), 1)
+ l2cap_event_asserts_alt.assert_event_occurs_at_most(
+ lambda log: is_disconnection_request(log), 1)
def test_sent_rej_lost(self):
"""
L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted]
"""
- with EventCallbackStream(self.cert_device.l2cap.FetchL2capLog(empty_pb2.Empty())) as l2cap_log_stream:
+ with EventCallbackStream(
+ self.cert_device.l2cap.FetchL2capLog(
+ empty_pb2.Empty())) as l2cap_log_stream:
l2cap_event_asserts = EventAsserts(l2cap_log_stream)
self._register_callbacks(l2cap_log_stream)
self._setup_link(l2cap_event_asserts)
@@ -422,23 +567,36 @@ class SimpleL2capTest(GdBaseTestClass):
psm = 1
mode = l2cap_cert_pb2.ChannelRetransmissionFlowControlMode.ERTM
self.tx_window = 1
- self.device_under_test.l2cap.SetDynamicChannel(l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode))
- self.cert_device.l2cap.SendConnectionRequest(l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
+ self.device_under_test.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm, retransmission_mode=mode))
+ self.cert_device.l2cap.SendConnectionRequest(
+ l2cap_cert_pb2.ConnectionRequest(scid=scid, psm=psm))
+
+ l2cap_log_stream.unregister_callback(
+ self.handle_Connection_response,
+ matcher_fn=is_configuration_response)
- l2cap_log_stream.unregister_callback(self.handle_Connection_response, matcher_fn=is_configuration_response)
def handle_connection_response(log):
log = log.connection_response
self.scid_dcid_map[log.scid] = log.dcid
- self.cert_device.l2cap.SendConfigurationRequest(l2cap_cert_pb2.ConfigurationRequest(
- dcid= self.scid_dcid_map[scid],
- signal_id=signal_id + 1,
- retransmission_config=l2cap_cert_pb2.ChannelRetransmissionFlowControlConfig(
- mode=mode
- )))
- l2cap_log_stream.unregister_callback(handle_connection_response, matcher_fn=is_connection_response)
- l2cap_log_stream.register_callback(handle_connection_response, matcher_fn=is_connection_response)
-
- l2cap_log_stream.unregister_callback(self.handle_configuration_request, matcher_fn=is_configuration_request)
+ self.cert_device.l2cap.SendConfigurationRequest(
+ l2cap_cert_pb2.ConfigurationRequest(
+ dcid=self.scid_dcid_map[scid],
+ signal_id=signal_id + 1,
+ retransmission_config=l2cap_cert_pb2.
+ ChannelRetransmissionFlowControlConfig(mode=mode)))
+ l2cap_log_stream.unregister_callback(
+ handle_connection_response,
+ matcher_fn=is_connection_response)
+
+ l2cap_log_stream.register_callback(
+ handle_connection_response, matcher_fn=is_connection_response)
+
+ l2cap_log_stream.unregister_callback(
+ self.handle_configuration_request,
+ matcher_fn=is_configuration_request)
+
def handle_configuration_request(log):
log = log.configuration_request
if log.dcid not in self.scid_dcid_map:
@@ -446,21 +604,50 @@ class SimpleL2capTest(GdBaseTestClass):
dcid = self.scid_dcid_map[log.dcid]
if log.HasField("retransmission_config"):
self.tx_window = log.retransmission_config.tx_window
- self.cert_device.l2cap.SendConfigurationResponse(l2cap_cert_pb2.ConfigurationResponse(
- scid=dcid,
- signal_id=log.signal_id,
+ self.cert_device.l2cap.SendConfigurationResponse(
+ l2cap_cert_pb2.ConfigurationResponse(
+ scid=dcid,
+ signal_id=log.signal_id,
))
- l2cap_log_stream.unregister_callback(handle_configuration_request, matcher_fn=is_configuration_request)
- l2cap_log_stream.register_callback(handle_configuration_request, matcher_fn=is_configuration_request)
-
- self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=0, sar=0))
- self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=(self.tx_window - 1), sar=0))
- l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01')
+ l2cap_log_stream.unregister_callback(
+ handle_configuration_request,
+ matcher_fn=is_configuration_request)
+
+ l2cap_log_stream.register_callback(
+ handle_configuration_request,
+ matcher_fn=is_configuration_request)
+
+ self.cert_device.l2cap.SendIFrame(
+ l2cap_cert_pb2.IFrame(
+ channel=self.scid_dcid_map[scid],
+ req_seq=0,
+ tx_seq=0,
+ sar=0))
+ self.cert_device.l2cap.SendIFrame(
+ l2cap_cert_pb2.IFrame(
+ channel=self.scid_dcid_map[scid],
+ req_seq=0,
+ tx_seq=(self.tx_window - 1),
+ sar=0))
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x05\x01'
+ )
- self.cert_device.l2cap.SendSFrame(l2cap_cert_pb2.SFrame(channel=self.scid_dcid_map[scid], req_seq=0, p=1, s=0))
- l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x81\x01')
+ self.cert_device.l2cap.SendSFrame(
+ l2cap_cert_pb2.SFrame(
+ channel=self.scid_dcid_map[scid], req_seq=0, p=1, s=0))
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x81\x01'
+ )
for i in range(1, self.tx_window):
- self.cert_device.l2cap.SendIFrame(l2cap_cert_pb2.IFrame(channel=self.scid_dcid_map[scid], req_seq=0, tx_seq=(i), sar=0))
+ self.cert_device.l2cap.SendIFrame(
+ l2cap_cert_pb2.IFrame(
+ channel=self.scid_dcid_map[scid],
+ req_seq=0,
+ tx_seq=(i),
+ sar=0))
time.sleep(0.1)
- l2cap_event_asserts.assert_event_occurs(lambda log : log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x01\x0a') \ No newline at end of file
+ l2cap_event_asserts.assert_event_occurs(
+ lambda log: log.HasField("data_packet") and log.data_packet.channel == scid and log.data_packet.payload == b'\x01\x0a'
+ )
diff --git a/system/test/gen_coverage.py b/system/test/gen_coverage.py
index 926c3755bd..f2658dcb6f 100755
--- a/system/test/gen_coverage.py
+++ b/system/test/gen_coverage.py
@@ -25,7 +25,6 @@ import sys
import webbrowser
from run_host_unit_tests import *
-
"""
This script is used to generate code coverage results host supported libraries.
The script by default will generate an html report that summarizes the coverage
@@ -63,23 +62,28 @@ COVERAGE_TESTS = [
"covered_files": [
"packages/modules/Bluetooth/system/profile/avrcp",
],
- }, {
+ },
+ {
"test_name": "bluetooth_test_sdp",
"covered_files": [
"packages/modules/Bluetooth/system/profile/sdp",
],
- }, {
- "test_name": "test-vendor_test_host",
+ },
+ {
+ "test_name":
+ "test-vendor_test_host",
"covered_files": [
"packages/modules/Bluetooth/system/vendor_libs/test_vendor_lib/include",
"packages/modules/Bluetooth/system/vendor_libs/test_vendor_lib/src",
],
- }, {
+ },
+ {
"test_name": "rootcanal-packets_test_host",
"covered_files": [
"packages/modules/Bluetooth/system/vendor_libs/test_vendor_lib/packets",
],
- }, {
+ },
+ {
"test_name": "bluetooth_test_common",
"covered_files": [
"packages/modules/Bluetooth/system/common",
@@ -93,320 +97,342 @@ LLVM_DIR = 'prebuilts/clang/host/linux-x86/clang-r353983b/bin'
LLVM_MERGE = LLVM_DIR + '/llvm-profdata'
LLVM_COV = LLVM_DIR + '/llvm-cov'
+
def write_root_html_head(f):
- # Write the header part of the root html file. This was pulled from the
- # page source of one of the generated html files.
- f.write("<!doctype html><html><head>" \
- "<meta name='viewport' content='width=device-width,initial-scale=1'><met" \
- "a charset='UTF-8'><link rel='stylesheet' type='text/css' href='style.cs" \
- "s'></head><body><h2>Coverage Report</h2><h4>Created: " +
- str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M')) +
- "</h4><p>Click <a href='http://clang.llvm.org/docs/SourceBasedCodeCovera" \
- "ge.html#interpreting-reports'>here</a> for information about interpreti" \
- "ng this report.</p><div class='centered'><table><tr><td class='column-e" \
- "ntry-bold'>Filename</td><td class='column-entry-bold'>Function Coverage" \
- "</td><td class='column-entry-bold'>Instantiation Coverage</td><td class" \
- "='column-entry-bold'>Line Coverage</td><td class='column-entry-bold'>Re" \
- "gion Coverage</td></tr>"
- )
+ # Write the header part of the root html file. This was pulled from the
+ # page source of one of the generated html files.
+ f.write("<!doctype html><html><head>" \
+ "<meta name='viewport' content='width=device-width,initial-scale=1'><met" \
+ "a charset='UTF-8'><link rel='stylesheet' type='text/css' href='style.cs" \
+ "s'></head><body><h2>Coverage Report</h2><h4>Created: " +
+ str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M')) +
+ "</h4><p>Click <a href='http://clang.llvm.org/docs/SourceBasedCodeCovera" \
+ "ge.html#interpreting-reports'>here</a> for information about interpreti" \
+ "ng this report.</p><div class='centered'><table><tr><td class='column-e" \
+ "ntry-bold'>Filename</td><td class='column-entry-bold'>Function Coverage" \
+ "</td><td class='column-entry-bold'>Instantiation Coverage</td><td class" \
+ "='column-entry-bold'>Line Coverage</td><td class='column-entry-bold'>Re" \
+ "gion Coverage</td></tr>"
+ )
def write_root_html_column(f, covered, count):
- percent = covered * 100.0 / count
- value = "%.2f%% (%d/%d) " % (percent, covered, count)
- color = 'column-entry-yellow'
- if percent == 100:
- color = 'column-entry-green'
- if percent < 80.0:
- color = 'column-entry-red'
- f.write("<td class=\'" + color + "\'><pre>" + value + "</pre></td>")
+ percent = covered * 100.0 / count
+ value = "%.2f%% (%d/%d) " % (percent, covered, count)
+ color = 'column-entry-yellow'
+ if percent == 100:
+ color = 'column-entry-green'
+ if percent < 80.0:
+ color = 'column-entry-red'
+ f.write("<td class=\'" + color + "\'><pre>" + value + "</pre></td>")
def write_root_html_rows(f, tests):
- totals = {
- "functions":{
- "covered": 0,
- "count": 0
- },
- "instantiations":{
- "covered": 0,
- "count": 0
- },
- "lines":{
- "covered": 0,
- "count": 0
- },
- "regions":{
- "covered": 0,
- "count": 0
- }
- }
-
- # Write the tests with their coverage summaries.
- for test in tests:
- test_name = test['test_name']
- covered_files = test['covered_files']
- json_results = generate_coverage_json(test)
- test_totals = json_results['data'][0]['totals']
-
- f.write("<tr class='light-row'><td><pre><a href=\'" +
- os.path.join(test_name, "index.html") + "\'>" + test_name +
- "</a></pre></td>")
+ totals = {
+ "functions": {
+ "covered": 0,
+ "count": 0
+ },
+ "instantiations": {
+ "covered": 0,
+ "count": 0
+ },
+ "lines": {
+ "covered": 0,
+ "count": 0
+ },
+ "regions": {
+ "covered": 0,
+ "count": 0
+ }
+ }
+
+ # Write the tests with their coverage summaries.
+ for test in tests:
+ test_name = test['test_name']
+ covered_files = test['covered_files']
+ json_results = generate_coverage_json(test)
+ test_totals = json_results['data'][0]['totals']
+
+ f.write("<tr class='light-row'><td><pre><a href=\'" +
+ os.path.join(test_name, "index.html") + "\'>" + test_name +
+ "</a></pre></td>")
+ for field_name in ['functions', 'instantiations', 'lines', 'regions']:
+ field = test_totals[field_name]
+ totals[field_name]['covered'] += field['covered']
+ totals[field_name]['count'] += field['count']
+ write_root_html_column(f, field['covered'], field['count'])
+ f.write("</tr>")
+
+ #Write the totals row.
+ f.write("<tr class='light-row-bold'><td><pre>Totals</a></pre></td>")
for field_name in ['functions', 'instantiations', 'lines', 'regions']:
- field = test_totals[field_name]
- totals[field_name]['covered'] += field['covered']
- totals[field_name]['count'] += field['count']
- write_root_html_column(f, field['covered'], field['count'])
- f.write("</tr>");
-
- #Write the totals row.
- f.write("<tr class='light-row-bold'><td><pre>Totals</a></pre></td>")
- for field_name in ['functions', 'instantiations', 'lines', 'regions']:
- field = totals[field_name]
- write_root_html_column(f, field['covered'], field['count'])
- f.write("</tr>");
+ field = totals[field_name]
+ write_root_html_column(f, field['covered'], field['count'])
+ f.write("</tr>")
def write_root_html_tail(f):
- # Pulled from the generated html coverage report.
- f.write("</table></div><h5>Generated by llvm-cov -- llvm version 7.0.2svn<" \
- "/h5></body></html>")
+ # Pulled from the generated html coverage report.
+ f.write("</table></div><h5>Generated by llvm-cov -- llvm version 7.0.2svn<" \
+ "/h5></body></html>")
def generate_root_html(tests):
- # Copy the css file from one of the coverage reports.
- source_file = os.path.join(os.path.join(WORKING_DIR, tests[0]['test_name']), "style.css")
- dest_file = os.path.join(WORKING_DIR, "style.css")
- shutil.copy2(source_file, dest_file)
+ # Copy the css file from one of the coverage reports.
+ source_file = os.path.join(
+ os.path.join(WORKING_DIR, tests[0]['test_name']), "style.css")
+ dest_file = os.path.join(WORKING_DIR, "style.css")
+ shutil.copy2(source_file, dest_file)
- # Write the root index.html file that sumarizes all the tests.
- f = open(os.path.join(WORKING_DIR, "index.html"), "w")
- write_root_html_head(f)
- write_root_html_rows(f, tests)
- write_root_html_tail(f)
+ # Write the root index.html file that sumarizes all the tests.
+ f = open(os.path.join(WORKING_DIR, "index.html"), "w")
+ write_root_html_head(f)
+ write_root_html_rows(f, tests)
+ write_root_html_tail(f)
def get_profraw_for_test(test_name):
- test_root = get_native_test_root_or_die()
- test_cmd = os.path.join(os.path.join(test_root, test_name), test_name)
- if not os.path.isfile(test_cmd):
- logging.error('The test ' + test_name + ' does not exist, please compile first')
- sys.exit(1)
-
- profraw_file_name = test_name + ".profraw"
- profraw_path = os.path.join(WORKING_DIR, os.path.join(test_name, profraw_file_name))
- llvm_env_var = "LLVM_PROFILE_FILE=\"" + profraw_path + "\""
-
- test_cmd = llvm_env_var + " " + test_cmd
- logging.info('Generating profraw data for ' + test_name)
- logging.debug('cmd: ' + test_cmd)
- if subprocess.call(test_cmd, shell=True) != 0:
- logging.error('Test ' + test_name + ' failed. Please fix the test before generating coverage.')
- sys.exit(1)
-
- if not os.path.isfile(profraw_path):
- logging.error('Generating the profraw file failed. Did you remember to add the proper compiler flags to your build?')
- sys.exit(1)
-
- return profraw_file_name
+ test_root = get_native_test_root_or_die()
+ test_cmd = os.path.join(os.path.join(test_root, test_name), test_name)
+ if not os.path.isfile(test_cmd):
+ logging.error('The test ' + test_name +
+ ' does not exist, please compile first')
+ sys.exit(1)
+
+ profraw_file_name = test_name + ".profraw"
+ profraw_path = os.path.join(WORKING_DIR,
+ os.path.join(test_name, profraw_file_name))
+ llvm_env_var = "LLVM_PROFILE_FILE=\"" + profraw_path + "\""
+
+ test_cmd = llvm_env_var + " " + test_cmd
+ logging.info('Generating profraw data for ' + test_name)
+ logging.debug('cmd: ' + test_cmd)
+ if subprocess.call(test_cmd, shell=True) != 0:
+ logging.error(
+ 'Test ' + test_name +
+ ' failed. Please fix the test before generating coverage.')
+ sys.exit(1)
+
+ if not os.path.isfile(profraw_path):
+ logging.error(
+ 'Generating the profraw file failed. Did you remember to add the proper compiler flags to your build?'
+ )
+ sys.exit(1)
+
+ return profraw_file_name
def merge_profraw_data(test_name):
- cmd = []
- cmd.append(os.path.join(get_android_root_or_die(), LLVM_MERGE + " merge "))
+ cmd = []
+ cmd.append(os.path.join(get_android_root_or_die(), LLVM_MERGE + " merge "))
- test_working_dir = os.path.join(WORKING_DIR, test_name);
- cmd.append(os.path.join(test_working_dir, test_name + ".profraw"))
- profdata_file = os.path.join(test_working_dir, test_name + ".profdata")
+ test_working_dir = os.path.join(WORKING_DIR, test_name)
+ cmd.append(os.path.join(test_working_dir, test_name + ".profraw"))
+ profdata_file = os.path.join(test_working_dir, test_name + ".profdata")
- cmd.append('-o ' + profdata_file)
- logging.info('Combining profraw files into profdata for ' + test_name)
- logging.debug('cmd: ' + " ".join(cmd))
- if subprocess.call(" ".join(cmd), shell=True) != 0:
- logging.error('Failed to merge profraw files for ' + test_name)
- sys.exit(1)
+ cmd.append('-o ' + profdata_file)
+ logging.info('Combining profraw files into profdata for ' + test_name)
+ logging.debug('cmd: ' + " ".join(cmd))
+ if subprocess.call(" ".join(cmd), shell=True) != 0:
+ logging.error('Failed to merge profraw files for ' + test_name)
+ sys.exit(1)
def generate_coverage_html(test):
- COVERAGE_ROOT = '/proc/self/cwd'
-
- test_name = test['test_name']
- file_list = test['covered_files']
-
- test_working_dir = os.path.join(WORKING_DIR, test_name)
- test_profdata_file = os.path.join(test_working_dir, test_name + ".profdata")
-
- cmd = [
- os.path.join(get_android_root_or_die(), LLVM_COV),
- "show",
- "-format=html",
- "-summary-only",
- "-show-line-counts-or-regions",
- "-show-instantiation-summary",
- "-instr-profile=" + test_profdata_file,
- "-path-equivalence=\"" + COVERAGE_ROOT + "\",\"" +
- get_android_root_or_die() + "\"",
- "-output-dir=" + test_working_dir
- ]
+ COVERAGE_ROOT = '/proc/self/cwd'
+
+ test_name = test['test_name']
+ file_list = test['covered_files']
- # We have to have one object file not as an argument otherwise we can't specify source files.
- test_cmd = os.path.join(os.path.join(get_native_test_root_or_die(), test_name), test_name)
- cmd.append(test_cmd)
+ test_working_dir = os.path.join(WORKING_DIR, test_name)
+ test_profdata_file = os.path.join(test_working_dir, test_name + ".profdata")
- # Filter out the specific files we want coverage for
- for filename in file_list:
- cmd.append(os.path.join(get_android_root_or_die(), filename))
+ cmd = [
+ os.path.join(get_android_root_or_die(), LLVM_COV), "show",
+ "-format=html", "-summary-only", "-show-line-counts-or-regions",
+ "-show-instantiation-summary", "-instr-profile=" + test_profdata_file,
+ "-path-equivalence=\"" + COVERAGE_ROOT + "\",\"" +
+ get_android_root_or_die() + "\"", "-output-dir=" + test_working_dir
+ ]
- logging.info('Generating coverage report for ' + test['test_name'])
- logging.debug('cmd: ' + " ".join(cmd))
- if subprocess.call(" ".join(cmd), shell=True) != 0:
- logging.error('Failed to generate coverage for ' + test['test_name'])
- sys.exit(1)
+ # We have to have one object file not as an argument otherwise we can't specify source files.
+ test_cmd = os.path.join(
+ os.path.join(get_native_test_root_or_die(), test_name), test_name)
+ cmd.append(test_cmd)
+ # Filter out the specific files we want coverage for
+ for filename in file_list:
+ cmd.append(os.path.join(get_android_root_or_die(), filename))
-def generate_coverage_json(test):
- COVERAGE_ROOT = '/proc/self/cwd'
- test_name = test['test_name']
- file_list = test['covered_files']
+ logging.info('Generating coverage report for ' + test['test_name'])
+ logging.debug('cmd: ' + " ".join(cmd))
+ if subprocess.call(" ".join(cmd), shell=True) != 0:
+ logging.error('Failed to generate coverage for ' + test['test_name'])
+ sys.exit(1)
- test_working_dir = os.path.join(WORKING_DIR, test_name)
- test_profdata_file = os.path.join(test_working_dir, test_name + ".profdata")
- cmd = [
- os.path.join(get_android_root_or_die(), LLVM_COV),
- "export",
- "-summary-only",
- "-show-region-summary",
- "-instr-profile=" + test_profdata_file,
- "-path-equivalence=\"" + COVERAGE_ROOT + "\",\"" + get_android_root_or_die() + "\"",
- ]
+def generate_coverage_json(test):
+ COVERAGE_ROOT = '/proc/self/cwd'
+ test_name = test['test_name']
+ file_list = test['covered_files']
+
+ test_working_dir = os.path.join(WORKING_DIR, test_name)
+ test_profdata_file = os.path.join(test_working_dir, test_name + ".profdata")
+
+ cmd = [
+ os.path.join(get_android_root_or_die(), LLVM_COV),
+ "export",
+ "-summary-only",
+ "-show-region-summary",
+ "-instr-profile=" + test_profdata_file,
+ "-path-equivalence=\"" + COVERAGE_ROOT + "\",\"" +
+ get_android_root_or_die() + "\"",
+ ]
- test_cmd = os.path.join(os.path.join(get_native_test_root_or_die(), test_name), test_name)
- cmd.append(test_cmd)
+ test_cmd = os.path.join(
+ os.path.join(get_native_test_root_or_die(), test_name), test_name)
+ cmd.append(test_cmd)
- # Filter out the specific files we want coverage for
- for filename in file_list:
- cmd.append(os.path.join(get_android_root_or_die(), filename))
+ # Filter out the specific files we want coverage for
+ for filename in file_list:
+ cmd.append(os.path.join(get_android_root_or_die(), filename))
- logging.info('Generating coverage json for ' + test['test_name'])
- logging.debug('cmd: ' + " ".join(cmd))
+ logging.info('Generating coverage json for ' + test['test_name'])
+ logging.debug('cmd: ' + " ".join(cmd))
- json_str = subprocess.check_output(" ".join(cmd), shell=True)
- return json.loads(json_str)
+ json_str = subprocess.check_output(" ".join(cmd), shell=True)
+ return json.loads(json_str)
def write_json_summary(test):
- test_name = test['test_name']
- test_working_dir = os.path.join(WORKING_DIR, test_name)
- test_json_summary_file = os.path.join(test_working_dir, test_name + '.json')
- logging.debug('Writing json summary file: ' + test_json_summary_file)
- json_file = open(test_json_summary_file, 'w')
- json.dump(generate_coverage_json(test), json_file)
- json_file.close()
+ test_name = test['test_name']
+ test_working_dir = os.path.join(WORKING_DIR, test_name)
+ test_json_summary_file = os.path.join(test_working_dir, test_name + '.json')
+ logging.debug('Writing json summary file: ' + test_json_summary_file)
+ json_file = open(test_json_summary_file, 'w')
+ json.dump(generate_coverage_json(test), json_file)
+ json_file.close()
def list_tests():
- for test in COVERAGE_TESTS:
- print "Test Name: " + test['test_name']
- print "Covered Files: "
- for covered_file in test['covered_files']:
- print " " + covered_file
- print
+ for test in COVERAGE_TESTS:
+ print "Test Name: " + test['test_name']
+ print "Covered Files: "
+ for covered_file in test['covered_files']:
+ print " " + covered_file
+ print
def main():
- parser = argparse.ArgumentParser(description='Generate code coverage for enabled tests.')
- parser.add_argument(
- '-l', '--list-tests',
- action='store_true',
- dest='list_tests',
- help='List all the available tests to be run as well as covered files.')
- parser.add_argument(
- '-a', '--all',
- action='store_true',
- help='Runs all available tests and prints their outputs. If no tests ' \
- 'are specified via the -t option all tests will be run.')
- parser.add_argument(
- '-t', '--test',
- dest='tests',
- action='append',
- type=str,
- metavar='TESTNAME',
- default=[],
- help='Specifies a test to be run. Multiple tests can be specified by ' \
- 'using this option multiple times. ' \
- 'Example: \"gen_coverage.py -t test1 -t test2\"')
- parser.add_argument(
- '-o', '--output',
- type=str,
- metavar='DIRECTORY',
- default='/tmp/coverage',
- help='Specifies the directory to store all files. The directory will be ' \
- 'created if it does not exist. Default is \"/tmp/coverage\"')
- parser.add_argument(
- '-s', '--skip-html',
- dest='skip_html',
- action='store_true',
- help='Skip opening up the results of the coverage report in a browser.')
- parser.add_argument(
- '-j', '--json-file',
- dest='json_file',
- action='store_true',
- help='Write out summary results to json file in test directory.')
-
- logging.basicConfig(stream=sys.stderr, level=logging.DEBUG, format='%(levelname)s %(message)s')
- logging.addLevelName(logging.DEBUG, "[\033[1;34m%s\033[0m]" % logging.getLevelName(logging.DEBUG))
- logging.addLevelName(logging.INFO, "[\033[1;34m%s\033[0m]" % logging.getLevelName(logging.INFO))
- logging.addLevelName(logging.WARNING, "[\033[1;31m%s\033[0m]" % logging.getLevelName(logging.WARNING))
- logging.addLevelName(logging.ERROR, "[\033[1;31m%s\033[0m]" % logging.getLevelName(logging.ERROR))
-
- args = parser.parse_args()
- logging.debug("Args: " + str(args))
-
- # Set the working directory
- global WORKING_DIR
- WORKING_DIR = os.path.abspath(args.output)
- logging.debug("Working Dir: " + WORKING_DIR)
-
- # Print out the list of tests then exit
- if args.list_tests:
- list_tests()
- sys.exit(0)
-
- # Check to see if a test was specified and if so only generate coverage for
- # that test.
- if len(args.tests) == 0:
- args.all = True
-
- tests_to_run = []
- for test in COVERAGE_TESTS:
- if args.all or test['test_name'] in args.tests:
- tests_to_run.append(test)
- if test['test_name'] in args.tests:
- args.tests.remove(test['test_name'])
-
- # Error if a test was specified but doesn't exist.
- if len(args.tests) != 0:
- for test_name in args.tests:
- logging.error('\"' + test_name + '\" was not found in the list of available tests.')
- sys.exit(1)
-
- # Generate the info for the tests
- for test in tests_to_run:
- logging.info('Getting coverage for ' + test['test_name'])
- get_profraw_for_test(test['test_name'])
- merge_profraw_data(test['test_name'])
- if args.json_file:
- write_json_summary(test)
- generate_coverage_html(test)
-
- # Generate the root index.html page that sumarizes all of the coverage reports.
- generate_root_html(tests_to_run)
-
- # Open the results in a browser.
- if not args.skip_html:
- webbrowser.open('file://' + os.path.join(WORKING_DIR, 'index.html'))
+ parser = argparse.ArgumentParser(
+ description='Generate code coverage for enabled tests.')
+ parser.add_argument(
+ '-l',
+ '--list-tests',
+ action='store_true',
+ dest='list_tests',
+ help='List all the available tests to be run as well as covered files.')
+ parser.add_argument(
+ '-a', '--all',
+ action='store_true',
+ help='Runs all available tests and prints their outputs. If no tests ' \
+ 'are specified via the -t option all tests will be run.')
+ parser.add_argument(
+ '-t', '--test',
+ dest='tests',
+ action='append',
+ type=str,
+ metavar='TESTNAME',
+ default=[],
+ help='Specifies a test to be run. Multiple tests can be specified by ' \
+ 'using this option multiple times. ' \
+ 'Example: \"gen_coverage.py -t test1 -t test2\"')
+ parser.add_argument(
+ '-o', '--output',
+ type=str,
+ metavar='DIRECTORY',
+ default='/tmp/coverage',
+ help='Specifies the directory to store all files. The directory will be ' \
+ 'created if it does not exist. Default is \"/tmp/coverage\"')
+ parser.add_argument(
+ '-s',
+ '--skip-html',
+ dest='skip_html',
+ action='store_true',
+ help='Skip opening up the results of the coverage report in a browser.')
+ parser.add_argument(
+ '-j',
+ '--json-file',
+ dest='json_file',
+ action='store_true',
+ help='Write out summary results to json file in test directory.')
+
+ logging.basicConfig(
+ stream=sys.stderr,
+ level=logging.DEBUG,
+ format='%(levelname)s %(message)s')
+ logging.addLevelName(
+ logging.DEBUG,
+ "[\033[1;34m%s\033[0m]" % logging.getLevelName(logging.DEBUG))
+ logging.addLevelName(
+ logging.INFO,
+ "[\033[1;34m%s\033[0m]" % logging.getLevelName(logging.INFO))
+ logging.addLevelName(
+ logging.WARNING,
+ "[\033[1;31m%s\033[0m]" % logging.getLevelName(logging.WARNING))
+ logging.addLevelName(
+ logging.ERROR,
+ "[\033[1;31m%s\033[0m]" % logging.getLevelName(logging.ERROR))
+
+ args = parser.parse_args()
+ logging.debug("Args: " + str(args))
+
+ # Set the working directory
+ global WORKING_DIR
+ WORKING_DIR = os.path.abspath(args.output)
+ logging.debug("Working Dir: " + WORKING_DIR)
+
+ # Print out the list of tests then exit
+ if args.list_tests:
+ list_tests()
+ sys.exit(0)
+
+ # Check to see if a test was specified and if so only generate coverage for
+ # that test.
+ if len(args.tests) == 0:
+ args.all = True
+
+ tests_to_run = []
+ for test in COVERAGE_TESTS:
+ if args.all or test['test_name'] in args.tests:
+ tests_to_run.append(test)
+ if test['test_name'] in args.tests:
+ args.tests.remove(test['test_name'])
+
+ # Error if a test was specified but doesn't exist.
+ if len(args.tests) != 0:
+ for test_name in args.tests:
+ logging.error('\"' + test_name +
+ '\" was not found in the list of available tests.')
+ sys.exit(1)
+
+ # Generate the info for the tests
+ for test in tests_to_run:
+ logging.info('Getting coverage for ' + test['test_name'])
+ get_profraw_for_test(test['test_name'])
+ merge_profraw_data(test['test_name'])
+ if args.json_file:
+ write_json_summary(test)
+ generate_coverage_html(test)
+
+ # Generate the root index.html page that sumarizes all of the coverage reports.
+ generate_root_html(tests_to_run)
+
+ # Open the results in a browser.
+ if not args.skip_html:
+ webbrowser.open('file://' + os.path.join(WORKING_DIR, 'index.html'))
if __name__ == '__main__':
- main()
+ main()
diff --git a/system/test/run_host_unit_tests.py b/system/test/run_host_unit_tests.py
index 06074e51ee..4b9e4bf457 100755
--- a/system/test/run_host_unit_tests.py
+++ b/system/test/run_host_unit_tests.py
@@ -21,201 +21,201 @@ import argparse
# Registered host based unit tests
# Must have 'host_supported: true'
HOST_TESTS = [
- 'bluetooth_test_common',
- 'bluetoothtbd_test',
- 'net_test_avrcp',
- 'net_test_btcore',
- 'net_test_types',
- 'net_test_btpackets',
+ 'bluetooth_test_common',
+ 'bluetoothtbd_test',
+ 'net_test_avrcp',
+ 'net_test_btcore',
+ 'net_test_types',
+ 'net_test_btpackets',
]
SOONG_UI_BASH = 'build/soong/soong_ui.bash'
+
def str2bool(argument, default=False):
- """ Convert a string to a booleen value. """
- argument = str(argument)
- if argument.lower() in ['0', 'f', 'false', 'off', 'no', 'n']:
- return False
- elif argument.lower() in ['1', 't', 'true', 'on', 'yes', 'y']:
- return True
- return default
+ """ Convert a string to a booleen value. """
+ argument = str(argument)
+ if argument.lower() in ['0', 'f', 'false', 'off', 'no', 'n']:
+ return False
+ elif argument.lower() in ['1', 't', 'true', 'on', 'yes', 'y']:
+ return True
+ return default
def check_dir_exists(dir, dirname):
- if not os.path.isdir(dir):
- print "Couldn't find %s (%s)!" % (dirname, dir)
- sys.exit(0)
+ if not os.path.isdir(dir):
+ print "Couldn't find %s (%s)!" % (dirname, dir)
+ sys.exit(0)
def get_output_from_command(cmd):
- try:
- return subprocess.check_output(cmd).strip()
- except subprocess.CalledProcessError as e:
- print 'Failed to call {cmd}, return code {code}'.format(cmd=cmd,
- code=e.returncode)
- print e
- return None
+ try:
+ return subprocess.check_output(cmd).strip()
+ except subprocess.CalledProcessError as e:
+ print 'Failed to call {cmd}, return code {code}'.format(
+ cmd=cmd, code=e.returncode)
+ print e
+ return None
def get_android_root_or_die():
- value = os.environ.get('ANDROID_BUILD_TOP')
- if not value:
- # Try to find build/soong/soong_ui.bash upwards until root directory
- current_path = os.path.abspath(os.getcwd())
- while current_path and os.path.isdir(current_path):
- soong_ui_bash_path = os.path.join(current_path, SOONG_UI_BASH)
- if os.path.isfile(soong_ui_bash_path):
- # Use value returned from Soong UI instead in case definition to TOP
- # changes in the future
- value = get_output_from_command((soong_ui_bash_path,
- '--dumpvar-mode',
- '--abs',
- 'TOP'))
- break
- parent_path = os.path.abspath(os.path.join(current_path, os.pardir))
- if parent_path == current_path:
- current_path = None
- else:
- current_path = parent_path
+ value = os.environ.get('ANDROID_BUILD_TOP')
if not value:
- print 'Cannot determine ANDROID_BUILD_TOP'
- sys.exit(1)
- check_dir_exists(value, '$ANDROID_BUILD_TOP')
- return value
+ # Try to find build/soong/soong_ui.bash upwards until root directory
+ current_path = os.path.abspath(os.getcwd())
+ while current_path and os.path.isdir(current_path):
+ soong_ui_bash_path = os.path.join(current_path, SOONG_UI_BASH)
+ if os.path.isfile(soong_ui_bash_path):
+ # Use value returned from Soong UI instead in case definition to TOP
+ # changes in the future
+ value = get_output_from_command(
+ (soong_ui_bash_path, '--dumpvar-mode', '--abs', 'TOP'))
+ break
+ parent_path = os.path.abspath(os.path.join(current_path, os.pardir))
+ if parent_path == current_path:
+ current_path = None
+ else:
+ current_path = parent_path
+ if not value:
+ print 'Cannot determine ANDROID_BUILD_TOP'
+ sys.exit(1)
+ check_dir_exists(value, '$ANDROID_BUILD_TOP')
+ return value
def get_android_host_out_or_die():
- value = os.environ.get('ANDROID_HOST_OUT')
- if not value:
- ANDROID_BUILD_TOP = get_android_root_or_die()
- value = get_output_from_command((os.path.join(ANDROID_BUILD_TOP,
- SOONG_UI_BASH),
- '--dumpvar-mode',
- '--abs',
- 'HOST_OUT'))
+ value = os.environ.get('ANDROID_HOST_OUT')
if not value:
- print 'Cannot determine ANDROID_HOST_OUT'
- sys.exit(1)
- check_dir_exists(value, '$ANDROID_HOST_OUT')
- return value
+ ANDROID_BUILD_TOP = get_android_root_or_die()
+ value = get_output_from_command(
+ (os.path.join(ANDROID_BUILD_TOP, SOONG_UI_BASH), '--dumpvar-mode',
+ '--abs', 'HOST_OUT'))
+ if not value:
+ print 'Cannot determine ANDROID_HOST_OUT'
+ sys.exit(1)
+ check_dir_exists(value, '$ANDROID_HOST_OUT')
+ return value
def get_android_dist_dir_or_die():
- # Check if $DIST_DIR is predefined as environment variable
- value = os.environ.get('DIST_DIR')
- if not value:
- # If not use the default path
- ANDROID_BUILD_TOP = get_android_root_or_die()
- value = os.path.join(os.path.join(ANDROID_BUILD_TOP, 'out'), 'dist')
- if not os.path.isdir(value):
- if os.path.exists(value):
- print '%s is not a directory!' % (value)
- sys.exit(1)
- os.makedirs(value)
- return value
+ # Check if $DIST_DIR is predefined as environment variable
+ value = os.environ.get('DIST_DIR')
+ if not value:
+ # If not use the default path
+ ANDROID_BUILD_TOP = get_android_root_or_die()
+ value = os.path.join(os.path.join(ANDROID_BUILD_TOP, 'out'), 'dist')
+ if not os.path.isdir(value):
+ if os.path.exists(value):
+ print '%s is not a directory!' % (value)
+ sys.exit(1)
+ os.makedirs(value)
+ return value
def get_native_test_root_or_die():
- android_host_out = get_android_host_out_or_die()
- test_root = os.path.join(android_host_out, 'nativetest64')
- if not os.path.isdir(test_root):
- test_root = os.path.join(android_host_out, 'nativetest')
+ android_host_out = get_android_host_out_or_die()
+ test_root = os.path.join(android_host_out, 'nativetest64')
if not os.path.isdir(test_root):
- print 'Neither nativetest64 nor nativetest directory exist,' \
- ' please compile first'
- sys.exit(1)
- return test_root
+ test_root = os.path.join(android_host_out, 'nativetest')
+ if not os.path.isdir(test_root):
+ print 'Neither nativetest64 nor nativetest directory exist,' \
+ ' please compile first'
+ sys.exit(1)
+ return test_root
def get_test_cmd_or_die(test_root, test_name, enable_xml, test_filter):
- test_path = os.path.join(os.path.join(test_root, test_name), test_name)
- if not os.path.isfile(test_path):
- print 'Cannot find: ' + test_path
- sys.exit(1)
- cmd = [test_path]
- if enable_xml:
- dist_dir = get_android_dist_dir_or_die()
- log_output_path = os.path.join(dist_dir, 'gtest/{0}_test_details.xml'
- .format(test_name))
- cmd.append('--gtest_output=xml:{0}'.format(log_output_path))
- if test_filter:
- cmd.append('--gtest_filter=%s' % test_filter)
- return cmd
+ test_path = os.path.join(os.path.join(test_root, test_name), test_name)
+ if not os.path.isfile(test_path):
+ print 'Cannot find: ' + test_path
+ sys.exit(1)
+ cmd = [test_path]
+ if enable_xml:
+ dist_dir = get_android_dist_dir_or_die()
+ log_output_path = os.path.join(
+ dist_dir, 'gtest/{0}_test_details.xml'.format(test_name))
+ cmd.append('--gtest_output=xml:{0}'.format(log_output_path))
+ if test_filter:
+ cmd.append('--gtest_filter=%s' % test_filter)
+ return cmd
# path is relative to Android build top
def build_target(target, num_tasks):
- ANDROID_BUILD_TOP = get_android_root_or_die()
- build_cmd = [SOONG_UI_BASH, '--make-mode']
- if num_tasks > 1:
- build_cmd.append('-j' + str(num_tasks))
- build_cmd.append(target)
- p = subprocess.Popen(build_cmd, cwd=ANDROID_BUILD_TOP, env=os.environ.copy())
- return_code = p.wait()
- if return_code != 0:
- print 'BUILD FAILED, return code: {0}'.format(str(return_code))
- sys.exit(1)
- return
+ ANDROID_BUILD_TOP = get_android_root_or_die()
+ build_cmd = [SOONG_UI_BASH, '--make-mode']
+ if num_tasks > 1:
+ build_cmd.append('-j' + str(num_tasks))
+ build_cmd.append(target)
+ p = subprocess.Popen(
+ build_cmd, cwd=ANDROID_BUILD_TOP, env=os.environ.copy())
+ return_code = p.wait()
+ if return_code != 0:
+ print 'BUILD FAILED, return code: {0}'.format(str(return_code))
+ sys.exit(1)
+ return
def main():
- """ run_host_unit_tests.py - Run registered host based unit tests
+ """ run_host_unit_tests.py - Run registered host based unit tests
"""
- parser = argparse.ArgumentParser(description='Run host based unit tests.')
- parser.add_argument(
- '--enable_xml',
- type=str2bool,
- dest='enable_xml',
- nargs='?',
- const=True,
- default=False,
- help=
- 'Whether to output structured XML log output in out/dist/gtest directory')
- parser.add_argument(
- '-j',
- type=int,
- nargs='?',
- dest='num_tasks',
- const=-1,
- default=-1,
- help='Number of tasks to run at the same time')
- parser.add_argument(
- 'rest',
- nargs=argparse.REMAINDER,
- help='-- args, other gtest arguments for each individual test')
- args = parser.parse_args()
-
- build_target('MODULES-IN-system-bt', args.num_tasks)
- TEST_ROOT = get_native_test_root_or_die()
- test_results = []
- for test in HOST_TESTS:
- test_cmd = get_test_cmd_or_die(TEST_ROOT, test, args.enable_xml, args.rest)
- if subprocess.call(test_cmd) != 0:
- test_results.append(False)
- else:
- test_results.append(True)
- if not all(test_results):
- failures = [i for i, x in enumerate(test_results) if not x]
- for index in failures:
- print 'TEST FAILLED: ' + HOST_TESTS[index]
- sys.exit(0)
- print 'TEST PASSED ' + str(len(test_results)) + ' tests were run'
+ parser = argparse.ArgumentParser(description='Run host based unit tests.')
+ parser.add_argument(
+ '--enable_xml',
+ type=str2bool,
+ dest='enable_xml',
+ nargs='?',
+ const=True,
+ default=False,
+ help=
+ 'Whether to output structured XML log output in out/dist/gtest directory'
+ )
+ parser.add_argument(
+ '-j',
+ type=int,
+ nargs='?',
+ dest='num_tasks',
+ const=-1,
+ default=-1,
+ help='Number of tasks to run at the same time')
+ parser.add_argument(
+ 'rest',
+ nargs=argparse.REMAINDER,
+ help='-- args, other gtest arguments for each individual test')
+ args = parser.parse_args()
+
+ build_target('MODULES-IN-system-bt', args.num_tasks)
+ TEST_ROOT = get_native_test_root_or_die()
+ test_results = []
+ for test in HOST_TESTS:
+ test_cmd = get_test_cmd_or_die(TEST_ROOT, test, args.enable_xml,
+ args.rest)
+ if subprocess.call(test_cmd) != 0:
+ test_results.append(False)
+ else:
+ test_results.append(True)
+ if not all(test_results):
+ failures = [i for i, x in enumerate(test_results) if not x]
+ for index in failures:
+ print 'TEST FAILLED: ' + HOST_TESTS[index]
+ sys.exit(0)
+ print 'TEST PASSED ' + str(len(test_results)) + ' tests were run'
- dist_dir = get_android_dist_dir_or_die()
- log_output_path = os.path.join(dist_dir, 'gtest/coverage')
- cmd_path = os.path.join(get_android_root_or_die(), 'packages/modules/Bluetooth/system/test')
- print cmd_path
- cmd = [
- os.path.join(cmd_path, 'gen_coverage.py'),
- '--skip-html',
- '--json-file',
- '-o',
- log_output_path,
- ]
- subprocess.call(cmd)
+ dist_dir = get_android_dist_dir_or_die()
+ log_output_path = os.path.join(dist_dir, 'gtest/coverage')
+ cmd_path = os.path.join(get_android_root_or_die(), 'packages/modules/Bluetooth/system/test')
+ print cmd_path
+ cmd = [
+ os.path.join(cmd_path, 'gen_coverage.py'),
+ '--skip-html',
+ '--json-file',
+ '-o',
+ log_output_path,
+ ]
+ subprocess.call(cmd)
- sys.exit(0)
+ sys.exit(0)
if __name__ == '__main__':
- main()
+ main()
diff --git a/system/tools/scripts/btsnoop_live.py b/system/tools/scripts/btsnoop_live.py
index 4e145ffea3..00b45fdda1 100644
--- a/system/tools/scripts/btsnoop_live.py
+++ b/system/tools/scripts/btsnoop_live.py
@@ -67,19 +67,17 @@ from ctypes import byref, c_bool, c_longlong, CDLL
from _ctypes import FreeLibrary
from datetime import datetime
-
# Update below to right path corresponding to your machine, FTS version and OS used.
FTS_INI_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\'
FTS_DLL_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\Executables\\Core\\'
-
iniName = 'liveimport.ini'
if (platform.architecture()[0] == '32bit'):
dllName = 'LiveImportAPI.dll'
else:
dllName = 'LiveImportAPI_x64.dll'
-launchFtsCmd = '\"'+FTS_DLL_PATH + 'fts.exe\"' + ' \"/ComProbe Protocol Analysis System=Generic\"' + ' \"/oemkey=Virtual\"'
+launchFtsCmd = '\"' + FTS_DLL_PATH + 'fts.exe\"' + ' \"/ComProbe Protocol Analysis System=Generic\"' + ' \"/oemkey=Virtual\"'
# Unix Epoch delta since 01/01/1970
FILETIME_EPOCH_DELTA = 116444736000000000
@@ -96,7 +94,8 @@ def get_file_time():
Obtain current time in file time format for display
"""
date_time = datetime.now()
- file_time = FILETIME_EPOCH_DELTA + (timegm(date_time.timetuple()) * HUNDREDS_OF_NANOSECONDS)
+ file_time = FILETIME_EPOCH_DELTA + (
+ timegm(date_time.timetuple()) * HUNDREDS_OF_NANOSECONDS)
file_time = file_time + (date_time.microsecond * 10)
return file_time
@@ -135,6 +134,7 @@ def get_configuration_string():
else:
return None
+
def check_live_import_connection(live_import):
"""
Launch FTS app in Virtual Sniffing Mode
@@ -146,7 +146,7 @@ def check_live_import_connection(live_import):
status = live_import.IsAppReady(byref(is_connection_running))
if (is_connection_running.value == True):
- print ("FTS is already launched, Start capture if not already started")
+ print("FTS is already launched, Start capture if not already started")
return True
print("Launching FTS Virtual Sniffing")
@@ -159,14 +159,14 @@ def check_live_import_connection(live_import):
while (is_connection_running.value == False and count < 12):
status = live_import.IsAppReady(byref(is_connection_running))
if (status < 0):
- print("Live Import Internal Error %d" %(status))
+ print("Live Import Internal Error %d" % (status))
return False
if (is_connection_running.value == False):
print("Waiting for 5 sec.. Open FTS Virtual Sniffing")
time.sleep(5)
count += 1
if (is_connection_running.value == True):
- print ("FTS is ready to receive the data, Start capture now")
+ print("FTS is ready to receive the data, Start capture now")
return True
else:
print("FTS Virtual Sniffing didn't start until 1 min.. exiting")
@@ -184,13 +184,13 @@ def init_live_import(conn_str, config_str):
return None
if live_import is None:
- print("Error: Path to LiveImportAPI.dll is incorrect.. exiting");
+ print("Error: Path to LiveImportAPI.dll is incorrect.. exiting")
return None
print(dllName + " loaded successfully")
- result = live_import.InitializeLiveImport(conn_str.encode('ascii', 'ignore'),
- config_str.encode('ascii', 'ignore'),
- byref(success))
+ result = live_import.InitializeLiveImport(
+ conn_str.encode('ascii', 'ignore'), config_str.encode(
+ 'ascii', 'ignore'), byref(success))
if (result < 0):
print("Live Import Init failed")
return None
@@ -244,7 +244,7 @@ def main():
while True:
try:
- snoop_hdr = btsnoop_sock.recv(SNOOP_HDR)
+ snoop_hdr = btsnoop_sock.recv(SNOOP_HDR)
if snoop_hdr is not None:
try:
olen, ilen, flags = struct.unpack(">LLL", snoop_hdr[0:12])
@@ -261,7 +261,8 @@ def main():
if data_frag is not None:
snoop_data += data_frag
- print ("Bytes received %d Olen %d ilen %d flags %d" % (len(snoop_data), olen, ilen, flags))
+ print("Bytes received %d Olen %d ilen %d flags %d" %
+ (len(snoop_data), olen, ilen, flags))
packet_type = struct.unpack(">B", snoop_data[0:1])[0]
if packet_type == 1:
drf = 1
@@ -282,7 +283,9 @@ def main():
drf = 8
isend = 1
- result = live_import.SendFrame(olen-1, olen-1, snoop_data[1:olen], drf, isend, timestamp)
+ result = live_import.SendFrame(olen - 1, olen - 1,
+ snoop_data[1:olen], drf, isend,
+ timestamp)
if (result < 0):
print("Send frame failed")
except KeyboardInterrupt:
@@ -294,4 +297,3 @@ def main():
if __name__ == '__main__':
main()
-
diff --git a/system/tools/scripts/btsnooz.py b/system/tools/scripts/btsnooz.py
index 6e0e11200b..a29e718d7b 100755
--- a/system/tools/scripts/btsnooz.py
+++ b/system/tools/scripts/btsnooz.py
@@ -21,14 +21,12 @@ where the file_header and record_header are modified versions of
the btsnoop headers.
"""
-
import base64
import fileinput
import struct
import sys
import zlib
-
# Enumeration of the values the 'type' field can take in a btsnooz
# header. These values come from the Bluetooth stack's internal
# representation of packet types.
@@ -41,127 +39,135 @@ TYPE_OUT_SCO = 0x22
def type_to_direction(type):
- """
+ """
Returns the inbound/outbound direction of a packet given its type.
0 = sent packet
1 = received packet
"""
- if type in [TYPE_IN_EVT, TYPE_IN_ACL, TYPE_IN_SCO]:
- return 1
- return 0
+ if type in [TYPE_IN_EVT, TYPE_IN_ACL, TYPE_IN_SCO]:
+ return 1
+ return 0
def type_to_hci(type):
- """
+ """
Returns the HCI type of a packet given its btsnooz type.
"""
- if type == TYPE_OUT_CMD:
- return '\x01'
- if type == TYPE_IN_ACL or type == TYPE_OUT_ACL:
- return '\x02'
- if type == TYPE_IN_SCO or type == TYPE_OUT_SCO:
- return '\x03'
- if type == TYPE_IN_EVT:
- return '\x04'
+ if type == TYPE_OUT_CMD:
+ return '\x01'
+ if type == TYPE_IN_ACL or type == TYPE_OUT_ACL:
+ return '\x02'
+ if type == TYPE_IN_SCO or type == TYPE_OUT_SCO:
+ return '\x03'
+ if type == TYPE_IN_EVT:
+ return '\x04'
def decode_snooz(snooz):
- """
+ """
Decodes all known versions of a btsnooz file into a btsnoop file.
"""
- version, last_timestamp_ms = struct.unpack_from('=bQ', snooz)
+ version, last_timestamp_ms = struct.unpack_from('=bQ', snooz)
- if version != 1 and version != 2:
- sys.stderr.write('Unsupported btsnooz version: %s\n' % version)
- exit(1)
+ if version != 1 and version != 2:
+ sys.stderr.write('Unsupported btsnooz version: %s\n' % version)
+ exit(1)
- # Oddly, the file header (9 bytes) is not compressed, but the rest is.
- decompressed = zlib.decompress(snooz[9:])
+ # Oddly, the file header (9 bytes) is not compressed, but the rest is.
+ decompressed = zlib.decompress(snooz[9:])
- sys.stdout.write('btsnoop\x00\x00\x00\x00\x01\x00\x00\x03\xea')
+ sys.stdout.write('btsnoop\x00\x00\x00\x00\x01\x00\x00\x03\xea')
- if version == 1:
- decode_snooz_v1(decompressed, last_timestamp_ms)
- elif version == 2:
- decode_snooz_v2(decompressed, last_timestamp_ms)
+ if version == 1:
+ decode_snooz_v1(decompressed, last_timestamp_ms)
+ elif version == 2:
+ decode_snooz_v2(decompressed, last_timestamp_ms)
def decode_snooz_v1(decompressed, last_timestamp_ms):
- """
+ """
Decodes btsnooz v1 files into a btsnoop file.
"""
- # An unfortunate consequence of the file format design: we have to do a
- # pass of the entire file to determine the timestamp of the first packet.
- first_timestamp_ms = last_timestamp_ms + 0x00dcddb30f2f8000
- offset = 0
- while offset < len(decompressed):
- length, delta_time_ms, type = struct.unpack_from('=HIb', decompressed, offset)
- offset += 7 + length - 1
- first_timestamp_ms -= delta_time_ms
-
- # Second pass does the actual writing out to stdout.
- offset = 0
- while offset < len(decompressed):
- length, delta_time_ms, type = struct.unpack_from('=HIb', decompressed, offset)
- first_timestamp_ms += delta_time_ms
- offset += 7
- sys.stdout.write(struct.pack('>II', length, length))
- sys.stdout.write(struct.pack('>II', type_to_direction(type), 0))
- sys.stdout.write(struct.pack('>II', (first_timestamp_ms >> 32), (first_timestamp_ms & 0xFFFFFFFF)))
- sys.stdout.write(type_to_hci(type))
- sys.stdout.write(decompressed[offset : offset + length - 1])
- offset += length - 1
+ # An unfortunate consequence of the file format design: we have to do a
+ # pass of the entire file to determine the timestamp of the first packet.
+ first_timestamp_ms = last_timestamp_ms + 0x00dcddb30f2f8000
+ offset = 0
+ while offset < len(decompressed):
+ length, delta_time_ms, type = struct.unpack_from(
+ '=HIb', decompressed, offset)
+ offset += 7 + length - 1
+ first_timestamp_ms -= delta_time_ms
+
+ # Second pass does the actual writing out to stdout.
+ offset = 0
+ while offset < len(decompressed):
+ length, delta_time_ms, type = struct.unpack_from(
+ '=HIb', decompressed, offset)
+ first_timestamp_ms += delta_time_ms
+ offset += 7
+ sys.stdout.write(struct.pack('>II', length, length))
+ sys.stdout.write(struct.pack('>II', type_to_direction(type), 0))
+ sys.stdout.write(
+ struct.pack('>II', (first_timestamp_ms >> 32),
+ (first_timestamp_ms & 0xFFFFFFFF)))
+ sys.stdout.write(type_to_hci(type))
+ sys.stdout.write(decompressed[offset:offset + length - 1])
+ offset += length - 1
def decode_snooz_v2(decompressed, last_timestamp_ms):
- """
+ """
Decodes btsnooz v2 files into a btsnoop file.
"""
- # An unfortunate consequence of the file format design: we have to do a
- # pass of the entire file to determine the timestamp of the first packet.
- first_timestamp_ms = last_timestamp_ms + 0x00dcddb30f2f8000
- offset = 0
- while offset < len(decompressed):
- length, packet_length, delta_time_ms, snooz_type = struct.unpack_from('=HHIb', decompressed, offset)
- offset += 9 + length - 1
- first_timestamp_ms -= delta_time_ms
-
- # Second pass does the actual writing out to stdout.
- offset = 0
- while offset < len(decompressed):
- length, packet_length, delta_time_ms, snooz_type = struct.unpack_from('=HHIb', decompressed, offset)
- first_timestamp_ms += delta_time_ms
- offset += 9
- sys.stdout.write(struct.pack('>II', packet_length, length))
- sys.stdout.write(struct.pack('>II', type_to_direction(snooz_type), 0))
- sys.stdout.write(struct.pack('>II', (first_timestamp_ms >> 32), (first_timestamp_ms & 0xFFFFFFFF)))
- sys.stdout.write(type_to_hci(snooz_type))
- sys.stdout.write(decompressed[offset : offset + length - 1])
- offset += length - 1
+ # An unfortunate consequence of the file format design: we have to do a
+ # pass of the entire file to determine the timestamp of the first packet.
+ first_timestamp_ms = last_timestamp_ms + 0x00dcddb30f2f8000
+ offset = 0
+ while offset < len(decompressed):
+ length, packet_length, delta_time_ms, snooz_type = struct.unpack_from(
+ '=HHIb', decompressed, offset)
+ offset += 9 + length - 1
+ first_timestamp_ms -= delta_time_ms
+
+ # Second pass does the actual writing out to stdout.
+ offset = 0
+ while offset < len(decompressed):
+ length, packet_length, delta_time_ms, snooz_type = struct.unpack_from(
+ '=HHIb', decompressed, offset)
+ first_timestamp_ms += delta_time_ms
+ offset += 9
+ sys.stdout.write(struct.pack('>II', packet_length, length))
+ sys.stdout.write(struct.pack('>II', type_to_direction(snooz_type), 0))
+ sys.stdout.write(
+ struct.pack('>II', (first_timestamp_ms >> 32),
+ (first_timestamp_ms & 0xFFFFFFFF)))
+ sys.stdout.write(type_to_hci(snooz_type))
+ sys.stdout.write(decompressed[offset:offset + length - 1])
+ offset += length - 1
def main():
- if len(sys.argv) > 2:
- sys.stderr.write('Usage: %s [bugreport]\n' % sys.argv[0])
- exit(1)
+ if len(sys.argv) > 2:
+ sys.stderr.write('Usage: %s [bugreport]\n' % sys.argv[0])
+ exit(1)
- iterator = fileinput.input()
- found = False
- base64_string = ""
- for line in iterator:
- if found:
- if line.find('--- END:BTSNOOP_LOG_SUMMARY') != -1:
- decode_snooz(base64.standard_b64decode(base64_string))
- sys.exit(0)
- base64_string += line.strip()
+ iterator = fileinput.input()
+ found = False
+ base64_string = ""
+ for line in iterator:
+ if found:
+ if line.find('--- END:BTSNOOP_LOG_SUMMARY') != -1:
+ decode_snooz(base64.standard_b64decode(base64_string))
+ sys.exit(0)
+ base64_string += line.strip()
- if line.find('--- BEGIN:BTSNOOP_LOG_SUMMARY') != -1:
- found = True
+ if line.find('--- BEGIN:BTSNOOP_LOG_SUMMARY') != -1:
+ found = True
- if not found:
- sys.stderr.write('No btsnooz section found in bugreport.\n')
- sys.exit(1)
+ if not found:
+ sys.stderr.write('No btsnooz section found in bugreport.\n')
+ sys.exit(1)
if __name__ == '__main__':
- main()
+ main()
diff --git a/system/tools/scripts/dump_hearingaid_audio.py b/system/tools/scripts/dump_hearingaid_audio.py
index ca2eed2027..81cd78d98e 100755
--- a/system/tools/scripts/dump_hearingaid_audio.py
+++ b/system/tools/scripts/dump_hearingaid_audio.py
@@ -54,18 +54,15 @@ DEBUG_DATA = "DEBUG_DATA"
AUDIO_DATA_B = "AUDIO_DATA_B"
# Debug packet header struct
-header_list_str = ["Event Processed",
- "Number Packet Nacked By Slave",
- "Number Packet Nacked By Master"]
+header_list_str = [
+ "Event Processed", "Number Packet Nacked By Slave",
+ "Number Packet Nacked By Master"
+]
# Debug frame information structs
-data_list_str = ["Event Number",
- "Overrun",
- "Underrun",
- "Skips",
- "Rendered Audio Frame",
- "First PDU Option",
- "Second PDU Option",
- "Third PDU Option"]
+data_list_str = [
+ "Event Number", "Overrun", "Underrun", "Skips", "Rendered Audio Frame",
+ "First PDU Option", "Second PDU Option", "Third PDU Option"
+]
AUDIO_CONTROL_POINT_UUID = "f0d4de7e4a88476c9d9f1937b0996cc0"
SEC_CONVERT = 1000000
@@ -86,172 +83,184 @@ audio_data = {}
# Parse Hearing Aid Packet
#-----------------------------------------------------------------------
+
def parse_acl_ha_debug_buffer(data, result):
- """This function extracts HA debug buffer"""
- if len(data) < 5:
- return
-
- version, data = unpack_data(data, 1)
- update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], DEBUG_VERSION, str(version))
-
- debug_str = result[TIMESTAMP_TIME_FORMAT];
- for p in range(3):
- byte_data, data = unpack_data(data, 1)
- debug_str = debug_str + ", " + header_list_str[p] + "=" + str(byte_data).rjust(3)
-
- if full_debug:
- debug_str = debug_str + "\n" + "|".join(data_list_str) + "\n"
- while True:
- if len(data) < 7:
- break
- base = 0
- data_list_content = []
- for counter in range(6):
- p = base + counter
+ """This function extracts HA debug buffer"""
+ if len(data) < 5:
+ return
+
+ version, data = unpack_data(data, 1)
+ update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
+ DEBUG_VERSION, str(version))
+
+ debug_str = result[TIMESTAMP_TIME_FORMAT]
+ for p in range(3):
byte_data, data = unpack_data(data, 1)
- if p == 1:
- data_list_content.append(str(byte_data & 0x03).rjust(len(data_list_str[p])))
- data_list_content.append(str((byte_data >> 2) & 0x03).rjust(len(data_list_str[p + 1])))
- data_list_content.append(str((byte_data >> 4) & 0x0f).rjust(len(data_list_str[p + 2])))
- base = 2
- else:
- data_list_content.append(str(byte_data).rjust(len(data_list_str[p])))
- debug_str = debug_str + "|".join(data_list_content) + "\n"
+ debug_str = debug_str + ", " + header_list_str[p] + "=" + str(
+ byte_data).rjust(3)
+
+ if full_debug:
+ debug_str = debug_str + "\n" + "|".join(data_list_str) + "\n"
+ while True:
+ if len(data) < 7:
+ break
+ base = 0
+ data_list_content = []
+ for counter in range(6):
+ p = base + counter
+ byte_data, data = unpack_data(data, 1)
+ if p == 1:
+ data_list_content.append(
+ str(byte_data & 0x03).rjust(len(data_list_str[p])))
+ data_list_content.append(
+ str((byte_data >> 2) & 0x03).rjust(
+ len(data_list_str[p + 1])))
+ data_list_content.append(
+ str((byte_data >> 4) & 0x0f).rjust(
+ len(data_list_str[p + 2])))
+ base = 2
+ else:
+ data_list_content.append(
+ str(byte_data).rjust(len(data_list_str[p])))
+ debug_str = debug_str + "|".join(data_list_content) + "\n"
+
+ update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], DEBUG_DATA,
+ debug_str)
- update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], DEBUG_DATA, debug_str)
def parse_acl_ha_audio_data(data, result):
- """This function extracts HA audio data."""
- if len(data) < 2:
- return
- # Remove audio packet number
- audio_data_b = data[1:]
- update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
- AUDIO_DATA_B, audio_data_b)
+ """This function extracts HA audio data."""
+ if len(data) < 2:
+ return
+ # Remove audio packet number
+ audio_data_b = data[1:]
+ update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
+ AUDIO_DATA_B, audio_data_b)
def parse_acl_ha_audio_type(data, result):
- """This function parses HA audio control cmd audio type."""
- audio_type, data = unpack_data(data, 1)
- if audio_type is None:
- return
- elif audio_type == 0x01:
- audio_type = "Ringtone"
- elif audio_type == 0x02:
- audio_type = "Phonecall"
- elif audio_type == 0x03:
- audio_type = "Media"
- else:
- audio_type = "Unknown"
- update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
- AUDIO_TYPE, audio_type)
+ """This function parses HA audio control cmd audio type."""
+ audio_type, data = unpack_data(data, 1)
+ if audio_type is None:
+ return
+ elif audio_type == 0x01:
+ audio_type = "Ringtone"
+ elif audio_type == 0x02:
+ audio_type = "Phonecall"
+ elif audio_type == 0x03:
+ audio_type = "Media"
+ else:
+ audio_type = "Unknown"
+ update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], AUDIO_TYPE,
+ audio_type)
def parse_acl_ha_codec(data, result):
- """This function parses HA audio control cmd codec and sample rate."""
- codec, data = unpack_data(data, 1)
- if codec == 0x01:
- codec = "G722"
- sample_rate = "16KHZ"
- elif codec == 0x02:
- codec = "G722"
- sample_rate = "24KHZ"
- else:
- codec = "Unknown"
- sample_rate = "Unknown"
- update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
- CODEC, codec)
- update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
- SAMPLE_RATE, sample_rate)
- parse_acl_ha_audio_type(data, result)
+ """This function parses HA audio control cmd codec and sample rate."""
+ codec, data = unpack_data(data, 1)
+ if codec == 0x01:
+ codec = "G722"
+ sample_rate = "16KHZ"
+ elif codec == 0x02:
+ codec = "G722"
+ sample_rate = "24KHZ"
+ else:
+ codec = "Unknown"
+ sample_rate = "Unknown"
+ update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], CODEC,
+ codec)
+ update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], SAMPLE_RATE,
+ sample_rate)
+ parse_acl_ha_audio_type(data, result)
def parse_acl_ha_audio_control_cmd(data, result):
- """This function parses HA audio control cmd is start/stop."""
- control_cmd, data = unpack_data(data, 1)
- if control_cmd == 0x01:
- update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
- START, True)
- update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
- TIMESTAMP_STR_FORMAT, result[TIMESTAMP_STR_FORMAT])
- parse_acl_ha_codec(data, result)
- elif control_cmd == 0x02:
- update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
- START, False)
+ """This function parses HA audio control cmd is start/stop."""
+ control_cmd, data = unpack_data(data, 1)
+ if control_cmd == 0x01:
+ update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], START,
+ True)
+ update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
+ TIMESTAMP_STR_FORMAT, result[TIMESTAMP_STR_FORMAT])
+ parse_acl_ha_codec(data, result)
+ elif control_cmd == 0x02:
+ update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], START,
+ False)
#-----------------------------------------------------------------------
# Parse ACL Packet
#-----------------------------------------------------------------------
+
def parse_acl_att_long_uuid(data, result):
- """This function parses ATT long UUID to get attr_handle."""
- # len (1 byte) + start_attr_handle (2 bytes) + properties (1 byte) +
- # attr_handle (2 bytes) + long_uuid (16 bytes) = 22 bytes
- if len(data) < 22:
- return
- # skip unpack len, start_attr_handle, properties.
- data = data[4:]
- attr_handle, data = unpack_data(data, 2)
- long_uuid_list = []
- for p in range(0, 16):
- long_uuid_list.append("{0:02x}".format(struct.unpack(">B", data[p])[0]))
- long_uuid_list.reverse()
- long_uuid = "".join(long_uuid_list)
- # Check long_uuid is AUDIO_CONTROL_POINT uuid to get the attr_handle.
- if long_uuid == AUDIO_CONTROL_POINT_UUID:
- update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
- AUDIO_CONTROL_ATTR_HANDLE, attr_handle)
+ """This function parses ATT long UUID to get attr_handle."""
+ # len (1 byte) + start_attr_handle (2 bytes) + properties (1 byte) +
+ # attr_handle (2 bytes) + long_uuid (16 bytes) = 22 bytes
+ if len(data) < 22:
+ return
+ # skip unpack len, start_attr_handle, properties.
+ data = data[4:]
+ attr_handle, data = unpack_data(data, 2)
+ long_uuid_list = []
+ for p in range(0, 16):
+ long_uuid_list.append("{0:02x}".format(struct.unpack(">B", data[p])[0]))
+ long_uuid_list.reverse()
+ long_uuid = "".join(long_uuid_list)
+ # Check long_uuid is AUDIO_CONTROL_POINT uuid to get the attr_handle.
+ if long_uuid == AUDIO_CONTROL_POINT_UUID:
+ update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE],
+ AUDIO_CONTROL_ATTR_HANDLE, attr_handle)
def parse_acl_opcode(data, result):
- """This function parses acl data opcode."""
- # opcode (1 byte) = 1 bytes
- if len(data) < 1:
- return
- opcode, data = unpack_data(data, 1)
- # Check opcode is 0x12 (write request) and attr_handle is
- # audio_control_attr_handle for check it is HA audio control cmd.
- if result[IS_SENT] and opcode == 0x12:
- if len(data) < 2:
- return
- attr_handle, data = unpack_data(data, 2)
- if attr_handle == \
- get_audio_control_attr_handle(result[CONNECTION_HANDLE]):
- parse_acl_ha_audio_control_cmd(data, result)
- # Check opcode is 0x09 (read response) to parse ATT long UUID.
- elif not result[IS_SENT] and opcode == 0x09:
- parse_acl_att_long_uuid(data, result)
+ """This function parses acl data opcode."""
+ # opcode (1 byte) = 1 bytes
+ if len(data) < 1:
+ return
+ opcode, data = unpack_data(data, 1)
+ # Check opcode is 0x12 (write request) and attr_handle is
+ # audio_control_attr_handle for check it is HA audio control cmd.
+ if result[IS_SENT] and opcode == 0x12:
+ if len(data) < 2:
+ return
+ attr_handle, data = unpack_data(data, 2)
+ if attr_handle == \
+ get_audio_control_attr_handle(result[CONNECTION_HANDLE]):
+ parse_acl_ha_audio_control_cmd(data, result)
+ # Check opcode is 0x09 (read response) to parse ATT long UUID.
+ elif not result[IS_SENT] and opcode == 0x09:
+ parse_acl_att_long_uuid(data, result)
def parse_acl_handle(data, result):
- """This function parses acl data handle."""
- # connection_handle (2 bytes) + total_len (2 bytes) + pdu (2 bytes)
- # + channel_id (2 bytes) = 8 bytes
- if len(data) < 8:
- return
- connection_handle, data = unpack_data(data, 2)
- connection_handle = connection_handle & 0x0FFF
- # skip unpack total_len
- data = data[2:]
- pdu, data = unpack_data(data, 2)
- channel_id, data = unpack_data(data, 2)
-
- # Check ATT packet or "Coc Data Packet" to get ATT information and audio
- # data.
- if connection_handle <= 0x0EFF:
- if channel_id <= 0x003F:
- result[CONNECTION_HANDLE] = connection_handle
- parse_acl_opcode(data, result)
- elif channel_id >= 0x0040 and channel_id <= 0x007F:
- result[CONNECTION_HANDLE] = connection_handle
- sdu, data = unpack_data(data, 2)
- if pdu - 2 == sdu:
- if result[IS_SENT]:
- parse_acl_ha_audio_data(data, result)
- else:
- if simple_debug:
- parse_acl_ha_debug_buffer(data, result)
+ """This function parses acl data handle."""
+ # connection_handle (2 bytes) + total_len (2 bytes) + pdu (2 bytes)
+ # + channel_id (2 bytes) = 8 bytes
+ if len(data) < 8:
+ return
+ connection_handle, data = unpack_data(data, 2)
+ connection_handle = connection_handle & 0x0FFF
+ # skip unpack total_len
+ data = data[2:]
+ pdu, data = unpack_data(data, 2)
+ channel_id, data = unpack_data(data, 2)
+
+ # Check ATT packet or "Coc Data Packet" to get ATT information and audio
+ # data.
+ if connection_handle <= 0x0EFF:
+ if channel_id <= 0x003F:
+ result[CONNECTION_HANDLE] = connection_handle
+ parse_acl_opcode(data, result)
+ elif channel_id >= 0x0040 and channel_id <= 0x007F:
+ result[CONNECTION_HANDLE] = connection_handle
+ sdu, data = unpack_data(data, 2)
+ if pdu - 2 == sdu:
+ if result[IS_SENT]:
+ parse_acl_ha_audio_data(data, result)
+ else:
+ if simple_debug:
+ parse_acl_ha_debug_buffer(data, result)
#=======================================================================
@@ -260,48 +269,48 @@ def parse_acl_handle(data, result):
def parse_hci_evt_peer_address(data, result):
- """This function parses peer address from hci event."""
- peer_address_list = []
- address_empty_list = ["00", "00", "00", "00", "00", "00"]
- for n in range(0, 3):
- if len(data) < 6:
- return
- for p in range(0, 6):
- peer_address_list.append("{0:02x}".format(struct.unpack(">B",
- data[p])[0]))
- # Check the address is empty or not.
- if peer_address_list == address_empty_list:
- del peer_address_list[:]
- data = data[6:]
- else:
- break
- peer_address_list.reverse()
- peer_address = "_".join(peer_address_list)
- update_audio_data("", "", PEER_ADDRESS, peer_address)
- update_audio_data(PEER_ADDRESS, peer_address, CONNECTION_HANDLE,
- result[CONNECTION_HANDLE])
+ """This function parses peer address from hci event."""
+ peer_address_list = []
+ address_empty_list = ["00", "00", "00", "00", "00", "00"]
+ for n in range(0, 3):
+ if len(data) < 6:
+ return
+ for p in range(0, 6):
+ peer_address_list.append("{0:02x}".format(
+ struct.unpack(">B", data[p])[0]))
+ # Check the address is empty or not.
+ if peer_address_list == address_empty_list:
+ del peer_address_list[:]
+ data = data[6:]
+ else:
+ break
+ peer_address_list.reverse()
+ peer_address = "_".join(peer_address_list)
+ update_audio_data("", "", PEER_ADDRESS, peer_address)
+ update_audio_data(PEER_ADDRESS, peer_address, CONNECTION_HANDLE,
+ result[CONNECTION_HANDLE])
def parse_hci_evt_code(data, result):
- """This function parses hci event content."""
- # hci_evt (1 byte) + param_total_len (1 byte) + sub_event (1 byte)
- # + status (1 byte) + connection_handle (2 bytes) + role (1 byte)
- # + address_type (1 byte) = 8 bytes
- if len(data) < 8:
- return
- hci_evt, data = unpack_data(data, 1)
- # skip unpack param_total_len.
- data = data[1:]
- sub_event, data = unpack_data(data, 1)
- status, data = unpack_data(data, 1)
- connection_handle, data = unpack_data(data, 2)
- connection_handle = connection_handle & 0x0FFF
- # skip unpack role, address_type.
- data = data[2:]
- # We will directly check it is LE Enhanced Connection Complete or not
- # for get Connection Handle and Address.
- if not result[IS_SENT] and hci_evt == 0x3E and sub_event == 0x0A \
- and status == 0x00 and connection_handle <= 0x0EFF:
+ """This function parses hci event content."""
+ # hci_evt (1 byte) + param_total_len (1 byte) + sub_event (1 byte)
+ # + status (1 byte) + connection_handle (2 bytes) + role (1 byte)
+ # + address_type (1 byte) = 8 bytes
+ if len(data) < 8:
+ return
+ hci_evt, data = unpack_data(data, 1)
+ # skip unpack param_total_len.
+ data = data[1:]
+ sub_event, data = unpack_data(data, 1)
+ status, data = unpack_data(data, 1)
+ connection_handle, data = unpack_data(data, 2)
+ connection_handle = connection_handle & 0x0FFF
+ # skip unpack role, address_type.
+ data = data[2:]
+ # We will directly check it is LE Enhanced Connection Complete or not
+ # for get Connection Handle and Address.
+ if not result[IS_SENT] and hci_evt == 0x3E and sub_event == 0x0A \
+ and status == 0x00 and connection_handle <= 0x0EFF:
result[CONNECTION_HANDLE] = connection_handle
parse_hci_evt_peer_address(data, result)
@@ -312,41 +321,42 @@ def parse_hci_evt_code(data, result):
def parse_packet_data(data, result):
- """This function parses packet type."""
- packet_type, data = unpack_data(data, 1)
- if packet_type == 0x02:
- # Try to check HearingAid audio control packet and data packet.
- parse_acl_handle(data, result)
- elif packet_type == 0x04:
- # Try to check HearingAid connection successful packet.
- parse_hci_evt_code(data, result)
+ """This function parses packet type."""
+ packet_type, data = unpack_data(data, 1)
+ if packet_type == 0x02:
+ # Try to check HearingAid audio control packet and data packet.
+ parse_acl_handle(data, result)
+ elif packet_type == 0x04:
+ # Try to check HearingAid connection successful packet.
+ parse_hci_evt_code(data, result)
def parse_packet(btsnoop_file):
- """This function parses packet len, timestamp."""
- packet_result = {}
-
- # ori_len (4 bytes) + include_len (4 bytes) + packet_flag (4 bytes)
- # + drop (4 bytes) + timestamp (8 bytes) = 24 bytes
- packet_header = btsnoop_file.read(24)
- if len(packet_header) != 24:
- return False
-
- ori_len, include_len, packet_flag, drop, timestamp = \
- struct.unpack(">IIIIq", packet_header)
-
- if ori_len == include_len:
- packet_data = btsnoop_file.read(ori_len)
- if len(packet_data) != ori_len:
- return False
- if packet_flag != 2 and drop == 0:
- packet_result[IS_SENT] = (packet_flag == 0)
- packet_result[TIMESTAMP_STR_FORMAT], packet_result[TIMESTAMP_TIME_FORMAT] = convert_time_str(timestamp)
- parse_packet_data(packet_data, packet_result)
- else:
- return False
+ """This function parses packet len, timestamp."""
+ packet_result = {}
+
+ # ori_len (4 bytes) + include_len (4 bytes) + packet_flag (4 bytes)
+ # + drop (4 bytes) + timestamp (8 bytes) = 24 bytes
+ packet_header = btsnoop_file.read(24)
+ if len(packet_header) != 24:
+ return False
+
+ ori_len, include_len, packet_flag, drop, timestamp = \
+ struct.unpack(">IIIIq", packet_header)
+
+ if ori_len == include_len:
+ packet_data = btsnoop_file.read(ori_len)
+ if len(packet_data) != ori_len:
+ return False
+ if packet_flag != 2 and drop == 0:
+ packet_result[IS_SENT] = (packet_flag == 0)
+ packet_result[TIMESTAMP_STR_FORMAT], packet_result[
+ TIMESTAMP_TIME_FORMAT] = convert_time_str(timestamp)
+ parse_packet_data(packet_data, packet_result)
+ else:
+ return False
- return True
+ return True
#=======================================================================
@@ -355,49 +365,54 @@ def parse_packet(btsnoop_file):
def dump_audio_data(data):
- """This function dumps audio data into file."""
- file_type = "." + data[CODEC]
- file_name_list = []
- file_name_list.append(data[PEER_ADDRESS])
- file_name_list.append(data[TIMESTAMP_STR_FORMAT])
- file_name_list.append(data[AUDIO_TYPE])
- file_name_list.append(data[SAMPLE_RATE])
- if folder is not None:
- if not os.path.exists(folder):
- os.makedirs(folder)
- audio_file_name = os.path.join(folder, "-".join(file_name_list) + file_type)
- if data.has_key(DEBUG_VERSION):
- file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-"
- debug_file_name = os.path.join(folder, file_prefix + "-".join(file_name_list) + ".txt")
- else:
- audio_file_name = "-".join(file_name_list) + file_type
- if data.has_key(DEBUG_VERSION):
- file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-"
- debug_file_name = file_prefix + "-".join(file_name_list) + ".txt"
-
- sys.stdout.write("Start to dump Audio File : %s\n" % audio_file_name)
- if data.has_key(AUDIO_DATA_B):
- with open(audio_file_name, "wb+") as audio_file:
- audio_file.write(data[AUDIO_DATA_B])
- sys.stdout.write("Finished to dump Audio File: %s\n\n" % audio_file_name)
- else:
- sys.stdout.write("Fail to dump Audio File: %s\n" % audio_file_name)
- sys.stdout.write("There isn't any Hearing Aid audio data.\n\n")
-
- if simple_debug:
- sys.stdout.write("Start to dump audio %s Debug File\n" % audio_file_name)
- if data.has_key(DEBUG_DATA):
- with open(debug_file_name, "wb+") as debug_file:
- debug_file.write(data[DEBUG_DATA])
- sys.stdout.write("Finished to dump Debug File: %s\n\n" % debug_file_name)
+ """This function dumps audio data into file."""
+ file_type = "." + data[CODEC]
+ file_name_list = []
+ file_name_list.append(data[PEER_ADDRESS])
+ file_name_list.append(data[TIMESTAMP_STR_FORMAT])
+ file_name_list.append(data[AUDIO_TYPE])
+ file_name_list.append(data[SAMPLE_RATE])
+ if folder is not None:
+ if not os.path.exists(folder):
+ os.makedirs(folder)
+ audio_file_name = os.path.join(folder,
+ "-".join(file_name_list) + file_type)
+ if data.has_key(DEBUG_VERSION):
+ file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-"
+ debug_file_name = os.path.join(
+ folder, file_prefix + "-".join(file_name_list) + ".txt")
else:
- sys.stdout.write("Fail to dump audio %s Debug File\n" % audio_file_name)
- sys.stdout.write("There isn't any Hearing Aid debug data.\n\n")
-
+ audio_file_name = "-".join(file_name_list) + file_type
+ if data.has_key(DEBUG_VERSION):
+ file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-"
+ debug_file_name = file_prefix + "-".join(file_name_list) + ".txt"
+
+ sys.stdout.write("Start to dump Audio File : %s\n" % audio_file_name)
+ if data.has_key(AUDIO_DATA_B):
+ with open(audio_file_name, "wb+") as audio_file:
+ audio_file.write(data[AUDIO_DATA_B])
+ sys.stdout.write(
+ "Finished to dump Audio File: %s\n\n" % audio_file_name)
+ else:
+ sys.stdout.write("Fail to dump Audio File: %s\n" % audio_file_name)
+ sys.stdout.write("There isn't any Hearing Aid audio data.\n\n")
+
+ if simple_debug:
+ sys.stdout.write(
+ "Start to dump audio %s Debug File\n" % audio_file_name)
+ if data.has_key(DEBUG_DATA):
+ with open(debug_file_name, "wb+") as debug_file:
+ debug_file.write(data[DEBUG_DATA])
+ sys.stdout.write(
+ "Finished to dump Debug File: %s\n\n" % debug_file_name)
+ else:
+ sys.stdout.write(
+ "Fail to dump audio %s Debug File\n" % audio_file_name)
+ sys.stdout.write("There isn't any Hearing Aid debug data.\n\n")
def update_audio_data(relate_key, relate_value, key, value):
- """
+ """
This function records the dump audio file related information.
audio_data = {
PEER_ADDRESS:{
@@ -428,44 +443,44 @@ def update_audio_data(relate_key, relate_value, key, value):
}
}
"""
- if key == PEER_ADDRESS:
- if audio_data.has_key(value):
- # Dump audio data and clear previous data.
- update_audio_data(key, value, START, False)
- # Extra clear CONNECTION_HANDLE due to new connection create.
- if audio_data[value].has_key(CONNECTION_HANDLE):
- audio_data[value].pop(CONNECTION_HANDLE, "")
- else:
- device_audio_data = {key: value}
- temp_audio_data = {value: device_audio_data}
- audio_data.update(temp_audio_data)
- else:
- for i in audio_data:
- if audio_data[i].has_key(relate_key) \
- and audio_data[i][relate_key] == relate_value:
- if key == START:
- if audio_data[i].has_key(key) and audio_data[i][key]:
- dump_audio_data(audio_data[i])
- # Clear data except PEER_ADDRESS, CONNECTION_HANDLE and
- # AUDIO_CONTROL_ATTR_HANDLE.
- audio_data[i].pop(key, "")
- audio_data[i].pop(TIMESTAMP_STR_FORMAT, "")
- audio_data[i].pop(CODEC, "")
- audio_data[i].pop(SAMPLE_RATE, "")
- audio_data[i].pop(AUDIO_TYPE, "")
- audio_data[i].pop(DEBUG_VERSION, "")
- audio_data[i].pop(DEBUG_DATA, "")
- audio_data[i].pop(AUDIO_DATA_B, "")
- elif key == AUDIO_DATA_B or key == DEBUG_DATA:
- if audio_data[i].has_key(START) and audio_data[i][START]:
- if audio_data[i].has_key(key):
- ori_data = audio_data[i].pop(key, "")
- value = ori_data + value
- else:
- # Audio doesn't start, don't record.
- return
+ if key == PEER_ADDRESS:
+ if audio_data.has_key(value):
+ # Dump audio data and clear previous data.
+ update_audio_data(key, value, START, False)
+ # Extra clear CONNECTION_HANDLE due to new connection create.
+ if audio_data[value].has_key(CONNECTION_HANDLE):
+ audio_data[value].pop(CONNECTION_HANDLE, "")
+ else:
device_audio_data = {key: value}
- audio_data[i].update(device_audio_data)
+ temp_audio_data = {value: device_audio_data}
+ audio_data.update(temp_audio_data)
+ else:
+ for i in audio_data:
+ if audio_data[i].has_key(relate_key) \
+ and audio_data[i][relate_key] == relate_value:
+ if key == START:
+ if audio_data[i].has_key(key) and audio_data[i][key]:
+ dump_audio_data(audio_data[i])
+ # Clear data except PEER_ADDRESS, CONNECTION_HANDLE and
+ # AUDIO_CONTROL_ATTR_HANDLE.
+ audio_data[i].pop(key, "")
+ audio_data[i].pop(TIMESTAMP_STR_FORMAT, "")
+ audio_data[i].pop(CODEC, "")
+ audio_data[i].pop(SAMPLE_RATE, "")
+ audio_data[i].pop(AUDIO_TYPE, "")
+ audio_data[i].pop(DEBUG_VERSION, "")
+ audio_data[i].pop(DEBUG_DATA, "")
+ audio_data[i].pop(AUDIO_DATA_B, "")
+ elif key == AUDIO_DATA_B or key == DEBUG_DATA:
+ if audio_data[i].has_key(START) and audio_data[i][START]:
+ if audio_data[i].has_key(key):
+ ori_data = audio_data[i].pop(key, "")
+ value = ori_data + value
+ else:
+ # Audio doesn't start, don't record.
+ return
+ device_audio_data = {key: value}
+ audio_data[i].update(device_audio_data)
#=======================================================================
@@ -474,159 +489,188 @@ def update_audio_data(relate_key, relate_value, key, value):
def get_audio_control_attr_handle(connection_handle):
- """This function gets audio_control_attr_handle."""
- # If force_audio_control_attr_handle is set, will use it first.
- if force_audio_control_attr_handle is not None:
- return force_audio_control_attr_handle
+ """This function gets audio_control_attr_handle."""
+ # If force_audio_control_attr_handle is set, will use it first.
+ if force_audio_control_attr_handle is not None:
+ return force_audio_control_attr_handle
- # Try to check the audio_control_attr_handle is record into audio_data.
- for i in audio_data:
- if audio_data[i].has_key(CONNECTION_HANDLE) \
- and audio_data[i][CONNECTION_HANDLE] == connection_handle:
- if audio_data[i].has_key(AUDIO_CONTROL_ATTR_HANDLE):
- return audio_data[i][AUDIO_CONTROL_ATTR_HANDLE]
+ # Try to check the audio_control_attr_handle is record into audio_data.
+ for i in audio_data:
+ if audio_data[i].has_key(CONNECTION_HANDLE) \
+ and audio_data[i][CONNECTION_HANDLE] == connection_handle:
+ if audio_data[i].has_key(AUDIO_CONTROL_ATTR_HANDLE):
+ return audio_data[i][AUDIO_CONTROL_ATTR_HANDLE]
- # Return default attr_handle if audio_data doesn't record it.
- return default_audio_control_attr_handle
+ # Return default attr_handle if audio_data doesn't record it.
+ return default_audio_control_attr_handle
def unpack_data(data, byte):
- """This function unpacks data."""
- if byte == 1:
- value = struct.unpack(">B", data[0])[0]
- elif byte == 2:
- value = struct.unpack(">H", data[1]+data[0])[0]
- else:
- value = ""
- data = data[byte:]
- return value, data
+ """This function unpacks data."""
+ if byte == 1:
+ value = struct.unpack(">B", data[0])[0]
+ elif byte == 2:
+ value = struct.unpack(">H", data[1] + data[0])[0]
+ else:
+ value = ""
+ data = data[byte:]
+ return value, data
def convert_time_str(timestamp):
- """This function converts time to string format."""
- really_timestamp = float(timestamp) / SEC_CONVERT
- local_timestamp = time.localtime(really_timestamp)
- dt = really_timestamp - long(really_timestamp)
- ms_str = "{0:06}".format(int(round(dt * 1000000)))
+ """This function converts time to string format."""
+ really_timestamp = float(timestamp) / SEC_CONVERT
+ local_timestamp = time.localtime(really_timestamp)
+ dt = really_timestamp - long(really_timestamp)
+ ms_str = "{0:06}".format(int(round(dt * 1000000)))
- str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp)
- full_str_format = str_format + "_" + ms_str
+ str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp)
+ full_str_format = str_format + "_" + ms_str
- time_format = time.strftime("%m-%d %H:%M:%S", local_timestamp)
- full_time_format = time_format + "." + ms_str
- return full_str_format, full_time_format
+ time_format = time.strftime("%m-%d %H:%M:%S", local_timestamp)
+ full_time_format = time_format + "." + ms_str
+ return full_str_format, full_time_format
def set_config():
- """This function is for set config by flag and check the argv is correct."""
- argv_parser = argparse.ArgumentParser(
- description="Extracts Hearing Aid audio data from BTSNOOP.")
- argv_parser.add_argument("BTSNOOP", help="BLUETOOTH BTSNOOP file.")
- argv_parser.add_argument("-f", "--folder", help="select output folder.",
- dest="folder")
- argv_parser.add_argument("-c1", "--connection-handle1",
- help="set a fake connection handle 1 to capture \
- audio dump.", dest="connection_handle1", type=int)
- argv_parser.add_argument("-c2", "--connection-handle2",
- help="set a fake connection handle 2 to capture \
- audio dump.", dest="connection_handle2", type=int)
- argv_parser.add_argument("-ns", "--no-start", help="No audio 'Start' cmd is \
+ """This function is for set config by flag and check the argv is correct."""
+ argv_parser = argparse.ArgumentParser(
+ description="Extracts Hearing Aid audio data from BTSNOOP.")
+ argv_parser.add_argument("BTSNOOP", help="BLUETOOTH BTSNOOP file.")
+ argv_parser.add_argument(
+ "-f", "--folder", help="select output folder.", dest="folder")
+ argv_parser.add_argument(
+ "-c1",
+ "--connection-handle1",
+ help="set a fake connection handle 1 to capture \
+ audio dump.",
+ dest="connection_handle1",
+ type=int)
+ argv_parser.add_argument(
+ "-c2",
+ "--connection-handle2",
+ help="set a fake connection handle 2 to capture \
+ audio dump.",
+ dest="connection_handle2",
+ type=int)
+ argv_parser.add_argument(
+ "-ns",
+ "--no-start",
+ help="No audio 'Start' cmd is \
needed before extracting audio data.",
- dest="no_start", default="False")
- argv_parser.add_argument("-dc", "--default-codec", help="set a default \
- codec.", dest="codec", default="G722")
- argv_parser.add_argument("-a", "--attr-handle",
- help="force to select audio control attr handle.",
- dest="audio_control_attr_handle", type=int)
- argv_parser.add_argument("-d", "--debug",
- help="dump full debug buffer content.",
- dest="full_debug", default="False")
- argv_parser.add_argument("-sd", "--simple-debug",
- help="dump debug buffer header content.",
- dest="simple_debug", default="False")
- arg = argv_parser.parse_args()
-
- if arg.folder is not None:
- global folder
- folder = arg.folder
-
- if arg.connection_handle1 is not None and arg.connection_handle2 is not None \
- and arg.connection_handle1 == arg.connection_handle2:
+ dest="no_start",
+ default="False")
+ argv_parser.add_argument(
+ "-dc",
+ "--default-codec",
+ help="set a default \
+ codec.",
+ dest="codec",
+ default="G722")
+ argv_parser.add_argument(
+ "-a",
+ "--attr-handle",
+ help="force to select audio control attr handle.",
+ dest="audio_control_attr_handle",
+ type=int)
+ argv_parser.add_argument(
+ "-d",
+ "--debug",
+ help="dump full debug buffer content.",
+ dest="full_debug",
+ default="False")
+ argv_parser.add_argument(
+ "-sd",
+ "--simple-debug",
+ help="dump debug buffer header content.",
+ dest="simple_debug",
+ default="False")
+ arg = argv_parser.parse_args()
+
+ if arg.folder is not None:
+ global folder
+ folder = arg.folder
+
+ if arg.connection_handle1 is not None and arg.connection_handle2 is not None \
+ and arg.connection_handle1 == arg.connection_handle2:
argv_parser.error("connection_handle1 can't be same with \
connection_handle2")
exit(1)
- if not (arg.no_start.lower() == "true" or arg.no_start.lower() == "false"):
- argv_parser.error("-ns/--no-start arg is invalid, it should be true/false.")
- exit(1)
-
- if arg.connection_handle1 is not None:
- fake_name = "ConnectionHandle" + str(arg.connection_handle1)
- update_audio_data("", "", PEER_ADDRESS, fake_name)
- update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE,
- arg.connection_handle1)
- if arg.no_start.lower() == "true":
- update_audio_data(PEER_ADDRESS, fake_name, START, True)
- update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT, "Unknown")
- update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec)
- update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown")
- update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown")
-
- if arg.connection_handle2 is not None:
- fake_name = "ConnectionHandle" + str(arg.connection_handle2)
- update_audio_data("", "", PEER_ADDRESS, fake_name)
- update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE,
- arg.connection_handle2)
- if arg.no_start.lower() == "true":
- update_audio_data(PEER_ADDRESS, fake_name, START, True)
- update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT, "Unknown")
- update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec)
- update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown")
- update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown")
-
- if arg.audio_control_attr_handle is not None:
- global force_audio_control_attr_handle
- force_audio_control_attr_handle = arg.audio_control_attr_handle
-
- global full_debug
- global simple_debug
- if arg.full_debug.lower() == "true":
- full_debug = True
- simple_debug = True
- elif arg.simple_debug.lower() == "true":
- simple_debug = True
-
- if os.path.isfile(arg.BTSNOOP):
- return arg.BTSNOOP
- else:
- argv_parser.error("BTSNOOP file not found: %s" % arg.BTSNOOP)
- exit(1)
-
-
-def main():
- btsnoop_file_name = set_config()
-
- with open(btsnoop_file_name, "rb") as btsnoop_file:
- identification = btsnoop_file.read(8)
- if identification != "btsnoop\0":
- sys.stderr.write(
- "Check identification fail. It is not correct btsnoop file.")
- exit(1)
+ if not (arg.no_start.lower() == "true" or arg.no_start.lower() == "false"):
+ argv_parser.error(
+ "-ns/--no-start arg is invalid, it should be true/false.")
+ exit(1)
- ver, data_link = struct.unpack(">II", btsnoop_file.read(4 + 4))
- if (ver != 1) or (data_link != 1002):
- sys.stderr.write(
- "Check ver or dataLink fail. It is not correct btsnoop file.")
- exit(1)
+ if arg.connection_handle1 is not None:
+ fake_name = "ConnectionHandle" + str(arg.connection_handle1)
+ update_audio_data("", "", PEER_ADDRESS, fake_name)
+ update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE,
+ arg.connection_handle1)
+ if arg.no_start.lower() == "true":
+ update_audio_data(PEER_ADDRESS, fake_name, START, True)
+ update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT,
+ "Unknown")
+ update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec)
+ update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown")
+ update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown")
+
+ if arg.connection_handle2 is not None:
+ fake_name = "ConnectionHandle" + str(arg.connection_handle2)
+ update_audio_data("", "", PEER_ADDRESS, fake_name)
+ update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE,
+ arg.connection_handle2)
+ if arg.no_start.lower() == "true":
+ update_audio_data(PEER_ADDRESS, fake_name, START, True)
+ update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT,
+ "Unknown")
+ update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec)
+ update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown")
+ update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown")
+
+ if arg.audio_control_attr_handle is not None:
+ global force_audio_control_attr_handle
+ force_audio_control_attr_handle = arg.audio_control_attr_handle
+
+ global full_debug
+ global simple_debug
+ if arg.full_debug.lower() == "true":
+ full_debug = True
+ simple_debug = True
+ elif arg.simple_debug.lower() == "true":
+ simple_debug = True
+
+ if os.path.isfile(arg.BTSNOOP):
+ return arg.BTSNOOP
+ else:
+ argv_parser.error("BTSNOOP file not found: %s" % arg.BTSNOOP)
+ exit(1)
- while True:
- if not parse_packet(btsnoop_file):
- break
- for i in audio_data:
- if audio_data[i].get(START, False):
- dump_audio_data(audio_data[i])
+def main():
+ btsnoop_file_name = set_config()
+
+ with open(btsnoop_file_name, "rb") as btsnoop_file:
+ identification = btsnoop_file.read(8)
+ if identification != "btsnoop\0":
+ sys.stderr.write(
+ "Check identification fail. It is not correct btsnoop file.")
+ exit(1)
+
+ ver, data_link = struct.unpack(">II", btsnoop_file.read(4 + 4))
+ if (ver != 1) or (data_link != 1002):
+ sys.stderr.write(
+ "Check ver or dataLink fail. It is not correct btsnoop file.")
+ exit(1)
+
+ while True:
+ if not parse_packet(btsnoop_file):
+ break
+
+ for i in audio_data:
+ if audio_data[i].get(START, False):
+ dump_audio_data(audio_data[i])
if __name__ == "__main__":
- main()
+ main()
diff --git a/system/tools/scripts/dump_metrics_ascii.py b/system/tools/scripts/dump_metrics_ascii.py
index c24d29995f..20d413285b 100755
--- a/system/tools/scripts/dump_metrics_ascii.py
+++ b/system/tools/scripts/dump_metrics_ascii.py
@@ -23,6 +23,7 @@ from distutils.spawn import find_executable
import google.protobuf.text_format as text_format
from importlib import import_module
+
def compile_proto(proto_path, output_dir):
"""Invoke Protocol Compiler to generate python from given source .proto."""
# Find compiler path
@@ -48,15 +49,15 @@ def compile_proto(proto_path, output_dir):
if not os.path.exists(output_dir):
os.mkdirs(output_dir)
elif not os.path.isdir(output_dir):
- logging.error("Output path is not a valid directory: %s" %
- (output_dir))
+ logging.error("Output path is not a valid directory: %s" % (output_dir))
return None
input_dir = os.path.dirname(proto_path)
output_filename = os.path.basename(proto_path).replace('.proto', '_pb2.py')
output_path = os.path.join(output_dir, output_filename)
protoc_command = [
- protoc, '-I=%s' % (input_dir), '--python_out=%s' % (output_dir),
- proto_path
+ protoc,
+ '-I=%s' % (input_dir),
+ '--python_out=%s' % (output_dir), proto_path
]
if subprocess.call(protoc_command, stderr=subprocess.STDOUT) != 0:
logging.error("Fail to compile proto")
@@ -81,8 +82,8 @@ def compile_import_proto(output_dir, proto_path):
try:
output_module = import_module(output_module_name)
except ImportError:
- logging.error("Cannot import generated py-proto %s" %
- (output_module_name))
+ logging.error(
+ "Cannot import generated py-proto %s" % (output_module_name))
return output_module
@@ -94,26 +95,31 @@ def parse_proto_to_ascii(binary_proto_msg):
"""
return text_format.MessageToString(binary_proto_msg)
+
def dump_metrics():
os.system('adb wait-for-device')
- p = subprocess.Popen("adb shell dumpsys bluetooth_manager --proto-bin",
- shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ p = subprocess.Popen(
+ "adb shell dumpsys bluetooth_manager --proto-bin",
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
return p.communicate()
+
def get_bluetooth_metrics(proto_native_str_64, bluetooth_proto_module):
bluetooth_log = bluetooth_proto_module.BluetoothLog()
proto_native_str = base64.b64decode(proto_native_str_64)
bluetooth_log.MergeFromString(proto_native_str)
return bluetooth_log
+
def main():
root = logging.getLogger()
root.setLevel(logging.DEBUG)
log_handler = logging.StreamHandler(sys.stderr)
log_handler.setLevel(logging.DEBUG)
- formatter = logging.Formatter(
- "%(asctime)s %(levelname)s %(message)s")
+ formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
log_handler.setFormatter(formatter)
root.addHandler(log_handler)
if len(sys.argv) < 2:
@@ -125,7 +131,7 @@ def main():
logging.info("Requires Protobuf compiler, protoc, version >=3.0.0")
sys.exit(0)
bluetooth_proto_module = compile_import_proto(tempfile.gettempdir(),
- sys.argv[1])
+ sys.argv[1])
if not bluetooth_proto_module:
logging.error("Cannot compile " + sys.argv[1])
sys.exit(1)
@@ -136,5 +142,6 @@ def main():
bluetooth_log_ascii = parse_proto_to_ascii(bluetooth_log)
print(bluetooth_log_ascii)
+
if __name__ == "__main__":
main()
diff --git a/system/vendor_libs/test_vendor_lib/scripts/hci_socket.py b/system/vendor_libs/test_vendor_lib/scripts/hci_socket.py
index 0c425af935..6833eeb8f5 100644
--- a/system/vendor_libs/test_vendor_lib/scripts/hci_socket.py
+++ b/system/vendor_libs/test_vendor_lib/scripts/hci_socket.py
@@ -73,31 +73,31 @@ from scapy.all import *
class HCI_Cmd_Create_Connection(Packet):
- name = 'Create Connection'
- fields_desc = [
- LEMACField('addr', None),
- LEShortField('packet_type', 8),
- ByteEnumField('page_scan_repetition_mode', 0, {
- 0: 'R0',
- 1: 'R1',
- 2: 'R2'
- }),
- ByteEnumField('rsvd', 0, {0: 'Reserved'}),
- LEShortField('clock_offset', 0),
- ByteEnumField('allow_role_switch', 1, {
- 0: 'false',
- 1: 'true'
- }),
- ]
+ name = 'Create Connection'
+ fields_desc = [
+ LEMACField('addr', None),
+ LEShortField('packet_type', 8),
+ ByteEnumField('page_scan_repetition_mode', 0, {
+ 0: 'R0',
+ 1: 'R1',
+ 2: 'R2'
+ }),
+ ByteEnumField('rsvd', 0, {0: 'Reserved'}),
+ LEShortField('clock_offset', 0),
+ ByteEnumField('allow_role_switch', 1, {
+ 0: 'false',
+ 1: 'true'
+ }),
+ ]
class HCI_Cmd_Inquiry(Packet):
- name = 'Inquiry'
- fields_desc = [
- X3BytesField('LAP', 0x9e8b0),
- ByteField('length', 1),
- ByteField('max_responses', 0),
- ]
+ name = 'Inquiry'
+ fields_desc = [
+ X3BytesField('LAP', 0x9e8b0),
+ ByteField('length', 1),
+ ByteField('max_responses', 0),
+ ]
bind_layers(HCI_Command_Hdr, HCI_Cmd_Inquiry, opcode=0x0401)
@@ -105,96 +105,96 @@ bind_layers(HCI_Command_Hdr, HCI_Cmd_Create_Connection, opcode=0x0405)
class HCI_Event_Inquiry_Result(Packet):
- name = 'Inquiry Result'
- fields_desc = [
- ByteField('num_responses', 0),
- LEMACField('addr', None),
- ByteEnumField('page_scan_repetition_mode', 0, {
- 0: 'R0',
- 1: 'R1',
- 2: 'R2'
- }),
- LEShortEnumField('rsvd', 0, {0: 'Reserved'}),
- X3BytesField('class_of_device', 0),
- LEShortField('clock_offset', 0),
- ]
+ name = 'Inquiry Result'
+ fields_desc = [
+ ByteField('num_responses', 0),
+ LEMACField('addr', None),
+ ByteEnumField('page_scan_repetition_mode', 0, {
+ 0: 'R0',
+ 1: 'R1',
+ 2: 'R2'
+ }),
+ LEShortEnumField('rsvd', 0, {0: 'Reserved'}),
+ X3BytesField('class_of_device', 0),
+ LEShortField('clock_offset', 0),
+ ]
class HCI_Event_Connection_Complete(Packet):
- name = 'Connection Complete'
- fields_desc = [
- ByteField('status', 0),
- LEShortField('handle', 0xffff),
- LEMACField('addr', None),
- ByteField('link_type', 1),
- ByteField('encryption_mode', 0),
- ]
+ name = 'Connection Complete'
+ fields_desc = [
+ ByteField('status', 0),
+ LEShortField('handle', 0xffff),
+ LEMACField('addr', None),
+ ByteField('link_type', 1),
+ ByteField('encryption_mode', 0),
+ ]
class HCI_Event_Remote_Name_Request_Complete(Packet):
- name = 'Remote Name Request Complete'
- fields_desc = [
- ByteField('status', 0),
- LEMACField('addr', None),
- ]
+ name = 'Remote Name Request Complete'
+ fields_desc = [
+ ByteField('status', 0),
+ LEMACField('addr', None),
+ ]
class HCI_Event_Read_Remote_Supported_Features_Complete(Packet):
- name = 'Read Remote Supported Features Complete'
- fields_desc = [
- ByteField('status', 0),
- LEShortField('handle', 0xffff),
- XLELongField('features', 0x0123456789abcdef),
- ]
+ name = 'Read Remote Supported Features Complete'
+ fields_desc = [
+ ByteField('status', 0),
+ LEShortField('handle', 0xffff),
+ XLELongField('features', 0x0123456789abcdef),
+ ]
class HCI_Event_Read_Remote_Version_Information_Complete(Packet):
- name = 'Read Remote Version Information Complete'
- fields_desc = [
- ByteField('status', 0),
- LEShortField('handle', 0xffff),
- ByteField('version', 0),
- LEShortField('manufacturer_name', 0),
- LEShortField('subversion', 0),
- ]
+ name = 'Read Remote Version Information Complete'
+ fields_desc = [
+ ByteField('status', 0),
+ LEShortField('handle', 0xffff),
+ ByteField('version', 0),
+ LEShortField('manufacturer_name', 0),
+ LEShortField('subversion', 0),
+ ]
class HCI_Event_Read_Clock_Offset_Complete(Packet):
- name = 'Read Clock Offset Complete'
- fields_desc = [
- ByteField('status', 0),
- LEShortField('handle', 0xffff),
- LEShortField('offset', 0xffff),
- ]
+ name = 'Read Clock Offset Complete'
+ fields_desc = [
+ ByteField('status', 0),
+ LEShortField('handle', 0xffff),
+ LEShortField('offset', 0xffff),
+ ]
class HCI_Event_Read_Remote_Extended_Features_Complete(Packet):
- name = 'Read Remote Supported Features Complete'
- fields_desc = [
- ByteField('status', 0),
- LEShortField('handle', 0xffff),
- ByteField('page_number', 0),
- ByteField('max_page_number', 0),
- XLELongField('features', 0x0123456789abcdef),
- ]
+ name = 'Read Remote Supported Features Complete'
+ fields_desc = [
+ ByteField('status', 0),
+ LEShortField('handle', 0xffff),
+ ByteField('page_number', 0),
+ ByteField('max_page_number', 0),
+ XLELongField('features', 0x0123456789abcdef),
+ ]
class HCI_Event_Extended_Inquiry_Result(Packet):
- name = 'Extended Inquiry Result'
- fields_desc = [
- ByteField('num_responses', 1),
- LEMACField('addr', None),
- ByteEnumField('page_scan_repetition_mode', 0, {
- 0: 'R0',
- 1: 'R1',
- 2: 'R2'
- }),
- ByteEnumField('rsvd', 0, {0: 'Reserved'}),
- X3BytesField('class_of_device', 0),
- LEShortField('clock_offset', 0),
- SignedByteField('rssi', -20),
- PacketListField('extended_inquiry_response', [], EIR_Hdr, 1),
- ]
+ name = 'Extended Inquiry Result'
+ fields_desc = [
+ ByteField('num_responses', 1),
+ LEMACField('addr', None),
+ ByteEnumField('page_scan_repetition_mode', 0, {
+ 0: 'R0',
+ 1: 'R1',
+ 2: 'R2'
+ }),
+ ByteEnumField('rsvd', 0, {0: 'Reserved'}),
+ X3BytesField('class_of_device', 0),
+ LEShortField('clock_offset', 0),
+ SignedByteField('rssi', -20),
+ PacketListField('extended_inquiry_response', [], EIR_Hdr, 1),
+ ]
bind_layers(HCI_Event_Hdr, HCI_Event_Inquiry_Result, code=0x02)
@@ -214,228 +214,236 @@ bind_layers(HCI_Event_Hdr, HCI_Event_Extended_Inquiry_Result, code=0x2f)
class HCISocket(SuperSocket):
- """Simple wrapper class for a socket object.
+ """Simple wrapper class for a socket object.
Attributes:
socket: The underlying socket created for the specified address and port.
"""
- def __init__(self, port):
- self.done_ = False
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('localhost', port))
- self.ins = self.outs = s
- self.packets_ = queue.Queue()
- self.rx_thread_ = threading.Thread(target=self.rx_thread_body)
- self.rx_thread_.start()
-
- def rx_bytes(self, size):
- while not self.done_:
- raw_bytes = b''
- while len(raw_bytes) < size and not self.done_:
- more_raw_bytes = self.ins.recv(min(size - len(raw_bytes), 2048))
- if more_raw_bytes:
- raw_bytes += more_raw_bytes
- return raw_bytes
-
- def rx_thread_body(self):
- while not self.done_:
- payload_length = 0
- # Read the type
- type_byte = self.rx_bytes(1)
- if not type_byte:
- continue
- # Read the Header
- header = b''
- if type_byte == b'\x01': # Command
- header = self.rx_bytes(3)
- if not header:
- continue
- payload_length = header[2]
- elif type_byte == b'\x02': # ACL
- header = self.rx_bytes(4)
- if not header:
- continue
- payload_length = header[3] << 8
- payload_length |= header[2]
- elif type_byte == b'\x03': # SCO
- header = self.rx_bytes(3)
- if not header:
- continue
- payload_length = header[2]
- elif type_byte == b'\x04': # Event
- header = self.rx_bytes(2)
- if not header:
- continue
- payload_length = header[1]
- else:
+ def __init__(self, port):
+ self.done_ = False
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(('localhost', port))
+ self.ins = self.outs = s
+ self.packets_ = queue.Queue()
+ self.rx_thread_ = threading.Thread(target=self.rx_thread_body)
+ self.rx_thread_.start()
+
+ def rx_bytes(self, size):
+ while not self.done_:
+ raw_bytes = b''
+ while len(raw_bytes) < size and not self.done_:
+ more_raw_bytes = self.ins.recv(min(size - len(raw_bytes), 2048))
+ if more_raw_bytes:
+ raw_bytes += more_raw_bytes
+ return raw_bytes
+
+ def rx_thread_body(self):
+ while not self.done_:
+ payload_length = 0
+ # Read the type
+ type_byte = self.rx_bytes(1)
+ if not type_byte:
+ continue
+ # Read the Header
+ header = b''
+ if type_byte == b'\x01': # Command
+ header = self.rx_bytes(3)
+ if not header:
+ continue
+ payload_length = header[2]
+ elif type_byte == b'\x02': # ACL
+ header = self.rx_bytes(4)
+ if not header:
+ continue
+ payload_length = header[3] << 8
+ payload_length |= header[2]
+ elif type_byte == b'\x03': # SCO
+ header = self.rx_bytes(3)
+ if not header:
+ continue
+ payload_length = header[2]
+ elif type_byte == b'\x04': # Event
+ header = self.rx_bytes(2)
+ if not header:
+ continue
+ payload_length = header[1]
+ else:
+ self.done_ = True
+ print('Rx: type_byte ' + hex(type_byte[0]))
+ # Read the Payload
+ payload = self.rx_bytes(
+ payload_length) if payload_length != 0 else b''
+ packet_bytes = type_byte + header + payload
+ packet = HCI_Hdr(packet_bytes)
+ print('Rx: ' + packet.__repr__())
+ self.packets_.put(packet)
+
+ def get_packet(self):
+ if self.packets_.empty():
+ return False
+ return self.packets_.get()
+
+ def tell_rx_thread_to_quit(self):
self.done_ = True
- print('Rx: type_byte ' + hex(type_byte[0]))
- # Read the Payload
- payload = self.rx_bytes(payload_length) if payload_length != 0 else b''
- packet_bytes = type_byte + header + payload
- packet = HCI_Hdr(packet_bytes)
- print('Rx: ' + packet.__repr__())
- self.packets_.put(packet)
-
- def get_packet(self):
- if self.packets_.empty():
- return False
- return self.packets_.get()
-
- def tell_rx_thread_to_quit(self):
- self.done_ = True
- self.rx_thread_.join()
+ self.rx_thread_.join()
class HCIShell(cmd.Cmd):
- """Shell for sending binary data to a port.
+ """Shell for sending binary data to a port.
"""
- def __init__(self, hci):
- cmd.Cmd.__init__(self)
- self._hci = hci
+ def __init__(self, hci):
+ cmd.Cmd.__init__(self)
+ self._hci = hci
- def do_send(self, args):
- """Arguments: dev_type_str Add a new device of type dev_type_str.
+ def do_send(self, args):
+ """Arguments: dev_type_str Add a new device of type dev_type_str.
"""
- self._hci.send_binary(args.split())
+ self._hci.send_binary(args.split())
- def do_connect(self, args):
- """Arguments: bluetooth_address xx:xx:xx:xx:xx:xx, timeout (seconds)
+ def do_connect(self, args):
+ """Arguments: bluetooth_address xx:xx:xx:xx:xx:xx, timeout (seconds)
"""
- split_args = args.split()
- address = split_args[0] if len(split_args) > 0 else 'NULL'
- timeout = int(split_args[1]) if len(split_args) > 1 else 2
- num_responses = 0
- connect = HCI_Hdr(type='Command') / HCI_Command_Hdr(
- opcode=0x0405) / HCI_Cmd_Create_Connection(addr=address)
- self._hci.send(connect)
- status = None
- while status == None:
- response = self._hci.get_packet()
- if response == False:
- continue
- if response[HCI_Hdr].type == HCI_Hdr(
- type='Event'
- ).type and response[HCI_Event_Hdr].code == 0xf and response[
- HCI_Event_Command_Status].opcode == connect[HCI_Command_Hdr].opcode:
- status = response[HCI_Event_Command_Status].status
- if status != HCI_Event_Command_Status(status='pending').status:
- print('Connection failed with status = ' + str(status))
- return
-
- handle = None
- while handle == None:
- connection_complete = self._hci.get_packet()
- if connection_complete == False:
- continue
- if (connection_complete[HCI_Hdr].type == HCI_Hdr(type='Event').type) and (
- connection_complete[HCI_Event_Hdr].code == 0x3):
- status = connection_complete[HCI_Event_Connection_Complete].status
- if status != 0:
- print('Connection complete with failed status = ' + str(status))
- return
- handle = connection_complete[HCI_Event_Connection_Complete].handle
- print('Connection established with handle ' + str(handle))
- connection_complete.show()
- hexdump(connection_complete)
-
- l2cap_done = False
- while l2cap_done == None:
- l2cap_req = self._hci.get_packet()
- if l2cap_req == False:
- continue
- if (l2cap_req[HCI_Hdr].type == HCI_Hdr(type='ACL Data').type) and (
- l2cap_req[L2CAP_Hdr].cid == L2CAP_Hdr(cid='control').cid) and (
- l2cap_req[L2CAP_CmdHdr].code == L2CAP_CmdHdr(code='info_req').code
- ) and (l2cap_req[L2CAP_InfoReq].type == L2CAP_InfoReq(
- type='FEAT_MASK').type):
- print('Send Features packet' + HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(
- handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) /
- L2CAP_Hdr(len=12, cid='control') /
- L2CAP_CmdHdr(code='info_resp', id=146, len=8) / L2CAP_InfoResp(
- type=l2cap_req[L2CAP_InfoResp].type,
- result='success',
- data=b'\xb8\x00\x00\x00').__repr__())
- self._hci.send(
- HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(
- handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) /
- L2CAP_Hdr(len=12, cid='control') / L2CAP_CmdHdr(
- code='info_resp', id=146, len=8) / L2CAP_InfoResp(
- type=l2cap_req[L2CAP_InfoResp].type,
- result='success',
- data=b'\xb8\x00\x00\x00'))
-
- def do_le_scan(self, args):
- """Arguments: enable (0 or 1), filter duplicates (0 or 1) Print the scan responses from reachable devices
+ split_args = args.split()
+ address = split_args[0] if len(split_args) > 0 else 'NULL'
+ timeout = int(split_args[1]) if len(split_args) > 1 else 2
+ num_responses = 0
+ connect = HCI_Hdr(type='Command') / HCI_Command_Hdr(
+ opcode=0x0405) / HCI_Cmd_Create_Connection(addr=address)
+ self._hci.send(connect)
+ status = None
+ while status == None:
+ response = self._hci.get_packet()
+ if response == False:
+ continue
+ if response[HCI_Hdr].type == HCI_Hdr(
+ type='Event'
+ ).type and response[HCI_Event_Hdr].code == 0xf and response[HCI_Event_Command_Status].opcode == connect[HCI_Command_Hdr].opcode:
+ status = response[HCI_Event_Command_Status].status
+ if status != HCI_Event_Command_Status(status='pending').status:
+ print('Connection failed with status = ' + str(status))
+ return
+
+ handle = None
+ while handle == None:
+ connection_complete = self._hci.get_packet()
+ if connection_complete == False:
+ continue
+ if (connection_complete[HCI_Hdr].type == HCI_Hdr(type='Event').type
+ ) and (connection_complete[HCI_Event_Hdr].code == 0x3):
+ status = connection_complete[
+ HCI_Event_Connection_Complete].status
+ if status != 0:
+ print('Connection complete with failed status = ' +
+ str(status))
+ return
+ handle = connection_complete[
+ HCI_Event_Connection_Complete].handle
+ print('Connection established with handle ' + str(handle))
+ connection_complete.show()
+ hexdump(connection_complete)
+
+ l2cap_done = False
+ while l2cap_done == None:
+ l2cap_req = self._hci.get_packet()
+ if l2cap_req == False:
+ continue
+ if (l2cap_req[HCI_Hdr].type == HCI_Hdr(type='ACL Data').type) and (
+ l2cap_req[L2CAP_Hdr].cid == L2CAP_Hdr(cid='control').cid
+ ) and (l2cap_req[L2CAP_CmdHdr].code == L2CAP_CmdHdr(code='info_req')
+ .code) and (l2cap_req[L2CAP_InfoReq].type == L2CAP_InfoReq(
+ type='FEAT_MASK').type):
+ print('Send Features packet' + HCI_Hdr(
+ type='ACL Data'
+ ) / HCI_ACL_Hdr(
+ handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) /
+ L2CAP_Hdr(len=12, cid='control') / L2CAP_CmdHdr(
+ code='info_resp', id=146, len=8) / L2CAP_InfoResp(
+ type=l2cap_req[L2CAP_InfoResp].type,
+ result='success',
+ data=b'\xb8\x00\x00\x00').__repr__())
+ self._hci.send(
+ HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(
+ handle=l2cap_req[HCI_ACL_Hdr].handle,
+ PB=0,
+ BC=2,
+ len=16) /
+ L2CAP_Hdr(len=12, cid='control') / L2CAP_CmdHdr(
+ code='info_resp', id=146, len=8) / L2CAP_InfoResp(
+ type=l2cap_req[L2CAP_InfoResp].type,
+ result='success',
+ data=b'\xb8\x00\x00\x00'))
+
+ def do_le_scan(self, args):
+ """Arguments: enable (0 or 1), filter duplicates (0 or 1) Print the scan responses from reachable devices
"""
- split_args = args.split()
- enable = int(split_args[0]) if len(split_args) > 0 else 1
- filter_dups = int(split_args[1]) if len(split_args) > 1 else 1
- set_scan_enable = HCI_Hdr(type=1) / HCI_Command_Hdr(
- opcode=0x200c) / HCI_Cmd_LE_Set_Scan_Enable(
- enable=enable, filter_dups=filter_dups)
- print('Tx: ' + set_scan_enable.__repr__())
- self._hci.send(set_scan_enable)
-
- def do_scan(self, args):
- """Arguments: timeout (seconds), max_results Print the scan responses from reachable devices
+ split_args = args.split()
+ enable = int(split_args[0]) if len(split_args) > 0 else 1
+ filter_dups = int(split_args[1]) if len(split_args) > 1 else 1
+ set_scan_enable = HCI_Hdr(type=1) / HCI_Command_Hdr(
+ opcode=0x200c) / HCI_Cmd_LE_Set_Scan_Enable(
+ enable=enable, filter_dups=filter_dups)
+ print('Tx: ' + set_scan_enable.__repr__())
+ self._hci.send(set_scan_enable)
+
+ def do_scan(self, args):
+ """Arguments: timeout (seconds), max_results Print the scan responses from reachable devices
"""
- split_args = args.split()
- scan_time = int(split_args[0]) if len(split_args) > 0 else 0
- max_responses = int(split_args[1]) if len(split_args) > 1 else 0
- num_responses = 0
- inquiry = HCI_Hdr(type='Command') / HCI_Command_Hdr(
- opcode=0x0401) / HCI_Cmd_Inquiry(
- length=scan_time, max_responses=max_responses)
- print('Tx: ' + inquiry.__repr__())
- self._hci.send(inquiry)
-
- def do_quit(self, args):
- """Arguments: None.
+ split_args = args.split()
+ scan_time = int(split_args[0]) if len(split_args) > 0 else 0
+ max_responses = int(split_args[1]) if len(split_args) > 1 else 0
+ num_responses = 0
+ inquiry = HCI_Hdr(type='Command') / HCI_Command_Hdr(
+ opcode=0x0401) / HCI_Cmd_Inquiry(
+ length=scan_time, max_responses=max_responses)
+ print('Tx: ' + inquiry.__repr__())
+ self._hci.send(inquiry)
+
+ def do_quit(self, args):
+ """Arguments: None.
Exits.
"""
- self._hci.tell_rx_thread_to_quit()
- self._hci.close()
- print('Goodbye.')
- return True
+ self._hci.tell_rx_thread_to_quit()
+ self._hci.close()
+ print('Goodbye.')
+ return True
- def do_help(self, args):
- """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
+ def do_help(self, args):
+ """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
"""
- if (len(args) == 0):
- cmd.Cmd.do_help(self, args)
+ if (len(args) == 0):
+ cmd.Cmd.do_help(self, args)
def main(argv):
- if len(argv) != 2:
- print('Usage: python hci_socket.py [port]')
- return
- try:
- port = int(argv[1])
- except ValueError:
- print('Error parsing port.')
- else:
+ if len(argv) != 2:
+ print('Usage: python hci_socket.py [port]')
+ return
try:
- hci = HCISocket(port)
- except socket.error as e:
- print('Error connecting to socket: %s' % e)
- except:
- print('Error creating (check arguments).')
+ port = int(argv[1])
+ except ValueError:
+ print('Error parsing port.')
else:
- hci_shell = HCIShell(hci)
- hci_shell.prompt = '$ '
- hci_shell.cmdloop('Welcome to the RootCanal HCI Console \n' +
- 'Type \'help\' for more information.')
+ try:
+ hci = HCISocket(port)
+ except socket.error as e:
+ print('Error connecting to socket: %s' % e)
+ except:
+ print('Error creating (check arguments).')
+ else:
+ hci_shell = HCIShell(hci)
+ hci_shell.prompt = '$ '
+ hci_shell.cmdloop('Welcome to the RootCanal HCI Console \n' +
+ 'Type \'help\' for more information.')
if __name__ == '__main__':
- main(sys.argv)
+ main(sys.argv)
diff --git a/system/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py b/system/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py
index c7b304520c..e9779cb6db 100644
--- a/system/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py
+++ b/system/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py
@@ -69,124 +69,132 @@ import string
import struct
import sys
+
class LinkLayerSocket(object):
- """Simple wrapper class for a socket object.
+ """Simple wrapper class for a socket object.
Attributes:
socket: The underlying socket created for the specified address and port.
"""
- def __init__(self, port):
- print('port = ' + port)
- self.done_ = False
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.connect(('localhost', port))
- # Should it be a non-blocking socket?
- # self._socket.setblocking(0)
- self.packets_ = queue.Queue()
- self.rx_thread_ = threading.Thread(target=self.rx_thread_body)
- self.rx_thread_.start()
-
- def rx_bytes(self, size):
- while not self.done_:
- raw_bytes = b''
- while len(raw_bytes) < size and not self.done_:
- more_raw_bytes = self._socket.recv(min(size - len(raw_bytes), 2048))
- if more_raw_bytes:
- raw_bytes += more_raw_bytes
- return raw_bytes
-
- def rx_thread_body(self):
- while not self.done_:
- payload_length = 0
- # Read the size (4B), the type (1B), and the addresses (2*6B)
- header = self.rx_bytes(17)
- if not header:
- continue
- payload_length = header[0]
- payload_length |= header[1] << 8
- payload_length |= header[2] << 16
- payload_length |= header[3] << 24
- print('Rx: type_byte ' + hex(header[4]))
- print('Rx: from ' + hex(header[5]) + ':' + hex(header[6]) + ':' + hex(header[7]) + ':' + hex(header[8]) + ':' + hex(header[9]) + ':' + hex(header[10]))
- print('Rx: to ' + hex(header[11]) + ':' + hex(header[12]) + ':' + hex(header[13]) + ':' + hex(header[14]) + ':' + hex(header[15]) + ':' + hex(header[16]))
- # Read the Payload
- payload = self.rx_bytes(payload_length) if payload_length != 0 else b''
- packet_bytes = header + payload
- self.packets_.put(packet_bytes)
-
- def get_packet(self):
- if self.packets_.empty():
- return False
- return self.packets_.get()
-
- def send_binary(self, args):
- joined_args = ''.join(arg for arg in args)
- print(joined_args)
- packet = binascii.a2b_hex(joined_args)
- if self._done:
- return
- self._connection.send(packet)
-
- def tell_rx_thread_to_quit(self):
- self.done_ = True
- self.rx_thread_.join()
+ def __init__(self, port):
+ print('port = ' + port)
+ self.done_ = False
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.connect(('localhost', port))
+ # Should it be a non-blocking socket?
+ # self._socket.setblocking(0)
+ self.packets_ = queue.Queue()
+ self.rx_thread_ = threading.Thread(target=self.rx_thread_body)
+ self.rx_thread_.start()
+
+ def rx_bytes(self, size):
+ while not self.done_:
+ raw_bytes = b''
+ while len(raw_bytes) < size and not self.done_:
+ more_raw_bytes = self._socket.recv(
+ min(size - len(raw_bytes), 2048))
+ if more_raw_bytes:
+ raw_bytes += more_raw_bytes
+ return raw_bytes
+
+ def rx_thread_body(self):
+ while not self.done_:
+ payload_length = 0
+ # Read the size (4B), the type (1B), and the addresses (2*6B)
+ header = self.rx_bytes(17)
+ if not header:
+ continue
+ payload_length = header[0]
+ payload_length |= header[1] << 8
+ payload_length |= header[2] << 16
+ payload_length |= header[3] << 24
+ print('Rx: type_byte ' + hex(header[4]))
+ print('Rx: from ' + hex(header[5]) + ':' + hex(header[6]) + ':' +
+ hex(header[7]) + ':' + hex(header[8]) + ':' + hex(header[9]) +
+ ':' + hex(header[10]))
+ print('Rx: to ' + hex(header[11]) + ':' + hex(header[12]) + ':' +
+ hex(header[13]) + ':' + hex(header[14]) + ':' +
+ hex(header[15]) + ':' + hex(header[16]))
+ # Read the Payload
+ payload = self.rx_bytes(
+ payload_length) if payload_length != 0 else b''
+ packet_bytes = header + payload
+ self.packets_.put(packet_bytes)
+
+ def get_packet(self):
+ if self.packets_.empty():
+ return False
+ return self.packets_.get()
+
+ def send_binary(self, args):
+ joined_args = ''.join(arg for arg in args)
+ print(joined_args)
+ packet = binascii.a2b_hex(joined_args)
+ if self._done:
+ return
+ self._connection.send(packet)
+
+ def tell_rx_thread_to_quit(self):
+ self.done_ = True
+ self.rx_thread_.join()
class LinkLayerShell(cmd.Cmd):
- """Shell for sending binary data to a port.
+ """Shell for sending binary data to a port.
"""
- def __init__(self, link_layer):
- cmd.Cmd.__init__(self)
- self._link_layer = link_layer
+ def __init__(self, link_layer):
+ cmd.Cmd.__init__(self)
+ self._link_layer = link_layer
- def do_send(self, args):
- """Arguments: binary representation of a packet.
+ def do_send(self, args):
+ """Arguments: binary representation of a packet.
"""
- self._link_layer.send_binary(args.split())
+ self._link_layer.send_binary(args.split())
- def do_quit(self, args):
- """Arguments: None.
+ def do_quit(self, args):
+ """Arguments: None.
Exits.
"""
- self._link_layer.tell_rx_thread_to_quit()
- self._link_layer.close()
- print('Goodbye.')
- return True
+ self._link_layer.tell_rx_thread_to_quit()
+ self._link_layer.close()
+ print('Goodbye.')
+ return True
- def do_help(self, args):
- """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
+ def do_help(self, args):
+ """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
"""
- if (len(args) == 0):
- cmd.Cmd.do_help(self, args)
+ if (len(args) == 0):
+ cmd.Cmd.do_help(self, args)
def main(argv):
- if len(argv) != 2:
- print('Usage: python link_layer_socket.py [port]')
- return
- try:
- port = int(argv[1])
- except ValueError:
- print('Error parsing port.')
- else:
+ if len(argv) != 2:
+ print('Usage: python link_layer_socket.py [port]')
+ return
try:
- link_layer = LinkLayerSocket(port)
- except socket.error as e:
- print('Error connecting to socket: %s' % e)
- except:
- print('Error creating (check arguments).')
+ port = int(argv[1])
+ except ValueError:
+ print('Error parsing port.')
else:
- link_layer_shell = LinkLayerShell(link_layer)
- link_layer_shell.prompt = '$ '
- link_layer_shell.cmdloop('Welcome to the RootCanal LinkLayer Console \n' +
- 'Type \'help\' for more information.')
+ try:
+ link_layer = LinkLayerSocket(port)
+ except socket.error as e:
+ print('Error connecting to socket: %s' % e)
+ except:
+ print('Error creating (check arguments).')
+ else:
+ link_layer_shell = LinkLayerShell(link_layer)
+ link_layer_shell.prompt = '$ '
+ link_layer_shell.cmdloop(
+ 'Welcome to the RootCanal LinkLayer Console \n' +
+ 'Type \'help\' for more information.')
if __name__ == '__main__':
- main(sys.argv)
+ main(sys.argv)
diff --git a/system/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py b/system/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py
index cadeab3c76..b7b0670f8a 100644
--- a/system/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py
+++ b/system/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py
@@ -75,146 +75,146 @@ DEVICE_ADDRESS_LENGTH = 6
# Used to generate fake device names and addresses during discovery.
def generate_random_name():
- return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
- string.digits) for _ in range(DEVICE_NAME_LENGTH))
+ return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
+ string.digits) for _ in range(DEVICE_NAME_LENGTH))
def generate_random_address():
- return ''.join(random.SystemRandom().choice(string.digits) for _ in \
- range(DEVICE_ADDRESS_LENGTH))
+ return ''.join(random.SystemRandom().choice(string.digits) for _ in \
+ range(DEVICE_ADDRESS_LENGTH))
class Connection(object):
- """Simple wrapper class for a socket object.
+ """Simple wrapper class for a socket object.
Attributes:
socket: The underlying socket created for the specified address and port.
"""
- def __init__(self, port):
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.connect(('localhost', port))
- self._socket.setblocking(0)
+ def __init__(self, port):
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.connect(('localhost', port))
+ self._socket.setblocking(0)
- def close(self):
- self._socket.close()
+ def close(self):
+ self._socket.close()
- def send(self, data):
- self._socket.sendall(data)
+ def send(self, data):
+ self._socket.sendall(data)
- def receive(self, size):
- return self._socket.recv(size)
+ def receive(self, size):
+ return self._socket.recv(size)
class RawPort(object):
- """Checks outgoing commands and sends them once verified.
+ """Checks outgoing commands and sends them once verified.
Attributes:
connection: The connection to the HCI port.
"""
- def __init__(self, port):
- self._connection = Connection(port)
- self._closed = False
-
- def close(self):
- self._connection.close()
- self._closed = True
-
- def send_binary(self, args):
- joined_args = ''.join(arg for arg in args)
- print(joined_args)
- packet = binascii.a2b_hex(joined_args)
- if self._closed:
- return
- self._connection.send(packet)
- received = self.receive_response()
- received_bytes = bytearray(received)
- print(raw(received_bytes))
-
- def receive_response(self):
- if self._closed:
- return
- size_chars = self._connection.receive(4)
- if not size_chars:
- print('Debug: No response')
- return False
- size_bytes = bytearray(size_chars)
- response_size = 0
- for i in range(0, len(size_chars) - 1):
- response_size |= ord(size_chars[i]) << (8 * i)
- response = self._connection.receive(response_size)
- return response
-
- def lint_command(self, name, args, name_size, args_size):
- assert name_size == len(name) and args_size == len(args)
- try:
- name.encode('utf-8')
- for arg in args:
- arg.encode('utf-8')
- except UnicodeError:
- print('Unrecognized characters.')
- raise
- if name_size > 255 or args_size > 255:
- raise ValueError # Size must be encodable in one octet.
- for arg in args:
- if len(arg) > 255:
- raise ValueError # Size must be encodable in one octet.
+ def __init__(self, port):
+ self._connection = Connection(port)
+ self._closed = False
+
+ def close(self):
+ self._connection.close()
+ self._closed = True
+
+ def send_binary(self, args):
+ joined_args = ''.join(arg for arg in args)
+ print(joined_args)
+ packet = binascii.a2b_hex(joined_args)
+ if self._closed:
+ return
+ self._connection.send(packet)
+ received = self.receive_response()
+ received_bytes = bytearray(received)
+ print(raw(received_bytes))
+
+ def receive_response(self):
+ if self._closed:
+ return
+ size_chars = self._connection.receive(4)
+ if not size_chars:
+ print('Debug: No response')
+ return False
+ size_bytes = bytearray(size_chars)
+ response_size = 0
+ for i in range(0, len(size_chars) - 1):
+ response_size |= ord(size_chars[i]) << (8 * i)
+ response = self._connection.receive(response_size)
+ return response
+
+ def lint_command(self, name, args, name_size, args_size):
+ assert name_size == len(name) and args_size == len(args)
+ try:
+ name.encode('utf-8')
+ for arg in args:
+ arg.encode('utf-8')
+ except UnicodeError:
+ print('Unrecognized characters.')
+ raise
+ if name_size > 255 or args_size > 255:
+ raise ValueError # Size must be encodable in one octet.
+ for arg in args:
+ if len(arg) > 255:
+ raise ValueError # Size must be encodable in one octet.
class RawPortShell(cmd.Cmd):
- """Shell for sending binary data to a port.
+ """Shell for sending binary data to a port.
"""
- def __init__(self, raw_port):
- cmd.Cmd.__init__(self)
- self._raw_port = raw_port
+ def __init__(self, raw_port):
+ cmd.Cmd.__init__(self)
+ self._raw_port = raw_port
- def do_send(self, args):
- """Arguments: dev_type_str Add a new device of type dev_type_str.
+ def do_send(self, args):
+ """Arguments: dev_type_str Add a new device of type dev_type_str.
"""
- self._raw_port.send_binary(args.split())
+ self._raw_port.send_binary(args.split())
- def do_quit(self, args):
- """Arguments: None.
+ def do_quit(self, args):
+ """Arguments: None.
Exits.
"""
- self._raw_port.close()
- print('Goodbye.')
- return True
+ self._raw_port.close()
+ print('Goodbye.')
+ return True
- def do_help(self, args):
- """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
+ def do_help(self, args):
+ """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
"""
- if (len(args) == 0):
- cmd.Cmd.do_help(self, args)
+ if (len(args) == 0):
+ cmd.Cmd.do_help(self, args)
def main(argv):
- if len(argv) != 2:
- print('Usage: python raw_port.py [port]')
- return
- try:
- port = int(argv[1])
- except ValueError:
- print('Error parsing port.')
- else:
+ if len(argv) != 2:
+ print('Usage: python raw_port.py [port]')
+ return
try:
- raw_port = RawPort(port)
- except (socket.error, e):
- print('Error connecting to socket: %s' % e)
- except:
- print('Error creating (check arguments).')
+ port = int(argv[1])
+ except ValueError:
+ print('Error parsing port.')
else:
- raw_port_shell = RawPortShell(raw_port)
- raw_port_shell.prompt = '$ '
- raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' +
- 'Type \'help\' for more information.')
+ try:
+ raw_port = RawPort(port)
+ except (socket.error, e):
+ print('Error connecting to socket: %s' % e)
+ except:
+ print('Error creating (check arguments).')
+ else:
+ raw_port_shell = RawPortShell(raw_port)
+ raw_port_shell.prompt = '$ '
+ raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' +
+ 'Type \'help\' for more information.')
if __name__ == '__main__':
- main(sys.argv)
+ main(sys.argv)
diff --git a/system/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py b/system/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py
index 982aee2dc9..89552d9eae 100644
--- a/system/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py
+++ b/system/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py
@@ -61,137 +61,137 @@ DEVICE_ADDRESS_LENGTH = 6
# Used to generate fake device names and addresses during discovery.
def generate_random_name():
- return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
- string.digits) for _ in range(DEVICE_NAME_LENGTH))
+ return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
+ string.digits) for _ in range(DEVICE_NAME_LENGTH))
def generate_random_address():
- return ''.join(random.SystemRandom().choice(string.digits) for _ in \
- range(DEVICE_ADDRESS_LENGTH))
+ return ''.join(random.SystemRandom().choice(string.digits) for _ in \
+ range(DEVICE_ADDRESS_LENGTH))
class Connection(object):
- """Simple wrapper class for a socket object.
+ """Simple wrapper class for a socket object.
Attributes:
socket: The underlying socket created for the specified address and port.
"""
- def __init__(self, port):
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.connect(('localhost', port))
- self._socket.setblocking(0)
+ def __init__(self, port):
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.connect(('localhost', port))
+ self._socket.setblocking(0)
- def close(self):
- self._socket.close()
+ def close(self):
+ self._socket.close()
- def send(self, data):
- self._socket.sendall(data)
+ def send(self, data):
+ self._socket.sendall(data)
- def receive(self, size):
- return self._socket.recv(size)
+ def receive(self, size):
+ return self._socket.recv(size)
class RawPort(object):
- """Checks outgoing commands and sends them once verified.
+ """Checks outgoing commands and sends them once verified.
Attributes:
connection: The connection to the HCI port.
"""
- def __init__(self, port):
- self._connection = Connection(port)
- self._closed = False
-
- def close(self):
- self._connection.close()
- self._closed = True
-
- def send_binary(self, args):
- joined_args = ''.join(arg for arg in args)
- print(joined_args)
- packet = binascii.a2b_hex(joined_args)
- if self._closed:
- return
- self._connection.send(packet)
-
- def receive_response(self):
- if self._closed:
- return
- size_chars = self._connection.receive(4)
- if not size_chars:
- print('Debug: No response')
- return False
- size_bytes = bytearray(size_chars)
- response_size = 0
- for i in range(0, len(size_chars) - 1):
- response_size |= ord(size_chars[i]) << (8 * i)
- response = self._connection.receive(response_size)
- return response
-
- def lint_command(self, name, args, name_size, args_size):
- assert name_size == len(name) and args_size == len(args)
- try:
- name.encode('utf-8')
- for arg in args:
- arg.encode('utf-8')
- except UnicodeError:
- print('Unrecognized characters.')
- raise
- if name_size > 255 or args_size > 255:
- raise ValueError # Size must be encodable in one octet.
- for arg in args:
- if len(arg) > 255:
- raise ValueError # Size must be encodable in one octet.
+ def __init__(self, port):
+ self._connection = Connection(port)
+ self._closed = False
+
+ def close(self):
+ self._connection.close()
+ self._closed = True
+
+ def send_binary(self, args):
+ joined_args = ''.join(arg for arg in args)
+ print(joined_args)
+ packet = binascii.a2b_hex(joined_args)
+ if self._closed:
+ return
+ self._connection.send(packet)
+
+ def receive_response(self):
+ if self._closed:
+ return
+ size_chars = self._connection.receive(4)
+ if not size_chars:
+ print('Debug: No response')
+ return False
+ size_bytes = bytearray(size_chars)
+ response_size = 0
+ for i in range(0, len(size_chars) - 1):
+ response_size |= ord(size_chars[i]) << (8 * i)
+ response = self._connection.receive(response_size)
+ return response
+
+ def lint_command(self, name, args, name_size, args_size):
+ assert name_size == len(name) and args_size == len(args)
+ try:
+ name.encode('utf-8')
+ for arg in args:
+ arg.encode('utf-8')
+ except UnicodeError:
+ print('Unrecognized characters.')
+ raise
+ if name_size > 255 or args_size > 255:
+ raise ValueError # Size must be encodable in one octet.
+ for arg in args:
+ if len(arg) > 255:
+ raise ValueError # Size must be encodable in one octet.
class RawPortShell(cmd.Cmd):
- """Shell for sending binary data to a port."""
+ """Shell for sending binary data to a port."""
- def __init__(self, raw_port):
- cmd.Cmd.__init__(self)
- self._raw_port = raw_port
+ def __init__(self, raw_port):
+ cmd.Cmd.__init__(self)
+ self._raw_port = raw_port
- def do_send(self, args):
- """Arguments: dev_type_str Add a new device of type dev_type_str."""
- self._raw_port.send_binary(args.split())
+ def do_send(self, args):
+ """Arguments: dev_type_str Add a new device of type dev_type_str."""
+ self._raw_port.send_binary(args.split())
- def do_quit(self, args):
- """Arguments: None.
+ def do_quit(self, args):
+ """Arguments: None.
Exits.
"""
- self._raw_port.close()
- print('Goodbye.')
- return True
+ self._raw_port.close()
+ print('Goodbye.')
+ return True
- def do_help(self, args):
- """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr."""
- if (len(args) == 0):
- cmd.Cmd.do_help(self, args)
+ def do_help(self, args):
+ """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr."""
+ if (len(args) == 0):
+ cmd.Cmd.do_help(self, args)
def main(argv):
- if len(argv) != 2:
- print('Usage: python raw_port.py [port]')
- return
- try:
- port = int(argv[1])
- except ValueError:
- print('Error parsing port.')
- else:
+ if len(argv) != 2:
+ print('Usage: python raw_port.py [port]')
+ return
try:
- raw_port = RawPort(port)
- except (socket.error, e):
- print('Error connecting to socket: %s' % e)
- except:
- print('Error creating (check arguments).')
+ port = int(argv[1])
+ except ValueError:
+ print('Error parsing port.')
else:
- raw_port_shell = RawPortShell(raw_port)
- raw_port_shell.prompt = '$ '
- raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' +
- 'Type \'help\' for more information.')
+ try:
+ raw_port = RawPort(port)
+ except (socket.error, e):
+ print('Error connecting to socket: %s' % e)
+ except:
+ print('Error creating (check arguments).')
+ else:
+ raw_port_shell = RawPortShell(raw_port)
+ raw_port_shell.prompt = '$ '
+ raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' +
+ 'Type \'help\' for more information.')
if __name__ == '__main__':
- main(sys.argv)
+ main(sys.argv)
diff --git a/system/vendor_libs/test_vendor_lib/scripts/simple_stack.py b/system/vendor_libs/test_vendor_lib/scripts/simple_stack.py
index 95d973a2fb..52472e9e7d 100644
--- a/system/vendor_libs/test_vendor_lib/scripts/simple_stack.py
+++ b/system/vendor_libs/test_vendor_lib/scripts/simple_stack.py
@@ -72,167 +72,167 @@ from scapy.all import *
class HCI_Cmd_Connect(Packet):
- name = "Connect"
- fields_desc = [
- ByteEnumField("filter", 0, {0: "address"}),
- LEShortField("packet_type", 8),
- ByteEnumField("page_scan_repetition_mode", 0, {
- 0: "R0",
- 1: "R1",
- 2: "R2"
- }),
- ByteEnumField("page_scan_repetition_mode", 0, {0: "Reserved"}),
- LEShortField("clock_offset", 0),
- ByteEnumField("allow_role_switch", 0, {
- 0: "false",
- 1: "true"
- }),
- ]
+ name = "Connect"
+ fields_desc = [
+ ByteEnumField("filter", 0, {0: "address"}),
+ LEShortField("packet_type", 8),
+ ByteEnumField("page_scan_repetition_mode", 0, {
+ 0: "R0",
+ 1: "R1",
+ 2: "R2"
+ }),
+ ByteEnumField("page_scan_repetition_mode", 0, {0: "Reserved"}),
+ LEShortField("clock_offset", 0),
+ ByteEnumField("allow_role_switch", 0, {
+ 0: "false",
+ 1: "true"
+ }),
+ ]
class HCI_Cmd_Inquiry(Packet):
- name = "Inquiry"
- fields_desc = [
- XByteField("LAP0", 0),
- XByteField("LAP1", 0x8B),
- XByteField("LAP2", 0x9E),
- ByteField("length", 1),
- ByteField("max_responses", 0),
- ]
+ name = "Inquiry"
+ fields_desc = [
+ XByteField("LAP0", 0),
+ XByteField("LAP1", 0x8B),
+ XByteField("LAP2", 0x9E),
+ ByteField("length", 1),
+ ByteField("max_responses", 0),
+ ]
""" END SCAPY stuff"""
class Connection(object):
- """Simple wrapper class for a socket object.
+ """Simple wrapper class for a socket object.
Attributes:
socket: The underlying socket created for the specified address and port.
"""
- def __init__(self, port):
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.connect(("localhost", port))
+ def __init__(self, port):
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.connect(("localhost", port))
- def close(self):
- self._socket.close()
+ def close(self):
+ self._socket.close()
- def send(self, data):
- self._socket.sendall(data)
+ def send(self, data):
+ self._socket.sendall(data)
- def receive(self, size):
- return self._socket.recv(size)
+ def receive(self, size):
+ return self._socket.recv(size)
class RawPort(object):
- """Converts outgoing packets to binary and sends them.
+ """Converts outgoing packets to binary and sends them.
Attributes:
connection: The connection to the HCI port.
"""
- def __init__(self, port):
- self._connection = Connection(port)
-
- def close(self):
- self._connection.close()
-
- def send_binary(self, args):
- joined_args = "".join(arg for arg in args)
- print(joined_args)
- packet = binascii.a2b_hex(joined_args)
- self._connection.send(packet)
-
- def receive_response(self):
- ready_to_read, ready_to_write, in_error = \
- select(
- [ self._connection._socket ],
- [ ],
- [ self._connection._socket ],
- 1.5)
- if len(in_error) > 0:
- print("Error")
- return False
- if len(ready_to_read) > 0:
- print("Ready to Read")
- type_str = self._connection.receive(512)
- print(len(type_str))
- print(type_str)
- return type_str
- print("Returning false at the end")
- return False
+ def __init__(self, port):
+ self._connection = Connection(port)
+
+ def close(self):
+ self._connection.close()
+
+ def send_binary(self, args):
+ joined_args = "".join(arg for arg in args)
+ print(joined_args)
+ packet = binascii.a2b_hex(joined_args)
+ self._connection.send(packet)
+
+ def receive_response(self):
+ ready_to_read, ready_to_write, in_error = \
+ select(
+ [ self._connection._socket ],
+ [ ],
+ [ self._connection._socket ],
+ 1.5)
+ if len(in_error) > 0:
+ print("Error")
+ return False
+ if len(ready_to_read) > 0:
+ print("Ready to Read")
+ type_str = self._connection.receive(512)
+ print(len(type_str))
+ print(type_str)
+ return type_str
+ print("Returning false at the end")
+ return False
class RawPortShell(cmd.Cmd):
- """Shell for sending binary data to a port.
+ """Shell for sending binary data to a port.
"""
- def __init__(self, raw_port):
- cmd.Cmd.__init__(self)
- self._raw_port = raw_port
+ def __init__(self, raw_port):
+ cmd.Cmd.__init__(self)
+ self._raw_port = raw_port
- def do_send(self, args):
- """Arguments: dev_type_str Add a new device of type dev_type_str.
+ def do_send(self, args):
+ """Arguments: dev_type_str Add a new device of type dev_type_str.
"""
- self._raw_port.send_binary(args.split())
+ self._raw_port.send_binary(args.split())
- def do_scan(self, args):
- """Arguments: timeout (seconds) Print the scan responses from reachable devices
+ def do_scan(self, args):
+ """Arguments: timeout (seconds) Print the scan responses from reachable devices
"""
- self._raw_port.send_binary(args.split())
+ self._raw_port.send_binary(args.split())
- def do_quit(self, args):
- """Arguments: None.
+ def do_quit(self, args):
+ """Arguments: None.
Exits.
"""
- self._raw_port.close()
- print("Goodbye.")
- return True
+ self._raw_port.close()
+ print("Goodbye.")
+ return True
- def do_help(self, args):
- """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
+ def do_help(self, args):
+ """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
"""
- if (len(args) == 0):
- cmd.Cmd.do_help(self, args)
+ if (len(args) == 0):
+ cmd.Cmd.do_help(self, args)
- def postcmd(self, stop, line):
- """Called after each command stop : whether we will stop after this command line : the previous input line Return True to stop, False to continue
+ def postcmd(self, stop, line):
+ """Called after each command stop : whether we will stop after this command line : the previous input line Return True to stop, False to continue
"""
- if stop:
- return True
- response = self._raw_port.receive_response()
- print(response)
- return False
+ if stop:
+ return True
+ response = self._raw_port.receive_response()
+ print(response)
+ return False
def main(argv):
- if len(argv) != 2:
- print("Usage: python raw_port.py [port]")
- return
- try:
- port = int(argv[1])
- except ValueError:
- print("Error parsing port.")
- else:
+ if len(argv) != 2:
+ print("Usage: python raw_port.py [port]")
+ return
try:
- raw_port = RawPort(port)
- except (socket.error, e):
- print("Error connecting to socket: %s" % e)
- except:
- print("Error creating (check arguments).")
+ port = int(argv[1])
+ except ValueError:
+ print("Error parsing port.")
else:
- raw_port_shell = RawPortShell(raw_port)
- raw_port_shell.prompt = "$ "
- raw_port_shell.cmdloop("Welcome to the RootCanal Console \n" +
- 'Type \'help\' for more information.')
+ try:
+ raw_port = RawPort(port)
+ except (socket.error, e):
+ print("Error connecting to socket: %s" % e)
+ except:
+ print("Error creating (check arguments).")
+ else:
+ raw_port_shell = RawPortShell(raw_port)
+ raw_port_shell.prompt = "$ "
+ raw_port_shell.cmdloop("Welcome to the RootCanal Console \n" +
+ 'Type \'help\' for more information.')
if __name__ == "__main__":
- main(sys.argv)
+ main(sys.argv)
diff --git a/system/vendor_libs/test_vendor_lib/scripts/test_channel.py b/system/vendor_libs/test_vendor_lib/scripts/test_channel.py
index 447130b537..79b19b1bbb 100644
--- a/system/vendor_libs/test_vendor_lib/scripts/test_channel.py
+++ b/system/vendor_libs/test_vendor_lib/scripts/test_channel.py
@@ -49,97 +49,98 @@ DEVICE_ADDRESS_LENGTH = 6
# Used to generate fake device names and addresses during discovery.
def generate_random_name():
- return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
- string.digits) for _ in range(DEVICE_NAME_LENGTH))
+ return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
+ string.digits) for _ in range(DEVICE_NAME_LENGTH))
def generate_random_address():
- return ''.join(random.SystemRandom().choice(string.digits) for _ in \
- range(DEVICE_ADDRESS_LENGTH))
+ return ''.join(random.SystemRandom().choice(string.digits) for _ in \
+ range(DEVICE_ADDRESS_LENGTH))
class Connection(object):
- """Simple wrapper class for a socket object.
+ """Simple wrapper class for a socket object.
Attributes:
socket: The underlying socket created for the specified address and port.
"""
- def __init__(self, port):
- self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.connect(('localhost', port))
+ def __init__(self, port):
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.connect(('localhost', port))
- def close(self):
- self._socket.close()
+ def close(self):
+ self._socket.close()
- def send(self, data):
- self._socket.sendall(data)
+ def send(self, data):
+ self._socket.sendall(data)
- def receive(self, size):
- return self._socket.recv(size)
+ def receive(self, size):
+ return self._socket.recv(size)
class TestChannel(object):
- """Checks outgoing commands and sends them once verified.
+ """Checks outgoing commands and sends them once verified.
Attributes:
connection: The connection to the test vendor library that commands are sent
on.
"""
- def __init__(self, port):
- self._connection = Connection(port)
- self._closed = False
-
- def close(self):
- self._connection.close()
- self._closed = True
-
- def send_command(self, name, args):
- name_size = len(name)
- args_size = len(args)
- self.lint_command(name, args, name_size, args_size)
- encoded_name = chr(name_size) + name
- encoded_args = chr(args_size) + ''.join(chr(len(arg)) + arg for arg in args)
- command = encoded_name + encoded_args
- if self._closed:
- return
- self._connection.send(command)
- if name != 'CLOSE_TEST_CHANNEL':
- print self.receive_response()
-
- def receive_response(self):
- if self._closed:
- return
- size_chars = self._connection.receive(4)
- size_bytes = bytearray(size_chars)
- if not size_chars:
- print 'No response, assuming that the connection is broken'
- return False
- response_size = 0
- for i in range(0, len(size_chars) - 1):
- response_size |= ord(size_chars[i]) << (8 * i)
- response = self._connection.receive(response_size)
- return response
-
- def lint_command(self, name, args, name_size, args_size):
- assert name_size == len(name) and args_size == len(args)
- try:
- name.encode('utf-8')
- for arg in args:
- arg.encode('utf-8')
- except UnicodeError:
- print 'Unrecognized characters.'
- raise
- if name_size > 255 or args_size > 255:
- raise ValueError # Size must be encodable in one octet.
- for arg in args:
- if len(arg) > 255:
- raise ValueError # Size must be encodable in one octet.
+ def __init__(self, port):
+ self._connection = Connection(port)
+ self._closed = False
+
+ def close(self):
+ self._connection.close()
+ self._closed = True
+
+ def send_command(self, name, args):
+ name_size = len(name)
+ args_size = len(args)
+ self.lint_command(name, args, name_size, args_size)
+ encoded_name = chr(name_size) + name
+ encoded_args = chr(args_size) + ''.join(
+ chr(len(arg)) + arg for arg in args)
+ command = encoded_name + encoded_args
+ if self._closed:
+ return
+ self._connection.send(command)
+ if name != 'CLOSE_TEST_CHANNEL':
+ print self.receive_response()
+
+ def receive_response(self):
+ if self._closed:
+ return
+ size_chars = self._connection.receive(4)
+ size_bytes = bytearray(size_chars)
+ if not size_chars:
+ print 'No response, assuming that the connection is broken'
+ return False
+ response_size = 0
+ for i in range(0, len(size_chars) - 1):
+ response_size |= ord(size_chars[i]) << (8 * i)
+ response = self._connection.receive(response_size)
+ return response
+
+ def lint_command(self, name, args, name_size, args_size):
+ assert name_size == len(name) and args_size == len(args)
+ try:
+ name.encode('utf-8')
+ for arg in args:
+ arg.encode('utf-8')
+ except UnicodeError:
+ print 'Unrecognized characters.'
+ raise
+ if name_size > 255 or args_size > 255:
+ raise ValueError # Size must be encodable in one octet.
+ for arg in args:
+ if len(arg) > 255:
+ raise ValueError # Size must be encodable in one octet.
class TestChannelShell(cmd.Cmd):
- """Shell for sending test channel data to controller.
+ """Shell for sending test channel data to controller.
Manages the test channel to the controller and defines a set of commands the
user can send to the controller as well. These commands are processed parallel
@@ -150,138 +151,138 @@ class TestChannelShell(cmd.Cmd):
test_channel: The communication channel to send data to the controller.
"""
- def __init__(self, test_channel):
- cmd.Cmd.__init__(self)
- self._test_channel = test_channel
+ def __init__(self, test_channel):
+ cmd.Cmd.__init__(self)
+ self._test_channel = test_channel
- def do_add(self, args):
- """Arguments: dev_type_str Add a new device of type dev_type_str.
+ def do_add(self, args):
+ """Arguments: dev_type_str Add a new device of type dev_type_str.
"""
- self._test_channel.send_command('add', args.split())
+ self._test_channel.send_command('add', args.split())
- def do_del(self, args):
- """Arguments: device index Delete the device with the specified index.
+ def do_del(self, args):
+ """Arguments: device index Delete the device with the specified index.
"""
- self._test_channel.send_command('del', args.split())
+ self._test_channel.send_command('del', args.split())
- def do_add_phy(self, args):
- """Arguments: dev_type_str Add a new device of type dev_type_str.
+ def do_add_phy(self, args):
+ """Arguments: dev_type_str Add a new device of type dev_type_str.
"""
- self._test_channel.send_command('add_phy', args.split())
+ self._test_channel.send_command('add_phy', args.split())
- def do_del_phy(self, args):
- """Arguments: phy index Delete the phy with the specified index.
+ def do_del_phy(self, args):
+ """Arguments: phy index Delete the phy with the specified index.
"""
- self._test_channel.send_command('del_phy', args.split())
+ self._test_channel.send_command('del_phy', args.split())
- def do_add_device_to_phy(self, args):
- """Arguments: device index phy index Add a new device of type dev_type_str.
+ def do_add_device_to_phy(self, args):
+ """Arguments: device index phy index Add a new device of type dev_type_str.
"""
- self._test_channel.send_command('add_device_to_phy', args.split())
+ self._test_channel.send_command('add_device_to_phy', args.split())
- def do_del_device_from_phy(self, args):
- """Arguments: phy index Delete the phy with the specified index.
+ def do_del_device_from_phy(self, args):
+ """Arguments: phy index Delete the phy with the specified index.
"""
- self._test_channel.send_command('del_device_from_phy', args.split())
+ self._test_channel.send_command('del_device_from_phy', args.split())
- def do_add_remote(self, args):
- """Arguments: dev_type_str Connect to a remote device at arg1@arg2.
+ def do_add_remote(self, args):
+ """Arguments: dev_type_str Connect to a remote device at arg1@arg2.
"""
- self._test_channel.send_command('add_remote', args.split())
+ self._test_channel.send_command('add_remote', args.split())
- def do_get(self, args):
- """Arguments: dev_num attr_str Get the value of the attribute attr_str from device dev_num.
+ def do_get(self, args):
+ """Arguments: dev_num attr_str Get the value of the attribute attr_str from device dev_num.
"""
- self._test_channel.send_command('get', args.split())
+ self._test_channel.send_command('get', args.split())
- def do_set(self, args):
- """Arguments: dev_num attr_str val Set the value of the attribute attr_str from device dev_num equal to val.
+ def do_set(self, args):
+ """Arguments: dev_num attr_str val Set the value of the attribute attr_str from device dev_num equal to val.
"""
- self._test_channel.send_command('set', args.split())
+ self._test_channel.send_command('set', args.split())
- def do_set_device_address(self, args):
- """Arguments: dev_num addr Set the address of device dev_num equal to addr.
+ def do_set_device_address(self, args):
+ """Arguments: dev_num addr Set the address of device dev_num equal to addr.
"""
- self._test_channel.send_command('set_device_address', args.split())
+ self._test_channel.send_command('set_device_address', args.split())
- def do_list(self, args):
- """Arguments: [dev_num [attr]] List the devices from the controller, optionally filtered by device and attr.
+ def do_list(self, args):
+ """Arguments: [dev_num [attr]] List the devices from the controller, optionally filtered by device and attr.
"""
- self._test_channel.send_command('list', args.split())
+ self._test_channel.send_command('list', args.split())
- def do_quit(self, args):
- """Arguments: None.
+ def do_quit(self, args):
+ """Arguments: None.
Exits the test channel.
"""
- self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
- self._test_channel.close()
- print 'Goodbye.'
- return True
+ self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
+ self._test_channel.close()
+ print 'Goodbye.'
+ return True
- def do_help(self, args):
- """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
+ def do_help(self, args):
+ """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
"""
- if (len(args) == 0):
- cmd.Cmd.do_help(self, args)
- else:
- self._test_channel.send_command('help', args.split())
+ if (len(args) == 0):
+ cmd.Cmd.do_help(self, args)
+ else:
+ self._test_channel.send_command('help', args.split())
- def preloop(self):
- """Clear out the buffer
+ def preloop(self):
+ """Clear out the buffer
"""
- response = self._test_channel.receive_response()
-
- #def postcmd(self, stop, line):
- #"""
- #Called after each command
- #stop : whether we will stop after this command
- #line : the previous input line
- #Return True to stop, False to continue
- #"""
- #if stop:
- #return True
- #response = self._test_channel.receive_response()
- #if not response:
- #return True
- #print response
- #return False
+ response = self._test_channel.receive_response()
+
+ #def postcmd(self, stop, line):
+ #"""
+ #Called after each command
+ #stop : whether we will stop after this command
+ #line : the previous input line
+ #Return True to stop, False to continue
+ #"""
+ #if stop:
+ #return True
+ #response = self._test_channel.receive_response()
+ #if not response:
+ #return True
+ #print response
+ #return False
def main(argv):
- if len(argv) != 2:
- print 'Usage: python test_channel.py [port]'
- return
- try:
- port = int(argv[1])
- except ValueError:
- print 'Error parsing port.'
- else:
+ if len(argv) != 2:
+ print 'Usage: python test_channel.py [port]'
+ return
try:
- test_channel = TestChannel(port)
- except socket.error, e:
- print 'Error connecting to socket: %s' % e
- except:
- print 'Error creating test channel (check argument).'
+ port = int(argv[1])
+ except ValueError:
+ print 'Error parsing port.'
else:
- test_channel_shell = TestChannelShell(test_channel)
- test_channel_shell.prompt = '$ '
- test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' +
- 'Type \'help\' for more information.')
+ try:
+ test_channel = TestChannel(port)
+ except socket.error, e:
+ print 'Error connecting to socket: %s' % e
+ except:
+ print 'Error creating test channel (check argument).'
+ else:
+ test_channel_shell = TestChannelShell(test_channel)
+ test_channel_shell.prompt = '$ '
+ test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' +
+ 'Type \'help\' for more information.')
if __name__ == '__main__':
- main(sys.argv)
+ main(sys.argv)