blob: c5c17622226a9be3a1a76b75fecf5bd76c0f61d1 [file] [log] [blame]
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001#!/usr/bin/env python
2# Copyright (c) 2012 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
maruel@chromium.org0cd0b182012-10-22 13:34:15 +00006"""Reads a .isolated, creates a tree of hardlinks and runs the test.
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00007
8Keeps a local cache.
9"""
10
vadimsh@chromium.org87d63262013-04-04 19:34:21 +000011import cookielib
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000012import ctypes
13import hashlib
csharp@chromium.orga110d792013-01-07 16:16:16 +000014import httplib
maruel@chromium.orgedd25d02013-03-26 14:38:00 +000015import inspect
maruel@chromium.org2b2139a2013-04-30 20:14:58 +000016import itertools
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000017import json
csharp@chromium.orgbfb98742013-03-26 20:28:36 +000018import locale
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000019import logging
csharp@chromium.orgff2a4662012-11-21 20:49:32 +000020import logging.handlers
csharp@chromium.orgf13eec02013-03-11 18:22:56 +000021import math
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000022import optparse
23import os
24import Queue
csharp@chromium.orgf13eec02013-03-11 18:22:56 +000025import random
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000026import re
27import shutil
28import stat
29import subprocess
30import sys
31import tempfile
32import threading
33import time
maruel@chromium.org97cd0be2013-03-13 14:01:36 +000034import traceback
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000035import urllib
csharp@chromium.orga92403f2012-11-20 15:13:59 +000036import urllib2
csharp@chromium.orgf13eec02013-03-11 18:22:56 +000037import urlparse
csharp@chromium.orga92403f2012-11-20 15:13:59 +000038import zlib
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000039
# The 'upload' module is what AppEngineService relies on for authentication.
# When it cannot be imported, |upload| is None and app engine authentication
# support is disabled.
try:
  from third_party import upload
except ImportError:
  upload = None
else:
  # Hack out upload logging.info(): route it to a dedicated logger that only
  # lets warnings and above through.
  upload.logging = logging.getLogger('upload')
  # Mac pylint choke on this line.
  upload.logging.setLevel(logging.WARNING)  # pylint: disable=E1103
50
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000051
# Types of action accepted by link_file().
HARDLINK = 1
SYMLINK = 2
COPY = 3

# Matches a hex encoded sha-1 digest (exactly 40 hex characters).
RE_IS_SHA1 = re.compile(r'^[a-fA-F0-9]{40}$')

# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None

# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# The name of the log file to use.
RUN_ISOLATED_LOG_FILE = 'run_isolated.log'

# The base directory containing this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# The name of the log to use for the run_test_cases.py command.
RUN_TEST_CASES_LOG = os.path.join(BASE_DIR, 'run_test_cases.log')

# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60

# The name of the key to store the count of url attempts.
COUNT_KEY = 'UrlOpenAttempt'

# Default maximum number of attempts to try opening a url before aborting.
URL_OPEN_MAX_ATTEMPTS = 30
# Default timeout (in seconds) when retrying.
URL_OPEN_TIMEOUT = 6 * 60.0

# Global (for now) map: server URL (http://example.com) -> HttpService instance.
# Used by get_http_service to cache HttpService instances.
_http_services = {}
_http_services_lock = threading.Lock()

# Used by get_flavor(): maps sys.platform values to a flavor name.
FLAVOR_MAPPING = {
  'cygwin': 'win',
  'win32': 'win',
  'darwin': 'mac',
  'sunos5': 'solaris',
  'freebsd7': 'freebsd',
  'freebsd8': 'freebsd',
}
105
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +0000106
class ConfigError(ValueError):
  """Raised when a .isolated file cannot be loaded or fails validation."""
110
111
class MappingError(OSError):
  """Raised when the tree of files cannot be recreated."""
115
116
def get_flavor():
  """Returns the system default flavor. Copied from gyp/pylib/gyp/common.py.

  The flavor is looked up in FLAVOR_MAPPING with sys.platform as the key;
  platforms that are not listed there are reported as 'linux'.
  """
  try:
    return FLAVOR_MAPPING[sys.platform]
  except KeyError:
    return 'linux'
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +0000120
121
def fix_default_encoding():
  """Forces utf8 solidly on all platforms.

  By default python execution environment is lazy and defaults to ascii
  encoding.

  http://uucode.com/blog/2007/03/23/shut-up-you-dummy-7-bit-python/

  Returns False when the default encoding was already utf-8 (nothing is
  touched in that case), True once the encoding and locales were updated.
  """
  if sys.getdefaultencoding() == 'utf-8':
    return False

  # Regenerate setdefaultencoding.
  reload(sys)
  # Module 'sys' has no 'setdefaultencoding' member
  # pylint: disable=E1101
  sys.setdefaultencoding('utf-8')

  # Switch every LC_* locale category to its UTF-8 variant.
  category_names = [name for name in dir(locale) if name[0:3] == 'LC_']
  for name in category_names:
    category = getattr(locale, name)
    try:
      locale.setlocale(category, '')
    except locale.Error:
      continue
    try:
      lang = locale.getlocale(category)[0]
    except (TypeError, ValueError):
      continue
    if not lang:
      continue
    try:
      locale.setlocale(category, (lang, 'UTF-8'))
    except locale.Error:
      # The locale could not be set directly; export it through the
      # environment variable of the same name instead.
      os.environ[name] = lang + '.UTF-8'

  try:
    locale.setlocale(locale.LC_ALL, '')
  except locale.Error:
    pass
  return True
160
161
class Unbuffered(object):
  """Disable buffering on a file object.

  Wraps |stream|: writes are forwarded to it and the stream is flushed as soon
  as the written data contains a newline. Any other attribute access is
  delegated to the wrapped stream.
  """

  def __init__(self, stream):
    self.stream = stream

  def write(self, data):
    """Writes |data| to the stream, flushing when it contains a newline."""
    wrapped = self.stream
    wrapped.write(data)
    ends_a_line = '\n' in data
    if ends_a_line:
      wrapped.flush()

  def __getattr__(self, attr):
    # Only reached for names not found on this wrapper itself.
    return getattr(self.stream, attr)
174
175
def disable_buffering():
  """Makes this process and child processes stdout unbuffered.

  Does nothing when the PYTHONUNBUFFERED environment variable is already set
  to a non-empty value.
  """
  if os.environ.get('PYTHONUNBUFFERED'):
    return
  # Since sys.stdout is a C++ object, it's impossible to do
  # sys.stdout.write = lambda...
  sys.stdout = Unbuffered(sys.stdout)
  # Exported through the environment so that child processes see it too.
  os.environ['PYTHONUNBUFFERED'] = 'x'
183
184
def os_link(source, link_name):
  """Add support for os.link() on Windows.

  Creates |link_name| as a hardlink to |source|. Raises OSError on failure.
  """
  if sys.platform != 'win32':
    os.link(source, link_name)
    return

  # On Windows, go through the kernel32 CreateHardLinkW() call, which returns
  # zero on failure.
  created = ctypes.windll.kernel32.CreateHardLinkW(
      unicode(link_name), unicode(source), 0)
  if not created:
    raise OSError()
193
194
def readable_copy(outfile, infile):
  """Makes a copy of the file that is readable by everyone.

  |infile| is copied to |outfile| with shutil.copy2(), then the read bits for
  user, group and others are added to the mode of the copy.
  """
  shutil.copy2(infile, outfile)
  everyone_can_read = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
  current_mode = os.stat(outfile).st_mode
  os.chmod(outfile, current_mode | everyone_can_read)
201
202
def link_file(outfile, infile, action):
  """Links a file. The type of link depends on |action|.

  |action| must be one of HARDLINK, SYMLINK or COPY, otherwise ValueError is
  raised. MappingError is raised when |infile| is not an existing file or when
  |outfile| already exists as a file.
  """
  logging.debug('Mapping %s to %s' % (infile, outfile))
  if action not in (HARDLINK, SYMLINK, COPY):
    raise ValueError('Unknown mapping action %s' % action)
  if not os.path.isfile(infile):
    raise MappingError('%s is missing' % infile)
  if os.path.isfile(outfile):
    raise MappingError(
        '%s already exist; insize:%d; outsize:%d' %
        (outfile, os.stat(infile).st_size, os.stat(outfile).st_size))

  if action == COPY:
    readable_copy(outfile, infile)
    return

  if action == SYMLINK and sys.platform != 'win32':
    os.symlink(infile, outfile)  # pylint: disable=E1101
    return

  # HARDLINK, or SYMLINK on windows: symlink are converted to hardlink there,
  # and a failure to hardlink fails over to copy.
  try:
    os_link(infile, outfile)
  except OSError:
    # Probably a different file system.
    logging.warning(
        'Failed to hardlink, failing back to copy %s to %s' % (
          infile, outfile))
    readable_copy(outfile, infile)
229
230
def _set_write_bit(path, read_only):
  """Sets or resets the write bit on a file or directory.

  Arguments:
    path: file, directory or symlink to update. It is never dereferenced when
          it is a symlink.
    read_only: when True, the mode is reduced to the owner read and execute
               bits it already had (mode & 0500); when False, the owner write
               bit is added (mode | 0200).

  When os.lchmod() is not available, symlinks are left untouched.
  """
  # Keep the unmodified st_mode around: the '& 0o500' mask below drops the file
  # type bits, so testing S_ISLNK() on the masked value would never detect a
  # symlink and os.chmod() would end up modifying the link target.
  original_mode = os.lstat(path).st_mode
  if read_only:
    mode = original_mode & 0o500
  else:
    mode = original_mode | 0o200

  if hasattr(os, 'lchmod'):
    os.lchmod(path, mode)  # pylint: disable=E1101
    return

  if stat.S_ISLNK(original_mode):
    # Skip symlink without lchmod() support.
    logging.debug('Can\'t change +w bit on symlink %s' % path)
    return

  # TODO(maruel): Implement proper DACL modification on Windows.
  os.chmod(path, mode)
