David James | 8c84649 | 2011-01-25 17:07:29 -0800 | [diff] [blame^] | 1 | #!/usr/bin/python |
| 2 | # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| 3 | # Use of this source code is governed by a BSD-style license that can be |
| 4 | # found in the LICENSE file. |
| 5 | |
| 6 | import copy |
| 7 | import mox |
| 8 | import os |
| 9 | import prebuilt |
| 10 | import shutil |
| 11 | import tempfile |
| 12 | import unittest |
| 13 | import urllib |
| 14 | from chromite.lib import cros_build_lib |
| 15 | from chromite.lib.binpkg import PackageIndex |
| 16 | |
| 17 | PUBLIC_PACKAGES = [{'CPV': 'gtk+/public1', 'SHA1': '1'}, |
| 18 | {'CPV': 'gtk+/public2', 'SHA1': '2', |
| 19 | 'PATH': 'gtk%2B/foo.tgz'}] |
| 20 | PRIVATE_PACKAGES = [{'CPV': 'private', 'SHA1': '3'}] |
| 21 | |
| 22 | |
| 23 | def SimplePackageIndex(header=True, packages=True): |
| 24 | pkgindex = PackageIndex() |
| 25 | if header: |
| 26 | pkgindex.header['URI'] = 'http://www.example.com' |
| 27 | if packages: |
| 28 | pkgindex.packages = copy.deepcopy(PUBLIC_PACKAGES + PRIVATE_PACKAGES) |
| 29 | return pkgindex |
| 30 | |
| 31 | |
| 32 | class TestUpdateFile(unittest.TestCase): |
| 33 | |
| 34 | def setUp(self): |
| 35 | self.contents_str = ['# comment that should be skipped', |
| 36 | 'PKGDIR="/var/lib/portage/pkgs"', |
| 37 | 'PORTAGE_BINHOST="http://no.thanks.com"', |
| 38 | 'portage portage-20100310.tar.bz2', |
| 39 | 'COMPILE_FLAGS="some_value=some_other"', |
| 40 | ] |
| 41 | temp_fd, self.version_file = tempfile.mkstemp() |
| 42 | os.write(temp_fd, '\n'.join(self.contents_str)) |
| 43 | os.close(temp_fd) |
| 44 | |
| 45 | def tearDown(self): |
| 46 | os.remove(self.version_file) |
| 47 | |
| 48 | def _read_version_file(self, version_file=None): |
| 49 | """Read the contents of self.version_file and return as a list.""" |
| 50 | if not version_file: |
| 51 | version_file = self.version_file |
| 52 | |
| 53 | version_fh = open(version_file) |
| 54 | try: |
| 55 | return [line.strip() for line in version_fh.readlines()] |
| 56 | finally: |
| 57 | version_fh.close() |
| 58 | |
| 59 | def _verify_key_pair(self, key, val): |
| 60 | file_contents = self._read_version_file() |
| 61 | # ensure key for verify is wrapped on quotes |
| 62 | if '"' not in val: |
| 63 | val = '"%s"' % val |
| 64 | for entry in file_contents: |
| 65 | if '=' not in entry: |
| 66 | continue |
| 67 | file_key, file_val = entry.split('=') |
| 68 | if file_key == key: |
| 69 | if val == file_val: |
| 70 | break |
| 71 | else: |
| 72 | self.fail('Could not find "%s=%s" in version file' % (key, val)) |
| 73 | |
| 74 | def testAddVariableThatDoesNotExist(self): |
| 75 | """Add in a new variable that was no present in the file.""" |
| 76 | key = 'PORTAGE_BINHOST' |
| 77 | value = '1234567' |
| 78 | prebuilt.UpdateLocalFile(self.version_file, value) |
| 79 | print self.version_file |
| 80 | current_version_str = self._read_version_file() |
| 81 | self._verify_key_pair(key, value) |
| 82 | print self.version_file |
| 83 | |
| 84 | def testUpdateVariable(self): |
| 85 | """Test updating a variable that already exists.""" |
| 86 | key, val = self.contents_str[2].split('=') |
| 87 | new_val = 'test_update' |
| 88 | self._verify_key_pair(key, val) |
| 89 | prebuilt.UpdateLocalFile(self.version_file, new_val) |
| 90 | self._verify_key_pair(key, new_val) |
| 91 | |
| 92 | def testUpdateNonExistentFile(self): |
| 93 | key = 'PORTAGE_BINHOST' |
| 94 | value = '1234567' |
| 95 | non_existent_file = tempfile.mktemp() |
| 96 | try: |
| 97 | prebuilt.UpdateLocalFile(non_existent_file, value) |
| 98 | file_contents = self._read_version_file(non_existent_file) |
| 99 | self.assertEqual(file_contents, ['%s=%s' % (key, value)]) |
| 100 | finally: |
| 101 | if os.path.exists(non_existent_file): |
| 102 | os.remove(non_existent_file) |
| 103 | |
| 104 | |
| 105 | class TestPrebuiltFilters(unittest.TestCase): |
| 106 | |
| 107 | def setUp(self): |
| 108 | self.tmp_dir = tempfile.mkdtemp() |
| 109 | self.private_dir = os.path.join(self.tmp_dir, |
| 110 | prebuilt._PRIVATE_OVERLAY_DIR) |
| 111 | self.private_structure_base = 'chromeos-overlay/chromeos-base' |
| 112 | self.private_pkgs = ['test-package/salt-flavor-0.1.r3.ebuild', |
| 113 | 'easy/alpha_beta-0.1.41.r3.ebuild', |
| 114 | 'dev/j-t-r-0.1.r3.ebuild',] |
| 115 | self.expected_filters = set(['salt-flavor', 'alpha_beta', 'j-t-r']) |
| 116 | |
| 117 | def tearDown(self): |
| 118 | if self.tmp_dir: |
| 119 | shutil.rmtree(self.tmp_dir) |
| 120 | |
| 121 | def _CreateNestedDir(self, tmp_dir, dir_structure): |
| 122 | for entry in dir_structure: |
| 123 | full_path = os.path.join(os.path.join(tmp_dir, entry)) |
| 124 | # ensure dirs are created |
| 125 | try: |
| 126 | os.makedirs(os.path.dirname(full_path)) |
| 127 | if full_path.endswith('/'): |
| 128 | # we only want to create directories |
| 129 | return |
| 130 | except OSError, err: |
| 131 | if err.errno == errno.EEXIST: |
| 132 | # we don't care if the dir already exists |
| 133 | pass |
| 134 | else: |
| 135 | raise |
| 136 | # create dummy files |
| 137 | tmp = open(full_path, 'w') |
| 138 | tmp.close() |
| 139 | |
| 140 | def _LoadPrivateMockFilters(self): |
| 141 | """Load mock filters as defined in the setUp function.""" |
| 142 | dir_structure = [os.path.join(self.private_structure_base, entry) |
| 143 | for entry in self.private_pkgs] |
| 144 | |
| 145 | self._CreateNestedDir(self.private_dir, dir_structure) |
| 146 | prebuilt.LoadPrivateFilters(self.tmp_dir) |
| 147 | |
| 148 | def testFilterPattern(self): |
| 149 | """Check that particular packages are filtered properly.""" |
| 150 | self._LoadPrivateMockFilters() |
| 151 | packages = ['/some/dir/area/j-t-r-0.1.r3.tbz', |
| 152 | '/var/pkgs/new/alpha_beta-0.2.3.4.tbz', |
| 153 | '/usr/local/cache/good-0.1.3.tbz', |
| 154 | '/usr-blah/b_d/salt-flavor-0.0.3.tbz'] |
| 155 | expected_list = ['/usr/local/cache/good-0.1.3.tbz'] |
| 156 | filtered_list = [file for file in packages if not |
| 157 | prebuilt.ShouldFilterPackage(file)] |
| 158 | self.assertEqual(expected_list, filtered_list) |
| 159 | |
| 160 | def testLoadPrivateFilters(self): |
| 161 | self._LoadPrivateMockFilters() |
| 162 | prebuilt.LoadPrivateFilters(self.tmp_dir) |
| 163 | self.assertEqual(self.expected_filters, prebuilt._FILTER_PACKAGES) |
| 164 | |
| 165 | def testEmptyFiltersErrors(self): |
| 166 | """Ensure LoadPrivateFilters errors if an empty list is generated.""" |
| 167 | os.makedirs(os.path.join(self.tmp_dir, prebuilt._PRIVATE_OVERLAY_DIR)) |
| 168 | self.assertRaises(prebuilt.FiltersEmpty, prebuilt.LoadPrivateFilters, |
| 169 | self.tmp_dir) |
| 170 | |
| 171 | |
| 172 | class TestPrebuilt(unittest.TestCase): |
| 173 | |
| 174 | def setUp(self): |
| 175 | self.mox = mox.Mox() |
| 176 | |
| 177 | def tearDown(self): |
| 178 | self.mox.UnsetStubs() |
| 179 | self.mox.VerifyAll() |
| 180 | |
| 181 | def testGenerateUploadDict(self): |
| 182 | base_local_path = '/b/cbuild/build/chroot/build/x86-dogfood/' |
| 183 | gs_bucket_path = 'gs://chromeos-prebuilt/host/version' |
| 184 | local_path = os.path.join(base_local_path, 'public1.tbz2') |
| 185 | self.mox.StubOutWithMock(prebuilt.os.path, 'exists') |
| 186 | prebuilt.os.path.exists(local_path).AndReturn(True) |
| 187 | self.mox.ReplayAll() |
| 188 | pkgs = [{ 'CPV': 'public1' }] |
| 189 | result = prebuilt.GenerateUploadDict(base_local_path, gs_bucket_path, pkgs) |
| 190 | expected = { local_path: gs_bucket_path + '/public1.tbz2' } |
| 191 | self.assertEqual(result, expected) |
| 192 | |
| 193 | def testFailonUploadFail(self): |
| 194 | """Make sure we fail if one of the upload processes fail.""" |
| 195 | files = {'test': '/uasd'} |
| 196 | self.assertEqual(prebuilt.RemoteUpload(files), set([('test', '/uasd')])) |
| 197 | |
| 198 | def testDeterminePrebuiltConfHost(self): |
| 199 | """Test that the host prebuilt path comes back properly.""" |
| 200 | expected_path = os.path.join(prebuilt._PREBUILT_MAKE_CONF['amd64']) |
| 201 | self.assertEqual(prebuilt.DeterminePrebuiltConfFile('fake_path', 'amd64'), |
| 202 | expected_path) |
| 203 | |
| 204 | def testDeterminePrebuiltConf(self): |
| 205 | """Test the different known variants of boards for proper path discovery.""" |
| 206 | fake_path = '/b/cbuild' |
| 207 | script_path = os.path.join(fake_path, 'src/scripts/bin') |
| 208 | public_overlay_path = os.path.join(fake_path, 'src/overlays') |
| 209 | private_overlay_path = os.path.join(fake_path, |
| 210 | prebuilt._PRIVATE_OVERLAY_DIR) |
| 211 | path_dict = {'private_overlay_path': private_overlay_path, |
| 212 | 'public_overlay_path': public_overlay_path} |
| 213 | # format for targets |
| 214 | # board target key in dictionar |
| 215 | # Tuple containing cmd run, expected results as cmd obj, and expected output |
| 216 | |
| 217 | # Mock output from cros_overlay_list |
| 218 | x86_out = ('%(private_overlay_path)s/chromeos-overlay\n' |
| 219 | '%(public_overlay_path)s/overlay-x86-generic\n' % path_dict) |
| 220 | |
| 221 | x86_cmd = './cros_overlay_list --board x86-generic' |
| 222 | x86_expected_path = os.path.join(public_overlay_path, 'overlay-x86-generic', |
| 223 | 'prebuilt.conf') |
| 224 | # Mock output from cros_overlay_list |
| 225 | tegra2_out = ('%(private_overlay_path)s/chromeos-overlay\n' |
| 226 | '%(public_overlay_path)s/overlay-tegra2\n' |
| 227 | '%(public_overlay_path)s/overlay-variant-tegra2-seaboard\n' |
| 228 | '%(private_overlay_path)s/overlay-tegra2-private\n' |
| 229 | '%(private_overlay_path)s/' |
| 230 | 'overlay-variant-tegra2-seaboard-private\n' % path_dict) |
| 231 | tegra2_cmd = './cros_overlay_list --board tegra2 --variant seaboard' |
| 232 | tegra2_expected_path = os.path.join( |
| 233 | private_overlay_path, 'overlay-variant-tegra2-seaboard-private', |
| 234 | 'prebuilt.conf') |
| 235 | |
| 236 | |
| 237 | targets = {'x86-generic': {'cmd': x86_cmd, |
| 238 | 'output': x86_out, |
| 239 | 'result': x86_expected_path}, |
| 240 | 'tegra2_seaboard': {'cmd': tegra2_cmd, |
| 241 | 'output': tegra2_out, |
| 242 | 'result': tegra2_expected_path} |
| 243 | } |
| 244 | |
| 245 | self.mox.StubOutWithMock(prebuilt.cros_build_lib, 'RunCommand') |
| 246 | for target, expected_results in targets.iteritems(): |
| 247 | # create command object for output |
| 248 | cmd_result_obj = cros_build_lib.CommandResult() |
| 249 | cmd_result_obj.output = expected_results['output'] |
| 250 | prebuilt.cros_build_lib.RunCommand( |
| 251 | expected_results['cmd'].split(), redirect_stdout=True, |
| 252 | cwd=script_path).AndReturn(cmd_result_obj) |
| 253 | |
| 254 | self.mox.ReplayAll() |
| 255 | for target, expected_results in targets.iteritems(): |
| 256 | self.assertEqual( |
| 257 | prebuilt.DeterminePrebuiltConfFile(fake_path, target), |
| 258 | expected_results['result']) |
| 259 | |
| 260 | def testDeterminePrebuiltConfGarbage(self): |
| 261 | """Ensure an exception is raised on bad input.""" |
| 262 | self.assertRaises(prebuilt.UnknownBoardFormat, |
| 263 | prebuilt.DeterminePrebuiltConfFile, |
| 264 | 'fake_path', 'asdfasdf') |
| 265 | |
| 266 | |
| 267 | class TestPackagesFileFiltering(unittest.TestCase): |
| 268 | |
| 269 | def testFilterPkgIndex(self): |
| 270 | pkgindex = SimplePackageIndex() |
| 271 | pkgindex.RemoveFilteredPackages(lambda pkg: pkg in PRIVATE_PACKAGES) |
| 272 | self.assertEqual(pkgindex.packages, PUBLIC_PACKAGES) |
| 273 | self.assertEqual(pkgindex.modified, True) |
| 274 | |
| 275 | |
| 276 | class TestPopulateDuplicateDB(unittest.TestCase): |
| 277 | |
| 278 | def testEmptyIndex(self): |
| 279 | pkgindex = SimplePackageIndex(packages=False) |
| 280 | db = {} |
| 281 | pkgindex._PopulateDuplicateDB(db) |
| 282 | self.assertEqual(db, {}) |
| 283 | |
| 284 | def testNormalIndex(self): |
| 285 | pkgindex = SimplePackageIndex() |
| 286 | db = {} |
| 287 | pkgindex._PopulateDuplicateDB(db) |
| 288 | self.assertEqual(len(db), 3) |
| 289 | self.assertEqual(db['1'], 'http://www.example.com/gtk%2B/public1.tbz2') |
| 290 | self.assertEqual(db['2'], 'http://www.example.com/gtk%2B/foo.tgz') |
| 291 | self.assertEqual(db['3'], 'http://www.example.com/private.tbz2') |
| 292 | |
| 293 | def testMissingSHA1(self): |
| 294 | db = {} |
| 295 | pkgindex = SimplePackageIndex() |
| 296 | del pkgindex.packages[0]['SHA1'] |
| 297 | pkgindex._PopulateDuplicateDB(db) |
| 298 | self.assertEqual(len(db), 2) |
| 299 | self.assertEqual(db['2'], 'http://www.example.com/gtk%2B/foo.tgz') |
| 300 | self.assertEqual(db['3'], 'http://www.example.com/private.tbz2') |
| 301 | |
| 302 | def testFailedPopulate(self): |
| 303 | db = {} |
| 304 | pkgindex = SimplePackageIndex(header=False) |
| 305 | self.assertRaises(KeyError, pkgindex._PopulateDuplicateDB, db) |
| 306 | pkgindex = SimplePackageIndex() |
| 307 | del pkgindex.packages[0]['CPV'] |
| 308 | self.assertRaises(KeyError, pkgindex._PopulateDuplicateDB, db) |
| 309 | |
| 310 | |
| 311 | class TestResolveDuplicateUploads(unittest.TestCase): |
| 312 | |
| 313 | def testEmptyList(self): |
| 314 | pkgindex = SimplePackageIndex() |
| 315 | pristine = SimplePackageIndex() |
| 316 | uploads = pkgindex.ResolveDuplicateUploads([]) |
| 317 | self.assertEqual(uploads, pristine.packages) |
| 318 | self.assertEqual(pkgindex.packages, pristine.packages) |
| 319 | self.assertEqual(pkgindex.modified, False) |
| 320 | |
| 321 | def testEmptyIndex(self): |
| 322 | pkgindex = SimplePackageIndex() |
| 323 | pristine = SimplePackageIndex() |
| 324 | empty = SimplePackageIndex(packages=False) |
| 325 | uploads = pkgindex.ResolveDuplicateUploads([empty]) |
| 326 | self.assertEqual(uploads, pristine.packages) |
| 327 | self.assertEqual(pkgindex.packages, pristine.packages) |
| 328 | self.assertEqual(pkgindex.modified, False) |
| 329 | |
| 330 | def testDuplicates(self): |
| 331 | pkgindex = SimplePackageIndex() |
| 332 | dup_pkgindex = SimplePackageIndex() |
| 333 | expected_pkgindex = SimplePackageIndex() |
| 334 | for pkg in expected_pkgindex.packages: |
| 335 | pkg.setdefault('PATH', urllib.quote(pkg['CPV'] + '.tbz2')) |
| 336 | uploads = pkgindex.ResolveDuplicateUploads([dup_pkgindex]) |
| 337 | self.assertEqual(pkgindex.packages, expected_pkgindex.packages) |
| 338 | |
| 339 | def testMissingSHA1(self): |
| 340 | db = {} |
| 341 | pkgindex = SimplePackageIndex() |
| 342 | dup_pkgindex = SimplePackageIndex() |
| 343 | expected_pkgindex = SimplePackageIndex() |
| 344 | del pkgindex.packages[0]['SHA1'] |
| 345 | del expected_pkgindex.packages[0]['SHA1'] |
| 346 | for pkg in expected_pkgindex.packages[1:]: |
| 347 | pkg.setdefault('PATH', pkg['CPV'] + '.tbz2') |
| 348 | uploads = pkgindex.ResolveDuplicateUploads([dup_pkgindex]) |
| 349 | self.assertEqual(pkgindex.packages, expected_pkgindex.packages) |
| 350 | |
| 351 | |
| 352 | class TestWritePackageIndex(unittest.TestCase): |
| 353 | |
| 354 | def setUp(self): |
| 355 | self.mox = mox.Mox() |
| 356 | |
| 357 | def tearDown(self): |
| 358 | self.mox.UnsetStubs() |
| 359 | self.mox.VerifyAll() |
| 360 | |
| 361 | def testSimple(self): |
| 362 | pkgindex = SimplePackageIndex() |
| 363 | self.mox.StubOutWithMock(pkgindex, 'Write') |
| 364 | pkgindex.Write(mox.IgnoreArg()) |
| 365 | self.mox.ReplayAll() |
| 366 | f = pkgindex.WriteToNamedTemporaryFile() |
| 367 | self.assertEqual(f.read(), '') |
| 368 | |
| 369 | |
| 370 | if __name__ == '__main__': |
| 371 | unittest.main() |