blob: c7e69b33ca082dc80aed1f82fa1c8d0e945e42bd [file] [log] [blame]
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001#!/usr/bin/env python
2# Copyright (c) 2012 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
maruel@chromium.org0cd0b182012-10-22 13:34:15 +00006"""Reads a .isolated, creates a tree of hardlinks and runs the test.
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00007
8Keeps a local cache.
9"""
10
vadimsh@chromium.org87d63262013-04-04 19:34:21 +000011import cookielib
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000012import ctypes
vadimsh@chromium.org80f73002013-07-12 14:52:44 +000013import functools
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000014import hashlib
csharp@chromium.orga110d792013-01-07 16:16:16 +000015import httplib
maruel@chromium.orgedd25d02013-03-26 14:38:00 +000016import inspect
maruel@chromium.org2b2139a2013-04-30 20:14:58 +000017import itertools
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000018import json
csharp@chromium.orgbfb98742013-03-26 20:28:36 +000019import locale
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000020import logging
csharp@chromium.orgff2a4662012-11-21 20:49:32 +000021import logging.handlers
csharp@chromium.orgf13eec02013-03-11 18:22:56 +000022import math
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000023import optparse
24import os
25import Queue
csharp@chromium.orgf13eec02013-03-11 18:22:56 +000026import random
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000027import re
28import shutil
vadimsh@chromium.org80f73002013-07-12 14:52:44 +000029import socket
30import ssl
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000031import stat
32import subprocess
33import sys
34import tempfile
35import threading
36import time
maruel@chromium.org97cd0be2013-03-13 14:01:36 +000037import traceback
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000038import urllib
csharp@chromium.orga92403f2012-11-20 15:13:59 +000039import urllib2
csharp@chromium.orgf13eec02013-03-11 18:22:56 +000040import urlparse
csharp@chromium.orga92403f2012-11-20 15:13:59 +000041import zlib
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000042
vadimsh@chromium.org87d63262013-04-04 19:34:21 +000043# Try to import 'upload' module used by AppEngineService for authentication.
44# If it is not there, app engine authentication support will be disabled.
45try:
46 from third_party import upload
47 # Hack out upload logging.info()
48 upload.logging = logging.getLogger('upload')
49 # Mac pylint choke on this line.
50 upload.logging.setLevel(logging.WARNING) # pylint: disable=E1103
51except ImportError:
52 upload = None
53
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000054
# Types of action accepted by link_file().
HARDLINK, HARDLINK_WITH_FALLBACK, SYMLINK, COPY = range(1, 5)

# Matches a hex-encoded SHA-1 digest: exactly 40 hexadecimal characters.
RE_IS_SHA1 = re.compile(r'^[a-fA-F0-9]{40}$')

# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None

# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# The name of the log file to use.
RUN_ISOLATED_LOG_FILE = 'run_isolated.log'

# The base directory containing this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# The name of the log to use for the run_test_cases.py command
RUN_TEST_CASES_LOG = os.path.join(BASE_DIR, 'run_test_cases.log')

# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60

# The name of the key to store the count of url attempts.
COUNT_KEY = 'UrlOpenAttempt'

# Default maximum number of attempts at opening a url before aborting.
URL_OPEN_MAX_ATTEMPTS = 30
# Default timeout (in seconds) when retrying.
URL_OPEN_TIMEOUT = 6*60.

# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Global (for now) map: server URL (http://example.com) -> HttpService instance.
# Used by get_http_service to cache HttpService instances.
_http_services = {}
_http_services_lock = threading.Lock()

# Used by get_flavor(). Maps sys.platform values to flavor names; platforms not
# listed here are reported as 'linux' by get_flavor().
FLAVOR_MAPPING = {
  'cygwin': 'win',
  'win32': 'win',
  'darwin': 'mac',
  'sunos5': 'solaris',
  'freebsd7': 'freebsd',
  'freebsd8': 'freebsd',
}
112
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +0000113
class ConfigError(ValueError):
  """Raised when a .isolated file cannot be loaded or is malformed."""
117
118
class MappingError(OSError):
  """Raised when the file tree could not be recreated."""
122
123
class TimeoutError(IOError):
  """Signals that reading an HTTP response timed out.

  The exception that triggered the timeout, if any, is kept in |inner_exc|.
  """

  def __init__(self, inner_exc=None):
    # Fall back to a generic message when there is no (truthy) inner exception.
    message = str(inner_exc) if inner_exc else 'Timeout'
    super(TimeoutError, self).__init__(message)
    self.inner_exc = inner_exc
130
131
def get_flavor():
  """Returns the flavor name for the current platform.

  Looks sys.platform up in FLAVOR_MAPPING and defaults to 'linux' for any
  platform that is not listed. Copied from gyp/pylib/gyp/common.py.
  """
  try:
    return FLAVOR_MAPPING[sys.platform]
  except KeyError:
    return 'linux'
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +0000135
136
def fix_default_encoding():
  """Forces utf8 solidly on all platforms.

  By default python execution environment is lazy and defaults to ascii
  encoding.

  http://uucode.com/blog/2007/03/23/shut-up-you-dummy-7-bit-python/

  Returns:
    False if the default encoding was already 'utf-8' (nothing is changed),
    True once the default encoding and the locale categories were updated.
  """
  if sys.getdefaultencoding() == 'utf-8':
    return False

  # Regenerate setdefaultencoding.
  reload(sys)
  # Module 'sys' has no 'setdefaultencoding' member
  # pylint: disable=E1101
  sys.setdefaultencoding('utf-8')
  # Walk every LC_* attribute exposed by the locale module and try to switch
  # the corresponding category to a UTF-8 variant of the user's language.
  for attr in dir(locale):
    if attr[0:3] != 'LC_':
      continue
    aref = getattr(locale, attr)
    try:
      # Initialize the category from the environment; skip it if that fails.
      locale.setlocale(aref, '')
    except locale.Error:
      continue
    try:
      lang = locale.getlocale(aref)[0]
    except (TypeError, ValueError):
      # The current setting cannot be queried for this attribute; skip it.
      continue
    if lang:
      try:
        locale.setlocale(aref, (lang, 'UTF-8'))
      except locale.Error:
        # Could not set it directly; record the wish in the environment so the
        # final setlocale(LC_ALL, '') below can pick it up.
        os.environ[attr] = lang + '.UTF-8'
  try:
    # Re-read the (possibly updated) environment for all categories at once.
    locale.setlocale(locale.LC_ALL, '')
  except locale.Error:
    pass
  return True
175
176
class Unbuffered(object):
  """Wraps a file object so that every written line is flushed immediately.

  Any attribute other than write() is forwarded to the wrapped stream.
  """
  def __init__(self, stream):
    self.stream = stream

  def write(self, data):
    """Writes |data| and flushes the stream if |data| contains a newline."""
    self.stream.write(data)
    if '\n' not in data:
      return
    self.stream.flush()

  def __getattr__(self, attr):
    # Only called for attributes not found on the wrapper itself.
    return getattr(self.stream, attr)
189
190
def disable_buffering():
  """Makes stdout of this process and of its child processes unbuffered.

  Does nothing when the PYTHONUNBUFFERED environment variable is already set
  to a non-empty value.
  """
  if os.environ.get('PYTHONUNBUFFERED'):
    return
  # Since sys.stdout is a C++ object, it's impossible to do
  # sys.stdout.write = lambda...
  sys.stdout = Unbuffered(sys.stdout)
  # Exported through the environment so child python processes see it too.
  os.environ['PYTHONUNBUFFERED'] = 'x'
198
199
def os_link(source, link_name):
  """Creates a hardlink |link_name| to |source|; adds os.link() for Windows.

  On Windows the link is created with kernel32's CreateHardLinkW; everywhere
  else os.link() is used.

  Raises:
    OSError: the hardlink could not be created.
  """
  if sys.platform == 'win32':
    if not ctypes.windll.kernel32.CreateHardLinkW(
        unicode(link_name), unicode(source), 0):
      # Describe the failure: callers such as link_file() embed this exception
      # in their own error message, and an argument-less OSError() renders as
      # an empty string. The error code is best effort.
      raise OSError(
          'CreateHardLinkW(%r, %r) failed; GetLastError() = %d' %
          (link_name, source, ctypes.GetLastError()))
  else:
    os.link(source, link_name)
208
209
def readable_copy(outfile, infile):
  """Copies |infile| to |outfile| and makes the copy readable by everyone.

  The copy is done with shutil.copy2(); the read bits for user, group and
  others are then added to whatever mode the copy ended up with.
  """
  shutil.copy2(infile, outfile)
  read_for_all = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
  copied_mode = os.stat(outfile).st_mode
  os.chmod(outfile, copied_mode | read_for_all)
216
217
def link_file(outfile, infile, action):
  """Maps |infile| to |outfile|; |action| selects how the mapping is done.

  |action| is one of:
    - COPY: copy the file.
    - SYMLINK: create a symlink; on Windows this is treated like
      HARDLINK_WITH_FALLBACK.
    - HARDLINK: create a hardlink and fail if that is not possible.
    - HARDLINK_WITH_FALLBACK: create a hardlink, copy if that is not possible.

  Raises:
    ValueError: |action| is not a known action.
    MappingError: |infile| is missing, |outfile| already exists, or a strict
        HARDLINK could not be created.
  """
  logging.debug('Mapping %s to %s', infile, outfile)
  known_actions = (HARDLINK, HARDLINK_WITH_FALLBACK, SYMLINK, COPY)
  if action not in known_actions:
    raise ValueError('Unknown mapping action %s' % action)
  if not os.path.isfile(infile):
    raise MappingError('%s is missing' % infile)
  if os.path.isfile(outfile):
    in_size = os.stat(infile).st_size
    out_size = os.stat(outfile).st_size
    raise MappingError(
        '%s already exist; insize:%d; outsize:%d' %
        (outfile, in_size, out_size))

  if action == COPY:
    readable_copy(outfile, infile)
    return

  if action == SYMLINK and sys.platform != 'win32':
    # On windows, symlink are converted to hardlink and fails over to copy.
    os.symlink(infile, outfile)  # pylint: disable=E1101
    return

  try:
    os_link(infile, outfile)
  except OSError as e:
    if action == HARDLINK:
      raise MappingError(
          'Failed to hardlink %s to %s: %s' % (infile, outfile, e))
    # Probably a different file system.
    logging.warning(
        'Failed to hardlink, failing back to copy %s to %s',
        infile, outfile)
    readable_copy(outfile, infile)