248
249
def make_writable(root, read_only):
  """Toggle the writable bit on a directory tree.

  Every file and directory found below |root| (which must be an absolute
  path) is passed to _set_write_bit() along with |read_only|.
  """
  assert os.path.isabs(root), root
  for dirpath, dirnames, filenames in os.walk(root, topdown=True):
    # Files first, then sub directories, for each directory visited.
    for entry in filenames + dirnames:
      _set_write_bit(os.path.join(dirpath, entry), read_only)
259
260
def rmtree(root):
  """Wrapper around shutil.rmtree() to retry automatically on Windows.

  The tree is made writable first. On Windows, up to three deletion attempts
  are made, sleeping 2, 4 then 6 seconds after each failure.
  """
  make_writable(root, False)
  if sys.platform != 'win32':
    shutil.rmtree(root)
    return

  for attempt in (1, 2, 3):
    try:
      shutil.rmtree(root)
      return
    except WindowsError:  # pylint: disable=E0602
      delay = attempt * 2
      message = (
          'The test has subprocess outliving it. Sleep %d seconds.' % delay)
      sys.stderr.write(message + '\n')
      time.sleep(delay)
276
277
def is_same_filesystem(path1, path2):
  """Returns True if both paths are on the same filesystem.

  This is required to enable the use of hardlinks.

  Both paths must be absolute. The decision is based on the st_dev value
  reported by os.stat() for each path.
  """
  for path in (path1, path2):
    assert os.path.isabs(path), path

  if sys.platform == 'win32':
    # If the drive letter mismatches, assume it's a separate partition.
    # TODO(maruel): It should look at the underlying drive, a drive letter could
    # be a mount point to a directory on another drive.
    drive_pattern = r'^[a-zA-Z]\:\\.*'
    for path in (path1, path2):
      assert re.match(drive_pattern, path), path
    if path1[0].lower() != path2[0].lower():
      return False

  device1 = os.stat(path1).st_dev
  device2 = os.stat(path2).st_dev
  return device1 == device2
294
295
def get_free_space(path):
  """Returns the number of free bytes.

  Uses the kernel32 GetDiskFreeSpaceExW() call on Windows and os.statvfs()
  (f_bfree * f_frsize) everywhere else.
  """
  if sys.platform != 'win32':
    # For OSes other than Windows.
    fs_stats = os.statvfs(path)  # pylint: disable=E1101
    return fs_stats.f_bfree * fs_stats.f_frsize

  free_bytes = ctypes.c_ulonglong(0)
  ctypes.windll.kernel32.GetDiskFreeSpaceExW(
      ctypes.c_wchar_p(path), None, None, ctypes.pointer(free_bytes))
  return free_bytes.value
306
307
def make_temp_dir(prefix, root_dir):
  """Returns a temporary directory on the same file system as root_dir.

  The default temporary directory is used when it lives on the same file
  system as |root_dir|; otherwise the new directory is created next to
  |root_dir|, i.e. inside its parent directory.
  """
  if is_same_filesystem(root_dir, tempfile.gettempdir()):
    # None lets tempfile pick its default location.
    parent = None
  else:
    parent = os.path.dirname(root_dir)
  return tempfile.mkdtemp(prefix=prefix, dir=parent)
