pygpt, image_tool: Refactor from namedtuple to class with restricted __slots__.
pygpt and image_tool access GPT fields by name rather than by numeric
index, so a customized class is a better fit than a named tuple. The new
class also removes the need to hack Python inheritance via the @GPTBlob
decorator.
With the new class and per-field descriptions, we also dropped
CLONE_CONVERTERS. Execution time unfortunately increases because type
conversion now always happens when objects are created (previously we
exposed separate properties, for example TypeGUID (string) and type_guid
(GUID)). However, this makes GPT data fields easier to maintain and
access.
image_tool now also extends pygpt.GPT directly instead of creating a
wrapper class.
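For illustration, a minimal sketch of the new pattern (assuming pygpt.py
is importable on sys.path; the Example class, its field list, and the
image path are hypothetical):

  import pygpt

  # Field layout is declared with StructField objects instead of a
  # @GPTBlob description string, and each class restricts __slots__:
  class Example(pygpt.GPTObject):
    FIELDS = [pygpt.StructField('Q', 'FirstLBA'),
              pygpt.StructField('Q', 'LastLBA')]
    __slots__ = [f.name for f in FIELDS]

  # GPT fields are plain attributes, already converted to rich types
  # (TypeGUID is a GUID, Names is a decoded string), and changes are
  # applied in place with Update() instead of Clone():
  with open('disk.bin', 'r+b') as f:  # hypothetical image path
    gpt = pygpt.GPT.LoadFromFile(f)
    part = gpt.GetPartition(1)        # 1-based partition number
    print(part.Names)
    part.Update(LastLBA=gpt.header.LastUsableLBA)
    gpt.UpdateChecksum()
    gpt.WriteToFile(f)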
BUG=chromium:834237
TEST=make test
Change-Id: Ieef1bdc33b8a606677ec2a8dd8c98360845e381e
Reviewed-on: https://chromium-review.googlesource.com/1027250
Commit-Ready: Hung-Te Lin <hungte@chromium.org>
Tested-by: Hung-Te Lin <hungte@chromium.org>
Reviewed-by: Pi-Hsun Shih <pihsun@chromium.org>
diff --git a/py/utils/pygpt.py b/py/utils/pygpt.py
index 8b07553..6a8a0bd 100755
--- a/py/utils/pygpt.py
+++ b/py/utils/pygpt.py
@@ -30,7 +30,6 @@
import argparse
import binascii
-import collections
import logging
import os
import struct
@@ -39,51 +38,54 @@
import uuid
-# The binascii.crc32 returns signed integer, so CRC32 in in struct must be
-# declared as 'signed' (l) instead of 'unsigned' (L).
-# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_table_header_.28LBA_1.29
-HEADER_DESCRIPTION = """
- 8s Signature
- 4s Revision
- L HeaderSize
- l CRC32
- 4s Reserved
- Q CurrentLBA
- Q BackupLBA
- Q FirstUsableLBA
- Q LastUsableLBA
- 16s DiskGUID
- Q PartitionEntriesStartingLBA
- L PartitionEntriesNumber
- L PartitionEntrySize
- l PartitionArrayCRC32
-"""
+class StructError(Exception):
+ """Exceptions in packing and unpacking from/to struct fields."""
+ pass
-# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_entries
-PARTITION_DESCRIPTION = """
- 16s TypeGUID
- 16s UniqueGUID
- Q FirstLBA
- Q LastLBA
- Q Attributes
- 72s Names
-"""
-# The PMBR has so many variants. The basic format is defined in
-# https://en.wikipedia.org/wiki/Master_boot_record#Sector_layout, and our
-# implementation, as derived from `cgpt`, is following syslinux as:
-# https://chromium.googlesource.com/chromiumos/platform/vboot_reference/+/master/cgpt/cgpt.h#32
-PMBR_DESCRIPTION = """
- 424s BootCode
- 16s BootGUID
- L DiskID
- 2s Magic
- 16s LegacyPart0
- 16s LegacyPart1
- 16s LegacyPart2
- 16s LegacyPart3
- 2s Signature
-"""
+class StructField(object):
+ """Definition of a field in struct.
+
+ Attributes:
+ fmt: a format string for struct.{pack,unpack} to use.
+ name: a string for name of processed field.
+ """
+ __slots__ = ['fmt', 'name']
+
+ def __init__(self, fmt, name):
+ self.fmt = fmt
+ self.name = name
+
+ def Pack(self, value):
+ """"Packs given value from given format."""
+ del self # Unused.
+ return value
+
+ def Unpack(self, value):
+ """Unpacks given value into given format."""
+ del self # Unused.
+ return value
+
+
+class UTF16StructField(StructField):
+ """A field in UTF encoded string."""
+ __slots__ = ['encoding', 'max_length']
+ encoding = 'utf-16-le'
+
+ def __init__(self, max_length, name):
+ self.max_length = max_length
+ fmt = '%ds' % max_length
+ super(UTF16StructField, self).__init__(fmt, name)
+
+ def Pack(self, value):
+ new_value = value.encode(self.encoding)
+ if len(new_value) >= self.max_length:
+ raise StructError('Value "%s" cannot be packed into field %s (len=%s)' %
+ (value, self.name, self.max_length))
+ return new_value
+
+ def Unpack(self, value):
+ return value.decode(self.encoding).strip('\x00')
class GUID(uuid.UUID):
@@ -98,6 +100,24 @@
return uuid.uuid4()
+class GUIDStructField(StructField):
+ """A GUID field."""
+
+ def __init__(self, name):
+ super(GUIDStructField, self).__init__('16s', name)
+
+ def Pack(self, value):
+ if value is None:
+ return '\x00' * 16
+ if not isinstance(value, uuid.UUID):
+ raise StructError('Field %s needs a GUID value instead of [%r].' %
+ (self.name, value))
+ return value.bytes_le
+
+ def Unpack(self, value):
+ return GUID(bytes_le=value)
+
+
def BitProperty(getter, setter, shift, mask):
"""A generator for bit-field properties.
@@ -120,119 +140,93 @@
return property(_getter, _setter)
-class GPTBlob(object):
- """A decorator class to help accessing GPT blobs as named tuple.
+class PartitionAttributes(object):
+ """Wrapper for Partition.Attributes.
- To use this, specify the blob description (struct format and named tuple field
- names) above the derived class, for example:
-
- @GPTBlob(description):
- class Header(GPTObject):
- pass
- """
- def __init__(self, description):
- spec = description.split()
- self.struct_format = '<' + ''.join(spec[::2])
- self.fields = spec[1::2]
-
- def __call__(self, cls):
- new_bases = ((
- collections.namedtuple(cls.__name__, self.fields),) + cls.__bases__)
- new_cls = type(cls.__name__, new_bases, dict(cls.__dict__))
- setattr(new_cls, 'FORMAT', self.struct_format)
- return new_cls
-
-
-class GPTObject(object):
- """An object in GUID Partition Table.
-
- This needs to be decorated by @GPTBlob(description) and inherited by a real
- class. Properties (not member functions) in CamelCase should be reserved for
- named tuple attributes.
-
- To create a new object, use class method ReadFrom(), which takes a stream
- as input or None to create with all elements set to zero. To make changes to
- named tuple elements, use member function Clone(changes).
-
- It is also possible to attach some additional properties to the object as meta
- data (for example path of the underlying image file). To do that, specify the
- data as keyword arguments when calling ReadFrom(). These properties will be
- preserved when you call Clone().
-
- A special case is "reset named tuple elements of an object but keeping all
- properties", for example changing a partition object to unused (zeroed).
- ReadFrom() is a class method so properties won't be copied. You need to
- call as cls.ReadFrom(None, **p.__dict__), or a short cut - p.CloneAndZero().
+ This can be obtained from Partition.Attributes, but the changed properties
+ won't apply to the underlying Partition until an explicit call with
+ Partition.Update(Attributes=new_attrs).
"""
- FORMAT = None
- """The struct.{pack,unpack} format string, and should be set by GPTBlob."""
-
- CLONE_CONVERTERS = None
- """A dict (name, cvt) to convert input arguments into named tuple data.
-
- `name` is a string for the name of argument to convert.
- `cvt` is a callable to convert value. The return value may be:
- - a tuple in (new_name, value): save the value as new name.
- - otherwise, save the value in original name.
- Note tuple is an invalid input for struct.unpack so it's used for the
- special value.
- """
-
- @classmethod
- def ReadFrom(cls, f, **kargs):
- """Reads and decode an object from stream.
-
- Args:
- f: a stream to read blob, or None to decode with all zero bytes.
- kargs: a dict for additional attributes in object.
- """
- if f is None:
- reader = lambda num: '\x00' * num
- else:
- reader = f.read
- data = cls(*struct.unpack(cls.FORMAT, reader(struct.calcsize(cls.FORMAT))))
- # Named tuples do not accept kargs in constructor.
- data.__dict__.update(kargs)
- return data
-
- def Clone(self, **dargs):
- """Clones a new instance with modifications.
-
- GPT objects are usually named tuples that are immutable, so the only way
- to make changes is to create a new instance with modifications.
-
- Args:
- dargs: a dict with all modifications.
- """
- for name, convert in (self.CLONE_CONVERTERS or {}).iteritems():
- if name not in dargs:
- continue
- result = convert(dargs.pop(name))
- if isinstance(result, tuple):
- assert len(result) == 2, 'Converted tuple must be (name, value).'
- dargs[result[0]] = result[1]
- else:
- dargs[name] = result
-
- cloned = self._replace(**dargs)
- cloned.__dict__.update(self.__dict__)
- return cloned
-
- def CloneAndZero(self, **dargs):
- """Short cut to create a zeroed object while keeping all properties.
-
- This is very similar to Clone except all named tuple elements will be zero.
- Also different from class method ReadFrom(None) because this keeps all
- properties from one object.
- """
- cloned = self.ReadFrom(None, **self.__dict__)
- return cloned.Clone(**dargs) if dargs else cloned
+ def __init__(self, attrs):
+ self._attrs = attrs
@property
- def blob(self):
- """Returns the object in formatted bytes."""
- return struct.pack(self.FORMAT, *self)
+ def raw(self):
+ """Returns the raw integer type attributes."""
+ return self._Get()
+
+ def _Get(self):
+ return self._attrs
+
+ def _Set(self, value):
+ self._attrs = value
+
+ successful = BitProperty(_Get, _Set, 56, 1)
+ tries = BitProperty(_Get, _Set, 52, 0xf)
+ priority = BitProperty(_Get, _Set, 48, 0xf)
+ legacy_boot = BitProperty(_Get, _Set, 2, 1)
+ required = BitProperty(_Get, _Set, 0, 1)
+ raw_16 = BitProperty(_Get, _Set, 48, 0xffff)
+
+
+class PartitionAttributeStructField(StructField):
+
+ def Pack(self, value):
+ if not isinstance(value, PartitionAttributes):
+ raise StructError('Given value %r is not %s.' %
+ (value, PartitionAttributes.__name__))
+ return value.raw
+
+ def Unpack(self, value):
+ return PartitionAttributes(value)
+
+
+# The binascii.crc32 returns a signed integer, so CRC32 in the struct must be
+# declared as 'signed' (l) instead of 'unsigned' (L).
+# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_table_header_.28LBA_1.29
+HEADER_FIELDS = [
+ StructField('8s', 'Signature'),
+ StructField('4s', 'Revision'),
+ StructField('L', 'HeaderSize'),
+ StructField('l', 'CRC32'),
+ StructField('4s', 'Reserved'),
+ StructField('Q', 'CurrentLBA'),
+ StructField('Q', 'BackupLBA'),
+ StructField('Q', 'FirstUsableLBA'),
+ StructField('Q', 'LastUsableLBA'),
+ GUIDStructField('DiskGUID'),
+ StructField('Q', 'PartitionEntriesStartingLBA'),
+ StructField('L', 'PartitionEntriesNumber'),
+ StructField('L', 'PartitionEntrySize'),
+ StructField('l', 'PartitionArrayCRC32'),
+]
+
+# http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_entries
+PARTITION_FIELDS = [
+ GUIDStructField('TypeGUID'),
+ GUIDStructField('UniqueGUID'),
+ StructField('Q', 'FirstLBA'),
+ StructField('Q', 'LastLBA'),
+ PartitionAttributeStructField('Q', 'Attributes'),
+ UTF16StructField(72, 'Names'),
+]
+
+# The PMBR has so many variants. The basic format is defined in
+# https://en.wikipedia.org/wiki/Master_boot_record#Sector_layout, and our
+# implementation, as derived from `cgpt`, is following syslinux as:
+# https://chromium.googlesource.com/chromiumos/platform/vboot_reference/+/master/cgpt/cgpt.h#32
+PMBR_FIELDS = [
+ StructField('424s', 'BootCode'),
+ GUIDStructField('BootGUID'),
+ StructField('L', 'DiskID'),
+ StructField('2s', 'Magic'),
+ StructField('16s', 'LegacyPart0'),
+ StructField('16s', 'LegacyPart1'),
+ StructField('16s', 'LegacyPart2'),
+ StructField('16s', 'LegacyPart3'),
+ StructField('2s', 'Signature'),
+]
class GPTError(Exception):
@@ -240,6 +234,130 @@
pass
+class GPTObject(object):
+ """A base object in GUID Partition Table.
+
+ All objects (for instance, header or partition entries) must inherit this
+ class and define the FIELDS attribute with a list of field definitions using
+ StructField.
+
+ The 'name' of each StructField becomes an attribute of the GPT object that
+ is directly packed into / unpacked from the struct. Derived attributes
+ (calculated from existing fields) should be named in lower_case.
+
+ It is also possible to attach additional properties to the object as meta
+ data (for example the path of the underlying image file). To do that, first
+ include the name in the __slots__ list, then pass the value as a keyword
+ argument to the constructor. These properties will be preserved when you
+ call Clone().
+
+ To create a new object, call the constructor. Field data can be assigned as
+ positional arguments, or omitted to initialize every field to zero (see
+ Zero()). Field data and meta values can also be specified as keyword
+ arguments (**kargs) at the same time.
+
+ To read an object from a file or stream, use the class method
+ ReadFrom(source). To make changes, modify the fields directly or use
+ Update(**dargs), or create a copy with Clone() first and then Update.
+
+ To wipe all fields (but not meta), call Zero(). There is currently no way
+ to clear meta except setting them to None one by one.
+ """
+ __slots__ = []
+
+ FIELDS = None
+ """A list of StructField definitions."""
+
+ def __init__(self, *args, **kargs):
+ if args:
+ if len(args) != len(self.FIELDS):
+ raise GPTError('%s needs %s arguments (found %s).' %
+ (type(self).__name__, len(self.FIELDS), len(args)))
+ for f, value in zip(self.FIELDS, args):
+ setattr(self, f.name, value)
+ else:
+ self.Zero()
+
+ all_names = list(self.__slots__)
+ for name, value in kargs.iteritems():
+ if name not in all_names:
+ raise GPTError('%s does not support keyword arg <%s>.' %
+ (type(self).__name__, name))
+ setattr(self, name, value)
+
+ def __iter__(self):
+ """An iterator to return all fields associated in the object."""
+ return (getattr(self, f.name) for f in self.FIELDS)
+
+ def __repr__(self):
+ return '(%s: %s)' % (type(self).__name__, ', '.join(
+ '%s=%r' % (f, getattr(self, f)) for f in self.__slots__))
+
+ @classmethod
+ def GetStructFormat(cls):
+ """Returns a format string for struct to use."""
+ return '<' + ''.join(f.fmt for f in cls.FIELDS)
+
+ @classmethod
+ def ReadFrom(cls, source, **kargs):
+ """Returns an object from given source."""
+ obj = cls(**kargs)
+ obj.Unpack(source)
+ return obj
+
+ @property
+ def blob(self):
+ """The (packed) blob representation of the object."""
+ return self.Pack()
+
+ @property
+ def meta(self):
+ """Meta values (those not in GPT object fields)."""
+ metas = set(self.__slots__) - set([f.name for f in self.FIELDS])
+ return dict((name, getattr(self, name)) for name in metas)
+
+ def Unpack(self, source):
+ """Unpacks values from a given source.
+
+ Args:
+ source: a string of bytes or a file-like object to read from.
+ """
+ fmt = self.GetStructFormat()
+ if source is None:
+ source = '\x00' * struct.calcsize(fmt)
+ if not isinstance(source, basestring):
+ return self.Unpack(source.read(struct.calcsize(fmt)))
+ for f, value in zip(self.FIELDS, struct.unpack(fmt, source)):
+ setattr(self, f.name, f.Unpack(value))
+
+ def Pack(self):
+ """Packs values in all fields into a string by struct format."""
+ return struct.pack(self.GetStructFormat(),
+ *(f.Pack(getattr(self, f.name)) for f in self.FIELDS))
+
+ def Clone(self):
+ """Clones a new instance."""
+ return type(self)(*self, **self.meta)
+
+ def Update(self, **dargs):
+ """Applies multiple values in current object."""
+ for name, value in dargs.iteritems():
+ setattr(self, name, value)
+
+ def Zero(self):
+ """Set all fields to values representing zero or empty.
+
+ Note the meta attributes won't be cleared.
+ """
+ class ZeroReader(object):
+ """A /dev/zero like stream."""
+
+ @staticmethod
+ def read(num):
+ return '\x00' * num
+
+ self.Unpack(ZeroReader())
+
+
class GPT(object):
"""A GPT helper class.
@@ -273,24 +391,19 @@
TYPE_GUID_FROM_NAME['efi'],
]
- @GPTBlob(PMBR_DESCRIPTION)
class ProtectiveMBR(GPTObject):
"""Protective MBR (PMBR) in GPT."""
+ FIELDS = PMBR_FIELDS
+ __slots__ = [f.name for f in FIELDS]
+
SIGNATURE = '\x55\xAA'
MAGIC = '\x1d\x9a'
- CLONE_CONVERTERS = {
- 'BootGUID': lambda v: v.bytes_le if isinstance(v, uuid.UUID) else v
- }
-
- @property
- def boot_guid(self):
- """Returns the BootGUID in decoded (GUID) format."""
- return GUID(bytes_le=self.BootGUID)
-
- @GPTBlob(HEADER_DESCRIPTION)
class Header(GPTObject):
"""Wrapper to Header in GPT."""
+ FIELDS = HEADER_FIELDS
+ __slots__ = [f.name for f in FIELDS]
+
SIGNATURES = ['EFI PART', 'CHROMEOS']
SIGNATURE_IGNORE = 'IGNOREME'
DEFAULT_REVISION = '\x00\x00\x01\x00'
@@ -298,22 +411,6 @@
DEFAULT_PARTITION_ENTRIES = 128
DEFAULT_PARTITIONS_LBA = 2 # LBA 0 = MBR, LBA 1 = GPT Header.
- def Clone(self, **dargs):
- """Creates a new instance with modifications.
-
- GPT objects are usually named tuples that are immutable, so the only way
- to make changes is to create a new instance with modifications.
-
- CRC32 is always updated but PartitionArrayCRC32 must be updated explicitly
- since we can't track changes in GPT.partitions automatically.
-
- Note since GPTHeader.Clone will always update CRC, we can only check and
- compute CRC by super(GPT.Header, header).Clone, or header._replace.
- """
- dargs['CRC32'] = 0
- header = super(GPT.Header, self).Clone(**dargs)
- return super(GPT.Header, header).Clone(CRC32=binascii.crc32(header.blob))
-
@classmethod
def Create(cls, size, block_size, pad_blocks=0,
part_entries=DEFAULT_PARTITION_ENTRIES):
@@ -325,57 +422,37 @@
pad_blocks: number of preserved sectors between header and partitions.
part_entries: number of partitions to include in header.
"""
- part_entry_size = struct.calcsize(GPT.Partition.FORMAT)
+ PART_FORMAT = GPT.Partition.GetStructFormat()
+ FORMAT = cls.GetStructFormat()
+ part_entry_size = struct.calcsize(PART_FORMAT)
parts_lba = cls.DEFAULT_PARTITIONS_LBA + pad_blocks
parts_bytes = part_entries * part_entry_size
parts_blocks = parts_bytes / block_size
if parts_bytes % block_size:
parts_blocks += 1
- # PartitionsCRC32 must be updated later explicitly.
- return cls.ReadFrom(None).Clone(
+ # CRC32 and PartitionsCRC32 must be updated later explicitly.
+ return cls(
Signature=cls.SIGNATURES[0],
Revision=cls.DEFAULT_REVISION,
- HeaderSize=struct.calcsize(cls.FORMAT),
+ HeaderSize=struct.calcsize(FORMAT),
CurrentLBA=1,
BackupLBA=size / block_size - 1,
FirstUsableLBA=parts_lba + parts_blocks,
LastUsableLBA=size / block_size - parts_blocks - parts_lba,
- DiskGUID=GUID.Random().get_bytes(),
+ DiskGUID=GUID.Random(),
PartitionEntriesStartingLBA=parts_lba,
PartitionEntriesNumber=part_entries,
- PartitionEntrySize=part_entry_size,
- )
+ PartitionEntrySize=part_entry_size)
- class PartitionAttributes(object):
- """Wrapper for Partition.Attributes.
+ def UpdateChecksum(self):
+ """Updates the CRC32 field in GPT header.
- This can be created using Partition.attrs, but the changed properties won't
- apply to underlying Partition until an explicit call with
- Partition.Clone(Attributes=new_attrs).
- """
+ Note the PartitionArrayCRC32 is not touched - you have to make sure that
+ is correct before calling Header.UpdateChecksum().
+ """
+ self.Update(CRC32=0)
+ self.Update(CRC32=binascii.crc32(self.blob))
- def __init__(self, attrs):
- self._attrs = attrs
-
- @property
- def raw(self):
- """Returns the raw integer type attributes."""
- return self._Get()
-
- def _Get(self):
- return self._attrs
-
- def _Set(self, value):
- self._attrs = value
-
- successful = BitProperty(_Get, _Set, 56, 1)
- tries = BitProperty(_Get, _Set, 52, 0xf)
- priority = BitProperty(_Get, _Set, 48, 0xf)
- legacy_boot = BitProperty(_Get, _Set, 2, 1)
- required = BitProperty(_Get, _Set, 0, 1)
- raw_16 = BitProperty(_Get, _Set, 48, 0xffff)
-
- @GPTBlob(PARTITION_DESCRIPTION)
class Partition(GPTObject):
"""The partition entry in GPT.
@@ -384,32 +461,17 @@
- number: the 1-based partition number.
- block_size: an integer for size of each block (LBA, or sector).
"""
+ FIELDS = PARTITION_FIELDS
+ __slots__ = [f.name for f in FIELDS] + ['image', 'number', 'block_size']
NAMES_ENCODING = 'utf-16-le'
NAMES_LENGTH = 72
- CLONE_CONVERTERS = {
- # TODO(hungte) check if encoded name is too long.
- 'label': lambda l: (None if l is None else
- ('Names', l.encode(GPT.Partition.NAMES_ENCODING))),
- 'TypeGUID': lambda v: v.bytes_le if isinstance(v, uuid.UUID) else v,
- 'UniqueGUID': lambda v: v.bytes_le if isinstance(v, uuid.UUID) else v,
- 'Attributes': (
- lambda v: v.raw if isinstance(v, GPT.PartitionAttributes) else v),
- }
-
def __str__(self):
return '%s#%s' % (self.image, self.number)
- @classmethod
- def Create(cls, block_size, image, number):
- """Creates a new partition entry with given meta data."""
- part = cls.ReadFrom(
- None, image=image, number=number, block_size=block_size)
- return part
-
def IsUnused(self):
"""Returns if the partition is unused and can be allocated."""
- return self.type_guid == GPT.TYPE_GUID_UNUSED
+ return self.TypeGUID == GPT.TYPE_GUID_UNUSED
def IsChromeOSKernel(self):
"""Returns if the partition is a Chrome OS kernel partition."""
@@ -430,23 +492,6 @@
"""Returns size of partition in bytes."""
return self.blocks * self.block_size
- @property
- def type_guid(self):
- return GUID(bytes_le=self.TypeGUID)
-
- @property
- def unique_guid(self):
- return GUID(bytes_le=self.UniqueGUID)
-
- @property
- def label(self):
- """Returns the Names in decoded string type."""
- return self.Names.decode(self.NAMES_ENCODING).strip('\0')
-
- @property
- def attrs(self):
- return GPT.PartitionAttributes(self.Attributes)
-
def __init__(self):
"""GPT constructor.
@@ -478,7 +523,7 @@
gpt.block_size = block_size
gpt.header = cls.Header.Create(size, block_size, pad_blocks)
gpt.partitions = [
- cls.Partition.Create(block_size, image_name, i + 1)
+ cls.Partition(block_size=block_size, image=image_name, number=i + 1)
for i in xrange(gpt.header.PartitionEntriesNumber)]
return gpt
@@ -518,6 +563,7 @@
if i != 0:
gpt.is_secondary = True
break
+ # TODO(hungte) Try harder to see if this block is valid.
else:
# Nothing found, try next block size.
continue
@@ -527,14 +573,15 @@
raise GPTError('Invalid signature in GPT header.')
image.seek(gpt.block_size * header.PartitionEntriesStartingLBA)
- def ReadPartition(image, i):
+ def ReadPartition(image, number):
p = gpt.Partition.ReadFrom(
- image, image=image.name, number=i + 1, block_size=gpt.block_size)
+ image, image=image.name, number=number, block_size=gpt.block_size)
return p
gpt.header = header
gpt.partitions = [
- ReadPartition(image, i) for i in range(header.PartitionEntriesNumber)]
+ ReadPartition(image, i + 1)
+ for i in xrange(header.PartitionEntriesNumber)]
return gpt
def GetUsedPartitions(self):
@@ -571,14 +618,21 @@
raise GPTError('Invalid partition number %s.' % number)
return self.partitions[number - 1]
- def UpdatePartition(self, part, number=None):
+ def UpdatePartition(self, part, number):
"""Updates the entry in partition table by given Partition object.
+ Usually you only need to call this if you want to copy one partition to a
+ different location (a different number or image).
+
Args:
part: a Partition GPT object.
- number: an integer as 1-based partition number. None to use part.number.
+ number: an integer as 1-based partition number.
"""
- number = part.number if number is None else number
+ ref = self.partitions[number - 1]
+ part = part.Clone()
+ part.number = number
+ part.image = ref.image
+ part.block_size = ref.block_size
self.partitions[number - 1] = part
def Resize(self, new_size):
@@ -609,8 +663,7 @@
if last_usable_lba < max_used_lba:
raise GPTError('Backup partition tables will overlap used partitions')
- self.header = self.header.Clone(
- BackupLBA=backup_lba, LastUsableLBA=last_usable_lba)
+ self.header.Update(BackupLBA=backup_lba, LastUsableLBA=last_usable_lba)
def GetFreeSpace(self):
"""Returns the free (available) space left according to LastUsableLBA."""
@@ -639,21 +692,12 @@
'Cannot expand %s because it is not allocated at last.' % p)
old_blocks = p.blocks
- p = p.Clone(LastLBA=self.header.LastUsableLBA)
+ p.Update(LastLBA=self.header.LastUsableLBA)
new_blocks = p.blocks
- self.UpdatePartition(p)
logging.warn(
'%s expanded, size in LBA: %d -> %d.', p, old_blocks, new_blocks)
return (old_blocks, new_blocks)
- def GetIgnoredHeader(self):
- """Returns a primary header with signature set to 'IGNOREME'.
-
- This is a special trick to enforce using backup header, when there is
- some security exploit in LBA1.
- """
- return self.header.Clone(Signature=self.header.SIGNATURE_IGNORE)
-
def CheckIntegrity(self):
"""Checks if the GPT objects all look good."""
# Check if the header allocation looks good. CurrentLBA and
@@ -709,24 +753,32 @@
if (binascii.crc32(''.join(p.blob for p in self.partitions)) !=
header.PartitionArrayCRC32):
raise GPTError('GPT Header PartitionArrayCRC32 does not match.')
+ header_crc = header.Clone()
+ header_crc.UpdateChecksum()
+ if header_crc.CRC32 != header.CRC32:
+ raise GPTError('GPT Header CRC32 does not match.')
def UpdateChecksum(self):
- """Updates all checksum fields in GPT objects.
-
- The Header.CRC32 is automatically updated in Header.Clone().
- """
+ """Updates all checksum fields in GPT objects."""
parts = ''.join(p.blob for p in self.partitions)
- self.header = self.header.Clone(
- PartitionArrayCRC32=binascii.crc32(parts))
+ self.header.Update(PartitionArrayCRC32=binascii.crc32(parts))
+ self.header.UpdateChecksum()
def GetBackupHeader(self, header):
- """Returns the backup header according to given header."""
+ """Returns the backup header according to given header.
+
+ This should be invoked only after GPT.UpdateChecksum() has updated all CRC32
+ fields.
+ """
partitions_starting_lba = (
header.BackupLBA - self.GetPartitionTableBlocks())
- return header.Clone(
+ h = header.Clone()
+ h.Update(
BackupLBA=header.CurrentLBA,
CurrentLBA=header.BackupLBA,
PartitionEntriesStartingLBA=partitions_starting_lba)
+ h.UpdateChecksum()
+ return h
@classmethod
def WriteProtectiveMBR(cls, image, create, bootcode=None, boot_guid=None):
@@ -750,7 +802,8 @@
return cls.WriteProtectiveMBR(f, create, bootcode, boot_guid)
image.seek(0)
- assert struct.calcsize(cls.ProtectiveMBR.FORMAT) == cls.DEFAULT_BLOCK_SIZE
+ pmbr_format = cls.ProtectiveMBR.GetStructFormat()
+ assert struct.calcsize(pmbr_format) == cls.DEFAULT_BLOCK_SIZE
pmbr = cls.ProtectiveMBR.ReadFrom(image)
if create:
@@ -764,24 +817,24 @@
# Partition 1~3 should be all zero.
part1 = '\x00' * 16
assert len(part0) == len(part1) == 16, 'MBR entry is wrong.'
- pmbr = pmbr.Clone(
+ pmbr.Update(
BootGUID=cls.TYPE_GUID_UNUSED,
DiskID=0,
- Magic=cls.ProtectiveMBR.MAGIC,
+ Magic=pmbr.MAGIC,
LegacyPart0=part0,
LegacyPart1=part1,
LegacyPart2=part1,
LegacyPart3=part1,
- Signature=cls.ProtectiveMBR.SIGNATURE)
+ Signature=pmbr.SIGNATURE)
if bootcode:
if len(bootcode) > len(pmbr.BootCode):
logging.info(
'Bootcode is larger (%d > %d)!', len(bootcode), len(pmbr.BootCode))
bootcode = bootcode[:len(pmbr.BootCode)]
- pmbr = pmbr.Clone(BootCode=bootcode)
+ pmbr.Update(BootCode=bootcode)
if boot_guid:
- pmbr = pmbr.Clone(BootGUID=boot_guid)
+ pmbr.Update(BootGUID=boot_guid)
blob = pmbr.blob
assert len(blob) == cls.DEFAULT_BLOCK_SIZE
@@ -988,7 +1041,7 @@
gpt.is_secondary = True
else:
new_signature = gpt.Header.SIGNATURES[0 if args.efi else 1]
- gpt.header = gpt.header.Clone(Signature=new_signature)
+ gpt.header.Update(Signature=new_signature)
gpt.WriteToFile(args.image_file)
if args.primary_ignore:
print('OK: Set %s primary GPT header to %s.' %
@@ -1106,7 +1159,8 @@
is_new_part = part.IsUnused()
if is_new_part:
- part = part.ReadFrom(None, **part.__dict__).Clone(
+ part.Zero()
+ part.Update(
FirstLBA=gpt.GetMaxUsedLBA() + 1,
LastLBA=gpt.header.LastUsableLBA,
UniqueGUID=GUID.Random(),
@@ -1121,13 +1175,13 @@
def GetArg(arg_value, default_value):
return default_value if arg_value is None else arg_value
- attrs = part.attrs
+ attrs = part.Attributes
for name in ['legacy_boot', 'required', 'priority', 'tries',
'successful', 'raw_16']:
UpdateAttr(name)
first_lba = GetArg(args.begin, part.FirstLBA)
- part = part.Clone(
- label=GetArg(args.label, part.label),
+ part.Update(
+ Names=GetArg(args.label, part.Names),
FirstLBA=first_lba,
LastLBA=first_lba - 1 + GetArg(args.sectors, part.blocks),
TypeGUID=GetArg(args.type_guid, part.TypeGUID),
@@ -1136,9 +1190,8 @@
# Wipe partition again if it should be empty.
if part.IsUnused():
- part = part.ReadFrom(None, **part.__dict__)
+ part.Zero()
- gpt.UpdatePartition(part)
gpt.WriteToFile(args.image_file)
if part.IsUnused():
# If we do ('%s' % part) there will be TypeError.
@@ -1177,7 +1230,7 @@
"""Show partition table and entries."""
def FormatTypeGUID(p):
- guid = p.type_guid
+ guid = p.TypeGUID
if not args.numeric:
names = gpt.TYPE_GUID_MAP.get(guid)
if names:
@@ -1212,19 +1265,19 @@
elif args.type:
return FormatTypeGUID(p)
elif args.unique:
- return p.unique_guid
+ return p.UniqueGUID
elif args.label:
- return p.label
+ return p.Names
elif args.Successful:
- return p.attrs.successful
+ return p.Attributes.successful
elif args.Priority:
- return p.attrs.priority
+ return p.Attributes.priority
elif args.Tries:
- return p.attrs.tries
+ return p.Attributes.tries
elif args.Legacy:
- return p.attrs.legacy_boot
+ return p.Attributes.legacy_boot
elif args.Attribute:
- return '[%x]' % (p.Attributes >> 48)
+ return '[%x]' % (p.Attributes.raw >> 48)
else:
return None
@@ -1275,14 +1328,14 @@
print(fmt % (p.FirstLBA, p.blocks, p.number,
FormatTypeGUID(p) if args.quick else
- 'Label: "%s"' % p.label))
+ 'Label: "%s"' % p.Names))
if not args.quick:
print(fmt2 % ('', 'Type', FormatTypeGUID(p)))
- print(fmt2 % ('', 'UUID', p.unique_guid))
- if args.numeric or IsBootableType(p.type_guid):
+ print(fmt2 % ('', 'UUID', p.UniqueGUID))
+ if args.numeric or IsBootableType(p.TypeGUID):
print(fmt2 % ('', 'Attr', FormatAttribute(
- p.attrs, p.IsChromeOSKernel())))
+ p.Attributes, p.IsChromeOSKernel())))
if do_print_gpt_blocks:
if gpt.is_secondary:
@@ -1331,16 +1384,18 @@
def Execute(self, args):
gpt = GPT.LoadFromFile(args.image_file)
parts = [p for p in gpt.partitions if p.IsChromeOSKernel()]
- prios = list(set(p.attrs.priority for p in parts if p.attrs.priority))
+ prios = list(set(p.Attributes.priority for p in parts
+ if p.Attributes.priority))
prios.sort(reverse=True)
- groups = [[p for p in parts if p.attrs.priority == priority]
+ groups = [[p for p in parts if p.Attributes.priority == priority]
for priority in prios]
if args.number:
p = gpt.GetPartition(args.number)
if p not in parts:
raise GPTError('%s is not a ChromeOS kernel.' % p)
if args.friends:
- group0 = [f for f in parts if f.attrs.priority == p.attrs.priority]
+ group0 = [f for f in parts
+ if f.Attributes.priority == p.Attributes.priority]
else:
group0 = [p]
groups.insert(0, group0)
@@ -1357,12 +1412,11 @@
if p.number in done:
continue
done.append(p.number)
- attrs = p.attrs
+ attrs = p.Attributes
old_priority = attrs.priority
assert new_priority > 0, 'Priority must be > 0.'
attrs.priority = new_priority
- p = p.Clone(Attributes=attrs)
- gpt.UpdatePartition(p)
+ p.Update(Attributes=attrs)
has_new_part = True
logging.info('%s priority changed from %s to %s.', p, old_priority,
new_priority)
@@ -1440,9 +1494,9 @@
for p in gpt.partitions:
if (p.IsUnused() or
- Unmatch(args.label, p.label) or
- Unmatch(args.unique_guid, p.unique_guid) or
- Unmatch(args.type_guid, p.type_guid)):
+ Unmatch(args.label, p.Names) or
+ Unmatch(args.unique_guid, p.UniqueGUID) or
+ Unmatch(args.type_guid, p.TypeGUID)):
continue
if match_pattern:
with open(drive, 'rb') as f: