blob: a0b9a160b22ad377dbe4d62384280e799870603d [file] [log] [blame]
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +00001#!/usr/bin/env python
2# Copyright (c) 2012 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Reads a manifest, creates a tree of hardlinks and runs the test.
7
8Keeps a local cache.
9"""
10
maruel@chromium.org9c72d4e2012-09-28 19:20:25 +000011import ctypes
12import hashlib
13import json
14import logging
15import optparse
16import os
17import Queue
18import re
19import shutil
20import stat
21import subprocess
22import sys
23import tempfile
24import threading
25import time
26import urllib
27
28
# Types of action accepted by recreate_tree().
HARDLINK, SYMLINK, COPY = range(1, 4)

# Matches a full-length lowercase or uppercase hexadecimal sha-1 digest.
RE_IS_SHA1 = re.compile(r'^[a-fA-F0-9]{40}$')
33
34
class ConfigError(ValueError):
  """Raised when a manifest fails to parse or validate."""
38
39
class MappingError(OSError):
  """Raised when the dependency tree cannot be recreated on disk."""
43
44
def get_flavor():
  """Returns the default platform flavor name for this system.

  Copied from gyp/pylib/gyp/common.py.
  """
  platform_to_flavor = {
      'cygwin': 'win',
      'win32': 'win',
      'darwin': 'mac',
      'sunos5': 'solaris',
      'freebsd7': 'freebsd',
      'freebsd8': 'freebsd',
  }
  # Anything not explicitly listed is treated as linux.
  return platform_to_flavor.get(sys.platform, 'linux')
56
57
def os_link(source, link_name):
  """Creates a hardlink named |link_name| pointing at |source|.

  Works around the lack of os.link() on Windows by calling the Win32 API
  directly.
  """
  if sys.platform != 'win32':
    os.link(source, link_name)
    return
  # CreateHardLinkW returns 0 on failure.
  if not ctypes.windll.kernel32.CreateHardLinkW(
      unicode(link_name), unicode(source), 0):
    raise OSError()
66
67
def readable_copy(outfile, infile):
  """Makes a copy of the file that is readable by everyone."""
  shutil.copy(infile, outfile)
  # Add the read bits for user, group and others on top of whatever the copy
  # already has.
  read_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
  os.chmod(outfile, os.stat(outfile).st_mode | read_bits)
74
75
def link_file(outfile, infile, action):
  """Maps |infile| to |outfile| using the requested |action|.

  |action| is one of HARDLINK, SYMLINK or COPY. Hardlinking falls back to a
  readable copy when the link cannot be created.
  """
  logging.debug('Mapping %s to %s' % (infile, outfile))
  if action not in (HARDLINK, SYMLINK, COPY):
    raise ValueError('Unknown mapping action %s' % action)
  if not os.path.isfile(infile):
    raise MappingError('%s is missing' % infile)
  if os.path.isfile(outfile):
    raise MappingError(
        '%s already exist; insize:%d; outsize:%d' %
        (outfile, os.stat(infile).st_size, os.stat(outfile).st_size))

  if action == COPY:
    readable_copy(outfile, infile)
    return

  if action == SYMLINK and sys.platform != 'win32':
    # On windows, symlink are converted to hardlink and fails over to copy.
    os.symlink(infile, outfile)  # pylint: disable=E1101
    return

  try:
    os_link(infile, outfile)
  except OSError:
    # Probably a different file system.
    logging.warn(
        'Failed to hardlink, failing back to copy %s to %s' % (
          infile, outfile))
    readable_copy(outfile, infile)
102
103
104def _set_write_bit(path, read_only):
105 """Sets or resets the executable bit on a file or directory."""
106 mode = os.lstat(path).st_mode
107 if read_only:
108 mode = mode & 0500
109 else:
110 mode = mode | 0200
111 if hasattr(os, 'lchmod'):
112 os.lchmod(path, mode) # pylint: disable=E1101
113 else:
114 if stat.S_ISLNK(mode):
115 # Skip symlink without lchmod() support.
116 logging.debug('Can\'t change +w bit on symlink %s' % path)
117 return
118
119 # TODO(maruel): Implement proper DACL modification on Windows.
120 os.chmod(path, mode)
121
122
def make_writable(root, read_only):
  """Toggle the writable bit on a directory tree."""
  root = os.path.abspath(root)
  for dirpath, dirnames, filenames in os.walk(root, topdown=True):
    # Process the files of each directory first, then its subdirectories,
    # matching the original traversal order.
    for name in filenames:
      _set_write_bit(os.path.join(dirpath, name), read_only)

    for name in dirnames:
      _set_write_bit(os.path.join(dirpath, name), read_only)
132
133
def rmtree(root):
  """Wrapper around shutil.rmtree() to retry automatically on Windows."""
  make_writable(root, False)
  if sys.platform != 'win32':
    shutil.rmtree(root)
    return
  # On Windows a child process can outlive the test and keep a handle open,
  # making deletion fail; retry a few times with an increasing delay.
  for i in range(3):
    try:
      shutil.rmtree(root)
      break
    except WindowsError:  # pylint: disable=E0602
      delay = (i+1)*2
      print >> sys.stderr, (
          'The test has subprocess outliving it. Sleep %d seconds.' % delay)
      time.sleep(delay)
149
150
def is_same_filesystem(path1, path2):
  """Checks whether two absolute paths live on the same filesystem.

  This is required to enable the use of hardlinks, which cannot span
  filesystems.
  """
  for path in (path1, path2):
    assert os.path.isabs(path), path
  if sys.platform == 'win32':
    # If the drive letter mismatches, assume it's a separate partition.
    # TODO(maruel): It should look at the underlying drive, a drive letter could
    # be a mount point to a directory on another drive.
    drive_re = r'^[a-zA-Z]\:\\.*'
    assert re.match(drive_re, path1), path1
    assert re.match(drive_re, path2), path2
    if path1[0].lower() != path2[0].lower():
      return False
  return os.stat(path1).st_dev == os.stat(path2).st_dev
167
168
def get_free_space(path):
  """Returns the number of free bytes on the partition containing |path|."""
  if sys.platform == 'win32':
    free_bytes = ctypes.c_ulonglong(0)
    ctypes.windll.kernel32.GetDiskFreeSpaceExW(
        ctypes.c_wchar_p(path), None, None, ctypes.pointer(free_bytes))
    return free_bytes.value
  # For OSes other than Windows.
  stats = os.statvfs(path)  # pylint: disable=E1101
  return stats.f_bfree * stats.f_frsize
179
180
def make_temp_dir(prefix, root_dir):
  """Returns a temporary directory on the same file system as root_dir."""
  if is_same_filesystem(root_dir, tempfile.gettempdir()):
    # The system default temporary location is on the right filesystem.
    base_temp_dir = None
  else:
    # Fall back to a sibling of root_dir to guarantee the same filesystem.
    base_temp_dir = os.path.dirname(root_dir)
  return tempfile.mkdtemp(prefix=prefix, dir=base_temp_dir)
187
188
def load_manifest(content):
  """Verifies the manifest is valid and loads this object with the json data.

  Args:
    content: manifest file content as a json encoded string.

  Returns:
    The decoded manifest as a dict.

  Raises:
    ConfigError: if the content is not valid json, or if any key or value
        does not match the expected schema.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  for key, value in data.iteritems():
    if key == 'command':
      # The command to run, as a list of string arguments.
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      # Maps relative file path -> dict of per-file properties.
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'link':
            # Symlink target.
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'mode':
            # POSIX file mode.
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'sha-1':
            # Content hash used to fetch the file from the remote.
            if not RE_IS_SHA1.match(subsubvalue):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 'size':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'timestamp':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'touched_only':
            if not isinstance(subsubvalue, bool):
              raise ConfigError('Expected bool, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        # A file is either a symlink ('link') or content-addressed ('sha-1'),
        # never both.
        if bool('sha-1' in subvalue) and bool('link' in subvalue):
          raise ConfigError(
              'Did not expect both \'sha-1\' and \'link\', got: %r' % subvalue)

    elif key == 'includes':
      # Sha-1s of other manifests to load recursively.
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      for subvalue in value:
        if not RE_IS_SHA1.match(subvalue):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not isinstance(value, bool):
        raise ConfigError('Expected bool, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      # A manifest is only valid on the OS flavor it was generated for.
      if value != get_flavor():
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (get_flavor(), value))

    else:
      raise ConfigError('Unknown key %s' % key)

  return data
266
267
def fix_python_path(cmd):
  """Returns the fixed command line to call the right python executable."""
  out = list(cmd)
  if out[0] == 'python':
    # Replace the bare 'python' with the interpreter currently running.
    out[0] = sys.executable
  elif out[0].endswith('.py'):
    # Prepend the interpreter so the script is run with the right python.
    out = [sys.executable] + out
  return out
276
277
class Profiler(object):
  """Context manager that logs the wall-clock duration of a code section."""

  def __init__(self, name):
    self.name = name
    self.start_time = None

  def __enter__(self):
    self.start_time = time.time()
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    elapsed = time.time() - self.start_time
    logging.info('Profiling: Section %s took %3.3f seconds',
                 self.name, elapsed)
291
292
class Remote(object):
  """Priority based worker queue to fetch or upload files from a
  content-address server. Any function may be given as the fetcher/upload,
  as long as it takes two inputs (the item contents, and their relative
  destination).

  Supports local file system, CIFS or http remotes.

  When the priority of items is equals, works in strict FIFO mode.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # Priorities. The low byte is reserved for internal bookkeeping (the retry
  # count is added to the priority), hence the << 8 shifts.
  LOW, MED, HIGH = (1<<8, 2<<8, 3<<8)
  INTERNAL_PRIORITY_BITS = (1<<8) - 1
  RETRIES = 5

  def __init__(self, destination_root):
    # Function to fetch a remote object or upload to a remote location..
    self._do_item = self.get_file_handler(destination_root)
    # Contains tuple(priority, index, obj, destination).
    self._queue = Queue.PriorityQueue()
    # Contains tuple(priority, index, obj).
    self._done = Queue.PriorityQueue()

    # Contains generated exceptions that haven't been handled yet.
    self._exceptions = Queue.Queue()

    # To keep FIFO ordering in self._queue. It is assumed xrange's iterator is
    # thread-safe.
    self._next_index = xrange(0, 1<<30).__iter__().next

    # Control access to the following member.
    self._ready_lock = threading.Lock()
    # Number of threads in wait state.
    self._ready = 0

    # Control access to the following member.
    self._workers_lock = threading.Lock()
    self._workers = []
    for _ in range(self.INITIAL_WORKERS):
      self._add_worker()

  def join(self):
    """Blocks until the queue is empty."""
    self._queue.join()

  def next_exception(self):
    """Returns the next unhandled exception, or None if there is
    no exception."""
    try:
      return self._exceptions.get_nowait()
    except Queue.Empty:
      return None

  def add_item(self, priority, obj, dest):
    """Retrieves an object from the remote data store.

    The smaller |priority| gets fetched first.

    Thread-safe.
    """
    # Callers must not use the low byte; it is reserved for retry counting.
    assert (priority & self.INTERNAL_PRIORITY_BITS) == 0
    self._add_to_queue(priority, obj, dest)

  def get_result(self):
    """Returns the next file that was successfully fetched."""
    r = self._done.get()
    if r[0] == -1:
      # It's an exception. -1 sorts before any real priority so errors are
      # surfaced first. Python 2 three-expression raise: re-raises with the
      # original type, value and traceback captured on the worker thread.
      raise r[2][0], r[2][1], r[2][2]
    return r[2]

  def _add_to_queue(self, priority, obj, dest):
    # Queues the work item and spawns an extra worker when no thread is idle.
    with self._ready_lock:
      start_new_worker = not self._ready
    self._queue.put((priority, self._next_index(), obj, dest))
    if start_new_worker:
      self._add_worker()

  def _add_worker(self):
    """Add one worker thread if there isn't too many. Thread-safe."""
    with self._workers_lock:
      if len(self._workers) >= self.MAX_WORKERS:
        return False
      worker = threading.Thread(target=self._run)
      self._workers.append(worker)
    # Daemon so lingering workers don't block process exit.
    worker.daemon = True
    worker.start()

  def _step_done(self, result):
    """Worker helper: publishes |result| and marks the queue item done."""
    self._done.put(result)
    self._queue.task_done()
    if result[0] == -1:
      # Also record the failure for next_exception(). Must be called from
      # inside an except clause for sys.exc_info() to be meaningful.
      self._exceptions.put(sys.exc_info())

  def _run(self):
    """Worker thread loop."""
    while True:
      try:
        # _ready counts threads blocked in get(); _add_to_queue uses it to
        # decide whether a new worker is needed.
        with self._ready_lock:
          self._ready += 1
        item = self._queue.get()
      finally:
        with self._ready_lock:
          self._ready -= 1
      if not item:
        return
      priority, index, obj, dest = item
      try:
        self._do_item(obj, dest)
      except IOError:
        # Retry a few times, lowering the priority.
        if (priority & self.INTERNAL_PRIORITY_BITS) < self.RETRIES:
          self._add_to_queue(priority + 1, obj, dest)
          self._queue.task_done()
          continue
        # Transfers the exception back. It has maximum priority.
        self._step_done((-1, 0, sys.exc_info()))
      except:
        # Transfers the exception back. It has maximum priority.
        self._step_done((-1, 0, sys.exc_info()))
      else:
        self._step_done((priority, index, obj))

  @staticmethod
  def get_file_handler(file_or_url):
    """Returns a object to retrieve objects from a remote."""
    if re.match(r'^https?://.+$', file_or_url):
      # TODO(maruel): This is particularly hackish. It shouldn't rstrip('/') in
      # the first place or try to append '/'.
      if not file_or_url.endswith('='):
        file_or_url = file_or_url.rstrip('/') + '/'
      def download_file(item, dest):
        # TODO(maruel): Reuse HTTP connections. The stdlib doesn't make this
        # easy.
        source = file_or_url + item
        logging.debug('download_file(%s, %s)', source, dest)
        urllib.urlretrieve(source, dest)
      return download_file

    # Non-http remotes (local file system, CIFS) are handled as plain copies.
    def copy_file(item, dest):
      source = os.path.join(file_or_url, item)
      logging.debug('copy_file(%s, %s)', source, dest)
      shutil.copy(source, dest)
    return copy_file
441
442
class CachePolicies(object):
  """Holds the cache retention limits.

  Attributes:
  - max_cache_size: Trim if the cache gets larger than this value. If 0, the
    cache is effectively a leak.
  - min_free_space: Trim if disk free space becomes lower than this value. If
    0, it unconditionally fill the disk.
  - max_items: Maximum number of items to keep in the cache. If 0, do not
    enforce a limit.
  """
  def __init__(self, max_cache_size, min_free_space, max_items):
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
457
458
class Cache(object):
  """Stateful LRU cache.

  Saves its state as json file. self.state is a list of (filename, size)
  pairs ordered from least to most recently used; self._lookup maps a
  filename back to its index in self.state.
  """
  STATE_FILE = 'state.json'

  def __init__(self, cache_dir, remote, policies):
    """
    Arguments:
    - cache_dir: Directory where to place the cache.
    - remote: Remote where to fetch items from.
    - policies: cache retention policies.
    """
    self.cache_dir = cache_dir
    self.remote = remote
    self.policies = policies
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # The tuple(file, size) are kept as an array in a LRU style. E.g.
    # self.state[0] is the oldest item.
    self.state = []
    # A lookup map to speed up searching.
    self._lookup = {}
    # True when self.state diverged from what is saved on disk.
    self._dirty = False

    # Items currently being fetched. Keep it local to reduce lock contention.
    self._pending_queue = set()

    # Profiling values.
    self._added = []
    self._removed = []
    self._free_disk = 0

    if not os.path.isdir(self.cache_dir):
      os.makedirs(self.cache_dir)
    if os.path.isfile(self.state_file):
      try:
        self.state = json.load(open(self.state_file, 'r'))
      except (IOError, ValueError), e:
        # Too bad. The file will be overwritten and the cache cleared.
        logging.error(
            'Broken state file %s, ignoring.\n%s' % (self.STATE_FILE, e))
      # Validate the shape of the loaded state: a list of 2-element pairs.
      if (not isinstance(self.state, list) or
          not all(
            isinstance(i, (list, tuple)) and len(i) == 2 for i in self.state)):
        # Discard.
        self.state = []
        self._dirty = True

    # Ensure that all files listed in the state still exist and add new ones.
    previous = set(filename for filename, _ in self.state)
    if len(previous) != len(self.state):
      # Duplicate entries mean the state can't be trusted; start over.
      logging.warn('Cache state is corrupted')
      self._dirty = True
      self.state = []
    else:
      added = 0
      for filename in os.listdir(self.cache_dir):
        if filename == self.STATE_FILE:
          continue
        if filename in previous:
          previous.remove(filename)
          continue
        # An untracked file.
        self._dirty = True
        if not RE_IS_SHA1.match(filename):
          logging.warn('Removing unknown file %s from cache', filename)
          os.remove(self.path(filename))
        else:
          # Insert as the oldest file. It will be deleted eventually if not
          # accessed.
          self._add(filename, False)
          added += 1
      if added:
        logging.warn('Added back %d unknown files', added)
      # Whatever is left in |previous| was listed in the state but is gone
      # from disk; drop those entries.
      self.state = [
        (filename, size) for filename, size in self.state
        if filename not in previous
      ]
      self._update_lookup()

    with Profiler('SetupTrimming'):
      self.trim()

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    # Trim on exit and log add/remove/free-space statistics.
    with Profiler('CleanupTrimming'):
      self.trim()

    logging.info(
        '%4d (%7dkb) added', len(self._added), sum(self._added) / 1024)
    logging.info(
        '%4d (%7dkb) current',
        len(self.state),
        sum(i[1] for i in self.state) / 1024)
    logging.info(
        '%4d (%7dkb) removed', len(self._removed), sum(self._removed) / 1024)
    logging.info('%7dkb free', self._free_disk / 1024)

  def remove_lru_file(self):
    """Removes the last recently used file."""
    try:
      # The oldest item is at index 0.
      filename, size = self.state.pop(0)
      del self._lookup[filename]
      self._removed.append(size)
      os.remove(self.path(filename))
      self._dirty = True
    except OSError as e:
      logging.error('Error attempting to delete a file\n%s' % e)

  def trim(self):
    """Trims anything we don't know, make sure enough free space exists."""
    # Ensure maximum cache size.
    if self.policies.max_cache_size and self.state:
      while sum(i[1] for i in self.state) > self.policies.max_cache_size:
        self.remove_lru_file()

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and self.state:
      while len(self.state) > self.policies.max_items:
        self.remove_lru_file()

    # Ensure enough free space.
    self._free_disk = get_free_space(self.cache_dir)
    while (
        self.policies.min_free_space and
        self.state and
        self._free_disk < self.policies.min_free_space):
      self.remove_lru_file()
      self._free_disk = get_free_space(self.cache_dir)

    self.save()

  def retrieve(self, priority, item):
    """Retrieves a file from the remote, if not already cached, and adds it to
    the cache.

    The fetch is asynchronous; completion is observed through wait_for().
    """
    assert not '/' in item
    path = self.path(item)
    index = self._lookup.get(item)
    if index is None:
      if item in self._pending_queue:
        # Already pending. The same object could be referenced multiple times.
        return
      self.remote.add_item(priority, item, path)
      self._pending_queue.add(item)
    else:
      if index != len(self.state) - 1:
        # Was already in cache. Update it's LRU value by putting it at the end.
        self.state.append(self.state.pop(index))
        self._dirty = True
        self._update_lookup()

  def add(self, filepath, obj):
    """Forcibly adds a file to the cache."""
    if not obj in self._lookup:
      # Hardlink the local file into the cache under its hash name.
      link_file(self.path(obj), filepath, HARDLINK)
      self._add(obj, True)

  def path(self, item):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, item)

  def save(self):
    """Saves the LRU ordering."""
    json.dump(self.state, open(self.state_file, 'wb'), separators=(',',':'))

  def wait_for(self, items):
    """Starts a loop that waits for at least one of |items| to be retrieved.

    Returns the first item retrieved.
    """
    # Flush items already present.
    for item in items:
      if item in self._lookup:
        return item

    assert all(i in self._pending_queue for i in items), (
        items, self._pending_queue)
    # Note that:
    #   len(self._pending_queue) ==
    #   ( len(self.remote._workers) - self.remote._ready +
    #     len(self._remote._queue) + len(self._remote.done))
    # There is no lock-free way to verify that.
    while self._pending_queue:
      item = self.remote.get_result()
      self._pending_queue.remove(item)
      self._add(item, True)
      if item in items:
        return item

  def _add(self, item, at_end):
    """Adds an item in the internal state.

    If |at_end| is False, self._lookup becomes inconsistent and
    self._update_lookup() must be called.
    """
    size = os.stat(self.path(item)).st_size
    self._added.append(size)
    if at_end:
      # Most recently used position.
      self.state.append((item, size))
      self._lookup[item] = len(self.state) - 1
    else:
      # Least recently used position; see docstring about _lookup.
      self.state.insert(0, (item, size))
    self._dirty = True

  def _update_lookup(self):
    # Rebuilds the filename -> index map from self.state.
    self._lookup = dict(
        (filename, index) for index, (filename, _) in enumerate(self.state))
670
671
672
class Manifest(object):
  """Represents a single parsed manifest, e.g. a .isolated file."""
  def __init__(self, obj_hash):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('Manifest(%s)' % obj_hash)
    self.obj_hash = obj_hash
    # Set once all the left-side of the tree is parsed. 'Tree' here means the
    # manifest and all the manifest recursively included by it with 'includes'
    # key. The order of each manifest sha-1 in 'includes' is important, as the
    # later ones are not processed until the firsts are retrieved and read.
    self.can_fetch = False

    # Raw manifest data as decoded from json.
    self.data = {}
    # One child Manifest instance per sha-1 listed in self.data['includes'].
    self.children = []

    # Set once the manifest is loaded.
    self._manifest_parsed = False
    # Set once the files are fetched.
    self.files_fetched = False

  def load(self, content):
    """Verifies the manifest is valid and loads this object with the json data.
    """
    logging.debug('Manifest.load(%s)' % self.obj_hash)
    assert not self._manifest_parsed
    self.data = load_manifest(content)
    self.children = [Manifest(sha1) for sha1 in self.data.get('includes', [])]
    self._manifest_parsed = True

  def fetch_files(self, cache, files):
    """Adds files in this manifest not present in files dictionary.

    Preemptively request files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if not self._manifest_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # Root manifest has priority on the files being mapped. In particular,
      # overriden files must not be fetched.
      if filepath in files:
        continue
      files[filepath] = properties
      if 'sha-1' in properties:
        # Preemptively request files.
        logging.debug('fetching %s' % filepath)
        cache.retrieve(Remote.MED, properties['sha-1'])
    self.files_fetched = True
725
726
class Settings(object):
  """Results of a completely parsed manifest."""
  def __init__(self):
    # Command line to run, aggregated from the manifest tree.
    self.command = []
    # Maps relative file path -> properties dict, aggregated across includes.
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main manifest.
    self.root = None
    logging.debug('Settings')

  def load(self, cache, root_manifest_hash):
    """Loads the manifest and all the included manifests asynchronously.

    It enables support for included manifest. They are processed in strict order
    but fetched asynchronously from the cache. This is important so that a file
    in an included manifest that is overridden by an embedding manifest is not
    fetched neededlessly. The includes are fetched in one pass and the files are
    fetched as soon as all the manifests on the left-side of the tree were
    fetched.

    The prioritization is very important here for nested manifests. 'includes'
    have the highest priority and the algorithm is optimized for both deep and
    wide manifests. A deep one is a long link of manifest referenced one at a
    time by one item in 'includes'. A wide one has a large number of 'includes'
    in a single manifest. 'left' is defined as an included manifest earlier in
    the 'includes' list. So the order of the elements in 'includes' is
    important.
    """
    self.root = Manifest(root_manifest_hash)
    cache.retrieve(Remote.HIGH, root_manifest_hash)
    # Maps manifest hash -> Manifest instance still waiting to be read.
    pending = {root_manifest_hash: self.root}
    # Keeps the list of retrieved items to refuse recursive includes.
    retrieved = [root_manifest_hash]

    def update_self(node):
      """Fetches |node|'s files and folds its properties into self.

      The first manifest (leftmost in the traversal) that defines a property
      wins, since a later value never overwrites one already set.
      """
      node.fetch_files(cache, self.files)
      # Grabs properties.
      if not self.command and node.data.get('command'):
        self.command = node.data['command']
      if self.read_only is None and node.data.get('read_only') is not None:
        self.read_only = node.data['read_only']
      if (self.relative_cwd is None and
          node.data.get('relative_cwd') is not None):
        self.relative_cwd = node.data['relative_cwd']

    def traverse_tree(node):
      """Walks the include tree left-to-right, unlocking one new child.

      Only the first not-yet-fetchable child per node is marked fetchable,
      enforcing the strict left-to-right processing order.
      """
      if node.can_fetch:
        if not node.files_fetched:
          update_self(node)
        will_break = False
        for i in node.children:
          if not i.can_fetch:
            if will_break:
              break
            # Automatically mark the first one as fetcheable.
            i.can_fetch = True
            will_break = True
          traverse_tree(i)

    while pending:
      item_hash = cache.wait_for(pending)
      item = pending.pop(item_hash)
      item.load(open(cache.path(item_hash), 'r').read())
      if item_hash == root_manifest_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        h = new_child.obj_hash
        if h in retrieved:
          raise ConfigError('Manifest %s is retrieved recursively' % h)
        pending[h] = new_child
        cache.retrieve(Remote.HIGH, h)

      # Traverse the whole tree to see if files can now be fetched.
      traverse_tree(self.root)
    def check(n):
      """Returns True if every manifest in the tree fetched its files."""
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)
    # Apply defaults once the whole tree is processed.
    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False
809
810
def run_tha_test(manifest_hash, cache_dir, remote, policies):
  """Downloads the dependencies in the cache, hardlinks them into a temporary
  directory and runs the executable.

  Args:
    manifest_hash: sha-1 of the root manifest, or a path to a local manifest
        file (anything not matching RE_IS_SHA1 is treated as a path).
    cache_dir: directory holding the local cache.
    remote: remote location (url or path) passed to Remote().
    policies: CachePolicies instance controlling cache trimming.

  Returns:
    The child process' exit code, or 1 when no command is specified.
  """
  settings = Settings()
  with Cache(cache_dir, Remote(remote), policies) as cache:
    outdir = make_temp_dir('run_tha_test', cache_dir)
    try:
      # Initiate all the files download.
      with Profiler('GetManifests') as _prof:
        # Optionally support local files.
        if not RE_IS_SHA1.match(manifest_hash):
          # Adds it in the cache. While not strictly necessary, this simplifies
          # the rest.
          h = hashlib.sha1(open(manifest_hash, 'r').read()).hexdigest()
          cache.add(manifest_hash, h)
          manifest_hash = h
        settings.load(cache, manifest_hash)

      if not settings.command:
        print >> sys.stderr, 'No command to run'
        return 1

      with Profiler('GetRest') as _prof:
        logging.debug('Creating directories')
        # Creates the tree of directories to create.
        directories = set(os.path.dirname(f) for f in settings.files)
        # Add every intermediate parent directory as well.
        for item in list(directories):
          while item:
            directories.add(item)
            item = os.path.dirname(item)
        # sorted() guarantees parents are created before children.
        for d in sorted(directories):
          if d:
            os.mkdir(os.path.join(outdir, d))

        # Creates the links if necessary.
        for filepath, properties in settings.files.iteritems():
          if 'link' not in properties:
            continue
          outfile = os.path.join(outdir, filepath)
          # symlink doesn't exist on Windows. So the 'link' property should
          # never be specified for windows .isolated file.
          os.symlink(properties['link'], outfile)  # pylint: disable=E1101
          if 'mode' in properties:
            # It's not set on Windows.
            os.chmod(outfile, properties['mode'])

        # Remaining files to be processed.
        # Note that files could still be not be downloaded yet here.
        remaining = dict()
        for filepath, props in settings.files.iteritems():
          if 'sha-1' in props:
            remaining.setdefault(props['sha-1'], []).append((filepath, props))

        # Do bookkeeping while files are being downloaded in the background.
        cwd = os.path.join(outdir, settings.relative_cwd)
        if not os.path.isdir(cwd):
          os.makedirs(cwd)
        cmd = settings.command[:]
        # Ensure paths are correctly separated on windows.
        cmd[0] = cmd[0].replace('/', os.path.sep)
        cmd = fix_python_path(cmd)

        # Now block on the remaining files to be downloaded and mapped.
        while remaining:
          obj = cache.wait_for(remaining)
          for filepath, properties in remaining.pop(obj):
            outfile = os.path.join(outdir, filepath)
            link_file(outfile, cache.path(obj), HARDLINK)
            if 'mode' in properties:
              # It's not set on Windows.
              os.chmod(outfile, properties['mode'])

      if settings.read_only:
        make_writable(outdir, True)
      logging.info('Running %s, cwd=%s' % (cmd, cwd))
      try:
        with Profiler('RunTest') as _prof:
          return subprocess.call(cmd, cwd=cwd)
      except OSError:
        print >> sys.stderr, 'Failed to run %s; cwd=%s' % (cmd, cwd)
        raise
    finally:
      # Always delete the temporary mapping directory, even on failure.
      rmtree(outdir)
895
896
def main():
  """Command line entry point.

  Returns the exit code of the wrapped test, or 1 on a manifest/mapping
  error.
  """
  parser = optparse.OptionParser(
      usage='%prog <options>', description=sys.modules[__name__].__doc__)
  parser.add_option(
      '-v', '--verbose', action='count', default=0, help='Use multiple times')
  parser.add_option('--no-run', action='store_true', help='Skip the run part')

  group = optparse.OptionGroup(parser, 'Data source')
  group.add_option(
      '-m', '--manifest',
      metavar='FILE',
      help='File/url describing what to map or run')
  group.add_option(
      '-H', '--hash',
      help='Hash of the manifest to grab from the hash table')
  parser.add_option_group(group)

  # NOTE(review): --remote is appended to the 'Data source' group after
  # add_option_group() was already called; optparse keeps a reference to the
  # group so this still works, but it reads like a misplaced line — confirm
  # intent.
  group.add_option(
      '-r', '--remote', metavar='URL', help='Remote where to get the items')
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache',
      default='cache',
      metavar='DIR',
      help='Cache directory, default=%default')
  group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=20*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=1*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache '
           'default=%default')
  parser.add_option_group(group)

  options, args = parser.parse_args()
  # -v maps to INFO, -vv (or more) to DEBUG; default is ERROR.
  level = [logging.ERROR, logging.INFO, logging.DEBUG][min(2, options.verbose)]
  logging.basicConfig(
      level=level,
      format='%(levelname)5s %(module)15s(%(lineno)3d): %(message)s')

  if bool(options.manifest) == bool(options.hash):
    parser.error('One and only one of --manifest or --hash is required.')
  if not options.remote:
    parser.error('--remote is required.')
  if args:
    parser.error('Unsupported args %s' % ' '.join(args))

  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  try:
    return run_tha_test(
        options.manifest or options.hash,
        os.path.abspath(options.cache),
        options.remote,
        policies)
  except (ConfigError, MappingError), e:
    # Expected failure modes: print the message without a traceback.
    print >> sys.stderr, str(e)
    return 1
968
969
# Script entry point: propagate main()'s return value as the exit code.
if __name__ == '__main__':
  sys.exit(main())