blob: ab500f70352126ac78d3f9f0761194b8b3050b82 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5"""Unit tests for the deploy module."""
6
7from __future__ import print_function
8
9import json
Ralph Nathane01ccf12015-04-16 10:40:32 -070010import multiprocessing
David Pursell9476bf42015-03-30 13:34:27 -070011import os
12
Ralph Nathane01ccf12015-04-16 10:40:32 -070013from chromite.cli import command
David Pursell9476bf42015-03-30 13:34:27 -070014from chromite.cli import deploy
15from chromite.lib import cros_build_lib
16from chromite.lib import cros_test_lib
Ralph Nathane01ccf12015-04-16 10:40:32 -070017from chromite.lib import remote_access
David Pursell9476bf42015-03-30 13:34:27 -070018try:
19 import portage
20except ImportError:
21 if cros_build_lib.IsInsideChroot():
22 raise
23
24
25# pylint: disable=protected-access
26
27
class ChromiumOSDeviceFake(object):
  """Fake for device.

  Carries a fixed board name, leaves the connection-related attributes unset
  and reports every path as writable.
  """

  def __init__(self):
    """Sets the placeholder attribute values exposed by the fake."""
    self.board = 'board'
    self.hostname = None
    self.username = None
    self.port = None
    self.lsb_release = None

  def IsPathWritable(self, _):
    """Returns True for any path; the argument is ignored."""
    return True
40
41
class ChromiumOSDeviceHandlerFake(object):
  """Fake for chromite.lib.remote_access.ChromiumOSDeviceHandler.

  Usable as a context manager: entering yields a ChromiumOSDeviceFake and
  exiting does nothing (exceptions raised inside the block propagate).
  """

  class RemoteAccessFake(object):
    """Fake for chromite.lib.remote_access.RemoteAccess."""

    def __init__(self):
      """Starts with no canned output; tests assign remote_sh_output."""
      self.remote_sh_output = None

    def RemoteSh(self, *_args, **_kwargs):
      """Returns a CommandResult wrapping the canned output.

      All arguments are accepted and ignored.
      """
      return cros_build_lib.CommandResult(output=self.remote_sh_output)

  def __init__(self, *_args, **_kwargs):
    """Creates the fake agent; constructor arguments are ignored."""
    self._agent = self.RemoteAccessFake()

  # TODO(dpursell): Mock remote access object in cros_test_lib (brbug.com/986).
  def GetAgent(self):
    """Returns the RemoteAccessFake created in __init__."""
    return self._agent

  def __exit__(self, _type, _value, _traceback):
    """No cleanup needed; returns None so exceptions are not suppressed."""
    pass

  def __enter__(self):
    """Returns a new ChromiumOSDeviceFake each time the context is entered."""
    return ChromiumOSDeviceFake()
66
67
class BrilloDeployOperationFake(deploy.BrilloDeployOperation):
  """Fake for deploy.BrilloDeployOperation.

  Behaves like the real operation but additionally puts an 'advance' token on
  a caller-supplied queue every time ParseOutput runs.
  """

  def __init__(self, pkg_count, emerge, queue):
    """Initializes the operation and remembers the queue.

    Args:
      pkg_count: Forwarded unchanged to BrilloDeployOperation.__init__.
      emerge: Forwarded unchanged to BrilloDeployOperation.__init__.
      queue: Object with a put() method that receives one 'advance' item per
        ParseOutput call.
    """
    super(BrilloDeployOperationFake, self).__init__(pkg_count, emerge)
    self._queue = queue

  def ParseOutput(self, output=None):
    """Delegates to the parent ParseOutput, then signals the queue.

    Args:
      output: Forwarded unchanged to the parent implementation.
    """
    super(BrilloDeployOperationFake, self).ParseOutput(output)
    # Posted only after the parent call has returned.
    self._queue.put('advance')
