blob: 432c50f4bf1bac418c7e6e884816c566eacc156b [file] [log] [blame]
#!/usr/bin/python -u
#
# -*- coding: utf-8 -*-
#
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

'''
The main factory flow that runs the factory test and finalizes a device.
'''
12
13import logging, os, pickle, pipes, signal, subprocess, sys, tempfile
14import threading, time, traceback
15from collections import deque
16from optparse import OptionParser
17from Queue import Queue
18
19import factory_common
20from autotest_lib.client.bin import prespawner
21from autotest_lib.client.cros import factory
22from autotest_lib.client.cros.factory import state
23from autotest_lib.client.cros.factory import TestState
24from autotest_lib.client.cros.factory.event import Event
25from autotest_lib.client.cros.factory.event import EventClient
26from autotest_lib.client.cros.factory.event import EventServer
27
28
# Absolute, symlink-resolved location of this script and of the directory
# that contains it.
SCRIPT_PATH = os.path.realpath(__file__)
CROS_FACTORY_LIB_PATH = os.path.dirname(SCRIPT_PATH)

# The factory UI executable lives next to this script.
FACTORY_UI_PATH = os.path.join(CROS_FACTORY_LIB_PATH, 'ui')

# The client directory is two levels above the directory holding this script.
CLIENT_PATH = os.path.realpath(
    os.path.join(CROS_FACTORY_LIB_PATH, '..', '..'))

# Base name of the test list; find_test_list() derives variants from it.
DEFAULT_TEST_LIST_PATH = os.path.join(
    CLIENT_PATH, 'site_tests', 'suite_Factory', 'test_list')

# File holding the HWID config tag (read by get_hwid_cfg()).
HWID_CFG_PATH = '/usr/local/share/chromeos-hwid/cfg'
36
37
def get_hwid_cfg():
    '''
    Looks up the HWID config tag for this device.

    The CROS_HWID environment variable takes precedence whenever it is set
    (even to an empty value).  Otherwise the whitespace-stripped contents of
    HWID_CFG_PATH are used, provided that file exists.

    @return: The HWID config tag, or an empty string if none can be found.
    '''
    try:
        return os.environ['CROS_HWID']
    except KeyError:
        pass  # No override in the environment; fall back to the file.

    if not os.path.exists(HWID_CFG_PATH):
        return ''

    with open(HWID_CFG_PATH, 'rt') as cfg_file:
        contents = cfg_file.read()
    return contents.strip()
48
49
def find_test_list():
    '''
    Returns the path to the active test list, based on the HWID config tag.

    Candidates are tried in this order and the first one that exists wins:
      1. '<DEFAULT_TEST_LIST_PATH>_<hwid_cfg>' (only when get_hwid_cfg()
         returns a non-empty tag)
      2. DEFAULT_TEST_LIST_PATH
      3. '<DEFAULT_TEST_LIST_PATH>.all'

    @return: The path of the selected test list, or None (after logging an
        error) if none of the candidates exists.
    '''
    hwid_cfg = get_hwid_cfg()

    if hwid_cfg:
        test_list = '%s_%s' % (DEFAULT_TEST_LIST_PATH, hwid_cfg)
        if os.path.exists(test_list):
            logging.info('Using special test list: %s', test_list)
            return test_list
        # Not fatal: fall back to the generic lists below.
        logging.warning('No specific test list for config: %s', hwid_cfg)

    test_list = DEFAULT_TEST_LIST_PATH
    if os.path.exists(test_list):
        return test_list

    test_list = '%s.all' % DEFAULT_TEST_LIST_PATH
    if os.path.exists(test_list):
        logging.info('Using default test list: %s', test_list)
        return test_list

    logging.error('Cannot find any test list.')
    return None
73
74
def is_process_alive(pid):
    '''
    Returns true if the process with the given pid is alive and not a zombie.

    The state is read from /proc/<pid>/stat.  The second field of that file
    is the command name in parentheses, which may itself contain spaces or
    parentheses, so the state letter is taken as the first field after the
    *last* ')' rather than by naively splitting on whitespace.

    @param pid: The numeric process id to check.
    @return: False if the stat file cannot be opened or read, if no state
        field can be parsed from it, or if the state is 'Z' (zombie);
        True otherwise.
    '''
    try:
        # Binary mode: the command name is arbitrary bytes.
        with open('/proc/%d/stat' % pid, 'rb') as f:
            stat = f.read()
    except IOError:
        return False

    fields_after_comm = stat.rpartition(b')')[2].split()
    if not fields_after_comm:
        return False
    return fields_after_comm[0] != b'Z'
