Fix inconsistencies that may arise in event logging.
- Be more careful about creating and re-reading reimage_id and
device_id.
- Use sync markers to remember which events have been synced to the
  shopfloor server, instead of using event_log_db (whose state may be
  inconsistent with the "events" file after an unexpected reboot).
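
To illustrate the second point, a minimal sketch (not code from this
change) of how the synced offset can be recomputed from the log file
alone, using the marker strings defined in event_log.py below:

  SYNC_MARKER_REPLACE = '\n#S\n---\n'  # mirrors event_log.py

  def resume_offset(contents):
    # Resume just past the last completed ("#S") marker, or at the
    # start of the file if nothing has been synced yet.
    pos = contents.rfind(SYNC_MARKER_REPLACE)
    return 0 if pos == -1 else pos + len(SYNC_MARKER_REPLACE)

  assert resume_offset('SEQ: 1\nfoo: a\n#s\n---\n') == 0
  synced = 'SEQ: 1\nfoo: a\n#S\n---\n'
  assert resume_offset(synced + 'SEQ: 2\nfoo: b\n#s\n---\n') == len(synced)
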
BUG=chrome-os-partner:19618
TEST=unit tests
TEST=on device with shopfloor server; make sure that synced events are correct
Change-Id: I4ea9e2b4141ccaa055b480cb99fa93932f4791ff
Reviewed-on: https://gerrit.chromium.org/gerrit/55933
Tested-by: Jon Salz <jsalz@chromium.org>
Reviewed-by: Tom Wai-Hong Tam <waihong@chromium.org>
Commit-Queue: Jon Salz <jsalz@chromium.org>
diff --git a/py/event_log.py b/py/event_log.py
index 8d36455..1bf2e00 100644
--- a/py/event_log.py
+++ b/py/event_log.py
@@ -81,6 +81,42 @@
EVENT_NAME_RE = re.compile(r"^[a-zA-Z_]\w*$")
EVENT_KEY_RE = EVENT_NAME_RE
+# Sync markers.
+#
+# We will add SYNC_MARKER ("#s\n") to the end of each event. This is
+# a YAML comment so it does not affect the semantic value of the event
+# at all.
+#
+# If sync markers are enabled, the event log watcher will replace the
+# last "#s" with "#S" after each sync.  If it then restarts, it will
+# look for the last "#S" and resume syncing from just after that
+# event.  The log will look like:
+#
+# ---
+# SEQ: 1
+# foo: a
+# #s
+# ---
+# SEQ: 2
+# foo: b
+# #S
+# ---
+# SEQ: 3
+# foo: c
+# #s
+# ---
+#
+# In this case, events 1 and 2 have been synced (since the last #S entry is
+# in event 2). Event 3 has not yet been synced.
+SYNC_MARKER = '#s\n'
+SYNC_MARKER_COMPLETE = '#S\n'
+assert len(SYNC_MARKER) == len(SYNC_MARKER_COMPLETE)
+
+# The strings that the event log watcher searches for and replaces
+# to mark a portion of the log as synced.
+SYNC_MARKER_SEARCH = '\n' + SYNC_MARKER + '---\n'
+SYNC_MARKER_REPLACE = '\n' + SYNC_MARKER_COMPLETE + '---\n'
+
# Since gooftool uses this.
TimeString = utils.TimeString
@@ -225,13 +261,15 @@
# Always respect the device ID recorded in DEVICE_ID_PATH first.
if os.path.exists(DEVICE_ID_PATH):
device_id = open(DEVICE_ID_PATH).read().strip()
- return device_id
+ if device_id:
+ return device_id
# Find or generate device ID from the search path.
for path in DEVICE_ID_SEARCH_PATHS:
if os.path.exists(path):
device_id = open(path).read().strip()
- break
+ if device_id:
+ break
else:
device_id = str(uuid4())
logging.warning('No device_id available yet: generated %s', device_id)
@@ -240,6 +278,8 @@
utils.TryMakeDirs(os.path.dirname(DEVICE_ID_PATH))
with open(DEVICE_ID_PATH, "w") as f:
print >> f, device_id
+ f.flush()
+ os.fdatasync(f)
return device_id
@@ -263,6 +303,8 @@
utils.TryMakeDirs(os.path.dirname(REIMAGE_ID_PATH))
with open(REIMAGE_ID_PATH, "w") as f:
print >> f, reimage_id
+ f.flush()
+ os.fdatasync(f)
logging.info('No reimage_id available yet: generated %s', reimage_id)
return reimage_id
@@ -555,7 +597,7 @@
"PREFIX": self.prefix,
}
data.update(kwargs)
- yaml_data = YamlDump(data) + "---\n"
+ yaml_data = YamlDump(data) + SYNC_MARKER + "---\n"
fcntl.flock(self.file.fileno(), fcntl.LOCK_EX)
try:
self.file.write(yaml_data)
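
A minimal sketch of how the watcher consumes these markers (the real
logic is in event_log_watcher.py below; the helper here is
hypothetical).  Because SYNC_MARKER and SYNC_MARKER_COMPLETE are the
same length, flipping "#s" to "#S" is an in-place overwrite and never
shifts any byte offset in the file:

  import os

  SYNC_MARKER_REPLACE = '\n#S\n---\n'  # mirrors event_log.py

  def mark_synced(path, marker_pos):
    # Overwrite '\n#s\n---\n' with '\n#S\n---\n' at a known offset.
    # Same length, so the rest of the file is untouched.
    with open(path, 'r+') as f:
      f.seek(marker_pos)
      f.write(SYNC_MARKER_REPLACE)
      f.flush()
      os.fdatasync(f.fileno())
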
diff --git a/py/event_log_watcher.py b/py/event_log_watcher.py
index 6e0069b..171e7c3 100644
--- a/py/event_log_watcher.py
+++ b/py/event_log_watcher.py
@@ -4,6 +4,7 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+import collections
import logging
import os
import shelve
@@ -23,21 +24,38 @@
pass
+# A chunk of event data scanned from a log file by the log watcher.
+#
+# Properties:
+#
+# log_name: Name of the log file the chunk came from
+# chunk: The raw text of the chunk
+# pos: Offset of the chunk within the file
+class Chunk(collections.namedtuple('Chunk', 'log_name chunk pos')):
+ # pylint: disable=W0232
+ # pylint: disable=E1101
+ def __str__(self):
+ return 'Chunk(log_name=%r, len=%s, pos=%d)' % (
+ self.log_name, len(self.chunk), self.pos)
+
+
class EventLogWatcher(object):
  '''Watches event logs and invokes a callback as new logs appear.'''
- def __init__(self, watch_period_sec=30,
- event_log_dir=event_log.EVENT_LOG_DIR,
- event_log_db_file=EVENT_LOG_DB_FILE,
- handle_event_logs_callback=None,
- num_log_per_callback=0):
+ def __init__(self,
+ watch_period_sec=30,
+ event_log_dir=event_log.EVENT_LOG_DIR,
+ event_log_db_file=EVENT_LOG_DB_FILE,
+ handle_event_logs_callback=None,
+ num_log_per_callback=0):
'''Constructor.
Args:
watch_period_sec: The time period in seconds between consecutive
watches.
-      event_log_db_file: The file in which to store the DB of event logs.
-      handle_event_logs__callback: The callback to trigger after new event logs
-        found.
+      event_log_db_file: The file in which to store the DB of event logs,
+        or None to use sync markers instead (see event_log.py).
+      handle_event_logs_callback: The callback to trigger after new event
+        logs are found.
num_log_per_callback: The maximum number of log files per callback, or 0
for unlimited number of log files.
@@ -50,9 +68,11 @@
self._watch_thread = None
self._aborted = threading.Event()
self._kick = threading.Event()
- self._db = self.GetOrCreateDb()
self._scan_lock = threading.Lock()
+ self._use_sync_markers = event_log_db_file is None
+ self._db = {} if self._use_sync_markers else self.GetOrCreateDb()
+
def StartWatchThread(self):
'''Starts a thread to watch event logs.'''
logging.info('Watching event logs...')
@@ -80,11 +100,11 @@
with self._scan_lock:
self.ScanEventLogs(False)
- def _CallEventLogHandler(self, chunk_info_list, suppress_error):
+ def _CallEventLogHandler(self, chunks, suppress_error):
'''Invoke event log handler callback.
Args:
- chunk_info_list: A list of tuple (log_name, chunk).
+ chunks: A list of Chunks.
      suppress_error: If true, any exception raised by the event log
        callback is logged and suppressed rather than propagated.
@@ -93,7 +113,20 @@
'''
try:
if self._handle_event_logs_callback is not None:
- self._handle_event_logs_callback(chunk_info_list)
+ self._handle_event_logs_callback(chunks)
+ if self._use_sync_markers:
+ # Update the sync marker in each chunk.
+ for chunk in chunks:
+ last_sync_marker = chunk.chunk.rfind(event_log.SYNC_MARKER_SEARCH)
+          if last_sync_marker == -1:
+            continue
+ with open(os.path.join(self._event_log_dir, chunk.log_name),
+ 'r+') as f:
+ f.seek(chunk.pos + last_sync_marker)
+ f.write(event_log.SYNC_MARKER_REPLACE)
+ f.flush()
+ os.fdatasync(f)
+
except: # pylint: disable=W0702
if suppress_error:
logging.exception('Upload handler error')
@@ -103,11 +136,12 @@
try:
# Update log state to db.
- for log_name, chunk in chunk_info_list:
- log_state = self._db.setdefault(log_name, {KEY_OFFSET: 0})
- log_state[KEY_OFFSET] += len(chunk)
- self._db[log_name] = log_state
- self._db.sync()
+ for chunk in chunks:
+ log_state = self._db.setdefault(chunk.log_name, {KEY_OFFSET: 0})
+ log_state[KEY_OFFSET] += len(chunk.chunk)
+ self._db[chunk.log_name] = log_state
+ if not self._use_sync_markers:
+ self._db.sync()
except: # pylint: disable=W0702
if suppress_error:
logging.exception('Upload handler error')
@@ -129,7 +163,7 @@
self._event_log_dir)
return
- chunk_info_list = []
+ chunks = []
    # Sort dirs by their names, as a dir's modification time changes when
    # the files inside it are changed/added/removed. Their names are more
@@ -149,7 +183,7 @@
try:
chunk_info = self.ScanEventLog(relative_path)
if chunk_info is not None:
- chunk_info_list.append(chunk_info)
+ chunks.append(chunk_info)
except: # pylint: disable=W0702
msg = relative_path + ': ' + utils.FormatExceptionOnly()
if suppress_error:
@@ -157,16 +191,16 @@
else:
raise ScanException(msg)
if (self._num_log_per_callback and
- len(chunk_info_list) >= self._num_log_per_callback):
- self._CallEventLogHandler(chunk_info_list, suppress_error)
- chunk_info_list = []
+ len(chunks) >= self._num_log_per_callback):
+ self._CallEventLogHandler(chunks, suppress_error)
+ chunks = []
      # Skip the remaining logs when aborted; we don't want to wait too
      # long for them to finish.
if self._aborted.isSet():
return
- if chunk_info_list:
- self._CallEventLogHandler(chunk_info_list, suppress_error)
+ if chunks:
+ self._CallEventLogHandler(chunks, suppress_error)
def StopWatchThread(self):
@@ -201,6 +235,8 @@
def GetOrCreateDb(self):
'''Gets the database or recreate one if exception occurs.'''
+ assert not self._use_sync_markers
+
try:
db = OpenShelfOrBackup(self._event_log_db_file)
except: # pylint: disable=W0702
@@ -218,7 +254,27 @@
Args:
log_name: name of the log file.
'''
- log_state = self._db.setdefault(log_name, {KEY_OFFSET: 0})
+ log_state = self._db.get(log_name)
+ if not log_state:
+ # We haven't seen this file yet since starting up.
+ offset = 0
+ if self._use_sync_markers:
+ # Read in the file and set offset from the last sync marker.
+ with open(os.path.join(self._event_log_dir, log_name)) as f:
+ contents = f.read()
+ # Set the offset to just after the last instance of
+ # "\n#S\n---\n".
+ replace_pos = contents.rfind(event_log.SYNC_MARKER_REPLACE)
+ if replace_pos == -1:
+ # Not found; start at the beginning.
+ offset = 0
+ else:
+ offset = replace_pos + len(event_log.SYNC_MARKER_REPLACE)
+ else:
+ # No sync markers; start from the beginning.
+ offset = 0
+ log_state = {KEY_OFFSET: offset}
+ self._db[log_name] = log_state
with open(os.path.join(self._event_log_dir, log_name)) as f:
f.seek(log_state[KEY_OFFSET])
@@ -227,10 +283,10 @@
last_separator = chunk.rfind(EVENT_SEPARATOR)
# No need to proceed if available chunk is empty.
if last_separator == -1:
- return
+ return None
chunk = chunk[0:(last_separator + len(EVENT_SEPARATOR))]
- return (log_name, chunk)
+ return Chunk(log_name, chunk, log_state[KEY_OFFSET])
def GetEventLog(self, log_name):
'''Gets the log for given log name.'''
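
A minimal sketch of the partial-event trimming in ScanEventLog above,
assuming EVENT_SEPARATOR is '\n---\n' (its definition is outside this
excerpt); the helper name is illustrative:

  EVENT_SEPARATOR = '\n---\n'

  def trim_to_last_event(chunk):
    # Drop any half-written trailing event; return None if no complete
    # event is available yet.
    last = chunk.rfind(EVENT_SEPARATOR)
    if last == -1:
      return None
    return chunk[:last + len(EVENT_SEPARATOR)]

  assert trim_to_last_event('a: 1\n---\npartial: tr') == 'a: 1\n---\n'
  assert trim_to_last_event('no separator yet') is None
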
diff --git a/py/event_log_watcher_unittest.py b/py/event_log_watcher_unittest.py
index 3790736..f02c75f 100755
--- a/py/event_log_watcher_unittest.py
+++ b/py/event_log_watcher_unittest.py
@@ -13,14 +13,32 @@
import time
import unittest
+from cros.factory import event_log
from cros.factory import event_log_watcher
-from cros.factory.event_log_watcher import EventLogWatcher
+from cros.factory.event_log_watcher import Chunk, EventLogWatcher
MOCK_LOG_NAME = lambda x: 'mylog12345%d' % x
-MOCK_PREAMBLE = lambda x: 'device: 123%d\nimage: 456\nmd5: abc\n---\n' % x
-MOCK_EVENT = 'seq: 1\nevent: start\n---\n'
+def MOCK_PREAMBLE(x, sync_marker=False):
+ ret = 'device: 123%d\nimage: 456\nmd5: abc\n' % x
+ if sync_marker:
+ ret += event_log.SYNC_MARKER
+ ret += '---\n'
+ return ret
+def MOCK_EVENT(x=0, sync_marker=False):
+ ret = 'seq: %d\nevent: start\n' % x
+ if sync_marker:
+ ret += event_log.SYNC_MARKER
+ ret += '---\n'
+ return ret
MOCK_PERIOD = 0.01
+
+class ChunkTest(unittest.TestCase):
+ def testStr(self):
+ self.assertEquals("Chunk(log_name='a', len=3, pos=10)",
+ str(Chunk('a', 'foo', 10)))
+
+
class EventLogWatcherTest(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
@@ -89,12 +107,12 @@
self.assertNotEqual(log[event_log_watcher.KEY_OFFSET], 0)
# Write more logs and flush.
- self.WriteLog(MOCK_EVENT, MOCK_LOG_NAME(0))
+ self.WriteLog(MOCK_EVENT(), MOCK_LOG_NAME(0))
watcher.FlushEventLogs()
log = watcher.GetEventLog(MOCK_LOG_NAME(0))
self.assertEqual(log[event_log_watcher.KEY_OFFSET],
- len(MOCK_PREAMBLE(0)) + len(MOCK_EVENT))
+ len(MOCK_PREAMBLE(0)) + len(MOCK_EVENT()))
watcher.Close()
@@ -117,7 +135,8 @@
def testHandleEventLogsCallback(self):
mock = mox.MockAnything()
- mock.handle_event_log([(MOCK_LOG_NAME(0), MOCK_PREAMBLE(0))])
+ mock.handle_event_log([
+ Chunk(MOCK_LOG_NAME(0), MOCK_PREAMBLE(0), 0)])
mox.Replay(mock)
watcher = EventLogWatcher(MOCK_PERIOD, self.events_dir, self.db,
@@ -174,10 +193,72 @@
mox.Verify(mock)
+ def testSyncMarkers_NoRestart(self):
+ self._testSyncMarkers(False)
+
+ def testSyncMarkers_Restart(self):
+ self._testSyncMarkers(True)
+
+ def _testSyncMarkers(self, unexpected_restart):
+ # pylint: disable=E1102
+ m = mox.Mox()
+ mock_callback = m.CreateMockAnything()
+ path = os.path.join(self.events_dir, MOCK_LOG_NAME(0))
+
+ # No DB; use sync markers.
+ watcher = EventLogWatcher(MOCK_PERIOD, self.events_dir, None,
+ mock_callback)
+ self.WriteLog(MOCK_PREAMBLE(0, True), MOCK_LOG_NAME(0))
+
+ mock_callback([
+ Chunk(MOCK_LOG_NAME(0), MOCK_PREAMBLE(0, True), 0)])
+ m.ReplayAll()
+ watcher.ScanEventLogs()
+ m.VerifyAll()
+
+ def ReplaceSyncMarker(s):
+ return s.replace(event_log.SYNC_MARKER_SEARCH,
+ event_log.SYNC_MARKER_REPLACE)
+
+ # We should have replaced '#s' with '#S' in the preamble.
+ self.assertEqual(ReplaceSyncMarker(MOCK_PREAMBLE(0, True)),
+ open(path).read())
+
+ if unexpected_restart:
+ # Re-create the event log watcher to zap its state.
+ watcher = EventLogWatcher(MOCK_PERIOD, self.events_dir, None,
+ mock_callback)
+ # The event log watcher has forgotten about the file.
+ self.assertIsNone(watcher.GetEventLog(MOCK_LOG_NAME(0)))
+ else:
+ # The event log watcher has the correct sync offset for the file.
+ self.assertEquals({
+ event_log_watcher.KEY_OFFSET: len(MOCK_PREAMBLE(0, True))},
+ watcher.GetEventLog(MOCK_LOG_NAME(0)))
+
+ # Write two events; they (but not the preamble) should be scanned.
+ self.WriteLog(MOCK_EVENT(0, True), MOCK_LOG_NAME(0))
+ self.WriteLog(MOCK_EVENT(1, True), MOCK_LOG_NAME(0))
+ m.ResetAll()
+ mock_callback([
+ Chunk(MOCK_LOG_NAME(0), MOCK_EVENT(0, True) + MOCK_EVENT(1, True),
+ len(MOCK_PREAMBLE(0, True)))])
+ m.ReplayAll()
+ watcher.ScanEventLogs()
+ m.VerifyAll()
+
+ # We should have replaced '#s' with '#S' in the preamble and the
+ # second real event.
+ self.assertEqual(ReplaceSyncMarker(MOCK_PREAMBLE(0, True)) +
+ MOCK_EVENT(0, True) +
+ ReplaceSyncMarker(MOCK_EVENT(1, True)),
+ open(path).read())
+
def testHandleEventLogsFail(self):
mock = mox.MockAnything()
- mock.handle_event_log([(MOCK_LOG_NAME(0), MOCK_PREAMBLE(0))]).AndRaise(
- Exception("Bar"))
+ mock.handle_event_log(
+ [Chunk(MOCK_LOG_NAME(0), MOCK_PREAMBLE(0), 0)]
+ ).AndRaise(Exception("Bar"))
mox.Replay(mock)
watcher = EventLogWatcher(MOCK_PERIOD, self.events_dir, self.db,
mock.handle_event_log)
diff --git a/py/goofy/goofy.py b/py/goofy/goofy.py
index 127dbbc..4960fcd 100755
--- a/py/goofy/goofy.py
+++ b/py/goofy/goofy.py
@@ -1112,7 +1112,9 @@
sys.exit(0)
event_log.IncrementBootSequence()
- self.event_log = EventLog('goofy')
+ # Don't defer logging the initial event, so we can make sure
+ # that device_id, reimage_id, etc. are all set up.
+ self.event_log = EventLog('goofy', defer=False)
if (not suppress_chroot_warning and
factory.in_chroot() and
@@ -1230,6 +1232,7 @@
# syncing), since we may use it to flush event logs as well.
self.log_watcher = EventLogWatcher(
self.test_list.options.sync_event_log_period_secs,
+ event_log_db_file=None,
handle_event_logs_callback=self.handle_event_logs)
if self.test_list.options.sync_event_log_period_secs:
self.log_watcher.StartWatchThread()
@@ -1639,31 +1642,32 @@
self.check_battery()
self.check_core_dump()
- def handle_event_logs(self, chunk_info):
+ def handle_event_logs(self, chunks):
'''Callback for event watcher.
Attempts to upload the event logs to the shopfloor server.
Args:
- chunk_info: A list of tuple (log_name, chunk)
+ chunks: A list of Chunk objects.
'''
first_exception = None
exception_count = 0
- for log_name, chunk in chunk_info:
+ for chunk in chunks:
try:
- description = 'event logs (%s, %d bytes)' % (log_name, len(chunk))
+ description = 'event logs (%s)' % chunk
start_time = time.time()
shopfloor_client = shopfloor.get_instance(
detect=True,
timeout=self.test_list.options.shopfloor_timeout_secs)
- shopfloor_client.UploadEvent(log_name + "." + event_log.GetReimageId(),
- Binary(chunk))
+ shopfloor_client.UploadEvent(chunk.log_name + "." +
+ event_log.GetReimageId(),
+ Binary(chunk.chunk))
logging.info(
'Successfully synced %s in %.03f s',
description, time.time() - start_time)
except: # pylint: disable=W0702
- first_exception = (first_exception or (log_name + ': ' +
+ first_exception = (first_exception or (chunk.log_name + ': ' +
utils.FormatExceptionOnly()))
exception_count += 1
diff --git a/py/minijack/minijack.py b/py/minijack/minijack.py
index c438361..cd19130 100755
--- a/py/minijack/minijack.py
+++ b/py/minijack/minijack.py
@@ -275,9 +275,9 @@
def HandleEventLogs(self, chunk_info):
"""Callback for event log watcher."""
- for log_name, chunk in chunk_info:
- logging.info('Get new event logs (%s, %d bytes)', log_name, len(chunk))
- blob = EventBlob({'log_name': log_name}, chunk)
+ for chunk in chunk_info:
+      logging.info('Got new event logs (%s)', chunk)
+ blob = EventBlob({'log_name': chunk.log_name}, chunk.chunk)
self._event_blob_queue.put(blob)
def CheckQueuesEmpty(self):
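
For reference, a minimal sketch of driving the watcher in sync-marker
mode, mirroring the goofy.py change above (the callback body and the
period are illustrative):

  from cros.factory.event_log_watcher import EventLogWatcher

  def on_new_chunks(chunks):
    for chunk in chunks:
      # Upload chunk.chunk somewhere; if this raises, the offset is not
      # advanced and the chunk is rescanned on the next pass.
      pass

  # event_log_db_file=None selects sync markers instead of the shelve db.
  watcher = EventLogWatcher(watch_period_sec=30,
                            event_log_db_file=None,
                            handle_event_logs_callback=on_new_chunks)
  watcher.StartWatchThread()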