lib/metrics.py: Added Queue indirection

When using multiprocessing in cbuildbot, we end up creating a duplicate
ts_mon flushing thread. The parent and child threads may end up sending
conflicting messages. Furthermore, messages from the child may not be
sent if the child process finishes before its thread flushes messages.
To solve this, we add an "indirect" flag to SetupTsMonGlobalState, which
creates a multiprocessing.Queue. The parent and child send metrics to
the Queue, and a separate process consumes these messages and emits
metrics via ts_mon.

BUG=chromium:623293
TEST=cbuildbot binhost-pre-cq -p chromiumos/chromite --local

Change-Id: Ib6a414c0f4139fa523755ad7f29e593454570000
Reviewed-on: https://chromium-review.googlesource.com/363502
Commit-Ready: Paul Hobbs <phobbs@google.com>
Tested-by: Paul Hobbs <phobbs@google.com>
Reviewed-by: Paul Hobbs <phobbs@google.com>
diff --git a/scripts/cbuildbot.py b/scripts/cbuildbot.py
index 8638b1a..6ec5e9d 100644
--- a/scripts/cbuildbot.py
+++ b/scripts/cbuildbot.py
@@ -1015,11 +1015,13 @@
 
   if run_type == _ENVIRONMENT_PROD:
     cidb.CIDBConnectionFactory.SetupProdCidb()
-    ts_mon_config.SetupTsMonGlobalState('cbuildbot')
+    context = ts_mon_config.SetupTsMonGlobalState('cbuildbot', indirect=True)
   elif run_type == _ENVIRONMENT_DEBUG:
     cidb.CIDBConnectionFactory.SetupDebugCidb()
+    context = ts_mon_config.TrivialContextManager()
   else:
     cidb.CIDBConnectionFactory.SetupNoCidb()
+    context = ts_mon_config.TrivialContextManager()
 
   db = cidb.CIDBConnectionFactory.GetCIDBConnectionForBuilder()
   topology.FetchTopologyFromCIDB(db)
@@ -1034,6 +1036,8 @@
     graphite.ESMetadataFactory.SetupReadOnly()
     graphite.StatsFactory.SetupMock()
 
+  return context
+
 
 def _FetchInitialBootstrapConfigRepo(repo_url, branch_name):
   """Fetch the TOT site config repo, if necessary to start bootstrap."""
@@ -1120,9 +1124,9 @@
       _ConfirmRemoteBuildbotRun()
 
     print('Submitting tryjob...')
-    _SetupConnections(options, build_config)
-    tryjob = remote_try.RemoteTryJob(options, args, patch_pool.local_patches)
-    tryjob.Submit(testjob=options.test_tryjob, dryrun=False)
+    with _SetupConnections(options, build_config):
+      tryjob = remote_try.RemoteTryJob(options, args, patch_pool.local_patches)
+      tryjob.Submit(testjob=options.test_tryjob, dryrun=False)
     print('Tryjob submitted!')
     print(('Go to %s to view the status of your job.'
            % tryjob.GetTrybotWaterfallLink()))
@@ -1246,8 +1250,7 @@
                 '_FetchSlaveStatuses',
                 return_value=mock_statuses)
 
-    _SetupConnections(options, build_config)
-    stack.Add(ts_mon_config.TsMonFlusher)
+    stack.Add(_SetupConnections, options, build_config)
     retry_stats.SetupStats()
 
     timeout_display_message = None