blob: dbb36c6007ff883d5e3f6aef163ed1306d54e703 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
6"""Archives a set of files to a server."""
7
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00008__version__ = '0.2'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import hashlib
maruel@chromium.org41601642013-09-18 19:40:46 +000012import json
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import sys
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000017import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000019import urllib
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000020import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000022from third_party import colorama
23from third_party.depot_tools import fix_encoding
24from third_party.depot_tools import subcommand
25
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000026from utils import net
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000027from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000028from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000029
30
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
# NOTE: extensions are stored without the leading dot.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
    'md5': hashlib.md5,
    'sha-1': hashlib.sha1,
    'sha-512': hashlib.sha512,
}


# Used for serialization.
# NOTE: iteritems() is Python 2 only; this file predates Python 3 support.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
99
100
class ConfigError(ValueError):
  """Raised when a .isolated file cannot be loaded or parsed."""
104
105
class MappingError(OSError):
  """Raised when the file tree could not be recreated on disk."""
109
110
def is_valid_hash(value, algo):
  """Returns if the value is a valid hash for the corresponding algorithm."""
  # A valid digest is exactly two hex characters per byte of digest output.
  expected_len = algo().digest_size * 2
  pattern = r'^[a-fA-F0-9]{%d}$' % expected_len
  return bool(re.match(pattern, value))
115
116
def hash_file(filepath, algo):
  """Calculates the hash of a file without reading it all in memory at once.

  |algo| should be one of hashlib hashing algorithm.
  """
  hasher = algo()
  with open(filepath, 'rb') as stream:
    # Feed the file to the hasher one disk-sized chunk at a time so that
    # arbitrarily large files use bounded memory.
    chunk = stream.read(DISK_FILE_CHUNK)
    while chunk:
      hasher.update(chunk)
      chunk = stream.read(DISK_FILE_CHUNK)
  return hasher.hexdigest()
130
131
def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    piece = stream.read(chunk_size)
    if not piece:
      # End of stream: stop the generator.
      return
    yield piece
139
140
def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
  """Yields file content in chunks of given |chunk_size|."""
  with open(filepath, 'rb') as stream:
    # Stream lazily so large files never sit fully in memory.
    chunk = stream.read(chunk_size)
    while chunk:
      yield chunk
      chunk = stream.read(chunk_size)
