blob: f5b3c63b5281dadfbfc97f8d1bb7db3acbf97602 [file] [log] [blame]
David James8c846492011-01-25 17:07:29 -08001#!/usr/bin/python
2# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import copy
7import mox
8import os
9import prebuilt
10import shutil
11import tempfile
12import unittest
13import urllib
14from chromite.lib import cros_build_lib
15from chromite.lib.binpkg import PackageIndex
16
17PUBLIC_PACKAGES = [{'CPV': 'gtk+/public1', 'SHA1': '1'},
18 {'CPV': 'gtk+/public2', 'SHA1': '2',
19 'PATH': 'gtk%2B/foo.tgz'}]
20PRIVATE_PACKAGES = [{'CPV': 'private', 'SHA1': '3'}]
21
22
23def SimplePackageIndex(header=True, packages=True):
24 pkgindex = PackageIndex()
25 if header:
26 pkgindex.header['URI'] = 'http://www.example.com'
27 if packages:
28 pkgindex.packages = copy.deepcopy(PUBLIC_PACKAGES + PRIVATE_PACKAGES)
29 return pkgindex
30
31
32class TestUpdateFile(unittest.TestCase):
33
34 def setUp(self):
35 self.contents_str = ['# comment that should be skipped',
36 'PKGDIR="/var/lib/portage/pkgs"',
37 'PORTAGE_BINHOST="http://no.thanks.com"',
38 'portage portage-20100310.tar.bz2',
39 'COMPILE_FLAGS="some_value=some_other"',
40 ]
41 temp_fd, self.version_file = tempfile.mkstemp()
42 os.write(temp_fd, '\n'.join(self.contents_str))
43 os.close(temp_fd)
44
45 def tearDown(self):
46 os.remove(self.version_file)
47
48 def _read_version_file(self, version_file=None):
49 """Read the contents of self.version_file and return as a list."""
50 if not version_file:
51 version_file = self.version_file
52
53 version_fh = open(version_file)
54 try:
55 return [line.strip() for line in version_fh.readlines()]
56 finally:
57 version_fh.close()
58
59 def _verify_key_pair(self, key, val):
60 file_contents = self._read_version_file()
61 # ensure key for verify is wrapped on quotes
62 if '"' not in val:
63 val = '"%s"' % val
64 for entry in file_contents:
65 if '=' not in entry:
66 continue
67 file_key, file_val = entry.split('=')
68 if file_key == key:
69 if val == file_val:
70 break
71 else:
72 self.fail('Could not find "%s=%s" in version file' % (key, val))
73
74 def testAddVariableThatDoesNotExist(self):
75 """Add in a new variable that was no present in the file."""
76 key = 'PORTAGE_BINHOST'
77 value = '1234567'
78 prebuilt.UpdateLocalFile(self.version_file, value)
79 print self.version_file
80 current_version_str = self._read_version_file()
81 self._verify_key_pair(key, value)
82 print self.version_file
83
84 def testUpdateVariable(self):
85 """Test updating a variable that already exists."""
86 key, val = self.contents_str[2].split('=')
87 new_val = 'test_update'
88 self._verify_key_pair(key, val)
89 prebuilt.UpdateLocalFile(self.version_file, new_val)
90 self._verify_key_pair(key, new_val)
91
92 def testUpdateNonExistentFile(self):
93 key = 'PORTAGE_BINHOST'
94 value = '1234567'
95 non_existent_file = tempfile.mktemp()
96 try:
97 prebuilt.UpdateLocalFile(non_existent_file, value)
98 file_contents = self._read_version_file(non_existent_file)
99 self.assertEqual(file_contents, ['%s=%s' % (key, value)])
100 finally:
101 if os.path.exists(non_existent_file):
102 os.remove(non_existent_file)
103
104
105class TestPrebuiltFilters(unittest.TestCase):
106
107 def setUp(self):
108 self.tmp_dir = tempfile.mkdtemp()
109 self.private_dir = os.path.join(self.tmp_dir,
110 prebuilt._PRIVATE_OVERLAY_DIR)
111 self.private_structure_base = 'chromeos-overlay/chromeos-base'
112 self.private_pkgs = ['test-package/salt-flavor-0.1.r3.ebuild',
113 'easy/alpha_beta-0.1.41.r3.ebuild',
114 'dev/j-t-r-0.1.r3.ebuild',]
115 self.expected_filters = set(['salt-flavor', 'alpha_beta', 'j-t-r'])
116
117 def tearDown(self):
118 if self.tmp_dir:
119 shutil.rmtree(self.tmp_dir)
120
121 def _CreateNestedDir(self, tmp_dir, dir_structure):
122 for entry in dir_structure:
123 full_path = os.path.join(os.path.join(tmp_dir, entry))
124 # ensure dirs are created
125 try:
126 os.makedirs(os.path.dirname(full_path))
127 if full_path.endswith('/'):
128 # we only want to create directories
129 return
130 except OSError, err:
131 if err.errno == errno.EEXIST:
132 # we don't care if the dir already exists
133 pass
134 else:
135 raise
136 # create dummy files
137 tmp = open(full_path, 'w')
138 tmp.close()
139
140 def _LoadPrivateMockFilters(self):
141 """Load mock filters as defined in the setUp function."""
142 dir_structure = [os.path.join(self.private_structure_base, entry)
143 for entry in self.private_pkgs]
144
145 self._CreateNestedDir(self.private_dir, dir_structure)
146 prebuilt.LoadPrivateFilters(self.tmp_dir)
147
148 def testFilterPattern(self):
149 """Check that particular packages are filtered properly."""
150 self._LoadPrivateMockFilters()
151 packages = ['/some/dir/area/j-t-r-0.1.r3.tbz',
152 '/var/pkgs/new/alpha_beta-0.2.3.4.tbz',
153 '/usr/local/cache/good-0.1.3.tbz',
154 '/usr-blah/b_d/salt-flavor-0.0.3.tbz']
155 expected_list = ['/usr/local/cache/good-0.1.3.tbz']
156 filtered_list = [file for file in packages if not
157 prebuilt.ShouldFilterPackage(file)]
158 self.assertEqual(expected_list, filtered_list)
159
160 def testLoadPrivateFilters(self):
161 self._LoadPrivateMockFilters()
162 prebuilt.LoadPrivateFilters(self.tmp_dir)
163 self.assertEqual(self.expected_filters, prebuilt._FILTER_PACKAGES)
164
165 def testEmptyFiltersErrors(self):
166 """Ensure LoadPrivateFilters errors if an empty list is generated."""
167 os.makedirs(os.path.join(self.tmp_dir, prebuilt._PRIVATE_OVERLAY_DIR))
168 self.assertRaises(prebuilt.FiltersEmpty, prebuilt.LoadPrivateFilters,
169 self.tmp_dir)
170
171
172class TestPrebuilt(unittest.TestCase):
173
174 def setUp(self):
175 self.mox = mox.Mox()
176
177 def tearDown(self):
178 self.mox.UnsetStubs()
179 self.mox.VerifyAll()
180
181 def testGenerateUploadDict(self):
182 base_local_path = '/b/cbuild/build/chroot/build/x86-dogfood/'
183 gs_bucket_path = 'gs://chromeos-prebuilt/host/version'
184 local_path = os.path.join(base_local_path, 'public1.tbz2')
185 self.mox.StubOutWithMock(prebuilt.os.path, 'exists')
186 prebuilt.os.path.exists(local_path).AndReturn(True)
187 self.mox.ReplayAll()
188 pkgs = [{ 'CPV': 'public1' }]
189 result = prebuilt.GenerateUploadDict(base_local_path, gs_bucket_path, pkgs)
190 expected = { local_path: gs_bucket_path + '/public1.tbz2' }
191 self.assertEqual(result, expected)
192
193 def testFailonUploadFail(self):
194 """Make sure we fail if one of the upload processes fail."""
195 files = {'test': '/uasd'}
196 self.assertEqual(prebuilt.RemoteUpload(files), set([('test', '/uasd')]))
197
198 def testDeterminePrebuiltConfHost(self):
199 """Test that the host prebuilt path comes back properly."""
200 expected_path = os.path.join(prebuilt._PREBUILT_MAKE_CONF['amd64'])
201 self.assertEqual(prebuilt.DeterminePrebuiltConfFile('fake_path', 'amd64'),
202 expected_path)
203
204 def testDeterminePrebuiltConf(self):
205 """Test the different known variants of boards for proper path discovery."""
206 fake_path = '/b/cbuild'
207 script_path = os.path.join(fake_path, 'src/scripts/bin')
208 public_overlay_path = os.path.join(fake_path, 'src/overlays')
209 private_overlay_path = os.path.join(fake_path,
210 prebuilt._PRIVATE_OVERLAY_DIR)
211 path_dict = {'private_overlay_path': private_overlay_path,
212 'public_overlay_path': public_overlay_path}
213 # format for targets
214 # board target key in dictionar
215 # Tuple containing cmd run, expected results as cmd obj, and expected output
216
217 # Mock output from cros_overlay_list
218 x86_out = ('%(private_overlay_path)s/chromeos-overlay\n'
219 '%(public_overlay_path)s/overlay-x86-generic\n' % path_dict)
220
221 x86_cmd = './cros_overlay_list --board x86-generic'
222 x86_expected_path = os.path.join(public_overlay_path, 'overlay-x86-generic',
223 'prebuilt.conf')
224 # Mock output from cros_overlay_list
225 tegra2_out = ('%(private_overlay_path)s/chromeos-overlay\n'
226 '%(public_overlay_path)s/overlay-tegra2\n'
227 '%(public_overlay_path)s/overlay-variant-tegra2-seaboard\n'
228 '%(private_overlay_path)s/overlay-tegra2-private\n'
229 '%(private_overlay_path)s/'
230 'overlay-variant-tegra2-seaboard-private\n' % path_dict)
231 tegra2_cmd = './cros_overlay_list --board tegra2 --variant seaboard'
232 tegra2_expected_path = os.path.join(
233 private_overlay_path, 'overlay-variant-tegra2-seaboard-private',
234 'prebuilt.conf')
235
236
237 targets = {'x86-generic': {'cmd': x86_cmd,
238 'output': x86_out,
239 'result': x86_expected_path},
240 'tegra2_seaboard': {'cmd': tegra2_cmd,
241 'output': tegra2_out,
242 'result': tegra2_expected_path}
243 }
244
245 self.mox.StubOutWithMock(prebuilt.cros_build_lib, 'RunCommand')
246 for target, expected_results in targets.iteritems():
247 # create command object for output
248 cmd_result_obj = cros_build_lib.CommandResult()
249 cmd_result_obj.output = expected_results['output']
250 prebuilt.cros_build_lib.RunCommand(
251 expected_results['cmd'].split(), redirect_stdout=True,
252 cwd=script_path).AndReturn(cmd_result_obj)
253
254 self.mox.ReplayAll()
255 for target, expected_results in targets.iteritems():
256 self.assertEqual(
257 prebuilt.DeterminePrebuiltConfFile(fake_path, target),
258 expected_results['result'])
259
260 def testDeterminePrebuiltConfGarbage(self):
261 """Ensure an exception is raised on bad input."""
262 self.assertRaises(prebuilt.UnknownBoardFormat,
263 prebuilt.DeterminePrebuiltConfFile,
264 'fake_path', 'asdfasdf')
265
266
267class TestPackagesFileFiltering(unittest.TestCase):
268
269 def testFilterPkgIndex(self):
270 pkgindex = SimplePackageIndex()
271 pkgindex.RemoveFilteredPackages(lambda pkg: pkg in PRIVATE_PACKAGES)
272 self.assertEqual(pkgindex.packages, PUBLIC_PACKAGES)
273 self.assertEqual(pkgindex.modified, True)
274
275
276class TestPopulateDuplicateDB(unittest.TestCase):
277
278 def testEmptyIndex(self):
279 pkgindex = SimplePackageIndex(packages=False)
280 db = {}
281 pkgindex._PopulateDuplicateDB(db)
282 self.assertEqual(db, {})
283
284 def testNormalIndex(self):
285 pkgindex = SimplePackageIndex()
286 db = {}
287 pkgindex._PopulateDuplicateDB(db)
288 self.assertEqual(len(db), 3)
289 self.assertEqual(db['1'], 'http://www.example.com/gtk%2B/public1.tbz2')
290 self.assertEqual(db['2'], 'http://www.example.com/gtk%2B/foo.tgz')
291 self.assertEqual(db['3'], 'http://www.example.com/private.tbz2')
292
293 def testMissingSHA1(self):
294 db = {}
295 pkgindex = SimplePackageIndex()
296 del pkgindex.packages[0]['SHA1']
297 pkgindex._PopulateDuplicateDB(db)
298 self.assertEqual(len(db), 2)
299 self.assertEqual(db['2'], 'http://www.example.com/gtk%2B/foo.tgz')
300 self.assertEqual(db['3'], 'http://www.example.com/private.tbz2')
301
302 def testFailedPopulate(self):
303 db = {}
304 pkgindex = SimplePackageIndex(header=False)
305 self.assertRaises(KeyError, pkgindex._PopulateDuplicateDB, db)
306 pkgindex = SimplePackageIndex()
307 del pkgindex.packages[0]['CPV']
308 self.assertRaises(KeyError, pkgindex._PopulateDuplicateDB, db)
309
310
311class TestResolveDuplicateUploads(unittest.TestCase):
312
313 def testEmptyList(self):
314 pkgindex = SimplePackageIndex()
315 pristine = SimplePackageIndex()
316 uploads = pkgindex.ResolveDuplicateUploads([])
317 self.assertEqual(uploads, pristine.packages)
318 self.assertEqual(pkgindex.packages, pristine.packages)
319 self.assertEqual(pkgindex.modified, False)
320
321 def testEmptyIndex(self):
322 pkgindex = SimplePackageIndex()
323 pristine = SimplePackageIndex()
324 empty = SimplePackageIndex(packages=False)
325 uploads = pkgindex.ResolveDuplicateUploads([empty])
326 self.assertEqual(uploads, pristine.packages)
327 self.assertEqual(pkgindex.packages, pristine.packages)
328 self.assertEqual(pkgindex.modified, False)
329
330 def testDuplicates(self):
331 pkgindex = SimplePackageIndex()
332 dup_pkgindex = SimplePackageIndex()
333 expected_pkgindex = SimplePackageIndex()
334 for pkg in expected_pkgindex.packages:
335 pkg.setdefault('PATH', urllib.quote(pkg['CPV'] + '.tbz2'))
336 uploads = pkgindex.ResolveDuplicateUploads([dup_pkgindex])
337 self.assertEqual(pkgindex.packages, expected_pkgindex.packages)
338
339 def testMissingSHA1(self):
340 db = {}
341 pkgindex = SimplePackageIndex()
342 dup_pkgindex = SimplePackageIndex()
343 expected_pkgindex = SimplePackageIndex()
344 del pkgindex.packages[0]['SHA1']
345 del expected_pkgindex.packages[0]['SHA1']
346 for pkg in expected_pkgindex.packages[1:]:
347 pkg.setdefault('PATH', pkg['CPV'] + '.tbz2')
348 uploads = pkgindex.ResolveDuplicateUploads([dup_pkgindex])
349 self.assertEqual(pkgindex.packages, expected_pkgindex.packages)
350
351
352class TestWritePackageIndex(unittest.TestCase):
353
354 def setUp(self):
355 self.mox = mox.Mox()
356
357 def tearDown(self):
358 self.mox.UnsetStubs()
359 self.mox.VerifyAll()
360
361 def testSimple(self):
362 pkgindex = SimplePackageIndex()
363 self.mox.StubOutWithMock(pkgindex, 'Write')
364 pkgindex.Write(mox.IgnoreArg())
365 self.mox.ReplayAll()
366 f = pkgindex.WriteToNamedTemporaryFile()
367 self.assertEqual(f.read(), '')
368
369
370if __name__ == '__main__':
371 unittest.main()