# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5"""Library containing functions to install an image on a Chromium OS device."""
6
7import enum
8import os
9import re
10import sys
11from typing import Tuple, Dict
12
13from chromite.lib import cros_build_lib
14from chromite.lib import cros_logging as logging
15from chromite.lib import gs
16from chromite.lib import parallel
17from chromite.lib import remote_access
18from chromite.lib import retry_util
19from chromite.lib.xbuddy import devserver_constants
20from chromite.lib.xbuddy import xbuddy
21
22
23assert sys.version_info >= (3, 6), 'This module requires Python 3.6+'
24
25
class Error(Exception):
  """Base exception for Chromium OS-specific failures while flashing."""
28
29
class ImageType(enum.Enum):
  """Kinds of image source that can be used to flash the device."""

  # A complete image file on disk (e.g. chromiumos_test_image.bin).
  FULL = 0
  # A remote directory path
  # (e.g. gs://chromeos-image-archive/eve-release/R90-x.x.x).
  REMOTE_DIRECTORY = 1
38
39
class Partition(enum.Enum):
  """Identifies a partition type (kernel or rootfs)."""

  KERNEL = 0
  ROOTFS = 1
44
45
46class DeviceImager(object):
47 """A class to flash a Chromium OS device.
48
49 This utility uses parallelism as much as possible to achieve its goal as fast
50 as possible. For example, it uses parallel compressors, parallel transfers,
51 and simultaneous pipes.
52 """
53
54 # The parameters of the kernel and rootfs's two main partitions.
55 A = {Partition.KERNEL: 2, Partition.ROOTFS: 3}
56 B = {Partition.KERNEL: 4, Partition.ROOTFS: 5}
57
def __init__(self, device, image: str,
             no_rootfs_update: bool = False,
             no_stateful_update: bool = False,
             no_reboot: bool = False,
             disable_verification: bool = False,
             clobber_stateful: bool = False,
             clear_tpm_owner: bool = False):
  """Records the flashing options; no work is done on the device here.

  Args:
    device: The ChromiumOSDevice to be updated.
    image: The target image. It may be a local image file, a Google Storage
      directory, or an xBuddy path.
    no_rootfs_update: Request to skip the rootfs partition update. (Only
      stored here; no code visible in this section reads it.)
    no_stateful_update: Request to skip the stateful partition update. (Only
      stored here; no code visible in this section reads it.)
    no_reboot: If True, the device is neither rebooted nor boot-verified
      after the update. Defaults to False, i.e. the device is rebooted.
    disable_verification: If True, rootfs verification is disabled on the
      device at the end of the run.
    clobber_stateful: Whether to do a clean stateful partition. (Only stored
      here; no code visible in this section reads it.)
    clear_tpm_owner: If True, the TPM owner is cleared before the reboot
      step.
  """
  # What to flash, and where.
  self._device, self._image = device, image

  # Which steps to skip.
  self._no_reboot = no_reboot
  self._no_rootfs_update = no_rootfs_update
  self._no_stateful_update = no_stateful_update

  # Optional extra actions.
  self._clear_tpm_owner = clear_tpm_owner
  self._clobber_stateful = clobber_stateful
  self._disable_verification = disable_verification

  # Internal state. The compression may be changed later depending on the
  # image type; the inactive state starts out unknown.
  self._inactive_state = None
  self._compression = cros_build_lib.COMP_XZ
90
def Run(self):
  """Updates the device with the configured image.

  This is the public entry point. Any failure raised while imaging is
  reported as a single Error.

  Raises:
    Error: If any step fails. The original exception is attached as the
      cause of the raised Error.
  """
  try:
    self._Run()
  except Exception as e:
    # Chain explicitly so the root cause stays attached to the wrapper and is
    # shown as the direct cause in tracebacks.
    raise Error(f'DeviceImager Failed with error: {e}') from e

  logging.info('DeviceImager completed.')
100
def _Run(self):
  """Performs the imaging steps in order.

  The steps are: resolve the image, install the partitions, optionally clear
  the TPM owner, reboot and verify the boot (unless rebooting is disabled),
  and finally disable rootfs verification if that was requested.
  """
  image, image_type = self._GetImage()
  logging.info('Using image %s of type %s', image, image_type)

  # Images that come from a remote directory switch the compression to gzip.
  if image_type == ImageType.REMOTE_DIRECTORY:
    self._compression = cros_build_lib.COMP_GZIP

  self._InstallPartitions(image, image_type)

  if self._clear_tpm_owner:
    self._device.ClearTpmOwner()

  reboot_wanted = not self._no_reboot
  if reboot_wanted:
    self._Reboot()
    self._VerifyBootExpectations()

  # Done last, i.e. after the reboot step when there is one.
  if self._disable_verification:
    self._device.DisableRootfsVerification()