247
248
249def _set_write_bit(path, read_only):
250 """Sets or resets the executable bit on a file or directory."""
251 mode = os.lstat(path).st_mode
252 if read_only:
253 mode = mode & 0500
254 else:
255 mode = mode | 0200
256 if hasattr(os, 'lchmod'):
257 os.lchmod(path, mode) # pylint: disable=E1101
258 else:
259 if stat.S_ISLNK(mode):
260 # Skip symlink without lchmod() support.
261 logging.debug('Can\'t change +w bit on symlink %s' % path)
262 return
263
264 # TODO(maruel): Implement proper DACL modification on Windows.
265 os.chmod(path, mode)
266
267
def make_writable(root, read_only):
  """Toggles the writable bit on everything below the directory |root|.

  |root| must be an absolute path. Within each directory visited, files are
  updated before sub-directories. |root| itself is not modified.
  """
  assert os.path.isabs(root), root
  for dirpath, dirnames, filenames in os.walk(root, topdown=True):
    for name in filenames + dirnames:
      _set_write_bit(os.path.join(dirpath, name), read_only)
277
278
def rmtree(root):
  """Deletes the tree |root|, retrying automatically on Windows.

  The tree is made writable first. On Windows shutil.rmtree() is attempted up
  to three times, sleeping 2, 4 then 6 seconds after each WindowsError.
  """
  make_writable(root, False)
  if sys.platform != 'win32':
    shutil.rmtree(root)
    return

  for attempt in range(3):
    try:
      shutil.rmtree(root)
    except WindowsError:  # pylint: disable=E0602
      delay = 2 * (attempt + 1)
      print >> sys.stderr, (
          'The test has subprocess outliving it. Sleep %d seconds.' % delay)
      time.sleep(delay)
    else:
      break
294
295
def is_same_filesystem(path1, path2):
  """Tells whether both absolute paths live on the same filesystem.

  This is required to enable the use of hardlinks. The decision is made by
  comparing the st_dev value of both paths; on Windows, paths on different
  drive letters are reported as different without calling os.stat().
  """
  assert os.path.isabs(path1), path1
  assert os.path.isabs(path2), path2
  if sys.platform == 'win32':
    # If the drive letter mismatches, assume it's a separate partition.
    # TODO(maruel): It should look at the underlying drive, a drive letter could
    # be a mount point to a directory on another drive.
    drive_path = r'^[a-zA-Z]\:\\.*'
    assert re.match(drive_path, path1), path1
    assert re.match(drive_path, path2), path2
    drive1 = path1[0].lower()
    drive2 = path2[0].lower()
    if drive1 != drive2:
      return False
  device1 = os.stat(path1).st_dev
  device2 = os.stat(path2).st_dev
  return device1 == device2
312
313
def get_free_space(path):
  """Returns the number of free bytes on the filesystem containing |path|."""
  if sys.platform != 'win32':
    # For OSes other than Windows.
    fs_stats = os.statvfs(path)  # pylint: disable=E1101
    return fs_stats.f_bfree * fs_stats.f_frsize

  free_bytes = ctypes.c_ulonglong(0)
  # The last argument receives the free byte count; the two other outputs are
  # not requested.
  ctypes.windll.kernel32.GetDiskFreeSpaceExW(
      ctypes.c_wchar_p(path), None, None, ctypes.pointer(free_bytes))
  return free_bytes.value
324
325
def make_temp_dir(prefix, root_dir):
  """Creates a temporary directory on the same file system as |root_dir|.

  The system temporary directory is used when it shares a filesystem with
  |root_dir|; otherwise the directory is created next to |root_dir|, i.e. in
  its parent directory. Returns the path of the new directory.
  """
  if is_same_filesystem(root_dir, tempfile.gettempdir()):
    parent_dir = None
  else:
    parent_dir = os.path.dirname(root_dir)
  return tempfile.mkdtemp(prefix=prefix, dir=parent_dir)
332
333
def _is_sha1(value):
  """Returns True if |value| is a string holding a hex encoded SHA-1."""
  # Check the type first: re.match() raises TypeError on non-strings, which
  # would escape load_isolated() instead of being reported as a ConfigError.
  return isinstance(value, basestring) and bool(RE_IS_SHA1.match(value))


def _validate_isolated_file_entry(properties):
  """Validates the properties dict of one item of the 'files' section."""
  for key, value in properties.iteritems():
    if key == 'l':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
    elif key in ('m', 's'):
      # json.loads() returns a long for values that do not fit in an int,
      # e.g. a size above 2GiB where the native int is 32 bits.
      if not isinstance(value, (int, long)):
        raise ConfigError('Expected int, got %r' % value)
    elif key == 'h':
      if not _is_sha1(value):
        raise ConfigError('Expected sha-1, got %r' % value)
    else:
      raise ConfigError('Unknown subsubkey %s' % key)
  if 'h' in properties and 'l' in properties:
    raise ConfigError(
        'Did not expect both \'h\' (sha-1) and \'l\' (link), got: %r' %
        properties)


def _validate_isolated_files(files):
  """Validates the 'files' section: a dict of path -> properties dict."""
  if not isinstance(files, dict):
    raise ConfigError('Expected dict, got %r' % files)
  for path, properties in files.iteritems():
    if not isinstance(path, basestring):
      raise ConfigError('Expected string, got %r' % path)
    if not isinstance(properties, dict):
      raise ConfigError('Expected dict, got %r' % properties)
    _validate_isolated_file_entry(properties)


