summaryrefslogtreecommitdiff
path: root/tools/releasetools/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/releasetools/common.py')
-rw-r--r--tools/releasetools/common.py916
1 files changed, 764 insertions, 152 deletions
diff --git a/tools/releasetools/common.py b/tools/releasetools/common.py
index 9a27ad3f94..dcf470a81f 100644
--- a/tools/releasetools/common.py
+++ b/tools/releasetools/common.py
@@ -14,6 +14,7 @@
from __future__ import print_function
+import base64
import collections
import copy
import errno
@@ -30,7 +31,6 @@ import platform
import re
import shlex
import shutil
-import string
import subprocess
import sys
import tempfile
@@ -39,32 +39,37 @@ import time
import zipfile
from hashlib import sha1, sha256
-import blockimgdiff
+import images
import sparse_img
+from blockimgdiff import BlockImageDiff
logger = logging.getLogger(__name__)
class Options(object):
- def __init__(self):
- base_out_path = os.getenv('OUT_DIR_COMMON_BASE')
- if base_out_path is None:
- base_search_path = "out"
- else:
- base_search_path = os.path.join(base_out_path,
- os.path.basename(os.getcwd()))
- platform_search_path = {
- "linux2": os.path.join(base_search_path, "host/linux-x86"),
- "darwin": os.path.join(base_search_path, "host/darwin-x86"),
- }
+ def __init__(self):
+ # Set up search path, in order to find framework/ and lib64/. At the time of
+ # running this function, user-supplied search path (`--path`) hasn't been
+ # available. So the value set here is the default, which might be overridden
+ # by commandline flag later.
+ exec_path = sys.argv[0]
+ if exec_path.endswith('.py'):
+ script_name = os.path.basename(exec_path)
+ # logger hasn't been initialized yet at this point. Use print to output
+ # warnings.
+ print(
+ 'Warning: releasetools script should be invoked as hermetic Python '
+ 'executable -- build and run `{}` directly.'.format(script_name[:-3]),
+ file=sys.stderr)
+ self.search_path = os.path.realpath(os.path.join(os.path.dirname(exec_path), '..'))
- self.search_path = platform_search_path.get(sys.platform)
self.signapk_path = "framework/signapk.jar" # Relative to search_path
self.signapk_shared_library_path = "lib64" # Relative to search_path
self.extra_signapk_args = []
self.java_path = "java" # Use the one on the path by default.
self.java_args = ["-Xmx2048m"] # The default JVM args.
+ self.android_jar_path = None
self.public_key_suffix = ".x509.pem"
self.private_key_suffix = ".pk8"
# use otatools built boot_signer by default
@@ -72,6 +77,11 @@ class Options(object):
self.boot_signer_args = []
self.verity_signer_path = None
self.verity_signer_args = []
+ self.aftl_tool_path = None
+ self.aftl_server = None
+ self.aftl_key_path = None
+ self.aftl_manufacturer_key_path = None
+ self.aftl_signer_helper = None
self.verbose = False
self.tempfiles = []
self.device_specific = None
@@ -83,6 +93,7 @@ class Options(object):
# Stash size cannot exceed cache_size * threshold.
self.cache_size = None
self.stash_threshold = 0.8
+ self.logfile = None
OPTIONS = Options()
@@ -93,16 +104,17 @@ BLOCK_SIZE = 4096
# Values for "certificate" in apkcerts that mean special things.
SPECIAL_CERT_STRINGS = ("PRESIGNED", "EXTERNAL")
-# The partitions allowed to be signed by AVB (Android verified boot 2.0).
-AVB_PARTITIONS = ('boot', 'recovery', 'system', 'vendor', 'product',
- 'product_services', 'dtbo', 'odm')
+# The partitions allowed to be signed by AVB (Android Verified Boot 2.0). Note
+# that system_other is not in the list because we don't want to include its
+# descriptor into vbmeta.img.
+AVB_PARTITIONS = ('boot', 'dtbo', 'odm', 'product', 'recovery', 'system',
+ 'system_ext', 'vendor', 'vendor_boot')
# Chained VBMeta partitions.
AVB_VBMETA_PARTITIONS = ('vbmeta_system', 'vbmeta_vendor')
# Partitions that should have their care_map added to META/care_map.pb
-PARTITIONS_WITH_CARE_MAP = ('system', 'vendor', 'product', 'product_services',
- 'odm')
+PARTITIONS_WITH_CARE_MAP = ('system', 'vendor', 'product', 'system_ext', 'odm')
class ErrorCode(object):
@@ -153,13 +165,14 @@ def InitLogging():
'default': {
'class': 'logging.StreamHandler',
'formatter': 'standard',
+ 'level': 'WARNING',
},
},
'loggers': {
'': {
'handlers': ['default'],
- 'level': 'WARNING',
'propagate': True,
+ 'level': 'INFO',
}
}
}
@@ -172,8 +185,19 @@ def InitLogging():
# Increase the logging level for verbose mode.
if OPTIONS.verbose:
- config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
- config['loggers']['']['level'] = 'INFO'
+ config = copy.deepcopy(config)
+ config['handlers']['default']['level'] = 'INFO'
+
+ if OPTIONS.logfile:
+ config = copy.deepcopy(config)
+ config['handlers']['logfile'] = {
+ 'class': 'logging.FileHandler',
+ 'formatter': 'standard',
+ 'level': 'INFO',
+ 'mode': 'w',
+ 'filename': OPTIONS.logfile,
+ }
+ config['loggers']['']['handlers'].append('logfile')
logging.config.dictConfig(config)
@@ -188,6 +212,8 @@ def Run(args, verbose=None, **kwargs):
kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
stdin, etc. stdout and stderr will default to subprocess.PIPE and
subprocess.STDOUT respectively unless caller specifies any of them.
+ universal_newlines will default to True, as most of the users in
+ releasetools expect string output.
Returns:
A subprocess.Popen object.
@@ -195,6 +221,8 @@ def Run(args, verbose=None, **kwargs):
if 'stdout' not in kwargs and 'stderr' not in kwargs:
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.STDOUT
+ if 'universal_newlines' not in kwargs:
+ kwargs['universal_newlines'] = True
# Don't log any if caller explicitly says so.
if verbose != False:
logger.info(" Running: \"%s\"", " ".join(args))
@@ -243,6 +271,8 @@ def RunAndCheckOutput(args, verbose=None, **kwargs):
"""
proc = Run(args, verbose=verbose, **kwargs)
output, _ = proc.communicate()
+ if output is None:
+ output = ""
# Don't log any if caller explicitly says so.
if verbose != False:
logger.info("%s", output.rstrip())
@@ -274,6 +304,241 @@ def CloseInheritedPipes():
pass
+class BuildInfo(object):
+ """A class that holds the information for a given build.
+
+ This class wraps up the property querying for a given source or target build.
+ It abstracts away the logic of handling OEM-specific properties, and caches
+ the commonly used properties such as fingerprint.
+
+ There are two types of info dicts: a) build-time info dict, which is generated
+ at build time (i.e. included in a target_files zip); b) OEM info dict that is
+ specified at package generation time (via command line argument
+ '--oem_settings'). If a build doesn't use OEM-specific properties (i.e. not
+ having "oem_fingerprint_properties" in build-time info dict), all the queries
+ would be answered based on build-time info dict only. Otherwise if using
+ OEM-specific properties, some of them will be calculated from two info dicts.
+
+ Users can query properties similarly as using a dict() (e.g. info['fstab']),
+ or to query build properties via GetBuildProp() or GetPartitionBuildProp().
+
+ Attributes:
+ info_dict: The build-time info dict.
+ is_ab: Whether it's a build that uses A/B OTA.
+ oem_dicts: A list of OEM dicts.
+ oem_props: A list of OEM properties that should be read from OEM dicts; None
+ if the build doesn't use any OEM-specific property.
+ fingerprint: The fingerprint of the build, which would be calculated based
+ on OEM properties if applicable.
+ device: The device name, which could come from OEM dicts if applicable.
+ """
+
+ _RO_PRODUCT_RESOLVE_PROPS = ["ro.product.brand", "ro.product.device",
+ "ro.product.manufacturer", "ro.product.model",
+ "ro.product.name"]
+ _RO_PRODUCT_PROPS_DEFAULT_SOURCE_ORDER = ["product", "odm", "vendor",
+ "system_ext", "system"]
+
+ def __init__(self, info_dict, oem_dicts=None):
+ """Initializes a BuildInfo instance with the given dicts.
+
+ Note that it only wraps up the given dicts, without making copies.
+
+ Arguments:
+ info_dict: The build-time info dict.
+ oem_dicts: A list of OEM dicts (which is parsed from --oem_settings). Note
+ that it always uses the first dict to calculate the fingerprint or the
+ device name. The rest would be used for asserting OEM properties only
+ (e.g. one package can be installed on one of these devices).
+
+ Raises:
+ ValueError: On invalid inputs.
+ """
+ self.info_dict = info_dict
+ self.oem_dicts = oem_dicts
+
+ self._is_ab = info_dict.get("ab_update") == "true"
+ self._oem_props = info_dict.get("oem_fingerprint_properties")
+
+ if self._oem_props:
+ assert oem_dicts, "OEM source required for this build"
+
+ def check_fingerprint(fingerprint):
+ if (" " in fingerprint or any(ord(ch) > 127 for ch in fingerprint)):
+ raise ValueError(
+ 'Invalid build fingerprint: "{}". See the requirement in Android CDD '
+ "3.2.2. Build Parameters.".format(fingerprint))
+
+
+ self._partition_fingerprints = {}
+ for partition in PARTITIONS_WITH_CARE_MAP:
+ try:
+ fingerprint = self.CalculatePartitionFingerprint(partition)
+ check_fingerprint(fingerprint)
+ self._partition_fingerprints[partition] = fingerprint
+ except ExternalError:
+ continue
+ if "system" in self._partition_fingerprints:
+ # system_other is not included in PARTITIONS_WITH_CARE_MAP, but does
+ # need a fingerprint when creating the image.
+ self._partition_fingerprints[
+ "system_other"] = self._partition_fingerprints["system"]
+
+ # These two should be computed only after setting self._oem_props.
+ self._device = self.GetOemProperty("ro.product.device")
+ self._fingerprint = self.CalculateFingerprint()
+ check_fingerprint(self._fingerprint)
+
+ @property
+ def is_ab(self):
+ return self._is_ab
+
+ @property
+ def device(self):
+ return self._device
+
+ @property
+ def fingerprint(self):
+ return self._fingerprint
+
+ @property
+ def oem_props(self):
+ return self._oem_props
+
+ def __getitem__(self, key):
+ return self.info_dict[key]
+
+ def __setitem__(self, key, value):
+ self.info_dict[key] = value
+
+ def get(self, key, default=None):
+ return self.info_dict.get(key, default)
+
+ def items(self):
+ return self.info_dict.items()
+
+ def GetPartitionBuildProp(self, prop, partition):
+ """Returns the inquired build property for the provided partition."""
+ # If provided a partition for this property, only look within that
+ # partition's build.prop.
+ if prop in BuildInfo._RO_PRODUCT_RESOLVE_PROPS:
+ prop = prop.replace("ro.product", "ro.product.{}".format(partition))
+ else:
+ prop = prop.replace("ro.", "ro.{}.".format(partition))
+ try:
+ return self.info_dict.get("{}.build.prop".format(partition), {})[prop]
+ except KeyError:
+ raise ExternalError("couldn't find %s in %s.build.prop" %
+ (prop, partition))
+
+ def GetBuildProp(self, prop):
+ """Returns the inquired build property from the standard build.prop file."""
+ if prop in BuildInfo._RO_PRODUCT_RESOLVE_PROPS:
+ return self._ResolveRoProductBuildProp(prop)
+
+ try:
+ return self.info_dict.get("build.prop", {})[prop]
+ except KeyError:
+ raise ExternalError("couldn't find %s in build.prop" % (prop,))
+
+ def _ResolveRoProductBuildProp(self, prop):
+ """Resolves the inquired ro.product.* build property"""
+ prop_val = self.info_dict.get("build.prop", {}).get(prop)
+ if prop_val:
+ return prop_val
+
+ source_order_val = self.info_dict.get("build.prop", {}).get(
+ "ro.product.property_source_order")
+ if source_order_val:
+ source_order = source_order_val.split(",")
+ else:
+ source_order = BuildInfo._RO_PRODUCT_PROPS_DEFAULT_SOURCE_ORDER
+
+ # Check that all sources in ro.product.property_source_order are valid
+ if any([x not in BuildInfo._RO_PRODUCT_PROPS_DEFAULT_SOURCE_ORDER
+ for x in source_order]):
+ raise ExternalError(
+ "Invalid ro.product.property_source_order '{}'".format(source_order))
+
+ for source in source_order:
+ source_prop = prop.replace(
+ "ro.product", "ro.product.{}".format(source), 1)
+ prop_val = self.info_dict.get(
+ "{}.build.prop".format(source), {}).get(source_prop)
+ if prop_val:
+ return prop_val
+
+ raise ExternalError("couldn't resolve {}".format(prop))
+
+ def GetOemProperty(self, key):
+ if self.oem_props is not None and key in self.oem_props:
+ return self.oem_dicts[0][key]
+ return self.GetBuildProp(key)
+
+ def GetPartitionFingerprint(self, partition):
+ return self._partition_fingerprints.get(partition, None)
+
+ def CalculatePartitionFingerprint(self, partition):
+ try:
+ return self.GetPartitionBuildProp("ro.build.fingerprint", partition)
+ except ExternalError:
+ return "{}/{}/{}:{}/{}/{}:{}/{}".format(
+ self.GetPartitionBuildProp("ro.product.brand", partition),
+ self.GetPartitionBuildProp("ro.product.name", partition),
+ self.GetPartitionBuildProp("ro.product.device", partition),
+ self.GetPartitionBuildProp("ro.build.version.release", partition),
+ self.GetPartitionBuildProp("ro.build.id", partition),
+ self.GetPartitionBuildProp("ro.build.version.incremental", partition),
+ self.GetPartitionBuildProp("ro.build.type", partition),
+ self.GetPartitionBuildProp("ro.build.tags", partition))
+
+ def CalculateFingerprint(self):
+ if self.oem_props is None:
+ try:
+ return self.GetBuildProp("ro.build.fingerprint")
+ except ExternalError:
+ return "{}/{}/{}:{}/{}/{}:{}/{}".format(
+ self.GetBuildProp("ro.product.brand"),
+ self.GetBuildProp("ro.product.name"),
+ self.GetBuildProp("ro.product.device"),
+ self.GetBuildProp("ro.build.version.release"),
+ self.GetBuildProp("ro.build.id"),
+ self.GetBuildProp("ro.build.version.incremental"),
+ self.GetBuildProp("ro.build.type"),
+ self.GetBuildProp("ro.build.tags"))
+ return "%s/%s/%s:%s" % (
+ self.GetOemProperty("ro.product.brand"),
+ self.GetOemProperty("ro.product.name"),
+ self.GetOemProperty("ro.product.device"),
+ self.GetBuildProp("ro.build.thumbprint"))
+
+ def WriteMountOemScript(self, script):
+ assert self.oem_props is not None
+ recovery_mount_options = self.info_dict.get("recovery_mount_options")
+ script.Mount("/oem", recovery_mount_options)
+
+ def WriteDeviceAssertions(self, script, oem_no_mount):
+ # Read the property directly if not using OEM properties.
+ if not self.oem_props:
+ script.AssertDevice(self.device)
+ return
+
+ # Otherwise assert OEM properties.
+ if not self.oem_dicts:
+ raise ExternalError(
+ "No OEM file provided to answer expected assertions")
+
+ for prop in self.oem_props.split():
+ values = []
+ for oem_dict in self.oem_dicts:
+ if prop in oem_dict:
+ values.append(oem_dict[prop])
+ if not values:
+ raise ExternalError(
+ "The OEM file is missing the property %s" % (prop,))
+ script.AssertOemProperty(prop, values, oem_no_mount)
+
+
def LoadInfoDict(input_file, repacking=False):
"""Loads the key/value pairs from the given input target_files.
@@ -312,7 +577,7 @@ def LoadInfoDict(input_file, repacking=False):
def read_helper(fn):
if isinstance(input_file, zipfile.ZipFile):
- return input_file.read(fn)
+ return input_file.read(fn).decode()
else:
path = os.path.join(input_file, *fn.split("/"))
try:
@@ -333,41 +598,34 @@ def LoadInfoDict(input_file, repacking=False):
raise ValueError("Failed to find 'fstab_version'")
if repacking:
- # We carry a copy of file_contexts.bin under META/. If not available, search
- # BOOT/RAMDISK/. Note that sometimes we may need a different file to build
- # images than the one running on device, in that case, we must have the one
- # for image generation copied to META/.
- fc_basename = os.path.basename(d.get("selinux_fc", "file_contexts"))
- fc_config = os.path.join(input_file, "META", fc_basename)
- assert os.path.exists(fc_config)
+ # "selinux_fc" properties should point to the file_contexts files
+ # (file_contexts.bin) under META/.
+ for key in d:
+ if key.endswith("selinux_fc"):
+ fc_basename = os.path.basename(d[key])
+ fc_config = os.path.join(input_file, "META", fc_basename)
+ assert os.path.exists(fc_config)
- d["selinux_fc"] = fc_config
+ d[key] = fc_config
# Similarly we need to redirect "root_dir", and "root_fs_config".
d["root_dir"] = os.path.join(input_file, "ROOT")
d["root_fs_config"] = os.path.join(
input_file, "META", "root_filesystem_config.txt")
- # Redirect {system,vendor}_base_fs_file.
- if "system_base_fs_file" in d:
- basename = os.path.basename(d["system_base_fs_file"])
- system_base_fs_file = os.path.join(input_file, "META", basename)
- if os.path.exists(system_base_fs_file):
- d["system_base_fs_file"] = system_base_fs_file
- else:
- logger.warning(
- "Failed to find system base fs file: %s", system_base_fs_file)
- del d["system_base_fs_file"]
-
- if "vendor_base_fs_file" in d:
- basename = os.path.basename(d["vendor_base_fs_file"])
- vendor_base_fs_file = os.path.join(input_file, "META", basename)
- if os.path.exists(vendor_base_fs_file):
- d["vendor_base_fs_file"] = vendor_base_fs_file
+ # Redirect {partition}_base_fs_file for each of the named partitions.
+ for part_name in ["system", "vendor", "system_ext", "product", "odm"]:
+ key_name = part_name + "_base_fs_file"
+ if key_name not in d:
+ continue
+ basename = os.path.basename(d[key_name])
+ base_fs_file = os.path.join(input_file, "META", basename)
+ if os.path.exists(base_fs_file):
+ d[key_name] = base_fs_file
else:
logger.warning(
- "Failed to find vendor base fs file: %s", vendor_base_fs_file)
- del d["vendor_base_fs_file"]
+ "Failed to find %s base fs file: %s", part_name, base_fs_file)
+ del d[key_name]
def makeint(key):
if key in d:
@@ -383,37 +641,8 @@ def LoadInfoDict(input_file, repacking=False):
makeint("boot_size")
makeint("fstab_version")
- # We changed recovery.fstab path in Q, from ../RAMDISK/etc/recovery.fstab to
- # ../RAMDISK/system/etc/recovery.fstab. LoadInfoDict() has to handle both
- # cases, since it may load the info_dict from an old build (e.g. when
- # generating incremental OTAs from that build).
- system_root_image = d.get("system_root_image") == "true"
- if d.get("no_recovery") != "true":
- recovery_fstab_path = "RECOVERY/RAMDISK/system/etc/recovery.fstab"
- if isinstance(input_file, zipfile.ZipFile):
- if recovery_fstab_path not in input_file.namelist():
- recovery_fstab_path = "RECOVERY/RAMDISK/etc/recovery.fstab"
- else:
- path = os.path.join(input_file, *recovery_fstab_path.split("/"))
- if not os.path.exists(path):
- recovery_fstab_path = "RECOVERY/RAMDISK/etc/recovery.fstab"
- d["fstab"] = LoadRecoveryFSTab(
- read_helper, d["fstab_version"], recovery_fstab_path, system_root_image)
-
- elif d.get("recovery_as_boot") == "true":
- recovery_fstab_path = "BOOT/RAMDISK/system/etc/recovery.fstab"
- if isinstance(input_file, zipfile.ZipFile):
- if recovery_fstab_path not in input_file.namelist():
- recovery_fstab_path = "BOOT/RAMDISK/etc/recovery.fstab"
- else:
- path = os.path.join(input_file, *recovery_fstab_path.split("/"))
- if not os.path.exists(path):
- recovery_fstab_path = "BOOT/RAMDISK/etc/recovery.fstab"
- d["fstab"] = LoadRecoveryFSTab(
- read_helper, d["fstab_version"], recovery_fstab_path, system_root_image)
-
- else:
- d["fstab"] = None
+ # Load recovery fstab if applicable.
+ d["fstab"] = _FindAndLoadRecoveryFstab(d, input_file, read_helper)
# Tries to load the build props for all partitions with care_map, including
# system and vendor.
@@ -428,18 +657,14 @@ def LoadInfoDict(input_file, repacking=False):
read_helper, "{}/etc/build.prop".format(partition.upper()))
d["build.prop"] = d["system.build.prop"]
- # Set up the salt (based on fingerprint or thumbprint) that will be used when
- # adding AVB footer.
+ # Set up the salt (based on fingerprint) that will be used when adding AVB
+ # hash / hashtree footers.
if d.get("avb_enable") == "true":
- fp = None
- if "build.prop" in d:
- build_prop = d["build.prop"]
- if "ro.build.fingerprint" in build_prop:
- fp = build_prop["ro.build.fingerprint"]
- elif "ro.build.thumbprint" in build_prop:
- fp = build_prop["ro.build.thumbprint"]
- if fp:
- d["avb_salt"] = sha256(fp).hexdigest()
+ build_info = BuildInfo(d)
+ for partition in PARTITIONS_WITH_CARE_MAP:
+ fingerprint = build_info.GetPartitionFingerprint(partition)
+ if fingerprint:
+ d["avb_{}_salt".format(partition)] = sha256(fingerprint).hexdigest()
return d
@@ -453,6 +678,16 @@ def LoadBuildProp(read_helper, prop_file):
return LoadDictionaryFromLines(data.split("\n"))
+def LoadListFromFile(file_path):
+ with open(file_path) as f:
+ return f.read().splitlines()
+
+
+def LoadDictionaryFromFile(file_path):
+ lines = LoadListFromFile(file_path)
+ return LoadDictionaryFromLines(lines)
+
+
def LoadDictionaryFromLines(lines):
d = {}
for line in lines:
@@ -524,20 +759,108 @@ def LoadRecoveryFSTab(read_helper, fstab_version, recovery_fstab_path,
# system. Other areas assume system is always at "/system" so point /system
# at /.
if system_root_image:
- assert not d.has_key("/system") and d.has_key("/")
+ assert '/system' not in d and '/' in d
d["/system"] = d["/"]
return d
+def _FindAndLoadRecoveryFstab(info_dict, input_file, read_helper):
+ """Finds the path to recovery fstab and loads its contents."""
+ # recovery fstab is only meaningful when installing an update via recovery
+ # (i.e. non-A/B OTA). Skip loading fstab if device used A/B OTA.
+ if info_dict.get('ab_update') == 'true':
+ return None
+
+ # We changed recovery.fstab path in Q, from ../RAMDISK/etc/recovery.fstab to
+ # ../RAMDISK/system/etc/recovery.fstab. This function has to handle both
+ # cases, since it may load the info_dict from an old build (e.g. when
+ # generating incremental OTAs from that build).
+ system_root_image = info_dict.get('system_root_image') == 'true'
+ if info_dict.get('no_recovery') != 'true':
+ recovery_fstab_path = 'RECOVERY/RAMDISK/system/etc/recovery.fstab'
+ if isinstance(input_file, zipfile.ZipFile):
+ if recovery_fstab_path not in input_file.namelist():
+ recovery_fstab_path = 'RECOVERY/RAMDISK/etc/recovery.fstab'
+ else:
+ path = os.path.join(input_file, *recovery_fstab_path.split('/'))
+ if not os.path.exists(path):
+ recovery_fstab_path = 'RECOVERY/RAMDISK/etc/recovery.fstab'
+ return LoadRecoveryFSTab(
+ read_helper, info_dict['fstab_version'], recovery_fstab_path,
+ system_root_image)
+
+ if info_dict.get('recovery_as_boot') == 'true':
+ recovery_fstab_path = 'BOOT/RAMDISK/system/etc/recovery.fstab'
+ if isinstance(input_file, zipfile.ZipFile):
+ if recovery_fstab_path not in input_file.namelist():
+ recovery_fstab_path = 'BOOT/RAMDISK/etc/recovery.fstab'
+ else:
+ path = os.path.join(input_file, *recovery_fstab_path.split('/'))
+ if not os.path.exists(path):
+ recovery_fstab_path = 'BOOT/RAMDISK/etc/recovery.fstab'
+ return LoadRecoveryFSTab(
+ read_helper, info_dict['fstab_version'], recovery_fstab_path,
+ system_root_image)
+
+ return None
+
+
def DumpInfoDict(d):
for k, v in sorted(d.items()):
logger.info("%-25s = (%s) %s", k, type(v).__name__, v)
+def MergeDynamicPartitionInfoDicts(framework_dict, vendor_dict):
+ """Merges dynamic partition info variables.
+
+ Args:
+ framework_dict: The dictionary of dynamic partition info variables from the
+ partial framework target files.
+ vendor_dict: The dictionary of dynamic partition info variables from the
+ partial vendor target files.
+
+ Returns:
+ The merged dynamic partition info dictionary.
+ """
+ merged_dict = {}
+ # Partition groups and group sizes are defined by the vendor dict because
+ # these values may vary for each board that uses a shared system image.
+ merged_dict["super_partition_groups"] = vendor_dict["super_partition_groups"]
+ framework_dynamic_partition_list = framework_dict.get(
+ "dynamic_partition_list", "")
+ vendor_dynamic_partition_list = vendor_dict.get("dynamic_partition_list", "")
+ merged_dict["dynamic_partition_list"] = ("%s %s" % (
+ framework_dynamic_partition_list, vendor_dynamic_partition_list)).strip()
+ for partition_group in merged_dict["super_partition_groups"].split(" "):
+ # Set the partition group's size using the value from the vendor dict.
+ key = "super_%s_group_size" % partition_group
+ if key not in vendor_dict:
+ raise ValueError("Vendor dict does not contain required key %s." % key)
+ merged_dict[key] = vendor_dict[key]
+
+ # Set the partition group's partition list using a concatenation of the
+ # framework and vendor partition lists.
+ key = "super_%s_partition_list" % partition_group
+ merged_dict[key] = (
+ "%s %s" %
+ (framework_dict.get(key, ""), vendor_dict.get(key, ""))).strip()
+
+ # Pick virtual ab related flags from vendor dict, if defined.
+ if "virtual_ab" in vendor_dict.keys():
+ merged_dict["virtual_ab"] = vendor_dict["virtual_ab"]
+ if "virtual_ab_retrofit" in vendor_dict.keys():
+ merged_dict["virtual_ab_retrofit"] = vendor_dict["virtual_ab_retrofit"]
+ return merged_dict
+
+
def AppendAVBSigningArgs(cmd, partition):
"""Append signing arguments for avbtool."""
# e.g., "--key path/to/signing_key --algorithm SHA256_RSA4096"
key_path = OPTIONS.info_dict.get("avb_" + partition + "_key_path")
+ if key_path and not os.path.exists(key_path) and OPTIONS.search_path:
+ new_key_path = os.path.join(OPTIONS.search_path, key_path)
+ if os.path.exists(new_key_path):
+ key_path = new_key_path
algorithm = OPTIONS.info_dict.get("avb_" + partition + "_algorithm")
if key_path and algorithm:
cmd.extend(["--key", key_path, "--algorithm", algorithm])
@@ -547,6 +870,42 @@ def AppendAVBSigningArgs(cmd, partition):
cmd.extend(["--salt", avb_salt])
+def GetAvbPartitionArg(partition, image, info_dict=None):
+ """Returns the VBMeta arguments for partition.
+
+ It sets up the VBMeta argument by including the partition descriptor from the
+ given 'image', or by configuring the partition as a chained partition.
+
+ Args:
+ partition: The name of the partition (e.g. "system").
+ image: The path to the partition image.
+ info_dict: A dict returned by common.LoadInfoDict(). Will use
+ OPTIONS.info_dict if None has been given.
+
+ Returns:
+ A list of VBMeta arguments.
+ """
+ if info_dict is None:
+ info_dict = OPTIONS.info_dict
+
+ # Check if chain partition is used.
+ key_path = info_dict.get("avb_" + partition + "_key_path")
+ if not key_path:
+ return ["--include_descriptors_from_image", image]
+
+ # For a non-A/B device, we don't chain /recovery nor include its descriptor
+ # into vbmeta.img. The recovery image will be configured on an independent
+ # boot chain, to be verified with AVB_SLOT_VERIFY_FLAGS_NO_VBMETA_PARTITION.
+ # See details at
+ # https://android.googlesource.com/platform/external/avb/+/master/README.md#booting-into-recovery.
+ if info_dict.get("ab_update") != "true" and partition == "recovery":
+ return []
+
+ # Otherwise chain the partition into vbmeta.
+ chained_partition_arg = GetAvbChainedPartitionArg(partition, info_dict)
+ return ["--chain_partition", chained_partition_arg]
+
+
def GetAvbChainedPartitionArg(partition, info_dict, key=None):
"""Constructs and returns the arg to build or verify a chained partition.
@@ -563,12 +922,131 @@ def GetAvbChainedPartitionArg(partition, info_dict, key=None):
"""
if key is None:
key = info_dict["avb_" + partition + "_key_path"]
- pubkey_path = ExtractAvbPublicKey(key)
+ if key and not os.path.exists(key) and OPTIONS.search_path:
+ new_key_path = os.path.join(OPTIONS.search_path, key)
+ if os.path.exists(new_key_path):
+ key = new_key_path
+ pubkey_path = ExtractAvbPublicKey(info_dict["avb_avbtool"], key)
rollback_index_location = info_dict[
"avb_" + partition + "_rollback_index_location"]
return "{}:{}:{}".format(partition, rollback_index_location, pubkey_path)
+def AddAftlInclusionProof(output_image):
+ """Appends the aftl inclusion proof to the vbmeta image."""
+
+ # Ensure the other AFTL parameters are set as well.
+ assert OPTIONS.aftl_tool_path is not None, 'No aftl tool provided.'
+ assert OPTIONS.aftl_key_path is not None, 'No AFTL key provided.'
+ assert OPTIONS.aftl_manufacturer_key_path is not None, \
+ 'No AFTL manufacturer key provided.'
+
+ vbmeta_image = MakeTempFile()
+ os.rename(output_image, vbmeta_image)
+ build_info = BuildInfo(OPTIONS.info_dict)
+ version_incremental = build_info.GetBuildProp("ro.build.version.incremental")
+ aftltool = OPTIONS.aftl_tool_path
+ aftl_cmd = [aftltool, "make_icp_from_vbmeta",
+ "--vbmeta_image_path", vbmeta_image,
+ "--output", output_image,
+ "--version_incremental", version_incremental,
+ "--transparency_log_servers", OPTIONS.aftl_server,
+ "--transparency_log_pub_keys", OPTIONS.aftl_key_path,
+ "--manufacturer_key", OPTIONS.aftl_manufacturer_key_path,
+ "--algorithm", "SHA256_RSA4096",
+ "--padding", "4096"]
+ if OPTIONS.aftl_signer_helper:
+ aftl_cmd.extend(shlex.split(OPTIONS.aftl_signer_helper))
+ RunAndCheckOutput(aftl_cmd)
+
+ verify_cmd = ['aftltool', 'verify_image_icp', '--vbmeta_image_path',
+ output_image, '--transparency_log_pub_keys',
+ OPTIONS.aftl_key_path]
+ RunAndCheckOutput(verify_cmd)
+
+
+def BuildVBMeta(image_path, partitions, name, needed_partitions):
+ """Creates a VBMeta image.
+
+ It generates the requested VBMeta image. The requested image could be for
+ top-level or chained VBMeta image, which is determined based on the name.
+
+ Args:
+ image_path: The output path for the new VBMeta image.
+ partitions: A dict that's keyed by partition names with image paths as
+ values. Only valid partition names are accepted, as listed in
+ common.AVB_PARTITIONS.
+ name: Name of the VBMeta partition, e.g. 'vbmeta', 'vbmeta_system'.
+ needed_partitions: Partitions whose descriptors should be included into the
+ generated VBMeta image.
+
+ Raises:
+ AssertionError: On invalid input args.
+ """
+ avbtool = OPTIONS.info_dict["avb_avbtool"]
+ cmd = [avbtool, "make_vbmeta_image", "--output", image_path]
+ AppendAVBSigningArgs(cmd, name)
+
+ for partition, path in partitions.items():
+ if partition not in needed_partitions:
+ continue
+ assert (partition in AVB_PARTITIONS or
+ partition in AVB_VBMETA_PARTITIONS), \
+ 'Unknown partition: {}'.format(partition)
+ assert os.path.exists(path), \
+ 'Failed to find {} for {}'.format(path, partition)
+ cmd.extend(GetAvbPartitionArg(partition, path))
+
+ args = OPTIONS.info_dict.get("avb_{}_args".format(name))
+ if args and args.strip():
+ split_args = shlex.split(args)
+ for index, arg in enumerate(split_args[:-1]):
+ # Sanity check that the image file exists. Some images might be defined
+ # as a path relative to source tree, which may not be available at the
+ # same location when running this script (we have the input target_files
+ # zip only). For such cases, we additionally scan other locations (e.g.
+ # IMAGES/, RADIO/, etc) before bailing out.
+ if arg == '--include_descriptors_from_image':
+ chained_image = split_args[index + 1]
+ if os.path.exists(chained_image):
+ continue
+ found = False
+ for dir_name in ['IMAGES', 'RADIO', 'PREBUILT_IMAGES']:
+ alt_path = os.path.join(
+ OPTIONS.input_tmp, dir_name, os.path.basename(chained_image))
+ if os.path.exists(alt_path):
+ split_args[index + 1] = alt_path
+ found = True
+ break
+ assert found, 'Failed to find {}'.format(chained_image)
+ cmd.extend(split_args)
+
+ RunAndCheckOutput(cmd)
+
+ # Generate the AFTL inclusion proof.
+ if OPTIONS.aftl_server is not None:
+ AddAftlInclusionProof(image_path)
+
+
+def _MakeRamdisk(sourcedir, fs_config_file=None):
+ ramdisk_img = tempfile.NamedTemporaryFile()
+
+ if fs_config_file is not None and os.access(fs_config_file, os.F_OK):
+ cmd = ["mkbootfs", "-f", fs_config_file,
+ os.path.join(sourcedir, "RAMDISK")]
+ else:
+ cmd = ["mkbootfs", os.path.join(sourcedir, "RAMDISK")]
+ p1 = Run(cmd, stdout=subprocess.PIPE)
+ p2 = Run(["minigzip"], stdin=p1.stdout, stdout=ramdisk_img.file.fileno())
+
+ p2.wait()
+ p1.wait()
+ assert p1.returncode == 0, "mkbootfs of %s ramdisk failed" % (sourcedir,)
+ assert p2.returncode == 0, "minigzip of %s ramdisk failed" % (sourcedir,)
+
+ return ramdisk_img
+
+
def _BuildBootableImage(sourcedir, fs_config_file, info_dict=None,
has_ramdisk=False, two_step_image=False):
"""Build a bootable image from the specified sourcedir.
@@ -582,24 +1060,6 @@ def _BuildBootableImage(sourcedir, fs_config_file, info_dict=None,
for building the requested image.
"""
- def make_ramdisk():
- ramdisk_img = tempfile.NamedTemporaryFile()
-
- if os.access(fs_config_file, os.F_OK):
- cmd = ["mkbootfs", "-f", fs_config_file,
- os.path.join(sourcedir, "RAMDISK")]
- else:
- cmd = ["mkbootfs", os.path.join(sourcedir, "RAMDISK")]
- p1 = Run(cmd, stdout=subprocess.PIPE)
- p2 = Run(["minigzip"], stdin=p1.stdout, stdout=ramdisk_img.file.fileno())
-
- p2.wait()
- p1.wait()
- assert p1.returncode == 0, "mkbootfs of %s ramdisk failed" % (sourcedir,)
- assert p2.returncode == 0, "minigzip of %s ramdisk failed" % (sourcedir,)
-
- return ramdisk_img
-
if not os.access(os.path.join(sourcedir, "kernel"), os.F_OK):
return None
@@ -612,7 +1072,7 @@ def _BuildBootableImage(sourcedir, fs_config_file, info_dict=None,
img = tempfile.NamedTemporaryFile()
if has_ramdisk:
- ramdisk_img = make_ramdisk()
+ ramdisk_img = _MakeRamdisk(sourcedir, fs_config_file)
# use MKBOOTIMG from environ, or "mkbootimg" if empty or not set
mkbootimg = os.getenv('MKBOOTIMG') or "mkbootimg"
@@ -774,6 +1234,105 @@ def GetBootableImage(name, prebuilt_name, unpack_dir, tree_subdir,
return None
+def _BuildVendorBootImage(sourcedir, info_dict=None):
+ """Build a vendor boot image from the specified sourcedir.
+
+ Take a ramdisk, dtb, and vendor_cmdline from the input (in 'sourcedir'), and
+ turn them into a vendor boot image.
+
+ Return the image data, or None if sourcedir does not appear to contains files
+ for building the requested image.
+ """
+
+ if info_dict is None:
+ info_dict = OPTIONS.info_dict
+
+ img = tempfile.NamedTemporaryFile()
+
+ ramdisk_img = _MakeRamdisk(sourcedir)
+
+ # use MKBOOTIMG from environ, or "mkbootimg" if empty or not set
+ mkbootimg = os.getenv('MKBOOTIMG') or "mkbootimg"
+
+ cmd = [mkbootimg]
+
+ fn = os.path.join(sourcedir, "dtb")
+ if os.access(fn, os.F_OK):
+ cmd.append("--dtb")
+ cmd.append(fn)
+
+ fn = os.path.join(sourcedir, "vendor_cmdline")
+ if os.access(fn, os.F_OK):
+ cmd.append("--vendor_cmdline")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "base")
+ if os.access(fn, os.F_OK):
+ cmd.append("--base")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ fn = os.path.join(sourcedir, "pagesize")
+ if os.access(fn, os.F_OK):
+ cmd.append("--pagesize")
+ cmd.append(open(fn).read().rstrip("\n"))
+
+ args = info_dict.get("mkbootimg_args")
+ if args and args.strip():
+ cmd.extend(shlex.split(args))
+
+ args = info_dict.get("mkbootimg_version_args")
+ if args and args.strip():
+ cmd.extend(shlex.split(args))
+
+ cmd.extend(["--vendor_ramdisk", ramdisk_img.name])
+ cmd.extend(["--vendor_boot", img.name])
+
+ RunAndCheckOutput(cmd)
+
+ # AVB: if enabled, calculate and add hash.
+ if info_dict.get("avb_enable") == "true":
+ avbtool = info_dict["avb_avbtool"]
+ part_size = info_dict["vendor_boot_size"]
+ cmd = [avbtool, "add_hash_footer", "--image", img.name,
+ "--partition_size", str(part_size), "--partition_name", "vendor_boot"]
+ AppendAVBSigningArgs(cmd, "vendor_boot")
+ args = info_dict.get("avb_vendor_boot_add_hash_footer_args")
+ if args and args.strip():
+ cmd.extend(shlex.split(args))
+ RunAndCheckOutput(cmd)
+
+ img.seek(os.SEEK_SET, 0)
+ data = img.read()
+
+ ramdisk_img.close()
+ img.close()
+
+ return data
+
+
+def GetVendorBootImage(name, prebuilt_name, unpack_dir, tree_subdir,
+ info_dict=None):
+ """Return a File object with the desired vendor boot image.
+
+ Look for it under 'unpack_dir'/IMAGES, otherwise construct it from
+ the source files in 'unpack_dir'/'tree_subdir'."""
+
+ prebuilt_path = os.path.join(unpack_dir, "IMAGES", prebuilt_name)
+ if os.path.exists(prebuilt_path):
+ logger.info("using prebuilt %s from IMAGES...", prebuilt_name)
+ return File.FromLocalFile(name, prebuilt_path)
+
+ logger.info("building image from target_files %s...", tree_subdir)
+
+ if info_dict is None:
+ info_dict = OPTIONS.info_dict
+
+ data = _BuildVendorBootImage(os.path.join(unpack_dir, tree_subdir), info_dict)
+ if data:
+ return File(name, data)
+ return None
+
+
def Gunzip(in_filename, out_filename):
"""Gunzips the given gzip compressed file to a given output file."""
with gzip.open(in_filename, "rb") as in_file, \
@@ -861,7 +1420,7 @@ def GetUserImage(which, tmpdir, input_zip,
A Image object. If it is a sparse image and reset_file_map is False, the
image will have file_map info loaded.
"""
- if info_dict == None:
+ if info_dict is None:
info_dict = LoadInfoDict(input_zip)
is_sparse = info_dict.get("extfs_sparse_flag")
@@ -901,8 +1460,8 @@ def GetNonSparseImage(which, tmpdir, hashtree_info_generator=None):
# ota_from_target_files.py (since LMP).
assert os.path.exists(path) and os.path.exists(mappath)
- return blockimgdiff.FileImage(path, hashtree_info_generator=
- hashtree_info_generator)
+ return images.FileImage(path, hashtree_info_generator=hashtree_info_generator)
+
def GetSparseImage(which, tmpdir, input_zip, allow_shared_blocks,
hashtree_info_generator=None):
@@ -951,7 +1510,7 @@ def GetSparseImage(which, tmpdir, input_zip, allow_shared_blocks,
# filename listed in system.map may contain an additional leading slash
# (i.e. "//system/framework/am.jar"). Using lstrip to get consistent
# results.
- arcname = string.replace(entry, which, which.upper(), 1).lstrip('/')
+ arcname = entry.replace(which, which.upper(), 1).lstrip('/')
# Special handling another case, where files not under /system
# (e.g. "/sbin/charger") are packed under ROOT/ in a target_files.zip.
@@ -1028,7 +1587,7 @@ def GetKeyPasswords(keylist):
def GetMinSdkVersion(apk_name):
"""Gets the minSdkVersion declared in the APK.
- It calls 'aapt' to query the embedded minSdkVersion from the given APK file.
+ It calls 'aapt2' to query the embedded minSdkVersion from the given APK file.
This can be both a decimal number (API Level) or a codename.
Args:
@@ -1041,12 +1600,12 @@ def GetMinSdkVersion(apk_name):
ExternalError: On failing to obtain the min SDK version.
"""
proc = Run(
- ["aapt", "dump", "badging", apk_name], stdout=subprocess.PIPE,
+ ["aapt2", "dump", "badging", apk_name], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdoutdata, stderrdata = proc.communicate()
if proc.returncode != 0:
raise ExternalError(
- "Failed to obtain minSdkVersion: aapt return code {}:\n{}\n{}".format(
+ "Failed to obtain minSdkVersion: aapt2 return code {}:\n{}\n{}".format(
proc.returncode, stdoutdata, stderrdata))
for line in stdoutdata.split("\n"):
@@ -1054,7 +1613,7 @@ def GetMinSdkVersion(apk_name):
m = re.match(r'sdkVersion:\'([^\']*)\'', line)
if m:
return m.group(1)
- raise ExternalError("No minSdkVersion returned by aapt")
+ raise ExternalError("No minSdkVersion returned by aapt2")
def GetMinSdkVersionInt(apk_name, codename_to_api_level_map):
@@ -1221,13 +1780,14 @@ def ReadApkCerts(tf_zip):
if basename:
installed_files.add(basename)
- for line in tf_zip.read("META/apkcerts.txt").split("\n"):
+ for line in tf_zip.read('META/apkcerts.txt').decode().split('\n'):
line = line.strip()
if not line:
continue
m = re.match(
r'^name="(?P<NAME>.*)"\s+certificate="(?P<CERT>.*)"\s+'
- r'private_key="(?P<PRIVKEY>.*?)"(\s+compressed="(?P<COMPRESSED>.*)")?$',
+ r'private_key="(?P<PRIVKEY>.*?)"(\s+compressed="(?P<COMPRESSED>.*?)")?'
+ r'(\s+partition="(?P<PARTITION>.*?)")?$',
line)
if not m:
continue
@@ -1291,6 +1851,9 @@ Global options
-h (--help)
Display this usage message and exit.
+
+ --logfile <file>
+ Put verbose logs to specified file (regardless of --verbose option.)
"""
def Usage(docstring):
@@ -1313,11 +1876,12 @@ def ParseOptions(argv,
argv, "hvp:s:x:" + extra_opts,
["help", "verbose", "path=", "signapk_path=",
"signapk_shared_library_path=", "extra_signapk_args=",
- "java_path=", "java_args=", "public_key_suffix=",
+ "java_path=", "java_args=", "android_jar_path=", "public_key_suffix=",
"private_key_suffix=", "boot_signer_path=", "boot_signer_args=",
"verity_signer_path=", "verity_signer_args=", "device_specific=",
- "extra="] +
- list(extra_long_opts))
+ "extra=", "logfile=", "aftl_tool_path=", "aftl_server=",
+ "aftl_key_path=", "aftl_manufacturer_key_path=",
+ "aftl_signer_helper="] + list(extra_long_opts))
except getopt.GetoptError as err:
Usage(docstring)
print("**", str(err), "**")
@@ -1341,6 +1905,8 @@ def ParseOptions(argv,
OPTIONS.java_path = a
elif o in ("--java_args",):
OPTIONS.java_args = shlex.split(a)
+ elif o in ("--android_jar_path",):
+ OPTIONS.android_jar_path = a
elif o in ("--public_key_suffix",):
OPTIONS.public_key_suffix = a
elif o in ("--private_key_suffix",):
@@ -1353,11 +1919,23 @@ def ParseOptions(argv,
OPTIONS.verity_signer_path = a
elif o in ("--verity_signer_args",):
OPTIONS.verity_signer_args = shlex.split(a)
+ elif o in ("--aftl_tool_path",):
+ OPTIONS.aftl_tool_path = a
+ elif o in ("--aftl_server",):
+ OPTIONS.aftl_server = a
+ elif o in ("--aftl_key_path",):
+ OPTIONS.aftl_key_path = a
+ elif o in ("--aftl_manufacturer_key_path",):
+ OPTIONS.aftl_manufacturer_key_path = a
+ elif o in ("--aftl_signer_helper",):
+ OPTIONS.aftl_signer_helper = a
elif o in ("-s", "--device_specific"):
OPTIONS.device_specific = a
elif o in ("-x", "--extra"):
key, value = a.split("=", 1)
OPTIONS.extras[key] = value
+ elif o in ("--logfile",):
+ OPTIONS.logfile = a
else:
if extra_option_handler is None or not extra_option_handler(o, a):
assert False, "unknown option \"%s\"" % (o,)
@@ -1431,6 +2009,8 @@ class PasswordManager(object):
if not first:
print("key file %s still missing some passwords." % (self.pwfile,))
+ if sys.version_info[0] >= 3:
+ raw_input = input # pylint: disable=redefined-builtin
answer = raw_input("try to edit again? [y]> ").strip()
if answer and answer[0] not in 'yY':
raise RuntimeError("key passwords unavailable")
@@ -1444,7 +2024,7 @@ class PasswordManager(object):
values.
"""
result = {}
- for k, v in sorted(current.iteritems()):
+ for k, v in sorted(current.items()):
if v:
result[k] = v
else:
@@ -1465,7 +2045,7 @@ class PasswordManager(object):
f.write("# (Additional spaces are harmless.)\n\n")
first_line = None
- sorted_list = sorted([(not v, k, v) for (k, v) in current.iteritems()])
+ sorted_list = sorted([(not v, k, v) for (k, v) in current.items()])
for i, (_, k, v) in enumerate(sorted_list):
f.write("[[[ %s ]]] %s\n" % (v, k))
if not v and first_line is None:
@@ -1566,6 +2146,15 @@ def ZipWriteStr(zip_file, zinfo_or_arcname, data, perms=None,
perms = 0o100644
else:
zinfo = zinfo_or_arcname
+ # Python 2 and 3 behave differently when calling ZipFile.writestr() with
+ # zinfo.external_attr being 0. Python 3 uses `0o600 << 16` as the value for
+ # such a case (since
+ # https://github.com/python/cpython/commit/18ee29d0b870caddc0806916ca2c823254f1a1f9),
+ # which seems to make more sense. Otherwise the entry will have 0o000 as the
+ # permission bits. We follow the logic in Python 3 to get consistent
+ # behavior between using the two versions.
+ if not zinfo.external_attr:
+ zinfo.external_attr = 0o600 << 16
# If compress_type is given, it overrides the value in zinfo.
if compress_type is not None:
@@ -1598,7 +2187,7 @@ def ZipDelete(zip_filename, entries):
Raises:
AssertionError: In case of non-zero return from 'zip'.
"""
- if isinstance(entries, basestring):
+ if isinstance(entries, str):
entries = [entries]
cmd = ["zip", "-d", zip_filename] + entries
RunAndCheckOutput(cmd)
@@ -1622,7 +2211,7 @@ class DeviceSpecificParams(object):
"""Keyword arguments to the constructor become attributes of this
object, which is passed to all functions in the device-specific
module."""
- for k, v in kwargs.iteritems():
+ for k, v in kwargs.items():
setattr(self, k, v)
self.extras = OPTIONS.extras
@@ -1891,9 +2480,9 @@ class BlockDifference(object):
assert version >= 3
self.version = version
- b = blockimgdiff.BlockImageDiff(tgt, src, threads=OPTIONS.worker_threads,
- version=self.version,
- disable_imgdiff=self.disable_imgdiff)
+ b = BlockImageDiff(tgt, src, threads=OPTIONS.worker_threads,
+ version=self.version,
+ disable_imgdiff=self.disable_imgdiff)
self.path = os.path.join(MakeTempDir(), partition)
b.Compute(self.path)
self._required_cache = b.max_stashed_size
@@ -2147,8 +2736,10 @@ class BlockDifference(object):
return ctx.hexdigest()
-DataImage = blockimgdiff.DataImage
-EmptyImage = blockimgdiff.EmptyImage
+# Expose these two classes to support vendor-specific scripts
+DataImage = images.DataImage
+EmptyImage = images.EmptyImage
+
# map recovery.fstab's fs_types to mount/format "partition types"
PARTITION_TYPES = {
@@ -2174,7 +2765,7 @@ def ParseCertificate(data):
This gives the same result as `openssl x509 -in <filename> -outform DER`.
Returns:
- The decoded certificate string.
+ The decoded certificate bytes.
"""
cert_buffer = []
save = False
@@ -2185,7 +2776,7 @@ def ParseCertificate(data):
cert_buffer.append(line)
if "--BEGIN CERTIFICATE--" in line:
save = True
- cert = "".join(cert_buffer).decode('base64')
+ cert = base64.b64decode("".join(cert_buffer))
return cert
@@ -2213,10 +2804,11 @@ def ExtractPublicKey(cert):
return pubkey
-def ExtractAvbPublicKey(key):
+def ExtractAvbPublicKey(avbtool, key):
"""Extracts the AVB public key from the given public or private key.
Args:
+ avbtool: The AVB tool to use.
key: The input key file, which should be PEM-encoded public or private key.
Returns:
@@ -2224,7 +2816,7 @@ def ExtractAvbPublicKey(key):
"""
output = MakeTempFile(prefix='avb-', suffix='.avbpubkey')
RunAndCheckOutput(
- ['avbtool', 'extract_public_key', "--key", key, "--output", output])
+ [avbtool, 'extract_public_key', "--key", key, "--output", output])
return output
@@ -2249,13 +2841,25 @@ def MakeRecoveryPatch(input_dir, output_sink, recovery_img, boot_img,
info_dict = OPTIONS.info_dict
full_recovery_image = info_dict.get("full_recovery_image") == "true"
+ board_uses_vendorimage = info_dict.get("board_uses_vendorimage") == "true"
+
+ if board_uses_vendorimage:
+ # In this case, the output sink is rooted at VENDOR
+ recovery_img_path = "etc/recovery.img"
+ recovery_resource_dat_path = "VENDOR/etc/recovery-resource.dat"
+ sh_dir = "bin"
+ else:
+ # In this case the output sink is rooted at SYSTEM
+ recovery_img_path = "vendor/etc/recovery.img"
+ recovery_resource_dat_path = "SYSTEM/vendor/etc/recovery-resource.dat"
+ sh_dir = "vendor/bin"
if full_recovery_image:
- output_sink("etc/recovery.img", recovery_img.data)
+ output_sink(recovery_img_path, recovery_img.data)
else:
system_root_image = info_dict.get("system_root_image") == "true"
- path = os.path.join(input_dir, "SYSTEM", "etc", "recovery-resource.dat")
+ path = os.path.join(input_dir, recovery_resource_dat_path)
# With system-root-image, boot and recovery images will have mismatching
# entries (only recovery has the ramdisk entry) (Bug: 72731506). Use bsdiff
# to handle such a case.
@@ -2268,7 +2872,7 @@ def MakeRecoveryPatch(input_dir, output_sink, recovery_img, boot_img,
if os.path.exists(path):
diff_program.append("-b")
diff_program.append(path)
- bonus_args = "--bonus /system/etc/recovery-resource.dat"
+ bonus_args = "--bonus /vendor/etc/recovery-resource.dat"
else:
bonus_args = ""
@@ -2285,10 +2889,16 @@ def MakeRecoveryPatch(input_dir, output_sink, recovery_img, boot_img,
return
if full_recovery_image:
- sh = """#!/system/bin/sh
+
+ # Note that we use /vendor to refer to the recovery resources. This will
+ # work for a separate vendor partition mounted at /vendor or a
+ # /system/vendor subdirectory on the system partition, for which init will
+ # create a symlink from /vendor to /system/vendor.
+
+ sh = """#!/vendor/bin/sh
if ! applypatch --check %(type)s:%(device)s:%(size)d:%(sha1)s; then
applypatch \\
- --flash /system/etc/recovery.img \\
+ --flash /vendor/etc/recovery.img \\
--target %(type)s:%(device)s:%(size)d:%(sha1)s && \\
log -t recovery "Installing new recovery image: succeeded" || \\
log -t recovery "Installing new recovery image: failed"
@@ -2300,10 +2910,10 @@ fi
'sha1': recovery_img.sha1,
'size': recovery_img.size}
else:
- sh = """#!/system/bin/sh
+ sh = """#!/vendor/bin/sh
if ! applypatch --check %(recovery_type)s:%(recovery_device)s:%(recovery_size)d:%(recovery_sha1)s; then
applypatch %(bonus_args)s \\
- --patch /system/recovery-from-boot.p \\
+ --patch /vendor/recovery-from-boot.p \\
--source %(boot_type)s:%(boot_device)s:%(boot_size)d:%(boot_sha1)s \\
--target %(recovery_type)s:%(recovery_device)s:%(recovery_size)d:%(recovery_sha1)s && \\
log -t recovery "Installing new recovery image: succeeded" || \\
@@ -2321,13 +2931,13 @@ fi
'recovery_device': recovery_device,
'bonus_args': bonus_args}
- # The install script location moved from /system/etc to /system/bin
- # in the L release.
- sh_location = "bin/install-recovery.sh"
+ # The install script location moved from /system/etc to /system/bin in the L
+ # release. In the R release it is in VENDOR/bin or SYSTEM/vendor/bin.
+ sh_location = os.path.join(sh_dir, "install-recovery.sh")
logger.info("putting script in %s", sh_location)
- output_sink(sh_location, sh)
+ output_sink(sh_location, sh.encode())
class DynamicPartitionUpdate(object):
@@ -2368,14 +2978,16 @@ class DynamicPartitionsDifference(object):
def __init__(self, info_dict, block_diffs, progress_dict=None,
source_info_dict=None):
if progress_dict is None:
- progress_dict = dict()
+ progress_dict = {}
self._remove_all_before_apply = False
if source_info_dict is None:
self._remove_all_before_apply = True
- source_info_dict = dict()
+ source_info_dict = {}
+
+ block_diff_dict = collections.OrderedDict(
+ [(e.partition, e) for e in block_diffs])
- block_diff_dict = {e.partition:e for e in block_diffs}
assert len(block_diff_dict) == len(block_diffs), \
"Duplicated BlockDifference object for {}".format(
[partition for partition, count in