pygpt: Extend named tuple objects into more useful objects with properties.

The raw implementation of pygpt.GPT is exposing objects in pure named
tuples, which makes it pretty hard to use since every pygpt user (for
example image_tool) has to implement the logic of LBA calculation,
encoding and decoding of UTF-16 strings, GUID conversion, ... etc.

This change tries to enhance the named tuple classes by extending
helpful properties in derived classes, making it more similar to the
Partition object defined in image_tool.

Additionally, as discovered in image_tool.Partition, the GPT objects may
need more information that is not available in GPT, for instance disk
image file name or block size. This is solved by the __dict__ in
GPTObject objects and maintained in Clone and ReadFrom.

With all these changes, we can now largely simplify the manipulation of
GPT objects using the help functions and properties.

BUG=chromium:834237
TEST=make test

Change-Id: I3f3ca55fb50efc503851c493aa4ccb279029e6d8
Reviewed-on: https://chromium-review.googlesource.com/1013451
Commit-Ready: Hung-Te Lin <hungte@chromium.org>
Tested-by: Hung-Te Lin <hungte@chromium.org>
Reviewed-by: Hung-Te Lin <hungte@chromium.org>
diff --git a/py/utils/pygpt.py b/py/utils/pygpt.py
index de4eb9e..d9cdc2a 100755
--- a/py/utils/pygpt.py
+++ b/py/utils/pygpt.py
@@ -40,7 +40,7 @@
 # The binascii.crc32 returns signed integer, so CRC32 in in struct must be
 # declared as 'signed' (l) instead of 'unsigned' (L).
 # http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_table_header_.28LBA_1.29
-HEADER_FORMAT = """
+HEADER_DESCRIPTION = """
    8s Signature
    4s Revision
     L HeaderSize
@@ -58,7 +58,7 @@
 """
 
 # http://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_entries
-PARTITION_FORMAT = """
+PARTITION_DESCRIPTION = """
   16s TypeGUID
   16s UniqueGUID
     Q FirstLBA
@@ -68,20 +68,141 @@
 """
 
 
-def BuildStructFormatAndNamedTuple(name, description):
-  """Builds the format string for struct and create corresponding namedtuple.
+def BitProperty(getter, setter, shift, mask):
+  """A generator for bit-field properties.
+
+  This is used inside a class to manipulate an integer-like variable using
+  properties. The getter and setter should be member functions to change the
+  underlying member data.
 
   Args:
-    name: A string for name of the named tuple.
-    description: A string with struct descriptor and attribute name.
-
-  Returns:
-    A pair of (struct_format, namedtuple_class).
+    getter: a function to read integer type variable (for all the bits).
+    setter: a function to set the new changed integer type variable.
+    shift: integer for how many bits should be shifted (right).
+    mask: integer for the mask to filter out bit field.
   """
-  elements = description.split()
-  struct_format = '<' + ''.join(elements[::2])
-  tuple_class = collections.namedtuple(name, elements[1::2])
-  return (struct_format, tuple_class)
+  def _getter(self):
+    return (getter(self) >> shift) & mask
+  def _setter(self, value):
+    assert value & mask == value, (
+        'Value %s out of range (mask=%s)' % (value, mask))
+    setter(self, getter(self) & ~(mask << shift) | value << shift)
+  return property(_getter, _setter)
+
+
+class GPTBlob(object):
+  """A decorator class to help accessing GPT blobs as named tuple.
+
+  To use this, specify the blob description (struct format and named tuple field
+  names) above the derived class, for example:
+
+  @GPTBlob(description):
+  class Header(GPTObject):
+    pass
+  """
+  def __init__(self, description):
+    spec = description.split()
+    self.struct_format = '<' + ''.join(spec[::2])
+    self.fields = spec[1::2]
+
+  def __call__(self, cls):
+    new_bases = ((
+        collections.namedtuple(cls.__name__, self.fields),) + cls.__bases__)
+    new_cls = type(cls.__name__, new_bases, dict(cls.__dict__))
+    setattr(new_cls, 'FORMAT', self.struct_format)
+    return new_cls
+
+
+class GPTObject(object):
+  """An object in GUID Partition Table.
+
+  This needs to be decorated by @GPTBlob(description) and inherited by a real
+  class. Properties (not member functions) in CamelCase should be reserved for
+  named tuple attributes.
+
+  To create a new object, use class method ReadFrom(), which takes a stream
+  as input or None to create with all elements set to zero.  To make changes to
+  named tuple elements, use member function Clone(changes).
+
+  It is also possible to attach some additional properties to the object as meta
+  data (for example path of the underlying image file). To do that, specify the
+  data as keyword arguments when calling ReadFrom(). These properties will be
+  preserved when you call Clone().
+
+  A special case is "reset named tuple elements of an object but keeping all
+  properties", for example changing a partition object to unused (zeroed).
+  ReadFrom() is a class method so properties won't be copied. You need to
+  call as cls.ReadFrom(None, **p.__dict__), or a short cut - p.CloneAndZero().
+  """
+
+  FORMAT = None
+  """The struct.{pack,unpack} format string, and should be set by GPTBlob."""
+
+  CLONE_CONVERTERS = None
+  """A dict (name, cvt) to convert input arguments into named tuple data.
+
+  `name` is a string for the name of argument to convert.
+  `cvt` is a callable to convert value. The return value may be:
+  - a tuple in (new_name, value): save the value as new name.
+  - otherwise, save the value in original name.
+  Note tuple is an invalid input for struct.unpack so it's used for the
+  special value.
+  """
+
+  @classmethod
+  def ReadFrom(cls, f, **kargs):
+    """Reads and decode an object from stream.
+
+    Args:
+      f: a stream to read blob, or None to decode with all zero bytes.
+      kargs: a dict for additional attributes in object.
+    """
+    if f is None:
+      reader = lambda num: '\x00' * num
+    else:
+      reader = f.read
+    data = cls(*struct.unpack(cls.FORMAT, reader(struct.calcsize(cls.FORMAT))))
+    # Named tuples do not accept kargs in constructor.
+    data.__dict__.update(kargs)
+    return data
+
+  def Clone(self, **dargs):
+    """Clones a new instance with modifications.
+
+    GPT objects are usually named tuples that are immutable, so the only way
+    to make changes is to create a new instance with modifications.
+
+    Args:
+      dargs: a dict with all modifications.
+    """
+    for name, convert in (self.CLONE_CONVERTERS or {}).iteritems():
+      if name not in dargs:
+        continue
+      result = convert(dargs.pop(name))
+      if isinstance(result, tuple):
+        assert len(result) == 2, 'Converted tuple must be (name, value).'
+        dargs[result[0]] = result[1]
+      else:
+        dargs[name] = result
+
+    cloned = self._replace(**dargs)
+    cloned.__dict__.update(self.__dict__)
+    return cloned
+
+  def CloneAndZero(self, **dargs):
+    """Short cut to create a zeroed object while keeping all properties.
+
+    This is very similar to Clone except all named tuple elements will be zero.
+    Also different from class method ReadFrom(None) because this keeps all
+    properties from one object.
+    """
+    cloned = self.ReadFrom(None, **self.__dict__)
+    return cloned.Clone(**dargs) if dargs else cloned
+
+  @property
+  def blob(self):
+    """Returns the object in formatted bytes."""
+    return struct.pack(self.FORMAT, *self)
 
 
 class GPT(object):
@@ -91,16 +212,12 @@
   After modifications were made, use `WriteToFile` to commit changes.
 
   Attributes:
-    header: A namedtuple of GPT header.
-    partitions: A list of GPT partition entry nametuple.
+    header: a namedtuple of GPT header.
+    partitions: a list of GPT partition entry nametuple.
+    block_size: integer for size of bytes in one block (sector).
   """
 
-  HEADER_FORMAT, HEADER_CLASS = BuildStructFormatAndNamedTuple(
-      'Header', HEADER_FORMAT)
-  PARTITION_FORMAT, PARTITION_CLASS = BuildStructFormatAndNamedTuple(
-      'Partition', PARTITION_FORMAT)
   DEFAULT_BLOCK_SIZE = 512