314
315
def load_isolated(content, os_flavor=None):
  """Verifies the .isolated file is valid and loads this object with the json
  data.

  Arguments:
    content: raw json text of the .isolated file.
    os_flavor: flavor the 'os' key is expected to match. Defaults to the value
               returned by get_flavor().

  Returns the decoded dict.

  Raises ConfigError when the content is not valid json, is not a dict,
  contains an unknown key or contains a value of an unexpected type.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  for key, value in data.iteritems():
    if key == 'command':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            # Link.
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            # NOTE(review): presumably the file mode -- confirm.
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            # sha-1 of the content. The type is checked first since
            # re.match() raises TypeError on anything that is not a string,
            # which would escape as something else than ConfigError.
            if (not isinstance(subsubvalue, basestring) or
                not RE_IS_SHA1.match(subsubvalue)):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            # NOTE(review): presumably the file size -- confirm.
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        if 'h' in subvalue and 'l' in subvalue:
          raise ConfigError(
              'Did not expect both \'h\' (sha-1) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        # Same as for 'h': never hand a non-string to re.match().
        if (not isinstance(subvalue, basestring) or
            not RE_IS_SHA1.match(subvalue)):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not isinstance(value, bool):
        raise ConfigError('Expected bool, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      expected_value = os_flavor or get_flavor()
      if value != expected_value:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (expected_value, value))

    else:
      raise ConfigError('Unknown key %s' % key)

  return data
394
395
def fix_python_path(cmd):
  """Returns the fixed command line to call the right python executable.

  A copy of |cmd| is returned; |cmd| itself is left untouched. A leading
  'python' is replaced by sys.executable, and sys.executable is prepended when
  the first item ends with '.py'.
  """
  fixed = cmd[:]
  program = fixed[0]
  if program == 'python':
    fixed[0] = sys.executable
  elif program.endswith('.py'):
    fixed.insert(0, sys.executable)
  return fixed
404
405
def url_open(url, **kwargs):
  """Attempts to open the given url multiple times.

  |url| is split into its server part and its request part; the request is
  sent through the HttpService instance returned by get_http_service() for
  that server. All keyword arguments are forwarded to its request() method.

  |data| can be either:
    -None for a GET request
    -str for pre-encoded data
    -list for data to be encoded
    -dict for data to be encoded (COUNT_KEY will be added in this case)

  Returns a file-like object, where the response may be read from, or None
  if it was unable to connect.
  """
  server, request_path = split_server_request_url(url)
  return get_http_service(server).request(request_path, **kwargs)
421
422
def split_server_request_url(url):
  """Splits the url into scheme+netloc and path+params+query+fragment.

  Returns a tuple (urlhost, urlpath), e.g. 'http://example.com/a?b=1' gives
  ('http://example.com', '/a?b=1').
  """
  parsed = urlparse.urlparse(url)
  urlhost = '%s://%s' % (parsed.scheme, parsed.netloc)
  # Rebuild the url with an empty scheme and netloc to keep only what follows
  # the server part.
  urlpath = urlparse.urlunparse(
      ('', '', parsed.path, parsed.params, parsed.query, parsed.fragment))
  return urlhost, urlpath
csharp@chromium.orgf13eec02013-03-11 18:22:56 +0000429
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000430
def get_http_service(urlhost):
  """Returns existing or creates new instance of HttpService that can send
  requests to given base urlhost.

  Instances are cached in the module level _http_services map, keyed by the
  lower-cased |urlhost| stripped of its trailing slashes. New instances are
  created as AppEngineService.
  """
  # Ensure consistency.
  urlhost = str(urlhost).lower().rstrip('/')
  with _http_services_lock:
    cached = _http_services.get(urlhost)
    if cached:
      return cached
    created = AppEngineService(urlhost)
    _http_services[urlhost] = created
    return created
443
444
class HttpService(object):
  """Base class for a class that provides an API to HTTP based service:
    - Provides 'request' method.
    - Supports automatic request retries.
    - Supports persistent cookies.
    - Thread safe.
  """

  # File to use to store all auth cookies.
  COOKIE_FILE = os.path.join('~', '.isolated_cookies')

  # CookieJar reused by all services + lock that protects its instantiation.
  _cookie_jar = None
  _cookie_jar_lock = threading.Lock()

  def __init__(self, urlhost):
    """|urlhost| is the scheme+netloc of the server, e.g. http://example.com."""
    self.urlhost = urlhost
    self.cookie_jar = self.load_cookie_jar()
    self.opener = self.create_url_opener()

  def authenticate(self):  # pylint: disable=R0201
    """Called when HTTP server asks client to authenticate.

    Can be implemented in subclasses. Returns True when authentication
    succeeded; this base implementation always returns False.
    """
    return False

  @staticmethod
  def load_cookie_jar():
    """Returns global CookieJar object that stores cookies in the file.

    The jar is created and loaded from COOKIE_FILE on the first call and
    shared by all the following ones.
    """
    with HttpService._cookie_jar_lock:
      if HttpService._cookie_jar is not None:
        return HttpService._cookie_jar
      jar = ThreadSafeCookieJar(os.path.expanduser(HttpService.COOKIE_FILE))
      jar.load()
      HttpService._cookie_jar = jar
      return jar

  @staticmethod
  def save_cookie_jar():
    """Called when cookie jar needs to be flushed to disk."""
    with HttpService._cookie_jar_lock:
      if HttpService._cookie_jar is not None:
        HttpService._cookie_jar.save()

  def create_url_opener(self):  # pylint: disable=R0201
    """Returns OpenerDirector that will be used when sending requests.

    Can be reimplemented in subclasses.
    """
    return urllib2.build_opener(urllib2.HTTPCookieProcessor(self.cookie_jar))

  def request(self, urlpath, data=None, content_type=None, **kwargs):
    """Attempts to open the given url multiple times.

    |urlpath| is relative to the server root, i.e. '/some/request?param=1'.

    |data| can be either:
      -None for a GET request
      -str for pre-encoded data
      -list for data to be encoded
      -dict for data to be encoded (COUNT_KEY will be added in this case)

    |content_type| is sent as the Content-Type header; it can only be used
    along with |data|.

    The remaining keyword arguments are forwarded to _retry_loop().

    Returns a file-like object, where the response may be read from, or None
    if it was unable to connect.
    """
    assert urlpath and urlpath[0] == '/'

    if isinstance(data, dict) and COUNT_KEY in data:
      logging.error('%s already existed in the data passed into UlrOpen. It '
                    'would be overwritten. Aborting UrlOpen', COUNT_KEY)
      return None

    method = 'GET' if data is None else 'POST'
    assert not ((method != 'POST') and content_type), (
        'Can\'t use content_type on GET')

    def make_request(extra):
      """Returns a urllib2.Request instance for this specific retry."""
      if isinstance(data, str) or data is None:
        payload = data
      else:
        # Work on a copy so that |data| is not altered between retries.
        if isinstance(data, dict):
          payload = data.items()
        else:
          payload = data[:]
        payload.extend(extra.iteritems())
        payload = urllib.urlencode(payload)
      new_url = urlparse.urljoin(self.urlhost, urlpath[1:])
      if isinstance(data, str) or data is None:
        # In these cases, add the extra parameter to the query part of the url.
        url_parts = list(urlparse.urlparse(new_url))
        # Append the query parameter.
        if url_parts[4] and extra:
          url_parts[4] += '&'
        url_parts[4] += urllib.urlencode(extra)
        new_url = urlparse.urlunparse(url_parts)
      request = urllib2.Request(new_url, data=payload)
      if payload is not None:
        if content_type:
          request.add_header('Content-Type', content_type)
        request.add_header('Content-Length', len(payload))
      return request

    return self._retry_loop(make_request, **kwargs)

  def _retry_loop(
      self,
      make_request,
      max_attempts=URL_OPEN_MAX_ATTEMPTS,
      retry_404=False,
      retry_50x=True,
      timeout=URL_OPEN_TIMEOUT):
    """Runs internal request-retry loop.

    - |make_request| is called with a dict of extra parameters before each
      attempt and must return the urllib2.Request to send.
    - Optionally retries HTTP 404 and 50x.
    - Retries up to |max_attempts| times. If None or 0, there's no limit in the
      number of retries.
    - Retries up to |timeout| duration in seconds. If None or 0, there's no
      limit in the time taken to do retries.
    - If both |max_attempts| and |timeout| are None or 0, this functions retries
      indefinitely.
    - Sleeps between two attempts, except right after a successful
      authentication.

    Returns the response object on success, None otherwise.
    """
    authenticated = False
    last_error = None
    # Defined before the loop so the final error message can be built even when
    # the loop exits before the first request is created.
    request = None
    attempts_made = 0
    start = self._now()
    for attempt in itertools.count():
      if max_attempts and attempt >= max_attempts:
        # Too many attempts.
        break
      if timeout and (self._now() - start) >= timeout:
        # Retried for too long.
        break
      extra = {COUNT_KEY: attempt} if attempt else {}
      request = make_request(extra)
      attempts_made += 1
      try:
        url_response = self._url_open(request)
        logging.debug('url_open(%s) succeeded', request.get_full_url())
        return url_response
      except urllib2.HTTPError as e:
        # Unauthorized. Ask to authenticate and then try again.
        if e.code in (401, 403):
          # Try to authenticate only once. If it doesn't help, then server does
          # not support app engine authentication.
          logging.error(
              'Authentication is required for %s on attempt %d.\n%s',
              request.get_full_url(), attempt,
              self._format_exception(e, verbose=True))
          if not authenticated and self.authenticate():
            authenticated = True
            # Do not sleep.
            continue
          # If authentication failed, return.
          logging.error(
              'Unable to authenticate to %s.\n%s',
              request.get_full_url(), self._format_exception(e, verbose=True))
          return None

        if ((e.code < 500 and not (retry_404 and e.code == 404)) or
            (e.code >= 500 and not retry_50x)):
          # This HTTPError means we reached the server and there was a problem
          # with the request, so don't retry.
          logging.error(
              'Able to connect to %s but an exception was thrown.\n%s',
              request.get_full_url(), self._format_exception(e, verbose=True))
          return None

        # The HTTPError was due to a server error, so retry the attempt.
        logging.warning('Able to connect to %s on attempt %d.\n%s',
                        request.get_full_url(), attempt,
                        self._format_exception(e))
        last_error = e

      except (urllib2.URLError, httplib.HTTPException) as e:
        logging.warning('Unable to open url %s on attempt %d.\n%s',
                        request.get_full_url(), attempt,
                        self._format_exception(e))
        last_error = e

      # Only sleep if we are going to try again.
      if max_attempts and attempts_made >= max_attempts:
        # That was the last allowed attempt, waiting would be pointless.
        break
      remaining = None
      if timeout:
        remaining = timeout - (self._now() - start)
        if remaining <= 0:
          break
      # Also reached when the number of attempts is unlimited, so the server
      # is never retried in a tight loop.
      self.sleep_before_retry(attempt, remaining)

    if request is not None:
      failed_url = request.get_full_url()
    else:
      failed_url = '<no request sent>'
    logging.error('Unable to open given url, %s, after %d attempts.\n%s',
                  failed_url, attempts_made,
                  self._format_exception(last_error, verbose=True))
    return None

  def _url_open(self, request):
    """Low level method to execute urllib2.Request's.

    To be mocked in tests.
    """
    return self.opener.open(request)

  @staticmethod
  def _now():
    """To be mocked in tests."""
    return time.time()

  @staticmethod
  def calculate_sleep_before_retry(attempt, max_duration):
    """Returns how long to sleep, in seconds, before the next attempt.

    The duration grows as 1.5^(attempt - 1) plus a random part in [0, 1.5).
    It is capped to 10 seconds and, when |max_duration| is set, to
    |max_duration|.
    """
    # Maximum sleeping time. We're hammering a cloud-distributed service, it'll
    # survive.
    MAX_SLEEP = 10.
    # random.random() returns [0.0, 1.0). Starts with relatively short waiting
    # time by starting with 1.5/2+1.5^-1 median offset.
    duration = (random.random() * 1.5) + math.pow(1.5, (attempt - 1))
    assert duration > 0.1
    duration = min(MAX_SLEEP, duration)
    if max_duration:
      duration = min(max_duration, duration)
    return duration

  @classmethod
  def sleep_before_retry(cls, attempt, max_duration):
    """Sleeps for some amount of time when retrying the request.

    To be mocked in tests.
    """
    time.sleep(cls.calculate_sleep_before_retry(attempt, max_duration))

  @staticmethod
  def _format_exception(exc, verbose=False):
    """Given an instance of some exception raised by urlopen returns human
    readable piece of text with detailed information about the error.

    With |verbose| set, the headers not starting with 'x-' and the body of an
    urllib2.HTTPError are included. Note that this reads the response body.
    """
    out = ['Exception: %s' % (exc,)]
    if verbose:
      if isinstance(exc, urllib2.HTTPError):
        out.append('-' * 10)
        if exc.hdrs:
          for header, value in exc.hdrs.items():
            if not header.startswith('x-'):
              out.append('%s: %s' % (header.capitalize(), value))
          out.append('')
        out.append(exc.read() or '<empty body>')
        out.append('-' * 10)
    return '\n'.join(out)
687
maruel@chromium.orgef333122013-03-12 20:36:40 +0000688
class AppEngineService(HttpService):
  """This class implements authentication support for
  an app engine based services.

  The login itself is delegated to the 'upload' module; authentication is
  unavailable when that module could not be imported.
  """

  # This lock ensures that user won't be confused with multiple concurrent
  # login prompts.
  _auth_lock = threading.Lock()

  def __init__(self, urlhost, email=None, password=None):
    super(AppEngineService, self).__init__(urlhost)
    self.email = email
    self.password = password
    # Created on demand by get_credentials().
    self._keyring = None

  def authenticate(self):
    """Authenticates in the app engine application.

    Returns True on success.
    """
    if not upload:
      logging.error('\'upload\' module is missing, '
                    'app engine authentication is disabled.')
      return False

    # Bound to local names so the nested class below can reach them: inside
    # its methods |self| is the AuthServer instance, not this service.
    jar = self.cookie_jar
    flush_cookies = self.save_cookie_jar

    # RPC server that uses AuthenticationSupport's cookie jar.
    class AuthServer(upload.AbstractRpcServer):
      def _GetOpener(self):
        # Authentication code needs to know about 302 response.
        # So make OpenerDirector without HTTPRedirectHandler.
        handlers = (
          urllib2.ProxyHandler(),
          urllib2.UnknownHandler(),
          urllib2.HTTPHandler(),
          urllib2.HTTPDefaultErrorHandler(),
          urllib2.HTTPSHandler(),
          urllib2.HTTPErrorProcessor(),
          urllib2.HTTPCookieProcessor(jar),
        )
        opener = urllib2.OpenerDirector()
        for handler in handlers:
          opener.add_handler(handler)
        return opener

      def PerformAuthentication(self):
        self._Authenticate()
        flush_cookies()
        return self.authenticated

    with AppEngineService._auth_lock:
      return AuthServer(
          self.urlhost, self.get_credentials).PerformAuthentication()

  def get_credentials(self):
    """Called during authentication process to get the credentials.

    May be called multiple times if authentication fails.
    Returns tuple (email, password).
    """
    # 'authenticate' calls this only if 'upload' is present.
    # Ensure other callers (if any) fail non-cryptically if 'upload' is missing.
    assert upload, '\'upload\' module is required for this to work'
    if self.email and self.password:
      return (self.email, self.password)
    if not self._keyring:
      # The keyring is created once and reused by the following calls.
      self._keyring = upload.KeyringCreds(
          self.urlhost, self.urlhost, self.email)
    return self._keyring.GetUserCredentials()
