Fix inconsistencies that may arise in event logging.

- Be more careful about creating and re-reading reimage_id and
  device_id.
- Use sync markers to remember which events we have synced to the
  shopfloor server, instead of using event_log_db (which may not have a
  state consistent with the "events" file if an unexpected reboot
  occurs).

BUG=chrome-os-partner:19618
TEST=unit tests
TEST=on device with shopfloor server; make sure that synced events are correct

Change-Id: I4ea9e2b4141ccaa055b480cb99fa93932f4791ff
Reviewed-on: https://gerrit.chromium.org/gerrit/55933
Tested-by: Jon Salz <jsalz@chromium.org>
Reviewed-by: Tom Wai-Hong Tam <waihong@chromium.org>
Commit-Queue: Jon Salz <jsalz@chromium.org>
diff --git a/py/event_log.py b/py/event_log.py
index 8d36455..1bf2e00 100644
--- a/py/event_log.py
+++ b/py/event_log.py
@@ -81,6 +81,42 @@
 EVENT_NAME_RE = re.compile(r"^[a-zA-Z_]\w*$")
 EVENT_KEY_RE = EVENT_NAME_RE
 
+# Sync markers.
+#
+# We will add SYNC_MARKER ("#s\n") to the end of each event.  This is
+# a YAML comment so it does not affect the semantic value of the event
+# at all.
+#
+# If sync markers are enabled, the event log watcher will replace the
+# last "#s" with "#S" after sync.  If it then restarts, it will look
+# for the last "#S" and use that sequence to remember where to resume
+# syncing.  This will look like:
+#
+#   ---
+#   SEQ: 1
+#   foo: a
+#   #s
+#   ---
+#   SEQ: 2
+#   foo: b
+#   #S
+#   ---
+#   SEQ: 3
+#   foo: c
+#   #s
+#   ---
+#
+# In this case, events 1 and 2 have been synced (since the last #S entry is
+# in event 2).  Event 3 has not yet been synced.
+SYNC_MARKER = '#s\n'
+SYNC_MARKER_COMPLETE = '#S\n'
+assert len(SYNC_MARKER) == len(SYNC_MARKER_COMPLETE)
+
+# The strings that the event log watcher searches for and replaces
+# in order to mark a portion of the log as synced.
+SYNC_MARKER_SEARCH = '\n' + SYNC_MARKER + '---\n'
+SYNC_MARKER_REPLACE = '\n' + SYNC_MARKER_COMPLETE + '---\n'
+
 # Since gooftool uses this.
 TimeString = utils.TimeString
 
@@ -225,13 +261,15 @@
   # Always respect the device ID recorded in DEVICE_ID_PATH first.
   if os.path.exists(DEVICE_ID_PATH):
     device_id = open(DEVICE_ID_PATH).read().strip()
-    return device_id
+    if device_id:
+      return device_id
 
   # Find or generate device ID from the search path.
   for path in DEVICE_ID_SEARCH_PATHS:
     if os.path.exists(path):
       device_id = open(path).read().strip()
-      break
+      if device_id:
+        break
   else:
     device_id = str(uuid4())
     logging.warning('No device_id available yet: generated %s', device_id)
@@ -240,6 +278,8 @@
   utils.TryMakeDirs(os.path.dirname(DEVICE_ID_PATH))
   with open(DEVICE_ID_PATH, "w") as f:
     print >> f, device_id
+    f.flush()
+    os.fdatasync(f)
 
   return device_id
 
@@ -263,6 +303,8 @@
       utils.TryMakeDirs(os.path.dirname(REIMAGE_ID_PATH))
       with open(REIMAGE_ID_PATH, "w") as f:
         print >> f, reimage_id
+        f.flush()
+        os.fdatasync(f)
       logging.info('No reimage_id available yet: generated %s', reimage_id)
   return reimage_id
 
@@ -555,7 +597,7 @@
         "PREFIX": self.prefix,
         }
     data.update(kwargs)
-    yaml_data = YamlDump(data) + "---\n"
+    yaml_data = YamlDump(data) + SYNC_MARKER + "---\n"
     fcntl.flock(self.file.fileno(), fcntl.LOCK_EX)
     try:
       self.file.write(yaml_data)
diff --git a/py/event_log_watcher.py b/py/event_log_watcher.py
index 6e0069b..171e7c3 100644
--- a/py/event_log_watcher.py
+++ b/py/event_log_watcher.py
@@ -4,6 +4,7 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+import collections
 import logging
 import os
 import shelve
