Refactor devserver/downloader interaction to fix race-conditions.

The old code had two race condtions that could potentially cause
incorrect behavior when two downloads calls for the same archive_url
were made at the same time.

This change moves most of the synchronization logic out of downloader
and into devserver. We synchronize based on the archive_url in
download. By doing this, this guarantees that we are only processing
the foreground artifacts for a specific archive_url one at a time.

BUG=chromium-os:31441
TEST=Unittests for devserver/downloader. Also ran meta-test against
archive_url=gs://chromeos-image-archive/x86-alex-release/R19-2003.0.0-a1-b1819
For this test I set up a local devserver and made sequential download
calls while the first download was being made -- saw that they blocked
and returned success correctly after the foreground artifacts completed.
I also verified wait_for_status and download worked once those
artifacts were already staged.

Change-Id: Iccbd43974ec6929e3298a982981b3315fd0cba03
Reviewed-on: https://gerrit.chromium.org/gerrit/24270
Reviewed-by: Chris Sosa <sosa@chromium.org>
Tested-by: Chris Sosa <sosa@chromium.org>
Commit-Ready: Chris Sosa <sosa@chromium.org>
diff --git a/downloader.py b/downloader.py
index 955e796..3d4ea23 100755
--- a/downloader.py
+++ b/downloader.py
@@ -30,19 +30,18 @@
     self._static_dir = static_dir
     self._build_dir = None
     self._staging_dir = None
-    self._status_queue = multiprocessing.Queue()
+    self._status_queue = multiprocessing.Queue(maxsize=1)
     self._lock_tag = None
 
   @staticmethod
-  def CanonicalizeAndParse(archive_url):
-    """Canonicalize archive_url and parse it into its component parts.
+  def ParseUrl(archive_url):
+    """Parse archive_url into its component parts.
 
     @param archive_url: a URL at which build artifacts are archived.
-    @return a tuple of (canonicalized URL, build target, short build name)
+    @return a tuple of (build target, short build name)
     """
-    archive_url = archive_url.rstrip('/')
     target, short_build = archive_url.rsplit('/', 2)[-2:]
-    return archive_url, target, short_build
+    return target, short_build
 
   @staticmethod
   def GenerateLockTag(target, short_build):
@@ -57,7 +56,7 @@
   @staticmethod
   def BuildStaged(archive_url, static_dir):
     """Returns True if the build is already staged."""
-    _, target, short_build = Downloader.CanonicalizeAndParse(archive_url)
+    target, short_build = Downloader.ParseUrl(archive_url)
     sub_directory = Downloader.GenerateLockTag(target, short_build)
     return os.path.isdir(os.path.join(static_dir, sub_directory))
 
@@ -73,18 +72,13 @@
     """
     # Parse archive_url into target and short_build.
     # e.g. gs://chromeos-image-archive/{target}/{short_build}
-    archive_url, target, short_build = self.CanonicalizeAndParse(archive_url)
-
+    target, short_build = self.ParseUrl(archive_url)
+    # This should never happen. The Devserver should only try to call this
+    # method if no previous downloads have been staged for this archive_url.
+    assert not Downloader.BuildStaged(archive_url, self._static_dir)
     # Bind build_dir and staging_dir here so we can tell if we need to do any
     # cleanup after an exception occurs before build_dir is set.
     self._lock_tag = self.GenerateLockTag(target, short_build)
-
-    if Downloader.BuildStaged(archive_url, self._static_dir):
-      cherrypy.log('Build %s has already been processed.' % self._lock_tag,
-                   self._LOG_TAG)
-      self._status_queue.put('Success')
-      return 'Success'
-
     try:
       # Create Dev Server directory for this build and tell other Downloader
       # instances we have processed this build.
@@ -110,7 +104,7 @@
           background_artifacts.append(artifact)
 
       if background:
-        self._DownloadArtifactsInBackground(background_artifacts, archive_url)
+        self._DownloadArtifactsInBackground(background_artifacts)
       else:
         self._DownloadArtifactsSerially(background_artifacts)
 
@@ -156,7 +150,7 @@
     finally:
       self._Cleanup()
 
-  def _DownloadArtifactsInBackground(self, artifacts, archive_url):
+  def _DownloadArtifactsInBackground(self, artifacts):
     """Downloads |artifacts| in the background and signals when complete."""
     proc = multiprocessing.Process(target=self._DownloadArtifactsSerially,
                                    args=(artifacts,))
@@ -204,14 +198,14 @@
   def GenerateLockTag(target, short_build):
     return '/'.join([target, short_build, 'symbols'])
 
-  def Download(self, archive_url):
+  def Download(self, archive_url, _background=False):
     """Downloads debug symbols for the build defined by the |archive_url|.
 
     The symbols will be downloaded synchronously
     """
     # Parse archive_url into target and short_build.
     # e.g. gs://chromeos-image-archive/{target}/{short_build}
-    archive_url, target, short_build = self.CanonicalizeAndParse(archive_url)
+    target, short_build = self.ParseUrl(archive_url)
 
     # Bind build_dir and staging_dir here so we can tell if we need to do any
     # cleanup after an exception occurs before build_dir is set.
@@ -276,7 +270,7 @@
 
   def SymbolsStaged(self, archive_url, static_dir):
     """Returns True if the build is already staged."""
-    _, target, short_build = self.CanonicalizeAndParse(archive_url)
+    target, short_build = self.ParseUrl(archive_url)
     sub_directory = self.GenerateLockTag(target, short_build)
     return os.path.isfile(os.path.join(static_dir,
                                        sub_directory,