84
85
def kill_process_tree(process, caption):
    '''
    Kills a process and its direct child processes.

    SIGTERM is tried first and then SIGKILL.  For each signal the current
    children of the process are looked up with 'pgrep -P', the signal is
    sent to every target up to 25 times (200 ms apart), and the function
    returns as soon as no target is alive any more.  Targets that survive
    the SIGTERM round are carried over into the SIGKILL round, because
    their parent may already have exited, in which case 'pgrep -P' on the
    original pid no longer reports them.

    @param process: The process to kill (opened with the subprocess module).
    @param caption: A caption describing the process.
    '''
    # Remember the pid up front; 'process' itself must never be rebound,
    # otherwise later rounds would target the wrong process.
    root_pid = process.pid
    survivors = []

    for sig in [signal.SIGTERM, signal.SIGKILL]:
        pids = set(survivors)
        pids.add(root_pid)
        try:
            pgrep = subprocess.Popen(['pgrep', '-P', str(root_pid)],
                                     stdout=subprocess.PIPE)
            stdout, _ = pgrep.communicate()
            # Empty output (no children) simply contributes nothing.
            pids.update(int(x) for x in stdout.split())
        except OSError:
            # pgrep could not be executed; still try to stop what we know.
            logging.warning('Unable to run pgrep to find children of %d',
                            root_pid)
        pids = sorted(pids)
        logging.info('Stopping %s (pid=%s)...', caption, pids)

        for _ in range(25):  # Try 25 times (200 ms between tries)
            for pid in pids:
                try:
                    logging.info('Sending signal %s to %d', sig, pid)
                    os.kill(pid, sig)
                except OSError:
                    pass  # Process is already gone
            pids = [pid for pid in pids if is_process_alive(pid)]
            if not pids:
                return
            time.sleep(0.2)  # Sleep 200 ms and try again

        survivors = pids

    logging.warning('Failed to stop %s process. Ignoring.', caption)
