Support for EventLogWatcher in goofy.

BUG=None
TEST=net_utils_unittest.py, Manual

Change-Id: Ied7ed8987a901807b87bf8762edb9c122f2f06a3
Reviewed-on: https://gerrit.chromium.org/gerrit/26657
Commit-Ready: Jon Salz <jsalz@chromium.org>
Reviewed-by: Jon Salz <jsalz@chromium.org>
Tested-by: Jon Salz <jsalz@chromium.org>
diff --git a/py/goofy/event_log_watcher.py b/py/goofy/event_log_watcher.py
index 5dfad40..aac5936 100644
--- a/py/goofy/event_log_watcher.py
+++ b/py/goofy/event_log_watcher.py
@@ -7,8 +7,10 @@
 import logging
 import os
 import shelve
+import sys
 import threading
 import time
+import traceback
 
 from cros.factory.test import factory
 from cros.factory import event_log
@@ -49,10 +51,23 @@
 
   def StartWatchThread(self):
     '''Starts a thread to watch event logs.'''
-    logging.info('Start watching...')
-    self._watch_thread = threading.Thread(target=self.WatchForever)
+    logging.info('Watching event logs...')
+    self._watch_thread = threading.Thread(target=self.WatchForever,
+                                          name='EventLogWatcher')
     self._watch_thread.start()
 
+  def IsThreadStarted(self):
+    '''Returns True if a watch thread has been set (is not None).'''
+    return self._watch_thread is not None
+
+  def IsScanning(self):
+    '''Returns True if currently scanning (i.e., the lock is held).'''
+    if self._scan_lock.acquire(False):  # Non-blocking probe; never waits.
+      self._scan_lock.release()
+      return False
+    else:
+      return True
+
   def FlushEventLogs(self):
     '''Flushes event logs.
 
@@ -71,6 +86,11 @@
     Raise:
       ScanException: if at least one ScanEventLog call throws exception.
     '''
+    if not os.path.exists(self._event_log_dir):
+      logging.warn("Event log directory %s does not exist yet",
+                   self._event_log_dir)
+      return
+
     exceptions = []
     for file_name in os.listdir(self._event_log_dir):
       file_path = os.path.join(self._event_log_dir, file_name)
@@ -79,7 +99,16 @@
         try:
           self.ScanEventLog(file_name)
         except Exception, e:
-          logging.exception('Error scanning event log %s', file_name)
+          if suppress_error:
+            # We're suppressing errors, so just log one line at INFO
+            # level, not a whole traceback.
+            logging.info(
+              'Error handling event log %s: %s', file_name,
+              ''.join(traceback.format_exception_only(
+                  *sys.exc_info()[:2])).strip())
+          else:
+            logging.exception(
+              'Error handling event log %s', file_name)
           exceptions.append(e)
 
     self._db.sync()
diff --git a/py/goofy/goofy.py b/py/goofy/goofy.py
index cbdcb98..b69f1b6 100755
--- a/py/goofy/goofy.py
+++ b/py/goofy/goofy.py
@@ -38,6 +38,9 @@
 from cros.factory.test import state
 from cros.factory.test.factory import TestState
 from cros.factory.goofy import updater
+from cros.factory.goofy import test_steps
+from cros.factory.goofy.event_log_watcher import EventLogWatcher
+from cros.factory.test import shopfloor
 from cros.factory.test import utils
 from cros.factory.test.event import Event
 from cros.factory.test.event import EventClient
@@ -155,6 +158,7 @@
         self.event_server_thread = None
         self.event_client = None
         self.connection_manager = None
+        self.log_watcher = None
         self.network_enabled = True
         self.event_log = None
         self.prespawner = None
@@ -237,6 +241,10 @@
             self.event_server_thread.join()
             self.event_server.server_close()
             self.event_server_thread = None
+        if self.log_watcher:
+            if self.log_watcher.IsThreadStarted():
+                self.log_watcher.StopWatchThread()
+            self.log_watcher = None
         if self.prespawner:
             logging.info('Stopping prespawner')
             self.prespawner.stop()
@@ -747,7 +755,9 @@
             state.clear_state()
 
         if self.options.print_test_list:
-            print (factory.read_test_list(self.options.print_test_list).
+            print (factory.read_test_list(
+                    self.options.print_test_list,
+                    test_classes=dict(test_steps.__dict__)).
                    __repr__(recursive=True))
             return
 
@@ -773,8 +783,10 @@
                 sys.exit(1)
             logging.info('Using test list %s', self.options.test_list)
 
-        self.test_list = factory.read_test_list(self.options.test_list,
-                                                self.state_instance)
+        self.test_list = factory.read_test_list(
+            self.options.test_list,
+            self.state_instance,
+            test_classes=dict(test_steps.__dict__))
         if not self.state_instance.has_shared_data('ui_lang'):
             self.state_instance.set_shared_data('ui_lang',
                                                 self.test_list.options.ui_lang)
@@ -787,6 +799,14 @@
         self.start_event_server()
         self.connection_manager = self.env.create_connection_manager(
             self.test_list.options.wlans)
+        # Note that we create a log watcher even if
+        # sync_event_log_period_secs isn't set (no background
+        # syncing), since we may use it to flush event logs as well.
+        self.log_watcher = EventLogWatcher(
+            self.test_list.options.sync_event_log_period_secs,
+            handle_event_logs_callback=self._handle_event_logs)
+        if self.test_list.options.sync_event_log_period_secs:
+            self.log_watcher.StartWatchThread()
 
         self.update_system_info()
 
@@ -896,6 +916,22 @@
         '''Invoked when the run queue has no events.'''
         self.check_connection_manager()
 
+    def _handle_event_logs(self, log_name, chunk):
+        '''Callback for event watcher; uploads chunk to the shopfloor server.
+
+        Exceptions raised while connecting or uploading are not caught here.
+        '''
+        description = 'event logs (%s, %d bytes)' % (log_name, len(chunk))
+        start_time = time.time()
+        logging.info('Syncing %s', description)
+        shopfloor_client = shopfloor.get_instance(
+            detect=True,
+            timeout=self.test_list.options.shopfloor_timeout_secs)
+        shopfloor_client.UploadEvent(log_name, chunk)
+        logging.info(
+            'Successfully synced %s in %.03f s',
+            description, time.time() - start_time)
+
     def run_tests_with_status(self, statuses_to_run, starting_at=None,
         root=None):
         '''Runs all top-level tests with a particular status.
diff --git a/py/goofy/invocation.py b/py/goofy/invocation.py
index 9f3f4d0..2c28192 100755
--- a/py/goofy/invocation.py
+++ b/py/goofy/invocation.py
@@ -11,6 +11,7 @@
 import pipes
 import re
 import subprocess
+import sys
 import tempfile
 import threading
 import time
@@ -281,6 +282,18 @@
                     logging.exception('Unable to delete temporary file %s',
                                       f)
 
+    def _invoke_target(self):
+        '''
+        Invokes self.test.invocation_target directly within Goofy; returns
+        (status, error_msg), where error_msg is the traceback on failure.
+        '''
+        try:
+            self.test.invocation_target(self)
+            return TestState.PASSED, ''
+        except:  # Any exception marks the test as FAILED.
+            logging.exception('Exception while invoking target')
+            return TestState.FAILED, traceback.format_exc()
+
     def clean_autotest_logs(self, output_dir):
         globs = self.goofy.test_list.options.preserve_autotest_results
         if '*' in globs:
@@ -340,9 +353,12 @@
                 status, error_msg = self._invoke_autotest()
             elif self.test.pytest_name:
                 status, error_msg = self._invoke_pytest()
+            elif self.test.invocation_target:
+                status, error_msg = self._invoke_target()
             else:
                 status = TestState.FAILED
-                error_msg = 'No autotest_name or pytest_name'
+                error_msg = (
+                    'No autotest_name, pytest_name, or invocation_target')
         finally:
             try:
                 self.goofy.event_client.post_event(
diff --git a/py/goofy/test_steps.py b/py/goofy/test_steps.py
new file mode 100644
index 0000000..13fe995
--- /dev/null
+++ b/py/goofy/test_steps.py
@@ -0,0 +1,30 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+
+'''
+Test steps run directly within Goofy.
+'''
+
+import logging
+
+import factory_common
+from cros.factory.test import factory
+from cros.factory.test.factory import FactoryTest
+
+
+class FlushEventLogsStep(FactoryTest):
+    '''Test step that flushes event logs; extra kwargs go to FactoryTest.'''
+    def __init__(self, **kw):
+        super(FlushEventLogsStep, self).__init__(
+            invocation_target=self._Run, _default_id='FlushEventLogs', **kw)
+
+    def _Run(self, invocation):
+        log_watcher = invocation.goofy.log_watcher
+        # Display a message on the console if we're going to need to wait
+        if log_watcher.IsScanning():
+            factory.console.info('Waiting for current scan to finish...')
+        factory.console.info('Flushing event logs...')
+        log_watcher.FlushEventLogs()
+        factory.console.info('Flushed event logs.')
diff --git a/py/test/factory.py b/py/test/factory.py
index 54f9771..e505030 100644
--- a/py/test/factory.py
+++ b/py/test/factory.py
@@ -190,15 +190,18 @@
     return get_state_instance().del_shared_data(key)
 
 
-def read_test_list(path=None, state_instance=None, text=None):
+def read_test_list(path=None, state_instance=None, text=None,
+                   test_classes={}):
     if len([x for x in [path, text] if x]) != 1:
         raise TestListError('Exactly one of path and text must be set')
 
     test_list_locals = {}
+
     # Import test classes into the evaluation namespace
-    for (k, v) in dict(globals()).iteritems():
-        if type(v) == type and issubclass(v, FactoryTest):
-            test_list_locals[k] = v
+    for d in dict(globals()), test_classes:
+        for (k, v) in d.iteritems():
+            if type(v) == type and issubclass(v, FactoryTest):
+                test_list_locals[k] = v
 
     # Import WLAN into the evaluation namespace, since it is used
     # to construct the wlans option.
@@ -283,9 +286,17 @@
     engineering_password_sha1 = None
     _types['engineering_password_sha1'] = (type(None), str)
 
-    # WLANs that network_manager may connect to.
+    # WLANs that the connection manager may connect to.
     wlans = []
 
+    # Automatically send events to the shopfloor server when
+    # it is reachable.
+    sync_event_log_period_secs = None
+    _types['sync_event_log_period_secs'] = (type(None), int)
+
+    # Timeout talking to shopfloor server for background operations.
+    shopfloor_timeout_secs = 10
+
     def check_valid(self):
         '''Throws a TestListError if there are any invalid options.'''
         # Make sure no errant options, or options with weird types,
@@ -412,6 +423,7 @@
                  label_zh='',
                  autotest_name=None,
                  pytest_name=None,
+                 invocation_target=None,
                  kbd_shortcut=None,
                  dargs=None,
                  backgroundable=False,
@@ -420,7 +432,8 @@
                  has_ui=None,
                  never_fails=None,
                  exclusive=None,
-                 _root=None):
+                 _root=None,
+                 _default_id=None):
         '''
         Constructor.
 
@@ -429,6 +442,8 @@
         @param autotest_name: The name of the autotest to run.
         @param pytest_name: The name of the pytest to run (relative to
             autotest_lib.client.cros.factory.tests).
+        @param invocation_target: The function to execute to run the test
+            (within the Goofy process).
         @param kbd_shortcut: The keyboard shortcut for the test.
         @param dargs: Autotest arguments.
         @param backgroundable: Whether the test may run in the background.
@@ -443,6 +458,7 @@
         @param exclusive: Items that the test may require exclusive access to.
             May be a list or a single string.  Items must all be in
             EXCLUSIVE_OPTIONS.  Tests may not be backgroundable.
+        @param _default_id: A default ID to use if no ID is specified.
         @param _root: True only if this is the root node (for internal use
             only).
         '''
@@ -450,6 +466,7 @@
         self.label_zh = label_zh
         self.autotest_name = autotest_name
         self.pytest_name = pytest_name
+        self.invocation_target = invocation_target
         self.kbd_shortcut = kbd_shortcut.lower() if kbd_shortcut else None
         self.dargs = dargs or {}
         self.backgroundable = backgroundable
@@ -462,19 +479,29 @@
         self.parent = None
         self.root = None
 
-        assert not (autotest_name and pytest_name), (
-            'No more than one of autotest_name, pytest_name must be specified')
-
         if _root:
             self.id = None
         else:
-            self.id = id or autotest_name or pytest_name.rpartition('.')[2]
+            if id:
+                self.id = id
+            elif autotest_name:
+                self.id = autotest_name
+            elif pytest_name:
+                self.id = pytest_name.rpartition('.')[2]
+            else:
+                self.id = _default_id
+
             assert self.id, (
-                'Tests require either an id or autotest name: %r' % self)
+                'id not specified for test: %r' % self)
             assert '.' not in self.id, (
                 'id cannot contain a period: %r' % self)
             # Note that we check ID uniqueness in _init.
 
+        assert len(filter(None, [autotest_name, pytest_name,
+                                 invocation_target, subtests])) <= 1, (
+            'No more than one of autotest_name, pytest_name, '
+            'invocation_target, and subtests must be specified')
+
         if has_ui is not None:
             self.has_ui = has_ui
         if never_fails is not None:
@@ -498,9 +525,6 @@
                 bogus_exclusive_items,
                 self.EXCLUSIVE_OPTIONS))
 
-        assert not ((autotest_name or pytest_name) and self.subtests), (
-            'Test %s may not have both an autotest and subtests' % self.id)
-
     def to_struct(self):
         '''Returns the node as a struct suitable for JSONification.'''
         ret = dict(
diff --git a/py/test/shopfloor.py b/py/test/shopfloor.py
index b7ea121..db85697 100644
--- a/py/test/shopfloor.py
+++ b/py/test/shopfloor.py
@@ -27,6 +27,7 @@
 from xmlrpclib import Binary, Fault
 
 import factory_common
+from cros.factory.utils import net_utils
 from cros.factory.test import factory
 
 
@@ -153,13 +154,14 @@
     return None
 
 
-def get_instance(url=None, detect=False):
+def get_instance(url=None, detect=False, timeout=None):
     """Gets an instance (for client side) to access the shop floor server.
 
     @param url: URL of the shop floor server. If None, use the value in
             factory shared data.
     @param detect: If True, attempt to detect the server URL if none is
         specified.
+    @param timeout: If not None, the timeout in seconds.
     @return An object with all public functions from shopfloor.ShopFloorBase.
     """
     if not url:
@@ -168,7 +170,8 @@
         url = detect_default_server_url()
     if not url:
         raise Exception("Shop floor server URL is NOT configured.")
-    return xmlrpclib.ServerProxy(url, allow_none=True, verbose=False)
+    return net_utils.TimeoutXMLRPCServerProxy(
+        url, allow_none=True, verbose=False, timeout=timeout)
 
 
 @_server_api
diff --git a/py/utils/__init__.py b/py/utils/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/py/utils/__init__.py
diff --git a/py/utils/factory_common.py b/py/utils/factory_common.py
new file mode 120000
index 0000000..c1b8bb4
--- /dev/null
+++ b/py/utils/factory_common.py
@@ -0,0 +1 @@
+../factory_common.py
\ No newline at end of file
diff --git a/py/utils/net_utils.py b/py/utils/net_utils.py
new file mode 100644
index 0000000..ec49398
--- /dev/null
+++ b/py/utils/net_utils.py
@@ -0,0 +1,51 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Networking-related utilities."""
+
+import httplib
+import xmlrpclib
+
+
+# Default timeout, in seconds, for the XML/RPC classes below.
+DEFAULT_TIMEOUT = 10
+
+
+class TimeoutHTTPConnection(httplib.HTTPConnection):
+    '''HTTPConnection that applies self.timeout to its socket.'''
+    def connect(self):
+        httplib.HTTPConnection.connect(self)
+        self.sock.settimeout(self.timeout)
+
+class TimeoutHTTP(httplib.HTTP):
+    '''httplib.HTTP whose connection class is TimeoutHTTPConnection.'''
+    _connection_class = TimeoutHTTPConnection
+    def set_timeout(self, timeout):
+        '''Sets the timeout (in seconds) of the underlying connection.'''
+        self._conn.timeout = timeout
+
+class TimeoutXMLRPCTransport(xmlrpclib.Transport):
+    '''Transport subclass supporting timeout.'''
+    def __init__(self, timeout=DEFAULT_TIMEOUT, *args, **kwargs):
+        xmlrpclib.Transport.__init__(self, *args, **kwargs)
+        self.timeout = timeout
+
+    def make_connection(self, host):
+        '''Returns a TimeoutHTTP for host that uses self.timeout.'''
+        conn = TimeoutHTTP(host)
+        conn.set_timeout(self.timeout)
+        return conn
+
+class TimeoutXMLRPCServerProxy(xmlrpclib.ServerProxy):
+    '''XML/RPC ServerProxy supporting timeout.
+
+    If timeout is truthy, a TimeoutXMLRPCTransport with that timeout is
+    used (replacing any 'transport' keyword argument).  If timeout is
+    None or 0, the transport is left as given and no timeout is set here.
+    '''
+    def __init__(self, uri, timeout=DEFAULT_TIMEOUT, *args, **kwargs):
+        if timeout:
+            kwargs['transport'] = TimeoutXMLRPCTransport(
+                timeout=timeout)
+        xmlrpclib.ServerProxy.__init__(self, uri, *args, **kwargs)
diff --git a/py/utils/net_utils_unittest.py b/py/utils/net_utils_unittest.py
new file mode 100644
index 0000000..460d3d1
--- /dev/null
+++ b/py/utils/net_utils_unittest.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Unit tests for net_utils (TimeoutXMLRPCServerProxy)."""
+
+import random
+import SimpleXMLRPCServer
+import socket
+import threading
+import time
+import unittest
+
+import factory_common
+from cros.factory.utils import net_utils
+from cros.factory.utils import test_utils
+
+
+class TimeoutXMLRPCTest(unittest.TestCase):
+    '''Checks that TimeoutXMLRPCServerProxy times out on slow calls.'''
+    def setUp(self):
+        self.port = test_utils.FindUnusedTCPPort()
+        self.server = SimpleXMLRPCServer.SimpleXMLRPCServer(
+            ('localhost', self.port), allow_none=True)
+        self.server.register_function(time.sleep)
+        self.thread = threading.Thread(target=self.server.serve_forever)
+        self.thread.daemon = True
+        self.thread.start()
+
+    def tearDown(self):
+        self.server.shutdown()
+        self.server.server_close()  # Release the listening socket.
+
+    def MakeProxy(self, timeout):
+        return net_utils.TimeoutXMLRPCServerProxy(
+            'http://localhost:%d' % self.port, timeout=timeout, allow_none=True)
+
+    def runTest(self):
+        self.client = self.MakeProxy(timeout=1)
+
+        start = time.time()
+        self.client.sleep(.001)  # No timeout
+        delta = time.time() - start
+        self.assertTrue(delta < 1, delta)
+
+        start = time.time()
+        try:
+            self.client.sleep(2)  # Cause a timeout in 1 s
+            self.fail('Expected exception')
+        except socket.timeout:  # Good: timed out as expected.
+            delta = time.time() - start
+            self.assertTrue(delta > .25, delta)
+            self.assertTrue(delta < 2, delta)
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/py/utils/test_utils.py b/py/utils/test_utils.py
new file mode 100644
index 0000000..1593d77
--- /dev/null
+++ b/py/utils/test_utils.py
@@ -0,0 +1,25 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+'''Test-related utilities...'''
+
+
+import random
+import socket
+
+
+def FindUnusedTCPPort():
+    '''Returns an unused TCP port for testing.
+
+    Binds a socket to port 0 so that the OS picks a free port, then
+    closes the socket and returns the port number.  The port is only
+    known to be free at the time of the call; another process could
+    claim it before the caller binds to it.
+    '''
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    try:
+        sock.bind(('localhost', 0))
+        return sock.getsockname()[1]
+    finally:
+        sock.close()