blob: 1411d938136bdec2fbacc686909859966f16f73d [file] [log] [blame]
You-Cheng Syud5692942018-01-04 14:40:59 +08001#!/usr/bin/env python
Hung-Te Linc772e1a2017-04-14 16:50:50 +08002# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""An utility to manipulate GPT on a disk image.
7
8Chromium OS factory software usually needs to access partitions from disk
9images. However, there is no good, lightweight, and portable GPT utility.
10Most Chromium OS systems use `cgpt`, but that's not by default installed on
11Ubuntu. Most systems have parted (GNU) or partx (util-linux-ng) but they have
12their own problems.
13
14For example, when a disk image is resized (usually enlarged for putting more
15resources on stateful partition), GPT table must be updated. However,
16 - `parted` can't repair partition without interactive console in exception
17 handler.
18 - `partx` cannot fix headers nor make changes to partition table.
19 - `cgpt repair` does not fix `LastUsableLBA` so we cannot enlarge partition.
20 - `gdisk` is not installed on most systems.
21
22As a result, we need a dedicated tool to help processing GPT.
23
24This pygpt.py provides a simple and customized implementation for processing
25GPT, as a replacement for `cgpt`.
26"""
27
28
29from __future__ import print_function
30
31import argparse
32import binascii
33import collections
34import logging
35import os
36import struct
Hung-Te Linf641d302018-04-18 15:09:35 +080037import subprocess
38import sys
Hung-Te Linc772e1a2017-04-14 16:50:50 +080039import uuid
40
41
42# The binascii.crc32 returns signed integer, so CRC32 in in struct must be
43# declared as 'signed' (l) instead of 'unsigned' (L).
44# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_table_header_.28LBA_1.29
Hung-Te Lin49ac3c22018-04-17 14:37:54 +080045HEADER_DESCRIPTION = """
Hung-Te Linc772e1a2017-04-14 16:50:50 +080046 8s Signature
47 4s Revision
48 L HeaderSize
49 l CRC32
50 4s Reserved
51 Q CurrentLBA
52 Q BackupLBA
53 Q FirstUsableLBA
54 Q LastUsableLBA
55 16s DiskGUID
56 Q PartitionEntriesStartingLBA
57 L PartitionEntriesNumber
58 L PartitionEntrySize
59 l PartitionArrayCRC32
60"""
61
62# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_entries
Hung-Te Lin49ac3c22018-04-17 14:37:54 +080063PARTITION_DESCRIPTION = """
Hung-Te Linc772e1a2017-04-14 16:50:50 +080064 16s TypeGUID
65 16s UniqueGUID
66 Q FirstLBA
67 Q LastLBA
68 Q Attributes
69 72s Names
70"""
71
Hung-Te Linc6e009c2018-04-17 15:06:16 +080072# The PMBR has so many variants. The basic format is defined in
73# https://en.wikipedia.org/wiki/Master_boot_record#Sector_layout, and our
74# implementation, as derived from `cgpt`, is following syslinux as:
75# https://chromium.googlesource.com/chromiumos/platform/vboot_reference/+/master/cgpt/cgpt.h#32
76PMBR_DESCRIPTION = """
77 424s BootCode
78 16s BootGUID
79 L DiskID
80 2s Magic
81 16s LegacyPart0
82 16s LegacyPart1
83 16s LegacyPart2
84 16s LegacyPart3
85 2s Signature
86"""
Hung-Te Linc772e1a2017-04-14 16:50:50 +080087
Hung-Te Lin49ac3c22018-04-17 14:37:54 +080088def BitProperty(getter, setter, shift, mask):
89 """A generator for bit-field properties.
90
91 This is used inside a class to manipulate an integer-like variable using
92 properties. The getter and setter should be member functions to change the
93 underlying member data.
Hung-Te Linc772e1a2017-04-14 16:50:50 +080094
95 Args:
Hung-Te Lin49ac3c22018-04-17 14:37:54 +080096 getter: a function to read integer type variable (for all the bits).
97 setter: a function to set the new changed integer type variable.
98 shift: integer for how many bits should be shifted (right).
99 mask: integer for the mask to filter out bit field.
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800100 """
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800101 def _getter(self):
102 return (getter(self) >> shift) & mask
103 def _setter(self, value):
104 assert value & mask == value, (
105 'Value %s out of range (mask=%s)' % (value, mask))
106 setter(self, getter(self) & ~(mask << shift) | value << shift)
107 return property(_getter, _setter)
108
109
110class GPTBlob(object):
111 """A decorator class to help accessing GPT blobs as named tuple.
112
113 To use this, specify the blob description (struct format and named tuple field
114 names) above the derived class, for example:
115
116 @GPTBlob(description):
117 class Header(GPTObject):
118 pass
119 """
120 def __init__(self, description):
121 spec = description.split()
122 self.struct_format = '<' + ''.join(spec[::2])
123 self.fields = spec[1::2]
124
125 def __call__(self, cls):
126 new_bases = ((
127 collections.namedtuple(cls.__name__, self.fields),) + cls.__bases__)
128 new_cls = type(cls.__name__, new_bases, dict(cls.__dict__))
129 setattr(new_cls, 'FORMAT', self.struct_format)
130 return new_cls
131
132
133class GPTObject(object):
134 """An object in GUID Partition Table.
135
136 This needs to be decorated by @GPTBlob(description) and inherited by a real
137 class. Properties (not member functions) in CamelCase should be reserved for
138 named tuple attributes.
139
140 To create a new object, use class method ReadFrom(), which takes a stream
141 as input or None to create with all elements set to zero. To make changes to
142 named tuple elements, use member function Clone(changes).
143
144 It is also possible to attach some additional properties to the object as meta
145 data (for example path of the underlying image file). To do that, specify the
146 data as keyword arguments when calling ReadFrom(). These properties will be
147 preserved when you call Clone().
148
149 A special case is "reset named tuple elements of an object but keeping all
150 properties", for example changing a partition object to unused (zeroed).
151 ReadFrom() is a class method so properties won't be copied. You need to
152 call as cls.ReadFrom(None, **p.__dict__), or a short cut - p.CloneAndZero().
153 """
154
155 FORMAT = None
156 """The struct.{pack,unpack} format string, and should be set by GPTBlob."""
157
158 CLONE_CONVERTERS = None
159 """A dict (name, cvt) to convert input arguments into named tuple data.
160
161 `name` is a string for the name of argument to convert.
162 `cvt` is a callable to convert value. The return value may be:
163 - a tuple in (new_name, value): save the value as new name.
164 - otherwise, save the value in original name.
165 Note tuple is an invalid input for struct.unpack so it's used for the
166 special value.
167 """
168
169 @classmethod
170 def ReadFrom(cls, f, **kargs):
171 """Reads and decode an object from stream.
172
173 Args:
174 f: a stream to read blob, or None to decode with all zero bytes.
175 kargs: a dict for additional attributes in object.
176 """
177 if f is None:
178 reader = lambda num: '\x00' * num
179 else:
180 reader = f.read
181 data = cls(*struct.unpack(cls.FORMAT, reader(struct.calcsize(cls.FORMAT))))
182 # Named tuples do not accept kargs in constructor.
183 data.__dict__.update(kargs)
184 return data
185
186 def Clone(self, **dargs):
187 """Clones a new instance with modifications.
188
189 GPT objects are usually named tuples that are immutable, so the only way
190 to make changes is to create a new instance with modifications.
191
192 Args:
193 dargs: a dict with all modifications.
194 """
195 for name, convert in (self.CLONE_CONVERTERS or {}).iteritems():
196 if name not in dargs:
197 continue
198 result = convert(dargs.pop(name))
199 if isinstance(result, tuple):
200 assert len(result) == 2, 'Converted tuple must be (name, value).'
201 dargs[result[0]] = result[1]
202 else:
203 dargs[name] = result
204
205 cloned = self._replace(**dargs)
206 cloned.__dict__.update(self.__dict__)
207 return cloned
208
209 def CloneAndZero(self, **dargs):
210 """Short cut to create a zeroed object while keeping all properties.
211
212 This is very similar to Clone except all named tuple elements will be zero.
213 Also different from class method ReadFrom(None) because this keeps all
214 properties from one object.
215 """
216 cloned = self.ReadFrom(None, **self.__dict__)
217 return cloned.Clone(**dargs) if dargs else cloned
218
219 @property
220 def blob(self):
221 """Returns the object in formatted bytes."""
222 return struct.pack(self.FORMAT, *self)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800223
224
Hung-Te Lin4dfd3302018-04-17 14:47:52 +0800225class GPTError(Exception):
226 """All exceptions by GPT."""
227 pass
228
229
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800230class GPT(object):
231 """A GPT helper class.
232
233 To load GPT from an existing disk image file, use `LoadFromFile`.
234 After modifications were made, use `WriteToFile` to commit changes.
235
236 Attributes:
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800237 header: a namedtuple of GPT header.
Hung-Te Linc6e009c2018-04-17 15:06:16 +0800238 pmbr: a namedtuple of Protective MBR.
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800239 partitions: a list of GPT partition entry nametuple.
240 block_size: integer for size of bytes in one block (sector).
Hung-Te Linc34d89c2018-04-17 15:11:34 +0800241 is_secondary: boolean to indicate if the header is from primary or backup.
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800242 """
243
Hung-Te Linf148d322018-04-13 10:24:42 +0800244 DEFAULT_BLOCK_SIZE = 512
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800245 TYPE_GUID_UNUSED = '\x00' * 16
246 TYPE_GUID_MAP = {
247 '00000000-0000-0000-0000-000000000000': 'Unused',
248 'EBD0A0A2-B9E5-4433-87C0-68B6B72699C7': 'Linux data',
Hung-Te Linfe724f82018-04-18 15:03:58 +0800249 'FE3A2A5D-4F32-41A7-B725-ACCC3285A309': 'ChromeOS kernel',
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800250 '3CB8E202-3B7E-47DD-8A3C-7FF2A13CFCEC': 'ChromeOS rootfs',
251 '2E0A753D-9E48-43B0-8337-B15192CB1B5E': 'ChromeOS reserved',
252 'CAB6E88E-ABF3-4102-A07A-D4BB9BE3C1D3': 'ChromeOS firmware',
253 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B': 'EFI System Partition',
254 }
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +0800255 TYPE_GUID_REVERSE_MAP = dict(
256 ('efi' if v.startswith('EFI') else v.lower().split()[-1], k)
257 for k, v in TYPE_GUID_MAP.iteritems())
Hung-Te Linfe724f82018-04-18 15:03:58 +0800258 STR_TYPE_GUID_LIST_BOOTABLE = [
259 TYPE_GUID_REVERSE_MAP['kernel'],
260 TYPE_GUID_REVERSE_MAP['efi'],
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800261 ]
262
Hung-Te Linc6e009c2018-04-17 15:06:16 +0800263 @GPTBlob(PMBR_DESCRIPTION)
264 class ProtectiveMBR(GPTObject):
265 """Protective MBR (PMBR) in GPT."""
266 SIGNATURE = '\x55\xAA'
267 MAGIC = '\x1d\x9a'
268
269 CLONE_CONVERTERS = {
270 'BootGUID': lambda v: v.bytes_le if isinstance(v, uuid.UUID) else v
271 }
272
273 @property
274 def boot_guid(self):
275 """Returns the BootGUID in decoded (uuid.UUID) format."""
276 return uuid.UUID(bytes_le=self.BootGUID)
277
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800278 @GPTBlob(HEADER_DESCRIPTION)
279 class Header(GPTObject):
280 """Wrapper to Header in GPT."""
281 SIGNATURES = ['EFI PART', 'CHROMEOS']
282 SIGNATURE_IGNORE = 'IGNOREME'
283 DEFAULT_REVISION = '\x00\x00\x01\x00'
284
285 DEFAULT_PARTITION_ENTRIES = 128
286 DEFAULT_PARTITIONS_LBA = 2 # LBA 0 = MBR, LBA 1 = GPT Header.
287
288 def Clone(self, **dargs):
289 """Creates a new instance with modifications.
290
291 GPT objects are usually named tuples that are immutable, so the only way
292 to make changes is to create a new instance with modifications.
293
294 CRC32 is always updated but PartitionArrayCRC32 must be updated explicitly
295 since we can't track changes in GPT.partitions automatically.
296
297 Note since GPTHeader.Clone will always update CRC, we can only check and
298 compute CRC by super(GPT.Header, header).Clone, or header._replace.
299 """
300 dargs['CRC32'] = 0
301 header = super(GPT.Header, self).Clone(**dargs)
302 return super(GPT.Header, header).Clone(CRC32=binascii.crc32(header.blob))
303
Hung-Te Lin6c3575a2018-04-17 15:00:49 +0800304 @classmethod
305 def Create(cls, size, block_size, pad_blocks=0,
306 part_entries=DEFAULT_PARTITION_ENTRIES):
307 """Creates a header with default values.
308
309 Args:
310 size: integer of expected image size.
311 block_size: integer for size of each block (sector).
312 pad_blocks: number of preserved sectors between header and partitions.
313 part_entries: number of partitions to include in header.
314 """
315 part_entry_size = struct.calcsize(GPT.Partition.FORMAT)
316 parts_lba = cls.DEFAULT_PARTITIONS_LBA + pad_blocks
317 parts_bytes = part_entries * part_entry_size
318 parts_blocks = parts_bytes / block_size
319 if parts_bytes % block_size:
320 parts_blocks += 1
321 # PartitionsCRC32 must be updated later explicitly.
322 return cls.ReadFrom(None).Clone(
323 Signature=cls.SIGNATURES[0],
324 Revision=cls.DEFAULT_REVISION,
325 HeaderSize=struct.calcsize(cls.FORMAT),
326 CurrentLBA=1,
327 BackupLBA=size / block_size - 1,
328 FirstUsableLBA=parts_lba + parts_blocks,
329 LastUsableLBA=size / block_size - parts_blocks - parts_lba,
330 DiskGUID=uuid.uuid4().get_bytes(),
331 PartitionEntriesStartingLBA=parts_lba,
332 PartitionEntriesNumber=part_entries,
333 PartitionEntrySize=part_entry_size,
334 )
335
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800336 class PartitionAttributes(object):
337 """Wrapper for Partition.Attributes.
338
339 This can be created using Partition.attrs, but the changed properties won't
340 apply to underlying Partition until an explicit call with
341 Partition.Clone(Attributes=new_attrs).
342 """
343
344 def __init__(self, attrs):
345 self._attrs = attrs
346
347 @property
348 def raw(self):
349 """Returns the raw integer type attributes."""
350 return self._Get()
351
352 def _Get(self):
353 return self._attrs
354
355 def _Set(self, value):
356 self._attrs = value
357
358 successful = BitProperty(_Get, _Set, 56, 1)
359 tries = BitProperty(_Get, _Set, 52, 0xf)
360 priority = BitProperty(_Get, _Set, 48, 0xf)
361 legacy_boot = BitProperty(_Get, _Set, 2, 1)
362 required = BitProperty(_Get, _Set, 0, 1)
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +0800363 raw_16 = BitProperty(_Get, _Set, 48, 0xffff)
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800364
365 @GPTBlob(PARTITION_DESCRIPTION)
366 class Partition(GPTObject):
367 """The partition entry in GPT.
368
369 Please include following properties when creating a Partition object:
370 - image: a string for path to the image file the partition maps to.
371 - number: the 1-based partition number.
372 - block_size: an integer for size of each block (LBA, or sector).
373 """
374 NAMES_ENCODING = 'utf-16-le'
375 NAMES_LENGTH = 72
376
377 CLONE_CONVERTERS = {
378 # TODO(hungte) check if encoded name is too long.
379 'label': lambda l: (None if l is None else
380 ('Names', l.encode(GPT.Partition.NAMES_ENCODING))),
381 'TypeGUID': lambda v: v.bytes_le if isinstance(v, uuid.UUID) else v,
382 'UniqueGUID': lambda v: v.bytes_le if isinstance(v, uuid.UUID) else v,
383 'Attributes': (
384 lambda v: v.raw if isinstance(v, GPT.PartitionAttributes) else v),
385 }
386
387 def __str__(self):
388 return '%s#%s' % (self.image, self.number)
389
Hung-Te Lin6c3575a2018-04-17 15:00:49 +0800390 @classmethod
391 def Create(cls, block_size, image, number):
392 """Creates a new partition entry with given meta data."""
393 part = cls.ReadFrom(
394 None, image=image, number=number, block_size=block_size)
395 return part
396
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800397 def IsUnused(self):
398 """Returns if the partition is unused and can be allocated."""
399 return self.TypeGUID == GPT.TYPE_GUID_UNUSED
400
Hung-Te Linfe724f82018-04-18 15:03:58 +0800401 def IsChromeOSKernel(self):
402 """Returns if the partition is a Chrome OS kernel partition."""
403 return self.TypeGUID == uuid.UUID(
404 GPT.TYPE_GUID_REVERSE_MAP['kernel']).bytes_le
405
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800406 @property
407 def blocks(self):
408 """Return size of partition in blocks (see block_size)."""
409 return self.LastLBA - self.FirstLBA + 1
410
411 @property
412 def offset(self):
413 """Returns offset to partition in bytes."""
414 return self.FirstLBA * self.block_size
415
416 @property
417 def size(self):
418 """Returns size of partition in bytes."""
419 return self.blocks * self.block_size
420
421 @property
422 def type_guid(self):
423 return uuid.UUID(bytes_le=self.TypeGUID)
424
425 @property
426 def unique_guid(self):
427 return uuid.UUID(bytes_le=self.UniqueGUID)
428
429 @property
430 def label(self):
431 """Returns the Names in decoded string type."""
432 return self.Names.decode(self.NAMES_ENCODING).strip('\0')
433
434 @property
435 def attrs(self):
436 return GPT.PartitionAttributes(self.Attributes)
437
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800438 def __init__(self):
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800439 """GPT constructor.
440
441 See LoadFromFile for how it's usually used.
442 """
Hung-Te Linc6e009c2018-04-17 15:06:16 +0800443 self.pmbr = None
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800444 self.header = None
445 self.partitions = None
Hung-Te Linf148d322018-04-13 10:24:42 +0800446 self.block_size = self.DEFAULT_BLOCK_SIZE
Hung-Te Linc34d89c2018-04-17 15:11:34 +0800447 self.is_secondary = False
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800448
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800449 @classmethod
Hung-Te Linf641d302018-04-18 15:09:35 +0800450 def GetTypeGUID(cls, input_uuid):
451 if input_uuid.lower() in cls.TYPE_GUID_REVERSE_MAP:
452 input_uuid = cls.TYPE_GUID_REVERSE_MAP[input_uuid.lower()]
453 return uuid.UUID(input_uuid)
454
455 @classmethod
Hung-Te Lin6c3575a2018-04-17 15:00:49 +0800456 def Create(cls, image_name, size, block_size, pad_blocks=0):
457 """Creates a new GPT instance from given size and block_size.
458
459 Args:
460 image_name: a string of underlying disk image file name.
461 size: expected size of disk image.
462 block_size: size of each block (sector) in bytes.
463 pad_blocks: number of blocks between header and partitions array.
464 """
465 gpt = cls()
466 gpt.block_size = block_size
467 gpt.header = cls.Header.Create(size, block_size, pad_blocks)
468 gpt.partitions = [
469 cls.Partition.Create(block_size, image_name, i + 1)
470 for i in xrange(gpt.header.PartitionEntriesNumber)]
471 return gpt
472
473 @classmethod
Hung-Te Lin6977ae12018-04-17 12:20:32 +0800474 def LoadFromFile(cls, image):
475 """Loads a GPT table from give disk image file object.
476
477 Args:
478 image: a string as file path or a file-like object to read from.
479 """
480 if isinstance(image, basestring):
481 with open(image, 'rb') as f:
482 return cls.LoadFromFile(f)
483
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800484 gpt = cls()
Hung-Te Linc6e009c2018-04-17 15:06:16 +0800485 image.seek(0)
486 pmbr = gpt.ProtectiveMBR.ReadFrom(image)
487 if pmbr.Signature == cls.ProtectiveMBR.SIGNATURE:
488 logging.debug('Found MBR signature in %s', image.name)
489 if pmbr.Magic == cls.ProtectiveMBR.MAGIC:
490 logging.debug('Found PMBR in %s', image.name)
491 gpt.pmbr = pmbr
492
Hung-Te Linf148d322018-04-13 10:24:42 +0800493 # Try DEFAULT_BLOCK_SIZE, then 4K.
494 for block_size in [cls.DEFAULT_BLOCK_SIZE, 4096]:
Hung-Te Linc34d89c2018-04-17 15:11:34 +0800495 # Note because there are devices setting Primary as ignored and the
496 # partition table signature accepts 'CHROMEOS' which is also used by
497 # Chrome OS kernel partition, we have to look for Secondary (backup) GPT
498 # first before trying other block sizes, otherwise we may incorrectly
499 # identify a kernel partition as LBA 1 of larger block size system.
500 for i, seek in enumerate([(block_size * 1, os.SEEK_SET),
501 (-block_size, os.SEEK_END)]):
502 image.seek(*seek)
503 header = gpt.Header.ReadFrom(image)
504 if header.Signature in cls.Header.SIGNATURES:
505 gpt.block_size = block_size
506 if i != 0:
507 gpt.is_secondary = True
508 break
509 else:
510 # Nothing found, try next block size.
511 continue
512 # Found a valid signature.
513 break
Hung-Te Linf148d322018-04-13 10:24:42 +0800514 else:
Hung-Te Lin4dfd3302018-04-17 14:47:52 +0800515 raise GPTError('Invalid signature in GPT header.')
Hung-Te Linf148d322018-04-13 10:24:42 +0800516
Hung-Te Lin6977ae12018-04-17 12:20:32 +0800517 image.seek(gpt.block_size * header.PartitionEntriesStartingLBA)
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800518 def ReadPartition(image, i):
519 p = gpt.Partition.ReadFrom(
520 image, image=image.name, number=i + 1, block_size=gpt.block_size)
521 return p
522
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800523 gpt.header = header
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800524 gpt.partitions = [
525 ReadPartition(image, i) for i in range(header.PartitionEntriesNumber)]
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800526 return gpt
527
Hung-Te Linc5196682018-04-18 22:59:59 +0800528 def GetUsedPartitions(self):
529 """Returns a list of partitions with type GUID not set to unused.
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800530
Hung-Te Linc5196682018-04-18 22:59:59 +0800531 Use 'number' property to find the real location of partition in
532 self.partitions.
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800533 """
Hung-Te Linc5196682018-04-18 22:59:59 +0800534 return [p for p in self.partitions if not p.IsUnused()]
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800535
536 def GetMaxUsedLBA(self):
Hung-Te Lind3a2e9a2018-04-19 13:07:26 +0800537 """Returns the max LastLBA from all used partitions."""
Hung-Te Linc5196682018-04-18 22:59:59 +0800538 parts = self.GetUsedPartitions()
Hung-Te Lind3a2e9a2018-04-19 13:07:26 +0800539 return (max(p.LastLBA for p in parts)
540 if parts else self.header.FirstUsableLBA - 1)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800541
542 def GetPartitionTableBlocks(self, header=None):
543 """Returns the blocks (or LBA) of partition table from given header."""
544 if header is None:
545 header = self.header
546 size = header.PartitionEntrySize * header.PartitionEntriesNumber
Hung-Te Linf148d322018-04-13 10:24:42 +0800547 blocks = size / self.block_size
548 if size % self.block_size:
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800549 blocks += 1
550 return blocks
551
Hung-Te Lin5f0dea42018-04-18 23:20:11 +0800552 def GetPartition(self, number):
553 """Gets the Partition by given (1-based) partition number.
554
555 Args:
556 number: an integer as 1-based partition number.
557 """
558 if not 0 < number <= len(self.partitions):
559 raise GPTError('Invalid partition number %s.' % number)
560 return self.partitions[number - 1]
561
562 def UpdatePartition(self, part, number=None):
563 """Updates the entry in partition table by given Partition object.
564
565 Args:
566 part: a Partition GPT object.
567 number: an integer as 1-based partition number. None to use part.number.
568 """
569 number = part.number if number is None else number
570 self.partitions[number - 1] = part
571
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800572 def Resize(self, new_size):
573 """Adjust GPT for a disk image in given size.
574
575 Args:
576 new_size: Integer for new size of disk image file.
577 """
Hung-Te Linf148d322018-04-13 10:24:42 +0800578 old_size = self.block_size * (self.header.BackupLBA + 1)
579 if new_size % self.block_size:
Hung-Te Lin4dfd3302018-04-17 14:47:52 +0800580 raise GPTError(
581 'New file size %d is not valid for image files.' % new_size)
Hung-Te Linf148d322018-04-13 10:24:42 +0800582 new_blocks = new_size / self.block_size
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800583 if old_size != new_size:
584 logging.warn('Image size (%d, LBA=%d) changed from %d (LBA=%d).',
Hung-Te Linf148d322018-04-13 10:24:42 +0800585 new_size, new_blocks, old_size, old_size / self.block_size)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800586 else:
587 logging.info('Image size (%d, LBA=%d) not changed.',
588 new_size, new_blocks)
Hung-Te Lind3a2e9a2018-04-19 13:07:26 +0800589 return
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800590
Hung-Te Lind3a2e9a2018-04-19 13:07:26 +0800591 # Expected location
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800592 backup_lba = new_blocks - 1
Hung-Te Lind3a2e9a2018-04-19 13:07:26 +0800593 last_usable_lba = backup_lba - self.header.FirstUsableLBA
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800594
Hung-Te Lind3a2e9a2018-04-19 13:07:26 +0800595 if last_usable_lba < self.header.LastUsableLBA:
596 max_used_lba = self.GetMaxUsedLBA()
597 if last_usable_lba < max_used_lba:
Hung-Te Lin4dfd3302018-04-17 14:47:52 +0800598 raise GPTError('Backup partition tables will overlap used partitions')
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800599
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800600 self.header = self.header.Clone(
601 BackupLBA=backup_lba, LastUsableLBA=last_usable_lba)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800602
603 def GetFreeSpace(self):
604 """Returns the free (available) space left according to LastUsableLBA."""
605 max_lba = self.GetMaxUsedLBA()
606 assert max_lba <= self.header.LastUsableLBA, "Partitions too large."
Hung-Te Linf148d322018-04-13 10:24:42 +0800607 return self.block_size * (self.header.LastUsableLBA - max_lba)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800608
Hung-Te Lin5f0dea42018-04-18 23:20:11 +0800609 def ExpandPartition(self, number):
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800610 """Expands a given partition to last usable LBA.
611
612 Args:
Hung-Te Lin5f0dea42018-04-18 23:20:11 +0800613 number: an integer to specify partition in 1-based number.
Hung-Te Lin5cb0c312018-04-17 14:56:43 +0800614
615 Returns:
616 (old_blocks, new_blocks) for size in blocks.
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800617 """
618 # Assume no partitions overlap, we need to make sure partition[i] has
619 # largest LBA.
Hung-Te Lin5f0dea42018-04-18 23:20:11 +0800620 p = self.GetPartition(number)
621 if p.IsUnused():
622 raise GPTError('Partition %s is unused.' % p)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800623 max_used_lba = self.GetMaxUsedLBA()
Hung-Te Linc5196682018-04-18 22:59:59 +0800624 # TODO(hungte) We can do more by finding free space after i.
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800625 if max_used_lba > p.LastLBA:
Hung-Te Lin4dfd3302018-04-17 14:47:52 +0800626 raise GPTError(
Hung-Te Linc5196682018-04-18 22:59:59 +0800627 'Cannot expand %s because it is not allocated at last.' % p)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800628
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800629 old_blocks = p.blocks
630 p = p.Clone(LastLBA=self.header.LastUsableLBA)
631 new_blocks = p.blocks
Hung-Te Lin5f0dea42018-04-18 23:20:11 +0800632 self.UpdatePartition(p)
Hung-Te Lin5cb0c312018-04-17 14:56:43 +0800633 logging.warn(
634 '%s expanded, size in LBA: %d -> %d.', p, old_blocks, new_blocks)
635 return (old_blocks, new_blocks)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800636
Hung-Te Linc34d89c2018-04-17 15:11:34 +0800637 def GetIgnoredHeader(self):
638 """Returns a primary header with signature set to 'IGNOREME'.
639
640 This is a special trick to enforce using backup header, when there is
641 some security exploit in LBA1.
642 """
643 return self.header.Clone(Signature=self.header.SIGNATURE_IGNORE)
644
Hung-Te Lin3b491672018-04-19 01:41:20 +0800645 def CheckIntegrity(self):
646 """Checks if the GPT objects all look good."""
647 # Check if the header allocation looks good. CurrentLBA and
648 # PartitionEntriesStartingLBA should be all outside [FirstUsableLBA,
649 # LastUsableLBA].
650 header = self.header
651 entries_first_lba = header.PartitionEntriesStartingLBA
652 entries_last_lba = entries_first_lba + self.GetPartitionTableBlocks() - 1
653
654 def CheckOutsideUsable(name, lba, outside_entries=False):
655 if lba < 1:
656 raise GPTError('%s should not live in LBA %s.' % (name, lba))
657 if lba > max(header.BackupLBA, header.CurrentLBA):
658 # Note this is "in theory" possible, but we want to report this as
659 # error as well, since it usually leads to error.
660 raise GPTError('%s (%s) should not be larger than BackupLBA (%s).' %
661 (name, lba, header.BackupLBA))
662 if header.FirstUsableLBA <= lba <= header.LastUsableLBA:
663 raise GPTError('%s (%s) should not be included in usable LBAs [%s,%s]' %
664 (name, lba, header.FirstUsableLBA, header.LastUsableLBA))
665 if outside_entries and entries_first_lba <= lba <= entries_last_lba:
666 raise GPTError('%s (%s) should be outside partition entries [%s,%s]' %
667 (name, lba, entries_first_lba, entries_last_lba))
668 CheckOutsideUsable('Header', header.CurrentLBA, True)
669 CheckOutsideUsable('Backup header', header.BackupLBA, True)
670 CheckOutsideUsable('Partition entries', entries_first_lba)
671 CheckOutsideUsable('Partition entries end', entries_last_lba)
672
673 parts = self.GetUsedPartitions()
674 # Check if partition entries overlap with each other.
675 lba_list = [(p.FirstLBA, p.LastLBA, p) for p in parts]
676 lba_list.sort(key=lambda t: t[0])
677 for i in xrange(len(lba_list) - 1):
678 if lba_list[i][1] >= lba_list[i + 1][0]:
679 raise GPTError('Overlap in partition entries: [%s,%s]%s, [%s,%s]%s.' %
680 (lba_list[i] + lba_list[i + 1]))
681 # Now, check the first and last partition.
682 if lba_list:
683 p = lba_list[0][2]
684 if p.FirstLBA < header.FirstUsableLBA:
685 raise GPTError(
686 'Partition %s must not go earlier (%s) than FirstUsableLBA=%s' %
687 (p, p.FirstLBA, header.FirstLBA))
688 p = lba_list[-1][2]
689 if p.LastLBA > header.LastUsableLBA:
690 raise GPTError(
691 'Partition %s must not go further (%s) than LastUsableLBA=%s' %
692 (p, p.LastLBA, header.LastLBA))
693 # Check if UniqueGUIDs are not unique.
694 if len(set(p.UniqueGUID for p in parts)) != len(parts):
695 raise GPTError('Partition UniqueGUIDs are duplicated.')
696 # Check if CRCs match.
697 if (binascii.crc32(''.join(p.blob for p in self.partitions)) !=
698 header.PartitionArrayCRC32):
699 raise GPTError('GPT Header PartitionArrayCRC32 does not match.')
700
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800701 def UpdateChecksum(self):
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800702 """Updates all checksum fields in GPT objects.
703
704 The Header.CRC32 is automatically updated in Header.Clone().
705 """
706 parts = ''.join(p.blob for p in self.partitions)
707 self.header = self.header.Clone(
708 PartitionArrayCRC32=binascii.crc32(parts))
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800709
Hung-Te Linc34d89c2018-04-17 15:11:34 +0800710 def GetBackupHeader(self, header):
711 """Returns the backup header according to given header."""
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800712 partitions_starting_lba = (
Hung-Te Linc34d89c2018-04-17 15:11:34 +0800713 header.BackupLBA - self.GetPartitionTableBlocks())
714 return header.Clone(
715 BackupLBA=header.CurrentLBA,
716 CurrentLBA=header.BackupLBA,
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800717 PartitionEntriesStartingLBA=partitions_starting_lba)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800718
Hung-Te Linc6e009c2018-04-17 15:06:16 +0800719 @classmethod
720 def WriteProtectiveMBR(cls, image, create, bootcode=None, boot_guid=None):
721 """Writes a protective MBR to given file.
722
723 Each MBR is 512 bytes: 424 bytes for bootstrap code, 16 bytes of boot GUID,
724 4 bytes of disk id, 2 bytes of bootcode magic, 4*16 for 4 partitions, and 2
725 byte as signature. cgpt has hard-coded the CHS and bootstrap magic values so
726 we can follow that.
727
728 Args:
729 create: True to re-create PMBR structure.
730 bootcode: a blob of new boot code.
731 boot_guid a blob for new boot GUID.
732
733 Returns:
734 The written PMBR structure.
735 """
736 if isinstance(image, basestring):
737 with open(image, 'rb+') as f:
738 return cls.WriteProtectiveMBR(f, create, bootcode, boot_guid)
739
740 image.seek(0)
741 assert struct.calcsize(cls.ProtectiveMBR.FORMAT) == cls.DEFAULT_BLOCK_SIZE
742 pmbr = cls.ProtectiveMBR.ReadFrom(image)
743
744 if create:
745 legacy_sectors = min(
746 0x100000000,
747 os.path.getsize(image.name) / cls.DEFAULT_BLOCK_SIZE) - 1
748 # Partition 0 must have have the fixed CHS with number of sectors
749 # (calculated as legacy_sectors later).
750 part0 = ('00000200eeffffff01000000'.decode('hex') +
751 struct.pack('<I', legacy_sectors))
752 # Partition 1~3 should be all zero.
753 part1 = '\x00' * 16
754 assert len(part0) == len(part1) == 16, 'MBR entry is wrong.'
755 pmbr = pmbr.Clone(
756 BootGUID=cls.TYPE_GUID_UNUSED,
757 DiskID=0,
758 Magic=cls.ProtectiveMBR.MAGIC,
759 LegacyPart0=part0,
760 LegacyPart1=part1,
761 LegacyPart2=part1,
762 LegacyPart3=part1,
763 Signature=cls.ProtectiveMBR.SIGNATURE)
764
765 if bootcode:
766 if len(bootcode) > len(pmbr.BootCode):
767 logging.info(
768 'Bootcode is larger (%d > %d)!', len(bootcode), len(pmbr.BootCode))
769 bootcode = bootcode[:len(pmbr.BootCode)]
770 pmbr = pmbr.Clone(BootCode=bootcode)
771 if boot_guid:
772 pmbr = pmbr.Clone(BootGUID=boot_guid)
773
774 blob = pmbr.blob
775 assert len(blob) == cls.DEFAULT_BLOCK_SIZE
776 image.seek(0)
777 image.write(blob)
778 return pmbr
779
Hung-Te Lin6977ae12018-04-17 12:20:32 +0800780 def WriteToFile(self, image):
781 """Updates partition table in a disk image file.
782
783 Args:
784 image: a string as file path or a file-like object to write into.
785 """
786 if isinstance(image, basestring):
787 with open(image, 'rb+') as f:
788 return self.WriteToFile(f)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800789
790 def WriteData(name, blob, lba):
791 """Writes a blob into given location."""
792 logging.info('Writing %s in LBA %d (offset %d)',
Hung-Te Linf148d322018-04-13 10:24:42 +0800793 name, lba, lba * self.block_size)
Hung-Te Lin6977ae12018-04-17 12:20:32 +0800794 image.seek(lba * self.block_size)
795 image.write(blob)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800796
797 self.UpdateChecksum()
Hung-Te Lin3b491672018-04-19 01:41:20 +0800798 self.CheckIntegrity()
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800799 parts_blob = ''.join(p.blob for p in self.partitions)
Hung-Te Linc34d89c2018-04-17 15:11:34 +0800800
801 header = self.header
802 WriteData('GPT Header', header.blob, header.CurrentLBA)
803 WriteData('GPT Partitions', parts_blob, header.PartitionEntriesStartingLBA)
804 logging.info(
805 'Usable LBA: First=%d, Last=%d', header.FirstUsableLBA,
806 header.LastUsableLBA)
807
808 if not self.is_secondary:
809 # When is_secondary is True, the header we have is actually backup header.
810 backup_header = self.GetBackupHeader(self.header)
811 WriteData(
812 'Backup Partitions', parts_blob,
813 backup_header.PartitionEntriesStartingLBA)
814 WriteData(
815 'Backup Header', backup_header.blob, backup_header.CurrentLBA)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800816
817
818class GPTCommands(object):
819 """Collection of GPT sub commands for command line to use.
820
821 The commands are derived from `cgpt`, but not necessary to be 100% compatible
822 with cgpt.
823 """
824
825 FORMAT_ARGS = [
Peter Shihc7156ca2018-02-26 14:46:24 +0800826 ('begin', 'beginning sector'),
Hung-Te Lin49ac3c22018-04-17 14:37:54 +0800827 ('size', 'partition size (in sectors)'),
Peter Shihc7156ca2018-02-26 14:46:24 +0800828 ('type', 'type guid'),
829 ('unique', 'unique guid'),
830 ('label', 'label'),
831 ('Successful', 'Successful flag'),
832 ('Tries', 'Tries flag'),
833 ('Priority', 'Priority flag'),
834 ('Legacy', 'Legacy Boot flag'),
835 ('Attribute', 'raw 16-bit attribute value (bits 48-63)')]
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800836
837 def __init__(self):
Hung-Te Lin5cb0c312018-04-17 14:56:43 +0800838 commands = dict(
839 (command.lower(), getattr(self, command)())
840 for command in dir(self)
841 if (isinstance(getattr(self, command), type) and
842 issubclass(getattr(self, command), self.SubCommand) and
843 getattr(self, command) is not self.SubCommand)
844 )
845 self.commands = commands
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800846
Hung-Te Lin5cb0c312018-04-17 14:56:43 +0800847 def DefineArgs(self, parser):
848 """Defines all available commands to an argparser subparsers instance."""
849 subparsers = parser.add_subparsers(help='Sub-command help.', dest='command')
850 for name, instance in sorted(self.commands.iteritems()):
851 parser = subparsers.add_parser(
852 name, description=instance.__doc__,
853 formatter_class=argparse.RawDescriptionHelpFormatter,
854 help=instance.__doc__.splitlines()[0])
855 instance.DefineArgs(parser)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800856
Hung-Te Lin5cb0c312018-04-17 14:56:43 +0800857 def Execute(self, args):
858 """Execute the sub commands by given parsed arguments."""
Hung-Te Linf641d302018-04-18 15:09:35 +0800859 return self.commands[args.command].Execute(args)
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800860
Hung-Te Lin5cb0c312018-04-17 14:56:43 +0800861 class SubCommand(object):
862 """A base class for sub commands to derive from."""
863
864 def DefineArgs(self, parser):
865 """Defines command line arguments to argparse parser.
866
867 Args:
868 parser: An argparse parser instance.
869 """
870 del parser # Unused.
871 raise NotImplementedError
872
873 def Execute(self, args):
874 """Execute the command.
875
876 Args:
877 args: An argparse parsed namespace.
878 """
879 del args # Unused.
880 raise NotImplementedError
881
Hung-Te Lin6c3575a2018-04-17 15:00:49 +0800882 class Create(SubCommand):
883 """Create or reset GPT headers and tables.
884
885 Create or reset an empty GPT.
886 """
887
888 def DefineArgs(self, parser):
889 parser.add_argument(
890 '-z', '--zero', action='store_true',
891 help='Zero the sectors of the GPT table and entries')
892 parser.add_argument(
893 '-p', '--pad_blocks', type=int, default=0,
894 help=('Size (in blocks) of the disk to pad between the '
895 'primary GPT header and its entries, default %(default)s'))
896 parser.add_argument(
897 '--block_size', type=int, default=GPT.DEFAULT_BLOCK_SIZE,
898 help='Size of each block (sector) in bytes.')
899 parser.add_argument(
900 'image_file', type=argparse.FileType('rb+'),
901 help='Disk image file to create.')
902
903 def Execute(self, args):
904 block_size = args.block_size
905 gpt = GPT.Create(
906 args.image_file.name, os.path.getsize(args.image_file.name),
907 block_size, args.pad_blocks)
908 if args.zero:
909 # In theory we only need to clear LBA 1, but to make sure images already
910 # initialized with different block size won't have GPT signature in
911 # different locations, we should zero until first usable LBA.
912 args.image_file.seek(0)
913 args.image_file.write('\0' * block_size * gpt.header.FirstUsableLBA)
914 gpt.WriteToFile(args.image_file)
915 print('OK: Created GPT for %s' % args.image_file.name)
916
Hung-Te Linc6e009c2018-04-17 15:06:16 +0800917 class Boot(SubCommand):
918 """Edit the PMBR sector for legacy BIOSes.
919
920 With no options, it will just print the PMBR boot guid.
921 """
922
923 def DefineArgs(self, parser):
924 parser.add_argument(
925 '-i', '--number', type=int,
926 help='Set bootable partition')
927 parser.add_argument(
928 '-b', '--bootloader', type=argparse.FileType('r'),
929 help='Install bootloader code in the PMBR')
930 parser.add_argument(
931 '-p', '--pmbr', action='store_true',
932 help='Create legacy PMBR partition table')
933 parser.add_argument(
934 'image_file', type=argparse.FileType('rb+'),
935 help='Disk image file to change PMBR.')
936
937 def Execute(self, args):
938 """Rebuilds the protective MBR."""
939 bootcode = args.bootloader.read() if args.bootloader else None
940 boot_guid = None
941 if args.number is not None:
942 gpt = GPT.LoadFromFile(args.image_file)
Hung-Te Lin5f0dea42018-04-18 23:20:11 +0800943 boot_guid = gpt.GetPartition(args.number).UniqueGUID
Hung-Te Linc6e009c2018-04-17 15:06:16 +0800944 pmbr = GPT.WriteProtectiveMBR(
945 args.image_file, args.pmbr, bootcode=bootcode, boot_guid=boot_guid)
946
947 print(str(pmbr.boot_guid).upper())
948
Hung-Te Linc34d89c2018-04-17 15:11:34 +0800949 class Legacy(SubCommand):
950 """Switch between GPT and Legacy GPT.
951
952 Switch GPT header signature to "CHROMEOS".
953 """
954
955 def DefineArgs(self, parser):
956 parser.add_argument(
957 '-e', '--efi', action='store_true',
958 help='Switch GPT header signature back to "EFI PART"')
959 parser.add_argument(
960 '-p', '--primary-ignore', action='store_true',
961 help='Switch primary GPT header signature to "IGNOREME"')
962 parser.add_argument(
963 'image_file', type=argparse.FileType('rb+'),
964 help='Disk image file to change.')
965
966 def Execute(self, args):
967 gpt = GPT.LoadFromFile(args.image_file)
968 # cgpt behavior: if -p is specified, -e is ignored.
969 if args.primary_ignore:
970 if gpt.is_secondary:
971 raise GPTError('Sorry, the disk already has primary GPT ignored.')
972 args.image_file.seek(gpt.header.CurrentLBA * gpt.block_size)
973 args.image_file.write(gpt.header.SIGNATURE_IGNORE)
974 gpt.header = gpt.GetBackupHeader(self.header)
975 gpt.is_secondary = True
976 else:
977 new_signature = gpt.Header.SIGNATURES[0 if args.efi else 1]
978 gpt.header = gpt.header.Clone(Signature=new_signature)
979 gpt.WriteToFile(args.image_file)
980 if args.primary_ignore:
981 print('OK: Set %s primary GPT header to %s.' %
982 (args.image_file.name, gpt.header.SIGNATURE_IGNORE))
983 else:
984 print('OK: Changed GPT signature for %s to %s.' %
985 (args.image_file.name, new_signature))
986
Hung-Te Lin5cb0c312018-04-17 14:56:43 +0800987 class Repair(SubCommand):
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800988 """Repair damaged GPT headers and tables."""
Hung-Te Linc772e1a2017-04-14 16:50:50 +0800989
Hung-Te Lin5cb0c312018-04-17 14:56:43 +0800990 def DefineArgs(self, parser):
991 parser.add_argument(
992 'image_file', type=argparse.FileType('rb+'),
993 help='Disk image file to repair.')
994
995 def Execute(self, args):
996 gpt = GPT.LoadFromFile(args.image_file)
997 gpt.Resize(os.path.getsize(args.image_file.name))
998 gpt.WriteToFile(args.image_file)
999 print('Disk image file %s repaired.' % args.image_file.name)
1000
1001 class Expand(SubCommand):
1002 """Expands a GPT partition to all available free space."""
1003
1004 def DefineArgs(self, parser):
1005 parser.add_argument(
1006 '-i', '--number', type=int, required=True,
1007 help='The partition to expand.')
1008 parser.add_argument(
1009 'image_file', type=argparse.FileType('rb+'),
1010 help='Disk image file to modify.')
1011
1012 def Execute(self, args):
1013 gpt = GPT.LoadFromFile(args.image_file)
Hung-Te Lin5f0dea42018-04-18 23:20:11 +08001014 old_blocks, new_blocks = gpt.ExpandPartition(args.number)
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001015 gpt.WriteToFile(args.image_file)
1016 if old_blocks < new_blocks:
1017 print(
1018 'Partition %s on disk image file %s has been extended '
1019 'from %s to %s .' %
1020 (args.number, args.image_file.name, old_blocks * gpt.block_size,
1021 new_blocks * gpt.block_size))
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001022 else:
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001023 print('Nothing to expand for disk image %s partition %s.' %
1024 (args.image_file.name, args.number))
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001025
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +08001026 class Add(SubCommand):
1027 """Add, edit, or remove a partition entry.
1028
1029 Use the -i option to modify an existing partition.
1030 The -b, -s, and -t options must be given for new partitions.
1031
1032 The partition type may also be given as one of these aliases:
1033
1034 firmware ChromeOS firmware
1035 kernel ChromeOS kernel
1036 rootfs ChromeOS rootfs
1037 data Linux data
1038 reserved ChromeOS reserved
1039 efi EFI System Partition
1040 unused Unused (nonexistent) partition
1041 """
1042 def DefineArgs(self, parser):
1043 parser.add_argument(
1044 '-i', '--number', type=int,
1045 help='Specify partition (default is next available)')
1046 parser.add_argument(
1047 '-b', '--begin', type=int,
1048 help='Beginning sector')
1049 parser.add_argument(
1050 '-s', '--sectors', type=int,
1051 help='Size in sectors (logical blocks).')
1052 parser.add_argument(
1053 '-t', '--type_guid',
1054 help='Partition Type GUID')
1055 parser.add_argument(
1056 '-u', '--unique_guid',
1057 help='Partition Unique ID')
1058 parser.add_argument(
1059 '-l', '--label',
1060 help='Label')
1061 parser.add_argument(
1062 '-S', '--successful', type=int, choices=xrange(2),
1063 help='set Successful flag')
1064 parser.add_argument(
1065 '-T', '--tries', type=int,
1066 help='set Tries flag (0-15)')
1067 parser.add_argument(
1068 '-P', '--priority', type=int,
1069 help='set Priority flag (0-15)')
1070 parser.add_argument(
1071 '-R', '--required', type=int, choices=xrange(2),
1072 help='set Required flag')
1073 parser.add_argument(
1074 '-B', '--boot_legacy', dest='legacy_boot', type=int,
1075 choices=xrange(2),
1076 help='set Legacy Boot flag')
1077 parser.add_argument(
1078 '-A', '--attribute', dest='raw_16', type=int,
1079 help='set raw 16-bit attribute value (bits 48-63)')
1080 parser.add_argument(
1081 'image_file', type=argparse.FileType('rb+'),
1082 help='Disk image file to modify.')
1083
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +08001084 def Execute(self, args):
1085 gpt = GPT.LoadFromFile(args.image_file)
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +08001086 number = args.number
1087 if number is None:
Hung-Te Linc5196682018-04-18 22:59:59 +08001088 number = next(p for p in gpt.partitions if p.IsUnused()).number
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +08001089
1090 # First and last LBA must be calculated explicitly because the given
1091 # argument is size.
Hung-Te Lin5f0dea42018-04-18 23:20:11 +08001092 part = gpt.GetPartition(number)
Hung-Te Linc5196682018-04-18 22:59:59 +08001093 is_new_part = part.IsUnused()
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +08001094
1095 if is_new_part:
1096 part = part.ReadFrom(None, **part.__dict__).Clone(
Hung-Te Linc5196682018-04-18 22:59:59 +08001097 FirstLBA=gpt.GetMaxUsedLBA() + 1,
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +08001098 LastLBA=gpt.header.LastUsableLBA,
1099 UniqueGUID=uuid.uuid4(),
Hung-Te Linf641d302018-04-18 15:09:35 +08001100 TypeGUID=gpt.GetTypeGUID('data'))
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +08001101
1102 attr = part.attrs
1103 if args.legacy_boot is not None:
1104 attr.legacy_boot = args.legacy_boot
1105 if args.required is not None:
1106 attr.required = args.required
1107 if args.priority is not None:
1108 attr.priority = args.priority
1109 if args.tries is not None:
1110 attr.tries = args.tries
1111 if args.successful is not None:
1112 attr.successful = args.successful
1113 if args.raw_16 is not None:
1114 attr.raw_16 = args.raw_16
1115
1116 first_lba = part.FirstLBA if args.begin is None else args.begin
1117 last_lba = first_lba - 1 + (
1118 part.blocks if args.sectors is None else args.sectors)
1119 dargs = dict(
1120 FirstLBA=first_lba,
1121 LastLBA=last_lba,
1122 TypeGUID=(part.TypeGUID if args.type_guid is None else
Hung-Te Linf641d302018-04-18 15:09:35 +08001123 gpt.GetTypeGUID(args.type_guid)),
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +08001124 UniqueGUID=(part.UniqueGUID if args.unique_guid is None else
1125 uuid.UUID(bytes_le=args.unique_guid)),
1126 Attributes=attr,
1127 )
1128 if args.label is not None:
1129 dargs['label'] = args.label
1130
1131 part = part.Clone(**dargs)
1132 # Wipe partition again if it should be empty.
1133 if part.IsUnused():
1134 part = part.ReadFrom(None, **part.__dict__)
1135
Hung-Te Lin5f0dea42018-04-18 23:20:11 +08001136 gpt.UpdatePartition(part)
Hung-Te Linfcd1a8d2018-04-17 15:15:01 +08001137 gpt.WriteToFile(args.image_file)
1138 if part.IsUnused():
1139 # If we do ('%s' % part) there will be TypeError.
1140 print('OK: Deleted (zeroed) %s.' % (part,))
1141 else:
1142 print('OK: %s %s (%s+%s).' %
1143 ('Added' if is_new_part else 'Modified',
1144 part, part.FirstLBA, part.blocks))
1145
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001146 class Show(SubCommand):
1147 """Show partition table and entries.
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001148
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001149 Display the GPT table.
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001150 """
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001151
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001152 def DefineArgs(self, parser):
1153 parser.add_argument(
1154 '--numeric', '-n', action='store_true',
1155 help='Numeric output only.')
1156 parser.add_argument(
1157 '--quick', '-q', action='store_true',
1158 help='Quick output.')
1159 parser.add_argument(
1160 '-i', '--number', type=int,
1161 help='Show specified partition only, with format args.')
1162 for name, help_str in GPTCommands.FORMAT_ARGS:
1163 # TODO(hungte) Alert if multiple args were specified.
1164 parser.add_argument(
1165 '--%s' % name, '-%c' % name[0], action='store_true',
1166 help='[format] %s.' % help_str)
1167 parser.add_argument(
1168 'image_file', type=argparse.FileType('rb'),
1169 help='Disk image file to show.')
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001170
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001171 def Execute(self, args):
1172 """Show partition table and entries."""
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001173
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001174 def FormatGUID(bytes_le):
1175 return str(uuid.UUID(bytes_le=bytes_le)).upper()
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001176
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001177 def FormatTypeGUID(p):
1178 guid_str = FormatGUID(p.TypeGUID)
1179 if not args.numeric:
1180 names = gpt.TYPE_GUID_MAP.get(guid_str)
1181 if names:
1182 return names
1183 return guid_str
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001184
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001185 def FormatNames(p):
Hung-Te Lin49ac3c22018-04-17 14:37:54 +08001186 return p.label
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001187
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001188 def IsBootableType(type_guid):
Hung-Te Linfe724f82018-04-18 15:03:58 +08001189 return type_guid in gpt.STR_TYPE_GUID_LIST_BOOTABLE
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001190
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001191 def FormatAttribute(attrs, chromeos_kernel=False):
1192 if args.numeric:
1193 return '[%x]' % (attrs.raw >> 48)
1194 results = []
1195 if chromeos_kernel:
1196 results += [
1197 'priority=%d' % attrs.priority,
1198 'tries=%d' % attrs.tries,
1199 'successful=%d' % attrs.successful]
1200 if attrs.required:
1201 results += ['required=1']
1202 if attrs.legacy_boot:
1203 results += ['legacy_boot=1']
1204 return ' '.join(results)
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001205
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001206 def ApplyFormatArgs(p):
1207 if args.begin:
1208 return p.FirstLBA
1209 elif args.size:
1210 return p.blocks
1211 elif args.type:
1212 return FormatTypeGUID(p)
1213 elif args.unique:
1214 return FormatGUID(p.UniqueGUID)
1215 elif args.label:
1216 return FormatNames(p)
1217 elif args.Successful:
1218 return p.attrs.successful
1219 elif args.Priority:
1220 return p.attrs.priority
1221 elif args.Tries:
1222 return p.attrs.tries
1223 elif args.Legacy:
1224 return p.attrs.legacy_boot
1225 elif args.Attribute:
1226 return '[%x]' % (p.Attributes >> 48)
1227 else:
1228 return None
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001229
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001230 def IsFormatArgsSpecified():
1231 return any(getattr(args, arg[0]) for arg in GPTCommands.FORMAT_ARGS)
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001232
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001233 gpt = GPT.LoadFromFile(args.image_file)
1234 logging.debug('%r', gpt.header)
1235 fmt = '%12s %11s %7s %s'
1236 fmt2 = '%32s %s: %s'
1237 header = ('start', 'size', 'part', 'contents')
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001238
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001239 if IsFormatArgsSpecified() and args.number is None:
1240 raise GPTError('Format arguments must be used with -i.')
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001241
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001242 if not (args.number is None or
1243 0 < args.number <= gpt.header.PartitionEntriesNumber):
1244 raise GPTError('Invalid partition number: %d' % args.number)
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001245
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001246 partitions = gpt.partitions
1247 do_print_gpt_blocks = False
1248 if not (args.quick or IsFormatArgsSpecified()):
1249 print(fmt % header)
1250 if args.number is None:
1251 do_print_gpt_blocks = True
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001252
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001253 if do_print_gpt_blocks:
Hung-Te Linc34d89c2018-04-17 15:11:34 +08001254 if gpt.pmbr:
1255 print(fmt % (0, 1, '', 'PMBR'))
1256 if gpt.is_secondary:
1257 print(fmt % (gpt.header.BackupLBA, 1, 'IGNORED', 'Pri GPT header'))
1258 else:
1259 print(fmt % (gpt.header.CurrentLBA, 1, '', 'Pri GPT header'))
1260 print(fmt % (gpt.header.PartitionEntriesStartingLBA,
1261 gpt.GetPartitionTableBlocks(), '', 'Pri GPT table'))
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001262
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001263 for p in partitions:
1264 if args.number is None:
1265 # Skip unused partitions.
1266 if p.IsUnused():
1267 continue
1268 elif p.number != args.number:
1269 continue
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001270
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001271 if IsFormatArgsSpecified():
1272 print(ApplyFormatArgs(p))
1273 continue
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001274
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001275 type_guid = FormatGUID(p.TypeGUID)
1276 print(fmt % (p.FirstLBA, p.blocks, p.number,
1277 FormatTypeGUID(p) if args.quick else
1278 'Label: "%s"' % FormatNames(p)))
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001279
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001280 if not args.quick:
1281 print(fmt2 % ('', 'Type', FormatTypeGUID(p)))
1282 print(fmt2 % ('', 'UUID', FormatGUID(p.UniqueGUID)))
1283 if args.numeric or IsBootableType(type_guid):
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001284 print(fmt2 % ('', 'Attr', FormatAttribute(
Hung-Te Linfe724f82018-04-18 15:03:58 +08001285 p.attrs, p.IsChromeOSKernel())))
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001286
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001287 if do_print_gpt_blocks:
Hung-Te Linc34d89c2018-04-17 15:11:34 +08001288 if gpt.is_secondary:
1289 header = gpt.header
1290 else:
1291 f = args.image_file
1292 f.seek(gpt.header.BackupLBA * gpt.block_size)
1293 header = gpt.Header.ReadFrom(f)
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001294 print(fmt % (header.PartitionEntriesStartingLBA,
1295 gpt.GetPartitionTableBlocks(header), '',
1296 'Sec GPT table'))
1297 print(fmt % (header.CurrentLBA, 1, '', 'Sec GPT header'))
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001298
Hung-Te Lin3b491672018-04-19 01:41:20 +08001299 # Check integrity after showing all fields.
1300 gpt.CheckIntegrity()
1301
Hung-Te Linfe724f82018-04-18 15:03:58 +08001302 class Prioritize(SubCommand):
1303 """Reorder the priority of all kernel partitions.
1304
1305 Reorder the priority of all active ChromeOS Kernel partitions.
1306
1307 With no options this will set the lowest active kernel to priority 1 while
1308 maintaining the original order.
1309 """
1310
1311 def DefineArgs(self, parser):
1312 parser.add_argument(
1313 '-P', '--priority', type=int,
1314 help=('Highest priority to use in the new ordering. '
1315 'The other partitions will be ranked in decreasing '
1316 'priority while preserving their original order. '
1317 'If necessary the lowest ranks will be coalesced. '
1318 'No active kernels will be lowered to priority 0.'))
1319 parser.add_argument(
1320 '-i', '--number', type=int,
1321 help='Specify the partition to make the highest in the new order.')
1322 parser.add_argument(
1323 '-f', '--friends', action='store_true',
1324 help=('Friends of the given partition (those with the same '
1325 'starting priority) are also updated to the new '
1326 'highest priority. '))
1327 parser.add_argument(
1328 'image_file', type=argparse.FileType('rb+'),
1329 help='Disk image file to prioritize.')
1330
1331 def Execute(self, args):
1332 gpt = GPT.LoadFromFile(args.image_file)
1333 parts = [p for p in gpt.partitions if p.IsChromeOSKernel()]
1334 prios = list(set(p.attrs.priority for p in parts if p.attrs.priority))
1335 prios.sort(reverse=True)
1336 groups = [[p for p in parts if p.attrs.priority == priority]
1337 for priority in prios]
1338 if args.number:
Hung-Te Lin5f0dea42018-04-18 23:20:11 +08001339 p = gpt.GetPartition(args.number)
Hung-Te Linfe724f82018-04-18 15:03:58 +08001340 if p not in parts:
1341 raise GPTError('%s is not a ChromeOS kernel.' % p)
1342 if args.friends:
1343 group0 = [f for f in parts if f.attrs.priority == p.attrs.priority]
1344 else:
1345 group0 = [p]
1346 groups.insert(0, group0)
1347
1348 # Max priority is 0xf.
1349 highest = min(args.priority or len(prios), 0xf)
1350 logging.info('New highest priority: %s', highest)
1351 done = []
1352
1353 new_priority = highest
1354 for g in groups:
1355 has_new_part = False
1356 for p in g:
1357 if p.number in done:
1358 continue
1359 done.append(p.number)
1360 attrs = p.attrs
1361 old_priority = attrs.priority
1362 assert new_priority > 0, 'Priority must be > 0.'
1363 attrs.priority = new_priority
1364 p = p.Clone(Attributes=attrs)
Hung-Te Lin5f0dea42018-04-18 23:20:11 +08001365 gpt.UpdatePartition(p)
Hung-Te Linfe724f82018-04-18 15:03:58 +08001366 has_new_part = True
1367 logging.info('%s priority changed from %s to %s.', p, old_priority,
1368 new_priority)
1369 if has_new_part:
1370 new_priority -= 1
1371
1372 gpt.WriteToFile(args.image_file)
1373
Hung-Te Linf641d302018-04-18 15:09:35 +08001374 class Find(SubCommand):
1375 """Locate a partition by its GUID.
1376
1377 Find a partition by its UUID or label. With no specified DRIVE it scans all
1378 physical drives.
1379
1380 The partition type may also be given as one of these aliases:
1381
1382 firmware ChromeOS firmware
1383 kernel ChromeOS kernel
1384 rootfs ChromeOS rootfs
1385 data Linux data
1386 reserved ChromeOS reserved
1387 efi EFI System Partition
1388 unused Unused (nonexistent) partition
1389 """
1390 def DefineArgs(self, parser):
1391 parser.add_argument(
1392 '-t', '--type-guid',
1393 help='Search for Partition Type GUID')
1394 parser.add_argument(
1395 '-u', '--unique-guid',
1396 help='Search for Partition Unique GUID')
1397 parser.add_argument(
1398 '-l', '--label',
1399 help='Search for Label')
1400 parser.add_argument(
1401 '-n', '--numeric', action='store_true',
1402 help='Numeric output only.')
1403 parser.add_argument(
1404 '-1', '--single-match', action='store_true',
1405 help='Fail if more than one match is found.')
1406 parser.add_argument(
1407 '-M', '--match-file', type=str,
1408 help='Matching partition data must also contain MATCH_FILE content.')
1409 parser.add_argument(
1410 '-O', '--offset', type=int, default=0,
1411 help='Byte offset into partition to match content (default 0).')
1412 parser.add_argument(
1413 'drive', type=argparse.FileType('rb+'), nargs='?',
1414 help='Drive or disk image file to find.')
1415
1416 def Execute(self, args):
1417 if not any((args.type_guid, args.unique_guid, args.label)):
1418 raise GPTError('You must specify at least one of -t, -u, or -l')
1419
1420 drives = [args.drive.name] if args.drive else (
1421 '/dev/%s' % name for name in subprocess.check_output(
1422 'lsblk -d -n -r -o name', shell=True).split())
1423
1424 match_pattern = None
1425 if args.match_file:
1426 with open(args.match_file) as f:
1427 match_pattern = f.read()
1428
1429 found = 0
1430 for drive in drives:
1431 try:
1432 gpt = GPT.LoadFromFile(drive)
1433 except GPTError:
1434 if args.drive:
1435 raise
1436 # When scanning all block devices on system, ignore failure.
1437
1438 for p in gpt.partitions:
1439 if p.IsUnused():
1440 continue
1441 if args.label is not None and args.label != p.label:
1442 continue
1443 if args.unique_guid is not None and (
1444 uuid.UUID(args.unique_guid) != uuid.UUID(bytes_le=p.UniqueGUID)):
1445 continue
1446 type_guid = gpt.GetTypeGUID(args.type_guid)
1447 if args.type_guid is not None and (
1448 type_guid != uuid.UUID(bytes_le=p.TypeGUID)):
1449 continue
1450 if match_pattern:
1451 with open(drive, 'rb') as f:
1452 f.seek(p.offset + args.offset)
1453 if f.read(len(match_pattern)) != match_pattern:
1454 continue
1455 # Found the partition, now print.
1456 found += 1
1457 if args.numeric:
1458 print(p.number)
1459 else:
1460 # This is actually more for block devices.
1461 print('%s%s%s' % (p.image, 'p' if p.image[-1].isdigit() else '',
1462 p.number))
1463
1464 if found < 1 or (args.single_match and found > 1):
1465 return 1
1466 return 0
1467
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001468
1469def main():
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001470 commands = GPTCommands()
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001471 parser = argparse.ArgumentParser(description='GPT Utility.')
1472 parser.add_argument('--verbose', '-v', action='count', default=0,
1473 help='increase verbosity.')
1474 parser.add_argument('--debug', '-d', action='store_true',
1475 help='enable debug output.')
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001476 commands.DefineArgs(parser)
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001477
1478 args = parser.parse_args()
1479 log_level = max(logging.WARNING - args.verbose * 10, logging.DEBUG)
1480 if args.debug:
1481 log_level = logging.DEBUG
1482 logging.basicConfig(format='%(module)s:%(funcName)s %(message)s',
1483 level=log_level)
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001484 try:
Hung-Te Linf641d302018-04-18 15:09:35 +08001485 code = commands.Execute(args)
1486 if type(code) is int:
1487 sys.exit(code)
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001488 except Exception as e:
1489 if args.verbose or args.debug:
1490 logging.exception('Failure in command [%s]', args.command)
Hung-Te Lin5cb0c312018-04-17 14:56:43 +08001491 exit('ERROR: %s: %s' % (args.command, str(e) or 'Unknown error.'))
Hung-Te Linc772e1a2017-04-14 16:50:50 +08001492
1493
1494if __name__ == '__main__':
1495 main()