118
119
class TestInvocation(object):
    '''
    State for an active test.

    Each invocation runs a single test on its own thread by spawning the
    'autotest' command-line tool, records the outcome through
    test.update_state(), and finally posts follow-up callbacks on the goofy
    run queue.
    '''
    def __init__(self, goofy, test, on_completion=None):
        '''Constructor.

        @param goofy: The controlling Goofy object.
        @param test: The FactoryTest object to test.
        @param on_completion: Callback to invoke in the goofy event queue
            on completion, or None for no callback.
        '''
        self.goofy = goofy
        self.test = test
        self.thread = threading.Thread(target=self._run)
        self.on_completion = on_completion

        self._lock = threading.Lock()
        # The following properties are guarded by the lock.
        self._aborted = False
        self._completed = False
        self._process = None

    def start(self):
        '''Starts the test thread.'''
        self.thread.start()

    def abort_and_join(self):
        '''
        Aborts a test (must be called from the event controller thread).

        Marks the invocation as aborted, kills the autotest process if one
        has been spawned, and waits for the test thread to finish.
        '''
        with self._lock:
            self._aborted = True
            if self._process:
                kill_process_tree(self._process, 'autotest')
        if self.thread:
            self.thread.join()
        with self._lock:
            # Should be set by the thread itself, but just in case...
            self._completed = True

    def is_completed(self):
        '''
        Returns true if the test has finished.
        '''
        with self._lock:
            return self._completed

    def _invoke_autotest(self, test, dargs):
        '''
        Invokes an autotest test.

        This method encapsulates all the magic necessary to run a single
        autotest test using the 'autotest' command-line tool and get a
        sane pass/fail status and error message out.  It may be better
        to just write our own command-line wrapper for job.run_test
        instead.

        The arguments are pickled to a file, a temporary control file is
        generated that runs the test and pickles (success, error) to a
        result file, and the result file is read back once the process has
        exited.

        @param test: the autotest to run
        @param dargs: the argument map
        @return: tuple of status (TestState.PASSED or TestState.FAILED) and
            error message, if any
        '''
        status = TestState.FAILED
        error_msg = 'Unknown'

        # NOTE: every 'return' inside the try block is intentionally bare;
        # the 'finally' clause supplies the actual (status, error_msg)
        # return value on all paths, including exceptions.
        try:
            client_dir = CLIENT_PATH

            output_dir = '%s/results/%s' % (client_dir, test.path)
            if not os.path.exists(output_dir):
                os.makedirs(output_dir)
            tmp_dir = tempfile.mkdtemp(prefix='tmp', dir=output_dir)

            control_file = os.path.join(tmp_dir, 'control')
            result_file = os.path.join(tmp_dir, 'result')
            args_file = os.path.join(tmp_dir, 'args')

            with open(args_file, 'w') as f:
                pickle.dump(dargs, f)

            # Create a new control file to use to run the test
            control_lines = [
                'import common, traceback, utils',
                'import cPickle as pickle',
                ("success = job.run_test("
                 "'%s', **pickle.load(open('%s')))" % (
                     test.autotest_name, args_file)),
                ("pickle.dump((success, "
                 "str(job.last_error) if job.last_error else None), "
                 "open('%s', 'w'), protocol=2)"
                 % result_file),
            ]
            with open(control_file, 'w') as f:
                f.write('\n'.join(control_lines) + '\n')

            args = ['%s/bin/autotest' % client_dir,
                    '--output_dir', output_dir,
                    control_file]

            factory.console.info('Running test %s' % test.path)
            logging.debug('Test command line: %s', ' '.join(
                    [pipes.quote(arg) for arg in args]))

            with self._lock:
                if self._aborted:
                    # Aborted before the process was spawned: do not start
                    # it, otherwise abort_and_join() (which found nothing
                    # to kill) would block until the whole test finished.
                    error_msg = 'Aborted by operator'
                    return
                self._process = prespawner.spawn(
                    args, {'CROS_FACTORY_TEST_PATH': test.path})

            returncode = self._process.wait()
            with self._lock:
                if self._aborted:
                    error_msg = 'Aborted by operator'
                    return

            if returncode:
                # Only happens when there is an autotest-level problem (not
                # when the test actually failed).
                error_msg = 'autotest returned with code %d' % returncode
                return

            # The control file writes the result with pickle protocol 2,
            # which is a binary format, so read it back in binary mode.
            with open(result_file, 'rb') as f:
                try:
                    success, error_msg = pickle.load(f)
                except:  # pylint: disable=W0702
                    logging.exception('Unable to retrieve autotest results')
                    error_msg = 'Unable to retrieve autotest results'
                    return

            if success:
                status = TestState.PASSED
                error_msg = ''
        except Exception:  # pylint: disable=W0703
            traceback.print_exc(sys.stderr)
        finally:
            factory.console.info('Test %s: %s' % (test.path, status))
            return status, error_msg  # pylint: disable=W0150

    def _post_completion(self):
        '''
        Posts the end-of-test callbacks on the goofy run queue.

        The on_completion callback is only queued when one was supplied:
        the goofy main loop treats a false item on the run queue as a
        shutdown request, so None must never be enqueued.
        '''
        self.goofy.run_queue.put(self.goofy.reap_completed_tests)
        if self.on_completion is not None:
            self.goofy.run_queue.put(self.on_completion)

    def _run(self):
        '''
        Thread body: runs the test and records its result.

        Does nothing if the invocation was aborted before the thread got
        going; in that case abort_and_join() marks it as completed.
        '''
        with self._lock:
            if self._aborted:
                return

        count = self.test.update_state(
            status=TestState.ACTIVE, increment_count=1, error_msg='').count

        test_tag = '%s_%s' % (self.test.path, count)
        dargs = dict(self.test.dargs)
        dargs.update({'tag': test_tag,
                      'test_list_path': self.goofy.test_list_path})

        status, error_msg = self._invoke_autotest(self.test, dargs)
        self.test.update_state(status=status, error_msg=error_msg,
                               visible=False)
        with self._lock:
            self._completed = True
        self._post_completion()
