Support for EventLogWatcher in goofy.
BUG=None
TEST=net_utils_unittest.py, Manual
Change-Id: Ied7ed8987a901807b87bf8762edb9c122f2f06a3
Reviewed-on: https://gerrit.chromium.org/gerrit/26657
Commit-Ready: Jon Salz <jsalz@chromium.org>
Reviewed-by: Jon Salz <jsalz@chromium.org>
Tested-by: Jon Salz <jsalz@chromium.org>
diff --git a/py/goofy/event_log_watcher.py b/py/goofy/event_log_watcher.py
index 5dfad40..aac5936 100644
--- a/py/goofy/event_log_watcher.py
+++ b/py/goofy/event_log_watcher.py
@@ -7,8 +7,10 @@
import logging
import os
import shelve
+import sys
import threading
import time
+import traceback
from cros.factory.test import factory
from cros.factory import event_log
@@ -49,10 +51,23 @@
def StartWatchThread(self):
'''Starts a thread to watch event logs.'''
- logging.info('Start watching...')
- self._watch_thread = threading.Thread(target=self.WatchForever)
+ logging.info('Watching event logs...')
+ self._watch_thread = threading.Thread(target=self.WatchForever,
+ name='EventLogWatcher')
self._watch_thread.start()
+    def IsThreadStarted(self):
+        '''Reports whether a watch thread object is attached to this watcher.
+
+        Returns:
+          True if self._watch_thread is set (not None), False otherwise.
+        '''
+        watch_thread = self._watch_thread
+        return watch_thread is not None
+
+    def IsScanning(self):
+        '''Returns True if currently scanning (i.e., the lock is held).
+
+        The scan lock is probed without blocking, so this call returns
+        immediately whether or not the lock is held.
+
+        Returns:
+          False if self._scan_lock could be acquired right away (it is
+          released again before returning), True otherwise.
+        '''
+        # acquire(False) is a non-blocking attempt. A plain acquire() would
+        # wait until the lock is free and then succeed, so this method could
+        # never report True.
+        if self._scan_lock.acquire(False):
+            self._scan_lock.release()
+            return False
+        return True
+
def FlushEventLogs(self):
'''Flushes event logs.
@@ -71,6 +86,11 @@
Raise:
ScanException: if at least one ScanEventLog call throws exception.
'''
+ if not os.path.exists(self._event_log_dir):
+ logging.warn("Event log directory %s does not exist yet",
+ self._event_log_dir)
+ return
+
exceptions = []
for file_name in os.listdir(self._event_log_dir):
file_path = os.path.join(self._event_log_dir, file_name)
@@ -79,7 +99,16 @@
try:
self.ScanEventLog(file_name)
except Exception, e:
- logging.exception('Error scanning event log %s', file_name)
+ if suppress_error:
+ # We're suppressing errors, so just log one line at INFO
+ # level, not a whole traceback.
+ logging.info(
+ 'Error handling event log %s: %s', file_name,
+ ''.join(traceback.format_exception_only(
+ *sys.exc_info()[:2])).strip())
+ else:
+ logging.exception(
+ 'Error handling event log %s', file_name)
exceptions.append(e)
self._db.sync()
diff --git a/py/goofy/goofy.py b/py/goofy/goofy.py
index cbdcb98..b69f1b6 100755
--- a/py/goofy/goofy.py
+++ b/py/goofy/goofy.py
@@ -38,6 +38,9 @@
from cros.factory.test import state
from cros.factory.test.factory import TestState
from cros.factory.goofy import updater
+from cros.factory.goofy import test_steps
+from cros.factory.goofy.event_log_watcher import EventLogWatcher
+from cros.factory.test import shopfloor
from cros.factory.test import utils
from cros.factory.test.event import Event
from cros.factory.test.event import EventClient
@@ -155,6 +158,7 @@
self.event_server_thread = None
self.event_client = None
self.connection_manager = None
+ self.log_watcher = None
self.network_enabled = True
self.event_log = None
self.prespawner = None
@@ -237,6 +241,10 @@
self.event_server_thread.join()
self.event_server.server_close()
self.event_server_thread = None
+ if self.log_watcher:
+ if self.log_watcher.IsThreadStarted():
+ self.log_watcher.StopWatchThread()
+ self.log_watcher = None
if self.prespawner:
logging.info('Stopping prespawner')
self.prespawner.stop()
@@ -747,7 +755,9 @@
state.clear_state()
if self.options.print_test_list:
- print (factory.read_test_list(self.options.print_test_list).
+ print (factory.read_test_list(
+ self.options.print_test_list,
+ test_classes=dict(test_steps.__dict__)).
__repr__(recursive=True))
return
@@ -773,8 +783,10 @@
sys.exit(1)
logging.info('Using test list %s', self.options.test_list)
- self.test_list = factory.read_test_list(self.options.test_list,
- self.state_instance)
+ self.test_list = factory.read_test_list(
+ self.options.test_list,
+ self.state_instance,
+ test_classes=dict(test_steps.__dict__))
if not self.state_instance.has_shared_data('ui_lang'):
self.state_instance.set_shared_data('ui_lang',
self.test_list.options.ui_lang)
@@ -787,6 +799,14 @@
self.start_event_server()
self.connection_manager = self.env.create_connection_manager(
self.test_list.options.wlans)
+ # Note that we create a log watcher even if
+ # sync_event_log_period_secs isn't set (no background
+ # syncing), since we may use it to flush event logs as well.
+ self.log_watcher = EventLogWatcher(
+ self.test_list.options.sync_event_log_period_secs,
+ handle_event_logs_callback=self._handle_event_logs)
+ if self.test_list.options.sync_event_log_period_secs:
+ self.log_watcher.StartWatchThread()
self.update_system_info()
@@ -896,6 +916,22 @@
'''Invoked when the run queue has no events.'''
self.check_connection_manager()
+    def _handle_event_logs(self, log_name, chunk):
+        '''Event log watcher callback: uploads one chunk to the shopfloor.
+
+        @param log_name: Name of the event log the chunk belongs to.
+        @param chunk: The data to upload. Only its length is inspected here
+            before it is passed to UploadEvent.
+
+        Exceptions raised while obtaining the shopfloor proxy or while
+        uploading are not caught here; they propagate to the caller.
+        '''
+        what = 'event logs (%s, %d bytes)' % (log_name, len(chunk))
+        began_at = time.time()
+        logging.info('Syncing %s', what)
+        # Use the test list's shopfloor timeout for this upload.
+        timeout_secs = self.test_list.options.shopfloor_timeout_secs
+        proxy = shopfloor.get_instance(detect=True, timeout=timeout_secs)
+        proxy.UploadEvent(log_name, chunk)
+        elapsed_secs = time.time() - began_at
+        logging.info('Successfully synced %s in %.03f s', what, elapsed_secs)
+
def run_tests_with_status(self, statuses_to_run, starting_at=None,
root=None):
'''Runs all top-level tests with a particular status.
diff --git a/py/goofy/invocation.py b/py/goofy/invocation.py
index 9f3f4d0..2c28192 100755
--- a/py/goofy/invocation.py
+++ b/py/goofy/invocation.py
@@ -11,6 +11,7 @@
import pipes
import re
import subprocess
+import sys
import tempfile
import threading
import time
@@ -281,6 +282,18 @@
logging.exception('Unable to delete temporary file %s',
f)
+    def _invoke_target(self):
+        '''
+        Invokes a target directly within Goofy.
+
+        Calls self.test.invocation_target with this invocation as its only
+        argument.
+
+        @return A (status, error_msg) tuple: (TestState.PASSED, '') if the
+            target returns normally, otherwise (TestState.FAILED, text of
+            the formatted traceback).
+        '''
+        try:
+            self.test.invocation_target(self)
+            return TestState.PASSED, ''
+        except:
+            # The bare except is kept so that anything raised by the target
+            # is logged and reported as a failed test instead of escaping.
+            logging.exception('Exception while invoking target')
+            # Format the traceback once and return that same text.
+            error_msg = traceback.format_exc()
+            return TestState.FAILED, error_msg
+
def clean_autotest_logs(self, output_dir):
globs = self.goofy.test_list.options.preserve_autotest_results
if '*' in globs:
@@ -340,9 +353,12 @@
status, error_msg = self._invoke_autotest()
elif self.test.pytest_name:
status, error_msg = self._invoke_pytest()
+ elif self.test.invocation_target:
+ status, error_msg = self._invoke_target()
else:
status = TestState.FAILED
- error_msg = 'No autotest_name or pytest_name'
+ error_msg = (
+ 'No autotest_name, pytest_name, or invocation_target')
finally:
try:
self.goofy.event_client.post_event(
diff --git a/py/goofy/test_steps.py b/py/goofy/test_steps.py
new file mode 100644
index 0000000..13fe995
--- /dev/null
+++ b/py/goofy/test_steps.py
@@ -0,0 +1,30 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+
+'''
+Test steps run directly within Goofy.
+'''
+
+import logging
+
+import factory_common
+from cros.factory.test import factory
+from cros.factory.test.factory import FactoryTest
+
+
+class FlushEventLogsStep(FactoryTest):
+    '''Synchronizes event logs.
+
+    A test step whose invocation target (_Run) asks Goofy's event log
+    watcher to flush event logs.
+    '''
+    def __init__(self, **kw):
+        '''Constructor.
+
+        @param kw: Extra keyword arguments, forwarded unchanged to
+            FactoryTest.__init__ (e.g. id or label_zh), so that options
+            given in the test list are honored rather than dropped.
+        '''
+        super(FlushEventLogsStep, self).__init__(
+            invocation_target=self._Run,
+            _default_id='FlushEventLogs',
+            **kw)
+
+    def _Run(self, invocation):
+        '''Flushes event logs through invocation.goofy.log_watcher.
+
+        @param invocation: The invocation running this step; its
+            goofy.log_watcher attribute is the watcher that gets flushed.
+        '''
+        log_watcher = invocation.goofy.log_watcher
+        # Display a message on the console if we're going to need to wait
+        # (presumably FlushEventLogs waits for an in-progress scan; confirm
+        # against EventLogWatcher).
+        if log_watcher.IsScanning():
+            factory.console.info('Waiting for current scan to finish...')
+        factory.console.info('Flushing event logs...')
+        log_watcher.FlushEventLogs()
+        factory.console.info('Flushed event logs.')
diff --git a/py/test/factory.py b/py/test/factory.py
index 54f9771..e505030 100644
--- a/py/test/factory.py
+++ b/py/test/factory.py
@@ -190,15 +190,18 @@
return get_state_instance().del_shared_data(key)
-def read_test_list(path=None, state_instance=None, text=None):
+def read_test_list(path=None, state_instance=None, text=None,
+ test_classes={}):
if len([x for x in [path, text] if x]) != 1:
raise TestListError('Exactly one of path and text must be set')
test_list_locals = {}
+
# Import test classes into the evaluation namespace
- for (k, v) in dict(globals()).iteritems():
- if type(v) == type and issubclass(v, FactoryTest):
- test_list_locals[k] = v
+ for d in dict(globals()), test_classes:
+ for (k, v) in d.iteritems():
+ if type(v) == type and issubclass(v, FactoryTest):
+ test_list_locals[k] = v
# Import WLAN into the evaluation namespace, since it is used
# to construct the wlans option.
@@ -283,9 +286,17 @@
engineering_password_sha1 = None
_types['engineering_password_sha1'] = (type(None), str)
- # WLANs that network_manager may connect to.
+ # WLANs that the connection manager may connect to.
wlans = []
+ # Automatically send events to the shopfloor server when
+ # it is reachable.
+ sync_event_log_period_secs = None
+ _types['sync_event_log_period_secs'] = (type(None), int)
+
+ # Timeout talking to shopfloor server for background operations.
+ shopfloor_timeout_secs = 10
+
def check_valid(self):
'''Throws a TestListError if there are any invalid options.'''
# Make sure no errant options, or options with weird types,
@@ -412,6 +423,7 @@
label_zh='',
autotest_name=None,
pytest_name=None,
+ invocation_target=None,
kbd_shortcut=None,
dargs=None,
backgroundable=False,
@@ -420,7 +432,8 @@
has_ui=None,
never_fails=None,
exclusive=None,
- _root=None):
+ _root=None,
+ _default_id=None):
'''
Constructor.
@@ -429,6 +442,8 @@
@param autotest_name: The name of the autotest to run.
@param pytest_name: The name of the pytest to run (relative to
autotest_lib.client.cros.factory.tests).
+ @param invocation_target: The function to execute to run the test
+ (within the Goofy process).
@param kbd_shortcut: The keyboard shortcut for the test.
@param dargs: Autotest arguments.
@param backgroundable: Whether the test may run in the background.
@@ -443,6 +458,7 @@
@param exclusive: Items that the test may require exclusive access to.
May be a list or a single string. Items must all be in
EXCLUSIVE_OPTIONS. Tests may not be backgroundable.
+ @param _default_id: A default ID to use if no ID is specified.
@param _root: True only if this is the root node (for internal use
only).
'''
@@ -450,6 +466,7 @@
self.label_zh = label_zh
self.autotest_name = autotest_name
self.pytest_name = pytest_name
+ self.invocation_target = invocation_target
self.kbd_shortcut = kbd_shortcut.lower() if kbd_shortcut else None
self.dargs = dargs or {}
self.backgroundable = backgroundable
@@ -462,19 +479,29 @@
self.parent = None
self.root = None
- assert not (autotest_name and pytest_name), (
- 'No more than one of autotest_name, pytest_name must be specified')
-
if _root:
self.id = None
else:
- self.id = id or autotest_name or pytest_name.rpartition('.')[2]
+ if id:
+ self.id = id
+ elif autotest_name:
+ self.id = autotest_name
+ elif pytest_name:
+ self.id = pytest_name.rpartition('.')[2]
+ else:
+ self.id = _default_id
+
assert self.id, (
- 'Tests require either an id or autotest name: %r' % self)
+ 'id not specified for test: %r' % self)
assert '.' not in self.id, (
'id cannot contain a period: %r' % self)
# Note that we check ID uniqueness in _init.
+ assert len(filter(None, [autotest_name, pytest_name,
+ invocation_target, subtests])) <= 1, (
+ 'No more than one of autotest_name, pytest_name, '
+ 'invocation_target, and subtests must be specified')
+
if has_ui is not None:
self.has_ui = has_ui
if never_fails is not None:
@@ -498,9 +525,6 @@
bogus_exclusive_items,
self.EXCLUSIVE_OPTIONS))
- assert not ((autotest_name or pytest_name) and self.subtests), (
- 'Test %s may not have both an autotest and subtests' % self.id)
-
def to_struct(self):
'''Returns the node as a struct suitable for JSONification.'''
ret = dict(
diff --git a/py/test/shopfloor.py b/py/test/shopfloor.py
index b7ea121..db85697 100644
--- a/py/test/shopfloor.py
+++ b/py/test/shopfloor.py
@@ -27,6 +27,7 @@
from xmlrpclib import Binary, Fault
import factory_common
+from cros.factory.utils import net_utils
from cros.factory.test import factory
@@ -153,13 +154,14 @@
return None
-def get_instance(url=None, detect=False):
+def get_instance(url=None, detect=False, timeout=None):
"""Gets an instance (for client side) to access the shop floor server.
@param url: URL of the shop floor server. If None, use the value in
factory shared data.
@param detect: If True, attempt to detect the server URL if none is
specified.
+ @param timeout: If not None, the timeout in seconds.
@return An object with all public functions from shopfloor.ShopFloorBase.
"""
if not url:
@@ -168,7 +170,8 @@
url = detect_default_server_url()
if not url:
raise Exception("Shop floor server URL is NOT configured.")
- return xmlrpclib.ServerProxy(url, allow_none=True, verbose=False)
+ return net_utils.TimeoutXMLRPCServerProxy(
+ url, allow_none=True, verbose=False, timeout=timeout)
@_server_api
diff --git a/py/utils/__init__.py b/py/utils/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/py/utils/__init__.py
diff --git a/py/utils/factory_common.py b/py/utils/factory_common.py
new file mode 120000
index 0000000..c1b8bb4
--- /dev/null
+++ b/py/utils/factory_common.py
@@ -0,0 +1 @@
+../factory_common.py
\ No newline at end of file
diff --git a/py/utils/net_utils.py b/py/utils/net_utils.py
new file mode 100644
index 0000000..ec49398
--- /dev/null
+++ b/py/utils/net_utils.py
@@ -0,0 +1,41 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Networking-related utilities."""
+
+import httplib
+import xmlrpclib
+
+
+DEFAULT_TIMEOUT = 10
+
+
+class TimeoutHTTPConnection(httplib.HTTPConnection):
+    '''HTTPConnection that applies self.timeout to its socket on connect.'''
+    def connect(self):
+        '''Connects via the base class, then sets the socket timeout.'''
+        httplib.HTTPConnection.connect(self)
+        connected_sock = self.sock
+        connected_sock.settimeout(self.timeout)
+
+class TimeoutHTTP(httplib.HTTP):
+    '''httplib.HTTP variant that uses TimeoutHTTPConnection connections.'''
+    _connection_class = TimeoutHTTPConnection
+
+    def set_timeout(self, timeout):
+        '''Stores timeout on the wrapped connection object (self._conn).'''
+        wrapped = self._conn
+        wrapped.timeout = timeout
+
+class TimeoutXMLRPCTransport(xmlrpclib.Transport):
+    '''Transport subclass supporting timeout.'''
+    def __init__(self, timeout=DEFAULT_TIMEOUT, *args, **kwargs):
+        '''Initializes the base Transport and records the timeout.
+
+        Args:
+          timeout: Value handed to each connection's set_timeout().
+          args, kwargs: Passed through to xmlrpclib.Transport.__init__.
+        '''
+        xmlrpclib.Transport.__init__(self, *args, **kwargs)
+        self.timeout = timeout
+
+    def make_connection(self, host):
+        '''Returns a new TimeoutHTTP for host with self.timeout applied.'''
+        connection = TimeoutHTTP(host)
+        connection.set_timeout(self.timeout)
+        return connection
+
+class TimeoutXMLRPCServerProxy(xmlrpclib.ServerProxy):
+    '''XML/RPC ServerProxy supporting timeout.'''
+    def __init__(self, uri, timeout=DEFAULT_TIMEOUT, *args, **kwargs):
+        '''Creates the proxy, installing a timeout transport if requested.
+
+        Args:
+          uri: Server URI, passed to xmlrpclib.ServerProxy.
+          timeout: Timeout handed to TimeoutXMLRPCTransport. Defaults to
+            the module-wide DEFAULT_TIMEOUT. A false value (None or 0)
+            leaves the transport selection to ServerProxy.
+          args, kwargs: Passed through to xmlrpclib.ServerProxy.__init__.
+            When a timeout is in effect, any 'transport' entry in kwargs
+            is replaced.
+        '''
+        # NOTE(review): the timeout transport builds its connections with
+        # TimeoutHTTP (plain httplib.HTTP); confirm that https:// URIs are
+        # never used together with a timeout.
+        if timeout:
+            kwargs['transport'] = TimeoutXMLRPCTransport(
+                timeout=timeout)
+        xmlrpclib.ServerProxy.__init__(self, uri, *args, **kwargs)
diff --git a/py/utils/net_utils_unittest.py b/py/utils/net_utils_unittest.py
new file mode 100644
index 0000000..460d3d1
--- /dev/null
+++ b/py/utils/net_utils_unittest.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Unit tests for the networking-related utilities in net_utils."""
+
+import random
+import SimpleXMLRPCServer
+import socket
+import threading
+import time
+import unittest
+
+import factory_common
+from cros.factory.utils import net_utils
+from cros.factory.utils import test_utils
+
+
+class TimeoutXMLRPCTest(unittest.TestCase):
+    '''Checks that TimeoutXMLRPCServerProxy enforces its timeout.'''
+    def setUp(self):
+        '''Starts a local XML/RPC server that exposes time.sleep.'''
+        self.port = test_utils.FindUnusedTCPPort()
+        self.server = SimpleXMLRPCServer.SimpleXMLRPCServer(
+            ('localhost', self.port),
+            allow_none=True)
+        self.server.register_function(time.sleep)
+        self.thread = threading.Thread(target=self.server.serve_forever)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def tearDown(self):
+        '''Stops the server loop and releases its resources.'''
+        self.server.shutdown()
+        # shutdown() only stops serve_forever(); server_close() is what
+        # closes the listening socket.
+        self.server.server_close()
+        self.thread.join()
+
+    def MakeProxy(self, timeout):
+        '''Returns a proxy for the local server with the given timeout.'''
+        return net_utils.TimeoutXMLRPCServerProxy(
+            'http://localhost:%d' % self.port, timeout=timeout,
+            allow_none=True)
+
+    def runTest(self):
+        '''A short call succeeds; a call longer than the timeout fails.'''
+        self.client = self.MakeProxy(timeout=1)
+
+        start = time.time()
+        self.client.sleep(.001)  # No timeout
+        delta = time.time() - start
+        self.assertTrue(delta < 1, delta)
+
+        start = time.time()
+        try:
+            self.client.sleep(2)  # Cause a timeout in 1 s
+        except socket.timeout:
+            # Good!
+            delta = time.time() - start
+            self.assertTrue(delta > .25, delta)
+            self.assertTrue(delta < 2, delta)
+        else:
+            self.fail('Expected exception')
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/py/utils/test_utils.py b/py/utils/test_utils.py
new file mode 100644
index 0000000..1593d77
--- /dev/null
+++ b/py/utils/test_utils.py
@@ -0,0 +1,16 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+'''Test-related utilities...'''
+
+
+import random
+
+
+def FindUnusedTCPPort():
+    '''Returns an unused TCP port for testing.
+
+    Picks random ports from [10000,20000) and returns the first one that
+    can be bound. The port is released again before returning, so another
+    process could still grab it before the caller does.
+
+    Raises:
+      socket.error: if none of the attempted ports could be bound.
+    '''
+    # Local import: only this function needs the socket module.
+    import socket
+
+    max_attempts = 100
+    for _ in range(max_attempts):
+        port = random.randint(10000, 19999)
+        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            # Binding succeeds only if nothing else holds the port.
+            probe.bind(('', port))
+            return port
+        except socket.error:
+            continue
+        finally:
+            probe.close()
+    raise socket.error(
+        'No unused TCP port found in [10000,20000) after %d attempts'
+        % max_attempts)