751
752
class ThreadSafeCookieJar(cookielib.MozillaCookieJar):
  """MozillaCookieJar with thread safe load and save.

  Both operations run under the jar's own lock and are best effort: file
  system failures are logged instead of being raised to the caller.
  """

  def load(self, filename=None, ignore_discard=False, ignore_expires=False):
    """Loads cookies from the file if it exists.

    If the file does not exist, an empty one is created instead. In both
    cases the file mode is then forced to 0600 so the cookies stay private to
    the current user.

    Arguments:
    - filename: path to the cookie file, '~' is expanded. Defaults to
                self.filename.
    - ignore_discard, ignore_expires: passed through to MozillaCookieJar.load.
    """
    filename = os.path.expanduser(filename or self.filename)
    with self._cookies_lock:
      if os.path.exists(filename):
        try:
          cookielib.MozillaCookieJar.load(self, filename,
                                          ignore_discard,
                                          ignore_expires)
          logging.debug('Loaded cookies from %s', filename)
        except (cookielib.LoadError, IOError) as e:
          # An unreadable or malformed file (e.g. the empty file created
          # below by a previous run) is not fatal: start with no cookies.
          logging.debug('Failed to load cookies from %s: %s', filename, e)
      else:
        try:
          fd = os.open(filename, os.O_CREAT, 0o600)
          os.close(fd)
        except OSError:
          logging.error('Failed to create %s', filename)
      try:
        os.chmod(filename, 0o600)
      except OSError:
        logging.error('Failed to fix mode for %s', filename)

  def save(self, filename=None, ignore_discard=False, ignore_expires=False):
    """Saves cookies to the file, completely overwriting it.

    Arguments:
    - filename: path to the cookie file. Defaults to self.filename.
    - ignore_discard, ignore_expires: passed through to MozillaCookieJar.save.

    A failure to write the file is logged, not raised.
    """
    # Path that is effectively written to, used for logging only.
    target = filename or self.filename
    logging.debug('Saving cookies to %s', target)
    with self._cookies_lock:
      try:
        cookielib.MozillaCookieJar.save(self, filename,
                                        ignore_discard,
                                        ignore_expires)
      except (IOError, OSError):
        # On python 2, open() raises IOError, which is not an OSError there,
        # so both have to be listed for the failure to be caught.
        logging.error('Failed to save %s', target)
csharp@chromium.orgf13eec02013-03-11 18:22:56 +0000789
790
class ThreadPoolError(Exception):
  """Common ancestor of every exception ThreadPool raises on its own."""
793
794
class ThreadPoolEmpty(ThreadPoolError):
  """A task result was requested from a pool that has no pending tasks."""
797
798
class ThreadPoolClosed(ThreadPoolError):
  """An operation was attempted on a thread pool that is already closed."""
801
802
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000803class ThreadPool(object):
804 """Implements a multithreaded worker pool oriented for mapping jobs with
805 thread-local result storage.
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000806
807 Arguments:
808 - initial_threads: Number of threads to start immediately. Can be 0 if it is
809 uncertain that threads will be needed.
810 - max_threads: Maximum number of threads that will be started when all the
811 threads are busy working. Often the number of CPU cores.
812 - queue_size: Maximum number of tasks to buffer in the queue. 0 for unlimited
813 queue. A non-zero value may make add_task() blocking.
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000814 - prefix: Prefix to use for thread names. Pool's threads will be
815 named '<prefix>-<thread index>'.
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000816 """
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000817 QUEUE_CLASS = Queue.PriorityQueue
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000818
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000819 def __init__(self, initial_threads, max_threads, queue_size, prefix=None):
820 prefix = prefix or 'tp-0x%0x' % id(self)
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000821 logging.debug(
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000822 'New ThreadPool(%d, %d, %d): %s', initial_threads, max_threads,
823 queue_size, prefix)
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000824 assert initial_threads <= max_threads
825 # Update this check once 256 cores CPU are common.
826 assert max_threads <= 256
827
maruel@chromium.orgeb281652012-11-08 21:10:23 +0000828 self.tasks = self.QUEUE_CLASS(queue_size)
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000829 self._max_threads = max_threads
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000830 self._prefix = prefix
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000831
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000832 # Mutables.
833 self._num_of_added_tasks_lock = threading.Lock()
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000834 self._num_of_added_tasks = 0
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000835 self._outputs_exceptions_cond = threading.Condition()
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000836 self._outputs = []
837 self._exceptions = []
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000838
839 # List of threads, number of threads in wait state, number of terminated and
840 # starting threads. All protected by _workers_lock.
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000841 self._workers_lock = threading.Lock()
842 self._workers = []
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000843 self._ready = 0
844 # Number of terminated threads, used to handle some edge cases in
845 # _is_task_queue_empty.
846 self._dead = 0
847 # Number of threads already added to _workers, but not yet running the loop.
848 self._starting = 0
849 # True if close was called. Forbids adding new tasks.
850 self._is_closed = False
851
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000852 for _ in range(initial_threads):
853 self._add_worker()
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000854
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000855 def _add_worker(self):
856 """Adds one worker thread if there isn't too many. Thread-safe."""
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000857 with self._workers_lock:
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000858 if len(self._workers) >= self._max_threads or self._is_closed:
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000859 return False
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000860 worker = threading.Thread(
861 name='%s-%d' % (self._prefix, len(self._workers)), target=self._run)
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000862 self._workers.append(worker)
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000863 self._starting += 1
864 logging.debug('Starting worker thread %s', worker.name)
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000865 worker.daemon = True
866 worker.start()
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000867 return True
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000868
maruel@chromium.org831958f2013-01-22 15:01:46 +0000869 def add_task(self, priority, func, *args, **kwargs):
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000870 """Adds a task, a function to be executed by a worker.
871
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000872 |priority| can adjust the priority of the task versus others. Lower priority
maruel@chromium.org831958f2013-01-22 15:01:46 +0000873 takes precedence.
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000874
maruel@chromium.orgedd25d02013-03-26 14:38:00 +0000875 |func| can either return a return value to be added to the output list or
876 be a generator which can emit multiple values.
877
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000878 Returns the index of the item added, e.g. the total number of enqueued items
879 up to now.
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000880 """
maruel@chromium.org831958f2013-01-22 15:01:46 +0000881 assert isinstance(priority, int)
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000882 assert callable(func)
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000883 with self._workers_lock:
884 if self._is_closed:
885 raise ThreadPoolClosed('Can not add a task to a closed ThreadPool')
886 start_new_worker = (
887 # Pending task count plus new task > number of available workers.
888 self.tasks.qsize() + 1 > self._ready + self._starting and
889 # Enough slots.
890 len(self._workers) < self._max_threads
891 )
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000892 with self._num_of_added_tasks_lock:
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000893 self._num_of_added_tasks += 1
894 index = self._num_of_added_tasks
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000895 self.tasks.put((priority, index, func, args, kwargs))
896 if start_new_worker:
897 self._add_worker()
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000898 return index
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000899
900 def _run(self):
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000901 """Worker thread loop. Runs until a None task is queued."""
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000902 started = False
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000903 while True:
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000904 try:
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000905 with self._workers_lock:
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000906 self._ready += 1
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000907 if not started:
908 self._starting -= 1
909 started = True
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000910 task = self.tasks.get()
911 finally:
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000912 with self._workers_lock:
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000913 self._ready -= 1
914 try:
915 if task is None:
916 # We're done.
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000917 with self._workers_lock:
918 self._dead += 1
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000919 return
920 _priority, _index, func, args, kwargs = task
maruel@chromium.orgedd25d02013-03-26 14:38:00 +0000921 if inspect.isgeneratorfunction(func):
922 for out in func(*args, **kwargs):
923 self._output_append(out)
924 else:
925 out = func(*args, **kwargs)
926 self._output_append(out)
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000927 except Exception as e:
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000928 logging.warning('Caught exception: %s', e)
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000929 exc_info = sys.exc_info()
maruel@chromium.org97cd0be2013-03-13 14:01:36 +0000930 logging.info(''.join(traceback.format_tb(exc_info[2])))
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000931 self._outputs_exceptions_cond.acquire()
932 try:
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000933 self._exceptions.append(exc_info)
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000934 self._outputs_exceptions_cond.notifyAll()
935 finally:
936 self._outputs_exceptions_cond.release()
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000937 finally:
csharp@chromium.org60991182013-03-18 13:44:17 +0000938 try:
939 self.tasks.task_done()
940 except Exception as e:
941 # We need to catch and log this error here because this is the root
942 # function for the thread, nothing higher will catch the error.
943 logging.exception('Caught exception while marking task as done: %s',
944 e)
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000945
maruel@chromium.orgedd25d02013-03-26 14:38:00 +0000946 def _output_append(self, out):
947 if out is not None:
948 self._outputs_exceptions_cond.acquire()
949 try:
950 self._outputs.append(out)
951 self._outputs_exceptions_cond.notifyAll()
952 finally:
953 self._outputs_exceptions_cond.release()
954
maruel@chromium.orgeb281652012-11-08 21:10:23 +0000955 def join(self):
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000956 """Extracts all the results from each threads unordered.
957
958 Call repeatedly to extract all the exceptions if desired.
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000959
960 Note: will wait for all work items to be done before returning an exception.
961 To get an exception early, use get_one_result().
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000962 """
963 # TODO(maruel): Stop waiting as soon as an exception is caught.
maruel@chromium.orgeb281652012-11-08 21:10:23 +0000964 self.tasks.join()
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000965 self._outputs_exceptions_cond.acquire()
966 try:
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000967 if self._exceptions:
968 e = self._exceptions.pop(0)
969 raise e[0], e[1], e[2]
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000970 out = self._outputs
971 self._outputs = []
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000972 finally:
973 self._outputs_exceptions_cond.release()
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000974 return out
975
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000976 def get_one_result(self):
977 """Returns the next item that was generated or raises an exception if one
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000978 occurred.
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000979
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000980 Raises:
981 ThreadPoolEmpty - no results available.
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000982 """
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000983 # Get first available result.
984 for result in self.iter_results():
985 return result
986 # No results -> tasks queue is empty.
987 raise ThreadPoolEmpty('Task queue is empty')
988
989 def iter_results(self):
990 """Yields results as they appear until all tasks are processed."""
991 while True:
992 # Check for pending results.
993 result = None
994 self._outputs_exceptions_cond.acquire()
995 try:
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000996 if self._exceptions:
997 e = self._exceptions.pop(0)
998 raise e[0], e[1], e[2]
999 if self._outputs:
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001000 # Remember the result to yield it outside of the lock.
1001 result = self._outputs.pop(0)
1002 else:
1003 # No pending results and no pending tasks -> all tasks are done.
1004 if self._is_task_queue_empty():
1005 return
1006 # Some task is queued, wait for its result to appear.
1007 # Use non-None timeout so that process reacts to Ctrl+C and other
1008 # signals, see http://bugs.python.org/issue8844.
1009 self._outputs_exceptions_cond.wait(timeout=5)
1010 continue
1011 finally:
1012 self._outputs_exceptions_cond.release()
1013 yield result
1014
1015 def _is_task_queue_empty(self):
1016 """True if task queue is empty and all workers are idle.
1017
1018 Doesn't check for pending results from already finished tasks.
1019
1020 Note: this property is not reliable in case tasks are still being
1021 enqueued by concurrent threads.
1022 """
1023 # Some pending tasks?
1024 if not self.tasks.empty():
1025 return False
1026 # Some workers are busy?
1027 with self._workers_lock:
1028 idle = self._ready + self._dead + self._starting
1029 if idle != len(self._workers):
1030 return False
1031 return True
maruel@chromium.org13eca0b2013-01-22 16:42:21 +00001032
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001033 def close(self):
1034 """Closes all the threads."""
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001035 # Ensure no new threads can be started, self._workers is effectively
1036 # a constant after that and can be accessed outside the lock.
1037 with self._workers_lock:
1038 if self._is_closed:
1039 raise ThreadPoolClosed('Can not close already closed ThreadPool')
1040 self._is_closed = True
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001041 for _ in range(len(self._workers)):
1042 # Enqueueing None causes the worker to stop.
maruel@chromium.orgeb281652012-11-08 21:10:23 +00001043 self.tasks.put(None)
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001044 for t in self._workers:
1045 t.join()
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001046 logging.debug(
1047 'Thread pool \'%s\' closed: spawned %d threads total',
1048 self._prefix, len(self._workers))
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001049
1050 def __enter__(self):
1051 """Enables 'with' statement."""
1052 return self
1053
maruel@chromium.org97cd0be2013-03-13 14:01:36 +00001054 def __exit__(self, _exc_type, _exc_value, _traceback):
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001055 """Enables 'with' statement."""
1056 self.close()
1057
1058
def valid_file(filepath, size):
  """Tells whether the file at |filepath| looks valid.

  For now only the size is checked: the file is valid if |size| is
  UNKNOWN_FILE_SIZE or equals the size of the file on disk. A mismatch is
  logged as a warning.
  """
  if size != UNKNOWN_FILE_SIZE:
    size_on_disk = os.stat(filepath).st_size
    if size != size_on_disk:
      logging.warning(
          'Found invalid item %s; %d != %d',
          os.path.basename(filepath), size_on_disk, size)
      return False
  return True