def load_isolated(content, os_flavor=None):
  """Verifies the .isolated file is valid and loads this object with the json
  data.

  Arguments:
    content: JSON encoded content of the .isolated file.
    os_flavor: value the 'os' key must have, if present. Defaults to
        get_flavor().

  Returns:
    The decoded dict.

  Raises:
    ConfigError: the content is not valid JSON, is not a dict, contains an
        unknown key or a value of an unexpected type.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  for key, value in data.iteritems():
    if key == 'command':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      _validate_isolated_files(value)

    elif key == 'includes':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not _is_sha1(subvalue):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not isinstance(value, bool):
        raise ConfigError('Expected bool, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      expected_value = os_flavor or get_flavor()
      if value != expected_value:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (expected_value, value))

    else:
      raise ConfigError('Unknown key %s' % key)

  return data
412
413
def fix_python_path(cmd):
  """Returns a copy of |cmd| that runs the current python executable.

  A leading 'python' is replaced by sys.executable, and sys.executable is
  prepended when the first item is a .py script. |cmd| itself is not modified.
  """
  program = cmd[0]
  if program == 'python':
    return [sys.executable] + cmd[1:]
  if program.endswith('.py'):
    return [sys.executable] + cmd
  return cmd[:]
422
423
def url_open(url, **kwargs):
  """Attempts to open the given url multiple times.

  The url is split into its server part and its request part; the request is
  then sent through the HttpService associated with that server, which
  receives |kwargs| untouched.

  |data| can be either:
    -None for a GET request
    -str for pre-encoded data
    -list for data to be encoded
    -dict for data to be encoded (COUNT_KEY will be added in this case)

  Returns HttpResponse object, where the response may be read from, or None
  if it was unable to connect.
  """
  server, request_path = split_server_request_url(url)
  return get_http_service(server).request(request_path, **kwargs)
439
440
def url_read(url, **kwargs):
  """Attempts to open the given url multiple times and read all data from it.

  Accepts same arguments as url_open function.

  Returns all data read or None if it was unable to connect or read the data.
  """
  response = url_open(url, **kwargs)
  if response:
    try:
      return response.read()
    except TimeoutError:
      # Reading timed out; reported to the caller as "no data".
      pass
  return None
455
456
def split_server_request_url(url):
  """Splits the url into scheme+netloc and path+params+query+fragment.

  Returns a (urlhost, urlpath) tuple, e.g. ('http://example.com', '/a?b=1').
  """
  parsed = urlparse.urlparse(url)
  scheme, netloc = parsed[0], parsed[1]
  # Rebuild everything after the host with empty scheme and netloc.
  remainder = ['', ''] + list(parsed[2:])
  return '%s://%s' % (scheme, netloc), urlparse.urlunparse(remainder)
csharp@chromium.orgf13eec02013-03-11 18:22:56 +0000463
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000464
def get_http_service(urlhost):
  """Returns the HttpService instance that sends requests to |urlhost|.

  Instances are cached per server in the module level _http_services map, so
  an existing instance is reused and a new one is only created on first use.
  """
  # Ensure consistency: one cache entry per server, whatever the case or the
  # trailing slash used by the caller.
  cache_key = str(urlhost).lower().rstrip('/')
  with _http_services_lock:
    cached = _http_services.get(cache_key)
    if cached:
      return cached
    created = AppEngineService(cache_key)
    _http_services[cache_key] = created
    return created
477
478
479class HttpService(object):
480 """Base class for a class that provides an API to HTTP based service:
481 - Provides 'request' method.
482 - Supports automatic request retries.
483 - Supports persistent cookies.
484 - Thread safe.
485 """
486
487 # File to use to store all auth cookies.
maruel@chromium.orgbf2a02a2013-07-11 13:27:16 +0000488 COOKIE_FILE = os.path.join(os.path.expanduser('~'), '.isolated_cookies')
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000489
490 # CookieJar reused by all services + lock that protects its instantiation.
491 _cookie_jar = None
492 _cookie_jar_lock = threading.Lock()
493
maruel@chromium.org000bb4d2013-04-26 17:53:27 +0000494 def __init__(self, urlhost):
495 self.urlhost = urlhost
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000496 self.cookie_jar = self.load_cookie_jar()
497 self.opener = self.create_url_opener()
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000498
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000499 def authenticate(self): # pylint: disable=R0201
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000500 """Called when HTTP server asks client to authenticate.
501 Can be implemented in subclasses.
502 """
503 return False
504
505 @staticmethod
506 def load_cookie_jar():
507 """Returns global CoookieJar object that stores cookies in the file."""
508 with HttpService._cookie_jar_lock:
509 if HttpService._cookie_jar is not None:
510 return HttpService._cookie_jar
maruel@chromium.orgbf2a02a2013-07-11 13:27:16 +0000511 jar = ThreadSafeCookieJar(HttpService.COOKIE_FILE)
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000512 jar.load()
513 HttpService._cookie_jar = jar
514 return jar
515
516 @staticmethod
517 def save_cookie_jar():
518 """Called when cookie jar needs to be flushed to disk."""
519 with HttpService._cookie_jar_lock:
520 if HttpService._cookie_jar is not None:
521 HttpService._cookie_jar.save()
522
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000523 def create_url_opener(self): # pylint: disable=R0201
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000524 """Returns OpenerDirector that will be used when sending requests.
525 Can be reimplemented in subclasses."""
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000526 return urllib2.build_opener(urllib2.HTTPCookieProcessor(self.cookie_jar))
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000527
maruel@chromium.org000bb4d2013-04-26 17:53:27 +0000528 def request(self, urlpath, data=None, content_type=None, **kwargs):
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000529 """Attempts to open the given url multiple times.
530
maruel@chromium.org000bb4d2013-04-26 17:53:27 +0000531 |urlpath| is relative to the server root, i.e. '/some/request?param=1'.
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000532
533 |data| can be either:
534 -None for a GET request
535 -str for pre-encoded data
536 -list for data to be encoded
537 -dict for data to be encoded (COUNT_KEY will be added in this case)
538
539 Returns a file-like object, where the response may be read from, or None
540 if it was unable to connect.
541 """
maruel@chromium.org000bb4d2013-04-26 17:53:27 +0000542 assert urlpath and urlpath[0] == '/'
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000543
544 if isinstance(data, dict) and COUNT_KEY in data:
545 logging.error('%s already existed in the data passed into UlrOpen. It '
546 'would be overwritten. Aborting UrlOpen', COUNT_KEY)
547 return None
548
549 method = 'GET' if data is None else 'POST'
550 assert not ((method != 'POST') and content_type), (
551 'Can\'t use content_type on GET')
552
553 def make_request(extra):
554 """Returns a urllib2.Request instance for this specific retry."""
555 if isinstance(data, str) or data is None:
556 payload = data
557 else:
558 if isinstance(data, dict):
559 payload = data.items()
560 else:
561 payload = data[:]
562 payload.extend(extra.iteritems())
563 payload = urllib.urlencode(payload)
maruel@chromium.org000bb4d2013-04-26 17:53:27 +0000564 new_url = urlparse.urljoin(self.urlhost, urlpath[1:])
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000565 if isinstance(data, str) or data is None:
566 # In these cases, add the extra parameter to the query part of the url.
567 url_parts = list(urlparse.urlparse(new_url))
568 # Append the query parameter.
569 if url_parts[4] and extra:
570 url_parts[4] += '&'
571 url_parts[4] += urllib.urlencode(extra)
572 new_url = urlparse.urlunparse(url_parts)
573 request = urllib2.Request(new_url, data=payload)
574 if payload is not None:
575 if content_type:
576 request.add_header('Content-Type', content_type)
577 request.add_header('Content-Length', len(payload))
578 return request
579
maruel@chromium.org000bb4d2013-04-26 17:53:27 +0000580 return self._retry_loop(make_request, **kwargs)
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000581
maruel@chromium.org2b2139a2013-04-30 20:14:58 +0000582 def _retry_loop(
583 self,
584 make_request,
585 max_attempts=URL_OPEN_MAX_ATTEMPTS,
586 retry_404=False,
587 retry_50x=True,
vadimsh@chromium.org80f73002013-07-12 14:52:44 +0000588 timeout=URL_OPEN_TIMEOUT,
589 read_timeout=None):
maruel@chromium.org2b2139a2013-04-30 20:14:58 +0000590 """Runs internal request-retry loop.
591
592 - Optionally retries HTTP 404 and 50x.
593 - Retries up to |max_attempts| times. If None or 0, there's no limit in the
594 number of retries.
595 - Retries up to |timeout| duration in seconds. If None or 0, there's no
596 limit in the time taken to do retries.
597 - If both |max_attempts| and |timeout| are None or 0, this functions retries
598 indefinitely.
vadimsh@chromium.org80f73002013-07-12 14:52:44 +0000599
600 If |read_timeout| is not None will configure underlying socket to
601 raise TimeoutError exception whenever there's no response from the server
602 for more than |read_timeout| seconds. It can happen during any read
603 operation so once you pass non-None |read_timeout| be prepared to handle
604 these exceptions in subsequent reads from the stream.
maruel@chromium.org2b2139a2013-04-30 20:14:58 +0000605 """
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000606 authenticated = False
607 last_error = None
maruel@chromium.org2b2139a2013-04-30 20:14:58 +0000608 attempt = 0
609 start = self._now()
610 for attempt in itertools.count():
611 if max_attempts and attempt >= max_attempts:
612 # Too many attempts.
613 break
614 if timeout and (self._now() - start) >= timeout:
615 # Retried for too long.
616 break
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000617 extra = {COUNT_KEY: attempt} if attempt else {}
618 request = make_request(extra)
619 try:
vadimsh@chromium.org80f73002013-07-12 14:52:44 +0000620 url_response = self._url_open(request, timeout=read_timeout)
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000621 logging.debug('url_open(%s) succeeded', request.get_full_url())
vadimsh@chromium.org80f73002013-07-12 14:52:44 +0000622 return HttpResponse(url_response, request.get_full_url())
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000623 except urllib2.HTTPError as e:
624 # Unauthorized. Ask to authenticate and then try again.
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000625 if e.code in (401, 403):
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000626 # Try to authenticate only once. If it doesn't help, then server does
627 # not support app engine authentication.
vadimsh@chromium.orga1697342013-04-10 22:57:09 +0000628 logging.error(
vadimsh@chromium.orgdde2d732013-04-10 21:12:52 +0000629 'Authentication is required for %s on attempt %d.\n%s',
630 request.get_full_url(), attempt,
631 self._format_exception(e, verbose=True))
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000632 if not authenticated and self.authenticate():
633 authenticated = True
maruel@chromium.org2b2139a2013-04-30 20:14:58 +0000634 # Do not sleep.
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000635 continue
maruel@chromium.org2b2139a2013-04-30 20:14:58 +0000636 # If authentication failed, return.
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000637 logging.error(
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000638 'Unable to authenticate to %s.\n%s',
639 request.get_full_url(), self._format_exception(e, verbose=True))
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000640 return None
641
maruel@chromium.orgd58bf5b2013-04-26 17:57:42 +0000642 if ((e.code < 500 and not (retry_404 and e.code == 404)) or
643 (e.code >= 500 and not retry_50x)):
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000644 # This HTTPError means we reached the server and there was a problem
645 # with the request, so don't retry.
646 logging.error(
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000647 'Able to connect to %s but an exception was thrown.\n%s',
648 request.get_full_url(), self._format_exception(e, verbose=True))
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000649 return None
650
651 # The HTTPError was due to a server error, so retry the attempt.
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000652 logging.warning('Able to connect to %s on attempt %d.\n%s',
653 request.get_full_url(), attempt,
654 self._format_exception(e))
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000655 last_error = e
656
vadimsh@chromium.org80f73002013-07-12 14:52:44 +0000657 except (urllib2.URLError, httplib.HTTPException,
658 socket.timeout, ssl.SSLError) as e:
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000659 logging.warning('Unable to open url %s on attempt %d.\n%s',
660 request.get_full_url(), attempt,
661 self._format_exception(e))
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000662 last_error = e
663
664 # Only sleep if we are going to try again.
maruel@chromium.org2b2139a2013-04-30 20:14:58 +0000665 if max_attempts and attempt != max_attempts:
666 remaining = None
667 if timeout:
668 remaining = timeout - (self._now() - start)
669 if remaining <= 0:
670 break
671 self.sleep_before_retry(attempt, remaining)
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000672
673 logging.error('Unable to open given url, %s, after %d attempts.\n%s',
maruel@chromium.org2b2139a2013-04-30 20:14:58 +0000674 request.get_full_url(), max_attempts,
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000675 self._format_exception(last_error, verbose=True))
csharp@chromium.orgf13eec02013-03-11 18:22:56 +0000676 return None
677
vadimsh@chromium.org80f73002013-07-12 14:52:44 +0000678 def _url_open(self, request, timeout=None):
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000679 """Low level method to execute urllib2.Request's.
maruel@chromium.org2b2139a2013-04-30 20:14:58 +0000680
vadimsh@chromium.org87d63262013-04-04 19:34:21 +0000681 To be mocked in tests.
682 """
vadimsh@chromium.org80f73002013-07-12 14:52:44 +0000683 if timeout is not None:
684 return self.opener.open(request, timeout=timeout)
685 else:
686 # Leave original default value for |timeout|. It's nontrivial.
687 return self.opener.open(request)
maruel@chromium.orgef333122013-03-12 20:36:40 +0000688
maruel@chromium.org2b2139a2013-04-30 20:14:58 +0000689 @staticmethod
690 def _now():
691 """To be mocked in tests."""
692 return time.time()
693
694 @staticmethod
695 def calculate_sleep_before_retry(attempt, max_duration):
696 # Maximum sleeping time. We're hammering a cloud-distributed service, it'll
697 # survive.
698 MAX_SLEEP = 10.
699 # random.random() returns [0.0, 1.0). Starts with relatively short waiting
700 # time by starting with 1.5/2+1.5^-1 median offset.
701 duration = (random.random() * 1.5) + math.pow(1.5, (attempt - 1))
702 assert duration > 0.1
703 duration = min(MAX_SLEEP, duration)
704 if max_duration:
705 duration = min(max_duration, duration)
706 return duration
707
@classmethod
def sleep_before_retry(cls, attempt, max_duration):
  """Blocks the calling thread for a while before a request is retried.

  The duration comes from cls.calculate_sleep_before_retry(); both arguments
  are forwarded to it unchanged.

  To be mocked in tests.
  """
  delay = cls.calculate_sleep_before_retry(attempt, max_duration)
  time.sleep(delay)
maruel@chromium.orgef333122013-03-12 20:36:40 +0000715
vadimsh@chromium.org2edbe3f2013-04-05 19:44:54 +0000716 @staticmethod
717 def _format_exception(exc, verbose=False):
718 """Given an instance of some exception raised by urlopen returns human
719 readable piece of text with detailed information about the error.
720 """
721 out = ['Exception: %s' % (exc,)]
722 if verbose:
723 if isinstance(exc, urllib2.HTTPError):
724 out.append('-' * 10)
725 if exc.hdrs:
726 for header, value in exc.hdrs.items():
727 if not header.startswith('x-'):
728 out.append('%s: %s' % (header.capitalize(), value))
729 out.append('')
730 out.append(exc.read() or '<empty body>')
731 out.append('-' * 10)
732 return '\n'.join(out)
733
maruel@chromium.orgef333122013-03-12 20:36:40 +0000734
class HttpResponse(object):
  """Response from HttpService.

  Wraps the underlying response object, keeps count of the bytes read so far
  and re-raises socket.timeout and ssl.SSLError read failures as TimeoutError.
  """

  def __init__(self, url_response, url):
    # URL the response came from; only used in log messages.
    self._url = url
    # Underlying response object; must expose |headers| and read().
    self._url_response = url_response
    # Number of bytes returned by read() so far.
    self._read = 0

  @property
  def content_length(self):
    """Value of the Content-Length header as an int, or None if absent."""
    raw_length = self._url_response.headers.get('Content-Length')
    if raw_length is None:
      return None
    return int(raw_length)

  def read(self, size=None):
    """Reads up to |size| bytes from the stream and returns them.

    If |size| is None reads all available bytes.

    Raises TimeoutError on read timeout.
    """
    try:
      chunk = self._url_response.read(size)
    except (socket.timeout, ssl.SSLError) as e:
      logging.error('Timeout while reading from %s, read %d of %s: %s',
                    self._url, self._read, self.content_length, e)
      raise TimeoutError(e)
    self._read += len(chunk)
    return chunk
764
765
class AppEngineService(HttpService):
  """HttpService with authentication support for app engine based services."""

  # Serializes authentication attempts, so that the user won't be confused
  # with multiple concurrent login prompts.
  _auth_lock = threading.Lock()

  def __init__(self, urlhost, email=None, password=None):
    super(AppEngineService, self).__init__(urlhost)
    # Created lazily by get_credentials() when explicit credentials are not
    # sufficient.
    self._keyring = None
    self.email = email
    self.password = password

  def authenticate(self):
    """Authenticates in the app engine application.

    Requires the 'upload' module; when it is missing an error is logged and
    nothing else happens.

    Returns True on success.
    """
    if not upload:
      logging.error('\'upload\' module is missing, '
                    'app engine authentication is disabled.')
      return False

    # Captured by the helper class below, which has its own |self|.
    jar = self.cookie_jar
    persist_jar = self.save_cookie_jar

    # RPC server that uses AuthenticationSupport's cookie jar.
    class AuthServer(upload.AbstractRpcServer):
      def _GetOpener(self):
        # Authentication code needs to know about 302 response.
        # So make OpenerDirector without HTTPRedirectHandler.
        opener = urllib2.OpenerDirector()
        handlers = (
            urllib2.ProxyHandler(),
            urllib2.UnknownHandler(),
            urllib2.HTTPHandler(),
            urllib2.HTTPDefaultErrorHandler(),
            urllib2.HTTPSHandler(),
            urllib2.HTTPErrorProcessor(),
            urllib2.HTTPCookieProcessor(jar),
        )
        for handler in handlers:
          opener.add_handler(handler)
        return opener

      def PerformAuthentication(self):
        self._Authenticate()
        # Persist whatever cookies the authentication produced.
        persist_jar()
        return self.authenticated

    with AppEngineService._auth_lock:
      server = AuthServer(self.urlhost, self.get_credentials)
      return server.PerformAuthentication()

  def get_credentials(self):
    """Called during authentication process to get the credentials.

    May be called multiple times if authentication fails.

    Returns tuple (email, password).
    """
    # 'authenticate' calls this only if 'upload' is present.
    # Ensure other callers (if any) fail non-cryptically if 'upload' is missing.
    assert upload, '\'upload\' module is required for this to work'
    if self.email and self.password:
      # Explicit credentials take precedence over the keyring.
      return (self.email, self.password)
    keyring = self._keyring
    if not keyring:
      keyring = upload.KeyringCreds(self.urlhost, self.urlhost, self.email)
      self._keyring = keyring
    return keyring.GetUserCredentials()
828
829
class ThreadSafeCookieJar(cookielib.MozillaCookieJar):
  """MozillaCookieJar with thread safe load and save.

  Both operations run while holding the jar's inherited _cookies_lock. They
  are best effort: failures are logged and never propagated to the caller.
  A leading '~' in the file name is expanded by both of them.
  """

  # Permissions of the cookie file: accessible by its owner only.
  _FILE_MODE = 0o600

  def load(self, filename=None, ignore_discard=False, ignore_expires=False):
    """Loads cookies from the file if it exists.

    If the file does not exist yet, an empty one is created. In both cases
    the file mode is then reset to owner-only access.

    Arguments:
      filename: file to read; defaults to self.filename.
      ignore_discard: forwarded to MozillaCookieJar.load().
      ignore_expires: forwarded to MozillaCookieJar.load().
    """
    filename = os.path.expanduser(filename or self.filename)
    with self._cookies_lock:
      if os.path.exists(filename):
        try:
          cookielib.MozillaCookieJar.load(self, filename,
                                          ignore_discard,
                                          ignore_expires)
          logging.debug('Loaded cookies from %s', filename)
        except (cookielib.LoadError, IOError) as e:
          # Deliberately not fatal: an unreadable or malformed file simply
          # means that there are no cookies to start with. Leave a trace.
          logging.debug('Ignoring unreadable cookie file %s: %s', filename, e)
      else:
        try:
          fd = os.open(filename, os.O_CREAT, self._FILE_MODE)
          os.close(fd)
        except OSError:
          logging.error('Failed to create %s', filename)
      try:
        os.chmod(filename, self._FILE_MODE)
      except OSError:
        logging.error('Failed to fix mode for %s', filename)

  def save(self, filename=None, ignore_discard=False, ignore_expires=False):
    """Saves cookies to the file, completely overwriting it.

    Arguments:
      filename: file to write; defaults to self.filename.
      ignore_discard: forwarded to MozillaCookieJar.save().
      ignore_expires: forwarded to MozillaCookieJar.save().
    """
    # Resolve the path the same way load() does, so that both methods always
    # operate on the same file and the log names the file actually written.
    target = filename or self.filename
    if target:
      target = os.path.expanduser(target)
    logging.debug('Saving cookies to %s', target)
    with self._cookies_lock:
      try:
        cookielib.MozillaCookieJar.save(self, target,
                                        ignore_discard,
                                        ignore_expires)
      except (IOError, OSError):
        # Failing to open a file for writing raises IOError, which is not a
        # subclass of OSError on Python 2; both have to be listed.
        logging.error('Failed to save %s', target)
csharp@chromium.orgf13eec02013-03-11 18:22:56 +0000866
867
class ThreadPoolError(Exception):
  """Root of the exception hierarchy used by ThreadPool."""


class ThreadPoolEmpty(ThreadPoolError):
  """A task result was requested while the pool had no pending tasks."""


class ThreadPoolClosed(ThreadPoolError):
  """An operation was attempted on a thread pool that is already closed."""
878
879
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000880class ThreadPool(object):
881 """Implements a multithreaded worker pool oriented for mapping jobs with
882 thread-local result storage.
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000883
884 Arguments:
885 - initial_threads: Number of threads to start immediately. Can be 0 if it is
886 uncertain that threads will be needed.
887 - max_threads: Maximum number of threads that will be started when all the
888 threads are busy working. Often the number of CPU cores.
889 - queue_size: Maximum number of tasks to buffer in the queue. 0 for unlimited
890 queue. A non-zero value may make add_task() blocking.
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000891 - prefix: Prefix to use for thread names. Pool's threads will be
892 named '<prefix>-<thread index>'.
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000893 """
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000894 QUEUE_CLASS = Queue.PriorityQueue
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000895
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000896 def __init__(self, initial_threads, max_threads, queue_size, prefix=None):
897 prefix = prefix or 'tp-0x%0x' % id(self)
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000898 logging.debug(
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000899 'New ThreadPool(%d, %d, %d): %s', initial_threads, max_threads,
900 queue_size, prefix)
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000901 assert initial_threads <= max_threads
902 # Update this check once 256 cores CPU are common.
903 assert max_threads <= 256
904
maruel@chromium.orgeb281652012-11-08 21:10:23 +0000905 self.tasks = self.QUEUE_CLASS(queue_size)
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000906 self._max_threads = max_threads
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000907 self._prefix = prefix
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000908
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +0000909 # Used to assign indexes to tasks.
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000910 self._num_of_added_tasks_lock = threading.Lock()
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000911 self._num_of_added_tasks = 0
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +0000912
913 # Lock that protected everything below (including conditional variable).
914 self._lock = threading.Lock()
915
916 # Condition 'bool(_outputs) or bool(_exceptions) or _pending_count == 0'.
917 self._outputs_exceptions_cond = threading.Condition(self._lock)
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000918 self._outputs = []
919 self._exceptions = []
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000920
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +0000921 # Number of pending tasks (queued or being processed now).
922 self._pending_count = 0
923
924 # List of threads.
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000925 self._workers = []
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +0000926 # Number of threads that are waiting for new tasks.
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000927 self._ready = 0
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000928 # Number of threads already added to _workers, but not yet running the loop.
929 self._starting = 0
930 # True if close was called. Forbids adding new tasks.
931 self._is_closed = False
932
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000933 for _ in range(initial_threads):
934 self._add_worker()
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000935
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000936 def _add_worker(self):
937 """Adds one worker thread if there isn't too many. Thread-safe."""
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +0000938 with self._lock:
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000939 if len(self._workers) >= self._max_threads or self._is_closed:
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000940 return False
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000941 worker = threading.Thread(
942 name='%s-%d' % (self._prefix, len(self._workers)), target=self._run)
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000943 self._workers.append(worker)
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000944 self._starting += 1
945 logging.debug('Starting worker thread %s', worker.name)
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000946 worker.daemon = True
947 worker.start()
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000948 return True
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000949
maruel@chromium.org831958f2013-01-22 15:01:46 +0000950 def add_task(self, priority, func, *args, **kwargs):
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000951 """Adds a task, a function to be executed by a worker.
952
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000953 |priority| can adjust the priority of the task versus others. Lower priority
maruel@chromium.org831958f2013-01-22 15:01:46 +0000954 takes precedence.
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000955
maruel@chromium.orgedd25d02013-03-26 14:38:00 +0000956 |func| can either return a return value to be added to the output list or
957 be a generator which can emit multiple values.
958
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000959 Returns the index of the item added, e.g. the total number of enqueued items
960 up to now.
maruel@chromium.org8df128b2012-11-08 19:05:04 +0000961 """
maruel@chromium.org831958f2013-01-22 15:01:46 +0000962 assert isinstance(priority, int)
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000963 assert callable(func)
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +0000964 with self._lock:
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +0000965 if self._is_closed:
966 raise ThreadPoolClosed('Can not add a task to a closed ThreadPool')
967 start_new_worker = (
968 # Pending task count plus new task > number of available workers.
969 self.tasks.qsize() + 1 > self._ready + self._starting and
970 # Enough slots.
971 len(self._workers) < self._max_threads
972 )
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +0000973 self._pending_count += 1
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000974 with self._num_of_added_tasks_lock:
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000975 self._num_of_added_tasks += 1
976 index = self._num_of_added_tasks
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000977 self.tasks.put((priority, index, func, args, kwargs))
978 if start_new_worker:
979 self._add_worker()
maruel@chromium.org13eca0b2013-01-22 16:42:21 +0000980 return index
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000981
982 def _run(self):
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000983 """Worker thread loop. Runs until a None task is queued."""
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +0000984 # Thread has started, adjust counters.
985 with self._lock:
986 self._starting -= 1
987 self._ready += 1
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000988 while True:
maruel@chromium.org5a1446a2013-01-17 15:13:27 +0000989 try:
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000990 task = self.tasks.get()
991 finally:
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +0000992 with self._lock:
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +0000993 self._ready -= 1
994 try:
995 if task is None:
996 # We're done.
997 return
998 _priority, _index, func, args, kwargs = task
maruel@chromium.orgedd25d02013-03-26 14:38:00 +0000999 if inspect.isgeneratorfunction(func):
1000 for out in func(*args, **kwargs):
1001 self._output_append(out)
1002 else:
1003 out = func(*args, **kwargs)
1004 self._output_append(out)
maruel@chromium.org5a1446a2013-01-17 15:13:27 +00001005 except Exception as e:
maruel@chromium.org13eca0b2013-01-22 16:42:21 +00001006 logging.warning('Caught exception: %s', e)
maruel@chromium.org5a1446a2013-01-17 15:13:27 +00001007 exc_info = sys.exc_info()
maruel@chromium.org97cd0be2013-03-13 14:01:36 +00001008 logging.info(''.join(traceback.format_tb(exc_info[2])))
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +00001009 with self._outputs_exceptions_cond:
maruel@chromium.org5a1446a2013-01-17 15:13:27 +00001010 self._exceptions.append(exc_info)
maruel@chromium.org13eca0b2013-01-22 16:42:21 +00001011 self._outputs_exceptions_cond.notifyAll()
maruel@chromium.org5a1446a2013-01-17 15:13:27 +00001012 finally:
csharp@chromium.org60991182013-03-18 13:44:17 +00001013 try:
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +00001014 # Mark thread as ready again, mark task as processed. Do it before
1015 # waking up threads waiting on self.tasks.join(). Otherwise they might
1016 # find ThreadPool still 'busy' and perform unnecessary wait on CV.
1017 with self._outputs_exceptions_cond:
1018 self._ready += 1
1019 self._pending_count -= 1
1020 if self._pending_count == 0:
1021 self._outputs_exceptions_cond.notifyAll()
csharp@chromium.org60991182013-03-18 13:44:17 +00001022 self.tasks.task_done()
1023 except Exception as e:
1024 # We need to catch and log this error here because this is the root
1025 # function for the thread, nothing higher will catch the error.
1026 logging.exception('Caught exception while marking task as done: %s',
1027 e)
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001028
maruel@chromium.orgedd25d02013-03-26 14:38:00 +00001029 def _output_append(self, out):
1030 if out is not None:
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +00001031 with self._outputs_exceptions_cond:
maruel@chromium.orgedd25d02013-03-26 14:38:00 +00001032 self._outputs.append(out)
1033 self._outputs_exceptions_cond.notifyAll()
maruel@chromium.orgedd25d02013-03-26 14:38:00 +00001034
maruel@chromium.orgeb281652012-11-08 21:10:23 +00001035 def join(self):
maruel@chromium.org5a1446a2013-01-17 15:13:27 +00001036 """Extracts all the results from each threads unordered.
1037
1038 Call repeatedly to extract all the exceptions if desired.
maruel@chromium.org13eca0b2013-01-22 16:42:21 +00001039
1040 Note: will wait for all work items to be done before returning an exception.
1041 To get an exception early, use get_one_result().
maruel@chromium.org5a1446a2013-01-17 15:13:27 +00001042 """
1043 # TODO(maruel): Stop waiting as soon as an exception is caught.
maruel@chromium.orgeb281652012-11-08 21:10:23 +00001044 self.tasks.join()
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +00001045 with self._outputs_exceptions_cond:
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +00001046 if self._exceptions:
1047 e = self._exceptions.pop(0)
1048 raise e[0], e[1], e[2]
maruel@chromium.org6b0c9ec2013-01-18 00:34:31 +00001049 out = self._outputs
1050 self._outputs = []
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001051 return out
1052
maruel@chromium.org13eca0b2013-01-22 16:42:21 +00001053 def get_one_result(self):
1054 """Returns the next item that was generated or raises an exception if one
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001055 occurred.
maruel@chromium.org13eca0b2013-01-22 16:42:21 +00001056
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001057 Raises:
1058 ThreadPoolEmpty - no results available.
maruel@chromium.org13eca0b2013-01-22 16:42:21 +00001059 """
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001060 # Get first available result.
1061 for result in self.iter_results():
1062 return result
1063 # No results -> tasks queue is empty.
1064 raise ThreadPoolEmpty('Task queue is empty')
1065
1066 def iter_results(self):
1067 """Yields results as they appear until all tasks are processed."""
1068 while True:
1069 # Check for pending results.
1070 result = None
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +00001071 with self._outputs_exceptions_cond:
maruel@chromium.org13eca0b2013-01-22 16:42:21 +00001072 if self._exceptions:
1073 e = self._exceptions.pop(0)
1074 raise e[0], e[1], e[2]
1075 if self._outputs:
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001076 # Remember the result to yield it outside of the lock.
1077 result = self._outputs.pop(0)
1078 else:
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +00001079 # No pending tasks -> all tasks are done.
1080 if not self._pending_count:
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001081 return
1082 # Some task is queued, wait for its result to appear.
1083 # Use non-None timeout so that process reacts to Ctrl+C and other
1084 # signals, see http://bugs.python.org/issue8844.
1085 self._outputs_exceptions_cond.wait(timeout=5)
1086 continue
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001087 yield result
1088
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001089 def close(self):
1090 """Closes all the threads."""
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001091 # Ensure no new threads can be started, self._workers is effectively
1092 # a constant after that and can be accessed outside the lock.
vadimsh@chromium.org6e2eca62013-07-10 13:47:36 +00001093 with self._lock:
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001094 if self._is_closed:
1095 raise ThreadPoolClosed('Can not close already closed ThreadPool')
1096 self._is_closed = True
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001097 for _ in range(len(self._workers)):
1098 # Enqueueing None causes the worker to stop.
maruel@chromium.orgeb281652012-11-08 21:10:23 +00001099 self.tasks.put(None)
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001100 for t in self._workers:
1101 t.join()
vadimsh@chromium.org53f8d5a2013-06-19 13:03:55 +00001102 logging.debug(
1103 'Thread pool \'%s\' closed: spawned %d threads total',
1104 self._prefix, len(self._workers))
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001105
1106 def __enter__(self):
1107 """Enables 'with' statement."""
1108 return self
1109
maruel@chromium.org97cd0be2013-03-13 14:01:36 +00001110 def __exit__(self, _exc_type, _exc_value, _traceback):
maruel@chromium.org8df128b2012-11-08 19:05:04 +00001111 """Enables 'with' statement."""
1112 self.close()
1113
1114
def valid_file(filepath, size):
  """Tells whether the file at |filepath| looks valid.

  Currently only the size is checked: the file is valid when its size on disk
  equals |size|. When |size| is UNKNOWN_FILE_SIZE nothing can be checked, the
  file is not even accessed and is reported as valid.

  Returns True if the file appears valid, False otherwise (a warning is logged
  in that case).
  """
  if size != UNKNOWN_FILE_SIZE:
    size_on_disk = os.stat(filepath).st_size
    if size_on_disk != size:
      logging.warning(
          'Found invalid item %s; %d != %d',
          os.path.basename(filepath), size_on_disk, size)
      return False
  return True