149
150
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent_dir = os.path.dirname(filepath)
  if not os.path.isdir(parent_dir):
    os.makedirs(parent_dir)
  written = 0
  with open(filepath, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000169
170
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    compressed = deflater.compress(piece)
    # zlib buffers internally; only yield when it actually produced output.
    if compressed:
      yield compressed
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
181
182
def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  bytes_read = 0
  try:
    for piece in content_generator:
      bytes_read += len(piece)
      output = inflater.decompress(piece, chunk_size)
      if output:
        yield output
      # zlib withholds data beyond |chunk_size|; keep draining it so no
      # single yielded chunk exceeds the cap (zip bomb protection).
      while inflater.unconsumed_tail:
        output = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if output:
          yield output
    output = inflater.flush()
    if output:
      yield output
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (bytes_read, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
212
213
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store only) for file types that are already compressed, since
  recompressing them burns CPU for no size gain; returns 7 otherwise.
  """
  # TODO(csharp): Profile to find what compression level works best.
  # os.path.splitext() keeps the leading dot (e.g. '.zip') but
  # ALREADY_COMPRESSED_TYPES stores bare extensions (e.g. 'zip'), so the dot
  # must be stripped or the membership test below can never match.
  file_ext = os.path.splitext(filename)[1].lstrip('.').lower()
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
219
220
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file.
  required = set()
  for filepath in files:
    subdir = os.path.dirname(filepath)
    while subdir:
      required.add(subdir)
      subdir = os.path.dirname(subdir)
  # Lexicographic order guarantees a parent sorts before its children, so
  # each mkdir has its parent already created.
  for subdir in sorted(required):
    os.mkdir(os.path.join(base_directory, subdir))
233
234
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    # Only entries carrying an 'l' property describe symlinks.
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
    else:
      destination = os.path.join(base_directory, filepath)
      # os.symlink() doesn't exist on Windows.
      os.symlink(properties['l'], destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000247
248
def is_valid_file(filepath, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  # Unknown size: the best we can do is check the file exists at all.
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size == actual_size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
263
264
class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # Number of times a task failing with IOError is retried before giving up.
  RETRIES = 5

  def __init__(self):
    # Args: retried exceptions, retry count, initial/max workers,
    # queue size (0 == unbounded) and thread name prefix.
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000282
283
class Item(object):
  """Base class for a single entity that can be pushed to Storage.

  An instance is created on the main thread, handed to the 'contains'
  thread, then to the 'push' thread, and finally comes back to the main
  thread; it is never touched by two threads at once.
  """

  def __init__(self, digest, size, is_isolated=False):
    # Hex digest that identifies this content on the isolate server.
    self.digest = digest
    self.size = size
    # .isolated files are prioritized ahead of regular content.
    self.is_isolated = is_isolated
    # Default zlib level; subclasses may pick a better one per file type.
    self.compression_level = 6
    # Opaque per-item state owned by the StorageApi implementation.
    self.push_state = None

  def content(self, chunk_size):
    """Iterable with content of this item in chunks of given size.

    Arguments:
      chunk_size: preferred size of the chunk to produce, may be ignored.
    """
    raise NotImplementedError()
307
308
class FileItem(Item):
  """A file on disk to push to Storage."""

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    self.path = path
    # Skip recompression for file types that are already compressed.
    self.compression_level = get_zip_compression_level(path)

  def content(self, chunk_size):
    # Stream the file lazily instead of loading it whole.
    return file_read(self.path, chunk_size)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000319
320
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, algo, is_isolated=False):
    # Digest and size are derived directly from the buffer itself.
    super(BufferItem, self).__init__(
        algo(buf).hexdigest(), len(buf), is_isolated)
    self.buffer = buf

  def content(self, _chunk_size):
    # Already fully in memory: a single chunk, chunk size is irrelevant.
    return [self.buffer]
331
332
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Owns two lazily-created thread pools: one for CPU-bound work (zipping) and
  one for network I/O (with automatic retry on IOError). Usable as a context
  manager; exiting waits for all pending tasks.
  """

  def __init__(self, storage_api, use_zip):
    # When True, content is zlib-compressed before push and decompressed
    # after fetch.
    self.use_zip = use_zip
    self._storage_api = storage_api
    # Pools are created on first use, see properties below.
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    # Returning False lets any exception propagate to the caller.
    return False

  def upload_tree(self, indir, infiles):
    """Uploads the given tree to the isolate server.

    Arguments:
      indir: root directory the infiles are based in.
      infiles: dict of files to upload from |indir|.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))

    # Convert |indir| + |infiles| into a list of FileItem objects.
    # Filter out symlinks, since they are not represented by items on isolate
    # server side.
    items = [
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            is_isolated=metadata.get('priority') == '0')
        for filepath, metadata in infiles.iteritems()
        if 'l' not in metadata
    ]

    return self.upload_items(items)

  def upload_items(self, items):
    """Uploads bunch of items to the isolate server.

    Will upload only items that are missing.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      # setdefault returns the already-stored item for a repeated digest.
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    channel = threading_utils.TaskChannel()
    for missing_item in self.get_missing_items(items):
      missing.add(missing_item)
      # .isolated files unblock the client sooner, so push them first.
      self.async_push(
          channel,
          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
          missing_item)

    uploaded = []
    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    return self._storage_api.get_fetch_url(digest)

  def async_push(self, channel, priority, item):
    """Starts asynchronous push to the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      priority: thread pool task priority for the push.
      item: item to upload as instance of Item class.
    """
    def push(content):
      """Pushes an item and returns its id, to pass as a result to |channel|."""
      self._storage_api.push(item, content)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push,
          item.content(DISK_FILE_CHUNK))
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
                              item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Propagate the failure to whoever is waiting on |channel|.
        channel.send_exception(exc)
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self.use_zip:
          stream = zip_decompress(stream, DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.warning('Failed to fetch %s: %s', digest, err)
        # Re-raise so the net thread pool's retry logic can kick in.
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      Item objects that are missing from the server.
    """
    channel = threading_utils.TaskChannel()
    pending = 0
    # Enqueue all requests.
    for batch in self.batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1
    # Yield results as they come in.
    for _ in xrange(pending):
      for missing in channel.pull():
        yield missing

  @staticmethod
  def batch_items_for_check(items):
    """Splits list of items to check for existence on the server into batches.

    Each batch corresponds to a single 'exists?' query to the server via a call
    to StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects.

    Yields:
      Batches of items to query for existence in a single operation,
      each batch is a list of Item objects.
    """
    batch_count = 0
    batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
    next_queries = []
    # Largest files first: per the ITEMS_PER_CONTAINS_QUERIES comment, size
    # is used as a proxy for the likelihood the content changed.
    for item in sorted(items, key=lambda x: x.size, reverse=True):
      next_queries.append(item)
      if len(next_queries) == batch_size_limit:
        yield next_queries
        next_queries = []
        batch_count += 1
        # Later batches grow per ITEMS_PER_CONTAINS_QUERIES; the last entry
        # is reused once the list is exhausted.
        batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    if next_queries:
      yield next_queries
606
607
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    # Channel on which completed fetches report their digest.
    self._channel = threading_utils.TaskChannel()
    # Digests currently being fetched.
    self._pending = set()
    # Every digest ever requested via add(); used by verify_all_cached().
    self._accessed = set()
    # Digests already present in the cache.
    self._fetched = cache.cached_set()

  def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching. The fetched stream is written straight into the cache.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching. Completions for
    # digests not in |digests| are still recorded as fetched.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
699
700
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    # |stream| is an iterable of chunks; |expected_size| may be
    # UNKNOWN_FILE_SIZE, in which case the size check is skipped.
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          # An IOError raised here comes from the consumer (via generator
          # .throw()), i.e. the local cache, not from the network fetch.
          yield stored
        except IOError as exc:
          raise MappingError('Failed to store an item in cache: %s' % exc)
      stored = chunk
    # The held-back final chunk: run the full-size check before releasing it.
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    # Only the final chunk triggers the size verification.
    if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
744
745
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  Implementations (see IsolateServer and FileSystem below) move raw bytes
  only; compression and local caching are handled by callers.
  """

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, content):
    """Uploads an |item| with content generated by |content| generator.

    Arguments:
      item: Item object that holds information about an item being pushed.
      content: a generator that yields chunks to push.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for existence of given |items| on the server.

    Mutates |items| by assigning an opaque implementation-specific object to
    Item's push_state attribute on missing entries in the datastore.

    Arguments:
      items: list of Item objects.

    Returns:
      A list of items missing on server as a list of Item objects.
    """
    raise NotImplementedError()