csharp@chromium.org8dc52542012-11-08 20:29:55 +00001071
1072
class Profiler(object):
  """Context manager that logs how long the enclosed section took.

  Arguments:
    name - label of the section, used in the log message.
  """

  def __init__(self, name):
    self.name = name
    # Wall clock time at which the section was entered, None until then.
    self.start_time = None

  def __enter__(self):
    """Records the start time."""
    self.start_time = time.time()
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Logs the elapsed time; exceptions are not suppressed."""
    elapsed = time.time() - self.start_time
    logging.info(
        'Profiling: Section %s took %3.3f seconds', self.name, elapsed)
1086
1087
class DeadlockDetector(object):
  """Context manager that can detect deadlocks.

  A watcher thread runs for as long as the context is entered and logs the
  stack frames of all other threads whenever 'ping' was not called for more
  than |timeout| seconds.

  Usage:
    with DeadlockDetector(timeout=60) as detector:
      for item in some_work():
        ...
        detector.ping()
        ...

  Arguments:
    timeout - maximum allowed time between calls to 'ping'.
  """

  def __init__(self, timeout):
    self.timeout = timeout
    # Time of the last ping. Accessed with _stop_cv held once the watcher
    # thread is running.
    self._last_ping = None
    # Set to ask the watcher thread to exit. Protected by _stop_cv.
    self._stop_flag = False
    self._stop_cv = threading.Condition()
    self._thread = None

  def __enter__(self):
    """Starts internal watcher thread."""
    assert self._thread is None
    self._last_ping = time.time()
    watcher = threading.Thread(name='deadlock-watcher', target=self._run)
    watcher.daemon = True
    self._thread = watcher
    watcher.start()
    return self

  def __exit__(self, *_args):
    """Stops internal watcher thread."""
    assert self._thread is not None
    with self._stop_cv:
      self._stop_flag = True
      self._stop_cv.notify()
    self._thread.join()
    # Reset the state so the detector can be entered again.
    self._thread = None
    self._stop_flag = False

  def ping(self):
    """Notify detector that main thread is still running.

    Should be called periodically to inform the detector that everything is
    running as it should.
    """
    with self._stop_cv:
      self._last_ping = time.time()

  def _run(self):
    """Loop that watches for pings and dumps threads state if ping is late."""
    while True:
      with self._stop_cv:
        # This thread is closing?
        if self._stop_flag:
          return

        # Wait until the moment we need to dump stack traces.
        # Most probably some other thread will call 'ping' to move deadline
        # further in time. We don't bother to wake up after each 'ping', only
        # right before initial expected deadline. After waking up we recalculate
        # the new deadline (since _last_ping might have changed) and dump
        # threads only if it's still exceeded.
        remaining = self._last_ping + self.timeout - time.time()
        if remaining > 0:
          self._stop_cv.wait(remaining)
        new_deadline = self._last_ping + self.timeout

        # Do we still want to dump stacks frames?
        if not self._stop_flag and time.time() >= new_deadline:
          self._dump_threads(time.time() - self._last_ping)
          # Restart the countdown so the dump is not repeated right away.
          self._last_ping = time.time()

  def _dump_threads(self, timeout):
    """Dumps stack frames of all running threads.

    |timeout| is the time elapsed since the last ping, in seconds. It is only
    used in the logged message.
    """
    # Thread ident -> symbolic thread name, for more informative output.
    names = dict((t.ident, t.name) for t in threading.enumerate())
    own_id = threading.current_thread().ident

    # Collect tracebacks: thread name -> traceback string.
    tracebacks = {}

    # pylint: disable=W0212
    for thread_id, frame in sys._current_frames().items():
      # Don't dump the calling thread (normally the deadlock detector's own
      # thread), it's boring.
      if thread_id == own_id:
        continue
      label = names.get(thread_id, 'untitled') + ' #%d' % (thread_id,)
      tracebacks[label] = ''.join(traceback.format_stack(frame))

    # Print tracebacks, sorting them by thread name. That way a thread pool's
    # threads will be printed as one group.
    self._print('=============== Potential deadlock detected ===============')
    self._print('No pings in last %d sec.' % (timeout,))
    self._print('Dumping stack frames for all threads:')
    for label in sorted(tracebacks):
      self._print('Traceback for \'%s\':\n%s' % (label, tracebacks[label]))
    self._print('===========================================================')

  @staticmethod
  def _print(msg):
    """Writes message to log."""
    logging.warning('%s', msg.rstrip())
1208
1209
class Remote(object):
  """Priority based worker queue to fetch or upload files from a
  content-address server. Any function may be given as the fetcher/upload,
  as long as it takes two inputs (the item contents, and their relative
  destination).

  Supports local file system, CIFS or http remotes.

  When the priority of items is equals, works in strict FIFO mode.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # Priorities. The low 8 bits are left free to count the retries of an item.
  LOW = 1 << 8
  MED = 2 << 8
  HIGH = 3 << 8
  INTERNAL_PRIORITY_BITS = (1 << 8) - 1
  RETRIES = 5

  def __init__(self, destination_root):
    # Function to fetch a remote object or upload to a remote location..
    self._do_item = self.get_file_handler(destination_root)
    # Contains tuple(priority, obj).
    self._done = Queue.PriorityQueue()
    self._pool = ThreadPool(
        self.INITIAL_WORKERS, self.MAX_WORKERS, 0, 'remote')

  def join(self):
    """Blocks until the queue is empty."""
    return self._pool.join()

  def close(self):
    """Terminates all worker threads."""
    self._pool.close()

  def add_item(self, priority, obj, dest, size):
    """Retrieves an object from the remote data store.

    The smaller |priority| gets fetched first.

    Thread-safe.
    """
    # The low bits are reserved for the internal retry counter.
    assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
    return self._add_item(priority, obj, dest, size)

  def _add_item(self, priority, obj, dest, size):
    """Enqueues the transfer of |obj| to |dest| without checking that the
    retry bits of |priority| are clear."""
    assert isinstance(obj, basestring), obj
    assert isinstance(dest, basestring), dest
    assert size is None or isinstance(size, int), size
    return self._pool.add_task(
        priority, self._task_executer, priority, obj, dest, size)

  def get_one_result(self):
    """Returns the next result available from the worker pool."""
    return self._pool.get_one_result()

  def _task_executer(self, priority, obj, dest, size):
    """Wraps self._do_item to trap and retry on IOError exceptions.

    Returns |obj| on success and None when the item was enqueued again for
    another attempt. The IOError is re-raised once RETRIES is reached.
    """
    try:
      self._do_item(obj, dest)
      if size and not valid_file(dest, size):
        download_size = os.stat(dest).st_size
        os.remove(dest)
        raise IOError('File incorrect size after download of %s. Got %s and '
                      'expected %s' % (obj, download_size, size))
    except IOError as e:
      logging.debug('Caught IOError: %s', e)
      attempts = priority & self.INTERNAL_PRIORITY_BITS
      if attempts >= self.RETRIES:
        raise
      # Retry a few times, lowering the priority.
      self._add_item(priority + 1, obj, dest, size)
      return
    # TODO(maruel): Technically, we'd want to have an output queue to be a
    # PriorityQueue.
    return obj

  def get_file_handler(self, file_or_url):  # pylint: disable=R0201
    """Returns a object to retrieve objects from a remote.

    The result is a function taking (item, dest): a downloader when
    |file_or_url| is an http(s) url, a file copier otherwise.
    """
    def copy_file(item, dest):
      source = os.path.join(file_or_url, item)
      if source == dest:
        logging.info('Source and destination are the same, no action required')
        return
      logging.debug('copy_file(%s, %s)', source, dest)
      shutil.copy(source, dest)

    if not re.match(r'^https?://.+$', file_or_url):
      return copy_file

    def download_file(item, dest):
      # TODO(maruel): Reuse HTTP connections. The stdlib doesn't make this
      # easy.
      try:
        url = file_or_url + item
        logging.debug('download_file(%s)', url)

        # Because the app engine DB is only eventually consistent, retry
        # 404 errors because the file might just not be visible yet (even
        # though it has been uploaded).
        response = url_open(url, retry_404=True)
        if not response:
          raise IOError('Unable to open connection to %s' % url)
        inflater = zlib.decompressobj()
        # Number of compressed bytes read so far.
        received = 0
        with open(dest, 'wb') as out:
          chunk = response.read(ZIPPED_FILE_CHUNK)
          while chunk:
            received += len(chunk)
            out.write(inflater.decompress(chunk))
            chunk = response.read(ZIPPED_FILE_CHUNK)
        # Ensure that all the data was properly decompressed.
        leftover = inflater.flush()
        assert not leftover
      except IOError as e:
        logging.error(
            'Failed to download %s at %s.\n%s', item, dest, e)
        raise
      except httplib.HTTPException as e:
        # Converted to IOError, which is what the retry logic traps.
        msg = 'HTTPException while retrieving %s at %s.\n%s' % (
            item, dest, e)
        logging.error(msg)
        raise IOError(msg)
      except zlib.error as e:
        remaining_size = len(response.read())
        msg = 'Corrupted zlib for item %s. Processed %d of %d bytes.\n%s' % (
            item, received, received + remaining_size, e)
        logging.error(msg)

        # Testing seems to show that if a few machines are trying to download
        # the same blob, they can cause each other to fail. So if we hit a
        # zip error, this is the most likely cause (it only downloads some of
        # the data). Randomly sleep for between 5 and 25 seconds to try and
        # spread out the downloads.
        # TODO(csharp): Switch from blobstorage to cloud storage and see if
        # that solves the issue.
        time.sleep((random.random() * 20) + 5)

        raise IOError(msg)

    return download_file