77
David Pursell9476bf42015-03-30 13:34:27 -070078
class DbApiFake(object):
  """Fake for Portage dbapi."""

  def __init__(self, pkgs):
    """Builds the in-memory package database.

    Args:
      pkgs: Iterable of (cpv, slot, rdeps_raw, build_time) tuples. Entries
        are stored under the keys 'SLOT', 'RDEPEND' and 'BUILD_TIME'; a later
        tuple with the same cpv replaces an earlier one.
    """
    self.pkg_db = {
        cpv: {'SLOT': slot, 'RDEPEND': rdeps_raw, 'BUILD_TIME': build_time}
        for cpv, slot, rdeps_raw, build_time in pkgs
    }

  def cpv_all(self):
    """Returns a list of every cpv in the database."""
    # list() rather than .keys(): on Python 3 .keys() is a view, not a list.
    return list(self.pkg_db)

  def aux_get(self, cpv, keys):
    """Returns the values for |keys| of package |cpv|, in the given order.

    Args:
      cpv: Package identifier used when the database was built.
      keys: Iterable of metadata key names ('SLOT', 'RDEPEND', 'BUILD_TIME').

    Raises:
      KeyError: If |cpv| or one of |keys| is unknown.
    """
    pkg_info = self.pkg_db[cpv]
    return [pkg_info[key] for key in keys]
94
95
class PackageScannerFake(object):
  """Fake for PackageScanner."""

  def __init__(self, packages):
    """Stores the canned result.

    Args:
      packages: Value returned as the first element of Run()'s result.
    """
    self.pkgs = packages
    self.listed = []
    self.num_updates = None

  def Run(self, _device, _root, _packages, _update, _deep, _deep_rev):
    """Returns the canned (pkgs, listed, num_updates) tuple.

    All six arguments are ignored.
    """
    return self.pkgs, self.listed, self.num_updates
106
107
class PortageTreeFake(object):
  """Fake for Portage tree."""

  def __init__(self, dbapi):
    """Stores |dbapi| as the tree's dbapi attribute.

    Args:
      dbapi: Database object to expose (a DbApiFake in this file's tests).
    """
    self.dbapi = dbapi
