Formatting: Format all python code with black.
This CL is probably not what you're looking for; it contains only
automated formatting changes. To skip it in blame output, pass this
commit's hash to `git blame --ignore-rev <revision>`.
BUG=b:233893248
TEST=CQ
Change-Id: I66591d7a738d241aed3290138c0f68065ab10a6d
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/chromite/+/3879174
Reviewed-by: Mike Frysinger <vapier@chromium.org>
Tested-by: Alex Klein <saklein@chromium.org>
diff --git a/cli/flash.py b/cli/flash.py
index 62b0974..e57869b 100644
--- a/cli/flash.py
+++ b/cli/flash.py
@@ -23,384 +23,437 @@
def GetDefaultBoard():
- """Look up default board.
+ """Look up default board.
- In a chrome checkout, return $SDK_BOARD. In a chromeos checkout,
- return the contents of .default_board.
- """
- if path_util.DetermineCheckout().type == path_util.CHECKOUT_TYPE_GCLIENT:
- return os.environ.get(cros_chrome_sdk.SDKFetcher.SDK_BOARD_ENV)
- return cros_build_lib.GetDefaultBoard()
+ In a chrome checkout, return $SDK_BOARD. In a chromeos checkout,
+ return the contents of .default_board.
+ """
+ if path_util.DetermineCheckout().type == path_util.CHECKOUT_TYPE_GCLIENT:
+ return os.environ.get(cros_chrome_sdk.SDKFetcher.SDK_BOARD_ENV)
+ return cros_build_lib.GetDefaultBoard()
class UsbImagerOperation(operation.ProgressBarOperation):
- """Progress bar for flashing image to operation."""
+ """Progress bar for flashing image to operation."""
- def __init__(self, image):
- super().__init__()
- self._size = os.path.getsize(image)
- self._transferred = 0
- self._bytes = re.compile(r'(\d+) bytes')
+ def __init__(self, image):
+ super().__init__()
+ self._size = os.path.getsize(image)
+ self._transferred = 0
+ self._bytes = re.compile(r"(\d+) bytes")
- def _GetDDPid(self):
- """Get the Pid of dd."""
- try:
- pids = cros_build_lib.run(['pgrep', 'dd'], capture_output=True,
- print_cmd=False, encoding='utf-8').stdout
- for pid in pids.splitlines():
- if osutils.IsChildProcess(int(pid), name='dd'):
- return int(pid)
- return -1
- except cros_build_lib.RunCommandError:
- # If dd isn't still running, then we assume that it is finished.
- return -1
+ def _GetDDPid(self):
+ """Get the Pid of dd."""
+ try:
+ pids = cros_build_lib.run(
+ ["pgrep", "dd"],
+ capture_output=True,
+ print_cmd=False,
+ encoding="utf-8",
+ ).stdout
+ for pid in pids.splitlines():
+ if osutils.IsChildProcess(int(pid), name="dd"):
+ return int(pid)
+ return -1
+ except cros_build_lib.RunCommandError:
+ # If dd isn't still running, then we assume that it is finished.
+ return -1
- def _PingDD(self, dd_pid):
- """Send USR1 signal to dd to get status update."""
- try:
- cmd = ['kill', '-USR1', str(dd_pid)]
- cros_build_lib.sudo_run(cmd, print_cmd=False)
- except cros_build_lib.RunCommandError:
- # Here we assume that dd finished in the background.
- return
+ def _PingDD(self, dd_pid):
+ """Send USR1 signal to dd to get status update."""
+ try:
+ cmd = ["kill", "-USR1", str(dd_pid)]
+ cros_build_lib.sudo_run(cmd, print_cmd=False)
+ except cros_build_lib.RunCommandError:
+ # Here we assume that dd finished in the background.
+ return
- def ParseOutput(self, output=None):
- """Parse the output of dd to update progress bar."""
- dd_pid = self._GetDDPid()
- if dd_pid == -1:
- return
+ def ParseOutput(self, output=None):
+ """Parse the output of dd to update progress bar."""
+ dd_pid = self._GetDDPid()
+ if dd_pid == -1:
+ return
- self._PingDD(dd_pid)
+ self._PingDD(dd_pid)
- if output is None:
- stdout = self._stdout.read()
- stderr = self._stderr.read()
- output = stdout + stderr
+ if output is None:
+ stdout = self._stdout.read()
+ stderr = self._stderr.read()
+ output = stdout + stderr
- match = self._bytes.search(output)
- if match:
- self._transferred = int(match.groups()[0])
+ match = self._bytes.search(output)
+ if match:
+ self._transferred = int(match.groups()[0])
- self.ProgressBar(self._transferred / self._size)
+ self.ProgressBar(self._transferred / self._size)
def _IsFilePathGPTDiskImage(file_path, require_pmbr=False):
- """Determines if a file is a valid GPT disk.
+ """Determines if a file is a valid GPT disk.
- Args:
- file_path: Path to the file to test.
- require_pmbr: Whether to require a PMBR in LBA0.
- """
- if os.path.isfile(file_path):
- with open(file_path, 'rb') as image_file:
- if require_pmbr:
- # Seek to the end of LBA0 and look for the PMBR boot signature.
- image_file.seek(0x1fe)
- if image_file.read(2) != b'\x55\xaa':
- return False
- # Current file position is start of LBA1 now.
- else:
- # Seek to LBA1 where the GPT starts.
- image_file.seek(0x200)
+ Args:
+ file_path: Path to the file to test.
+ require_pmbr: Whether to require a PMBR in LBA0.
+ """
+ if os.path.isfile(file_path):
+ with open(file_path, "rb") as image_file:
+ if require_pmbr:
+ # Seek to the end of LBA0 and look for the PMBR boot signature.
+ image_file.seek(0x1FE)
+ if image_file.read(2) != b"\x55\xaa":
+ return False
+ # Current file position is start of LBA1 now.
+ else:
+ # Seek to LBA1 where the GPT starts.
+ image_file.seek(0x200)
- # See if there's a GPT here.
- if image_file.read(8) == b'EFI PART':
- return True
+ # See if there's a GPT here.
+ if image_file.read(8) == b"EFI PART":
+ return True
- return False
+ return False
def _ChooseImageFromDirectory(dir_path):
- """Lists all image files in |dir_path| and ask user to select one.
+ """Lists all image files in |dir_path| and ask user to select one.
- Args:
- dir_path: Path to the directory.
- """
- images = sorted([x for x in os.listdir(dir_path) if
- _IsFilePathGPTDiskImage(os.path.join(dir_path, x))])
- idx = 0
- if not images:
- raise ValueError('No image found in %s.' % dir_path)
- elif len(images) > 1:
- idx = cros_build_lib.GetChoice(
- 'Multiple images found in %s. Please select one to continue:' % (
- (dir_path,)),
- images)
+ Args:
+ dir_path: Path to the directory.
+ """
+ images = sorted(
+ [
+ x
+ for x in os.listdir(dir_path)
+ if _IsFilePathGPTDiskImage(os.path.join(dir_path, x))
+ ]
+ )
+ idx = 0
+ if not images:
+ raise ValueError("No image found in %s." % dir_path)
+ elif len(images) > 1:
+ idx = cros_build_lib.GetChoice(
+ "Multiple images found in %s. Please select one to continue:"
+ % ((dir_path,)),
+ images,
+ )
- return os.path.join(dir_path, images[idx])
+ return os.path.join(dir_path, images[idx])
class FlashError(Exception):
- """Thrown when there is an unrecoverable error during flash."""
+ """Thrown when there is an unrecoverable error during flash."""
class USBImager(object):
- """Copy image to the target removable device."""
+ """Copy image to the target removable device."""
- def __init__(self, device, board, image, version, debug=False, yes=False):
- """Initializes USBImager."""
- self.device = device
- self.board = board if board else GetDefaultBoard()
- self.image = image
- self.version = version
- self.debug = debug
- self.debug_level = logging.DEBUG if debug else logging.INFO
- self.yes = yes
+ def __init__(self, device, board, image, version, debug=False, yes=False):
+ """Initializes USBImager."""
+ self.device = device
+ self.board = board if board else GetDefaultBoard()
+ self.image = image
+ self.version = version
+ self.debug = debug
+ self.debug_level = logging.DEBUG if debug else logging.INFO
+ self.yes = yes
- def DeviceNameToPath(self, device_name):
- return '/dev/%s' % device_name
+ def DeviceNameToPath(self, device_name):
+ return "/dev/%s" % device_name
- def GetRemovableDeviceDescription(self, device):
- """Returns a informational description of the removable |device|.
+ def GetRemovableDeviceDescription(self, device):
+ """Returns a informational description of the removable |device|.
- Args:
- device: the device name (e.g. sdc).
+ Args:
+ device: the device name (e.g. sdc).
- Returns:
- A string describing |device| (e.g. Patriot Memory 7918 MB).
- """
- desc = [
- osutils.GetDeviceInfo(device, keyword='manufacturer'),
- osutils.GetDeviceInfo(device, keyword='product'),
- osutils.GetDeviceSize(self.DeviceNameToPath(device)),
- '(%s)' % self.DeviceNameToPath(device),
- ]
- return ' '.join([x for x in desc if x])
+ Returns:
+ A string describing |device| (e.g. Patriot Memory 7918 MB).
+ """
+ desc = [
+ osutils.GetDeviceInfo(device, keyword="manufacturer"),
+ osutils.GetDeviceInfo(device, keyword="product"),
+ osutils.GetDeviceSize(self.DeviceNameToPath(device)),
+ "(%s)" % self.DeviceNameToPath(device),
+ ]
+ return " ".join([x for x in desc if x])
- def ListAllRemovableDevices(self):
- """Returns a list of removable devices.
+ def ListAllRemovableDevices(self):
+ """Returns a list of removable devices.
- Returns:
- A list of device names (e.g. ['sdb', 'sdc']).
- """
- devices = osutils.ListBlockDevices()
- removable_devices = []
- for d in devices:
- if d.TYPE == 'disk' and (d.RM == '1' or d.HOTPLUG == '1'):
- removable_devices.append(d.NAME)
+ Returns:
+ A list of device names (e.g. ['sdb', 'sdc']).
+ """
+ devices = osutils.ListBlockDevices()
+ removable_devices = []
+ for d in devices:
+ if d.TYPE == "disk" and (d.RM == "1" or d.HOTPLUG == "1"):
+ removable_devices.append(d.NAME)
- return removable_devices
+ return removable_devices
- def ChooseRemovableDevice(self, devices):
- """Lists all removable devices and asks user to select/confirm.
+ def ChooseRemovableDevice(self, devices):
+ """Lists all removable devices and asks user to select/confirm.
- Args:
- devices: a list of device names (e.g. ['sda', 'sdb']).
+ Args:
+ devices: a list of device names (e.g. ['sda', 'sdb']).
- Returns:
- The device name chosen by the user.
- """
- idx = cros_build_lib.GetChoice(
- 'Removable device(s) found. Please select/confirm to continue:',
- [self.GetRemovableDeviceDescription(x) for x in devices])
+ Returns:
+ The device name chosen by the user.
+ """
+ idx = cros_build_lib.GetChoice(
+ "Removable device(s) found. Please select/confirm to continue:",
+ [self.GetRemovableDeviceDescription(x) for x in devices],
+ )
- return devices[idx]
+ return devices[idx]
- def CopyImageToDevice(self, image, device):
- """Copies |image| to the removable |device|.
+ def CopyImageToDevice(self, image, device):
+ """Copies |image| to the removable |device|.
- Args:
- image: Path to the image to copy.
- device: Device to copy to.
- """
- cmd = ['dd', 'if=%s' % image, 'of=%s' % device, 'bs=4M', 'iflag=fullblock',
- 'oflag=direct', 'conv=fdatasync']
- if logging.getLogger().getEffectiveLevel() <= logging.NOTICE:
- op = UsbImagerOperation(image)
- op.Run(cros_build_lib.sudo_run, cmd, debug_level=logging.NOTICE,
- encoding='utf-8', update_period=0.5)
- else:
- cros_build_lib.sudo_run(
- cmd, debug_level=logging.NOTICE,
- print_cmd=logging.getLogger().getEffectiveLevel() < logging.NOTICE)
+ Args:
+ image: Path to the image to copy.
+ device: Device to copy to.
+ """
+ cmd = [
+ "dd",
+ "if=%s" % image,
+ "of=%s" % device,
+ "bs=4M",
+ "iflag=fullblock",
+ "oflag=direct",
+ "conv=fdatasync",
+ ]
+ if logging.getLogger().getEffectiveLevel() <= logging.NOTICE:
+ op = UsbImagerOperation(image)
+ op.Run(
+ cros_build_lib.sudo_run,
+ cmd,
+ debug_level=logging.NOTICE,
+ encoding="utf-8",
+ update_period=0.5,
+ )
+ else:
+ cros_build_lib.sudo_run(
+ cmd,
+ debug_level=logging.NOTICE,
+ print_cmd=logging.getLogger().getEffectiveLevel()
+ < logging.NOTICE,
+ )
- # dd likely didn't put the backup GPT in the last block. sfdisk fixes this
- # up for us with a 'write' command, so we have a standards-conforming GPT.
- # Ignore errors because sfdisk (util-linux < v2.32) isn't always happy to
- # fix GPT correctness issues.
- cros_build_lib.sudo_run(['sfdisk', device], input='write\n',
- check=False,
- debug_level=self.debug_level)
+ # dd likely didn't put the backup GPT in the last block. sfdisk fixes this
+ # up for us with a 'write' command, so we have a standards-conforming GPT.
+ # Ignore errors because sfdisk (util-linux < v2.32) isn't always happy to
+ # fix GPT correctness issues.
+ cros_build_lib.sudo_run(
+ ["sfdisk", device],
+ input="write\n",
+ check=False,
+ debug_level=self.debug_level,
+ )
- cros_build_lib.sudo_run(['partx', '-u', device],
- debug_level=self.debug_level)
- cros_build_lib.sudo_run(['sync', '-d', device],
- debug_level=self.debug_level)
+ cros_build_lib.sudo_run(
+ ["partx", "-u", device], debug_level=self.debug_level
+ )
+ cros_build_lib.sudo_run(
+ ["sync", "-d", device], debug_level=self.debug_level
+ )
- def _GetImagePath(self):
- """Returns the image path to use."""
- image_path = None
- if os.path.isfile(self.image):
- if not self.yes and not _IsFilePathGPTDiskImage(self.image):
- # TODO(wnwen): Open the tarball and if there is just one file in it,
- # use that instead. Existing code in upload_symbols.py.
- if cros_build_lib.BooleanPrompt(
- prolog='The given image file is not a valid disk image. Perhaps '
- 'you forgot to untar it.',
- prompt='Terminate the current flash process?'):
- raise FlashError('Update terminated by user.')
- image_path = self.image
- elif os.path.isdir(self.image):
- # Ask user which image (*.bin) in the folder to use.
- image_path = _ChooseImageFromDirectory(self.image)
- else:
- # Translate the xbuddy path to get the exact image to use.
- _, image_path = ds_wrapper.GetImagePathWithXbuddy(
- self.image, self.board, self.version)
+ def _GetImagePath(self):
+ """Returns the image path to use."""
+ image_path = None
+ if os.path.isfile(self.image):
+ if not self.yes and not _IsFilePathGPTDiskImage(self.image):
+ # TODO(wnwen): Open the tarball and if there is just one file in it,
+ # use that instead. Existing code in upload_symbols.py.
+ if cros_build_lib.BooleanPrompt(
+ prolog="The given image file is not a valid disk image. Perhaps "
+ "you forgot to untar it.",
+ prompt="Terminate the current flash process?",
+ ):
+ raise FlashError("Update terminated by user.")
+ image_path = self.image
+ elif os.path.isdir(self.image):
+ # Ask user which image (*.bin) in the folder to use.
+ image_path = _ChooseImageFromDirectory(self.image)
+ else:
+ # Translate the xbuddy path to get the exact image to use.
+ _, image_path = ds_wrapper.GetImagePathWithXbuddy(
+ self.image, self.board, self.version
+ )
- logging.info('Using image %s', image_path)
- return image_path
+ logging.info("Using image %s", image_path)
+ return image_path
- def Run(self):
- """Image the removable device."""
- devices = self.ListAllRemovableDevices()
+ def Run(self):
+ """Image the removable device."""
+ devices = self.ListAllRemovableDevices()
- if self.device:
- # If user specified a device path, check if it exists.
- if not os.path.exists(self.device):
- raise FlashError('Device path %s does not exist.' % self.device)
+ if self.device:
+ # If user specified a device path, check if it exists.
+ if not os.path.exists(self.device):
+ raise FlashError("Device path %s does not exist." % self.device)
- # Then check if it is removable.
- if self.device not in [self.DeviceNameToPath(x) for x in devices]:
- msg = '%s is not a removable device.' % self.device
- if not (self.yes or cros_build_lib.BooleanPrompt(
- default=False, prolog=msg)):
- raise FlashError('You can specify usb:// to choose from a list of '
- 'removable devices.')
- target = None
- if self.device:
- # Get device name from path (e.g. sdc in /dev/sdc).
- target = self.device.rsplit(os.path.sep, 1)[-1]
- elif devices:
- # Ask user to choose from the list.
- target = self.ChooseRemovableDevice(devices)
- else:
- raise FlashError('No removable devices detected.')
+ # Then check if it is removable.
+ if self.device not in [self.DeviceNameToPath(x) for x in devices]:
+ msg = "%s is not a removable device." % self.device
+ if not (
+ self.yes
+ or cros_build_lib.BooleanPrompt(default=False, prolog=msg)
+ ):
+ raise FlashError(
+ "You can specify usb:// to choose from a list of "
+ "removable devices."
+ )
+ target = None
+ if self.device:
+ # Get device name from path (e.g. sdc in /dev/sdc).
+ target = self.device.rsplit(os.path.sep, 1)[-1]
+ elif devices:
+ # Ask user to choose from the list.
+ target = self.ChooseRemovableDevice(devices)
+ else:
+ raise FlashError("No removable devices detected.")
- image_path = self._GetImagePath()
- device = self.DeviceNameToPath(target)
+ image_path = self._GetImagePath()
+ device = self.DeviceNameToPath(target)
- device_size_bytes = osutils.GetDeviceSize(device, in_bytes=True)
- image_size_bytes = os.path.getsize(image_path)
- if device_size_bytes < image_size_bytes:
- raise FlashError(
- 'Removable device %s has less storage (%d) than the image size (%d).'
- % (device, device_size_bytes, image_size_bytes))
+ device_size_bytes = osutils.GetDeviceSize(device, in_bytes=True)
+ image_size_bytes = os.path.getsize(image_path)
+ if device_size_bytes < image_size_bytes:
+ raise FlashError(
+ "Removable device %s has less storage (%d) than the image size (%d)."
+ % (device, device_size_bytes, image_size_bytes)
+ )
- try:
- self.CopyImageToDevice(image_path, device)
- except cros_build_lib.RunCommandError:
- logging.error('Failed copying image to device %s',
- self.DeviceNameToPath(target))
+ try:
+ self.CopyImageToDevice(image_path, device)
+ except cros_build_lib.RunCommandError:
+ logging.error(
+ "Failed copying image to device %s",
+ self.DeviceNameToPath(target),
+ )
class FileImager(USBImager):
- """Copy image to the target path."""
+ """Copy image to the target path."""
- def Run(self):
- """Copy the image to the path specified by self.device."""
- if not os.path.isdir(os.path.dirname(self.device)):
- raise FlashError('Parent of path %s is not a directory.' % self.device)
+ def Run(self):
+ """Copy the image to the path specified by self.device."""
+ if not os.path.isdir(os.path.dirname(self.device)):
+ raise FlashError(
+ "Parent of path %s is not a directory." % self.device
+ )
- image_path = self._GetImagePath()
- if os.path.isdir(self.device):
- logging.info('Copying to %s',
- os.path.join(self.device, os.path.basename(image_path)))
- else:
- logging.info('Copying to %s', self.device)
- try:
- shutil.copy(image_path, self.device)
- except IOError:
- logging.error('Failed to copy image %s to %s', image_path, self.device)
+ image_path = self._GetImagePath()
+ if os.path.isdir(self.device):
+ logging.info(
+ "Copying to %s",
+ os.path.join(self.device, os.path.basename(image_path)),
+ )
+ else:
+ logging.info("Copying to %s", self.device)
+ try:
+ shutil.copy(image_path, self.device)
+ except IOError:
+ logging.error(
+ "Failed to copy image %s to %s", image_path, self.device
+ )
# TODO(b/190631159, b/196056723): Change default of no_minios_update to |False|.
-def Flash(device, image, board=None, version=None,
- no_rootfs_update=False, no_stateful_update=False,
- no_minios_update=True, clobber_stateful=False, reboot=True,
- ssh_private_key=None, ping=True, disable_rootfs_verification=False,
- clear_cache=False, yes=False, force=False, debug=False,
- clear_tpm_owner=False, delta=False):
- """Flashes a device, USB drive, or file with an image.
+def Flash(
+ device,
+ image,
+ board=None,
+ version=None,
+ no_rootfs_update=False,
+ no_stateful_update=False,
+ no_minios_update=True,
+ clobber_stateful=False,
+ reboot=True,
+ ssh_private_key=None,
+ ping=True,
+ disable_rootfs_verification=False,
+ clear_cache=False,
+ yes=False,
+ force=False,
+ debug=False,
+ clear_tpm_owner=False,
+ delta=False,
+):
+ """Flashes a device, USB drive, or file with an image.
- This provides functionality common to `cros flash` and `brillo flash`
- so that they can parse the commandline separately but still use the
- same underlying functionality.
+ This provides functionality common to `cros flash` and `brillo flash`
+ so that they can parse the commandline separately but still use the
+ same underlying functionality.
- Args:
- device: commandline.Device object; None to use the default device.
- image: Path (string) to the update image. Can be a local or xbuddy path;
- non-existant local paths are converted to xbuddy.
- board: Board to use; None to automatically detect.
- no_rootfs_update: Don't update rootfs partition; SSH |device| scheme only.
- no_stateful_update: Don't update stateful partition; SSH |device| scheme
- only.
- no_minios_update: Don't update miniOS partition; SSH |device| scheme only.
- clobber_stateful: Clobber stateful partition; SSH |device| scheme only.
- clear_tpm_owner: Clear the TPM owner on reboot; SSH |device| scheme only.
- reboot: Reboot device after update; SSH |device| scheme only.
- ssh_private_key: Path to an SSH private key file; None to use test keys.
- ping: Ping the device before attempting update; SSH |device| scheme only.
- disable_rootfs_verification: Remove rootfs verification after update; SSH
- |device| scheme only.
- clear_cache: Clear the devserver static directory.
- yes: Assume "yes" for any prompt.
- force: Ignore confidence checks and prompts. Overrides |yes| if True.
- debug: Print additional debugging messages.
- version: Default version.
- delta: Whether to use delta compression when tranferring image bytes.
+ Args:
+ device: commandline.Device object; None to use the default device.
+ image: Path (string) to the update image. Can be a local or xbuddy path;
+ non-existant local paths are converted to xbuddy.
+ board: Board to use; None to automatically detect.
+ no_rootfs_update: Don't update rootfs partition; SSH |device| scheme only.
+ no_stateful_update: Don't update stateful partition; SSH |device| scheme
+ only.
+ no_minios_update: Don't update miniOS partition; SSH |device| scheme only.
+ clobber_stateful: Clobber stateful partition; SSH |device| scheme only.
+ clear_tpm_owner: Clear the TPM owner on reboot; SSH |device| scheme only.
+ reboot: Reboot device after update; SSH |device| scheme only.
+ ssh_private_key: Path to an SSH private key file; None to use test keys.
+ ping: Ping the device before attempting update; SSH |device| scheme only.
+ disable_rootfs_verification: Remove rootfs verification after update; SSH
+ |device| scheme only.
+ clear_cache: Clear the devserver static directory.
+ yes: Assume "yes" for any prompt.
+ force: Ignore confidence checks and prompts. Overrides |yes| if True.
+ debug: Print additional debugging messages.
+ version: Default version.
+ delta: Whether to use delta compression when tranferring image bytes.
- Raises:
- FlashError: An unrecoverable error occured.
- ValueError: Invalid parameter combination.
- """
- if force:
- yes = True
+ Raises:
+ FlashError: An unrecoverable error occured.
+ ValueError: Invalid parameter combination.
+ """
+ if force:
+ yes = True
- if clear_cache:
- ds_wrapper.DevServerWrapper.WipeStaticDirectory()
- ds_wrapper.DevServerWrapper.CreateStaticDirectory()
+ if clear_cache:
+ ds_wrapper.DevServerWrapper.WipeStaticDirectory()
+ ds_wrapper.DevServerWrapper.CreateStaticDirectory()
- # The user may not have specified a source image, use version as the default.
- image = image or version
- if not device or device.scheme == commandline.DEVICE_SCHEME_SSH:
- if device:
- hostname, port = device.hostname, device.port
- else:
- hostname, port = None, None
+ # The user may not have specified a source image, use version as the default.
+ image = image or version
+ if not device or device.scheme == commandline.DEVICE_SCHEME_SSH:
+ if device:
+ hostname, port = device.hostname, device.port
+ else:
+ hostname, port = None, None
- with remote_access.ChromiumOSDeviceHandler(
- hostname, port=port,
- private_key=ssh_private_key, ping=ping) as device_p:
- device_imager.DeviceImager(
- device_p,
- image,
- board=board,
- version=version,
- no_rootfs_update=no_rootfs_update,
- no_stateful_update=no_stateful_update,
- no_minios_update=no_minios_update,
- no_reboot=not reboot,
- disable_verification=disable_rootfs_verification,
- clobber_stateful=clobber_stateful,
- clear_tpm_owner=clear_tpm_owner,
- delta=delta).Run()
- elif device.scheme == commandline.DEVICE_SCHEME_USB:
- path = osutils.ExpandPath(device.path) if device.path else ''
- logging.info('Preparing to image the removable device %s', path)
- imager = USBImager(path,
- board,
- image,
- version,
- debug=debug,
- yes=yes)
- imager.Run()
- elif device.scheme == commandline.DEVICE_SCHEME_FILE:
- logging.info('Preparing to copy image to %s', device.path)
- imager = FileImager(device.path,
- board,
- image,
- version,
- debug=debug,
- yes=yes)
- imager.Run()
+ with remote_access.ChromiumOSDeviceHandler(
+ hostname, port=port, private_key=ssh_private_key, ping=ping
+ ) as device_p:
+ device_imager.DeviceImager(
+ device_p,
+ image,
+ board=board,
+ version=version,
+ no_rootfs_update=no_rootfs_update,
+ no_stateful_update=no_stateful_update,
+ no_minios_update=no_minios_update,
+ no_reboot=not reboot,
+ disable_verification=disable_rootfs_verification,
+ clobber_stateful=clobber_stateful,
+ clear_tpm_owner=clear_tpm_owner,
+ delta=delta,
+ ).Run()
+ elif device.scheme == commandline.DEVICE_SCHEME_USB:
+ path = osutils.ExpandPath(device.path) if device.path else ""
+ logging.info("Preparing to image the removable device %s", path)
+ imager = USBImager(path, board, image, version, debug=debug, yes=yes)
+ imager.Run()
+ elif device.scheme == commandline.DEVICE_SCHEME_FILE:
+ logging.info("Preparing to copy image to %s", device.path)
+ imager = FileImager(
+ device.path, board, image, version, debug=debug, yes=yes
+ )
+ imager.Run()