-  HEADER_SIGNATURE = 'EFI PART'
   TYPE_GUID_UNUSED = '\x00' * 16
   TYPE_GUID_MAP = {
       '00000000-0000-0000-0000-000000000000': 'Unused',
@@ -116,55 +233,130 @@
       'C12A7328-F81F-11D2-BA4B-00A0C93EC93B',  # EFI System Partition
   ]
 
+  @GPTBlob(HEADER_DESCRIPTION)
+  class Header(GPTObject):
+    """Wrapper to Header in GPT."""
+    SIGNATURES = ['EFI PART', 'CHROMEOS']
+    SIGNATURE_IGNORE = 'IGNOREME'
+    DEFAULT_REVISION = '\x00\x00\x01\x00'
+
+    DEFAULT_PARTITION_ENTRIES = 128
+    DEFAULT_PARTITIONS_LBA = 2  # LBA 0 = MBR, LBA 1 = GPT Header.
+
+    def Clone(self, **dargs):
+      """Creates a new instance with modifications.
+
+      GPT objects are usually named tuples that are immutable, so the only way
+      to make changes is to create a new instance with modifications.
+
+      CRC32 is always updated but PartitionArrayCRC32 must be updated explicitly
+      since we can't track changes in GPT.partitions automatically.
+
+      Note since GPTHeader.Clone will always update CRC, we can only check and
+      compute CRC by super(GPT.Header, header).Clone, or header._replace.
+      """
+      dargs['CRC32'] = 0
+      header = super(GPT.Header, self).Clone(**dargs)
+      return super(GPT.Header, header).Clone(CRC32=binascii.crc32(header.blob))
+
+  class PartitionAttributes(object):
+    """Wrapper for Partition.Attributes.
+
+    This can be created using Partition.attrs, but the changed properties won't
+    apply to underlying Partition until an explicit call with
+    Partition.Clone(Attributes=new_attrs).
+    """
+
+    def __init__(self, attrs):
+      self._attrs = attrs
+
+    @property
+    def raw(self):
+      """Returns the raw integer type attributes."""
+      return self._Get()
+
+    def _Get(self):
+      return self._attrs
+
+    def _Set(self, value):
+      self._attrs = value
+
+    successful = BitProperty(_Get, _Set, 56, 1)
+    tries = BitProperty(_Get, _Set, 52, 0xf)
+    priority = BitProperty(_Get, _Set, 48, 0xf)
+    legacy_boot = BitProperty(_Get, _Set, 2, 1)
+    required = BitProperty(_Get, _Set, 0, 1)
+
+  @GPTBlob(PARTITION_DESCRIPTION)
+  class Partition(GPTObject):
+    """The partition entry in GPT.
+
+    Please include following properties when creating a Partition object:
+    - image: a string for path to the image file the partition maps to.
+    - number: the 1-based partition number.
+    - block_size: an integer for size of each block (LBA, or sector).
+    """
+    NAMES_ENCODING = 'utf-16-le'
+    NAMES_LENGTH = 72
+
+    CLONE_CONVERTERS = {
+        # TODO(hungte) check if encoded name is too long.
+        'label': lambda l: (None if l is None else
+                            ('Names', l.encode(GPT.Partition.NAMES_ENCODING))),
+        'TypeGUID': lambda v: v.bytes_le if isinstance(v, uuid.UUID) else v,
+        'UniqueGUID': lambda v: v.bytes_le if isinstance(v, uuid.UUID) else v,
+        'Attributes': (
+            lambda v: v.raw if isinstance(v, GPT.PartitionAttributes) else v),
+    }
+
+    def __str__(self):
+      return '%s#%s' % (self.image, self.number)
+
+    def IsUnused(self):
+      """Returns if the partition is unused and can be allocated."""
+      return self.TypeGUID == GPT.TYPE_GUID_UNUSED
+
+    @property
+    def blocks(self):
+      """Return size of partition in blocks (see block_size)."""
+      return self.LastLBA - self.FirstLBA + 1
+
+    @property
+    def offset(self):
+      """Returns offset to partition in bytes."""
+      return self.FirstLBA * self.block_size
+
+    @property
+    def size(self):
+      """Returns size of partition in bytes."""
+      return self.blocks * self.block_size
+
+    @property
+    def type_guid(self):
+      return uuid.UUID(bytes_le=self.TypeGUID)
+
+    @property
+    def unique_guid(self):
+      return uuid.UUID(bytes_le=self.UniqueGUID)
+
+    @property
+    def label(self):
+      """Returns the Names in decoded string type."""
+      return self.Names.decode(self.NAMES_ENCODING).strip('\0')
+
+    @property
+    def attrs(self):
+      return GPT.PartitionAttributes(self.Attributes)
+
   def __init__(self):