1349
1350
class CachePolicies(object):
  """Plain holder for the limits applied when trimming the cache."""

  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
                      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
                      0, it unconditionally fill the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
                 enforce a limit.
    """
    self.max_items = max_items
    self.min_free_space = min_free_space
    self.max_cache_size = max_cache_size
1365
1366
class NoCache(object):
  """This class is intended to be usable everywhere the Cache class is.
  Instead of downloading to a cache, all files are downloaded to the target
  directory and then moved to where they are needed.
  """

  def __init__(self, target_directory, remote):
    self.target_directory = target_directory
    self.remote = remote

  def retrieve(self, priority, item, size):
    """Get the request file.

    Enqueues the download of |item| then blocks until the remote returns one
    result.
    """
    destination = self.path(item)
    self.remote.add_item(priority, item, destination, size)
    self.remote.get_one_result()

  def wait_for(self, items):
    """Download the first item of the given list if it is missing.

    Only the first key returned by |items| is considered. Returns that item.
    """
    item = next(items.iterkeys())
    destination = self.path(item)

    if not os.path.exists(destination):
      self.remote.add_item(Remote.MED, item, destination, UNKNOWN_FILE_SIZE)
      downloaded = self.remote.get_one_result()
      assert downloaded == item

    return item

  def path(self, item):
    """Returns the location of |item| inside the target directory."""
    return os.path.join(self.target_directory, item)
1395
1396
class Cache(object):
  """Stateful LRU cache.

  Saves its state as json file. The state is a list of (filename, size)
  entries ordered from the least recently used (index 0) to the most recently
  used (last index).
  """
  STATE_FILE = 'state.json'

  def __init__(self, cache_dir, remote, policies):
    """Loads the saved state, reconciles it with the directory, then trims.

    Arguments:
    - cache_dir: Directory where to place the cache.
    - remote: Remote where to fetch items from.
    - policies: cache retention policies.
    """
    self.cache_dir = cache_dir
    self.remote = remote
    self.policies = policies
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # The tuple(file, size) are kept as an array in a LRU style. E.g.
    # self.state[0] is the oldest item.
    self.state = []
    self._state_need_to_be_saved = False
    # A lookup map (filename -> index in self.state) to speed up searching.
    self._lookup = {}
    self._lookup_is_stale = True

    # Items currently being fetched. Keep it local to reduce lock contention.
    self._pending_queue = set()

    # Profiling values.
    self._added = []
    self._removed = []
    self._free_disk = 0

    with Profiler('Setup'):
      if not os.path.isdir(self.cache_dir):
        os.makedirs(self.cache_dir)
      if os.path.isfile(self.state_file):
        try:
          # Use a context manager so the handle is closed deterministically.
          with open(self.state_file, 'r') as f:
            self.state = json.load(f)
        except (IOError, ValueError) as e:
          # Too bad. The file will be overwritten and the cache cleared.
          logging.error(
              'Broken state file %s, ignoring.\n%s' % (self.STATE_FILE, e))
          self._state_need_to_be_saved = True
        if (not isinstance(self.state, list) or
            not all(
              isinstance(i, (list, tuple)) and len(i) == 2
              for i in self.state)):
          # Discard.
          self._state_need_to_be_saved = True
          self.state = []

      # Ensure that all files listed in the state still exist and add new ones.
      previous = set(filename for filename, _ in self.state)
      if len(previous) != len(self.state):
        logging.warning('Cache state is corrupted, found duplicate files')
        self._state_need_to_be_saved = True
        self.state = []
        # The state was thrown away, so nothing is "previously known" anymore.
        # Without this reset the files still on disk would be skipped below and
        # stay untracked (never trimmed) for the whole run.
        previous = set()

      added = 0
      for filename in os.listdir(self.cache_dir):
        if filename == self.STATE_FILE:
          continue
        if filename in previous:
          previous.remove(filename)
          continue
        # An untracked file.
        if not RE_IS_SHA1.match(filename):
          logging.warning('Removing unknown file %s from cache', filename)
          os.remove(self.path(filename))
          continue
        # Insert as the oldest file. It will be deleted eventually if not
        # accessed.
        self._add(filename, False)
        logging.warning('Add unknown file %s to cache', filename)
        added += 1

      if added:
        logging.warning('Added back %d unknown files', added)
      if previous:
        # Whatever is left in |previous| was listed in the state but is not on
        # disk anymore.
        logging.warning('Removed %d lost files', len(previous))
        # Set explicitly in case self._add() wasn't called.
        self._state_need_to_be_saved = True
        # Filter out entries that were not found while keeping the previous
        # order.
        self.state = [
          (filename, size) for filename, size in self.state
          if filename not in previous
        ]
      self.trim()

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Trims the cache one last time and logs the profiling counters."""
    with Profiler('CleanupTrimming'):
      self.trim()

    logging.info(
        '%5d (%8dkb) added', len(self._added), sum(self._added) / 1024)
    logging.info(
        '%5d (%8dkb) current',
        len(self.state),
        sum(i[1] for i in self.state) / 1024)
    logging.info(
        '%5d (%8dkb) removed', len(self._removed), sum(self._removed) / 1024)
    logging.info('       %8dkb free', self._free_disk / 1024)

  def remove_file_at_index(self, index):
    """Removes the file at the given index.

    The entry is dropped from the state even if deleting the file on disk
    fails; the failure is only logged.
    """
    try:
      self._state_need_to_be_saved = True
      filename, size = self.state.pop(index)
      # If the lookup was already stale, its possible the filename was not
      # present yet.
      self._lookup_is_stale = True
      self._lookup.pop(filename, None)
      self._removed.append(size)
      os.remove(self.path(filename))
    except OSError as e:
      logging.error('Error attempting to delete a file\n%s' % e)

  def remove_lru_file(self):
    """Removes the least recently used file."""
    self.remove_file_at_index(0)

  def trim(self):
    """Trims anything we don't know, make sure enough free space exists."""
    # Ensure maximum cache size.
    if self.policies.max_cache_size and self.state:
      # Keep a running total instead of re-summing the state on every removal.
      total = sum(i[1] for i in self.state)
      while self.state and total > self.policies.max_cache_size:
        total -= self.state[0][1]
        self.remove_lru_file()

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and self.state:
      while len(self.state) > self.policies.max_items:
        self.remove_lru_file()

    # Ensure enough free space.
    self._free_disk = get_free_space(self.cache_dir)
    trimmed_due_to_space = False
    while (
        self.policies.min_free_space and
        self.state and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space = True
      self.remove_lru_file()
      self._free_disk = get_free_space(self.cache_dir)
    if trimmed_due_to_space:
      total = sum(i[1] for i in self.state)
      max_size = self.policies.max_cache_size
      # Report how full the cache is relative to its limit. max_cache_size == 0
      # means "no limit", and the cache may have been emptied entirely above,
      # so neither value may be used as an unguarded divisor.
      usage = 100. * total / max_size if max_size else 0.
      logging.warning(
          'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
          'cache (%.1f%% of its maximum capacity)',
          self._free_disk / 1024.,
          total / 1024.,
          usage,
          )
    self.save()

  def retrieve(self, priority, item, size):
    """Retrieves a file from the remote, if not already cached, and adds it to
    the cache.

    If the file is in the cache, verify that the file is valid (i.e. it is
    the correct size), retrieving it again if it isn't.
    """
    assert not '/' in item
    path = self.path(item)
    self._update_lookup()
    index = self._lookup.get(item)

    if index is not None:
      if not valid_file(path, size):
        self.remove_file_at_index(index)
        index = None
      else:
        assert index < len(self.state)
        # Was already in cache. Update it's LRU value by putting it at the end.
        self._state_need_to_be_saved = True
        self._lookup_is_stale = True
        self.state.append(self.state.pop(index))

    if index is None:
      if item in self._pending_queue:
        # Already pending. The same object could be referenced multiple times.
        return
      # TODO(maruel): It should look at the free disk space, the current cache
      # size and the size of the new item on every new item:
      # - Trim the cache as more entries are listed when free disk space is low,
      #   otherwise if the amount of data downloaded during the run > free disk
      #   space, it'll crash.
      # - Make sure there's enough free disk space to fit all dependencies of
      #   this run! If not, abort early.
      self.remote.add_item(priority, item, path, size)
      self._pending_queue.add(item)

  def add(self, filepath, obj):
    """Forcibly adds a file to the cache."""
    self._update_lookup()
    if obj not in self._lookup:
      link_file(self.path(obj), filepath, HARDLINK)
      self._add(obj, True)

  def path(self, item):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, item)

  def save(self):
    """Saves the LRU ordering, only if it changed since the last save."""
    if self._state_need_to_be_saved:
      # The context manager guarantees the data is flushed and the handle
      # closed before the flag is cleared.
      with open(self.state_file, 'wb') as f:
        json.dump(self.state, f, separators=(',', ':'))
      self._state_need_to_be_saved = False

  def wait_for(self, items):
    """Starts a loop that waits for at least one of |items| to be retrieved.

    Returns the first item retrieved.
    """
    # Flush items already present.
    self._update_lookup()
    for item in items:
      if item in self._lookup:
        return item

    assert all(i in self._pending_queue for i in items), (
        items, self._pending_queue)
    # Note that:
    #   len(self._pending_queue) ==
    #   ( len(self.remote._workers) - self.remote._ready +
    #     len(self._remote._queue) + len(self._remote.done))
    # There is no lock-free way to verify that.
    while self._pending_queue:
      item = self.remote.get_one_result()
      self._pending_queue.remove(item)
      self._add(item, True)
      if item in items:
        return item

  def _add(self, item, at_end):
    """Adds an item in the internal state.

    If |at_end| is False, self._lookup becomes inconsistent and
    self._update_lookup() must be called.
    """
    size = os.stat(self.path(item)).st_size
    self._added.append(size)
    self._state_need_to_be_saved = True
    if at_end:
      self.state.append((item, size))
      self._lookup[item] = len(self.state) - 1
    else:
      # Inserting at the front shifts every index, so the map must be rebuilt.
      self._lookup_is_stale = True
      self.state.insert(0, (item, size))

  def _update_lookup(self):
    """Rebuilds the filename -> index map if it was marked stale."""
    if self._lookup_is_stale:
      self._lookup = dict(
          (filename, index) for index, (filename, _) in enumerate(self.state))
      self._lookup_is_stale = False
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001657
1658
class IsolatedFile(object):
  """One parsed .isolated file, a node in the tree of 'includes'."""

  def __init__(self, obj_hash):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)' % obj_hash)
    self.obj_hash = obj_hash

    # Raw data.
    self.data = {}
    # A IsolatedFile instance, one per object in self.includes.
    self.children = []

    # Set once all the left-side of the tree is parsed. 'Tree' here means the
    # .isolate and all the .isolated files recursively included by it with
    # 'includes' key. The order of each sha-1 in 'includes', each representing a
    # .isolated file in the hash table, is important, as the later ones are not
    # processed until the firsts are retrieved and read.
    self.can_fetch = False
    # Set once the .isolated file is loaded.
    self._is_parsed = False
    # Set once the files are fetched.
    self.files_fetched = False

  def load(self, content):
    """Validates the .isolated content and fills this object from its json
    data, creating one child node per entry in 'includes'.
    """
    logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content)
    included = []
    for child_hash in self.data.get('includes', []):
      included.append(IsolatedFile(child_hash))
    self.children = included
    self._is_parsed = True

  def fetch_files(self, cache, files):
    """Adds files in this .isolated file not present in |files| dictionary.

    Preemptively request files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if self.files_fetched or not self._is_parsed:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    entries = self.data.get('files', {})
    for filepath in entries:
      if filepath in files:
        # Root isolated has priority on the files being mapped. In particular,
        # overriden files must not be fetched.
        continue
      properties = entries[filepath]
      files[filepath] = properties
      if 'h' not in properties:
        continue
      # Preemptively request files.
      logging.debug('fetching %s' % filepath)
      cache.retrieve(Remote.MED, properties['h'], properties['s'])
    self.files_fetched = True
1713
1714
class Settings(object):
  """Results of a completely parsed .isolated file."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, cache, root_isolated_hash):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    Raises ConfigError if the same .isolated hash is requested more than once.
    """
    self.root = IsolatedFile(root_isolated_hash)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      cache.retrieve(Remote.HIGH, h, UNKNOWN_FILE_SIZE)

    retrieve(self.root)

    while pending:
      item_hash = cache.wait_for(pending)
      item = pending.pop(item_hash)
      # Close the handle right away; one is opened per .isolated in the tree.
      with open(cache.path(item_hash), 'r') as f:
        item.load(f.read())
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(cache, self.root)

    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    # Properties that no .isolated file defined fall back to their defaults.
    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False

  def _traverse_tree(self, cache, node):
    """Walks the tree from |node|, fetching files of every fetchable node.

    On each pass at most one additional child per node is marked fetchable,
    always the left-most one that is not fetchable yet; children to its right
    are left untouched.
    """
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(cache, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(cache, i)

  def _update_self(self, cache, node):
    """Requests |node|'s files and copies over any property not yet set.

    The first node processed that defines 'command', 'read_only' or
    'relative_cwd' wins; later definitions are ignored.
    """
    node.fetch_files(cache, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      self.command = node.data['command']
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1806
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001807
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files.

  Arguments:
  - base_directory: Existing directory under which the tree is created.
  - files: Iterable of file paths (iterating a dict yields its keys). The
           directory of each one, and every ancestor of it, is created.

  Directories that already exist are left alone, so the function can be run
  again on a destination that was already populated.
  """
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  # Sorted order guarantees a parent is listed before any of its children.
  for d in sorted(directories):
    if not d:
      continue
    path = os.path.join(base_directory, d)
    if not os.path.isdir(path):
      os.mkdir(path)
