blob: fb0d670dc0843b93ad9603922ef67b0a321d608a [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002# Copyright 2013 The Chromium Authors. All rights reserved.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00003# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Archives a set of files to a server."""
7
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00008__version__ = '0.2'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import hashlib
maruel@chromium.org41601642013-09-18 19:40:46 +000012import json
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import sys
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000017import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000019import urllib
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000020import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000022from third_party import colorama
23from third_party.depot_tools import fix_encoding
24from third_party.depot_tools import subcommand
25
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000026from utils import net
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000027from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000028from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000029
30
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. The numbers here are a trade-off; the more per request, the lower
# the effect of HTTP round trip latency and TCP-level chattiness. On the other
# hand, larger values cause longer lookups, increasing the initial latency to
# start uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of
# large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
    'md5': hashlib.md5,
    'sha-1': hashlib.sha1,
    'sha-512': hashlib.sha512,
}


# Used for serialization: maps hash constructor back to its wire name.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
99
100
class ConfigError(ValueError):
  """Raised when a .isolated file cannot be loaded or is malformed."""
104
105
class MappingError(OSError):
  """Raised when the file tree cannot be recreated on disk."""
109
110
def is_valid_hash(value, algo):
  """Returns if the value is a valid hash for the corresponding algorithm.

  Arguments:
    value: hex string to validate.
    algo: hashlib hash constructor (e.g. hashlib.sha1) whose digest size
        determines the expected length.

  Returns:
    True if |value| is exactly the right number of hex characters.
  """
  # A hex digest has two characters per digest byte.
  size = 2 * algo().digest_size
  # Use \Z, not $: '$' also matches just before a trailing '\n', so a digest
  # followed by a newline would incorrectly be accepted as valid.
  return bool(re.match(r'^[a-fA-F0-9]{%d}\Z' % size, value))
115
116
def hash_file(filepath, algo):
  """Calculates the hash of a file without reading it all in memory at once.

  |algo| should be one of hashlib hashing algorithm.
  """
  hasher = algo()
  with open(filepath, 'rb') as f:
    # iter() with a sentinel stops as soon as read() returns an empty chunk.
    for chunk in iter(lambda: f.read(DISK_FILE_CHUNK), b''):
      hasher.update(chunk)
  return hasher.hexdigest()
130
131
def stream_read(stream, chunk_size):
  """Yields successive chunks of at most |chunk_size| read from |stream|."""
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
139
140
def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
  """Yields file content in chunks of given |chunk_size|."""
  with open(filepath, 'rb') as f:
    data = f.read(chunk_size)
    while data:
      yield data
      data = f.read(chunk_size)
149
150
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent_dir = os.path.dirname(filepath)
  if not os.path.isdir(parent_dir):
    os.makedirs(parent_dir)
  written = 0
  with open(filepath, 'wb') as f:
    for chunk in content_generator:
      f.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000169
170
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for data in content_generator:
    piece = compressor.compress(data)
    # compress() may buffer internally and return nothing for small inputs.
    if piece:
      yield piece
  trailing = compressor.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