csharp@chromium.org8dc52542012-11-08 20:29:55 +00001127
1128
class Profiler(object):
  """Context manager that logs how long the enclosed section took to run.

  Arguments:
    name - label of the section, used in the log message.
  """

  def __init__(self, name):
    # Set by __enter__; None until the section is entered.
    self.start_time = None
    self.name = name

  def __enter__(self):
    """Records the time at which the section starts."""
    self.start_time = time.time()
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Logs the time elapsed since __enter__."""
    elapsed = time.time() - self.start_time
    logging.info('Profiling: Section %s took %3.3f seconds',
                 self.name, elapsed)
1142
1143
class DeadlockDetector(object):
  """Context manager that can detect deadlocks.

  It will dump stack frames of all running threads if its 'ping' method isn't
  called in time.

  Usage:
    with DeadlockDetector(timeout=60) as detector:
      for item in some_work():
        ...
        detector.ping()
        ...

  Arguments:
    timeout - maximum allowed time between calls to 'ping', in seconds.
  """

  def __init__(self, timeout):
    self.timeout = timeout
    # Watcher thread; exists only between __enter__ and __exit__.
    self._thread = None
    # Thread stop condition. Also lock for shared variables below.
    self._stop_cv = threading.Condition()
    self._stop_flag = False
    # Time when 'ping' was called last time.
    self._last_ping = None
    # True if pings are coming on time.
    self._alive = True

  def __enter__(self):
    """Starts internal watcher thread."""
    assert self._thread is None
    # Entering the block counts as the first ping.
    self.ping()
    self._thread = threading.Thread(name='deadlock-detector', target=self._run)
    self._thread.daemon = True
    self._thread.start()
    return self

  def __exit__(self, *_args):
    """Stops internal watcher thread."""
    assert self._thread is not None
    with self._stop_cv:
      self._stop_flag = True
      self._stop_cv.notify()
    # Join outside of the lock: the watcher needs it to wake up and exit.
    self._thread.join()
    self._thread = None
    # Reset the flag so that the detector can be entered again.
    self._stop_flag = False

  def ping(self):
    """Notify detector that main thread is still running.

    Should be called periodically to inform the detector that everything is
    running as it should.
    """
    with self._stop_cv:
      self._last_ping = time.time()
      self._alive = True

  def _run(self):
    """Loop that watches for pings and dumps threads state if ping is late."""
    with self._stop_cv:
      while not self._stop_flag:
        # Skipped deadline? Dump threads and switch to 'not alive' state.
        if self._alive and time.time() > self._last_ping + self.timeout:
          self.dump_threads(time.time() - self._last_ping, True)
          self._alive = False

        # Pings are on time?
        if self._alive:
          # Wait until the moment we need to dump stack traces.
          # Most probably some other thread will call 'ping' to move deadline
          # further in time. We don't bother to wake up after each 'ping',
          # only right before initial expected deadline.
          self._stop_cv.wait(self._last_ping + self.timeout - time.time())
        else:
          # Skipped some pings previously. Just periodically silently check
          # for new pings with some arbitrary frequency.
          self._stop_cv.wait(self.timeout * 0.1)

  @staticmethod
  def dump_threads(timeout=None, skip_current_thread=False):
    """Dumps stack frames of all running threads.

    The dump is emitted through logging.warning().

    Arguments:
      timeout - if not None, time since the last ping; only mentioned in the
          report.
      skip_current_thread - if True, the calling thread is left out of the
          report.
    """
    all_threads = threading.enumerate()
    current_thread_id = threading.current_thread().ident

    # Collect tracebacks: thread name -> traceback string.
    tracebacks = {}

    # pylint: disable=W0212
    for thread_id, frame in sys._current_frames().items():
      # Don't dump deadlock detector's own thread, it's boring.
      if skip_current_thread and thread_id == current_thread_id:
        continue

      # Try to get more informative symbolic thread name.
      name = 'untitled'
      for thread in all_threads:
        if thread.ident == thread_id:
          name = thread.name
          break
      # The id keeps entries of threads that share a name apart.
      name += ' #%d' % (thread_id,)
      tracebacks[name] = ''.join(traceback.format_stack(frame))

    # Function to print a message. Makes it easier to change output destination.
    def output(msg):
      logging.warning(msg.rstrip())

    # Print tracebacks, sorting them by thread name. That way a thread pool's
    # threads will be printed as one group.
    output('=============== Potential deadlock detected ===============')
    if timeout is not None:
      output('No pings in last %d sec.' % (timeout,))
    output('Dumping stack frames for all threads:')
    for name in sorted(tracebacks):
      output('Traceback for \'%s\':\n%s' % (name, tracebacks[name]))
    output('===========================================================')
vadimsh@chromium.org5db0f4f2013-07-04 13:57:02 +00001259
1260
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001261class Remote(object):
maruel@chromium.orgfb155e92012-09-28 20:36:54 +00001262 """Priority based worker queue to fetch or upload files from a
1263 content-address server. Any function may be given as the fetcher/upload,
1264 as long as it takes two inputs (the item contents, and their relative
1265 destination).
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001266
1267 Supports local file system, CIFS or http remotes.
1268
1269 When the priority of items is equals, works in strict FIFO mode.
1270 """
1271 # Initial and maximum number of worker threads.
1272 INITIAL_WORKERS = 2
1273 MAX_WORKERS = 16
1274 # Priorities.
1275 LOW, MED, HIGH = (1<<8, 2<<8, 3<<8)
1276 INTERNAL_PRIORITY_BITS = (1<<8) - 1
1277 RETRIES = 5
1278
def __init__(self, destination_root):
  """Sets up the worker pool.

  Arguments:
    destination_root: local path or http(s) URL of the remote; handed to
        get_file_handler() to pick the function that processes items.
  """
  # Function to fetch a remote object or upload to a remote location.
  handler = self.get_file_handler(destination_root)
  self._do_item = handler
  # Contains tuple(priority, obj).
  self._done = Queue.PriorityQueue()
  # Unbounded task queue (queue size 0); threads are named 'remote-<index>'.
  self._pool = ThreadPool(
      self.INITIAL_WORKERS, self.MAX_WORKERS, 0, 'remote')
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001285
def join(self):
  """Blocks until the queue is empty.

  Returns whatever the underlying pool's join() returns.
  """
  results = self._pool.join()
  return results
maruel@chromium.orgfb155e92012-09-28 20:36:54 +00001289
def close(self):
  """Shuts down the worker threads by closing the underlying pool."""
  pool = self._pool
  pool.close()
1293
def add_item(self, priority, obj, dest, size):
  """Queues the retrieval of an object from the remote data store.

  The smaller |priority| gets fetched first. Its bits covered by
  INTERNAL_PRIORITY_BITS must all be zero.

  Thread-safe.

  Returns whatever _add_item() returns.
  """
  # The low bits are used internally to count retries (see _task_executer).
  assert not (priority & self.INTERNAL_PRIORITY_BITS)
  return self._add_item(priority, obj, dest, size)
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001303
maruel@chromium.org13eca0b2013-01-22 16:42:21 +00001304 def _add_item(self, priority, obj, dest, size):
1305 assert isinstance(obj, basestring), obj
1306 assert isinstance(dest, basestring), dest
1307 assert size is None or isinstance(size, int), size
1308 return self._pool.add_task(
1309 priority, self._task_executer, priority, obj, dest, size)
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001310
def get_one_result(self):
  """Returns the next available result of the underlying pool.

  Simply delegates to the pool's get_one_result().
  """
  result = self._pool.get_one_result()
  return result
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001313
def _task_executer(self, priority, obj, dest, size):
  """Runs self._do_item(), re-enqueuing the item when an IOError occurs.

  Returns |obj| on success. When the fetch fails and is retried, the item is
  queued again with |priority| + 1 and None is returned. Once the retry count
  stored in the INTERNAL_PRIORITY_BITS of |priority| reaches RETRIES, the
  IOError is propagated.
  """
  try:
    self._do_item(obj, dest)
    wrong_size = bool(size) and not valid_file(dest, size)
    if wrong_size:
      actual_size = os.stat(dest).st_size
      os.remove(dest)
      raise IOError('File incorrect size after download of %s. Got %s and '
                    'expected %s' % (obj, actual_size, size))
    # TODO(maruel): Technically, we'd want to have an output queue to be a
    # PriorityQueue.
    return obj
  except IOError as error:
    logging.debug('Caught IOError: %s', error)
    # Do not leave a partially written file behind.
    if os.path.exists(dest):
      os.remove(dest)
    # The low bits of the priority double as the retry counter.
    attempts = priority & self.INTERNAL_PRIORITY_BITS
    if attempts >= self.RETRIES:
      raise
    # Try again at a lower priority.
    self._add_item(priority + 1, obj, dest, size)
    return
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001336
def get_file_handler(self, file_or_url):  # pylint: disable=R0201
  """Returns a callable retrieving objects from |file_or_url|.

  http:// and https:// locations are served by self._download_file, anything
  else by self._copy_file. In both cases |file_or_url| is bound as the first
  argument, so the result is called with (item, dest).
  """
  is_url = re.match(r'^https?://.+$', file_or_url) is not None
  fetcher = self._download_file if is_url else self._copy_file
  return functools.partial(fetcher, file_or_url)
csharp@chromium.orge9c8d942013-03-11 20:48:36 +00001343
@staticmethod
def _download_file(base_url, item, dest):
  """Downloads the zlib-compressed |item| from |base_url| into |dest|.

  The payload is read in chunks of ZIPPED_FILE_CHUNK bytes and decompressed on
  the fly while being written to |dest|.

  Raises IOError when the connection cannot be opened, when an
  httplib.HTTPException occurs or when the data is not valid zlib data.
  """
  # TODO(maruel): Reuse HTTP connections. The stdlib doesn't make this
  # easy.
  try:
    url = base_url + item
    logging.debug('download_file(%s)', url)

    # Because the app engine DB is only eventually consistent, retry
    # 404 errors because the file might just not be visible yet (even
    # though it has been uploaded).
    connection = url_open(
        url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
    if not connection:
      raise IOError('Unable to open connection to %s' % url)

    expected_length = connection.content_length
    inflater = zlib.decompressobj()
    # Number of compressed bytes received so far.
    received = 0
    with open(dest, 'wb') as out:
      block = connection.read(ZIPPED_FILE_CHUNK)
      while block:
        received += len(block)
        out.write(inflater.decompress(block))
        block = connection.read(ZIPPED_FILE_CHUNK)
    # Ensure that all the data was properly decompressed.
    leftover = inflater.flush()
    assert not leftover
  except IOError as error:
    logging.error('Failed to download %s at %s.\n%s', item, dest, error)
    raise
  except httplib.HTTPException as error:
    message = 'HTTPException while retrieving %s at %s.\n%s' % (
        item, dest, error)
    logging.error(message)
    raise IOError(message)
  except zlib.error as error:
    message = 'Corrupted zlib for item %s. Processed %d of %s bytes.\n%s' % (
        item, received, expected_length, error)
    logging.error(message)

    # Testing seems to show that if a few machines are trying to download
    # the same blob, they can cause each other to fail. So if we hit a
    # zip error, this is the most likely cause (it only downloads some of
    # the data). Randomly sleep for between 5 and 25 seconds to try and
    # spread out the downloads.
    # TODO(csharp): Switch from blobstorage to cloud storage and see if
    # that solves the issue.
    time.sleep((random.random() * 20) + 5)

    raise IOError(message)
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001396
@staticmethod
def _copy_file(base_path, item, dest):
  """Copies |item| located in directory |base_path| to |dest|.

  Nothing is copied when the source path is |dest| itself.
  """
  source = os.path.join(base_path, item)
  if source != dest:
    logging.debug('copy_file(%s, %s)', source, dest)
    shutil.copy(source, dest)
    return
  logging.info('Source and destination are the same, no action required')
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001405
1406
class CachePolicies(object):
  """Plain holder for the limits that decide when the cache is trimmed."""

  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Largest size the cache may grow to before it is trimmed.
                      With 0 the cache is never trimmed on size, i.e. it is
                      effectively a leak.
    - min_free_space: Amount of free disk space below which the cache is
                      trimmed. With 0 the disk is filled unconditionally.
    - max_items: Largest number of items kept in the cache. With 0 no limit
                 is enforced.
    """
    self.max_items = max_items
    self.min_free_space = min_free_space
    self.max_cache_size = max_cache_size