796
797
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  """

  class _PushState(object):
    """State needed to call .push(), to be stored in Item.push_state."""
    def __init__(self, upload_url, finalize_url):
      # URL to PUT the item's content to.
      self.upload_url = upload_url
      # URL to POST to once the upload succeeded; may be empty/None, in which
      # case no finalization call is made.
      self.finalize_url = finalize_url
      # Set to True once the content was successfully PUT.
      self.uploaded = False
      # Set to True once the finalization call succeeded.
      self.finalized = False

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self.base_url = base_url.rstrip('/')
    self.namespace = namespace
    # Guards lazy initialization of |_server_caps| below.
    self._lock = threading.Lock()
    # Cached result of the /handshake call; None until first accessed via
    # the _server_capabilities property.
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if self._server_caps is None:
        request_body = json.dumps(
            self._generate_handshake_request(), separators=(',', ':'))
        response = net.url_read(
            url=self.base_url + '/content-gs/handshake',
            data=request_body,
            content_type='application/json',
            method='POST')
        if response is None:
          raise MappingError('Failed to perform handshake.')
        try:
          caps = json.loads(response)
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise MappingError('Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  def get_fetch_url(self, digest):
    """Returns the /content-gs/retrieve URL for an item with given digest."""
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self.base_url, self.namespace, digest)

  def fetch(self, digest):
    """Yields chunks of the item downloaded from the server.

    Raises IOError if a connection can not be opened.
    """
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s)', source_url)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
    if not connection:
      raise IOError('Unable to open connection to %s' % source_url)
    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, content):
    """Uploads |content| to the URLs stashed in item.push_state by contains().

    Idempotent with respect to retries: skips the content upload if a previous
    attempt already uploaded it and only the finalization call failed.
    """
    assert isinstance(item, Item)
    assert isinstance(item.push_state, IsolateServer._PushState)
    assert not item.push_state.finalized

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not item.push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=item.push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, item.push_state.upload_url))
      item.push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if item.push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      response = net.url_read(
          url=item.push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      item.push_state.finalized = True

  def contains(self, items):
    """Asks the server which of |items| are missing via /pre-upload.

    Attaches a _PushState (with upload and finalize URLs) to each missing
    item's push_state and returns the list of missing items.
    """
    logging.info('Checking existence of %d files...', len(items))

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.is_isolated),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self.base_url,
        self.namespace,
        urllib.quote(self._server_capabilities['access_token']))
    response_body = net.url_read(
        url=query_url,
        data=json.dumps(body, separators=(',', ':')),
        content_type='application/json',
        method='POST')
    if response_body is None:
      raise MappingError('Failed to execute /pre-upload query')

    # Response body is a list of push_urls (or null if file is already present).
    try:
      response = json.loads(response_body)
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response_body))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = []
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        item = items[i]
        assert item.push_state is None
        item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
        missing_items.append(item)
    logging.info('Queried %d files, %d cache hit',
                 len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000995
996
class FileSystem(StorageApi):
  """StorageApi implementation that fetches data from the file system.

  The common use case is a NFS/CIFS file server that is mounted locally that is
  used to fetch the file on a local partition.
  """

  def __init__(self, base_path):
    super(FileSystem, self).__init__()
    # Directory that holds one file per digest.
    self.base_path = base_path

  def _item_path(self, digest):
    """Returns the on-disk location of the item with the given digest."""
    return os.path.join(self.base_path, digest)

  def get_fetch_url(self, digest):
    # Plain file system storage has no URL based protocol.
    return None

  def fetch(self, digest):
    assert isinstance(digest, basestring)
    return file_read(self._item_path(digest))

  def push(self, item, content):
    assert isinstance(item, Item)
    file_write(self._item_path(item.digest), content)

  def contains(self, items):
    missing = []
    for item in items:
      if not os.path.exists(self._item_path(item.digest)):
        missing.append(item)
    return missing
1024
1025
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface.

    Returns False so exceptions raised inside the 'with' block propagate.
    """
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()
1076
1077
1078class MemoryCache(LocalCache):
1079 """LocalCache implementation that stores everything in memory."""
1080
1081 def __init__(self):
1082 super(MemoryCache, self).__init__()
1083 # Let's not assume dict is thread safe.
1084 self._lock = threading.Lock()
1085 self._contents = {}
1086
1087 def cached_set(self):
1088 with self._lock:
1089 return set(self._contents)
1090
1091 def touch(self, digest, size):
1092 with self._lock:
1093 return digest in self._contents
1094
1095 def evict(self, digest):
1096 with self._lock:
1097 self._contents.pop(digest, None)
1098
1099 def read(self, digest):
1100 with self._lock:
1101 return self._contents[digest]
1102
1103 def write(self, digest, content):
1104 # Assemble whole stream before taking the lock.
1105 data = ''.join(content)
1106 with self._lock:
1107 self._contents[digest] = data
1108
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001109 def hardlink(self, digest, dest, file_mode):
1110 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001111 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001112 if file_mode is not None:
1113 # Ignores all other bits.
1114 os.chmod(dest, file_mode & 0500)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001115
1116
def get_hash_algo(_namespace):
  """Return hash algorithm class to use when uploading to given |namespace|.

  Currently every namespace uses SHA-1.
  """
  # TODO(vadimsh): Implement this at some point.
  algo = hashlib.sha1
  return algo