181
182
def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      # Second argument caps the output size; any input that would overflow
      # that cap accumulates in decompressor.unconsumed_tail.
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # Drain the unconsumed tail in |chunk_size| increments before reading
      # more input, keeping peak memory bounded.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    # Emit whatever remains buffered inside zlib once input is exhausted.
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
212
213
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store, no compression) for file types that are already
  compressed, 7 otherwise.
  """
  # TODO(csharp): Profile to find what compression level works best.
  # os.path.splitext() keeps the leading dot ('.zip') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions ('zip'); strip the dot,
  # otherwise the membership test never matches and already-compressed files
  # are needlessly recompressed.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
219
220
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file.
  required = set()
  for filepath in files:
    ancestor = os.path.dirname(filepath)
    while ancestor and ancestor not in required:
      required.add(ancestor)
      ancestor = os.path.dirname(ancestor)
  # Sorted order guarantees a parent is created before its children.
  for d in sorted(required):
    os.mkdir(os.path.join(base_directory, d))
233
234
def create_links(base_directory, files):
  """Creates any links needed by the given set of files.

  |files| is an iterable of (relative path, properties dict) pairs; a link
  is described by the 'l' key (link target) and optionally 'm' (mode).
  """
  for relpath, props in files:
    if 'l' not in props:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create junctions or empty text files similar to what
      # cygwin do?
      logging.warning('Ignoring symlink %s', relpath)
      continue
    destination = os.path.join(base_directory, relpath)
    # symlink doesn't exist on Windows. So the 'link' property should
    # never be specified for windows .isolated file.
    os.symlink(props['l'], destination)  # pylint: disable=E1101
    if 'm' in props:
      lchmod = getattr(os, 'lchmod', None)
      if lchmod:
        lchmod(destination, props['m'])
253
254
def is_valid_file(filepath, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    # No size to compare against; just verify the file exists.
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size == actual_size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
269
270
class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # Number of times a task is retried (on IOError) before giving up.
  RETRIES = 5

  def __init__(self):
    # NOTE(review): positional arguments presumably follow
    # AutoRetryThreadPool's signature: (exceptions to retry, retries,
    # initial workers, max workers, queue size, thread name prefix) —
    # confirm against utils/threading_utils.
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000288
289
class Item(object):
  """A single entity that can be pushed to Storage.

  Life cycle: created on the main thread, handed to the 'contains' thread,
  then to the 'push' thread, and finally returned to the main thread.

  Never used from two threads at the same time.
  """

  def __init__(self, digest, size, is_isolated=False):
    # Hash digest identifying the content, and its size in bytes.
    self.digest = digest
    self.size = size
    # True for *.isolated files; they are uploaded with higher priority.
    self.is_isolated = is_isolated
    # Default zlib compression level; subclasses may pick a different one.
    self.compression_level = 6
    # Opaque per-storage state, attached by StorageApi.contains().
    self.push_state = None

  def content(self, chunk_size):
    """Returns an iterable yielding this item's data in |chunk_size| chunks.

    Must be implemented by subclasses.
    """
    raise NotImplementedError()
309
310
class FileItem(Item):
  """A file on disk to push to Storage.

  Arguments:
    path: path to the file on disk.
    digest: hash digest of the file content.
    size: size of the file in bytes.
    is_isolated: True if this is a .isolated file.
  """

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    self.path = path
    # Skip compression for file types that are already compressed.
    self.compression_level = get_zip_compression_level(path)

  def content(self, chunk_size):
    # Lazily streams the file from disk in |chunk_size| chunks.
    return file_read(self.path, chunk_size)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000321
322
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi."""

  def __init__(self, storage_api, use_zip):
    # When True, content is zlib-compressed before push and decompressed
    # after fetch.
    self.use_zip = use_zip
    self._storage_api = storage_api
    # Lazily created, see the properties below.
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    return False

  def upload_tree(self, indir, infiles):
    """Uploads the given tree to the isolate server.

    Arguments:
      indir: root directory the infiles are based in.
      infiles: dict of files to upload from |indir|.
    """
    logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))

    # TODO(vadimsh): Introduce Item as a part of the public interface?

    # Convert |indir| + |infiles| into a list of FileItem objects.
    # Filter out symlinks, since they are not represented by items on isolate
    # server side.
    items = [
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            is_isolated=metadata.get('priority') == '0')
        for filepath, metadata in infiles.iteritems()
        if 'l' not in metadata
    ]

    # Enqueue all upload tasks.
    missing = set()
    channel = threading_utils.TaskChannel()
    for missing_item in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(
          channel,
          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
          missing_item)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        uploaded = 0
        while uploaded != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded += 1
          logging.debug(
              'Uploaded %d / %d: %s', uploaded, len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats. Guard all ratios: |total| is 0 when |infiles| is empty or
    # contains only symlinks, and |total_size| is 0 for all-empty files;
    # without the guards the stats below raise ZeroDivisionError.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total if total else 0,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total if total else 0,
        cache_miss_size * 100. / total_size if total_size else 0)

  def async_push(self, channel, priority, item):
    """Starts asynchronous push to the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      priority: thread pool task priority for the push.
      item: item to upload as instance of Item class.
    """
    def push(content):
      """Pushes an item and returns its id, to pass as a result to |channel|."""
      self._storage_api.push(item, content)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push,
          item.content(DISK_FILE_CHUNK))
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
                              item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception(exc)
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self.use_zip:
          stream = zip_decompress(stream, DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.warning('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      Item objects that are missing from the server.
    """
    channel = threading_utils.TaskChannel()
    pending = 0
    # Enqueue all requests.
    for batch in self.batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1
    # Yield results as they come in.
    for _ in xrange(pending):
      for missing in channel.pull():
        yield missing

  @staticmethod
  def batch_items_for_check(items):
    """Splits list of items to check for existence on the server into batches.

    Each batch corresponds to a single 'exists?' query to the server via a call
    to StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects.

    Yields:
      Batches of items to query for existence in a single operation,
      each batch is a list of Item objects.
    """
    batch_count = 0
    batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
    next_queries = []
    # Larger files are more likely to have changed, so check them first.
    for item in sorted(items, key=lambda x: x.size, reverse=True):
      next_queries.append(item)
      if len(next_queries) == batch_size_limit:
        yield next_queries
        next_queries = []
        batch_count += 1
        # Ramp up batch sizes, clamping at the last configured value.
        batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    if next_queries:
      yield next_queries
552
553
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    # Receives digests of completed fetches back from Storage.
    self._channel = threading_utils.TaskChannel()
    # Digests with a fetch currently in flight.
    self._pending = set()
    # Every digest ever requested via add(); checked by verify_all_cached().
    self._accessed = set()
    # Digests already present in the cache.
    self._fetched = cache.cached_set()

  def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching; completed data is streamed straight into the cache.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
645
646
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    # Iterable of chunks as produced by StorageApi.fetch (after optional
    # decompression).
    self.stream = stream
    # Expected total size in bytes, or UNKNOWN_FILE_SIZE to skip the check.
    self.expected_size = expected_size
    # Bytes seen so far.
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise MappingError('Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      # |stored| is the final chunk: the size check runs before it is yielded.
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    # Only the last chunk triggers the total size comparison.
    if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
690
691
class StorageApi(object):
  """Interface for classes that implement low-level storage operations."""

  def fetch(self, digest):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, content):
    """Uploads an |item| with content generated by |content| generator.

    Arguments:
      item: Item object describing what is being pushed.
      content: generator yielding the chunks to upload.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for existence of given |items| on the server.

    Mutates |items| by assigning an opaque implementation-specific object to
    the push_state attribute of each Item missing from the datastore.

    Arguments:
      items: list of Item objects.

    Returns:
      A list of items missing on server as a list of Item objects.
    """
    raise NotImplementedError()
731
732
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  """

  class _PushState(object):
    """State needed to call .push(), to be stored in Item.push_state."""
    def __init__(self, upload_url, finalize_url):
      # URL to PUT the raw file content to.
      self.upload_url = upload_url
      # URL to POST to once the upload completed; may be falsy, in which case
      # no finalization call is made (see push() below).
      self.finalize_url = finalize_url
      # True once the content was successfully uploaded to upload_url.
      self.uploaded = False
      # True once the finalize call (if any) succeeded.
      self.finalized = False

  def __init__(self, base_url, namespace):
    """Arguments:
      base_url: base URL of the isolate server (http or https).
      namespace: server-side namespace to fetch from and push to.
    """
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self.base_url = base_url.rstrip('/')
    self.namespace = namespace
    # Guards lazy initialization of self._server_caps in _server_capabilities.
    self._lock = threading.Lock()
    # Cached /handshake response; None until the first handshake succeeds.
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if self._server_caps is None:
        request_body = json.dumps(
            self._generate_handshake_request(), separators=(',', ':'))
        response = net.url_read(
            url=self.base_url + '/content-gs/handshake',
            data=request_body,
            content_type='application/json',
            method='POST')
        if response is None:
          raise MappingError('Failed to perform handshake.')
        try:
          caps = json.loads(response)
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise MappingError('Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  def fetch(self, digest):
    """Streams an item from the server; see StorageApi.fetch.

    Raises IOError if a connection to the retrieve endpoint can't be opened.
    """
    assert isinstance(digest, basestring)

    source_url = '%s/content-gs/retrieve/%s/%s' % (
        self.base_url, self.namespace, digest)
    logging.debug('download_file(%s)', source_url)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
    if not connection:
      raise IOError('Unable to open connection to %s' % source_url)
    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, content):
    """Uploads |item| to URLs captured by a prior contains() call.

    Requires item.push_state to be a _PushState set by contains(); the upload
    and the optional finalize call are each skipped if already done, so the
    whole operation may be retried after a partial failure.

    Raises IOError if the upload or the finalization HTTP call fails.
    """
    assert isinstance(item, Item)
    assert isinstance(item.push_state, IsolateServer._PushState)
    assert not item.push_state.finalized

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not item.push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=item.push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, item.push_state.upload_url))
      item.push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if item.push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      response = net.url_read(
          url=item.push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      item.push_state.finalized = True

  def contains(self, items):
    """Asks the server which |items| are missing; see StorageApi.contains.

    Missing items get a _PushState (upload URL + finalize URL) attached to
    their push_state attribute, to be consumed later by push().

    Raises MappingError if the /pre-upload call fails or returns a malformed
    response.
    """
    logging.info('Checking existence of %d files...', len(items))

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.is_isolated),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self.base_url,
        self.namespace,
        urllib.quote(self._server_capabilities['access_token']))
    response_body = net.url_read(
        url=query_url,
        data=json.dumps(body, separators=(',', ':')),
        content_type='application/json',
        method='POST')
    if response_body is None:
      raise MappingError('Failed to execute /pre-upload query')

    # Response body is a list of push_urls (or null if file is already present).
    try:
      response = json.loads(response_body)
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response_body))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = []
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        item = items[i]
        assert item.push_state is None
        item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
        missing_items.append(item)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000928
