# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Provides tools for packaging, installing and testing firmware."""
| 6 | |
import io
import os
import re
import tarfile
import time
from tempfile import NamedTemporaryFile

from mtlib.util import Path, RequiredRegex, Execute, GitRepo
| 13 | |
# Root directory under which FirmwarePackage looks up the
# "private-overlays" tree (presumably the source checkout as mounted inside
# the Chromium OS chroot -- TODO confirm).
src_dir = Path("/mnt/host/source/src/")
| 15 | |
class FirmwareException(Exception):
  """Exception type reserved for firmware-related errors in this module."""
| 18 | |
class FirmwareBinary(object):
  """A firmware image held in memory together with its naming metadata.

  Attributes:
    filename: name the binary was created with (or last renamed to).
    data: raw contents read from the file or the supplied file object.
    hw_version: hardware version parsed from a "<hw>_<fw>.bin" style name,
        or None if the name does not have that shape.
    fw_version: firmware version parsed from the same name, or None.
    symlink_name: name of the symlink below /lib/firmware, or None.
    device_file: Path of the binary below /opt/google/touch/firmware.
  """

  def __init__(self, filename, fileobj=None, symlink=None):
    """Loads the firmware data and parses the versions out of the name.

    Args:
      filename: name of the firmware file. It is opened for reading only
          when no fileobj is given.
      fileobj: optional readable object to load the data from. It stays
          owned by the caller and is not closed here.
      symlink: optional name of the symlink below /lib/firmware.
    """
    self.hw_version = None
    self.fw_version = None
    self.symlink_name = symlink if symlink else None

    if not fileobj:
      # We opened the file ourselves, so make sure it is closed again.
      with open(filename, "rb") as local_file:
        self.data = local_file.read()
    else:
      self.data = fileobj.read()

    name_regex = RequiredRegex("([0-9.a-zA-Z]+)_([0-9.a-zA-Z]+)\\.bin")
    match = name_regex.Match(filename, must_succeed=False)
    if match:
      self.hw_version = match.group(1)
      self.fw_version = match.group(2)

    self.filename = filename
    self.device_file = Path("/opt/google/touch/firmware", filename)

  def UpdateName(self, hw_version, fw_version):
    """Renames the local file to "<hw_version>_<fw_version>.bin".

    All attributes derived from the name are refreshed so that the object
    keeps describing the renamed file.

    Args:
      hw_version: hardware version to encode in the new name.
      fw_version: firmware version to encode in the new name.

    Raises:
      OSError: if os.rename fails.
    """
    name = "{}_{}.bin".format(hw_version, fw_version)
    os.rename(self.filename, name)
    self.filename = name
    # Keep version info and install location in sync with the new name;
    # otherwise packaging and ebuild generation would use the stale one.
    self.hw_version = hw_version
    self.fw_version = fw_version
    self.device_file = Path("/opt/google/touch/firmware", name)

  def ForceUpdate(self, device, remote):
    """Temporarily installs this binary on the device and forces an update.

    The data is written to /opt/google/touch/firmware/force_update.fw, the
    device's firmware symlink is pointed at it for the duration of
    device.ForceFirmwareUpdate, and afterwards the original symlink is
    restored and the temporary file removed.

    Args:
      device: object providing a `symlink` attribute and a
          ForceFirmwareUpdate(remote) method.
      remote: object providing RemountWriteable, Write and SafeExecute.
    """
    remote.RemountWriteable()

    symlink = Path("/lib/firmware", device.symlink)
    backup = Path(str(symlink) + ".bak")
    target = Path("/opt/google/touch/firmware/force_update.fw")

    # From here on the device refers to the symlink by its basename.
    device.symlink = symlink.basename

    remote.Write(str(target), self.data)
    try:
      remote.SafeExecute(["mv", str(symlink), str(backup)])
      try:
        remote.SafeExecute(["ln", "-s", str(target), str(symlink)])
        device.ForceFirmwareUpdate(remote)
      finally:
        # Restore the original symlink even if the update failed.
        remote.SafeExecute(["mv", str(backup), str(symlink)])
    finally:
      # The temporary firmware file is removed in every case.
      remote.SafeExecute(["rm", str(target)])

  def __str__(self):
    """Returns "<filename> @<symlink path>" ("Unknown" without symlink)."""
    if self.symlink_name:
      symlink = "/lib/firmware/" + self.symlink_name
    else:
      symlink = "Unknown"
    return "%s @%s" % (self.filename, symlink)

  def __repr__(self):
    """Same text as __str__."""
    return str(self)