@@ -23,21 +24,38 @@
   pass
 
 
+# Chunk scanned by the log watcher.
+#
+# Properties:
+#
+#   log_name: Name of the log
+#   chunk: Value of the chunk
+#   pos: Position of the chunk within the file
+class Chunk(collections.namedtuple('Chunk', 'log_name chunk pos')):
+  # pylint: disable=W0232
+  # pylint: disable=E1101
+  def __str__(self):
+    return 'Chunk(log_name=%r, len=%s, pos=%d)' % (
+        self.log_name, len(self.chunk), self.pos)
+
+
 class EventLogWatcher(object):
   '''An object watches event log and invokes a callback as new logs appear.'''
 
-  def __init__(self, watch_period_sec=30,
-             event_log_dir=event_log.EVENT_LOG_DIR,
-             event_log_db_file=EVENT_LOG_DB_FILE,
-             handle_event_logs_callback=None,
-             num_log_per_callback=0):
+  def __init__(self,
+               watch_period_sec=30,
+               event_log_dir=event_log.EVENT_LOG_DIR,
+               event_log_db_file=EVENT_LOG_DB_FILE,
+               handle_event_logs_callback=None,
+               num_log_per_callback=0):
     '''Constructor.
 
     Args:
       watch_period_sec: The time period in seconds between consecutive
           watches.
-      event_log_db_file: The file in which to store the DB of event logs.
-      handle_event_logs__callback: The callback to trigger after new event logs
+      event_log_db_file: The file in which to store the DB of event logs,
+          or None to use sync markers instead (see event_log.py).
+      handle_event_logs_callback: The callback to trigger after new event logs
           found.
       num_log_per_callback: The maximum number of log files per callback, or 0
           for unlimited number of log files.
@@ -50,9 +68,11 @@
     self._watch_thread = None
     self._aborted = threading.Event()
     self._kick = threading.Event()
-    self._db = self.GetOrCreateDb()
     self._scan_lock = threading.Lock()
 
+    self._use_sync_markers = event_log_db_file is None
+    self._db = {} if self._use_sync_markers else self.GetOrCreateDb()
+
   def StartWatchThread(self):
     '''Starts a thread to watch event logs.'''
     logging.info('Watching event logs...')
@@ -80,11 +100,11 @@
     with self._scan_lock:
       self.ScanEventLogs(False)
 
-  def _CallEventLogHandler(self, chunk_info_list, suppress_error):
+  def _CallEventLogHandler(self, chunks, suppress_error):
     '''Invoke event log handler callback.
 
     Args:
-      chunk_info_list: A list of tuple (log_name, chunk).
+      chunks: A list of Chunks.
       suppress_error: if set to true then any exception from handle event
           log callback will be ignored.
 