276
277
class Goofy(object):
    '''
    The main factory flow.

    Note that all methods in this class must be invoked from the main
    (event) thread.  Other threads, such as callbacks and TestInvocation
    methods, should instead post events on the run queue.

    TODO: Unit tests. (chrome-os-partner:7409)

    Properties:
        state_instance: An instance of FactoryState.
        state_server: The FactoryState XML/RPC server.
        state_server_thread: A thread running state_server.
        event_server: The EventServer socket server.
        event_server_thread: A thread running event_server.
        event_client: A client to the event server.
        ui_process: The factory ui process object.
        run_queue: A queue of callbacks to invoke from the main thread.
        invocations: A map from FactoryTest objects to the corresponding
            TestInvocations objects representing active tests.
        tests_to_run: A deque of tests that should be run when the current
            test(s) complete.
        visible_test: The test currently marked as visible, or None.
        options: Command-line options.
        args: Command-line args.
        test_list_path: The path to the test list.
        test_list: The test list.
    '''
    def __init__(self):
        self.state_instance = None
        self.state_server = None
        self.state_server_thread = None
        self.event_server = None
        self.event_server_thread = None
        self.event_client = None
        self.ui_process = None
        self.run_queue = Queue()
        self.invocations = {}
        self.tests_to_run = deque()
        self.visible_test = None

        self.options = None
        self.args = None
        self.test_list_path = None
        self.test_list = None

    def __del__(self):
        '''Stops the UI process, the servers and the prespawner.'''
        if self.ui_process:
            kill_process_tree(self.ui_process, 'ui')
        if self.state_server_thread:
            logging.info('Stopping state server')
            self.state_server.shutdown()
            self.state_server_thread.join()
        if self.event_server_thread:
            logging.info('Stopping event server')
            self.event_server.shutdown()  # pylint: disable=E1101
            self.event_server_thread.join()
        prespawner.stop()

    def start_state_server(self):
        '''Creates the state server and serves it on a new thread.'''
        self.state_instance, self.state_server = state.create_server()
        logging.info('Starting state server')
        self.state_server_thread = threading.Thread(
            target=self.state_server.serve_forever)
        self.state_server_thread.start()

    def start_event_server(self):
        '''
        Starts the event server on a new thread and creates the event
        client, whose events are handled by handle_event.
        '''
        self.event_server = EventServer()
        logging.info('Starting factory event server')
        self.event_server_thread = threading.Thread(
            target=self.event_server.serve_forever)  # pylint: disable=E1101
        self.event_server_thread.start()

        self.event_client = EventClient(
            callback=self.handle_event, event_loop=self.run_queue)

    def start_ui(self):
        '''Launches the UI process and waits for its UI_READY event.'''
        ui_proc_args = [FACTORY_UI_PATH, self.test_list_path]
        logging.info('Starting ui %s', ui_proc_args)
        self.ui_process = subprocess.Popen(ui_proc_args)
        logging.info('Waiting for UI to come up...')
        self.event_client.wait(
            lambda event: event.type == Event.Type.UI_READY)
        logging.info('UI has started')

    def set_visible_test(self, test):
        '''
        Makes the given test the visible one.

        Marks the new test (if any) as visible and the previously visible
        test (if any) as not visible.  No-op if the test is already the
        visible one.

        @param test: The test to show, or None for no visible test.
        '''
        if self.visible_test == test:
            return

        if test:
            test.update_state(visible=True)
        if self.visible_test:
            self.visible_test.update_state(visible=False)
        self.visible_test = test

    def handle_reboot_complete(self, test, state):
        '''
        Handles the case where a reboot was detected during a reboot step.

        @param test: The RebootStep.
        @param state: The test state (unused: the state returned by the
            reboot-count update is used instead).
        '''
        state = test.update_state(increment_reboot_count=1)
        logging.info('Detected reboot (%d of %d)',
                     state.reboot_count, test.iterations)
        if state.reboot_count == test.iterations:
            # Good!
            test.update_state(status=TestState.PASSED, error_msg='')
        elif state.reboot_count > test.iterations:
            # Rebooted too many times
            test.update_state(status=TestState.FAILED,
                              error_msg='Too many reboots')
        else:
            # Need to reboot again
            self.reboot()

    def init_states(self):
        '''
        Initializes all states on startup.
        '''
        for test in self.test_list.get_all_tests():
            # Make sure the state server knows about all the tests,
            # defaulting to an untested state.
            test.update_state(update_parent=False, visible=False)

        # Any 'active' tests should be marked as failed now.
        for test in self.test_list.walk():
            state = test.get_state()
            if isinstance(test, factory.InformationScreen):
                test.update_state(status=TestState.UNTESTED, error_msg='')
            elif state.status == TestState.ACTIVE:
                if isinstance(test, factory.RebootStep):
                    # Rebooted while the test was active - that's good.
                    self.handle_reboot_complete(test, state)
                else:
                    test.update_state(status=TestState.FAILED,
                                      error_msg='Unknown (reboot?)')

    def show_next_active_test(self):
        '''
        Rotates to the next visible active test.
        '''
        self.reap_completed_tests()
        active_tests = [
            t for t in self.test_list.walk()
            if t.is_leaf() and t.get_state().status == TestState.ACTIVE]
        if not active_tests:
            return

        try:
            next_test = active_tests[
                (active_tests.index(self.visible_test) + 1) %
                len(active_tests)]
        except ValueError:  # visible_test not present in active_tests
            next_test = active_tests[0]

        self.set_visible_test(next_test)

    def handle_event(self, event):
        '''
        Handles an event from the event server.

        Only 'kbd_shortcut' events are acted upon: 'Tab' rotates the
        visible test, and a key bound to a test in the test list's
        shortcut map either brings that test to the front (if it is
        backgroundable and already running) or restarts it.
        '''
        if event.type == 'kbd_shortcut':
            if event.key == 'Tab':
                self.show_next_active_test()
            test = self.test_list.kbd_shortcut_map.get(event.key)
            if test:
                invoc = self.invocations.get(test)
                if invoc and test.backgroundable:
                    # Already running: just bring to the front if it
                    # has a UI.
                    logging.info('Setting visible test to %s', test.path)
                    self.event_client.post_event(
                        Event(Event.Type.SET_VISIBLE_TEST,
                              path=test.path))
                    # Do not fall through: aborting and restarting the
                    # running test would defeat bringing it to the front.
                    return

                self.abort_active_tests()
                for t in test.walk():
                    t.update_state(status=TestState.UNTESTED)
                self.run_tests(test)

    def run_next_test(self):
        '''
        Runs the next eligible test (or tests) in self.tests_to_run.
        '''
        self.reap_completed_tests()
        while self.tests_to_run:
            logging.debug('Tests to run: %s',
                          [x.path for x in self.tests_to_run])

            test = self.tests_to_run[0]

            if test in self.invocations:
                logging.info('Next test %s is already running', test.path)
                self.tests_to_run.popleft()
                return

            # A test may only start alongside running tests when it and
            # all of them are backgroundable.
            if self.invocations and not (test.backgroundable and all(
                [x.backgroundable for x in self.invocations])):
                logging.debug('Waiting for non-backgroundable tests to '
                              'complete before running %s', test.path)
                return

            self.tests_to_run.popleft()

            if isinstance(test, factory.RebootStep):
                test.update_state(status=TestState.ACTIVE, increment_count=1,
                                  error_msg='', reboot_count=0)
                # Save pending test list in the state server
                self.state_instance.set_shared_data(
                    'tests_after_reboot',
                    [t.path for t in self.tests_to_run])
                self.reboot()

            invoc = TestInvocation(self, test,
                                   on_completion=self.run_next_test)
            self.invocations[test] = invoc
            if self.visible_test is None and test.has_ui:
                self.set_visible_test(test)
            invoc.start()

    def run_tests(self, root, untested_only=False):
        '''
        Runs tests under root.

        The tests are run in order unless one fails (then stops).
        Backgroundable tests are run simultaneously; when a foreground test
        is encountered, we wait for all active tests to finish before
        continuing.

        @param root: The test whose leaf tests should be queued.
        @param untested_only: If true, only queue leaf tests whose status
            is TestState.UNTESTED.
        '''
        self.tests_to_run = deque()
        for x in root.walk():
            if not x.is_leaf():
                continue
            if untested_only and x.get_state().status != TestState.UNTESTED:
                continue
            self.tests_to_run.append(x)
        self.run_next_test()

    def reap_completed_tests(self):
        '''
        Removes completed tests from the set of active tests.

        Also updates the visible test if it was reaped.
        '''
        # Iterate over a snapshot because entries are deleted on the way.
        for t, v in list(self.invocations.items()):
            if v.is_completed():
                del self.invocations[t]

        if (self.visible_test is None or
            self.visible_test not in self.invocations):
            self.set_visible_test(None)
            # Make the first running test, if any, the visible test
            for t in self.test_list.walk():
                if t in self.invocations:
                    self.set_visible_test(t)
                    break

    def abort_active_tests(self):
        '''
        Kills and waits for all active tests.
        '''
        self.reap_completed_tests()
        # Iterate over a snapshot because entries are deleted on the way.
        for test, invoc in list(self.invocations.items()):
            factory.console.info('Killing active test %s...' % test.path)
            invoc.abort_and_join()
            factory.console.info('Killed %s' % test.path)
            del self.invocations[test]
        self.reap_completed_tests()

    def reboot(self):
        '''
        Reboots the machine (from a RebootTest).

        Does not return normally: after issuing the reboot it sleeps and
        then fails an assertion if the machine is still running.
        '''
        logging.info('Rebooting')
        subprocess.check_call('sync')
        subprocess.check_call('reboot')
        time.sleep(30)
        assert False, 'Never reached (should reboot)'

    def main(self):
        '''
        Entry point: parses options, starts the servers and the UI, queues
        the tests to run and then processes run-queue callbacks forever.
        '''
        parser = OptionParser()
        parser.add_option('-v', '--verbose', dest='verbose',
                          action='store_true',
                          help='Enable debug logging')
        parser.add_option('--print_test_list', dest='print_test_list',
                          metavar='FILE',
                          help='Read and print test list FILE, and exit')
        (self.options, self.args) = parser.parse_args()

        factory.init_logging('goofy', verbose=self.options.verbose)

        if self.options.print_test_list:
            print(factory.read_test_list(self.options.print_test_list).
                  __repr__(recursive=True))
            return

        logging.info('Started')

        self.start_state_server()
        # Update HWID configuration tag.
        self.state_instance.set_shared_data('hwid_cfg', get_hwid_cfg())

        self.test_list_path = find_test_list()
        self.test_list = factory.read_test_list(self.test_list_path,
                                                self.state_instance)
        logging.info('TEST_LIST:\n%s',
                     self.test_list.__repr__(recursive=True))

        self.init_states()
        self.start_event_server()
        self.start_ui()
        prespawner.start()

        def state_change_callback(test, state):
            # Forward every test state change to the event server.
            self.event_client.post_event(
                Event(Event.Type.STATE_CHANGE,
                      path=test.path, state=state))
        self.test_list.state_change_callback = state_change_callback

        try:
            tests_after_reboot = self.state_instance.get_shared_data(
                'tests_after_reboot')
        except KeyError:
            tests_after_reboot = None

        if tests_after_reboot is not None:
            logging.info('Resuming tests after reboot: %s',
                         tests_after_reboot)
            self.state_instance.set_shared_data('tests_after_reboot', None)
            self.tests_to_run.extend(
                self.test_list.lookup_path(t) for t in tests_after_reboot)
            self.run_next_test()
        else:
            self.run_tests(self.test_list, untested_only=True)

        # Process events forever.
        while True:
            event = self.run_queue.get()
            if not event:
                # Shutdown request (although this currently never happens).
                self.run_queue.task_done()
                break

            try:
                event()
            except Exception as e:  # pylint: disable=W0703
                logging.error('Error in event loop: %s', e)
                traceback.print_exc(sys.stderr)
                # But keep going
            finally:
                self.run_queue.task_done()
625
626
if __name__ == "__main__":
    # Run the factory flow when executed as a script.  The Goofy instance
    # is deliberately left unnamed so that it can be released as soon as
    # main() returns (its cleanup lives in Goofy.__del__).
    Goofy().main()