+    """GPT constructor.
+
+    See LoadFromFile for how it's usually used.
+    """
     self.header = None
     self.partitions = None
     self.block_size = self.DEFAULT_BLOCK_SIZE
 
-  @staticmethod
-  def GetAttributeSuccess(attrs):
-    return (attrs >> 56) & 1
-
-  @staticmethod
-  def GetAttributeTries(attrs):
-    return (attrs >> 52) & 0xf
-
-  @staticmethod
-  def GetAttributePriority(attrs):
-    return (attrs >> 48) & 0xf
-
-  @staticmethod
-  def NewNamedTuple(base, **dargs):
-    """Builds a new named tuple based on dargs."""
-    return base._replace(**dargs)
-
-  @classmethod
-  def ReadHeader(cls, f):
-    return cls.HEADER_CLASS(*struct.unpack(
-        cls.HEADER_FORMAT, f.read(struct.calcsize(cls.HEADER_FORMAT))))
-
-  @classmethod
-  def ReadPartitionEntry(cls, f):
-    return cls.PARTITION_CLASS(*struct.unpack(
-        cls.PARTITION_FORMAT, f.read(struct.calcsize(cls.PARTITION_FORMAT))))
-
-  @classmethod
-  def GetHeaderBlob(cls, header):
-    return struct.pack(cls.HEADER_FORMAT, *header)
-
-  @classmethod
-  def GetHeaderCRC32(cls, header):
-    return binascii.crc32(cls.GetHeaderBlob(cls.NewNamedTuple(header, CRC32=0)))
-
-  @classmethod
-  def GetPartitionsBlob(cls, partitions):
-    return ''.join(struct.pack(cls.PARTITION_FORMAT, *partition)
-                   for partition in partitions)
-
-  @classmethod
-  def GetPartitionsCRC32(cls, partitions):
-    return binascii.crc32(cls.GetPartitionsBlob(partitions))
-
   @classmethod
   def LoadFromFile(cls, image):
     """Loads a GPT table from give disk image file object.
@@ -176,22 +368,26 @@
       with open(image, 'rb') as f:
         return cls.LoadFromFile(f)
 
-    gpt = GPT()
+    gpt = cls()
     # Try DEFAULT_BLOCK_SIZE, then 4K.
     for block_size in [cls.DEFAULT_BLOCK_SIZE, 4096]:
       image.seek(block_size * 1)
-      header = gpt.ReadHeader(image)
-      if header.Signature == cls.HEADER_SIGNATURE:
+      header = gpt.Header.ReadFrom(image)
+      if header.Signature in cls.Header.SIGNATURES:
         gpt.block_size = block_size
         break
     else:
       raise ValueError('Invalid signature in GPT header.')
 
     image.seek(gpt.block_size * header.PartitionEntriesStartingLBA)
-    partitions = [gpt.ReadPartitionEntry(image)
-                  for unused_i in range(header.PartitionEntriesNumber)]
+    def ReadPartition(image, i):
+      p = gpt.Partition.ReadFrom(
+          image, image=image.name, number=i + 1, block_size=gpt.block_size)
+      return p
+
     gpt.header = header
-    gpt.partitions = partitions
+    gpt.partitions = [
+        ReadPartition(image, i) for i in range(header.PartitionEntriesNumber)]
     return gpt
 
   def GetValidPartitions(self):
@@ -199,16 +395,17 @@
 
     In partition table, the first entry with empty type GUID indicates end of
     valid partitions. In most implementations all partitions after that should
-    be zeroed.
+    be zeroed. However, few implementations for example cgpt, may create
+    partitions in arbitrary order so use this carefully.
     """
     for i, p in enumerate(self.partitions):