@@ -93,7 +113,20 @@
     '''
     try:
       if self._handle_event_logs_callback is not None:
-        self._handle_event_logs_callback(chunk_info_list)
+        self._handle_event_logs_callback(chunks)
+      if self._use_sync_markers:
+        # Update the sync marker in each chunk.
+        for chunk in chunks:
+          last_sync_marker = chunk.chunk.rfind(event_log.SYNC_MARKER_SEARCH)
+          if last_sync_marker == -1:
+            continue
+          with open(os.path.join(self._event_log_dir, chunk.log_name),
+                    'r+') as f:
+            f.seek(chunk.pos + last_sync_marker)
+            f.write(event_log.SYNC_MARKER_REPLACE)
+            f.flush()
+            os.fdatasync(f)
+
     except: # pylint: disable=W0702
       if suppress_error:
         logging.exception('Upload handler error')
@@ -103,11 +136,12 @@
 
     try:
       # Update log state to db.
-      for log_name, chunk in chunk_info_list:
-        log_state = self._db.setdefault(log_name, {KEY_OFFSET: 0})
-        log_state[KEY_OFFSET] += len(chunk)
-        self._db[log_name] = log_state
-      self._db.sync()
+      for chunk in chunks:
+        log_state = self._db.setdefault(chunk.log_name, {KEY_OFFSET: 0})
+        log_state[KEY_OFFSET] += len(chunk.chunk)
+        self._db[chunk.log_name] = log_state
+      if not self._use_sync_markers:
+        self._db.sync()
     except: # pylint: disable=W0702
       if suppress_error:
         logging.exception('Upload handler error')
@@ -129,7 +163,7 @@
                    self._event_log_dir)
       return
 
-    chunk_info_list = []
+    chunks = []
 
     # Sorts dirs by their names, as its modification time is changed when
     # their files inside are changed/added/removed. Their names are more
@@ -149,7 +183,7 @@
           try:
             chunk_info = self.ScanEventLog(relative_path)
             if chunk_info is not None:
-              chunk_info_list.append(chunk_info)
+              chunks.append(chunk_info)
           except:  # pylint: disable=W0702
             msg = relative_path + ': ' + utils.FormatExceptionOnly()
             if suppress_error:
@@ -157,16 +191,16 @@
             else:
               raise ScanException(msg)
         if (self._num_log_per_callback and
-            len(chunk_info_list) >= self._num_log_per_callback):
-          self._CallEventLogHandler(chunk_info_list, suppress_error)
-          chunk_info_list = []
+            len(chunks) >= self._num_log_per_callback):
+          self._CallEventLogHandler(chunks, suppress_error)
+          chunks = []
           # Skip remaining when abort. We don't want to wait too long for the
           # remaining finished.
           if self._aborted.isSet():
             return
 
-    if chunk_info_list:
-      self._CallEventLogHandler(chunk_info_list, suppress_error)
+    if chunks:
+      self._CallEventLogHandler(chunks, suppress_error)
 
 
   def StopWatchThread(self):
@@ -201,6 +235,8 @@
 
   def GetOrCreateDb(self):
     '''Gets the database or recreate one if exception occurs.'''
+    assert not self._use_sync_markers
+
     try:
       db = OpenShelfOrBackup(self._event_log_db_file)
     except:  # pylint: disable=W0702
@@ -218,7 +254,27 @@
     Args:
       log_name: name of the log file.
     '''
-    log_state = self._db.setdefault(log_name, {KEY_OFFSET: 0})
+    log_state = self._db.get(log_name)
+    if not log_state:
+      # We haven't seen this file yet since starting up.
+      offset = 0
+      if self._use_sync_markers:
+        # Read in the file and set offset from the last sync marker.
+        with open(os.path.join(self._event_log_dir, log_name)) as f:
+          contents = f.read()
+        # Set the offset to just after the last instance of
+        # "\n#S\n---\n".
+        replace_pos = contents.rfind(event_log.SYNC_MARKER_REPLACE)
+        if replace_pos == -1:
+          # Not found; start at the beginning.
+          offset = 0
+        else:
+          offset = replace_pos + len(event_log.SYNC_MARKER_REPLACE)
+      else:
+        # No sync markers; start from the beginning.
+        offset = 0
+      log_state = {KEY_OFFSET: offset}
+      self._db[log_name] = log_state
 
     with open(os.path.join(self._event_log_dir, log_name)) as f:
       f.seek(log_state[KEY_OFFSET])
@@ -227,10 +283,10 @@
       last_separator = chunk.rfind(EVENT_SEPARATOR)
       # No need to proceed if available chunk is empty.
       if last_separator == -1:
-        return
+        return None
 
       chunk = chunk[0:(last_separator + len(EVENT_SEPARATOR))]
-      return (log_name, chunk)
+      return Chunk(log_name, chunk, log_state[KEY_OFFSET])
 
   def GetEventLog(self, log_name):
     '''Gets the log for given log name.'''
diff --git a/py/event_log_watcher_unittest.py b/py/event_log_watcher_unittest.py
index 3790736..f02c75f 100755
--- a/py/event_log_watcher_unittest.py
+++ b/py/event_log_watcher_unittest.py
@@ -13,14 +13,32 @@
 import time
 import unittest
 
+from cros.factory import event_log
 from cros.factory import event_log_watcher
-from cros.factory.event_log_watcher import EventLogWatcher
+from cros.factory.event_log_watcher import Chunk, EventLogWatcher
 
 MOCK_LOG_NAME = lambda x: 'mylog12345%d' % x
-MOCK_PREAMBLE = lambda x: 'device: 123%d\nimage: 456\nmd5: abc\n---\n' % x
-MOCK_EVENT = 'seq: 1\nevent: start\n---\n'
+def MOCK_PREAMBLE(x, sync_marker=False):
+  ret = 'device: 123%d\nimage: 456\nmd5: abc\n' % x
+  if sync_marker:
+    ret += event_log.SYNC_MARKER
+  ret += '---\n'
+  return ret
+def MOCK_EVENT(x=0, sync_marker=False):
+  ret = 'seq: %d\nevent: start\n' % x
+  if sync_marker:
+    ret += event_log.SYNC_MARKER
+  ret += '---\n'
+  return ret
 MOCK_PERIOD = 0.01
 
