Refactor devserver/downloader interaction to fix race-conditions.

The old code had two race conditions that could potentially cause
incorrect behavior when two download calls for the same archive_url
were made at the same time.

This change moves most of the synchronization logic out of downloader
and into devserver. We synchronize based on the archive_url in
download. This guarantees that the foreground artifacts for a specific
archive_url are processed by only one request at a time.

BUG=chromium-os:31441
TEST=Unittests for devserver/downloader. Also ran meta-test against
archive_url=gs://chromeos-image-archive/x86-alex-release/R19-2003.0.0-a1-b1819
For this test I set up a local devserver and issued additional download
calls while the first download was still in progress; they blocked and
then correctly returned success once the foreground artifacts completed.
I also verified that wait_for_status and download worked once those
artifacts were already staged.

Change-Id: Iccbd43974ec6929e3298a982981b3315fd0cba03
Reviewed-on: https://gerrit.chromium.org/gerrit/24270
Reviewed-by: Chris Sosa <sosa@chromium.org>
Tested-by: Chris Sosa <sosa@chromium.org>
Commit-Ready: Chris Sosa <sosa@chromium.org>
diff --git a/devserver.py b/devserver.py
index 54b80e8..b952716 100755
--- a/devserver.py
+++ b/devserver.py
@@ -7,8 +7,8 @@
 """A CherryPy-based webserver to host images and build packages."""
 
 import cherrypy
-import cStringIO
 import logging
+import multiprocessing
 import optparse
 import os
 import re
@@ -128,8 +128,10 @@
       cherrypy.log('removing stale symlink to %s' % image_dir, 'DEVSERVER')
       os.unlink('%s/archive' % static_dir)
       os.symlink(image_dir, '%s/archive' % static_dir)
+
   else:
     os.symlink(image_dir, '%s/archive' % static_dir)
+
   cherrypy.log('archive dir: %s ready to be used to serve images.' % image_dir,
                'DEVSERVER')
 
@@ -199,6 +201,8 @@
 
   def __init__(self):
     self._builder = None
+    self._lock_dict_lock = multiprocessing.Lock()
+    self._lock_dict = {}
     self._downloader_dict = {}
 
   @cherrypy.expose
@@ -209,6 +213,39 @@
       self._builder = builder.Builder()
     return self._builder.Build(board, pkg, kwargs)
 
+  def _get_lock_for_archive_url(self, archive_url):
+    """Return a multiprocessing lock to use per archive_url.
+
+    Use this lock to protect critical zones per archive_url.
+
+    Usage:
+      with DevserverInstance._get_lock_for_archive_url(archive_url):
+        # CRITICAL AREA FOR ARCHIVE_URL.
+
+    Returns:
+      A multiprocessing lock that is archive_url specific.
+    """
+    with self._lock_dict_lock:
+      lock = self._lock_dict.get(archive_url)
+      if lock:
+        return lock
+      else:
+        lock = multiprocessing.Lock()
+        self._lock_dict[archive_url] = lock
+        return lock
+
+  @staticmethod
+  def _canonicalize_archive_url(archive_url):
+    """Canonicalizes archive_url strings.
+
+    Raises:
+      DevserverError: if archive_url is not set.
+    """
+    if archive_url:
+      return archive_url.rstrip('/')
+    else:
+      raise DevServerError("Must specify an archive_url in the request")
+
   @cherrypy.expose
   def download(self, **kwargs):
     """Downloads and archives full/delta payloads from Google Storage.
@@ -225,21 +262,55 @@
       'http://myhost/download?archive_url=gs://chromeos-image-archive/'
       'x86-generic/R17-1208.0.0-a1-b338'
     """
-    downloader_instance = downloader.Downloader(updater.static_dir)
-    archive_url = kwargs.get('archive_url')
-    if not archive_url:
-      raise DevServerError("Didn't specify the archive_url in request")
+    archive_url = self._canonicalize_archive_url(kwargs.get('archive_url'))
 