113
114
class TestInstallPackageScanner(cros_test_lib.MockOutputTestCase):
  """Test the update package scanner."""
  _BOARD = 'foo_board'
  _BUILD_ROOT = '/build/%s' % _BOARD
  # Packages reported by the fake device. Entries have the same four-field
  # shape as the tuples given to SetupBintree: (cpv, slot, rdepend,
  # build_time).
  _VARTREE = [
      ('foo/app1-1.2.3-r4', '0', 'foo/app2 !foo/app3', '1413309336'),
      ('foo/app2-4.5.6-r7', '0', '', '1413309336'),
      ('foo/app4-2.0.0-r1', '0', 'foo/app1 foo/app5', '1413309336'),
      ('foo/app5-3.0.7-r3', '0', '', '1413309336'),
  ]

  def setUp(self):
    """Patch imported modules."""
    self.PatchObject(cros_build_lib, 'GetChoice', return_value=0)
    self.device = ChromiumOSDeviceHandlerFake()
    self.scanner = deploy._InstallPackageScanner(self._BUILD_ROOT)

  def SetupVartree(self, vartree_pkgs):
    """Makes the fake device's remote shell return |vartree_pkgs| as JSON."""
    self.device.GetAgent().remote_sh_output = json.dumps(vartree_pkgs)

  def SetupBintree(self, bintree_pkgs):
    """Patches portage.create_trees to expose |bintree_pkgs| as the bintree.

    Args:
      bintree_pkgs: List of (cpv, slot, rdepend, build_time) tuples.
    """
    bintree = PortageTreeFake(DbApiFake(bintree_pkgs))
    # Joining with '' appends a trailing separator to the build root key.
    build_root = os.path.join(self._BUILD_ROOT, '')
    portage_db = {build_root: {'bintree': bintree}}
    self.PatchObject(portage, 'create_trees', return_value=portage_db)

  def ValidatePkgs(self, actual, expected, constraints=None):
    """Checks package membership and, optionally, relative ordering.

    Args:
      actual: List of packages produced by the scanner.
      expected: Packages that |actual| must contain, in any order.
      constraints: Optional iterable of (needs, needed) pairs; each |needs|
        must appear later in |actual| than its |needed|.
    """
    # Containing exactly the same packages.
    self.assertEqual(sorted(expected), sorted(actual))
    # Packages appear in the right order.
    if constraints is not None:
      for needs, needed in constraints:
        self.assertGreater(actual.index(needs), actual.index(needed))

  def _RunScanner(self):
    """Runs the scanner for 'app1' on root '/' against the fake device.

    The three trailing True values are passed positionally. Going by the
    parameter names of PackageScannerFake.Run they presumably select update,
    deep and deep_rev scanning -- confirm against the real scanner.

    Returns:
      The scanner result, unpacked by callers as
      (installs, listed, num_updates).
    """
    return self.scanner.Run(self.device, '/', ['app1'], True, True, True)

  def testRunUpdatedVersion(self):
    """A newer app1 version in the bintree is the only install."""
    self.SetupVartree(self._VARTREE)
    app1 = 'foo/app1-1.2.5-r4'
    self.SetupBintree([
        (app1, '0', 'foo/app2 !foo/app3', '1413309336'),
        ('foo/app2-4.5.6-r7', '0', '', '1413309336'),
    ])
    installs, listed, num_updates = self._RunScanner()
    self.ValidatePkgs(installs, [app1])
    self.ValidatePkgs(listed, [app1])
    self.assertEqual(num_updates, 1)

  def testRunUpdatedBuildTime(self):
    """Same app1 version with a later build time is still installed."""
    self.SetupVartree(self._VARTREE)
    app1 = 'foo/app1-1.2.3-r4'
    self.SetupBintree([
        (app1, '0', 'foo/app2 !foo/app3', '1413309350'),
        ('foo/app2-4.5.6-r7', '0', '', '1413309336'),
    ])
    installs, listed, num_updates = self._RunScanner()
    self.ValidatePkgs(installs, [app1])
    self.ValidatePkgs(listed, [app1])
    self.assertEqual(num_updates, 1)

  def testRunExistingDepUpdated(self):
    """An already-installed dependency with a newer build comes along."""
    self.SetupVartree(self._VARTREE)
    app1 = 'foo/app1-1.2.5-r2'
    app2 = 'foo/app2-4.5.8-r3'
    self.SetupBintree([
        (app1, '0', 'foo/app2 !foo/app3', '1413309350'),
        (app2, '0', '', '1413309350'),
    ])
    installs, listed, num_updates = self._RunScanner()
    self.ValidatePkgs(installs, [app1, app2], constraints=[(app1, app2)])
    self.ValidatePkgs(listed, [app1])
    self.assertEqual(num_updates, 2)

  def testRunMissingDepUpdated(self):
    """A dependency absent from the device is installed ahead of app1."""
    self.SetupVartree(self._VARTREE)
    app1 = 'foo/app1-1.2.5-r2'
    app6 = 'foo/app6-1.0.0-r1'
    self.SetupBintree([
        (app1, '0', 'foo/app2 !foo/app3 foo/app6', '1413309350'),
        ('foo/app2-4.5.6-r7', '0', '', '1413309336'),
        (app6, '0', '', '1413309350'),
    ])
    installs, listed, num_updates = self._RunScanner()
    self.ValidatePkgs(installs, [app1, app6], constraints=[(app1, app6)])
    self.ValidatePkgs(listed, [app1])
    # app6 is installed but only app1 is counted as an update.
    self.assertEqual(num_updates, 1)

  def testRunExistingRevDepUpdated(self):
    """An installed reverse dependency with a newer build follows app1."""
    self.SetupVartree(self._VARTREE)
    app1 = 'foo/app1-1.2.5-r2'
    app4 = 'foo/app4-2.0.1-r3'
    self.SetupBintree([
        (app1, '0', 'foo/app2 !foo/app3', '1413309350'),
        (app4, '0', 'foo/app1 foo/app5', '1413309350'),
        ('foo/app5-3.0.7-r3', '0', '', '1413309336'),
    ])
    installs, listed, num_updates = self._RunScanner()
    self.ValidatePkgs(installs, [app1, app4], constraints=[(app4, app1)])
    self.ValidatePkgs(listed, [app1])
    self.assertEqual(num_updates, 2)

  def testRunMissingRevDepNotUpdated(self):
    """A reverse dependency not present on the device is not installed."""
    self.SetupVartree(self._VARTREE)
    app1 = 'foo/app1-1.2.5-r2'
    app6 = 'foo/app6-1.0.0-r1'
    self.SetupBintree([
        (app1, '0', 'foo/app2 !foo/app3', '1413309350'),
        (app6, '0', 'foo/app1', '1413309350'),
    ])
    installs, listed, num_updates = self._RunScanner()
    self.ValidatePkgs(installs, [app1])
    self.ValidatePkgs(listed, [app1])
    self.assertEqual(num_updates, 1)

  def testRunTransitiveDepsUpdated(self):
    """Dependencies of a reverse dependency are pulled in as well."""
    self.SetupVartree(self._VARTREE)
    app1 = 'foo/app1-1.2.5-r2'
    app2 = 'foo/app2-4.5.8-r3'
    app4 = 'foo/app4-2.0.0-r1'
    app5 = 'foo/app5-3.0.8-r2'
    self.SetupBintree([
        (app1, '0', 'foo/app2 !foo/app3', '1413309350'),
        (app2, '0', '', '1413309350'),
        (app4, '0', 'foo/app1 foo/app5', '1413309350'),
        (app5, '0', '', '1413309350'),
    ])
    installs, listed, num_updates = self._RunScanner()
    self.ValidatePkgs(installs, [app1, app2, app4, app5],
                      constraints=[(app1, app2), (app4, app1), (app4, app5)])
    self.ValidatePkgs(listed, [app1])
    self.assertEqual(num_updates, 4)

  def testRunDisjunctiveDepsExistingUpdated(self):
    """An || group with an unchanged installed member adds no installs."""
    self.SetupVartree(self._VARTREE)
    app1 = 'foo/app1-1.2.5-r2'
    self.SetupBintree([
        (app1, '0', '|| ( foo/app6 foo/app2 ) !foo/app3', '1413309350'),
        ('foo/app2-4.5.6-r7', '0', '', '1413309336'),
    ])
    installs, listed, num_updates = self._RunScanner()
    self.ValidatePkgs(installs, [app1])
    self.ValidatePkgs(listed, [app1])
    self.assertEqual(num_updates, 1)

  def testRunDisjunctiveDepsDefaultUpdated(self):
    """An || group with no installed member installs the available one."""
    self.SetupVartree(self._VARTREE)
    app1 = 'foo/app1-1.2.5-r2'
    app7 = 'foo/app7-1.0.0-r1'
    self.SetupBintree([
        (app1, '0', '|| ( foo/app6 foo/app7 ) !foo/app3', '1413309350'),
        (app7, '0', '', '1413309350'),
    ])
    installs, listed, num_updates = self._RunScanner()
    self.ValidatePkgs(installs, [app1, app7], constraints=[(app1, app7)])
    self.ValidatePkgs(listed, [app1])
    self.assertEqual(num_updates, 1)
