1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
|
#!/usr/bin/env python
#
# Copyright (C) 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
"""Common utility functions shared by merge_* scripts.
Expects items in OPTIONS prepared by merge_target_files.py.
"""
import fnmatch
import logging
import os
import re
import shutil
import zipfile
import common
logger = logging.getLogger(__name__)
OPTIONS = common.OPTIONS
def ExtractItems(input_zip, output_dir, extract_item_list):
"""Extracts items in extract_item_list from a zip to a dir."""
# Filter the extract_item_list to remove any items that do not exist in the
# zip file. Otherwise, the extraction step will fail.
with zipfile.ZipFile(input_zip, allowZip64=True) as input_zipfile:
input_namelist = input_zipfile.namelist()
filtered_extract_item_list = []
for pattern in extract_item_list:
if fnmatch.filter(input_namelist, pattern):
filtered_extract_item_list.append(pattern)
common.UnzipToDir(input_zip, output_dir, filtered_extract_item_list)
def CopyItems(from_dir, to_dir, copy_item_list):
"""Copies the items in copy_item_list from source to destination directory.
copy_item_list may include files and directories. Will copy the matched
files and create the matched directories.
Args:
from_dir: The source directory.
to_dir: The destination directory.
copy_item_list: Items to be copied.
"""
item_paths = []
for root, dirs, files in os.walk(from_dir):
item_paths.extend(
os.path.relpath(path=os.path.join(root, item_name), start=from_dir)
for item_name in files + dirs)
filtered = set()
for pattern in copy_item_list:
filtered.update(fnmatch.filter(item_paths, pattern))
for item in filtered:
original_path = os.path.join(from_dir, item)
copied_path = os.path.join(to_dir, item)
copied_parent_path = os.path.dirname(copied_path)
if not os.path.exists(copied_parent_path):
os.makedirs(copied_parent_path)
if os.path.islink(original_path):
os.symlink(os.readlink(original_path), copied_path)
elif os.path.isdir(original_path):
if not os.path.exists(copied_path):
os.makedirs(copied_path)
else:
shutil.copyfile(original_path, copied_path)
def GetTargetFilesItems(target_files_zipfile_or_dir):
"""Gets a list of target files items."""
if zipfile.is_zipfile(target_files_zipfile_or_dir):
with zipfile.ZipFile(target_files_zipfile_or_dir, allowZip64=True) as fz:
return fz.namelist()
elif os.path.isdir(target_files_zipfile_or_dir):
item_list = []
for root, dirs, files in os.walk(target_files_zipfile_or_dir):
item_list.extend(
os.path.relpath(path=os.path.join(root, item),
start=target_files_zipfile_or_dir)
for item in dirs + files)
return item_list
else:
raise ValueError('Target files should be either zipfile or directory.')
def CollectTargetFiles(input_zipfile_or_dir, output_dir, item_list=None):
"""Extracts input zipfile or copy input directory to output directory.
Extracts the input zipfile if `input_zipfile_or_dir` is a zip archive, or
copies the items if `input_zipfile_or_dir` is a directory.
Args:
input_zipfile_or_dir: The input target files, could be either a zipfile to
extract or a directory to copy.
output_dir: The output directory that the input files are either extracted
or copied.
item_list: Files to be extracted or copied. Will extract or copy all files
if omitted.
"""
patterns = item_list if item_list else ('*',)
if zipfile.is_zipfile(input_zipfile_or_dir):
ExtractItems(input_zipfile_or_dir, output_dir, patterns)
elif os.path.isdir(input_zipfile_or_dir):
CopyItems(input_zipfile_or_dir, output_dir, patterns)
else:
raise ValueError('Target files should be either zipfile or directory.')
def WriteSortedData(data, path):
"""Writes the sorted contents of either a list or dict to file.
This function sorts the contents of the list or dict and then writes the
resulting sorted contents to a file specified by path.
Args:
data: The list or dict to sort and write.
path: Path to the file to write the sorted values to. The file at path will
be overridden if it exists.
"""
with open(path, 'w') as output:
for entry in sorted(data):
out_str = '{}={}\n'.format(entry, data[entry]) if isinstance(
data, dict) else '{}\n'.format(entry)
output.write(out_str)
def ValidateConfigLists():
"""Performs validations on the merge config lists.
Returns:
False if a validation fails, otherwise true.
"""
has_error = False
# Check that partitions only come from one input.
framework_partitions = ItemListToPartitionSet(OPTIONS.framework_item_list)
vendor_partitions = ItemListToPartitionSet(OPTIONS.vendor_item_list)
from_both = framework_partitions.intersection(vendor_partitions)
if from_both:
logger.error(
'Cannot extract items from the same partition in both the '
'framework and vendor builds. Please ensure only one merge config '
'item list (or inferred list) includes each partition: %s' %
','.join(from_both))
has_error = True
if any([
key in OPTIONS.framework_misc_info_keys
for key in ('dynamic_partition_list', 'super_partition_groups')
]):
logger.error('Dynamic partition misc info keys should come from '
'the vendor instance of META/misc_info.txt.')
has_error = True
return not has_error
# In an item list (framework or vendor), we may see entries that select whole
# partitions. Such an entry might look like this 'SYSTEM/*' (e.g., for the
# system partition). The following regex matches this and extracts the
# partition name.
_PARTITION_ITEM_PATTERN = re.compile(r'^([A-Z_]+)/.*$')
_IMAGE_PARTITION_PATTERN = re.compile(r'^IMAGES/(.*)\.img$')
def ItemListToPartitionSet(item_list):
"""Converts a target files item list to a partition set.
The item list contains items that might look like 'SYSTEM/*' or 'VENDOR/*' or
'OTA/android-info.txt'. Items that end in '/*' are assumed to match entire
directories where 'SYSTEM' or 'VENDOR' is a directory name that identifies the
contents of a partition of the same name. Other items in the list, such as the
'OTA' example contain metadata. This function iterates such a list, returning
a set that contains the partition entries.
Args:
item_list: A list of items in a target files package.
Returns:
A set of partitions extracted from the list of items.
"""
partition_set = set()
for item in item_list:
for pattern in (_PARTITION_ITEM_PATTERN, _IMAGE_PARTITION_PATTERN):
partition_match = pattern.search(item.strip())
if partition_match:
partition = partition_match.group(1).lower()
# These directories in target-files are not actual partitions.
if partition not in ('meta', 'images'):
partition_set.add(partition)
return partition_set
# Partitions that are grabbed from the framework partial build by default.
_FRAMEWORK_PARTITIONS = {
'system', 'product', 'system_ext', 'system_other', 'root', 'system_dlkm',
'vbmeta_system'
}
def InferItemList(input_namelist, framework):
item_set = set()
# Some META items are always grabbed from partial builds directly.
# Others are combined in merge_meta.py.
if framework:
item_set.update([
'META/liblz4.so',
'META/postinstall_config.txt',
'META/zucchini_config.txt',
])
else: # vendor
item_set.update([
'META/kernel_configs.txt',
'META/kernel_version.txt',
'META/otakeys.txt',
'META/pack_radioimages.txt',
'META/releasetools.py',
])
# Grab a set of items for the expected partitions in the partial build.
seen_partitions = []
for namelist in input_namelist:
if namelist.endswith('/'):
continue
partition = namelist.split('/')[0].lower()
# META items are grabbed above, or merged later.
if partition == 'meta':
continue
if partition == 'images':
image_partition, extension = os.path.splitext(os.path.basename(namelist))
if image_partition == 'vbmeta':
# Always regenerate vbmeta.img since it depends on hash information
# from both builds.
continue
if extension in ('.img', '.map'):
# Include image files in IMAGES/* if the partition comes from
# the expected set.
if (framework and image_partition in _FRAMEWORK_PARTITIONS) or (
not framework and image_partition not in _FRAMEWORK_PARTITIONS):
item_set.add(namelist)
elif not framework:
# Include all miscellaneous non-image files in IMAGES/* from
# the vendor build.
item_set.add(namelist)
continue
# Skip already-visited partitions.
if partition in seen_partitions:
continue
seen_partitions.append(partition)
if (framework and partition in _FRAMEWORK_PARTITIONS) or (
not framework and partition not in _FRAMEWORK_PARTITIONS):
fs_config_prefix = '' if partition == 'system' else '%s_' % partition
item_set.update([
'%s/*' % partition.upper(),
'META/%sfilesystem_config.txt' % fs_config_prefix,
])
return sorted(item_set)
def InferFrameworkMiscInfoKeys(input_namelist):
keys = [
'ab_update',
'avb_vbmeta_system',
'avb_vbmeta_system_algorithm',
'avb_vbmeta_system_key_path',
'avb_vbmeta_system_rollback_index_location',
'default_system_dev_certificate',
]
for partition in _FRAMEWORK_PARTITIONS:
for partition_dir in ('%s/' % partition.upper(), 'SYSTEM/%s/' % partition):
if partition_dir in input_namelist:
fs_type_prefix = '' if partition == 'system' else '%s_' % partition
keys.extend([
'avb_%s_hashtree_enable' % partition,
'avb_%s_add_hashtree_footer_args' % partition,
'%s_disable_sparse' % partition,
'building_%s_image' % partition,
'%sfs_type' % fs_type_prefix,
])
return sorted(keys)
|