1121
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001122
def is_namespace_with_compression(namespace):
  """Returns True if objects in |namespace| are stored in compressed form."""
  compressed_suffixes = ('-gzip', '-deflate')
  return any(namespace.endswith(suffix) for suffix in compressed_suffixes)
1126
1127
def get_storage_api(file_or_url, namespace):
  """Returns an object that implements StorageApi interface.

  Arguments:
    file_or_url: a http(s):// URL of an Isolate Server, or a local path.
    namespace: namespace to use (only meaningful for the Isolate Server).
  """
  is_url = bool(re.match(r'^https?://.+$', file_or_url))
  if not is_url:
    return FileSystem(file_or_url)
  return IsolateServer(file_or_url, namespace)
1134
1135
def get_storage(file_or_url, namespace):
  """Returns Storage class configured with appropriate StorageApi instance."""
  storage_api = get_storage_api(file_or_url, namespace)
  use_compression = is_namespace_with_compression(namespace)
  return Storage(storage_api, use_compression)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001141
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001142
def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The base url, it is assumed that |base_url|/has/ can be used to
              query if an element was already uploaded, and |base_url|/store/
              can be used to upload a new element.
    indir: Root directory the infiles are based in.
    infiles: dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server.
  """
  storage = get_storage(base_url, namespace)
  with storage:
    storage.upload_tree(indir, infiles)
  return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001157
1158
def load_isolated(content, os_flavor, algo):
  """Verifies the .isolated file is valid and loads this object with the json
  data.

  Arguments:
  - content: raw serialized content to load.
  - os_flavor: OS to load this file on. Optional.
  - algo: hashlib algorithm class. Used to confirm the algorithm matches the
    algorithm used on the Isolate Server. If None, it is taken from the file's
    'algo' key, defaulting to 'sha-1'.

  Returns:
    The parsed and validated dict, with file paths converted to the native
    os.path.sep.

  Raises:
    ConfigError if the content is not valid JSON or violates the .isolated
    schema.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  # Check 'version' first, since it could modify the parsing after.
  value = data.get('version', '1.0')
  if not isinstance(value, basestring):
    raise ConfigError('Expected string, got %r' % value)
  if not re.match(r'^(\d+)\.(\d+)$', value):
    raise ConfigError('Expected a compatible version, got %r' % value)
  if value.split('.', 1)[0] != '1':
    raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)

  if algo is None:
    # Default the algorithm used in the .isolated file itself, falls back to
    # 'sha-1' if unspecified.
    algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]

  for key, value in data.iteritems():
    if key == 'algo':
      # Hash algorithm named in the file must match |algo|.
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
      if value not in SUPPORTED_ALGOS:
        raise ConfigError(
            'Expected one of \'%s\', got %r' %
            (', '.join(sorted(SUPPORTED_ALGOS)), value))
      if value != SUPPORTED_ALGOS_REVERSE[algo]:
        raise ConfigError(
            'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))

    elif key == 'command':
      # Command line to run; must be a non-empty list of strings.
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      # Maps a relative path to the file properties:
      # 'l': symlink target, 'm': file mode, 'h': content hash, 's': size.
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            if not is_valid_hash(subsubvalue, algo):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            # On python 2, json decodes integers larger than sys.maxint as
            # 'long', so a plain 'int' check would reject valid files larger
            # than 2GiB on 32 bits systems. Accept both.
            if not isinstance(subsubvalue, (int, long)):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        # An entry is either a real file ('h' + 's') or a symlink ('l'), but
        # never both, and a symlink can not carry a mode.
        if bool('h' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
              subvalue)
        if bool('h' in subvalue) != bool('s' in subvalue):
          raise ConfigError(
              'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
              subvalue)
        if bool('s' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
              subvalue)
        if bool('l' in subvalue) and bool('m' in subvalue):
          raise ConfigError(
              'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      # Hashes of other .isolated files logically included into this one.
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not is_valid_hash(subvalue, algo):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not isinstance(value, bool):
        raise ConfigError('Expected bool, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      if os_flavor and value != os_flavor:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (os_flavor, value))

    elif key == 'version':
      # Already checked above.
      pass

    else:
      raise ConfigError('Unknown key %r' % key)

  # Automatically fix os.path.sep if necessary. While .isolated files are
  # always in the native path format, someone could want to download an
  # .isolated tree from another OS.
  wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
  if 'files' in data:
    data['files'] = dict(
        (k.replace(wrong_path_sep, os.path.sep), v)
        for k, v in data['files'].iteritems())
    for v in data['files'].itervalues():
      if 'l' in v:
        v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
  if 'relative_cwd' in data:
    data['relative_cwd'] = data['relative_cwd'].replace(
        wrong_path_sep, os.path.sep)
  return data
1297
1298
class IsolatedFile(object):
  """Represents a single parsed .isolated file."""

  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)' % obj_hash)
    # Hash of this .isolated file's own content.
    self.obj_hash = obj_hash
    # Hash algorithm class used by all digests in the tree.
    self.algo = algo
    # Set once all the left-side of the tree is parsed. 'Tree' here means the
    # .isolate and all the .isolated files recursively included by it with
    # 'includes' key. The order of each sha-1 in 'includes', each representing
    # a .isolated file in the hash table, is important, as the later ones are
    # not processed until the firsts are retrieved and read.
    self.can_fetch = False
    # Raw data.
    self.data = {}
    # A IsolatedFile instance, one per object in self.includes.
    self.children = []
    # Set once the .isolated file is loaded.
    self._is_parsed = False
    # Set once the files are fetched.
    self.files_fetched = False

  def load(self, os_flavor, content):
    """Verifies the .isolated file is valid and loads this object with the json
    data.
    """
    logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    included = []
    for included_hash in self.data.get('includes', []):
      included.append(IsolatedFile(included_hash, self.algo))
    self.children = included
    self._is_parsed = True

  def fetch_files(self, fetch_queue, files):
    """Adds files in this .isolated file not present in |files| dictionary.

    Preemptively request files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if not self._is_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overriden files must not be fetched.
      if filepath in files:
        continue
      files[filepath] = properties
      if 'h' in properties:
        # Preemptively request files.
        logging.debug('fetching %s' % filepath)
        fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
    self.files_fetched = True
1356
1357
class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    # Command line to run, as a list of arguments.
    self.command = []
    # Maps relative file path -> properties dict ('h', 's', 'l', 'm' keys).
    self.files = {}
    # Tri-state: None until set by load(), then True or False.
    self.read_only = None
    # Working directory for |command|; None until set by load().
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      # Schedules a high priority fetch of one .isolated file, refusing cycles.
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(WorkerPool.HIGH, h)

    retrieve(self.root)

    while pending:
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    # Sanity check: every node in the tree must have had its files requested.
    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False

  def _traverse_tree(self, fetch_queue, node):
    """Recursively fetches files of nodes whose turn has come.

    Marks at most one not-yet-fetchable child per call (via |will_break|) to
    preserve the strict left-to-right processing order of 'includes'.
    """
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    """Fetches |node|'s files and merges its properties into this Settings.

    Earlier (left-most) nodes win: a property already set is never overwritten.
    """
    node.fetch_files(fetch_queue, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1453
1454
def fetch_isolated(
    isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    algo: hash algorithm to use.
    outdir: Output directory to map file tree to.
    os_flavor: OS flavor to choose when reading sections of *.isolated file.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about loaded *.isolated file.

  Raises:
    ConfigError: if require_command is True and no command was found in the
        loaded *.isolated file(s).
    MappingError: if the cache evicted items that were just fetched.
  """
  # The cache is entered as a context manager for the whole operation
  # (NOTE(review): exact enter/exit semantics depend on the LocalCache
  # implementation — not visible here).
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not is_valid_hash(isolated_hash, algo):
        isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)

      # Load all *.isolated and start loading rest of the files.
      settings.load(fetch_queue, isolated_hash, os_flavor, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise ConfigError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_symlinks(outdir, settings.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props). A list is needed
      # because several paths may share identical content (same digest).
      remaining = {}
      for filepath, props in settings.files.iteritems():
        # Entries without 'h' carry no content to fetch (presumably the
        # symlinks materialized by create_symlinks above — verify).
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      # The detector fires if no progress is pinged within DEADLOCK_TIMEOUT.
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise MappingError('Cache is too small to hold all requested files')
  return settings
1533
1534
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001535@subcommand.usage('<file1..fileN> or - to read from stdin')
1536def CMDarchive(parser, args):
1537 """Archives data to the server."""
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001538 options, files = parser.parse_args(args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001539
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001540 if files == ['-']:
1541 files = sys.stdin.readlines()
1542
1543 if not files:
1544 parser.error('Nothing to upload')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001545
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001546 # Load the necessary metadata.
1547 # TODO(maruel): Use a worker pool to upload as the hashing is being done.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001548 infiles = dict(
1549 (
1550 f,
1551 {
maruel@chromium.orge5c17132012-11-21 18:18:46 +00001552 's': os.stat(f).st_size,
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001553 'h': hash_file(f, get_hash_algo(options.namespace)),
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001554 }
1555 )
1556 for f in files)
1557
vadimsh@chromium.orga4326472013-08-24 02:05:41 +00001558 with tools.Profiler('Archive'):
maruel@chromium.org7b844a62013-09-17 13:04:59 +00001559 ret = upload_tree(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001560 base_url=options.isolate_server,
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001561 indir=os.getcwd(),
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001562 infiles=infiles,
1563 namespace=options.namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001564 if not ret:
1565 print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles))
1566 return ret
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001567
1568
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.

  Returns 0 on success; errors are reported through parser.error() or raised
  by the fetch machinery.
  """
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Exactly one of the two modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)
  storage = get_storage(options.isolate_server, options.namespace)
  cache = MemoryCache()
  algo = get_hash_algo(options.namespace)

  # Fetching individual files.
  if options.file:
    channel = threading_utils.TaskChannel()
    pending = {}
    for digest, dest in options.file:
      pending[digest] = dest
      storage.async_fetch(
          channel,
          WorkerPool.MED,
          digest,
          UNKNOWN_FILE_SIZE,
          functools.partial(file_write, os.path.join(options.target, dest)))
    while pending:
      fetched = channel.pull()
      dest = pending.pop(fetched)
      logging.info('%s: %s', fetched, dest)

  # Fetching whole isolated tree.
  if options.isolated:
    settings = fetch_isolated(
        isolated_hash=options.isolated,
        storage=storage,
        cache=cache,
        algo=algo,
        outdir=options.target,
        os_flavor=None,
        require_command=False)
    # 'rel' is already anchored at options.target; joining it with the
    # target a second time (as the old code did) was a redundant no-op
    # since os.path.join discards the first part when the second is
    # absolute.
    rel = os.path.join(options.target, settings.relative_cwd)
    print('To run this test please run from the directory %s:' % rel)
    print(' ' + ' '.join(settings.command))

  return 0
1629
1630
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser that adds --isolate-server and --namespace flags."""

  def __init__(self, **kwargs):
    # Explicit base-class call: the optparse lineage predates new-style
    # classes, so super() is not used here.
    tools.OptionParserWithLogging.__init__(self, **kwargs)
    self.add_option(
        '-I', '--isolate-server', metavar='URL', default='',
        help='Isolate server to use')
    self.add_option(
        '--namespace', default='default-gzip',
        help='The namespace to use on the server, default: %default')

  def parse_args(self, *args, **kwargs):
    """Parses arguments, then validates and normalizes --isolate-server."""
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Strip any trailing slash so later URL concatenation is predictable.
    options.isolate_server = options.isolate_server.rstrip('/')
    if not options.isolate_server:
      self.error('--isolate-server is required.')
    return options, args
1649
1650
def main(args):
  """Dispatches args to the matching CMD* handler.

  Returns the subcommand's exit code, or 1 if the subcommand raised.
  """
  cmd_dispatcher = subcommand.CommandDispatcher(__name__)
  try:
    parser = OptionParserIsolateServer(version=__version__)
    return cmd_dispatcher.execute(parser, args)
  except Exception as e:
    # Top-level boundary: report the error instead of dumping a traceback.
    tools.report_error(e)
    return 1
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001659
1660
if __name__ == '__main__':
  # Prepare the terminal before dispatching: fix stdout/stderr encoding,
  # disable output buffering, and initialize colorama (which, per its docs,
  # enables ANSI color handling on Windows).
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))