-    # Do this before we start such that other calls to the downloader or
-    # wait_for_status are blocked until this completed/failed.
-    self._downloader_dict[archive_url] = downloader_instance
-    try:
-      return_obj = downloader_instance.Download(archive_url, background=True)
-    except:
+    # Guarantees that no two downloads for the same url can run this code
+    # at the same time.
+    with self._get_lock_for_archive_url(archive_url):
+      try:
+        # If we are currently downloading, return. Note, due to the above lock
+        # we know that the foreground artifacts must have finished downloading
+        # and returned Success if this downloader instance exists.
+        if (self._downloader_dict.get(archive_url) or
+            downloader.Downloader.BuildStaged(archive_url, updater.static_dir)):
+          cherrypy.log('Build %s has already been processed.' % archive_url,
+                       'DEVSERVER')
+          return 'Success'
+
+        downloader_instance = downloader.Downloader(updater.static_dir)
+        self._downloader_dict[archive_url] = downloader_instance
+        return downloader_instance.Download(archive_url, background=True)
+
+      except:
+        # On any exception, reset the state of the downloader_dict.
+        self._downloader_dict[archive_url] = None
+
+  @cherrypy.expose
+  def wait_for_status(self, **kwargs):
+    """Waits for background artifacts to be downloaded from Google Storage.
+
+    Args:
+      archive_url: Google Storage URL for the build.
+
+    Example URL:
+      'http://myhost/wait_for_status?archive_url=gs://chromeos-image-archive/'
+      'x86-generic/R17-1208.0.0-a1-b338'
+    """
+    archive_url = self._canonicalize_archive_url(kwargs.get('archive_url'))
+    downloader_instance = self._downloader_dict.get(archive_url)
+    if downloader_instance:
+      status = downloader_instance.GetStatusOfBackgroundDownloads()
       self._downloader_dict[archive_url] = None
-      raise
-
-    return return_obj
+      return status
+    else:
+      # We may have previously downloaded but removed the downloader instance
+      # from the cache.
+      if downloader.Downloader.BuildStaged(archive_url, updater.static_dir):
+        logging.info('%s not found in downloader cache but previously staged.',
+                     archive_url)
+        return 'Success'
+      else:
+        raise DevServerError('No download for the given archive_url found.')
 
   @cherrypy.expose
   def stage_debug(self, **kwargs):
@@ -255,10 +326,7 @@
       'http://myhost/stage_debug?archive_url=gs://chromeos-image-archive/'
       'x86-generic/R17-1208.0.0-a1-b338'
     """
-    archive_url = kwargs.get('archive_url')
-    if not archive_url:
-      raise DevServerError("Didn't specify the archive_url in request")
-
+    archive_url = self._canonicalize_archive_url(kwargs.get('archive_url'))
     return downloader.SymbolDownloader(updater.static_dir).Download(archive_url)
 
   @cherrypy.expose
@@ -297,36 +365,6 @@
     return to_return
 
   @cherrypy.expose
-  def wait_for_status(self, **kwargs):
-    """Waits for background artifacts to be downloaded from Google Storage.
-
-    Args:
-      archive_url: Google Storage URL for the build.
-
-    Example URL:
-      'http://myhost/wait_for_status?archive_url=gs://chromeos-image-archive/'
-      'x86-generic/R17-1208.0.0-a1-b338'
-    """
-    archive_url = kwargs.get('archive_url')
-    if not archive_url:
-      raise DevServerError("Didn't specify the archive_url in request")
-
-    downloader_instance = self._downloader_dict.get(archive_url)
-    if downloader_instance:
-      status = downloader_instance.GetStatusOfBackgroundDownloads()
-      self._downloader_dict[archive_url] = None
-      return status
-    else:
-      # We may have previously downloaded but removed the downloader instance
-      # from the cache.
-      if downloader.Downloader.BuildStaged(archive_url, updater.static_dir):
-        logging.info('%s not found in downloader cache but previously staged.',
-                     archive_url)
-        return 'Success'
-      else:
-        raise DevServerError('No download for the given archive_url found.')
-
-  @cherrypy.expose
   def latestbuild(self, **params):
     """Return a string representing the latest build for a given target.
 
@@ -398,7 +436,7 @@
     return updater.HandleUpdatePing(data, label)
 
 
-if __name__ == '__main__':
+def main():
   usage = 'usage: %prog [options]'
   parser = optparse.OptionParser(usage=usage)
   parser.add_option('--archive_dir', dest='archive_dir',
@@ -507,6 +545,7 @@
   cherrypy.log('Source root is %s' % root_dir, 'DEVSERVER')
   cherrypy.log('Serving from %s' % static_dir, 'DEVSERVER')
 
+  global updater
   updater = autoupdate.Autoupdate(
       root_dir=root_dir,
       static_dir=static_dir,
@@ -543,3 +582,7 @@
   # If the command line requested after setup, it's time to do it.
   if not options.exit:
     cherrypy.quickstart(DevServerRoot(), config=_GetConfig(options))
+
+
+if __name__ == '__main__':
+  main()