929
class FileSystem(StorageApi):
  """StorageApi implementation backed by a directory on the file system.

  The common use case is a NFS/CIFS file server that is mounted locally that is
  used to fetch the file on a local partition. Items are stored as plain files
  named after their digests, directly under |base_path|.
  """

  def __init__(self, base_path):
    super(FileSystem, self).__init__()
    self.base_path = base_path

  def _item_path(self, digest):
    """Returns the on-disk path backing the item with the given digest."""
    return os.path.join(self.base_path, digest)

  def fetch(self, digest):
    """Yields chunks read from the file that backs |digest|."""
    assert isinstance(digest, basestring)
    return file_read(self._item_path(digest))

  def push(self, item, content):
    """Writes the chunks produced by |content| to the file backing |item|."""
    assert isinstance(item, Item)
    file_write(self._item_path(item.digest), content)

  def contains(self, items):
    """Returns the subset of |items| that have no backing file yet."""
    missing = []
    for item in items:
      if not os.path.exists(self._item_path(item.digest)):
        missing.append(item)
    return missing
954
955
class LocalCache(object):
  """Interface for a local store of objects fetched via Storage.

  Implementations may be used from several threads at once and are therefore
  expected to guard their internal state with a lock.
  """

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a fresh set with the digests of every cached item."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Checks an item for corruption and refreshes its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if the item is present in the cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes the item from the cache, if present."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns the whole content of a cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Consumes the |content| generator and stores its data in the cache."""
    raise NotImplementedError()

  def link(self, digest, dest, file_mode=None):
    """Makes the file at |dest| hold the same content as cached |digest|."""
    raise NotImplementedError()
1002
1003
class MemoryCache(LocalCache):
  """LocalCache implementation that keeps every item in process memory."""

  def __init__(self):
    super(MemoryCache, self).__init__()
    # A plain dict guarded by a lock; thread safety of dict operations is an
    # implementation detail we do not want to rely on.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    """Returns a new set of the digests currently stored."""
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    """Returns True if |digest| is stored; |size| is not verified here."""
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    """Drops |digest| from the cache; a no-op when it is absent."""
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    """Returns the stored content; raises KeyError when missing."""
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    """Stores data produced by the |content| generator under |digest|."""
    # Join the chunks before taking the lock to keep the critical section
    # short.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data

  def link(self, digest, dest, file_mode=None):
    """Writes the cached content to |dest|, optionally applying |file_mode|."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      os.chmod(dest, file_mode)