1421
1422
class NoCache(object):
  """This class is intended to be usable everywhere the Cache class is.
  Instead of downloading to a cache, all files are downloaded to the target
  directory and then moved to where they are needed.
  """

  def __init__(self, target_directory, remote):
    """
    Arguments:
    - target_directory: Directory the items are downloaded into.
    - remote: Remote where to fetch items from.
    """
    self.target_directory = target_directory
    self.remote = remote

  def retrieve(self, priority, item, size):
    """Get the request file.

    The item is queued on the remote and one result is waited for right away.
    """
    self.remote.add_item(priority, item, self.path(item), size)
    self.remote.get_one_result()

  def wait_for(self, items):
    """Download the first item of the given list if it is missing.

    |items| can be any non-empty iterable of item names, including a dict
    keyed by item name, exactly like for Cache.wait_for().

    Returns the item.
    """
    # Iterating a dict yields its keys, so this handles dicts and sequences
    # alike instead of requiring dict.iterkeys().
    item = next(iter(items))

    if not os.path.exists(self.path(item)):
      self.remote.add_item(Remote.MED, item, self.path(item), UNKNOWN_FILE_SIZE)
      downloaded = self.remote.get_one_result()
      assert downloaded == item

    return item

  def path(self, item):
    """Returns the path |item| is downloaded to."""
    return os.path.join(self.target_directory, item)
