Implement stage RPC, which allows users to specify artifacts.
As specified in goto/devserver-cache, implement the stage RPC
and replace the existing download/symbol/wait_for_status/stage_images
calls with their respective stage equivalents.
I'm supporting the old RPCs by mapping them to stage for now, until
all callers use the new RPC. The advantage of the new RPC is that
less is downloaded (only what you need rather than everything).
This is a very large code change; sadly, I can't really break it up
much further. Fortunately, it's mostly deleting old/dead code.
If you haven't read the doc yet, please read it before reviewing
(http://goto.google.com/devserver-cache). You can pretty much review
all the new code stand-alone, without looking at the code it replaces.
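
To make the intent concrete, here is a rough sketch of how a caller
might invoke the new RPC (illustrative only: the exact endpoint
parameters are defined in goto/devserver-cache, and the helper,
parameter names, and artifact names below are assumptions):

  # Hypothetical client-side helper (Python 2, to match the codebase).
  # It asks the devserver to stage only the artifacts we actually need,
  # instead of downloading everything as the old RPCs did.
  import urllib
  import urllib2

  def stage(devserver, archive_url, artifacts):
    """Requests that |devserver| stage |artifacts| from |archive_url|."""
    query = urllib.urlencode({'archive_url': archive_url,
                              'artifacts': ','.join(artifacts)})
    return urllib2.urlopen('%s/stage?%s' % (devserver, query)).read()

  # Example (hypothetical build path and artifact names):
  #   stage('http://localhost:8080',
  #         'gs://chromeos-image-archive/x86-generic-release/R26-3600.0.0',
  #         ['full_payload', 'stateful'])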
BUG=chromium-os:38427
TEST=Pylint + unittests + download, wait_for_status, and stage tested
separately using different combinations. Testing of debug symbols etc.
is still in progress.
Change-Id: I201dcdaa8a14024247eca222a8f2e47deea464b9
Reviewed-on: https://gerrit.chromium.org/gerrit/42401
Reviewed-by: Chris Sosa <sosa@chromium.org>
Tested-by: Chris Sosa <sosa@chromium.org>
diff --git a/common_util.py b/common_util.py
index 55a95ff..ac7f093 100644
--- a/common_util.py
+++ b/common_util.py
@@ -10,15 +10,9 @@
import errno
import hashlib
import os
-import random
-import re
import shutil
-import time
+import threading
-import lockfile
-
-import build_artifact
-import gsutil_util
import log_util
@@ -27,405 +21,22 @@
return log_util.LogWithTag('UTIL', message, *args)
-AU_BASE = 'au'
-NTON_DIR_SUFFIX = '_nton'
-MTON_DIR_SUFFIX = '_mton'
-UPLOADED_LIST = 'UPLOADED'
-DEVSERVER_LOCK_FILE = 'devserver'
-
_HASH_BLOCK_SIZE = 8192
-def CommaSeparatedList(value_list, is_quoted=False):
- """Concatenates a list of strings.
-
- This turns ['a', 'b', 'c'] into a single string 'a, b and c'. It optionally
- adds quotes (`a') around each element. Used for logging.
-
- """
- if is_quoted:
- value_list = ["`" + value + "'" for value in value_list]
-
- if len(value_list) > 1:
- return (', '.join(value_list[:-1]) + ' and ' + value_list[-1])
- elif value_list:
- return value_list[0]
- else:
- return ''
-
class CommonUtilError(Exception):
"""Exception classes used by this module."""
pass
-def ParsePayloadList(archive_url, payload_list):
- """Parse and return the full, delta, and firmware payload URLs.
-
- Args:
- archive_url: The URL of the Google Storage bucket.
-    payload_list: A list of filenames.
-
- Returns:
- Tuple of 4 payload URLs: (full, nton, mton, firmware).
-
- Raises:
- CommonUtilError: If full payload is missing or invalid.
- """
- full_payload_url = None
- mton_payload_url = None
- nton_payload_url = None
- firmware_payload_url = None
-
- for payload in payload_list:
- if '_full_' in payload:
- full_payload_url = '/'.join([archive_url, payload])
- elif '_delta_' in payload:
- # e.g. chromeos_{from_version}_{to_version}_x86-generic_delta_dev.bin
- from_version, to_version = payload.split('_')[1:3]
- if from_version == to_version:
- nton_payload_url = '/'.join([archive_url, payload])
- else:
- mton_payload_url = '/'.join([archive_url, payload])
- elif build_artifact.FIRMWARE_ARCHIVE in payload:
- firmware_payload_url = '/'.join([archive_url, payload])
-
- if not full_payload_url:
- raise CommonUtilError(
- 'Full payload is missing or has unexpected name format.', payload_list)
-
- return (full_payload_url, nton_payload_url,
- mton_payload_url, firmware_payload_url)
-
-
-def IsAvailable(pattern_list, uploaded_list):
- """Checks whether the target artifacts we wait for are available.
-
- This method searches the uploaded_list for a match for every pattern
- in the pattern_list. It aborts and returns false if no filename
- matches a given pattern.
-
- Args:
- pattern_list: List of regular expression patterns to identify
- the target artifacts.
- uploaded_list: List of all uploaded files.
-
- Returns:
- True if there is a match for every pattern; false otherwise.
- """
-
- # Pre-compile the regular expression patterns
- compiled_patterns = []
- for p in pattern_list:
- compiled_patterns.append(re.compile(p))
-
- for pattern in compiled_patterns:
- found = False
- for filename in uploaded_list:
- if re.search(pattern, filename):
- found = True
- break
- if not found:
- return False
-
- return True
-
-
-def WaitUntilAvailable(to_wait_list, archive_url, err_str, timeout=600,
- delay=10):
- """Waits until all target artifacts are available in Google Storage or
- until the request times out.
-
- This method polls Google Storage until all target artifacts are
- available or until the timeout occurs. Because we may not know the
- exact name of the target artifacts, the method accepts to_wait_list, a
- list of filename patterns, to identify whether an artifact whose name
- matches the pattern exists (e.g. use pattern '_full_' to search for
- the full payload 'chromeos_R17-1413.0.0-a1_x86-mario_full_dev.bin').
-
- Args:
- to_wait_list: List of regular expression patterns to identify
- the target artifacts.
- archive_url: URL of the Google Storage bucket.
- err_str: String to display in the error message.
-
- Returns:
- The list of artifacts in the Google Storage bucket.
-
- Raises:
- CommonUtilError: If timeout occurs.
- """
-
- cmd = 'gsutil cat %s/%s' % (archive_url, UPLOADED_LIST)
- msg = 'Failed to get a list of uploaded files.'
-
- deadline = time.time() + timeout
- while time.time() < deadline:
- uploaded_list = []
- to_delay = delay + random.uniform(.5 * delay, 1.5 * delay)
- try:
- # Run "gsutil cat" to retrieve the list.
- uploaded_list = gsutil_util.GSUtilRun(cmd, msg).splitlines()
- except gsutil_util.GSUtilError:
-      # For backward compatibility, falling back to using "gsutil ls"
- # when the manifest file is not present.
- cmd = 'gsutil ls %s/*' % archive_url
- msg = 'Failed to list payloads.'
- payload_list = gsutil_util.GSUtilRun(cmd, msg).splitlines()
- for payload in payload_list:
- uploaded_list.append(payload.rsplit('/', 1)[1])
-
- # Check if all target artifacts are available.
- if IsAvailable(to_wait_list, uploaded_list):
- return uploaded_list
- _Log('Retrying in %f seconds...%s' % (to_delay, err_str))
- time.sleep(to_delay)
-
- raise CommonUtilError('Missing %s for %s.' % (err_str, archive_url))
-
-
-def GatherArtifactDownloads(main_staging_dir, archive_url, build_dir, build,
- timeout=600, delay=10):
- """Generates artifacts that we mean to download and install for autotest.
-
- This method generates the list of artifacts we will need for autotest. These
- artifacts are instances of build_artifact.BuildArtifact.
-
- Note, these artifacts can be downloaded asynchronously iff
- !artifact.Synchronous().
- """
-
- # Wait up to 10 minutes for the full payload to be uploaded because we
- # do not know the exact name of the full payload.
-
- # We also wait for 'autotest.tar' because we do not know what type of
- # autotest tarballs (tar or tar.bz2) is available
- # (crosbug.com/32312). This dependency can be removed once all
- # branches move to the new 'tar' format.
- to_wait_list = ['_full_', build_artifact.AUTOTEST_PACKAGE]
- err_str = 'full payload or autotest tarball'
- uploaded_list = WaitUntilAvailable(to_wait_list, archive_url, err_str,
- timeout=timeout, delay=delay)
-
- # First we gather the urls/paths for the update payloads.
- full_url, nton_url, mton_url, fw_url = ParsePayloadList(
- archive_url, uploaded_list)
-
- full_payload = os.path.join(build_dir, build_artifact.ROOT_UPDATE)
-
- artifacts = []
- artifacts.append(build_artifact.BuildArtifact(
- full_url, main_staging_dir, full_payload, synchronous=True))
-
- if nton_url:
- nton_payload = os.path.join(build_dir, AU_BASE, build + NTON_DIR_SUFFIX,
- build_artifact.ROOT_UPDATE)
- artifacts.append(build_artifact.AUTestPayloadBuildArtifact(
- nton_url, main_staging_dir, nton_payload))
-
- if mton_url:
- mton_payload = os.path.join(build_dir, AU_BASE, build + MTON_DIR_SUFFIX,
- build_artifact.ROOT_UPDATE)
- artifacts.append(build_artifact.AUTestPayloadBuildArtifact(
- mton_url, main_staging_dir, mton_payload))
-
- if fw_url:
- artifacts.append(build_artifact.BuildArtifact(
- fw_url, main_staging_dir, build_dir))
-
- # Gather information about autotest tarballs. Use autotest.tar if available.
- if build_artifact.AUTOTEST_PACKAGE in uploaded_list:
- autotest_url = '%s/%s' % (archive_url, build_artifact.AUTOTEST_PACKAGE)
- else:
- # Use autotest.tar.bz for backward compatibility. This can be
- # removed once all branches start using "autotest.tar"
- autotest_url = '%s/%s' % (
- archive_url, build_artifact.AUTOTEST_ZIPPED_PACKAGE)
-
- # Next we gather the miscellaneous payloads.
- stateful_url = archive_url + '/' + build_artifact.STATEFUL_UPDATE
- test_suites_url = (archive_url + '/' + build_artifact.TEST_SUITES_PACKAGE)
-
- stateful_payload = os.path.join(build_dir, build_artifact.STATEFUL_UPDATE)
-
- artifacts.append(build_artifact.BuildArtifact(
- stateful_url, main_staging_dir, stateful_payload, synchronous=True))
- artifacts.append(build_artifact.AutotestTarballBuildArtifact(
- autotest_url, main_staging_dir, build_dir))
- artifacts.append(build_artifact.TarballBuildArtifact(
- test_suites_url, main_staging_dir, build_dir, synchronous=True))
- return artifacts
-
-
-def GatherSymbolArtifactDownloads(temp_download_dir, archive_url, staging_dir,
- timeout=600, delay=10):
- """Generates debug symbol artifacts that we mean to download and stage.
-
- This method generates the list of artifacts we will need to
- symbolicate crash dumps that occur during autotest runs. These
- artifacts are instances of build_artifact.BuildArtifact.
-
- This will poll google storage until the debug symbol artifact becomes
- available, or until the 10 minute timeout is up.
-
- @param temp_download_dir: the tempdir into which we're downloading artifacts
- prior to staging them.
- @param archive_url: the google storage url of the bucket where the debug
- symbols for the desired build are stored.
- @param staging_dir: the dir into which to stage the symbols
-
- @return an iterable of one DebugTarballBuildArtifact pointing to the right
- debug symbols. This is an iterable so that it's similar to
- GatherArtifactDownloads. Also, it's possible that someday we might
- have more than one.
- """
-
- artifact_name = build_artifact.DEBUG_SYMBOLS
- WaitUntilAvailable([artifact_name], archive_url, 'debug symbols',
- timeout=timeout, delay=delay)
- artifact = build_artifact.DebugTarballBuildArtifact(
- archive_url + '/' + artifact_name,
- temp_download_dir,
- staging_dir)
- return [artifact]
-
-
-def GatherImageArchiveArtifactDownloads(temp_download_dir, archive_url,
- staging_dir, image_file_list,
- timeout=600, delay=10):
- """Generates image archive artifact(s) for downloading / staging.
-
- Generates the list of artifacts that are used for extracting Chrome OS images
- from. Currently, it returns a single artifact, which is a zipfile configured
-  extract a given list of images. It first polls Google Storage until the
- desired artifacts become available (or a timeout expires).
-
- Args:
- temp_download_dir: temporary directory, used for downloading artifacts
- archive_url: URI to the bucket where the artifacts are stored
- staging_dir: directory into which to stage the extracted files
- image_file_list: list of image files to be extracted
- Returns:
- list of downloadable artifacts (of type ZipfileBuildArtifact), currently
-    containing a single object.
- """
-
- artifact_name = build_artifact.IMAGE_ARCHIVE
- WaitUntilAvailable([artifact_name], archive_url, 'image archive',
- timeout=timeout, delay=delay)
- artifact = build_artifact.ZipfileBuildArtifact(
- archive_url + '/' + artifact_name,
- temp_download_dir, staging_dir,
- unzip_file_list=image_file_list)
- return [artifact]
-
-
-def PrepareBuildDirectory(build_dir):
- """Preliminary staging of installation directory for build.
-
- Args:
- build_dir: Directory to install build components into.
- """
- if not os.path.isdir(build_dir):
- os.path.makedirs(build_dir)
-
- # Create blank chromiumos_test_image.bin. Otherwise the Dev Server will
- # try to rebuild it unnecessarily.
- test_image = os.path.join(build_dir, build_artifact.TEST_IMAGE)
- open(test_image, 'a').close()
-
-
-def SafeSandboxAccess(static_dir, path):
- """Verify that the path is in static_dir.
-
- Args:
- static_dir: Directory where builds are served from.
- path: Path to verify.
-
- Returns:
- True if path is in static_dir, False otherwise
- """
- static_dir = os.path.realpath(static_dir)
- path = os.path.realpath(path)
- return (path.startswith(static_dir) and path != static_dir)
-
-
-def AcquireLock(static_dir, tag, create_once=True):
- """Acquires a lock for a given tag.
-
- Creates a directory for the specified tag, and atomically creates a lock file
- in it. This tells other components the resource/task represented by the tag
- is unavailable.
-
- Args:
- static_dir: Directory where builds are served from.
- tag: Unique resource/task identifier. Use '/' for nested tags.
- create_once: Determines whether the directory must be freshly created; this
- preserves previous semantics of the lock acquisition.
-
- Returns:
- Path to the created directory or None if creation failed.
-
- Raises:
- CommonUtilError: If lock can't be acquired.
- """
- build_dir = os.path.join(static_dir, tag)
- if not SafeSandboxAccess(static_dir, build_dir):
- raise CommonUtilError('Invalid tag "%s".' % tag)
-
- # Create the directory.
- is_created = False
+def MkDirP(directory):
+  """Create a directory, like mkdir -p, in a thread-safe manner."""
try:
- os.makedirs(build_dir)
- is_created = True
+ os.makedirs(directory)
except OSError, e:
- if e.errno == errno.EEXIST:
- if create_once:
- raise CommonUtilError(str(e))
- else:
+ if not (e.errno == errno.EEXIST and os.path.isdir(directory)):
raise
- # Lock the directory.
- try:
- lock = lockfile.FileLock(os.path.join(build_dir, DEVSERVER_LOCK_FILE))
- lock.acquire(timeout=0)
- except lockfile.AlreadyLocked, e:
- raise CommonUtilError(str(e))
- except:
- # In any other case, remove the directory if we actually created it, so
- # that subsequent attempts won't fail to re-create it.
- if is_created:
- shutil.rmtree(build_dir)
- raise
-
- return build_dir
-
-
-def ReleaseLock(static_dir, tag, destroy=False):
- """Releases the lock for a given tag.
-
- Optionally, removes the locked directory entirely.
-
- Args:
- static_dir: Directory where builds are served from.
- tag: Unique resource/task identifier. Use '/' for nested tags.
- destroy: Determines whether the locked directory should be removed
- entirely.
-
- Raises:
- CommonUtilError: If lock can't be released.
- """
- build_dir = os.path.join(static_dir, tag)
- if not SafeSandboxAccess(static_dir, build_dir):
- raise CommonUtilError('Invalid tag "%s".' % tag)
-
- lock = lockfile.FileLock(os.path.join(build_dir, DEVSERVER_LOCK_FILE))
- try:
- lock.break_lock()
- if destroy:
- shutil.rmtree(build_dir)
- except Exception, e:
- raise CommonUtilError(str(e))
-
def GetLatestBuildVersion(static_dir, target, milestone=None):
"""Retrieves the latest build version for a given board.
@@ -463,6 +74,21 @@
return str(max(builds))
+def PathInDir(directory, path):
+  """Returns True if the path is inside the given directory.
+
+  Args:
+    directory: Directory that should contain the path.
+    path: Path to check.
+
+  Returns:
+    True if path is inside directory, False otherwise.
+ """
+ directory = os.path.realpath(directory)
+ path = os.path.realpath(path)
+ return (path.startswith(directory) and len(path) != len(directory))
+
+
def GetControlFile(static_dir, build, control_path):
"""Attempts to pull the requested control file from the Dev Server.
@@ -481,7 +107,7 @@
control_path = control_path.lstrip('/')
control_path = os.path.join(static_dir, build, 'autotest',
control_path)
- if not SafeSandboxAccess(static_dir, control_path):
+ if not PathInDir(static_dir, control_path):
raise CommonUtilError('Invalid control file "%s".' % control_path)
if not os.path.exists(control_path):
@@ -507,7 +133,7 @@
String of each file separated by a newline.
"""
autotest_dir = os.path.join(static_dir, build, 'autotest/')
- if not SafeSandboxAccess(static_dir, autotest_dir):
+ if not PathInDir(static_dir, autotest_dir):
raise CommonUtilError('Autotest dir not in sandbox "%s".' % autotest_dir)
control_files = set()
@@ -593,3 +219,31 @@
"""Copies a file from |source| to |dest|."""
_Log('Copy File %s -> %s' % (source, dest))
shutil.copy(source, dest)
+
+
+class LockDict(object):
+ """A dictionary of locks.
+
+ This class provides a thread-safe store of threading.Lock objects, which can
+ be used to regulate access to any set of hashable resources. Usage:
+
+ foo_lock_dict = LockDict()
+ ...
+ with foo_lock_dict.lock('bar'):
+ # Critical section for 'bar'
+ """
+ def __init__(self):
+ self._lock = self._new_lock()
+ self._dict = {}
+
+ @staticmethod
+ def _new_lock():
+ return threading.Lock()
+
+  def lock(self, key):
+    """Returns the lock for |key|, creating it on first use."""
+ with self._lock:
+ lock = self._dict.get(key)
+ if not lock:
+ lock = self._new_lock()
+ self._dict[key] = lock
+ return lock
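
Reviewer note: a minimal, self-contained sketch of how the new LockDict
is meant to be used (the per-build staging function below is
hypothetical; only LockDict itself comes from this change):

  import threading

  from common_util import LockDict

  _build_locks = LockDict()

  def stage_build(build):
    # Serialize staging per build: concurrent requests for the same
    # build take the same lock, while different builds run in parallel.
    with _build_locks.lock(build):
      print 'staging %s' % build  # placeholder for real staging work

  threads = [threading.Thread(target=stage_build, args=('R26-3600.0.0',))
             for _ in range(4)]
  for t in threads:
    t.start()
  for t in threads:
    t.join()

Note that locks are created on demand and never discarded, so the
dictionary grows with the number of distinct keys; for per-build
locking that trade-off is fine.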