Add factory test automation module using E2E test framework.
BUG=none
TEST=manually test with automated generic test list.
Change-Id: I4eafe521b3e1cf3fe80a5b538272c774efe059aa
Reviewed-on: https://chromium-review.googlesource.com/181310
Reviewed-by: Ricky Liang <jcliang@chromium.org>
Commit-Queue: Ricky Liang <jcliang@chromium.org>
Tested-by: Ricky Liang <jcliang@chromium.org>
diff --git a/py/goofy/goofy.py b/py/goofy/goofy.py
index 9441ce0..1b0dbdf 100755
--- a/py/goofy/goofy.py
+++ b/py/goofy/goofy.py
@@ -47,6 +47,8 @@
from cros.factory.test import shopfloor
from cros.factory.test import utils
from cros.factory.test.test_lists import test_lists
+from cros.factory.test.e2e_test.common import (
+ AutomationMode, AutomationModePrompt, ParseAutomationMode)
from cros.factory.test.event import Event
from cros.factory.test.event import EventClient
from cros.factory.test.event import EventServer
@@ -412,8 +414,9 @@
def handle_shutdown_complete(self, test, test_state):
"""Handles the case where a shutdown was detected during a shutdown step.
- @param test: The ShutdownStep.
- @param test_state: The test state.
+ Args:
+ test: The ShutdownStep.
+ test_state: The test state.
"""
test_state = test.update_state(increment_shutdown_count=1)
logging.info('Detected shutdown (%d of %d)',
@@ -734,8 +737,7 @@
self.tests_to_run.clear()
return
while self.tests_to_run:
- logging.debug('Tests to run: %s',
- [x.path for x in self.tests_to_run])
+ logging.debug('Tests to run: %s', [x.path for x in self.tests_to_run])
test = self.tests_to_run[0]
@@ -754,7 +756,7 @@
if self.invocations and not (test.backgroundable and all(
[x.backgroundable for x in self.invocations])):
logging.debug('Waiting for non-backgroundable tests to '
- 'complete before running %s', test.path)
+ 'complete before running %s', test.path)
return
if test.get_state().skip:
@@ -802,28 +804,33 @@
if isinstance(test, factory.ShutdownStep):
if os.path.exists(NO_REBOOT_FILE):
test.update_state(
- status=TestState.FAILED, increment_count=1,
- error_msg=('Skipped shutdown since %s is present' %
- NO_REBOOT_FILE))
+ status=TestState.FAILED, increment_count=1,
+ error_msg=('Skipped shutdown since %s is present' %
+ NO_REBOOT_FILE))
+ continue
+
+ if (test.operation == factory.ShutdownStep.HALT and
+ self.options.automation_mode == AutomationMode.FULL):
+ logging.info('Skip halt in full automation mode.')
+ test.update_state(status=TestState.PASSED)
continue
test.update_state(status=TestState.ACTIVE, increment_count=1,
- error_msg='', shutdown_count=0)
+ error_msg='', shutdown_count=0)
if self._prompt_cancel_shutdown(test, 1):
self.event_log.Log('reboot_cancelled')
test.update_state(
- status=TestState.FAILED, increment_count=1,
- error_msg='Shutdown aborted by operator',
- shutdown_count=0)
+ status=TestState.FAILED, increment_count=1,
+ error_msg='Shutdown aborted by operator',
+ shutdown_count=0)
continue
# Save pending test list in the state server
self.state_instance.set_shared_data(
- 'tests_after_shutdown',
- [t.path for t in self.tests_to_run])
+ 'tests_after_shutdown',
+ [t.path for t in self.tests_to_run])
# Save shutdown time
- self.state_instance.set_shared_data('shutdown_time',
- time.time())
+ self.state_instance.set_shared_data('shutdown_time', time.time())
with self.env.lock:
self.event_log.Log('shutdown', operation=test.operation)
@@ -841,12 +848,10 @@
else:
# Just pass (e.g., in the chroot).
test.update_state(status=TestState.PASSED)
- self.state_instance.set_shared_data(
- 'tests_after_shutdown', None)
+ self.state_instance.set_shared_data('tests_after_shutdown', None)
# Send event with no fields to indicate that there is no
# longer a pending shutdown.
- self.event_client.post_event(Event(
- Event.Type.PENDING_SHUTDOWN))
+ self.event_client.post_event(Event(Event.Type.PENDING_SHUTDOWN))
continue
self._run_test(test, test.iterations, test.retries)
@@ -947,8 +952,10 @@
Backgroundable tests are run simultaneously; when a foreground test is
encountered, we wait for all active tests to finish before continuing.
- @param subtrees: Node or nodes containing tests to run (may either be
- a single test or a list). Duplicates will be ignored.
+ Args:
+ subtrees: Node or nodes containing tests to run (may either be
+ a single test or a list). Duplicates will be ignored.
+ untested_only: True to run untested tests only.
"""
if type(subtrees) != list:
subtrees = [subtrees]
@@ -1250,9 +1257,9 @@
help='Use FILE as test list')
parser.add_option('--dummy_shopfloor', action='store_true',
help='Use a dummy shopfloor server')
- parser.add_option('--automation', dest='automation',
- action='store_true',
- help='Enable automation on running factory test')
+ parser.add_option('--automation-mode',
+ choices=[m.lower() for m in AutomationMode],
+ default='none', help="Factory test automation mode.")
parser.add_option('--guest_login', dest='guest_login', default=False,
action='store_true',
help='Log in as guest. This will not own the TPM.')
@@ -1318,6 +1325,14 @@
self.state_instance.del_shared_data('shutdown_time', optional=True)
self.state_instance.del_shared_data('startup_error', optional=True)
+ self.options.automation_mode = ParseAutomationMode(
+ self.options.automation_mode)
+ self.state_instance.set_shared_data('automation_mode',
+ self.options.automation_mode)
+ self.state_instance.set_shared_data(
+ 'automation_mode_prompt',
+ AutomationModePrompt[self.options.automation_mode])
+
try:
self.InitTestLists()
except: # pylint: disable=W0702
@@ -1966,7 +1981,8 @@
def handle_switch_test(self, event):
"""Switches to a particular test.
- @param event: The SWITCH_TEST event.
+ Args:
+ event: The SWITCH_TEST event.
"""
test = self.test_list.lookup_path(event.path)
if not test:
diff --git a/py/goofy/goofy_remote.py b/py/goofy/goofy_remote.py
index cb3e466..4a51549 100755
--- a/py/goofy/goofy_remote.py
+++ b/py/goofy/goofy_remote.py
@@ -5,7 +5,7 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-'''rsyncs goofy and runs on a remote device.'''
+"""rsyncs goofy and runs on a remote device."""
import argparse
import glob
@@ -18,6 +18,7 @@
import factory_common # pylint: disable=W0611
from cros.factory.test import factory
+from cros.factory.test.e2e_test.common import AutomationMode
from cros.factory.test.test_lists import test_lists
from cros.factory.test.utils import Retry, in_chroot
from cros.factory.utils import file_utils
@@ -30,6 +31,11 @@
rsync_command = None
+class GoofyRemoteException(Exception):
+ """Goofy remote exception."""
+ pass
+
+
def GetBoard(host):
logging.info('Checking release board on %s...', host)
release = Spawn(ssh_command + [host, 'cat /etc/lsb-release'],
@@ -115,6 +121,9 @@
help='remove password from test_list')
parser.add_argument('-s', dest='shopfloor_host',
help='set shopfloor host')
+ parser.add_argument('--automation-mode',
+ choices=[m.lower() for m in AutomationMode],
+ default='none', help="Factory test automation mode.")
parser.add_argument('--shopfloor_port', dest='shopfloor_port', type=int,
default=8082, help='set shopfloor port')
parser.add_argument('--board', '-b', dest='board',
@@ -219,7 +228,8 @@
if args.restart:
Spawn(ssh_command +
[args.host, '/usr/local/factory/bin/factory_restart'] +
- (['-a'] if args.clear_state else []),
+ (['-a'] if args.clear_state else []) +
+ ['--automation-mode', '%s' % args.automation_mode],
check_call=True, log=True)
if args.run_test:
diff --git a/py/goofy/invocation.py b/py/goofy/invocation.py
index fca5c41..99647ee 100755
--- a/py/goofy/invocation.py
+++ b/py/goofy/invocation.py
@@ -36,6 +36,7 @@
from cros.factory.test import test_ui
from cros.factory.test import utils
from cros.factory.test.args import Args
+from cros.factory.test.e2e_test.common import AutomationMode
from cros.factory.test.event import Event
from cros.factory.test.factory import TestState
from cros.factory.test.test_lists.test_lists import BuildAllTestLists
@@ -51,6 +52,11 @@
# pylint: disable=W0702
+class InvocationError(Exception):
+ """Invocation error."""
+ pass
+
+
class TestArgEnv(object):
"""Environment for resolving test arguments.
@@ -125,20 +131,31 @@
class PyTestInfo(object):
- """A class to hold all the data needed when invoking a test."""
+ """A class to hold all the data needed when invoking a test.
+
+ Properties:
+ test_list: The test list name or ID to get the factory test info from.
+ path: The path of the test in the test list.
+ pytest_name: The name of the factory test to run.
+ args: Arguments passing down to the factory test.
+ results_path: The path to the result file.
+ test_case_id: The ID of the test case to run.
+ automation_mode: The enabled automation mode.
+ """
# A special test case ID to tell RunPytest to run the pytest directly instead
# of invoking it in a subprocess.
NO_SUBPROCESS = '__NO_SUBPROCESS__'
def __init__(self, test_list, path, pytest_name, args, results_path,
- test_case_id=None):
+ test_case_id=None, automation_mode=None):
self.test_list = test_list
self.path = path
self.pytest_name = pytest_name
self.args = args
self.results_path = results_path
self.test_case_id = test_case_id
+ self.automation_mode = automation_mode
def ReadTestList(self):
"""Reads and returns the test list."""
@@ -177,8 +194,8 @@
"""
self.goofy = goofy
self.test = test
- self.thread = threading.Thread(target=self._run,
- name='TestInvocation-%s' % test.path)
+ self.thread = threading.Thread(
+ target=self._run, name='TestInvocation-%s' % test.path)
self.on_completion = on_completion
self.uuid = event_log.TimedUuid()
self.output_dir = os.path.join(factory.get_test_data_root(),
@@ -267,17 +284,18 @@
to just write our own command-line wrapper for job.run_test
instead.
- @param test: the autotest to run
- @param dargs: the argument map
- @return: tuple of status (TestState.PASSED or TestState.FAILED) and
- error message, if any
+ Returns:
+ tuple of status (TestState.PASSED or TestState.FAILED) and error message,
+ if any
"""
assert self.test.autotest_name
test_tag = '%s_%s' % (self.test.path, self.count)
dargs = dict(self.test.dargs)
- dargs.update({'tag': test_tag,
- 'test_list_path': self.goofy.options.test_list})
+ dargs.update({
+ 'tag': test_tag,
+ 'test_list_path': self.goofy.options.test_list
+ })
status = TestState.FAILED
error_msg = 'Unknown'
@@ -385,14 +403,29 @@
logging.exception('Unable to resolve test arguments')
return TestState.FAILED, 'Unable to resolve test arguments: %s' % e
+ pytest_name = self.test.pytest_name
+ if (self.test.has_automator and
+ self.goofy.options.automation_mode != AutomationMode.NONE):
+ logging.info('Enable factory test automator for %r', pytest_name)
+ if os.path.exists(os.path.join(
+ factory.FACTORY_PATH, 'py', 'test', 'pytests', pytest_name,
+ pytest_name + '_automator_private.py')):
+ pytest_name += '_automator_private'
+ elif os.path.exists(os.path.join(
+ factory.FACTORY_PATH, 'py', 'test', 'pytests', pytest_name,
+ pytest_name + '_automator.py')):
+ pytest_name += '_automator'
+ else:
+ raise InvocationError('Cannot find automator for %r' % pytest_name)
+
with open(info_path, 'w') as info:
pickle.dump(PyTestInfo(
test_list=self.goofy.options.test_list,
path=self.test.path,
- pytest_name=self.test.pytest_name,
+ pytest_name=pytest_name,
args=args,
- results_path=results_path),
- info)
+ results_path=results_path,
+ automation_mode=self.goofy.options.automation_mode), info)
# Invoke the unittest driver in a separate process.
with open(self.log_path, 'wb', 0) as log:
@@ -407,8 +440,7 @@
cmd_line, self.log_path)
self.env_additions['CROS_PROC_TITLE'] = (
- '%s.py (factory pytest %s)' % (
- self.test.pytest_name, self.output_dir))
+ '%s.py (factory pytest %s)' % (pytest_name, self.output_dir))
env = dict(os.environ)
env.update(self.env_additions)
@@ -667,7 +699,13 @@
A list of strings of test case IDs.
"""
test_cases = []
- _RecursiveApply(lambda t: test_cases.append(t.id()), suite)
+ def FilterTestCase(test):
+ # Filter out the test case from base Automator class.
+ if test.id() == 'cros.factory.test.e2e_test.automator.Automator.runTest':
+ return
+ test_cases.append(test.id())
+
+ _RecursiveApply(FilterTestCase, suite)
return test_cases
@@ -774,12 +812,19 @@
The loaded pytest module object.
"""
from cros.factory.test import pytests
+ base_pytest_name = pytest_name
+ for suffix in ('_e2etest', '_automator', '_automator_private'):
+ base_pytest_name = re.sub(suffix, '', base_pytest_name)
+
try:
- base_pytest_name = re.sub(r'_e2etest', r'', pytest_name)
__import__('cros.factory.test.pytests.%s.%s' %
(base_pytest_name, pytest_name))
return getattr(getattr(pytests, base_pytest_name), pytest_name)
except ImportError:
+ logging.info(
+ ('Cannot import cros.factory.test.pytests.%s.%s. '
+ 'Fall back to cros.factory.test.pytests.%s'),
+ base_pytest_name, pytest_name, pytest_name)
__import__('cros.factory.test.pytests.%s' % pytest_name)
return getattr(pytests, pytest_name)
diff --git a/py/goofy/js/goofy.js b/py/goofy/js/goofy.js
index 000b4da..5bc3445 100644
--- a/py/goofy/js/goofy.js
+++ b/py/goofy/js/goofy.js
@@ -594,6 +594,12 @@
*/
this.testLists = [];
+ /**
+ * Whether any automation mode is enabled.
+ * @type {boolean}
+ */
+ this.automationEnabled = false;
+
// Set up magic keyboard shortcuts.
goog.events.listen(
window, goog.events.EventType.KEYDOWN, this.keyListener, true, this);
@@ -659,7 +665,8 @@
mainAndConsole.setInitialSize(
viewportSize.height -
Math.max(cros.factory.LOG_PANE_MIN_HEIGHT,
- 1 - cros.factory.LOG_PANE_HEIGHT_FRACTION));
+ viewportSize.height *
+ cros.factory.LOG_PANE_HEIGHT_FRACTION));
goog.debug.catchErrors(goog.bind(function(info) {
try {
@@ -732,10 +739,14 @@
goog.events.listen(
window, goog.events.EventType.RESIZE,
function(event) {
- topSplitPane.setSize(
- goog.dom.getViewportSize(goog.dom.getWindow(document) ||
- window));
- });
+ var viewportSize = goog.dom.getViewportSize(
+ goog.dom.getWindow(document) || window);
+ if (this.automationEnabled) {
+ var indicator = document.getElementById('goofy-automation-div');
+ viewportSize.height -= indicator.offsetHeight;
+ }
+ topSplitPane.setSize(viewportSize);
+ }, false, this);
// Whenever we get focus, try to focus any visible iframe (if no modal
// dialog is visible).
@@ -854,6 +865,11 @@
function() {
// Unable to retrieve the key; that's fine, no startup error!
});
+ this.sendRpc(
+ 'get_shared_data', ['automation_mode'],
+ function(mode) {
+ this.setAutomationMode(mode);
+ });
var timer = new goog.Timer(1000);
goog.events.listen(timer, goog.Timer.TICK, this.updateTime, false, this);
@@ -883,6 +899,26 @@
};
/**
+ * Sets up the automation mode indicator bar.
+ *
+ * @param {string} mode
+ */
+cros.factory.Goofy.prototype.setAutomationMode = function(mode) {
+ if (mode != 'NONE') {
+ this.automationEnabled = true;
+ this.sendRpc(
+ 'get_shared_data', ['automation_mode_prompt'],
+ function(prompt) {
+ document.getElementById(
+ 'goofy-automation-div').innerHTML = prompt;
+ });
+ }
+ this.updateCSSClasses();
+ goog.events.fireListeners(
+ window, goog.events.EventType.RESIZE, false, null);
+};
+
+/**
* Gets an invocation for a test (creating it if necessary).
*
* @param {string} path
@@ -913,6 +949,8 @@
this.engineeringMode);
goog.dom.classes.enable(doc.body, 'goofy-operator-mode',
!this.engineeringMode);
+ goog.dom.classes.enable(doc.body, 'goofy-enable-automation',
+ this.automationEnabled);
}
};
diff --git a/py/goofy/static/goofy.css b/py/goofy/static/goofy.css
index 60b5869..36e3cc8 100644
--- a/py/goofy/static/goofy.css
+++ b/py/goofy/static/goofy.css
@@ -175,6 +175,23 @@
width: 21; height: 21;
}
+.goofy-automation-indicator {
+ color: white;
+ display: none;
+ font-size: 150%;
+ font-weight: bold;
+ height: 0px;
+ text-align: center;
+}
+.goofy-enable-automation #goofy-automation-div {
+ background-color: navy;
+ display: block;
+ height: 36px;
+}
+.goofy-enable-automation #goofy-splitpane {
+ top: 36px;
+}
+
#goofy-eth-indicator {
background-image: url('images/eth_off.png');
background-repeat: no-repeat;
diff --git a/py/goofy/static/index.html b/py/goofy/static/index.html
index 6b74f1c..f579ece 100644
--- a/py/goofy/static/index.html
+++ b/py/goofy/static/index.html
@@ -10,6 +10,7 @@
</script>
</head>
<body>
+ <div class="goofy-automation-indicator" id="goofy-automation-div"></div>
<div class="goog-splitpane" id="goofy-splitpane">
<div class="goog-splitpane-first-container">
<div class="goofy-horizontal-border">
diff --git a/py/test/e2e_test/automator.py b/py/test/e2e_test/automator.py
new file mode 100644
index 0000000..766f66d
--- /dev/null
+++ b/py/test/e2e_test/automator.py
@@ -0,0 +1,152 @@
+# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Factory test automation module."""
+
+import logging
+
+import factory_common # pylint: disable=W0611
+from cros.factory.common import MakeList
+from cros.factory.hwid import common
+from cros.factory.test import utils
+from cros.factory.test.e2e_test import e2e_test
+from cros.factory.test.e2e_test.common import AutomationMode, DEFAULT, CHROOT
+
+
+class AutomationError(Exception):
+ """Automation error."""
+ pass
+
+
+class AutomatorMetaclass(e2e_test.E2ETestMetaclass):
+ """Metaclass for Automator class.
+
+ This metaclass is used to hold the cache for board automation function map.
+ The cache is copied to the created Automator subclass after the subclass
+ object is created, and is reset to empty after copy.
+ """
+ automator_registry = {}
+
+ def __init__(mcs, name, bases, attrs):
+ # Copy the constructed board automation function map cache to the automator
+ # subclass and reset the cache.
+ mcs.automator_for_board = AutomatorMetaclass.automator_registry
+ AutomatorMetaclass.automator_registry = {}
+ super(AutomatorMetaclass, mcs).__init__(name, bases, attrs)
+
+
+class AutomatorSetting(object):
+ """A class to hold the settings for a board automation function."""
+ def __init__(self, function, override_dargs=None,
+ automation_mode=AutomationMode.PARTIAL,
+ wait_for_factory_test=True):
+ self.function = function
+ self.override_dargs = override_dargs or {}
+ if not automation_mode in AutomationMode:
+ raise AutomationError('Invalid automation mode %r' % automation_mode)
+ self.automation_mode = automation_mode
+ self.wait_for_factory_test = wait_for_factory_test
+
+
+class Automator(e2e_test.E2ETest):
+ """The base automator class."""
+ __metaclass__ = AutomatorMetaclass
+
+ automator_for_board = None
+
+ def runTest(self):
+ """The main automator method.
+
+ This method tries to locate and start the automation function with the
+ following logic:
+
+ 1) Look for an automation function with current enabled automation mode
+ of the current board.
+ 2) If not automation function is found in 1), look for an automation
+ function with current enabled automation mode in DEFAULT board.
+ 3) If an automation function is found, start it.
+ 4) If no automation function was found, start the factory test and wait
+ for it to pass.
+ """
+ if utils.in_chroot():
+ board = CHROOT
+ else:
+ board = common.ProbeBoard()
+
+ automator_setting = None
+ mode = self.test_info.automation_mode
+
+ for b in (board, DEFAULT):
+ if b in self.automator_for_board:
+ setting = self.automator_for_board[b].get(mode)
+ if setting:
+ automator_setting = setting
+ break
+
+ # Start factory test.
+ dargs = automator_setting.override_dargs if automator_setting else {}
+ self._InitFactoryTest(dargs=dargs)
+ self.StartFactoryTest()
+
+ if automator_setting:
+ logging.info('Start %s automation function for factory test %r.',
+ mode, self.test_info.pytest_name)
+ automator_setting.function(self)
+ if automator_setting.wait_for_factory_test:
+ self.pytest_thread.join()
+ self.WaitForPass()
+ else:
+ logging.warn('Factory test %r does not have %s automation function. '
+ 'Simply wait for the factory test to end.',
+ self.test_info.pytest_name, mode)
+ self.pytest_thread.join()
+ self.WaitForPass()
+
+
+def AutomationFunction(boards=(DEFAULT,), override_dargs=None,
+ automation_mode=AutomationMode.PARTIAL,
+ wait_for_factory_test=True):
+ """A decorator to create a test automation function.
+
+ Args:
+ boards: The list of boards this automation function is for.
+ override_dargs: A dict of dargs to override.
+ automation_mode: The list of automation mode under which this automation
+ function is enabled
+ wait_for_factory_test: Whether to wait for the factory test to finish.
+
+ Returns:
+ A decorator for automation function.
+ """
+ def Decorator(automation_function):
+ if not automation_function.__name__.startswith('automate'):
+ raise AutomationError(
+ ('Invalid automation function: %r: automation function\'s name '
+ 'must start with "automate"') % automation_function.__name__)
+
+ modes = MakeList(automation_mode)
+
+ for board in boards:
+ registry = AutomatorMetaclass.automator_registry
+ if board in registry:
+ for mode in modes:
+ if mode in registry[board]:
+ existing_function = registry[board][mode].function
+ raise AutomationError(
+ ('More than one automation function (%r and %r) registered '
+ 'for board %r of mode %r') % (
+ existing_function.function.__name__,
+ automation_function.__name__, board, mode))
+ else:
+ registry[board] = {}
+
+ for mode in modes:
+ AutomatorMetaclass.automator_registry[board][mode] = (
+ AutomatorSetting(automation_function,
+ override_dargs=override_dargs,
+ automation_mode=mode,
+ wait_for_factory_test=wait_for_factory_test))
+ return automation_function
+
+ return Decorator
diff --git a/py/test/e2e_test/common.py b/py/test/e2e_test/common.py
new file mode 100644
index 0000000..598c691
--- /dev/null
+++ b/py/test/e2e_test/common.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Common constants and utility methods."""
+
+
+import factory_common # pylint: disable=W0611
+from cros.factory.test import utils
+
+
+DEFAULT = '__DEFAULT__'
+CHROOT = '__CHROOT__'
+
+AutomationMode = utils.Enum(['NONE', 'PARTIAL', 'FULL'])
+AutomationModePrompt = {
+ AutomationMode.NONE: None,
+ AutomationMode.PARTIAL: 'Partial automation mode; manual tests are run.',
+ AutomationMode.FULL: 'Full automation mode; manual tests are skipped.'
+}
+
+
+def ParseAutomationMode(mode):
+ """Parses the given mode string to AutomationMode enumeration.
+
+ Args:
+ mode: An automation mode string.
+
+ Returns:
+ The parsed Enum string.
+
+ Raises:
+ ValueError if the given mode string is invalid.
+ """
+ if mode.upper() not in AutomationMode:
+ raise ValueError('Invalid mode string %r; valid values are: %r' %
+ (mode, list(AutomationMode)))
+ return mode.upper()
diff --git a/py/test/factory.py b/py/test/factory.py
index 13dde29..96e5a4d 100644
--- a/py/test/factory.py
+++ b/py/test/factory.py
@@ -2,8 +2,8 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+"""Common types and routines for factory test infrastructure.
-'''
This library provides common types and routines for the factory test
infrastructure. This library explicitly does not import gtk, to
allow its use by the autotest control process.
@@ -12,8 +12,9 @@
from cros.factory.test import factory
factory.console.info('...') # Or warn, or error
-'''
+"""
+# pylint: disable=W0105
import getpass
import logging
@@ -59,7 +60,7 @@
def get_factory_root(subdir=None):
- '''Returns the root for logging and state.
+ """Returns the root for logging and state.
This is usually /var/log, or /tmp/factory.$USER if in the chroot, but may be
overridden by the CROS_FACTORY_ROOT environment variable.
@@ -68,7 +69,7 @@
Args:
subdir: If not None, returns that subdirectory.
- '''
+ """
ret = (os.environ.get('CROS_FACTORY_ROOT') or
(('/tmp/factory.%s' % getpass.getuser())
if utils.in_chroot() else '/var/factory'))
@@ -79,17 +80,17 @@
def get_log_root():
- '''Returns the root for logs'''
+ """Returns the root for logs"""
return get_factory_root('log')
def get_state_root():
- '''Returns the root for all factory state.'''
+ """Returns the root for all factory state."""
return get_factory_root('state')
def get_test_data_root():
- '''Returns the root for all test logs/state.'''
+ """Returns the root for all test logs/state."""
return get_factory_root('tests')
@@ -108,7 +109,7 @@
def get_current_test_metadata():
- '''Returns metadata for the currently executing test, if any.'''
+ """Returns metadata for the currently executing test, if any."""
path = os.environ.get("CROS_FACTORY_TEST_METADATA")
if not path or not os.path.exists(path):
return {}
@@ -118,7 +119,7 @@
def get_lsb_data():
- '''Reads all key-value pairs from system lsb-* configuration files.'''
+ """Reads all key-value pairs from system lsb-* configuration files."""
# TODO(hungte) Re-implement using regex.
# lsb-* file format:
# [#]KEY="VALUE DATA"
@@ -147,11 +148,11 @@
def get_current_md5sum():
- '''Returns MD5SUM of the current autotest directory.
+ """Returns MD5SUM of the current autotest directory.
Returns None if there has been no update (i.e., unable to read
the MD5SUM file).
- '''
+ """
if os.path.exists(FACTORY_MD5SUM_PATH):
return open(FACTORY_MD5SUM_PATH, 'r').read().strip()
else:
@@ -176,14 +177,15 @@
def get_verbose_log_file():
- '''
- Returns an opened log file. Note that this returns a file instead of a
- logger (so the verbose log is not picked up by root logger.) Therefore,
- the caller is responsible for flushing and closing this file.
+ """Returns an opened log file.
+
+ Note that this returns a file instead of a logger (so the verbose log is not
+ picked up by root logger.) Therefore, the caller is responsible for flushing
+ and closing this file.
The log file name will contain test invocation ID and thus this method
can only be called from a test.
- '''
+ """
invocation = os.environ['CROS_FACTORY_TEST_INVOCATION']
log_name = '%s-log-%s' % (get_current_test_path(), invocation)
log_path = os.path.join(get_factory_root('log'), log_name)
@@ -192,14 +194,15 @@
def std_repr(obj, extra=None, excluded_keys=None, true_only=False):
- '''
- Returns the representation of an object including its properties.
+ """Returns the representation of an object including its properties.
- @param extra: Extra items to include in the representation.
- @param excluded_keys: Keys not to include in the representation.
- @param true_only: Whether to include only values that evaluate to
- true.
- '''
+ Args:
+ obj: The object to get properties from.
+ extra: Extra items to include in the representation.
+ excluded_keys: Keys not to include in the representation.
+ true_only: Whether to include only values that evaluate to
+ true.
+ """
extra = extra or []
excluded_keys = excluded_keys or []
return (obj.__class__.__name__ + '('
@@ -213,19 +216,17 @@
def log(message):
- '''
- Logs a message to the console. Deprecated; use the 'console'
- property instead.
+ """Logs a message to the console.
+
+ Deprecated; use the 'console' property instead.
TODO(jsalz): Remove references throughout factory tests.
- '''
+ """
console.info(message)
def get_state_instance():
- '''
- Returns a cached factory state client instance.
- '''
+ """Returns a cached factory state client instance."""
# Delay loading modules to prevent circular dependency.
from cros.factory.test import state # pylint: disable=W0404
global _state_instance # pylint: disable=W0603
@@ -306,13 +307,13 @@
_inited_logging = False
def init_logging(prefix=None, verbose=False):
- '''
- Initializes logging.
+ """Initializes logging.
- @param prefix: A prefix to display for each log line, e.g., the program
- name.
- @param verbose: True for debug logging, false for info logging.
- '''
+ Args:
+ prefix: A prefix to display for each log line, e.g., the program
+ name.
+ verbose: True for debug logging, false for info logging.
+ """
global _inited_logging # pylint: disable=W0603
assert not _inited_logging, "May only call init_logging once"
_inited_logging = True
@@ -335,8 +336,7 @@
class Hooks(object):
- """
- Goofy hooks.
+ """Goofy hooks.
This class is a dummy implementation, but methods may be overridden
by the subclass.
@@ -607,8 +607,7 @@
class TestState(object):
- '''
- The complete state of a test.
+ """The complete state of a test.
Properties:
status: The status of the test (one of ACTIVE, PASSED,
@@ -621,7 +620,7 @@
iterations_left: For an active test, the number of remaining
iterations after the current one.
retries_left: Maximum number of retries allowed to pass the test.
- '''
+ """
ACTIVE = 'ACTIVE'
PASSED = 'PASSED'
FAILED = 'FAILED'
@@ -653,29 +652,30 @@
decrement_iterations_left=0, iterations_left=None,
decrement_retries_left=0, retries_left=None,
skip=None):
- '''
- Updates the state of a test.
+ """Updates the state of a test.
- @param status: The new status of the test.
- @param increment_count: An amount by which to increment count.
- @param error_msg: If non-None, the new error message for the test.
- @param shutdown_count: If non-None, the new shutdown count.
- @param increment_shutdown_count: An amount by which to increment
- shutdown_count.
- @param visible: If non-None, whether the test should become visible.
- @param invocation: The currently executing or last invocation, if any.
- @param iterations_left: If non-None, the new iterations_left.
- @param decrement_iterations_left: An amount by which to decrement
- iterations_left.
- @param retries_left: If non-None, the new retries_left.
- The case retries_left = -1 means the test had already used the first try
- and all the retries.
- @param decrement_retries_left: An amount by which to decrement
- retries_left.
- @param skip: Whether the test should be skipped.
+ Args:
+ status: The new status of the test.
+ increment_count: An amount by which to increment count.
+ error_msg: If non-None, the new error message for the test.
+ shutdown_count: If non-None, the new shutdown count.
+ increment_shutdown_count: An amount by which to increment
+ shutdown_count.
+ visible: If non-None, whether the test should become visible.
+ invocation: The currently executing or last invocation, if any.
+ iterations_left: If non-None, the new iterations_left.
+ decrement_iterations_left: An amount by which to decrement
+ iterations_left.
+ retries_left: If non-None, the new retries_left.
+ The case retries_left = -1 means the test had already used the first try
+ and all the retries.
+ decrement_retries_left: An amount by which to decrement
+ retries_left.
+ skip: Whether the test should be skipped.
- Returns True if anything was changed.
- '''
+ Returns:
+ True if anything was changed.
+ """
old_dict = dict(self.__dict__)
if status:
@@ -718,12 +718,11 @@
def overall_status(statuses):
- '''
- Returns the "overall status" given a list of statuses.
+ """Returns the "overall status" given a list of statuses.
This is the first element of [ACTIVE, FAILED, UNTESTED, PASSED]
(in that order) that is present in the status list.
- '''
+ """
status_set = set(statuses)
for status in [TestState.ACTIVE, TestState.FAILED,
TestState.UNTESTED, TestState.PASSED]:
@@ -735,33 +734,33 @@
class TestListError(Exception):
+ """Test list error."""
pass
class FactoryTestFailure(Exception):
- '''
- Failure of a factory test.
+ """Failure of a factory test.
Args:
message: The exception message.
status: The status to report for the failure (usually FAILED
but possibly UNTESTED).
- '''
+ """
def __init__(self, message=None, status=TestState.FAILED):
super(FactoryTestFailure, self).__init__(message)
self.status = status
class RequireRun(object):
- '''Requirement that a test has run (and optionally passed).'''
+ """Requirement that a test has run (and optionally passed)."""
def __init__(self, path, passed=True):
- '''Constructor.
+ """Constructor.
Args:
path: Path to the test that must have been run. "ALL" is
a valid value and refers to the root (all tests).
passed: Whether the test is required to have passed.
- '''
+ """
# '' is the key of the root and will resolve to the root node.
self.path = ('' if path == ALL else path)
self.passed = passed
@@ -771,8 +770,7 @@
class FactoryTest(object):
- '''
- A factory test object.
+ """A factory test object.
Factory tests are stored in a tree. Each node has an id (unique
among its siblings). Each node also has a path (unique throughout the
@@ -789,7 +787,7 @@
should be run.
implicit_id: Whether the ID was determined implicitly (i.e., not
explicitly specified in the test list).
- '''
+ """
# If True, the test never fails, but only returns to an untested state.
never_fails = False
@@ -813,6 +811,7 @@
def __init__(self,
label_en='',
label_zh='',
+ has_automator=False,
autotest_name=None,
pytest_name=None,
invocation_target=None,
@@ -835,15 +834,15 @@
finish=None,
_root=None,
_default_id=None):
- '''
- Constructor.
+ """Constructor.
See cros.factory.test.test_lists.FactoryTest for argument
documentation.
- '''
+ """
self.label_en = label_en
self.label_zh = (label_zh if isinstance(label_zh, unicode)
else label_zh.decode('utf-8'))
+ self.has_automator = has_automator
self.autotest_name = autotest_name
self.pytest_name = pytest_name
self.invocation_target = invocation_target
@@ -960,18 +959,18 @@
@staticmethod
def pytest_name_to_id(pytest_name):
- '''Converts a pytest name to an ID.
+ """Converts a pytest name to an ID.
Removes all but the rightmost dot-separated component, removes
underscores, and converts to CamelCase.
- '''
+ """
name = pytest_name.rpartition('.')[2]
return re.sub('(?:^|_)([a-z])',
lambda match: match.group(1).upper(),
name)
def to_struct(self):
- '''Returns the node as a struct suitable for JSONification.'''
+ """Returns the node as a struct suitable for JSONification."""
ret = dict(
(k, getattr(self, k))
for k in ['id', 'path', 'label_en', 'label_zh',
@@ -997,11 +996,10 @@
return '%s(%s)' % (self.__class__.__name__, ', '.join(attrs))
def _init(self, prefix, path_map):
- '''
- Recursively assigns paths to this node and its children.
+ """Recursively assigns paths to this node and its children.
Also adds this node to the root's path_map.
- '''
+ """
if self.parent:
self.root = self.parent.root
@@ -1015,69 +1013,56 @@
subtest._init((self.path + '.' if len(self.path) else ''), path_map)
def depth(self):
- '''
- Returns the depth of the node (0 for the root).
- '''
+ """Returns the depth of the node (0 for the root)."""
return self.path.count('.') + (self.parent is not None)
def is_leaf(self):
- '''
- Returns true if this is a leaf node.
- '''
+ """Returns true if this is a leaf node."""
return not self.subtests
def has_ancestor(self, other):
- '''
- Returns True if other is an ancestor of this test (or is that test
+ """Returns True if other is an ancestor of this test (or is that test
itself).
- '''
+ """
return (self == other) or (self.parent and self.parent.has_ancestor(other))
def get_ancestors(self):
- '''
- Returns list of ancestors, ordered by seniority.
- '''
+ """Returns list of ancestors, ordered by seniority."""
if self.parent is not None:
return self.parent.get_ancestors() + [self.parent]
return []
def get_ancestor_groups(self):
- '''
- Returns list of ancestors that are groups, ordered by seniority.
- '''
+ """Returns list of ancestors that are groups, ordered by seniority."""
return [node for node in self.get_ancestors() if node.is_group()]
def get_state(self):
- '''
- Returns the current test state from the state instance.
- '''
+ """Returns the current test state from the state instance."""
return TestState.from_dict_or_object(
self.root.state_instance.get_test_state(self.path))
- def update_state(self, update_parent=True, status=None, **kw):
- '''
- Updates the test state.
+ def update_state(self, update_parent=True, status=None, **kwargs):
+ """Updates the test state.
- See TestState.update for allowable kw arguments.
- '''
+ See TestState.update for allowable kwargs arguments.
+ """
if self.never_fails and status == TestState.FAILED:
status = TestState.UNTESTED
ret = TestState.from_dict_or_object(
self.root._update_test_state( # pylint: disable=W0212
- self.path, status=status, **kw))
+ self.path, status=status, **kwargs))
if update_parent and self.parent:
self.parent.update_status_from_children()
return ret
def update_status_from_children(self):
- '''
- Updates the status based on children's status.
+ """Updates the status based on children's status.
A test is active if any children are active; else failed if
any children are failed; else untested if any children are
untested; else passed.
- '''
+ """
if not self.subtests:
return
@@ -1088,11 +1073,11 @@
self.update_state(status=status)
def walk(self, in_order=False):
- '''
- Yields this test and each sub-test.
+ """Yields this test and each sub-test.
- @param in_order: Whether to walk in-order. If False, walks depth-first.
- '''
+ Args:
+ in_order: Whether to walk in-order. If False, walks depth-first.
+ """
if in_order:
# Walking in order - yield self first.
yield self
@@ -1104,19 +1089,16 @@
yield self
def is_group(self):
- '''
- Returns true if this node is a test group.
- '''
+ """Returns true if this node is a test group."""
return isinstance(self, TestGroup)
def is_top_level_test(self):
- '''
- Returns true if this node is a top-level test.
+ """Returns true if this node is a top-level test.
A 'top-level test' is a test directly underneath the root or a
TestGroup, e.g., a node under which all tests must be run
together to be meaningful.
- '''
+ """
return ((not self.is_group()) and
self.parent and
(self.parent == self.root or self.parent.is_group()))
@@ -1127,26 +1109,23 @@
return self.parent.get_top_level_parent_or_group()
def get_top_level_tests(self):
- '''
- Returns a list of top-level tests.
- '''
+ """Returns a list of top-level tests."""
return [node for node in self.walk() if node.is_top_level_test()]
def is_exclusive(self, option):
- '''
- Returns true if the test or any parent is exclusive w.r.t. option.
+ """Returns true if the test or any parent is exclusive w.r.t. option.
Args:
option: A member of EXCLUSIVE_OPTIONS.
- '''
+ """
assert option in self.EXCLUSIVE_OPTIONS
return option in self.exclusive or (
self.parent and self.parent.is_exclusive(option))
def as_dict(self, state_map=None):
- '''
- Returns this node and children in a dictionary suitable for YAMLification.
- '''
+ """Returns this node and children in a dictionary suitable for
+ YAMLification.
+ """
node = {'id': self.id or None, 'path': self.path or None}
if not self.subtests and state_map:
state = state_map[self.path]
@@ -1160,14 +1139,11 @@
return node
def as_yaml(self, state_map=None):
- '''
- Returns this node and children in YAML format.
- '''
+ """Returns this node and children in YAML format."""
return yaml.dump(self.as_dict(state_map))
def disable_by_run_if(self):
- """
- Overwrites properties related to run_if to disable a test.
+ """Overwrites properties related to run_if to disable a test.
Modifies run_if_expr, run_if_not, run_if_table_name so the run_if evaluation
will always skip the test.
@@ -1177,12 +1153,11 @@
self.run_if_table_name = None
def skip(self):
- '''
- Skips and passes this test and any subtests that have not passed yet.
+ """Skips this test and any subtests that have not already passed.
Subtests that have passed are not modified. If any subtests were
skipped, this node (if not a leaf node) is marked as skipped as well.
- '''
+ """
# Modifies run_if argument of this test so it will not be enabled again
# when its run_if is evaluated.
self.disable_by_run_if()
@@ -1201,14 +1176,13 @@
error_msg=TestState.SKIPPED_MSG)
class FactoryTestList(FactoryTest):
- '''
- The root node for factory tests.
+ """The root node for factory tests.
Properties:
path_map: A map from test paths to FactoryTest objects.
source_path: The path to the file in which the test list was defined,
if known. For new-style test lists only.
- '''
+ """
def __init__(self, subtests, state_instance, options,
test_list_id=None, label_en=None, finish_construction=True):
"""Constructor.
@@ -1280,15 +1254,11 @@
"explicitly specified IDs" % bad_implicit_ids)
def get_all_tests(self):
- '''
- Returns all FactoryTest objects.
- '''
+ """Returns all FactoryTest objects."""
return self.path_map.values()
def get_state_map(self):
- '''
- Returns a map of all FactoryTest objects to their TestStates.
- '''
+ """Returns a map of all FactoryTest objects to their TestStates."""
# The state instance may return a dict (for the XML/RPC proxy)
# or the TestState object itself. Convert accordingly.
return dict(
@@ -1296,19 +1266,16 @@
for k, v in self.state_instance.get_test_states().iteritems())
def lookup_path(self, path):
- '''
- Looks up a test from its path.
- '''
+ """Looks up a test from its path."""
return self.path_map.get(path, None)
- def _update_test_state(self, path, **kw):
- '''
- Updates a test state, invoking the state_change_callback if any.
+ def _update_test_state(self, path, **kwargs):
+ """Updates a test state, invoking the state_change_callback if any.
Internal-only; clients should call update_state directly on the
appropriate TestState object.
- '''
- ret, changed = self.state_instance.update_test_state(path, **kw)
+ """
+ ret, changed = self.state_instance.update_test_state(path, **kwargs)
if changed and self.state_change_callback:
self.state_change_callback( # pylint: disable=E1102
self.lookup_path(path), ret)
@@ -1316,17 +1283,19 @@
class TestGroup(FactoryTest):
- '''
- A collection of related tests, shown together in RHS panel if one is active.
- '''
+ """A collection of related tests, shown together in RHS panel if one is
+ active.
+ """
pass
class FactoryAutotestTest(FactoryTest):
+ """Autotest-based factory test."""
pass
class OperatorTest(FactoryAutotestTest):
+ """Factory test with UI to interact with operators."""
has_ui = True
@@ -1335,14 +1304,14 @@
class ShutdownStep(AutomatedSubTest):
- '''A shutdown (halt or reboot) step.
+ """A shutdown (halt or reboot) step.
Properties:
iterations: The number of times to reboot.
operation: The command to run to perform the shutdown
(REBOOT or HALT).
delay_secs: Number of seconds the operator has to abort the shutdown.
- '''
+ """
REBOOT = 'reboot'
HALT = 'halt'
@@ -1362,14 +1331,14 @@
class HaltStep(ShutdownStep):
- '''Halts the machine.'''
+ """Halts the machine."""
def __init__(self, **kw):
kw.setdefault('id', 'Halt')
super(HaltStep, self).__init__(operation=ShutdownStep.HALT, **kw)
class RebootStep(ShutdownStep):
- '''Reboots the machine.'''
+ """Reboots the machine."""
def __init__(self, **kw):
kw.setdefault('id', 'Reboot')
super(RebootStep, self).__init__(operation=ShutdownStep.REBOOT, **kw)