-      if p.TypeGUID == self.TYPE_GUID_UNUSED:
+      if p.IsUnused():
         return self.partitions[:i]
     return self.partitions
 
   def GetMaxUsedLBA(self):
     """Returns the max LastLBA from all used partitions."""
-    parts = [p for p in self.partitions if p.TypeGUID != GPT.TYPE_GUID_UNUSED]
+    parts = [p for p in self.partitions if not p.IsUnused()]
     return (max(p.LastLBA for p in parts)
             if parts else self.header.FirstUsableLBA - 1)
 
@@ -250,10 +447,8 @@
       if last_usable_lba < max_used_lba:
         raise ValueError('Backup partition tables will overlap used partitions')
 
-    self.header = self.NewNamedTuple(
-        self.header,
-        BackupLBA=backup_lba,
-        LastUsableLBA=last_usable_lba)
+    self.header = self.header.Clone(
+        BackupLBA=backup_lba, LastUsableLBA=last_usable_lba)
 
   def GetFreeSpace(self):
     """Returns the free (available) space left according to LastUsableLBA."""
@@ -274,39 +469,34 @@
     p = self.partitions[i]
     max_used_lba = self.GetMaxUsedLBA()
     if max_used_lba > p.LastLBA:
-      raise ValueError('Cannot expand partition %d because it is not the last '
-                       'allocated partition.' % (i + 1))
+      raise ValueError(
+          'Cannot expand partition %d because it is not the last allocated '
+          'partition.' % (i + 1))
 
-    old_blocks = p.LastLBA - p.FirstLBA + 1
-    p = self.NewNamedTuple(p, LastLBA=self.header.LastUsableLBA)
-    new_blocks = p.LastLBA - p.FirstLBA + 1
+    old_blocks = p.blocks
+    p = p.Clone(LastLBA=self.header.LastUsableLBA)
+    new_blocks = p.blocks
     self.partitions[i] = p
     logging.warn('Partition NR=%d expanded, size in LBA: %d -> %d.',
                  i + 1, old_blocks, new_blocks)
 
   def UpdateChecksum(self):
-    """Updates all checksum values in GPT header."""
-    header = self.NewNamedTuple(
-        self.header,
-        CRC32=0,
-        PartitionArrayCRC32=self.GetPartitionsCRC32(self.partitions))
-    self.header = self.NewNamedTuple(
-        header,
-        CRC32=self.GetHeaderCRC32(header))
+    """Updates all checksum fields in GPT objects.
+
+    The Header.CRC32 is automatically updated in Header.Clone().
+    """
+    parts = ''.join(p.blob for p in self.partitions)
+    self.header = self.header.Clone(
+        PartitionArrayCRC32=binascii.crc32(parts))
 
   def GetBackupHeader(self):
     """Returns the backup header according to current header."""
     partitions_starting_lba = (
         self.header.BackupLBA - self.GetPartitionTableBlocks())