1451
1452
class Cache(object):
  """Stateful LRU cache.

  Saves its state as json file.
  """
  STATE_FILE = 'state.json'

  def __init__(self, cache_dir, remote, policies):
    """
    Arguments:
    - cache_dir: Directory where to place the cache.
    - remote: Remote where to fetch items from.
    - policies: cache retention policies.
    """
    self.cache_dir = cache_dir
    self.remote = remote
    self.policies = policies
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # The tuple(file, size) are kept as an array in a LRU style. E.g.
    # self.state[0] is the oldest item.
    self.state = []
    self._state_need_to_be_saved = False
    # A lookup map to speed up searching: filename -> index in self.state.
    self._lookup = {}
    self._lookup_is_stale = True

    # Items currently being fetched. Keep it local to reduce lock contention.
    self._pending_queue = set()

    # Profiling values.
    self._added = []
    self._removed = []
    self._free_disk = 0

    with Profiler('Setup'):
      if not os.path.isdir(self.cache_dir):
        os.makedirs(self.cache_dir)
      if os.path.isfile(self.state_file):
        try:
          with open(self.state_file, 'r') as f:
            self.state = json.load(f)
        except (IOError, ValueError) as e:
          # Too bad. The file will be overwritten and the cache cleared.
          logging.error(
              'Broken state file %s, ignoring.\n%s', self.STATE_FILE, e)
          self._state_need_to_be_saved = True
        if (not isinstance(self.state, list) or
            not all(
              isinstance(i, (list, tuple)) and len(i) == 2
              for i in self.state)):
          # Discard.
          self._state_need_to_be_saved = True
          self.state = []

      # Ensure that all files listed in the state still exist and add new ones.
      previous = set(filename for filename, _ in self.state)
      if len(previous) != len(self.state):
        logging.warning('Cache state is corrupted, found duplicate files')
        self._state_need_to_be_saved = True
        self.state = []
        # The state was thrown away, so nothing is tracked anymore. Without
        # this the files on disk would be skipped below as 'already tracked'
        # and would not be accounted for during this run.
        previous = set()

      added = 0
      for filename in os.listdir(self.cache_dir):
        if filename == self.STATE_FILE:
          continue
        if filename in previous:
          previous.remove(filename)
          continue
        # An untracked file.
        if not RE_IS_SHA1.match(filename):
          logging.warning('Removing unknown file %s from cache', filename)
          os.remove(self.path(filename))
          continue
        # Insert as the oldest file. It will be deleted eventually if not
        # accessed.
        self._add(filename, False)
        logging.warning('Add unknown file %s to cache', filename)
        added += 1

      if added:
        logging.warning('Added back %d unknown files', added)
      if previous:
        # What is left in |previous| is listed in the state but not on disk.
        logging.warning('Removed %d lost files', len(previous))
        # Set explicitly in case self._add() wasn't called.
        self._state_need_to_be_saved = True
        # Filter out entries that were not found while keeping the previous
        # order.
        self.state = [
          (filename, size) for filename, size in self.state
          if filename not in previous
        ]
      self.trim()

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Trims the cache one last time and logs the profiling values."""
    with Profiler('CleanupTrimming'):
      self.trim()

    logging.info(
        '%5d (%8dkb) added', len(self._added), sum(self._added) / 1024)
    logging.info(
        '%5d (%8dkb) current',
        len(self.state),
        sum(i[1] for i in self.state) / 1024)
    logging.info(
        '%5d (%8dkb) removed', len(self._removed), sum(self._removed) / 1024)
    logging.info('       %8dkb free', self._free_disk / 1024)

  def remove_file_at_index(self, index):
    """Removes the file at the given index.

    The entry is dropped from the state even when deleting the file fails;
    the OSError is only logged.
    """
    try:
      self._state_need_to_be_saved = True
      filename, size = self.state.pop(index)
      # If the lookup was already stale, its possible the filename was not
      # present yet.
      self._lookup_is_stale = True
      self._lookup.pop(filename, None)
      self._removed.append(size)
      os.remove(self.path(filename))
    except OSError as e:
      logging.error('Error attempting to delete a file\n%s', e)

  def remove_lru_file(self):
    """Removes the last recently used file."""
    self.remove_file_at_index(0)

  def trim(self):
    """Trims anything we don't know, make sure enough free space exists."""
    # Ensure maximum cache size.
    if self.policies.max_cache_size and self.state:
      while sum(i[1] for i in self.state) > self.policies.max_cache_size:
        self.remove_lru_file()

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and self.state:
      while len(self.state) > self.policies.max_items:
        self.remove_lru_file()

    # Ensure enough free space.
    self._free_disk = get_free_space(self.cache_dir)
    trimmed_due_to_space = False
    while (
        self.policies.min_free_space and
        self.state and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space = True
      self.remove_lru_file()
      self._free_disk = get_free_space(self.cache_dir)
    if trimmed_due_to_space:
      total = sum(i[1] for i in self.state)
      max_size = self.policies.max_cache_size
      # |total| is 0 when everything got trimmed and |max_size| is 0 when the
      # size is not limited; dividing by either must not crash the trimming.
      usage = 100. * total / max_size if max_size else 0.
      logging.warning(
          'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
          'cache (%.1f%% of its maximum capacity)',
          self._free_disk / 1024.,
          total / 1024.,
          usage,
          )
    self.save()

  def retrieve(self, priority, item, size):
    """Retrieves a file from the remote, if not already cached, and adds it to
    the cache.

    If the file is in the cache, verify that the file is valid (i.e. it is
    the correct size), retrieving it again if it isn't.
    """
    assert '/' not in item
    path = self.path(item)
    self._update_lookup()
    index = self._lookup.get(item)

    if index is not None:
      if not valid_file(self.path(item), size):
        self.remove_file_at_index(index)
        index = None
      else:
        assert index < len(self.state)
        # Was already in cache. Update it's LRU value by putting it at the end.
        self._state_need_to_be_saved = True
        self._lookup_is_stale = True
        self.state.append(self.state.pop(index))

    if index is None:
      if item in self._pending_queue:
        # Already pending. The same object could be referenced multiple times.
        return
      # TODO(maruel): It should look at the free disk space, the current cache
      # size and the size of the new item on every new item:
      # - Trim the cache as more entries are listed when free disk space is low,
      #   otherwise if the amount of data downloaded during the run > free disk
      #   space, it'll crash.
      # - Make sure there's enough free disk space to fit all dependencies of
      #   this run! If not, abort early.
      self.remote.add_item(priority, item, path, size)
      self._pending_queue.add(item)

  def add(self, filepath, obj):
    """Forcibly adds a file to the cache."""
    self._update_lookup()
    if obj not in self._lookup:
      link_file(self.path(obj), filepath, HARDLINK_WITH_FALLBACK)
      self._add(obj, True)

  def path(self, item):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, item)

  def save(self):
    """Saves the LRU ordering."""
    if self._state_need_to_be_saved:
      # Close the file explicitly so the state is flushed to disk right away.
      with open(self.state_file, 'wb') as f:
        json.dump(self.state, f, separators=(',',':'))
      self._state_need_to_be_saved = False

  def wait_for(self, items):
    """Starts a loop that waits for at least one of |items| to be retrieved.

    Returns the first item retrieved.
    """
    # Flush items already present.
    self._update_lookup()
    for item in items:
      if item in self._lookup:
        return item

    assert all(i in self._pending_queue for i in items), (
        items, self._pending_queue)
    # Note that:
    #   len(self._pending_queue) ==
    #   ( len(self.remote._workers) - self.remote._ready +
    #     len(self._remote._queue) + len(self._remote.done))
    # There is no lock-free way to verify that.
    while self._pending_queue:
      item = self.remote.get_one_result()
      self._pending_queue.remove(item)
      self._add(item, True)
      if item in items:
        return item

  def _add(self, item, at_end):
    """Adds an item in the internal state.

    The size is read from the file on disk. With |at_end| the item becomes the
    most recently used one, otherwise it becomes the oldest one.

    If |at_end| is False, self._lookup becomes inconsistent and
    self._update_lookup() must be called.
    """
    size = os.stat(self.path(item)).st_size
    self._added.append(size)
    self._state_need_to_be_saved = True
    if at_end:
      self.state.append((item, size))
      self._lookup[item] = len(self.state) - 1
    else:
      self._lookup_is_stale = True
      self.state.insert(0, (item, size))

  def _update_lookup(self):
    """Rebuilds the filename -> index map if it was marked as stale."""
    if self._lookup_is_stale:
      self._lookup = dict(
          (filename, index) for index, (filename, _) in enumerate(self.state))
      self._lookup_is_stale = False
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001713
1714
class IsolatedFile(object):
  """One parsed .isolated file, identified by its hash."""

  def __init__(self, obj_hash):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)' % obj_hash)
    self.obj_hash = obj_hash

    # Raw data.
    self.data = {}
    # One IsolatedFile instance per entry of the 'includes' key.
    self.children = []

    # Becomes True once everything on the left-side of the tree is parsed.
    # 'Tree' here means the .isolate and all the .isolated files recursively
    # included by it with 'includes' key. The order of each sha-1 in
    # 'includes', each representing a .isolated file in the hash table, is
    # important, as the later ones are not processed until the firsts are
    # retrieved and read.
    self.can_fetch = False
    # Becomes True once the .isolated file is loaded.
    self._is_parsed = False
    # Becomes True once the files are fetched.
    self.files_fetched = False

  def load(self, content):
    """Validates |content| as a .isolated file and stores its json data.

    Also creates one child IsolatedFile per hash listed in 'includes'. Can
    only be called once per instance.
    """
    logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content)
    included_hashes = self.data.get('includes', [])
    self.children = [IsolatedFile(h) for h in included_hashes]
    self._is_parsed = True

  def fetch_files(self, cache, files):
    """Registers in |files| the files of this .isolated not already there.

    Every newly registered file that has a hash is preemptively requested from
    |cache|. Does nothing if this file is not loaded yet or if its files were
    already fetched.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if self.files_fetched or not self._is_parsed:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overriden files must not be fetched.
      if filepath in files:
        continue
      files[filepath] = properties
      if 'h' in properties:
        # Preemptively request files.
        logging.debug('fetching %s' % filepath)
        cache.retrieve(Remote.MED, properties['h'], properties['s'])
    self.files_fetched = True
1769
1770
class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, cache, root_isolated_hash):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    Raises ConfigError if a .isolated file is included more than once.
    """
    self.root = IsolatedFile(root_isolated_hash)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      cache.retrieve(Remote.HIGH, h, UNKNOWN_FILE_SIZE)

    retrieve(self.root)

    while pending:
      item_hash = cache.wait_for(pending)
      item = pending.pop(item_hash)
      # Close the handle right away instead of leaking one per .isolated file.
      with open(cache.path(item_hash), 'r') as f:
        content = f.read()
      item.load(content)
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(cache, self.root)

    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False

  def _traverse_tree(self, cache, node):
    """Fetches the files of every node of the tree that can be fetched.

    For each fetchable node, at most one more child, the leftmost one not yet
    fetchable, is marked as fetchable; children after it are not visited.
    """
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(cache, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(cache, i)

  def _update_self(self, cache, node):
    """Fetches the files of |node| and takes over the properties not set yet.

    The first node providing 'command', 'read_only' or 'relative_cwd' wins.
    """
    node.fetch_files(cache, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      self.command = node.data['command']
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1862
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001863
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files.

  Arguments:
  - base_directory: existing directory under which the tree is created.
  - files: sized iterable of file paths relative to |base_directory|, e.g. a
           dict keyed by file path.

  Directories that already exist are left as is, so the function can safely
  be called on a directory that was already populated.
  """
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  # Sorting guarantees a parent is created before its children.
  for d in sorted(directories):
    if not d:
      continue
    path = os.path.join(base_directory, d)
    # os.mkdir() raises on an existing directory; skip those.
    if not os.path.isdir(path):
      os.mkdir(path)
1876
1877
def create_links(base_directory, files):
  """Creates the symlinks described by |files| under |base_directory|.

  |files| yields (filepath, properties) pairs. Only entries with a 'l'
  property, the link target, are processed. The mode in 'm', when present, is
  applied to the link itself where os.lchmod exists.
  """
  on_windows = sys.platform == 'win32'
  for relpath, props in files:
    if 'l' in props:
      if on_windows:
        # TODO(maruel): Create junctions or empty text files similar to what
        # cygwin do?
        logging.warning('Ignoring symlink %s', relpath)
      else:
        link_path = os.path.join(base_directory, relpath)
        # symlink doesn't exist on Windows. So the 'link' property should
        # never be specified for windows .isolated file.
        os.symlink(props['l'], link_path)  # pylint: disable=E1101
        change_mode = getattr(os, 'lchmod', None) if 'm' in props else None
        if change_mode:
          change_mode(link_path, props['m'])
1896
1897
def setup_commands(base_directory, cwd, cmd):
  """Computes the working directory and the command needed to run the test.

  |cwd| is relative to |base_directory| and gets created when missing. Note
  that the first element of |cmd| is modified in place.

  Returns a tuple (absolute working directory, adjusted command).
  """
  assert not os.path.isabs(cwd), 'The cwd must be a relative path, got %s' % cwd
  work_dir = os.path.join(base_directory, cwd)
  if not os.path.isdir(work_dir):
    os.makedirs(work_dir)

  # Ensure paths are correctly separated on windows.
  executable = cmd[0]
  cmd[0] = executable.replace('/', os.path.sep)
  return work_dir, fix_python_path(cmd)
1912
1913
def generate_remaining_files(files):
  """Groups the files to be downloaded by content hash.

  |files| yields (filepath, properties) pairs; entries without a 'h' property
  are ignored.

  Returns a dict mapping each hash to the list of (filepath, properties)
  entries using it, in the order they were encountered.
  """
  by_hash = {}
  for entry in files:
    _filepath, props = entry
    if 'h' not in props:
      continue
    digest = props['h']
    if digest in by_hash:
      by_hash[digest].append((entry[0], props))
    else:
      by_hash[digest] = [(entry[0], props)]
  return by_hash
1922
1923
def download_test_data(isolated_hash, target_directory, remote):
  """Downloads the test described by |isolated_hash| into |target_directory|.

  No cache is involved: every file is downloaded into |target_directory| and
  then moved, or copied when used more than once, to its final location.

  Returns 0 on success and 1 if the .isolated file has no command.
  """
  if not os.path.exists(target_directory):
    os.makedirs(target_directory)

  settings = Settings()
  no_cache = NoCache(target_directory, Remote(remote))

  # Download all the isolated files.
  with Profiler('GetIsolateds'):
    settings.load(no_cache, isolated_hash)

  if not settings.command:
    sys.stderr.write('No command to run\n')
    return 1

  with Profiler('GetRest'):
    create_directories(target_directory, settings.files)
    create_links(target_directory, settings.files.iteritems())

    cwd, cmd = setup_commands(
        target_directory, settings.relative_cwd, settings.command[:])

    remaining = generate_remaining_files(settings.files.iteritems())

    # Now block on the remaining files to be downloaded and mapped.
    logging.info('Retrieving remaining files')
    last_update = time.time()
    while remaining:
      obj = no_cache.wait_for(remaining)
      users = remaining.pop(obj)
      downloaded = no_cache.path(obj)
      last_index = len(users) - 1

      for index, (filepath, properties) in enumerate(users):
        outfile = os.path.join(target_directory, filepath)
        logging.info(downloaded)

        # Copy for every user of the object but the last one, which takes
        # over the downloaded file itself.
        if index == last_index:
          os.rename(downloaded, outfile)
        else:
          shutil.copyfile(downloaded, outfile)

        if 'm' in properties and sys.platform != 'win32':
          # It's not set on Windows. It could be set only in the case of
          # downloading content generated from another OS. Do not crash in that
          # case.
          os.chmod(outfile, properties['m'])

      if time.time() - last_update > DELAY_BETWEEN_UPDATES_IN_SECS:
        msg = '%d files remaining...' % len(remaining)
        print(msg)
        logging.info(msg)
        last_update = time.time()

  print('.isolated files successfully downloaded and setup in %s' %
        target_directory)
  print('To run this test please run the command %s from the directory %s' %
        (cmd, cwd))

  return 0
1983
1984
def run_tha_test(isolated_hash, cache_dir, remote, policies):
  """Maps the isolated tree into a temporary directory and runs its command.

  The dependencies are fetched through a Cache, linked into a temporary
  directory created with make_temp_dir(), and the command described by the
  .isolated file is then executed from there. The temporary directory is
  always removed before returning.

  Args:
    isolated_hash: either a value matching RE_IS_SHA1, or the path of a local
        .isolated file. In the latter case the file is hashed with SHA-1 and
        added to the cache first.
    cache_dir: directory given to Cache and to make_temp_dir().
    remote: value wrapped in Remote() and handed to the Cache.
    policies: cache policies object handed to the Cache.

  Returns:
    1 if the loaded settings contain no command, otherwise the return code of
    the executed command.

  Raises:
    OSError: if the command could not be started; the failure is reported on
        stderr before being re-raised.
  """
  settings = Settings()
  with Cache(cache_dir, Remote(remote), policies) as cache:
    outdir = make_temp_dir('run_tha_test', cache_dir)
    try:
      # Initiate all the files download.
      with Profiler('GetIsolateds') as _prof:
        # Optionally support local files.
        if not RE_IS_SHA1.match(isolated_hash):
          # Adds it in the cache. While not strictly necessary, this simplifies
          # the rest.
          # Use a context manager so the file handle is closed
          # deterministically instead of whenever it gets garbage collected.
          with open(isolated_hash, 'rb') as isolated_file:
            h = hashlib.sha1(isolated_file.read()).hexdigest()
          cache.add(isolated_hash, h)
          isolated_hash = h
        settings.load(cache, isolated_hash)

      if not settings.command:
        print >> sys.stderr, 'No command to run'
        return 1

      with Profiler('GetRest') as _prof:
        create_directories(outdir, settings.files)
        create_links(outdir, settings.files.iteritems())
        remaining = generate_remaining_files(settings.files.iteritems())

        # Do bookkeeping while files are being downloaded in the background.
        cwd, cmd = setup_commands(outdir, settings.relative_cwd,
                                  settings.command[:])

        # Now block on the remaining files to be downloaded and mapped.
        logging.info('Retrieving remaining files')
        last_update = time.time()
        with DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
          while remaining:
            detector.ping()
            obj = cache.wait_for(remaining)
            for filepath, properties in remaining.pop(obj):
              outfile = os.path.join(outdir, filepath)
              link_file(outfile, cache.path(obj), HARDLINK)
              if 'm' in properties:
                # It's not set on Windows.
                # NOTE(review): the download-only code path earlier in this
                # file additionally skips this chmod when sys.platform is
                # 'win32'; confirm whether the same guard is wanted here.
                os.chmod(outfile, properties['m'])

            # Periodically report progress, both on stdout and in the logs.
            if time.time() - last_update > DELAY_BETWEEN_UPDATES_IN_SECS:
              msg = '%d files remaining...' % len(remaining)
              print(msg)
              logging.info(msg)
              last_update = time.time()

      if settings.read_only:
        logging.info('Making files read only')
        make_writable(outdir, True)
      logging.info('Running %s, cwd=%s', cmd, cwd)

      # TODO(csharp): This should be specified somewhere else.
      # Add a rotating log file if one doesn't already exist.
      env = os.environ.copy()
      env.setdefault('RUN_TEST_CASES_LOG_FILE', RUN_TEST_CASES_LOG)
      try:
        with Profiler('RunTest') as _prof:
          return subprocess.call(cmd, cwd=cwd, env=env)
      except OSError:
        print >> sys.stderr, 'Failed to run %s; cwd=%s' % (cmd, cwd)
        raise
    finally:
      # Runs on every exit path, including the early 'return 1' above.
      rmtree(outdir)
2054
2055
def main():
  """Parses the command line, configures logging and runs the request.

  Exactly one of --isolated or --hash must be given and no positional
  argument is accepted; otherwise parser.error() is called. With --download
  the files are only downloaded via download_test_data(); otherwise the test
  is run via run_tha_test().

  Returns:
    The value returned by download_test_data() or run_tha_test(), or 1 if
    run_tha_test() raised an exception (which is logged).
  """
  disable_buffering()
  parser = optparse.OptionParser(
      usage='%prog <options>', description=sys.modules[__name__].__doc__)
  parser.add_option(
      '-v', '--verbose', action='count', default=0, help='Use multiple times')

  group = optparse.OptionGroup(parser, 'Download')
  group.add_option(
      '--download', metavar='DEST',
      help='Downloads files to DEST and returns without running, instead of '
           'downloading and then running from a temporary directory.')
  parser.add_option_group(group)

  group = optparse.OptionGroup(parser, 'Data source')
  group.add_option(
      '-s', '--isolated',
      metavar='FILE',
      help='File/url describing what to map or run')
  group.add_option(
      '-H', '--hash',
      help='Hash of the .isolated to grab from the hash table')
  # Populate the group completely before registering it with the parser.
  group.add_option(
      '-r', '--remote', metavar='URL',
      default=
          'https://isolateserver.appspot.com/content/retrieve/default-gzip/',
      help='Remote where to get the items. Defaults to %default')
  parser.add_option_group(group)

  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache',
      default='cache',
      metavar='DIR',
      help='Cache directory, default=%default')
  group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=20*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache '
           'default=%default')
  parser.add_option_group(group)

  options, args = parser.parse_args()
  # Each -v raises the console verbosity by one step, capped at DEBUG.
  levels = [logging.WARNING, logging.INFO, logging.DEBUG]
  level = levels[min(len(levels) - 1, options.verbose)]

  logging_console = logging.StreamHandler()
  logging_console.setFormatter(logging.Formatter(
      '%(levelname)5s %(module)15s(%(lineno)3d): %(message)s'))
  logging_console.setLevel(level)
  logging.getLogger().addHandler(logging_console)

  # The rotating log file always records everything down to DEBUG,
  # independently of the console verbosity.
  logging_rotating_file = logging.handlers.RotatingFileHandler(
      RUN_ISOLATED_LOG_FILE,
      maxBytes=10 * 1024 * 1024, backupCount=5)
  logging_rotating_file.setLevel(logging.DEBUG)
  logging_rotating_file.setFormatter(logging.Formatter(
      '%(asctime)s %(levelname)-8s %(module)15s(%(lineno)3d): %(message)s'))
  logging.getLogger().addHandler(logging_rotating_file)

  # The root logger lets everything through; each handler filters on its own.
  logging.getLogger().setLevel(logging.DEBUG)

  # Usage errors are logged first so they reach the log file, since
  # parser.error() terminates the process.
  if bool(options.isolated) == bool(options.hash):
    error = 'One and only one of --isolated or --hash is required.'
    logging.debug(error)
    parser.error(error)
  if args:
    error = 'Unsupported args %s' % ' '.join(args)
    logging.debug(error)
    parser.error(error)

  options.cache = os.path.abspath(options.cache)
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)

  if options.download:
    return download_test_data(options.isolated or options.hash,
                              options.download, options.remote)
  else:
    try:
      return run_tha_test(
          options.isolated or options.hash,
          options.cache,
          options.remote,
          policies)
    except Exception as e:
      # Make sure any exception is logged.
      logging.exception(e)
      return 1
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00002158
2159
if __name__ == '__main__':
  # The default encoding has to be corrected before main() does any work.
  fix_default_encoding()
  exit_code = main()
  sys.exit(exit_code)
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00002163 sys.exit(main())