1820
1821
def create_links(base_directory, files):
  """Creates the symlinks described by |files| under |base_directory|.

  |files| is an iterable of (filepath, properties) pairs. Only entries with a
  'l' property (the link target) are handled; on Windows they are skipped with
  a warning.
  """
  on_windows = sys.platform == 'win32'
  for filepath, properties in files:
    if 'l' in properties:
      if on_windows:
        # TODO(maruel): Create junctions or empty text files similar to what
        # cygwin do?
        logging.warning('Ignoring symlink %s', filepath)
      else:
        outfile = os.path.join(base_directory, filepath)
        # symlink doesn't exist on Windows. So the 'link' property should
        # never be specified for windows .isolated file.
        os.symlink(properties['l'], outfile)  # pylint: disable=E1101
        if 'm' in properties:
          # lchmod is not available on every platform; skip it when missing.
          lchmod = getattr(os, 'lchmod', None)
          if lchmod:
            lchmod(outfile, properties['m'])
1840
1841
def setup_commands(base_directory, cwd, cmd):
  """Computes the working directory and command line used to run the test.

  |cwd| must be relative; it is resolved against |base_directory| and created
  if missing. |cmd| is modified in place: its first element gets the platform
  path separator. Returns a (working directory, command) tuple.
  """
  assert not os.path.isabs(cwd), 'The cwd must be a relative path, got %s' % cwd
  run_dir = os.path.join(base_directory, cwd)
  if not os.path.isdir(run_dir):
    os.makedirs(run_dir)

  # Ensure paths are correctly separated on windows.
  executable = cmd[0].replace('/', os.path.sep)
  cmd[0] = executable
  return run_dir, fix_python_path(cmd)