+
+class ChunkTest(unittest.TestCase):
+  def testStr(self):
+    self.assertEquals("Chunk(log_name='a', len=3, pos=10)",
+                      str(Chunk('a', 'foo', 10)))
+
+
 class EventLogWatcherTest(unittest.TestCase):
   def setUp(self):
     self.temp_dir = tempfile.mkdtemp()
@@ -89,12 +107,12 @@
     self.assertNotEqual(log[event_log_watcher.KEY_OFFSET], 0)
 
     # Write more logs and flush.
-    self.WriteLog(MOCK_EVENT, MOCK_LOG_NAME(0))
+    self.WriteLog(MOCK_EVENT(), MOCK_LOG_NAME(0))
     watcher.FlushEventLogs()
 
     log = watcher.GetEventLog(MOCK_LOG_NAME(0))
     self.assertEqual(log[event_log_watcher.KEY_OFFSET],
-        len(MOCK_PREAMBLE(0)) + len(MOCK_EVENT))
+        len(MOCK_PREAMBLE(0)) + len(MOCK_EVENT()))
 
     watcher.Close()
 
@@ -117,7 +135,8 @@
 
   def testHandleEventLogsCallback(self):
     mock = mox.MockAnything()
-    mock.handle_event_log([(MOCK_LOG_NAME(0), MOCK_PREAMBLE(0))])
+    mock.handle_event_log([
+        Chunk(MOCK_LOG_NAME(0), MOCK_PREAMBLE(0), 0)])
     mox.Replay(mock)
 
     watcher = EventLogWatcher(MOCK_PERIOD, self.events_dir, self.db,
@@ -174,10 +193,72 @@
 
     mox.Verify(mock)
 
+  def testSyncMarkers_NoRestart(self):
+    self._testSyncMarkers(False)
+
+  def testSyncMarkers_Restart(self):
+    self._testSyncMarkers(True)
+
+  def _testSyncMarkers(self, unexpected_restart):
+    # pylint: disable=E1102
+    m = mox.Mox()
+    mock_callback = m.CreateMockAnything()
+    path = os.path.join(self.events_dir, MOCK_LOG_NAME(0))
+
+    # No DB; use sync markers.
+    watcher = EventLogWatcher(MOCK_PERIOD, self.events_dir, None,
+        mock_callback)
+    self.WriteLog(MOCK_PREAMBLE(0, True), MOCK_LOG_NAME(0))
+
+    mock_callback([
+        Chunk(MOCK_LOG_NAME(0), MOCK_PREAMBLE(0, True), 0)])
+    m.ReplayAll()
+    watcher.ScanEventLogs()
+    m.VerifyAll()
+
+    def ReplaceSyncMarker(s):
+      return s.replace(event_log.SYNC_MARKER_SEARCH,
+                       event_log.SYNC_MARKER_REPLACE)
+
+    # We should have replaced '#s' with '#S' in the preamble.
+    self.assertEqual(ReplaceSyncMarker(MOCK_PREAMBLE(0, True)),
+                     open(path).read())
+
+    if unexpected_restart:
+      # Re-create the event log watcher to zap its state.
+      watcher = EventLogWatcher(MOCK_PERIOD, self.events_dir, None,
+                                mock_callback)
+      # The event log watcher has forgotten about the file.
+      self.assertIsNone(watcher.GetEventLog(MOCK_LOG_NAME(0)))
+    else:
+      # The event log watcher has the correct sync offset for the file.
+      self.assertEquals({
+          event_log_watcher.KEY_OFFSET: len(MOCK_PREAMBLE(0, True))},
+                        watcher.GetEventLog(MOCK_LOG_NAME(0)))
+
+    # Write two events; they (but not the preamble) should be scanned.
+    self.WriteLog(MOCK_EVENT(0, True), MOCK_LOG_NAME(0))
+    self.WriteLog(MOCK_EVENT(1, True), MOCK_LOG_NAME(0))
+    m.ResetAll()
+    mock_callback([
+        Chunk(MOCK_LOG_NAME(0), MOCK_EVENT(0, True) + MOCK_EVENT(1, True),
+              len(MOCK_PREAMBLE(0, True)))])
+    m.ReplayAll()
+    watcher.ScanEventLogs()
+    m.VerifyAll()
+
+    # We should have replaced '#s' with '#S' in the preamble and the
+    # second real event.
+    self.assertEqual(ReplaceSyncMarker(MOCK_PREAMBLE(0, True)) +
+                     MOCK_EVENT(0, True) +
+                     ReplaceSyncMarker(MOCK_EVENT(1, True)),
+                     open(path).read())
+
   def testHandleEventLogsFail(self):
     mock = mox.MockAnything()
-    mock.handle_event_log([(MOCK_LOG_NAME(0), MOCK_PREAMBLE(0))]).AndRaise(
-            Exception("Bar"))
+    mock.handle_event_log(
+        [Chunk(MOCK_LOG_NAME(0), MOCK_PREAMBLE(0), 0)]
+        ).AndRaise(Exception("Bar"))
     mox.Replay(mock)
     watcher = EventLogWatcher(MOCK_PERIOD, self.events_dir, self.db,
         mock.handle_event_log)
diff --git a/py/goofy/goofy.py b/py/goofy/goofy.py
index 127dbbc..4960fcd 100755
--- a/py/goofy/goofy.py
+++ b/py/goofy/goofy.py
@@ -1112,7 +1112,9 @@
       sys.exit(0)
 
     event_log.IncrementBootSequence()
-    self.event_log = EventLog('goofy')
+    # Don't defer logging the initial event, so we can make sure
+    # that device_id, reimage_id, etc. are all set up.
+    self.event_log = EventLog('goofy', defer=False)
 
     if (not suppress_chroot_warning and
       factory.in_chroot() and
@@ -1230,6 +1232,7 @@
     # syncing), since we may use it to flush event logs as well.
     self.log_watcher = EventLogWatcher(
       self.test_list.options.sync_event_log_period_secs,
+      event_log_db_file=None,
       handle_event_logs_callback=self.handle_event_logs)
     if self.test_list.options.sync_event_log_period_secs:
       self.log_watcher.StartWatchThread()
@@ -1639,31 +1642,32 @@
     self.check_battery()
     self.check_core_dump()
 
-  def handle_event_logs(self, chunk_info):
+  def handle_event_logs(self, chunks):
     '''Callback for event watcher.
 
     Attempts to upload the event logs to the shopfloor server.
 
     Args:
-      chunk_info: A list of tuple (log_name, chunk)
+      chunks: A list of Chunk objects.
     '''
     first_exception = None
     exception_count = 0
 
-    for log_name, chunk in chunk_info:
+    for chunk in chunks:
       try:
-        description = 'event logs (%s, %d bytes)' % (log_name, len(chunk))
+        description = 'event logs (%s)' % chunk
         start_time = time.time()
         shopfloor_client = shopfloor.get_instance(
           detect=True,
           timeout=self.test_list.options.shopfloor_timeout_secs)
-        shopfloor_client.UploadEvent(log_name + "." + event_log.GetReimageId(),
-                                     Binary(chunk))
+        shopfloor_client.UploadEvent(chunk.log_name + "." +
+                                     event_log.GetReimageId(),
+                                     Binary(chunk.chunk))
         logging.info(
           'Successfully synced %s in %.03f s',
           description, time.time() - start_time)
       except: # pylint: disable=W0702
-        first_exception = (first_exception or (log_name + ': ' +
+        first_exception = (first_exception or (chunk.log_name + ': ' +
                                                utils.FormatExceptionOnly()))
         exception_count += 1
 
diff --git a/py/minijack/minijack.py b/py/minijack/minijack.py
index c438361..cd19130 100755
--- a/py/minijack/minijack.py
+++ b/py/minijack/minijack.py
@@ -275,9 +275,9 @@
 
   def HandleEventLogs(self, chunk_info):
     """Callback for event log watcher."""
-    for log_name, chunk in chunk_info:
-      logging.info('Get new event logs (%s, %d bytes)', log_name, len(chunk))
-      blob = EventBlob({'log_name': log_name}, chunk)
+    for chunk in chunk_info:
+      logging.info('Get new event logs (%s)', chunk)
+      blob = EventBlob({'log_name': chunk.log_name}, chunk.chunk)
       self._event_blob_queue.put(blob)
 
   def CheckQueuesEmpty(self):