pygpt: Change sub command so it can be reused by other commands.
Revise the pygpt sub-command system so it works more like image_tool's and
can be invoked directly from image_tool.
The ultimate goal is to invoke pygpt as a Python module within
image_tool, without shell execution.
BUG=chromium:834237
TEST=make test
Change-Id: I62c746409302f6ec9a8bf3345d8317b6abe32ebb
Reviewed-on: https://chromium-review.googlesource.com/1013453
Commit-Ready: Hung-Te Lin <hungte@chromium.org>
Tested-by: Hung-Te Lin <hungte@chromium.org>
Reviewed-by: Pi-Hsun Shih <pihsun@chromium.org>
diff --git a/py/utils/pygpt.py b/py/utils/pygpt.py
index 3fe2d3e..292286d 100755
--- a/py/utils/pygpt.py
+++ b/py/utils/pygpt.py
@@ -224,6 +224,7 @@
DEFAULT_BLOCK_SIZE = 512
TYPE_GUID_UNUSED = '\x00' * 16
+ TYPE_NAME_CHROMEOS_KERNEL = 'ChromeOS kernel'
TYPE_GUID_MAP = {
'00000000-0000-0000-0000-000000000000': 'Unused',
'EBD0A0A2-B9E5-4433-87C0-68B6B72699C7': 'Linux data',
@@ -466,11 +467,14 @@
Args:
i: Index (0-based) of target partition.
+
+ Returns:
+ (old_blocks, new_blocks) for size in blocks.
"""
# Assume no partitions overlap, we need to make sure partition[i] has
# largest LBA.
if i < 0 or i >= len(self.GetValidPartitions()):
- raise GPTError('Partition index %d is invalid.' % (i + 1))
+ raise GPTError('Partition number %d is invalid.' % (i + 1))
p = self.partitions[i]
max_used_lba = self.GetMaxUsedLBA()
if max_used_lba > p.LastLBA:
@@ -482,8 +486,9 @@
p = p.Clone(LastLBA=self.header.LastUsableLBA)
new_blocks = p.blocks
self.partitions[i] = p
- logging.warn('Partition NR=%d expanded, size in LBA: %d -> %d.',
- i + 1, old_blocks, new_blocks)
+ logging.warn(
+ '%s expanded, size in LBA: %d -> %d.', p, old_blocks, new_blocks)
+ return (old_blocks, new_blocks)
def UpdateChecksum(self):
"""Updates all checksum fields in GPT objects.
@@ -554,214 +559,244 @@
('Attribute', 'raw 16-bit attribute value (bits 48-63)')]
def __init__(self):
- pass
+ commands = dict(
+ (command.lower(), getattr(self, command)())
+ for command in dir(self)
+ if (isinstance(getattr(self, command), type) and
+ issubclass(getattr(self, command), self.SubCommand) and
+ getattr(self, command) is not self.SubCommand)
+ )
+ self.commands = commands
- @classmethod
- def RegisterRepair(cls, p):
- """Registers the repair command to argparser.
+ def DefineArgs(self, parser):
+ """Defines all available commands to an argparser subparsers instance."""
+ subparsers = parser.add_subparsers(help='Sub-command help.', dest='command')
+ for name, instance in sorted(self.commands.iteritems()):
+ parser = subparsers.add_parser(
+ name, description=instance.__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ help=instance.__doc__.splitlines()[0])
+ instance.DefineArgs(parser)
- Args:
- p: An argparse parser instance.
- """
- p.add_argument(
- '--expand', action='store_true', default=False,
- help='Expands stateful partition to full disk.')
- p.add_argument('image_file', type=argparse.FileType('rb+'),
- help='Disk image file to repair.')
+ def Execute(self, args):
+ """Execute the sub commands by given parsed arguments."""
+ self.commands[args.command].Execute(args)
- def Repair(self, args):
+ class SubCommand(object):
+ """A base class for sub commands to derive from."""
+
+ def DefineArgs(self, parser):
+ """Defines command line arguments to argparse parser.
+
+ Args:
+ parser: An argparse parser instance.
+ """
+ del parser # Unused.
+ raise NotImplementedError
+
+ def Execute(self, args):
+ """Execute the command.
+
+ Args:
+ args: An argparse parsed namespace.
+ """
+ del args # Unused.
+ raise NotImplementedError
+
+ class Repair(SubCommand):
"""Repair damaged GPT headers and tables."""
- gpt = GPT.LoadFromFile(args.image_file)
- gpt.Resize(os.path.getsize(args.image_file.name))
- free_space = gpt.GetFreeSpace()
- if args.expand:
- if free_space:
- gpt.ExpandPartition(0)
+ def DefineArgs(self, parser):
+ parser.add_argument(
+ 'image_file', type=argparse.FileType('rb+'),
+ help='Disk image file to repair.')
+
+ def Execute(self, args):
+ gpt = GPT.LoadFromFile(args.image_file)
+ gpt.Resize(os.path.getsize(args.image_file.name))
+ gpt.WriteToFile(args.image_file)
+ print('Disk image file %s repaired.' % args.image_file.name)
+
+ class Expand(SubCommand):
+ """Expands a GPT partition to all available free space."""
+
+ def DefineArgs(self, parser):
+ parser.add_argument(
+ '-i', '--number', type=int, required=True,
+ help='The partition to expand.')
+ parser.add_argument(
+ 'image_file', type=argparse.FileType('rb+'),
+ help='Disk image file to modify.')
+
+ def Execute(self, args):
+ gpt = GPT.LoadFromFile(args.image_file)
+ old_blocks, new_blocks = gpt.ExpandPartition(args.number - 1)
+ gpt.WriteToFile(args.image_file)
+ if old_blocks < new_blocks:
+ print(
+ 'Partition %s on disk image file %s has been extended '
+ 'from %s to %s .' %
+ (args.number, args.image_file.name, old_blocks * gpt.block_size,
+ new_blocks * gpt.block_size))
else:
- logging.warn('- No extra space to expand.')
- elif free_space:
- logging.warn('Extra space found (%d, LBA=%d), '
- 'use --expand to expand partitions.',
- free_space, free_space / gpt.block_size)
+ print('Nothing to expand for disk image %s partition %s.' %
+ (args.image_file.name, args.number))
- gpt.WriteToFile(args.image_file)
- print('Disk image file %s repaired.' % args.image_file.name)
+ class Show(SubCommand):
+ """Show partition table and entries.
- @classmethod
- def RegisterShow(cls, p):
- """Registers the repair command to argparser.
-
- Args:
- p: An argparse parser instance.
+ Display the GPT table.
"""
- p.add_argument('--numeric', '-n', action='store_true',
- help='Numeric output only.')
- p.add_argument('--quick', '-q', action='store_true',
- help='Quick output.')
- p.add_argument('--index', '-i', type=int, default=None,
- help='Show specified partition only, with format args.')
- for name, help_str in cls.FORMAT_ARGS:
- # TODO(hungte) Alert if multiple args were specified.
- p.add_argument('--%s' % name, '-%c' % name[0], action='store_true',
- help='[format] %s.' % help_str)
- p.add_argument('image_file', type=argparse.FileType('rb'),
- help='Disk image file to show.')
+ def DefineArgs(self, parser):
+ parser.add_argument(
+ '--numeric', '-n', action='store_true',
+ help='Numeric output only.')
+ parser.add_argument(
+ '--quick', '-q', action='store_true',
+ help='Quick output.')
+ parser.add_argument(
+ '-i', '--number', type=int,
+ help='Show specified partition only, with format args.')
+ for name, help_str in GPTCommands.FORMAT_ARGS:
+ # TODO(hungte) Alert if multiple args were specified.
+ parser.add_argument(
+ '--%s' % name, '-%c' % name[0], action='store_true',
+ help='[format] %s.' % help_str)
+ parser.add_argument(
+ 'image_file', type=argparse.FileType('rb'),
+ help='Disk image file to show.')
- def Show(self, args):
- """Show partition table and entries."""
+ def Execute(self, args):
+ """Show partition table and entries."""
- def FormatGUID(bytes_le):
- return str(uuid.UUID(bytes_le=bytes_le)).upper()
+ def FormatGUID(bytes_le):
+ return str(uuid.UUID(bytes_le=bytes_le)).upper()
- def FormatTypeGUID(p):
- guid_str = FormatGUID(p.TypeGUID)
- if not args.numeric:
- names = gpt.TYPE_GUID_MAP.get(guid_str)
- if names:
- return names
- return guid_str
+ def FormatTypeGUID(p):
+ guid_str = FormatGUID(p.TypeGUID)
+ if not args.numeric:
+ names = gpt.TYPE_GUID_MAP.get(guid_str)
+ if names:
+ return names
+ return guid_str
- def FormatNames(p):
- return p.Names.decode('utf-16-le').strip('\0')
-
- def IsBootableType(type_guid):
- return type_guid in gpt.TYPE_GUID_LIST_BOOTABLE
-
- def FormatAttribute(attrs):
- if args.numeric:
- return '[%x]' % (attrs.raw >> 48)
- if attrs.legacy_boot:
- return 'legacy_boot=1'
- return 'priority=%d tries=%d successful=%d' % (
- attrs.priority, attrs.tries, attrs.successful)
-
- def ApplyFormatArgs(p):
- if args.begin:
- return p.FirstLBA
- elif args.size:
- return p.blocks
- elif args.type:
- return FormatTypeGUID(p)
- elif args.unique:
- return FormatGUID(p.UniqueGUID)
- elif args.label:
+ def FormatNames(p):
return p.label
- elif args.Successful:
- return p.attrs.successful
- elif args.Priority:
- return p.attrs.priority
- elif args.Tries:
- return p.attrs.tries
- elif args.Legacy:
- return p.attrs.legacy_boot
- elif args.Attribute:
- return '[%x]' % (p.Attributes >> 48)
- else:
- return None
- def IsFormatArgsSpecified():
- return any(getattr(args, arg[0]) for arg in self.FORMAT_ARGS)
+ def IsBootableType(type_guid):
+ return type_guid in gpt.TYPE_GUID_LIST_BOOTABLE
- gpt = GPT.LoadFromFile(args.image_file)
- fmt = '%12s %11s %7s %s'
- fmt2 = '%32s %s: %s'
- header = ('start', 'size', 'part', 'contents')
+ def FormatAttribute(attrs, chromeos_kernel=False):
+ if args.numeric:
+ return '[%x]' % (attrs.raw >> 48)
+ results = []
+ if chromeos_kernel:
+ results += [
+ 'priority=%d' % attrs.priority,
+ 'tries=%d' % attrs.tries,
+ 'successful=%d' % attrs.successful]
+ if attrs.required:
+ results += ['required=1']
+ if attrs.legacy_boot:
+ results += ['legacy_boot=1']
+ return ' '.join(results)
- if IsFormatArgsSpecified() and args.index is None:
- raise GPTError('Format arguments must be used with -i.')
+ def ApplyFormatArgs(p):
+ if args.begin:
+ return p.FirstLBA
+ elif args.size:
+ return p.blocks
+ elif args.type:
+ return FormatTypeGUID(p)
+ elif args.unique:
+ return FormatGUID(p.UniqueGUID)
+ elif args.label:
+ return FormatNames(p)
+ elif args.Successful:
+ return p.attrs.successful
+ elif args.Priority:
+ return p.attrs.priority
+ elif args.Tries:
+ return p.attrs.tries
+ elif args.Legacy:
+ return p.attrs.legacy_boot
+ elif args.Attribute:
+ return '[%x]' % (p.Attributes >> 48)
+ else:
+ return None
- partitions = gpt.GetValidPartitions()
- if not (args.index is None or 0 < args.index <= len(partitions)):
- raise GPTError('Invalid partition index: %d' % args.index)
+ def IsFormatArgsSpecified():
+ return any(getattr(args, arg[0]) for arg in GPTCommands.FORMAT_ARGS)
- do_print_gpt_blocks = False
- if not (args.quick or IsFormatArgsSpecified()):
- print(fmt % header)
- if args.index is None:
- do_print_gpt_blocks = True
+ gpt = GPT.LoadFromFile(args.image_file)
+ logging.debug('%r', gpt.header)
+ fmt = '%12s %11s %7s %s'
+ fmt2 = '%32s %s: %s'
+ header = ('start', 'size', 'part', 'contents')
- if do_print_gpt_blocks:
- print(fmt % (gpt.header.CurrentLBA, 1, '', 'Pri GPT header'))
- print(fmt % (gpt.header.PartitionEntriesStartingLBA,
- gpt.GetPartitionTableBlocks(), '', 'Pri GPT table'))
+ if IsFormatArgsSpecified() and args.number is None:
+ raise GPTError('Format arguments must be used with -i.')
- for i, p in enumerate(partitions):
- if args.index is not None and i != args.index - 1:
- continue
+ if not (args.number is None or
+ 0 < args.number <= gpt.header.PartitionEntriesNumber):
+ raise GPTError('Invalid partition number: %d' % args.number)
- if IsFormatArgsSpecified():
- print(ApplyFormatArgs(p))
- continue
+ partitions = gpt.partitions
+ do_print_gpt_blocks = False
+ if not (args.quick or IsFormatArgsSpecified()):
+ print(fmt % header)
+ if args.number is None:
+ do_print_gpt_blocks = True
- type_guid = FormatGUID(p.TypeGUID)
- print(fmt % (p.FirstLBA, p.LastLBA - p.FirstLBA + 1, i + 1,
- FormatTypeGUID(p) if args.quick else
- 'Label: "%s"' % FormatNames(p)))
+ if do_print_gpt_blocks:
+ print(fmt % (gpt.header.CurrentLBA, 1, '', 'Pri GPT header'))
+ print(fmt % (gpt.header.PartitionEntriesStartingLBA,
+ gpt.GetPartitionTableBlocks(), '', 'Pri GPT table'))
- if not args.quick:
- print(fmt2 % ('', 'Type', FormatTypeGUID(p)))
- print(fmt2 % ('', 'UUID', FormatGUID(p.UniqueGUID)))
- if args.numeric or IsBootableType(type_guid):
- print(fmt2 % ('', 'Attr', FormatAttribute(p.attrs)))
+ for p in partitions:
+ if args.number is None:
+ # Skip unused partitions.
+ if p.IsUnused():
+ continue
+ elif p.number != args.number:
+ continue
- if do_print_gpt_blocks:
- f = args.image_file
- f.seek(gpt.header.BackupLBA * gpt.block_size)
- backup_header = gpt.Header.ReadFrom(f)
- print(fmt % (backup_header.PartitionEntriesStartingLBA,
- gpt.GetPartitionTableBlocks(backup_header), '',
- 'Sec GPT table'))
- print(fmt % (gpt.header.BackupLBA, 1, '', 'Sec GPT header'))
+ if IsFormatArgsSpecified():
+ print(ApplyFormatArgs(p))
+ continue
- def Create(self, args):
- """Create or reset GPT headers and tables."""
- del args # Not used yet.
- raise NotImplementedError
+ type_guid = FormatGUID(p.TypeGUID)
+ print(fmt % (p.FirstLBA, p.blocks, p.number,
+ FormatTypeGUID(p) if args.quick else
+ 'Label: "%s"' % FormatNames(p)))
- def Add(self, args):
- """Add, edit or remove a partition entry."""
- del args # Not used yet.
- raise NotImplementedError
+ if not args.quick:
+ print(fmt2 % ('', 'Type', FormatTypeGUID(p)))
+ print(fmt2 % ('', 'UUID', FormatGUID(p.UniqueGUID)))
+ if args.numeric or IsBootableType(type_guid):
+ name = GPT.TYPE_GUID_MAP[type_guid]
+ print(fmt2 % ('', 'Attr', FormatAttribute(
+ p.attrs, name == GPT.TYPE_NAME_CHROMEOS_KERNEL)))
- def Boot(self, args):
- """Edit the PMBR sector for legacy BIOSes."""
- del args # Not used yet.
- raise NotImplementedError
-
- def Find(self, args):
- """Locate a partition by its GUID."""
- del args # Not used yet.
- raise NotImplementedError
-
- def Prioritize(self, args):
- """Reorder the priority of all kernel partitions."""
- del args # Not used yet.
- raise NotImplementedError
-
- def Legacy(self, args):
- """Switch between GPT and Legacy GPT."""
- del args # Not used yet.
- raise NotImplementedError
-
- @classmethod
- def RegisterAllCommands(cls, subparsers):
- """Registers all available commands to an argparser subparsers instance."""
- subcommands = [('show', cls.Show, cls.RegisterShow),
- ('repair', cls.Repair, cls.RegisterRepair)]
- for name, invocation, register_command in subcommands:
- register_command(subparsers.add_parser(name, help=invocation.__doc__))
+ if do_print_gpt_blocks:
+ f = args.image_file
+ f.seek(gpt.header.BackupLBA * gpt.block_size)
+ header = gpt.Header.ReadFrom(f)
+ print(fmt % (header.PartitionEntriesStartingLBA,
+ gpt.GetPartitionTableBlocks(header), '',
+ 'Sec GPT table'))
+ print(fmt % (header.CurrentLBA, 1, '', 'Sec GPT header'))
def main():
+ commands = GPTCommands()
parser = argparse.ArgumentParser(description='GPT Utility.')
parser.add_argument('--verbose', '-v', action='count', default=0,
help='increase verbosity.')
parser.add_argument('--debug', '-d', action='store_true',
help='enable debug output.')
- subparsers = parser.add_subparsers(help='Sub-command help.', dest='command')
- GPTCommands.RegisterAllCommands(subparsers)
+ commands.DefineArgs(parser)
args = parser.parse_args()
log_level = max(logging.WARNING - args.verbose * 10, logging.DEBUG)
@@ -769,13 +804,12 @@
log_level = logging.DEBUG
logging.basicConfig(format='%(module)s:%(funcName)s %(message)s',
level=log_level)
- commands = GPTCommands()
try:
- getattr(commands, args.command.capitalize())(args)
+ commands.Execute(args)
except Exception as e:
if args.verbose or args.debug:
logging.exception('Failure in command [%s]', args.command)
- exit('ERROR: %s' % e)
+ exit('ERROR: %s: %s' % (args.command, str(e) or 'Unknown error.'))
if __name__ == '__main__':