Implement stage RPC which allows user to specify artifacts.
As specified in goto/devserver-cache, implement the stage RPC
and replace existing calls download/symbol/wait_for_status/stage_images
with their respective stage calls.
The old RPCs are still supported for now by mapping them to stage, until
all callers use the new RPC. The advantage of the new RPC is that less
is downloaded (only what you need rather than everything).
This is a very large code change; sadly, I can't really break it up that
much. Fortunately, it's mostly deleting old/dead code.
If you haven't read the doc yet, please read it before reviewing
(http://goto.google.com/devserver-cache). You can pretty much review all
the new code stand-alone without looking at what we are replacing.
BUG=chromium-os:38427
TEST=Pylint + Unittests + download, wait_for_status, stage separately
using different combinations. Still continuing testing of debug symbols
etc.
Change-Id: I201dcdaa8a14024247eca222a8f2e47deea464b9
Reviewed-on: https://gerrit.chromium.org/gerrit/42401
Reviewed-by: Chris Sosa <sosa@chromium.org>
Tested-by: Chris Sosa <sosa@chromium.org>
diff --git a/downloader.py b/downloader.py
index 95b0d7d..2567e92 100755
--- a/downloader.py
+++ b/downloader.py
@@ -1,66 +1,72 @@
-#!/usr/bin/python
-#
-# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-import Queue
import os
-import shutil
-import tempfile
import threading
+import build_artifact
import common_util
import log_util
class Downloader(log_util.Loggable):
- """Download images to the devsever.
+ """Downloader of images to the devsever.
Given a URL to a build on the archive server:
+ - Caches that build and the given artifacts onto the devserver.
+ - May also initiate caching of related artifacts in the background.
- - Determine if the build already exists.
- - Download and extract the build to a staging directory.
- - Package autotest tests.
- - Install components to static dir.
+ Private class members:
+ archive_url: a URL where to download build artifacts from.
+ static_dir: local filesystem directory to store all artifacts.
+ build_dir: the local filesystem directory to store artifacts for the given
+ build defined by the archive_url.
"""
# This filename must be kept in sync with clean_staged_images.py
_TIMESTAMP_FILENAME = 'staged.timestamp'
- def __init__(self, static_dir):
+ def __init__(self, static_dir, archive_url):
+ super(Downloader, self).__init__()
+ self._archive_url = archive_url
self._static_dir = static_dir
- self._build_dir = None
- self._staging_dir = None
- self._status_queue = Queue.Queue(maxsize=1)
- self._lock_tag = None
+ self._build_dir = Downloader.GetBuildDir(static_dir, archive_url)
@staticmethod
def ParseUrl(archive_url):
- """Parse archive_url into rel_path and short_build
- e.g. gs://chromeos-image-archive/{rel_path}/{short_build}
+ """Parses archive_url into rel_path and build.
- @param archive_url: a URL at which build artifacts are archived.
- @return a tuple of (build relative path, short build name)
+ Parses archive_url into rel_path and build e.g.
+ gs://chromeos-image-archive/{rel_path}/{build}.
+
+ Args:
+ archive_url: a URL at which build artifacts are archived.
+
+ Returns:
+ A tuple of (build relative path, short build name)
"""
# The archive_url is of the form gs://server/[some_path/target]/...]/build
# This function discards 'gs://server/' and extracts the [some_path/target]
- # as rel_path and the build as short_build.
+ # as rel_path and the build as build.
sub_url = archive_url.partition('://')[2]
split_sub_url = sub_url.split('/')
rel_path = '/'.join(split_sub_url[1:-1])
- short_build = split_sub_url[-1]
- return rel_path, short_build
+ build = split_sub_url[-1]
+ return rel_path, build
@staticmethod
- def GenerateLockTag(rel_path, short_build):
- """Generate a name for a lock scoped to this rel_path/build pair.
+ def GetBuildDir(static_dir, archive_url):
+ """Returns the path to where the artifacts will be staged.
- @param rel_path: the relative path for the build.
- @param short_build: short build name
- @return a name to use with AcquireLock that will scope the lock.
+ Args:
+ static_dir: The base static dir that will be used.
+ archive_url: The gs path to the archive url.
"""
- return '/'.join([rel_path, short_build])
+ # Parse archive_url into rel_path (contains the build target) and
+ # build e.g. gs://chromeos-image-archive/{rel_path}/{build}.
+ rel_path, build = Downloader.ParseUrl(archive_url)
+ return os.path.join(static_dir, rel_path, build)
@staticmethod
def _TouchTimestampForStaged(directory_path):
@@ -69,397 +75,57 @@
with file(file_name, 'a'):
os.utime(file_name, None)
- @staticmethod
- def BuildStaged(archive_url, static_dir):
- """Returns True if the build is already staged."""
- rel_path, short_build = Downloader.ParseUrl(archive_url)
- sub_directory = Downloader.GenerateLockTag(rel_path, short_build)
- directory_path = os.path.join(static_dir, sub_directory)
- exists = os.path.isdir(directory_path)
- # If the build exists, then touch the timestamp to tell
- # clean_stages_images.py that we're using this build.
- if exists:
- Downloader._TouchTimestampForStaged(directory_path)
- return exists
+ def Download(self, artifacts):
+ """Downloads and caches the |artifacts|.
- def Download(self, archive_url, background=False):
- """Downloads the given build artifacts defined by the |archive_url|.
-
- If background is set to True, will return back early before all artifacts
- have been downloaded. The artifacts that can be backgrounded are all those
- that are not set as synchronous.
-
- TODO: refactor this into a common Download method, once unit tests are
- fixed up to make iterating on the code easier.
- """
- # Parse archive_url into rel_path (contains the build target) and
- # short_build.
- # e.g. gs://chromeos-image-archive/{rel_path}/{short_build}
- rel_path, short_build = self.ParseUrl(archive_url)
- # This should never happen. The Devserver should only try to call this
- # method if no previous downloads have been staged for this archive_url.
- assert not Downloader.BuildStaged(archive_url, self._static_dir)
- # Bind build_dir and staging_dir here so we can tell if we need to do any
- # cleanup after an exception occurs before build_dir is set.
- self._lock_tag = self.GenerateLockTag(rel_path, short_build)
- try:
- # Create Dev Server directory for this build and tell other Downloader
- # instances we have processed this build. Note that during normal
- # execution, this lock is only released in the actual downloading
- # procedure called below.
- self._build_dir = common_util.AcquireLock(
- static_dir=self._static_dir, tag=self._lock_tag)
-
- # Replace '/' with '_' in rel_path because it may contain multiple levels
- # which would not be qualified as part of the suffix.
- self._staging_dir = tempfile.mkdtemp(suffix='_'.join(
- [rel_path.replace('/', '_'), short_build]))
- Downloader._TouchTimestampForStaged(self._build_dir)
- self._Log('Gathering download requirements %s' % archive_url)
- artifacts = self.GatherArtifactDownloads(
- self._staging_dir, archive_url, self._build_dir, short_build)
- common_util.PrepareBuildDirectory(self._build_dir)
-
- self._Log('Downloading foreground artifacts from %s' % archive_url)
- background_artifacts = []
- for artifact in artifacts:
- if artifact.Synchronous():
- artifact.Download()
- artifact.Stage()
- else:
- background_artifacts.append(artifact)
-
- if background:
- self._DownloadArtifactsInBackground(background_artifacts)
- else:
- self._DownloadArtifactsSerially(background_artifacts)
-
- except Exception, e:
- # Release processing lock, which will remove build components directory
- # so future runs can retry.
- if self._build_dir:
- common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag,
- destroy=True)
-
- self._status_queue.put(e)
- self._Cleanup()
- raise
- return 'Success'
-
- def _Cleanup(self):
- """Cleans up the staging dir for this downloader instanfce."""
- if self._staging_dir:
- self._Log('Cleaning up staging directory %s' % self._staging_dir)
- shutil.rmtree(self._staging_dir)
-
- self._staging_dir = None
-
- def _DownloadArtifactsSerially(self, artifacts):
- """Simple function to download all the given artifacts serially."""
- self._Log('Downloading artifacts serially.')
- try:
- for artifact in artifacts:
- artifact.Download()
- artifact.Stage()
- except Exception, e:
- self._status_queue.put(e)
-
- # Release processing lock, which will remove build components directory
- # so future runs can retry.
- if self._build_dir:
- common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag,
- destroy=True)
- else:
- # Release processing lock, keeping directory intact.
- if self._build_dir:
- common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag)
- self._status_queue.put('Success')
- finally:
- self._Cleanup()
-
- def _DownloadArtifactsInBackground(self, artifacts):
- """Downloads |artifacts| in the background and signals when complete."""
- self._Log('Invoking background download of artifacts')
- thread = threading.Thread(target=self._DownloadArtifactsSerially,
- args=(artifacts,))
- thread.start()
-
- def GatherArtifactDownloads(self, main_staging_dir, archive_url, build_dir,
- short_build):
- """Wrapper around common_util.GatherArtifactDownloads().
-
- The wrapper allows mocking and overriding in derived classes.
- """
- return common_util.GatherArtifactDownloads(
- main_staging_dir, archive_url, build_dir, short_build)
-
- def GetStatusOfBackgroundDownloads(self):
- """Returns the status of the background downloads.
-
- This commands returns the status of the background downloads and blocks
- until a status is returned.
- """
- status = self._status_queue.get()
- # In case anyone else is calling.
- self._status_queue.put(status)
- # If someone is curious about the status of a build, then we should
- # probably keep it around for a bit longer.
- if self._build_dir and os.path.exists(self._build_dir):
- Downloader._TouchTimestampForStaged(self._build_dir)
- # It's possible we received an exception, if so, re-raise it here.
- if isinstance(status, Exception):
- raise status
-
- return status
-
-
-class SymbolDownloader(Downloader):
- """Download and stage debug symbols for a build on the devsever.
-
- Given a URL to a build on the archive server:
-
- - Determine if the build already exists.
- - Download and extract the debug symbols to a staging directory.
- - Install symbols to static dir.
- """
-
- _DONE_FLAG = 'done'
-
- @staticmethod
- def GenerateLockTag(rel_path, short_build):
- return '/'.join([rel_path, short_build, 'symbols'])
-
- def Download(self, archive_url, _background=False):
- """Downloads debug symbols for the build defined by the |archive_url|.
-
- The symbols will be downloaded synchronously
- """
- # Parse archive_url into rel_path (contains the build target) and
- # short_build.
- # e.g. gs://chromeos-image-archive/{rel_path}/{short_build}
- rel_path, short_build = self.ParseUrl(archive_url)
-
- # Bind build_dir and staging_dir here so we can tell if we need to do any
- # cleanup after an exception occurs before build_dir is set.
- self._lock_tag = self.GenerateLockTag(rel_path, short_build)
- if self.SymbolsStaged(archive_url, self._static_dir):
- self._Log('Symbols for build %s have already been staged.' %
- self._lock_tag)
- return 'Success'
-
- try:
- # Create Dev Server directory for this build and tell other Downloader
- # instances we have processed this build.
- self._build_dir = common_util.AcquireLock(
- static_dir=self._static_dir, tag=self._lock_tag)
-
- # Replace '/' with '_' in rel_path because it may contain multiple levels
- # which would not be qualified as part of the suffix.
- self._staging_dir = tempfile.mkdtemp(suffix='_'.join(
- [rel_path.replace('/', '_'), short_build]))
- self._Log('Downloading debug symbols from %s' % archive_url)
-
- [symbol_artifact] = self.GatherArtifactDownloads(
- self._staging_dir, archive_url, self._static_dir)
- symbol_artifact.Download()
- symbol_artifact.Stage()
- self.MarkSymbolsStaged()
-
- except Exception:
- # Release processing "lock", which will indicate to future runs that we
- # did not succeed, and so they should try again.
- if self._build_dir:
- common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag,
- destroy=True)
-
- raise
- else:
- # Release processing "lock", keeping directory intact.
- if self._build_dir:
- common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag)
- finally:
- self._Cleanup()
-
- return 'Success'
-
- def GatherArtifactDownloads(self, temp_download_dir, archive_url, static_dir,
- short_build=None):
- """Call SymbolDownloader-appropriate artifact gathering method.
-
- @param temp_download_dir: the tempdir into which we're downloading artifacts
- prior to staging them.
- @param archive_url: the google storage url of the bucket where the debug
- symbols for the desired build are stored.
- @param staging_dir: the dir into which to stage the symbols
- @param short_build: (ignored)
-
- @return an iterable of one DebugTarballBuildArtifact pointing to the right
- debug symbols. This is an iterable so that it's similar to
- GatherArtifactDownloads. Also, it's possible that someday we might
- have more than one.
- """
- return common_util.GatherSymbolArtifactDownloads(
- temp_download_dir, archive_url, static_dir)
-
- def MarkSymbolsStaged(self):
- """Puts a flag file on disk to signal that symbols are staged."""
- with open(os.path.join(self._build_dir, self._DONE_FLAG), 'w') as flag:
- flag.write(self._DONE_FLAG)
-
- def SymbolsStaged(self, archive_url, static_dir):
- """Returns True if the build is already staged."""
- rel_path, short_build = self.ParseUrl(archive_url)
- sub_directory = self.GenerateLockTag(rel_path, short_build)
- return os.path.isfile(os.path.join(static_dir,
- sub_directory,
- self._DONE_FLAG))
-
-
-class ImagesDownloader(Downloader):
- """Download and stage prebuilt images for a given build.
-
- Given a URL to a build on the archive server and a list of images:
- - Determine which images have not been staged yet.
- - Download the image archive.
- - Extract missing images to the staging directory.
-
- """
- _DONE_FLAG = 'staged'
-
- # List of images to be staged; empty (default) means all.
- _image_list = []
-
- # A mapping from test image types to their archived file names.
- _IMAGE_TO_FNAME = {
- 'test': 'chromiumos_test_image.bin',
- 'base': 'chromiumos_base_image.bin',
- 'recovery': 'recovery_image.bin',
- }
-
- @staticmethod
- def GenerateLockTag(rel_path, short_build):
- return os.path.join('images', rel_path, short_build)
-
- def Download(self, archive_url, image_list, _background=False):
- """Downloads images in |image_list| from the build defined by |archive_url|.
-
- Download happens synchronously. |images| may include any of those in
- self._IMAGE_TO_FNAME.keys().
-
- """
- # Check correctness of image list, remove duplicates.
- if not image_list:
- raise DevServerError('empty list of image types')
- invalid_images = list(set(image_list) - set(self._IMAGE_TO_FNAME.keys()))
- if invalid_images:
- raise DevServerError('invalid images requested: %s' % invalid_images)
- image_list = list(set(image_list))
-
- # Parse archive_url into rel_path (contains the build target) and
- # short_build.
- # e.g. gs://chromeos-image-archive/{rel_path}/{short_build}
- rel_path, short_build = self.ParseUrl(archive_url)
-
- # Bind build_dir and staging_dir here so we can tell if we need to do any
- # cleanup after an exception occurs before build_dir is set.
- self._lock_tag = self.GenerateLockTag(rel_path, short_build)
- staged_image_list = self._CheckStagedImages(archive_url, self._static_dir)
- unstaged_image_list = [image for image in image_list
- if image not in staged_image_list]
- if not unstaged_image_list:
- self._Log(
- 'All requested images (%s) for build %s have already been staged.' %
- (common_util.CommaSeparatedList(image_list, is_quoted=True)
- if image_list else 'none',
- self._lock_tag))
- return 'Success'
-
- self._Log(
- 'Image(s) %s for build %s will be staged' %
- (common_util.CommaSeparatedList(unstaged_image_list, is_quoted=True),
- self._lock_tag))
- self._image_list = unstaged_image_list
-
- try:
- # Create a static target directory and lock it for processing. We permit
- # the directory to preexist, as different images might be downloaded and
- # extracted at different times.
- self._build_dir = common_util.AcquireLock(
- static_dir=self._static_dir, tag=self._lock_tag,
- create_once=False)
-
- # Replace '/' with '_' in rel_path because it may contain multiple levels
- # which would not be qualified as part of the suffix.
- self._staging_dir = tempfile.mkdtemp(suffix='_'.join(
- [rel_path.replace('/', '_'), short_build]))
- self._Log('Downloading image archive from %s' % archive_url)
- dest_static_dir = os.path.join(self._static_dir, self._lock_tag)
- [image_archive_artifact] = self.GatherArtifactDownloads(
- self._staging_dir, archive_url, dest_static_dir)
- image_archive_artifact.Download()
- self._Log('Staging images to %s' % dest_static_dir)
- image_archive_artifact.Stage()
- self._MarkStagedImages(unstaged_image_list)
-
- except Exception:
- # Release processing "lock", which will indicate to future runs that we
- # did not succeed, and so they should try again.
- if self._build_dir:
- common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag,
- destroy=True)
- raise
- else:
- # Release processing "lock", keeping directory intact.
- if self._build_dir:
- common_util.ReleaseLock(static_dir=self._static_dir, tag=self._lock_tag)
- finally:
- self._Cleanup()
-
- return 'Success'
-
- def GatherArtifactDownloads(self, temp_download_dir, archive_url, static_dir,
- short_build=None):
- """Call appropriate artifact gathering method.
+ Downloads and caches the |artifacts|. Returns once these
+ are present on the devserver. A call to this will attempt to cache
+ non-specified artifacts in the background following the principle of
+ spatial locality.
Args:
- temp_download_dir: temporary directory for downloading artifacts to
- archive_url: URI to the bucket where image archive is stored
- staging_dir: directory into which to stage extracted images
- short_build: (ignored)
- Returns:
- list of downloadable artifacts (of type ZipfileBuildArtifact), currently
- containing a single object, configured for extracting a predetermined
- list of images
+ artifacts: a list of artifact names that correspond to artifacts to stage.
"""
- return common_util.GatherImageArchiveArtifactDownloads(
- temp_download_dir, archive_url, static_dir,
- [self._IMAGE_TO_FNAME[image] for image in self._image_list])
+ common_util.MkDirP(self._build_dir)
- def _MarkStagedImages(self, image_list):
- """Update the on-disk flag file with the list of newly staged images.
+ # We are doing some work on this build -- let's touch it to indicate that
+ # we shouldn't be cleaning it up anytime soon.
+ Downloader._TouchTimestampForStaged(self._build_dir)
- This does not check for duplicates against already listed images, and will
- add any listed images regardless.
+ # Create factory to create build_artifacts from artifact names.
+ build = self.ParseUrl(self._archive_url)[1]
+ factory = build_artifact.ArtifactFactory(self._build_dir, self._archive_url,
+ artifacts, build)
+ background_artifacts = factory.OptionalArtifacts()
+ if background_artifacts:
+ self._DownloadArtifactsInBackground(background_artifacts)
+ required_artifacts = factory.RequiredArtifacts()
+ str_repr = [str(a) for a in required_artifacts]
+ self._Log('Downloading artifacts %s.', ' '.join(str_repr))
+ self._DownloadArtifactsSerially(required_artifacts, no_wait=True)
+
+ def _DownloadArtifactsSerially(self, artifacts, no_wait):
+ """Simple function to download all the given artifacts serially.
+
+ Args:
+ artifacts: List of build_artifact.BuildArtifact instances to download.
+ no_wait: If True, don't block waiting for artifact to exist if we fail to
+ immediately find it.
"""
- flag_fname = os.path.join(self._build_dir, self._DONE_FLAG)
- with open(flag_fname, 'a') as flag_file:
- flag_file.writelines([image + '\n' for image in image_list])
+ for artifact in artifacts:
+ artifact.Process(no_wait)
- def _CheckStagedImages(self, archive_url, static_dir):
- """Returns a list of images that were already staged.
+ def _DownloadArtifactsInBackground(self, artifacts):
+ """Downloads |artifacts| in the background.
- Reads the list of images from a flag file, if one is present, and returns
- after removing duplicates.
+ Downloads |artifacts| in the background. As these are backgrounded
+ artifacts, they are done best effort and may not exist.
+ Args:
+ artifacts: List of build_artifact.BuildArtifact instances to download.
"""
- rel_path, short_build = self.ParseUrl(archive_url)
- sub_directory = self.GenerateLockTag(rel_path, short_build)
- flag_fname = os.path.join(static_dir, sub_directory, self._DONE_FLAG)
- staged_image_list = []
- # TODO(garnold) make this code immune to race conditions, probably by
- # acquiring a lock around the file access code.
- if os.path.isfile(flag_fname):
- with open(flag_fname) as flag_file:
- staged_image_list = [image.strip() for image in flag_file.readlines()]
- return list(set(staged_image_list))
+ self._Log('Invoking background download of artifacts for %r', artifacts)
+ thread = threading.Thread(target=self._DownloadArtifactsSerially,
+ args=(artifacts, False))
+ thread.start()