1856
1857
def generate_remaining_files(files):
  """Groups the files that still have to be downloaded by content hash.

  |files| is an iterable of (filepath, properties) pairs. Returns a dict
  mapping each 'h' value to the list of (filepath, properties) pairs sharing
  it; entries without a 'h' property are left out.
  """
  by_hash = {}
  for filepath, props in files:
    if 'h' not in props:
      continue
    digest = props['h']
    if digest not in by_hash:
      by_hash[digest] = []
    by_hash[digest].append((filepath, props))
  return by_hash
1866
1867
def download_test_data(isolated_hash, target_directory, remote):
  """Downloads the dependencies to the given directory.

  Nothing is run; the command to use is printed at the end. Returns 0 on
  success and 1 when the .isolated tree defines no command.
  """
  if not os.path.exists(target_directory):
    os.makedirs(target_directory)

  settings = Settings()
  store = NoCache(target_directory, Remote(remote))

  # Download all the isolated files.
  with Profiler('GetIsolateds'):
    settings.load(store, isolated_hash)

  if not settings.command:
    print >> sys.stderr, 'No command to run'
    return 1

  with Profiler('GetRest'):
    create_directories(target_directory, settings.files)
    create_links(target_directory, settings.files.iteritems())

    cwd, cmd = setup_commands(
        target_directory, settings.relative_cwd, settings.command[:])

    remaining = generate_remaining_files(settings.files.iteritems())

    # Now block on the remaining files to be downloaded and mapped.
    logging.info('Retrieving remaining files')
    last_update = time.time()
    while remaining:
      digest = store.wait_for(remaining)
      targets = remaining.pop(digest)
      source = store.path(digest)
      last_index = len(targets) - 1

      for index, (filepath, properties) in enumerate(targets):
        outfile = os.path.join(target_directory, filepath)
        logging.info(source)

        # The same content may map to several paths: copy it for all but the
        # last one, which takes over the downloaded file itself.
        if index == last_index:
          os.rename(source, outfile)
        else:
          shutil.copyfile(source, outfile)

        if 'm' in properties and sys.platform != 'win32':
          # It's not set on Windows. It could be set only in the case of
          # downloading content generated from another OS. Do not crash in that
          # case.
          os.chmod(outfile, properties['m'])

      if time.time() - last_update > DELAY_BETWEEN_UPDATES_IN_SECS:
        msg = '%d files remaining...' % len(remaining)
        print(msg)
        logging.info(msg)
        last_update = time.time()

  print('.isolated files successfully downloaded and setup in %s' %
        target_directory)
  print('To run this test please run the command %s from the directory %s' %
        (cmd, cwd))

  return 0
1927
1928
def run_tha_test(isolated_hash, cache_dir, remote, policies):
  """Downloads the dependencies in the cache, hardlinks them into a temporary
  directory and runs the executable.

  Arguments:
  - isolated_hash: sha-1 of the root .isolated file, or the path to a local
                   .isolated file, which is then added to the cache first.
  - cache_dir: Directory of the local cache; the temporary directory is
               created through make_temp_dir() with it as argument.
  - remote: Where to fetch the items from, passed to Remote().
  - policies: Cache retention policies.

  Returns the exit code of the command, or 1 if there is no command to run.
  Re-raises OSError if the command could not be started. The temporary
  directory is always deleted.
  """
  settings = Settings()
  with Cache(cache_dir, Remote(remote), policies) as cache:
    outdir = make_temp_dir('run_tha_test', cache_dir)
    try:
      # Initiate all the files download.
      with Profiler('GetIsolateds') as _prof:
        # Optionally support local files.
        if not RE_IS_SHA1.match(isolated_hash):
          # Adds it in the cache. While not strictly necessary, this simplifies
          # the rest.
          # Read through a context manager so the handle is not leaked.
          with open(isolated_hash, 'rb') as f:
            h = hashlib.sha1(f.read()).hexdigest()
          cache.add(isolated_hash, h)
          isolated_hash = h
        settings.load(cache, isolated_hash)

      if not settings.command:
        print >> sys.stderr, 'No command to run'
        return 1

      with Profiler('GetRest') as _prof:
        create_directories(outdir, settings.files)
        create_links(outdir, settings.files.iteritems())
        remaining = generate_remaining_files(settings.files.iteritems())

        # Do bookkeeping while files are being downloaded in the background.
        cwd, cmd = setup_commands(outdir, settings.relative_cwd,
                                  settings.command[:])

        # Now block on the remaining files to be downloaded and mapped.
        logging.info('Retrieving remaining files')
        last_update = time.time()
        with DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
          while remaining:
            detector.ping()
            obj = cache.wait_for(remaining)
            # One cached object may be mapped to several paths.
            for filepath, properties in remaining.pop(obj):
              outfile = os.path.join(outdir, filepath)
              link_file(outfile, cache.path(obj), HARDLINK)
              if 'm' in properties:
                # It's not set on Windows.
                os.chmod(outfile, properties['m'])

            if time.time() - last_update > DELAY_BETWEEN_UPDATES_IN_SECS:
              msg = '%d files remaining...' % len(remaining)
              print(msg)
              logging.info(msg)
              last_update = time.time()

      if settings.read_only:
        logging.info('Making files read only')
        make_writable(outdir, True)
      logging.info('Running %s, cwd=%s' % (cmd, cwd))

      # TODO(csharp): This should be specified somewhere else.
      # Add a rotating log file if one doesn't already exist.
      env = os.environ.copy()
      env.setdefault('RUN_TEST_CASES_LOG_FILE', RUN_TEST_CASES_LOG)
      try:
        with Profiler('RunTest') as _prof:
          return subprocess.call(cmd, cwd=cwd, env=env)
      except OSError:
        print >> sys.stderr, 'Failed to run %s; cwd=%s' % (cmd, cwd)
        raise
    finally:
      rmtree(outdir)
1998
1999
def main():
  """Parses the command line, configures logging, then downloads or runs.

  Exactly one of --isolated or --hash must be supplied and no positional
  arguments are accepted; otherwise parser.error() is called, which prints
  the usage message and exits the process.

  With --download, the files are fetched through download_test_data().
  Otherwise run_tha_test() is invoked with the cache policies built from the
  'Cache management' options.

  Returns:
    The value returned by download_test_data() or run_tha_test(), or 1 if
    run_tha_test() raised an exception (the exception is logged).
  """
  disable_buffering()
  parser = optparse.OptionParser(
      usage='%prog <options>', description=sys.modules[__name__].__doc__)
  parser.add_option(
      '-v', '--verbose', action='count', default=0, help='Use multiple times')

  group = optparse.OptionGroup(parser, 'Download')
  group.add_option(
      '--download', metavar='DEST',
      help='Downloads files to DEST and returns without running, instead of '
           'downloading and then running from a temporary directory.')
  parser.add_option_group(group)

  group = optparse.OptionGroup(parser, 'Data source')
  group.add_option(
      '-s', '--isolated',
      metavar='FILE',
      help='File/url describing what to map or run')
  group.add_option(
      '-H', '--hash',
      help='Hash of the .isolated to grab from the hash table')
  # Every option of the group is declared before the group is registered, so
  # the group definition reads as a single unit.
  group.add_option(
      '-r', '--remote', metavar='URL',
      default=
          'https://isolateserver.appspot.com/content/retrieve/default-gzip/',
      help='Remote where to get the items. Defaults to %default')
  parser.add_option_group(group)

  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache',
      default='cache',
      metavar='DIR',
      help='Cache directory, default=%default')
  group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=20*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(group)

  options, args = parser.parse_args()

  # Each -v moves one step down this list; extra -v flags beyond the last
  # entry are clamped to DEBUG.
  levels = [logging.WARNING, logging.INFO, logging.DEBUG]
  level = levels[min(len(levels) - 1, options.verbose)]

  root_logger = logging.getLogger()

  # Console output honors the verbosity requested on the command line.
  logging_console = logging.StreamHandler()
  logging_console.setFormatter(logging.Formatter(
      '%(levelname)5s %(module)15s(%(lineno)3d): %(message)s'))
  logging_console.setLevel(level)
  root_logger.addHandler(logging_console)

  # The rotating log file always records at DEBUG, whatever the console level.
  logging_rotating_file = logging.handlers.RotatingFileHandler(
      RUN_ISOLATED_LOG_FILE,
      maxBytes=10 * 1024 * 1024, backupCount=5)
  logging_rotating_file.setLevel(logging.DEBUG)
  logging_rotating_file.setFormatter(logging.Formatter(
      '%(asctime)s %(levelname)-8s %(module)15s(%(lineno)3d): %(message)s'))
  root_logger.addHandler(logging_rotating_file)

  # The root logger must let everything through; the handlers do the
  # per-destination filtering.
  root_logger.setLevel(logging.DEBUG)

  # Usage errors are also sent to the log before parser.error() exits.
  if bool(options.isolated) == bool(options.hash):
    logging.debug('One and only one of --isolated or --hash is required.')
    parser.error('One and only one of --isolated or --hash is required.')
  if args:
    logging.debug('Unsupported args %s', ' '.join(args))
    parser.error('Unsupported args %s' % ' '.join(args))

  options.cache = os.path.abspath(options.cache)
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)

  if options.download:
    return download_test_data(options.isolated or options.hash,
                              options.download, options.remote)
  else:
    try:
      return run_tha_test(
          options.isolated or options.hash,
          options.cache,
          options.remote,
          policies)
    except Exception as e:
      # Make sure any exception is logged.
      logging.exception(e)
      return 1
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00002102
2103
if __name__ == '__main__':
  # The default encoding has to be corrected before main() runs.
  fix_default_encoding()
  exit_code = main()
  sys.exit(exit_code)
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00002107 sys.exit(main())