1039
1040
def get_hash_algo(_namespace):
  """Returns the hashlib algorithm class to use for the given |namespace|.

  Currently every namespace uses SHA-1; the argument exists for forward
  compatibility.
  """
  # TODO(vadimsh): Implement this at some point.
  return hashlib.sha1
1045
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001046
def is_namespace_with_compression(namespace):
  """Returns True if objects stored in |namespace| are compressed.

  Compression is signaled by a '-gzip' or '-deflate' suffix on the namespace
  name.
  """
  return namespace.endswith('-gzip') or namespace.endswith('-deflate')
1050
1051
def get_storage_api(file_or_url, namespace):
  """Instantiates a StorageApi implementation for the given location.

  HTTP(S) URLs map to IsolateServer; anything else is treated as a local
  directory path served by FileSystem.
  """
  looks_like_url = re.match(r'^https?://.+$', file_or_url)
  if looks_like_url:
    return IsolateServer(file_or_url, namespace)
  return FileSystem(file_or_url)
1058
1059
def get_storage(file_or_url, namespace):
  """Returns a Storage wired to the right StorageApi for |file_or_url|.

  Compression is enabled when the namespace name asks for it.
  """
  api = get_storage_api(file_or_url, namespace)
  use_compression = is_namespace_with_compression(namespace)
  return Storage(api, use_compression)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001065
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001066
def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: URL of the isolate server, or a local path for FileSystem
        storage.
    indir: Root directory the infiles are based in.
    infiles: dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server.

  Returns:
    0 on success.
  """
  storage = get_storage(base_url, namespace)
  with storage:
    storage.upload_tree(indir, infiles)
  return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001081
1082
def load_isolated(content, os_flavor, algo):
  """Verifies the .isolated file is valid and loads this object with the json
  data.

  Arguments:
  - content: raw serialized content to load.
  - os_flavor: OS to load this file on. Optional.
  - algo: hashlib algorithm class. Used to confirm the algorithm matches the
    algorithm used on the Isolate Server. If None, it is deduced from the
    optional 'algo' key in the file itself, defaulting to 'sha-1'.

  Returns:
    The parsed data as a dict, with file paths and symlink targets converted
    to the native os.path.sep.

  Raises:
    ConfigError: if the content is not valid JSON or violates the schema.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  # Check 'version' first, since it could modify the parsing after.
  value = data.get('version', '1.0')
  if not isinstance(value, basestring):
    raise ConfigError('Expected string, got %r' % value)
  if not re.match(r'^(\d+)\.(\d+)$', value):
    raise ConfigError('Expected a compatible version, got %r' % value)
  if value.split('.', 1)[0] != '1':
    raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)

  if algo is None:
    # Default the algorithm used in the .isolated file itself, falls back to
    # 'sha-1' if unspecified.
    algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]

  for key, value in data.iteritems():
    if key == 'algo':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
      if value not in SUPPORTED_ALGOS:
        raise ConfigError(
            'Expected one of \'%s\', got %r' %
            (', '.join(sorted(SUPPORTED_ALGOS)), value))
      if value != SUPPORTED_ALGOS_REVERSE[algo]:
        raise ConfigError(
            'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))

    elif key == 'command':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      # Each file property dict uses single letter keys:
      #   'h': hash digest, 's': size, 'l': symlink target, 'm': file mode.
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            # json decodes integers beyond sys.maxint as long on Python 2;
            # accept both so valid files are not rejected.
            if not isinstance(subsubvalue, (int, long)):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            if not is_valid_hash(subsubvalue, algo):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            # File size can exceed a native int (files >= 2GiB on 32 bit
            # builds), in which case json returns a long on Python 2.
            if not isinstance(subsubvalue, (int, long)):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        # Cross-key constraints: a file entry is either a symlink ('l') or a
        # content reference ('h' + 's'), never both.
        if bool('h' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
              subvalue)
        if bool('h' in subvalue) != bool('s' in subvalue):
          raise ConfigError(
              'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
              subvalue)
        if bool('s' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
              subvalue)
        if bool('l' in subvalue) and bool('m' in subvalue):
          raise ConfigError(
              'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not is_valid_hash(subvalue, algo):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not isinstance(value, bool):
        raise ConfigError('Expected bool, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      if os_flavor and value != os_flavor:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (os_flavor, value))

    elif key == 'version':
      # Already checked above.
      pass

    else:
      raise ConfigError('Unknown key %r' % key)

  # Automatically fix os.path.sep if necessary. While .isolated files are
  # always in the native path format, someone could want to download an
  # .isolated tree from another OS.
  wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
  if 'files' in data:
    data['files'] = dict(
        (k.replace(wrong_path_sep, os.path.sep), v)
        for k, v in data['files'].iteritems())
    for v in data['files'].itervalues():
      if 'l' in v:
        v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
  if 'relative_cwd' in data:
    data['relative_cwd'] = data['relative_cwd'].replace(
        wrong_path_sep, os.path.sep)
  return data
1221
1222
class IsolatedFile(object):
  """Represents a single parsed .isolated file."""

  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)' % obj_hash)
    self.obj_hash = obj_hash
    self.algo = algo

    # Becomes True only once everything to the left of this node in the
    # include tree is parsed. 'Tree' here means the .isolate plus all
    # .isolated files recursively referenced through their 'includes' key.
    # The ordering of the 'includes' list matters: later entries are not
    # processed until the earlier ones are retrieved and read.
    self.can_fetch = False

    # Raw parsed json content.
    self.data = {}
    # One child IsolatedFile per entry in self.data['includes'].
    self.children = []

    # Flipped to True by load().
    self._is_parsed = False
    # Flipped to True by fetch_files().
    self.files_fetched = False

  def load(self, os_flavor, content):
    """Verifies the .isolated file is valid and loads this object with the json
    data.
    """
    logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    includes = self.data.get('includes', [])
    self.children = [IsolatedFile(digest, self.algo) for digest in includes]
    self._is_parsed = True

  def fetch_files(self, fetch_queue, files):
    """Adds files in this .isolated file not present in |files| dictionary.

    Preemptively request files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if not self._is_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # The root .isolated wins when several entries map the same path:
      # overridden files must not be fetched at all.
      if filepath in files:
        continue
      files[filepath] = properties
      if 'h' in properties:
        # Kick off the download right away instead of waiting for the caller.
        logging.debug('fetching %s' % filepath)
        fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
    self.files_fetched = True
1280
1281
class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    # Command to run, as a list of arguments; filled from the first .isolated
    # file (in traversal order) that defines 'command'.
    self.command = []
    # Maps relative file path -> properties dict ('h', 's', 'l', 'm' keys).
    self.files = {}
    # Whether the mapped tree is read-only; None until loaded.
    self.read_only = None
    # Working directory relative to the output directory; None until loaded.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      # Schedules |isolated_file| for download at the highest priority, while
      # detecting include cycles via |seen|.
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(WorkerPool.HIGH, h)

    retrieve(self.root)

    while pending:
      # Block until any scheduled .isolated file lands in the cache, then
      # parse it and schedule its own includes.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    # Normalize unset values to usable defaults once the whole tree is read.
    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False

  def _traverse_tree(self, fetch_queue, node):
    """Walks the include tree, fetching the files of every fetchable node.

    Marks at most one not-yet-fetchable child per node (the left-most one) as
    fetchable, which preserves the strict left-to-right include ordering.
    """
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    """Schedules |node|'s file fetches and folds its properties into self.

    The first .isolated file in traversal order that defines 'command',
    'read_only' or 'relative_cwd' wins; later definitions are ignored.
    """
    node.fetch_files(fetch_queue, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1377
1378
def fetch_isolated(
    isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    algo: hash algorithm to use.
    outdir: Output directory to map file tree to.
    os_flavor: OS flavor to choose when reading sections of *.isolated file.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about loaded *.isolated file.

  Raises:
    ConfigError: if |require_command| is set and no command was specified.
    MappingError: if the cache evicted files that were just fetched.
  """
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not is_valid_hash(isolated_hash, algo):
        isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)

      # Load all *.isolated and start loading rest of the files.
      settings.load(fetch_queue, isolated_hash, os_flavor, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise ConfigError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_links(outdir, settings.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props). Several paths may
      # share the same digest and are all linked from the one cached copy.
      remaining = {}
      for filepath, props in settings.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.link(digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise MappingError('Cache is too small to hold all requested files')
  return settings
1456
1457
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001458@subcommand.usage('<file1..fileN> or - to read from stdin')
1459def CMDarchive(parser, args):
1460 """Archives data to the server."""
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001461 options, files = parser.parse_args(args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001462
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001463 if files == ['-']:
1464 files = sys.stdin.readlines()
1465
1466 if not files:
1467 parser.error('Nothing to upload')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001468
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001469 # Load the necessary metadata.
1470 # TODO(maruel): Use a worker pool to upload as the hashing is being done.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001471 infiles = dict(
1472 (
1473 f,
1474 {
maruel@chromium.orge5c17132012-11-21 18:18:46 +00001475 's': os.stat(f).st_size,
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001476 'h': hash_file(f, get_hash_algo(options.namespace)),
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001477 }
1478 )
1479 for f in files)
1480
vadimsh@chromium.orga4326472013-08-24 02:05:41 +00001481 with tools.Profiler('Archive'):
maruel@chromium.org7b844a62013-09-17 13:04:59 +00001482 ret = upload_tree(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001483 base_url=options.isolate_server,
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001484 indir=os.getcwd(),
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001485 infiles=infiles,
1486 namespace=options.namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001487 if not ret:
1488 print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles))
1489 return ret
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001490
1491
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.

  Returns 0 on success.
  """
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Exactly one of the two modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)
  storage = get_storage(options.isolate_server, options.namespace)
  cache = MemoryCache()
  algo = get_hash_algo(options.namespace)

  # Fetching individual files.
  if options.file:
    channel = threading_utils.TaskChannel()
    pending = {}
    for digest, dest in options.file:
      # NOTE(review): a digest passed twice with different destinations would
      # overwrite the |pending| entry; assumes digests are unique per
      # invocation — confirm with callers.
      pending[digest] = dest
      storage.async_fetch(
          channel,
          WorkerPool.MED,
          digest,
          UNKNOWN_FILE_SIZE,
          functools.partial(file_write, os.path.join(options.target, dest)))
    # Block until every enqueued fetch has completed and been written out.
    while pending:
      fetched = channel.pull()
      dest = pending.pop(fetched)
      logging.info('%s: %s', fetched, dest)

  # Fetching whole isolated tree.
  if options.isolated:
    settings = fetch_isolated(
        isolated_hash=options.isolated,
        storage=storage,
        cache=cache,
        algo=algo,
        outdir=options.target,
        os_flavor=None,
        require_command=False)
    rel = os.path.join(options.target, settings.relative_cwd)
    # |rel| is already absolute since options.target was made absolute above.
    # The previous code joined options.target with it a second time, which
    # os.path.join silently ignores when the second component is absolute.
    print('To run this test please run from the directory %s:' % rel)
    print(' ' + ' '.join(settings.command))

  return 0
1552
1553
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser that adds --isolate-server and --namespace options."""

  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(self, **kwargs)
    self.add_option(
        '-I', '--isolate-server',
        metavar='URL', default='',
        help='Isolate server to use')
    self.add_option(
        '--namespace', default='default-gzip',
        help='The namespace to use on the server, default: %default')

  def parse_args(self, *args, **kwargs):
    """Parses options and ensures an isolate server URL was provided."""
    options, remaining = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Normalize the server URL by dropping any trailing slashes.
    server = options.isolate_server.rstrip('/')
    if not server:
      self.error('--isolate-server is required.')
    options.isolate_server = server
    return options, remaining
1572
1573
def main(args):
  """Dispatches |args| to the matching CMD* handler and returns its result."""
  parser = OptionParserIsolateServer(version=__version__)
  dispatcher = subcommand.CommandDispatcher(__name__)
  try:
    return dispatcher.execute(parser, args)
  except (ConfigError, MappingError) as e:
    # Known, user-actionable failures: print a clean message rather than a
    # traceback.
    sys.stderr.write('\nError: ')
    sys.stderr.write(str(e))
    sys.stderr.write('\n')
    return 1
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001584
1585
if __name__ == '__main__':
  # One-time process setup (encoding, buffering, terminal colors) before
  # dispatching to the subcommand handler; exit with its return code.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))