120
def _GetImage(self) -> Tuple[str, ImageType]:
  """Resolves the requested image into a concrete location and its type.

  The input is classified in this order:
    1. An existing local file is used as-is as a full Chromium OS GPT image
       (e.g. chromiumos_test_image.bin).
    2. An existing local directory is rejected.
    3. A Google Storage path is used as-is as the remote directory holding
       the quick-provision and stateful update files
       (e.g. gs://chromeos-image-archive/eve-release/R90-x.x.x).
    4. Anything else is treated as an xBuddy path and translated.

  NOTE: At this point there is no caching involved. Hence we always download
  the partition payloads or extract them from the Chromium OS image.

  Returns:
    A tuple of image path and image type.

  Raises:
    ValueError: If the input is a local directory.
    Error: If the xBuddy path does not translate to a build.
  """
  if os.path.isfile(self._image):
    return self._image, ImageType.FULL

  # TODO(b/172212406): We could potentially also allow this by searching
  # through the directory to see whether we have quick-provision and stateful
  # payloads. This only makes sense when a user has their workstation at home
  # and doesn't want to incur the bandwidth cost of downloading the same
  # image multiple times. For that, they can simply download the GPT image
  # first and flash that instead.
  if os.path.isdir(self._image):
    raise ValueError(
        f'{self._image}: input must be a disk image, not a directory.')

  if gs.PathIsGs(self._image):
    # TODO(b/172212406): Check whether it is a directory. If it isn't a
    # directory, download the image into some temp location and use it
    # instead.
    return self._image, ImageType.REMOTE_DIRECTORY

  # Nothing above matched, so assume an xBuddy path.
  translator = xbuddy.XBuddy(log_screen=False)
  build_id, local_file = translator.Translate([self._image])
  if build_id is None:
    raise Error(f'{self._image}: unable to find matching xBuddy path.')
  logging.info('XBuddy path translated to build ID %s', build_id)

  if not local_file:
    # No local file came back; point at the build's remote directory.
    remote_dir = f'{devserver_constants.GS_IMAGE_DIR}/{build_id}'
    return remote_dir, ImageType.REMOTE_DIRECTORY

  return local_file, ImageType.FULL
165
166 def _SplitDevPath(self, path: str) -> Tuple[str, int]:
167 """Splits the given /dev/x path into prefix and the dev number.
168
169 Args:
170 path: The path to a block dev device.
171
172 Returns:
173 A tuple of representing the prefix and the index of the dev path.
174 e.g.: '/dev/mmcblk0p1' -> ['/dev/mmcblk0p', 1]
175 """
176 match = re.search(r'(.*)([0-9]+)$', path)
177 if match is None:
178 raise Error(f'{path}: Could not parse root dev path.')
179
180 return match.group(1), int(match.group(2))
181
def _GetKernelState(self, root_num: int) -> Tuple[Dict, Dict]:
  """Maps a root partition number to the active and inactive kernel states.

  Args:
    root_num: The partition number of the current root device.

  Returns:
    A tuple of two dictionaries: The current active kernel state and the
    inactive kernel state. (Look at A and B constants in this class.)

  Raises:
    Error: If root_num is not the rootfs partition number of A or B.
  """
  # Try A first, then B; whichever owns root_num is the active state and the
  # other one is the inactive state.
  for active, inactive in ((self.A, self.B), (self.B, self.A)):
    if root_num == active[Partition.ROOTFS]:
      return active, inactive

  raise Error(f'Invalid root partition number {root_num}')
195
def _InstallPartitions(self, image: str, image_type):
  """The main method that installs the partitions of a Chrome OS device.

  It uses parallelism to install the partitions as fast as possible. If the
  installation still fails after the retry, every updater is reverted and
  the failure is re-raised.

  NOTE(review): `updaters` is never populated in the code visible here, so
  no step is actually scheduled and `image` / `image_type` are unused.

  Args:
    image: The image path (local file or remote directory).
    image_type: The type of the image (ImageType).
  """
  updaters = []

  def _RunPendingUpdaters():
    """Runs, in parallel, every updater that has not finished yet."""
    # Build the step list afresh on each attempt. A generator expression
    # handed to the retry helper as an argument is consumed by the first
    # attempt, so a retry would run no steps at all and look like a success.
    parallel.RunParallelSteps(
        [x.Run for x in updaters if not x.IsFinished()],
        halt_on_error=True)

  # Retry the partitions updates that failed, in case a transient error (like
  # SSH drop, etc) caused the error.
  num_retries = 1
  try:
    retry_util.RetryException(Error, num_retries, _RunPendingUpdaters)
  except Exception:
    # If one of the partitions failed to be installed, revert all partitions.
    parallel.RunParallelSteps([x.Revert for x in updaters])
    raise
219
220 def _Reboot(self):
221 """Reboots the device."""
222 try:
223 self._device.Reboot(timeout_sec=60)
224 except remote_access.RebootError:
225 raise Error('Could not recover from reboot. Once example reason'
226 ' could be the image provided was a non-test image'
227 ' or the system failed to boot after the update.')
228 except Exception as e:
229 raise Error(f'Failed to reboot to the device with error: {e}')
230
231 def _VerifyBootExpectations(self):
232 """Verify that we fully booted into the expected kernel state."""
233 # Discover the newly active kernel.
234 _, root_num = self._SplitDevPath(self._device.root_dev)
235 active_state, _ = self._GetKernelState(root_num)
236
237 # If this happens, we should rollback.
238 if active_state != self._inactive_state:
239 raise Error('The expected kernel state after update is invalid.')
240
241 logging.info('Verified boot expectations.')