| 75 | |
| 76 | |
class FirmwarePackage(object):
  """Helper class to deal with firmware installation on devices.

  Collects FirmwareBinary objects (keyed by hardware version) and knows
  about the touch firmware ebuild of a board: its directory, the ebuild
  file, the version symlink and the matching binary package URL.
  """
  name = "firmware"

  def __init__(self, board, variant):
    """Locates the touch firmware ebuild for a board.

    Args:
      board: board name, used in the overlay and ebuild names.
      variant: optional variant name; selects the variant overlay if set.

    Raises:
      FirmwareException: if the ebuild directory contains no symlink whose
          name starts with the ebuild name.
    """
    self.board = board
    self.variant = variant
    self.binaries = {}

    # determine path and name of touch firmware ebuild file
    if variant:
      overlay_name = "overlay-variant-{}-{}-private".format(board, variant)
    else:
      overlay_name = "overlay-{}-private".format(board)
    self.overlay = src_dir / "private-overlays" / overlay_name

    self.ebuild_name = "chromeos-touch-firmware-{}".format(board)
    self.ebuild_dir = self.overlay / "chromeos-base" / self.ebuild_name
    self.ebuild_repo = GitRepo(self.ebuild_dir)

    self.ebuild_file = self.ebuild_dir / "{}-0.0.1.ebuild".format(
        self.ebuild_name)

    # look for symlink to ebuild file (the last matching entry wins)
    self.ebuild_symlink = None
    for entry in self.ebuild_dir.ListDir():
      if entry.is_link and entry.basename.startswith(self.ebuild_name):
        self.ebuild_symlink = entry

    if not self.ebuild_symlink:
      msg = "Cannot find version symlink in {}"
      raise FirmwareException(msg.format(self.ebuild_dir))

    # extract ebuild version from symlink name
    pattern = "{}-([0-9a-zA-Z_\\-\\.]*).ebuild"
    regex = RequiredRegex(pattern.format(self.ebuild_name))
    match = regex.Search(self.ebuild_symlink.basename)
    self.ebuild_version = match.group(1)

    bcsname = self.overlay.basename.replace("overlay-", "bcs-")
    url = "gs://chromeos-binaries/HOME/{}/{}/chromeos-base/{}/{}-{}.tbz2"
    self.bcs_url = url.format(bcsname, self.overlay.basename,
                              self.ebuild_name, self.ebuild_name,
                              self.ebuild_version)

  def _ExtractSymlinkfromEbuild(self, firmware):
    """Returns the symlink name the ebuild declares for a firmware binary.

    Searches the ebuild file for a `dosym "<device_file>" "/lib/firmware/X"`
    line and returns X, or None if there is no such line.
    """
    with open(str(self.ebuild_file), "r") as ebuild_file:
      ebuild = ebuild_file.read()

    pattern = "dosym \"{}\" \"/lib/firmware/([a-zA-Z0-9_\\-.]+)\""
    # The device path is literal text (it contains dots), so escape it.
    device_path = re.escape(str(firmware.device_file))
    regex = RequiredRegex(pattern.format(device_path))
    match = regex.Search(ebuild, must_succeed=False)
    return match.group(1) if match else None

  def GetExistingBinaries(self):
    """Yields a FirmwareBinary for every file in the existing BCS package.

    The package is copied from self.bcs_url with gsutil into a temporary
    file. Nothing is yielded if that command reports failure. Each binary
    gets its symlink_name from the ebuild file.
    """
    with NamedTemporaryFile("rb") as tar_file:
      if not Execute(["gsutil", "cp", self.bcs_url, tar_file.name]):
        return
      tar_file.seek(0)
      tar = tarfile.open(fileobj=tar_file)
      try:
        for member in tar.getmembers():
          if not member.isfile():
            continue
          name = os.path.basename(member.name)
          # FirmwareBinary reads all data in its constructor, so the
          # binary stays valid after the archive is closed.
          binary = FirmwareBinary(name, tar.extractfile(member))
          binary.symlink_name = self._ExtractSymlinkfromEbuild(binary)
          yield binary
      finally:
        tar.close()

  def AddBinary(self, binary):
    """Registers a binary, replacing any other one of the same hw_version."""
    self.binaries[binary.hw_version] = binary

  def GenerateBCSPackage(self, version):
    """Writes all registered binaries into a .tbz2 in the working directory.

    Every binary is stored below "chromeos-touch-firmware-<board>-<version>"
    at its device path, owned by root with mode 0755.

    Args:
      version: version string used in the archive name.

    Returns:
      Path of the archive that was written.
    """
    tar_name = "chromeos-touch-firmware-{}-{}".format(self.board, version)
    tar_file = "{}.tbz2".format(tar_name)

    tar = tarfile.open(tar_file, "w:bz2")
    try:
      for binary in self.binaries.values():
        info = tarfile.TarInfo(tar_name + str(binary.device_file))
        info.size = len(binary.data)
        info.mode = 0o755
        info.uid = 0
        info.gid = 0
        info.mtime = int(time.time())
        info.uname = "root"
        info.gname = "root"
        tar.addfile(info, io.BytesIO(binary.data))
    finally:
      # Closing finalizes the archive and flushes the bz2 stream; without
      # it the file on disk may be incomplete.
      tar.close()
    return Path(tar_file)

  def UpdateVersionSymlink(self, version):
    """Moves the version symlink to "<ebuild_name>-<version>.ebuild".

    The move goes through the ebuild git repository and is skipped if the
    symlink already has the requested name. The attributes of this object
    (ebuild_symlink, ebuild_version, bcs_url) are not refreshed.
    """
    new_symlink = self.ebuild_dir / "{}-{}.ebuild".format(
        self.ebuild_name, version)

    old_symlink = self.ebuild_symlink
    if new_symlink != old_symlink:
      self.ebuild_repo.Move(old_symlink, new_symlink)

  def UpdateBinarySymlinks(self, remote):
    """Fills in missing symlink names from the devices found on remote.

    Binaries without a symlink_name take the `symlink` of the touch device
    with the same hardware version.

    Raises:
      FirmwareException: if no device matches a binary, or if two binaries
          end up with the same symlink name.
    """
    devices = remote.GetDeviceInfo().touch_devices
    for firmware in self.binaries.values():
      if firmware.symlink_name:
        continue
      if firmware.hw_version not in devices:
        msg = "Cannot find device for binary {}"
        raise FirmwareException(msg.format(firmware))
      firmware.symlink_name = devices[firmware.hw_version].symlink

    symlink_names = [b.symlink_name for b in self.binaries.values()]
    if len(set(symlink_names)) != len(symlink_names):
      raise FirmwareException("Duplicate symlink names for firmwares found")

  @staticmethod
  def _FindBlockEnd(text, begin):
    """Returns the index just past the '}' closing the '{' at text[begin].

    Returns -1 if the braces starting at begin are never balanced.
    """
    depth = 0
    for index in range(begin, len(text)):
      char = text[index]
      if char == "{":
        depth += 1
      elif char == "}":
        depth -= 1
        if depth == 0:
          return index + 1
    return -1

  def UpdateEbuildFile(self, remote):
    """Rewrites src_install in the ebuild to symlink all registered binaries.

    Symlink names are resolved via UpdateBinarySymlinks first. The body of
    src_install is replaced by a generated one; the text before and after
    the function is kept (the trailing text is stripped of surrounding
    whitespace).

    Raises:
      FirmwareException: if the src_install block cannot be located. The
          ebuild file is left untouched in that case.
    """
    self.UpdateBinarySymlinks(remote)

    ebuild = self.ebuild_file.Read()

    # Locate the braces of src_install before touching the file, so that a
    # malformed ebuild is never overwritten with garbage.
    install_idx = ebuild.find("src_install")
    begin = ebuild.find("{", install_idx) if install_idx >= 0 else -1
    end = self._FindBlockEnd(ebuild, begin) if begin >= 0 else -1
    if end < 0:
      msg = "Cannot find src_install block in {}"
      raise FirmwareException(msg.format(self.ebuild_file))

    # write ebuild with new src_install method
    out = self.ebuild_file.Open("w")
    try:
      out.write(ebuild[:begin])
      out.write("{\n")
      out.write("\t# automatically generated by mtbringup.\n")
      out.write("\tcros-binary_src_install\n\n")

      for firmware in self.binaries.values():
        line = "\tdosym \"%s\" \"/lib/firmware/%s\"\n"
        out.write(line % (firmware.device_file, firmware.symlink_name))

      out.write("}\n")
      out.write(ebuild[end:].strip())
    finally:
      out.close()

  def VerifySymlinks(self, remote):
    """Checks on the device that every binary is installed and linked.

    Prints a message for each problem found.

    Returns:
      True if every symlink exists and points to a file with the binary's
      name and every firmware file exists, False otherwise.
    """
    valid = True

    for firmware in self.binaries.values():
      cmd = "readlink -f /lib/firmware/{}".format(firmware.symlink_name)
      target = remote.Execute(cmd)

      if not target:
        msg = "Symlink for firmware '{}' does not exist on device"
        print(msg.format(firmware))
        valid = False
      elif Path(target).basename != firmware.device_file.basename:
        # Only compare names when there is a link target to look at.
        msg = "Symlink for firmware '{}' does not point to the right file"
        print(msg.format(firmware))
        valid = False

      cmd = "ls {}".format(firmware.device_file)
      if remote.Execute(cmd) is False:
        msg = "Firmware file {} does not exist on device"
        print(msg.format(firmware.device_file))
        valid = False

    return valid

  def VerifyFirmwareVersions(self, remote):
    """Compares device firmware versions with the registered binaries.

    Devices without a registered binary for their hardware version are
    ignored. A message is printed for every mismatch.

    Returns:
      True if every matching device reports the binary's firmware version.
    """
    device_info = remote.GetDeviceInfo(refresh=True)

    valid = True
    for device in device_info.touch_devices.values():
      if device.hw_version not in self.binaries:
        continue
      firmware = self.binaries[device.hw_version]
      if firmware.fw_version != device.fw_version:
        print("Device {} did not update correctly:".format(
            device.hw_version))
        print("Device version {} != firmware version {}".format(
            device.fw_version, firmware.fw_version))
        valid = False
    return valid

  def __str__(self):
    """Returns a listing of all registered binaries."""
    res = " firmwares:\n"
    for firmware in self.binaries.values():
      res += " {}".format(firmware)
    return res