Ralph Nathane01ccf12015-04-16 10:40:32 -0700278
279
class TestDeploy(cros_test_lib.ProgressBarTestCase):
  """Test deploy.Deploy."""

  def setUp(self):
    """Patches device access, board/sysroot lookup and the package actions."""
    self.PatchObject(remote_access, 'ChromiumOSDeviceHandler',
                     side_effect=ChromiumOSDeviceHandlerFake)
    self.PatchObject(cros_build_lib, 'GetBoard', return_value=None)
    self.PatchObject(cros_build_lib, 'GetSysroot', return_value='sysroot')
    self.package_scanner = self.PatchObject(deploy, '_InstallPackageScanner')
    self.emerge = self.PatchObject(deploy, '_Emerge', return_value=None)
    self.unmerge = self.PatchObject(deploy, '_Unmerge', return_value=None)

  def testDeployEmerge(self):
    """Test that deploy._Emerge is called for each package."""
    packages = ['foo', 'bar', 'foobar']
    self.package_scanner.return_value = PackageScannerFake(packages)

    deploy.Deploy(None, 'package', force=True, clean_binpkg=False)

    # Check that deploy._Emerge is called the right number of times.
    self.assertEqual(self.emerge.call_count, len(packages))
    self.assertEqual(self.unmerge.call_count, 0)

  def testDeployUnmerge(self):
    """Test that deploy._Unmerge is called for each package."""
    packages = ['foo', 'bar', 'foobar']
    self.package_scanner.return_value = PackageScannerFake(packages)

    deploy.Deploy(None, 'package', force=True, clean_binpkg=False,
                  emerge=False)

    # Check that deploy._Unmerge is called the right number of times.
    self.assertEqual(self.emerge.call_count, 0)
    self.assertEqual(self.unmerge.call_count, len(packages))

  def testDeployMergeWithProgressBar(self):
    """Test that BrilloDeployOperation.Run() is called for merge."""
    packages = ['foo', 'bar', 'foobar']
    self.package_scanner.return_value = PackageScannerFake(packages)

    run = self.PatchObject(deploy.BrilloDeployOperation, 'Run',
                           return_value=None)

    self.PatchObject(command, 'UseProgressBar', return_value=True)
    deploy.Deploy(None, 'package', force=True, clean_binpkg=False)

    # Check that BrilloDeployOperation.Run was called.
    self.assertTrue(run.called)

  def testDeployUnmergeWithProgressBar(self):
    """Test that BrilloDeployOperation.Run() is called for unmerge."""
    packages = ['foo', 'bar', 'foobar']
    self.package_scanner.return_value = PackageScannerFake(packages)

    run = self.PatchObject(deploy.BrilloDeployOperation, 'Run',
                           return_value=None)

    self.PatchObject(command, 'UseProgressBar', return_value=True)
    deploy.Deploy(None, 'package', force=True, clean_binpkg=False,
                  emerge=False)

    # Check that BrilloDeployOperation.Run was called.
    self.assertTrue(run.called)

  def _CheckOperationProgress(self, emerge):
    """Runs a one-package operation and checks the progress bar output.

    Args:
      emerge: True to exercise the merge operation (MERGE_EVENTS), False for
        the unmerge operation (UNMERGE_EVENTS).
    """
    queue = multiprocessing.Queue()
    # Operate on a single package.
    op = BrilloDeployOperationFake(1, emerge, queue)
    events = op.MERGE_EVENTS if emerge else op.UNMERGE_EVENTS

    def func(queue):
      # Take one item off the queue before printing each event.
      for event in events:
        queue.get()
        print(event)

    with self.OutputCapturer():
      op.Run(func, queue)

    # Check that the progress bar prints correctly.
    self.AssertProgressBarAllEvents(len(events))

  def testBrilloDeployMergeOperation(self):
    """Test that BrilloDeployOperation works for merge."""
    self._CheckOperationProgress(True)

  def testBrilloDeployUnmergeOperation(self):
    """Test that BrilloDeployOperation works for unmerge."""
    self._CheckOperationProgress(False)