-    header = self.NewNamedTuple(
-        self.header,
-        CRC32=0,
+    return self.header.Clone(
         BackupLBA=self.header.CurrentLBA,
         CurrentLBA=self.header.BackupLBA,
         PartitionEntriesStartingLBA=partitions_starting_lba)
-    return self.NewNamedTuple(
-        header,
-        CRC32=self.GetHeaderCRC32(header))
 
   def WriteToFile(self, image):
     """Updates partition table in a disk image file.
@@ -326,17 +516,17 @@
       image.write(blob)
 
     self.UpdateChecksum()
-    WriteData('GPT Header', self.GetHeaderBlob(self.header),
-              self.header.CurrentLBA)
-    WriteData('GPT Partitions', self.GetPartitionsBlob(self.partitions),
-              self.header.PartitionEntriesStartingLBA)
+    parts_blob = ''.join(p.blob for p in self.partitions)
+    WriteData('GPT Header', self.header.blob, self.header.CurrentLBA)
+    WriteData(
+        'GPT Partitions', parts_blob, self.header.PartitionEntriesStartingLBA)
     logging.info('Usable LBA: First=%d, Last=%d',
                  self.header.FirstUsableLBA, self.header.LastUsableLBA)
     backup_header = self.GetBackupHeader()
-    WriteData('Backup Partitions', self.GetPartitionsBlob(self.partitions),
-              backup_header.PartitionEntriesStartingLBA)
-    WriteData('Backup Header', self.GetHeaderBlob(backup_header),
-              backup_header.CurrentLBA)
+    WriteData(
+        'Backup Partitions', parts_blob,
+        backup_header.PartitionEntriesStartingLBA)
+    WriteData('Backup Header', backup_header.blob, backup_header.CurrentLBA)
 
 
 class GPTCommands(object):
@@ -348,7 +538,7 @@
 
   FORMAT_ARGS = [
       ('begin', 'beginning sector'),
-      ('size', 'partition size'),
+      ('size', 'partition size (in sectors)'),
       ('type', 'type guid'),
       ('unique', 'unique guid'),
       ('label', 'label'),
@@ -434,35 +624,33 @@
     def IsBootableType(type_guid):
       return type_guid in gpt.TYPE_GUID_LIST_BOOTABLE
 
-    def FormatAttribute(attr):
+    def FormatAttribute(attrs):
       if args.numeric:
-        return '[%x]' % (attr >> 48)
-      if attr & 4:
+        return '[%x]' % (attrs.raw >> 48)
+      if attrs.legacy_boot:
         return 'legacy_boot=1'
       return 'priority=%d tries=%d successful=%d' % (
-          gpt.GetAttributePriority(attr),
-          gpt.GetAttributeTries(attr),
-          gpt.GetAttributeSuccess(attr))
+          attrs.priority, attrs.tries, attrs.successful)
 
     def ApplyFormatArgs(p):
       if args.begin:
         return p.FirstLBA
       elif args.size:
-        return p.LastLBA - p.FirstLBA + 1
+        return p.blocks
       elif args.type:
         return FormatTypeGUID(p)
       elif args.unique:
         return FormatGUID(p.UniqueGUID)
       elif args.label:
-        return FormatNames(p)
+        return p.label
       elif args.Successful:
-        return gpt.GetAttributeSuccess(p.Attributes)
+        return p.attrs.successful
       elif args.Priority:
-        return gpt.GetAttributePriority(p.Attributes)
+        return p.attrs.priority
       elif args.Tries:
-        return gpt.GetAttributeTries(p.Attributes)
+        return p.attrs.tries
       elif args.Legacy:
-        raise NotImplementedError
+        return p.attrs.legacy_boot
       elif args.Attribute:
         return '[%x]' % (p.Attributes >> 48)
       else:
@@ -511,12 +699,12 @@
         print(fmt2 % ('', 'Type', FormatTypeGUID(p)))
         print(fmt2 % ('', 'UUID', FormatGUID(p.UniqueGUID)))
         if args.numeric or IsBootableType(type_guid):
-          print(fmt2 % ('', 'Attr', FormatAttribute(p.Attributes)))
+          print(fmt2 % ('', 'Attr', FormatAttribute(p.attrs)))
 
     if do_print_gpt_blocks:
       f = args.image_file
       f.seek(gpt.header.BackupLBA * gpt.block_size)
-      backup_header = gpt.ReadHeader(f)
+      backup_header = gpt.Header.ReadFrom(f)
       print(fmt % (backup_header.PartitionEntriesStartingLBA,
                    gpt.GetPartitionTableBlocks(backup_header), '',
                    'Sec GPT table'))