blob: c85d68eab62e00ce9f271c4090846df3c0be63f4 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002# Copyright 2013 The Chromium Authors. All rights reserved.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00003# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Archives a set of files to a server."""
7
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00008__version__ = '0.2'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import hashlib
maruel@chromium.org41601642013-09-18 19:40:46 +000012import json
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import sys
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000017import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000019import urllib
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000020import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000022from third_party import colorama
23from third_party.depot_tools import fix_encoding
24from third_party.depot_tools import subcommand
25
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000026from utils import net
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000027from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000028from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000029
30
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +000031# Version of isolate protocol passed to the server in /handshake request.
32ISOLATE_PROTOCOL_VERSION = '1.0'
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000033
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +000034
35# The number of files to check the isolate server per /pre-upload query.
vadimsh@chromium.orgeea52422013-08-21 19:35:54 +000036# All files are sorted by likelihood of a change in the file content
37# (currently file size is used to estimate this: larger the file -> larger the
38# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +000039# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
vadimsh@chromium.orgeea52422013-08-21 19:35:54 +000040# and so on. Numbers here is a trade-off; the more per request, the lower the
41# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
42# larger values cause longer lookups, increasing the initial latency to start
43# uploading, which is especially an issue for large files. This value is
44# optimized for the "few thousands files to look up with minimal number of large
45# files missing" case.
46ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
csharp@chromium.org07fa7592013-01-11 18:19:30 +000047
maruel@chromium.org9958e4a2013-09-17 00:01:48 +000048
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000049# A list of already compressed extension types that should not receive any
50# compression before being uploaded.
51ALREADY_COMPRESSED_TYPES = [
52 '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
53 'wav', 'zip'
54]
55
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000056
maruel@chromium.orgdedbf492013-09-12 20:42:11 +000057# The file size to be used when we don't know the correct file size,
58# generally used for .isolated files.
59UNKNOWN_FILE_SIZE = None
60
61
62# The size of each chunk to read when downloading and unzipping files.
63ZIPPED_FILE_CHUNK = 16 * 1024
64
maruel@chromium.org8750e4b2013-09-18 02:37:57 +000065# Chunk size to use when doing disk I/O.
66DISK_FILE_CHUNK = 1024 * 1024
67
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +000068# Chunk size to use when reading from network stream.
69NET_IO_FILE_CHUNK = 16 * 1024
70
maruel@chromium.org8750e4b2013-09-18 02:37:57 +000071
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000072# Read timeout in seconds for downloads from isolate storage. If there's no
73# response from the server within this timeout whole download will be aborted.
74DOWNLOAD_READ_TIMEOUT = 60
75
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +000076# Maximum expected delay (in seconds) between successive file fetches
77# in run_tha_test. If it takes longer than that, a deadlock might be happening
78# and all stack frames for all threads are dumped to log.
79DEADLOCK_TIMEOUT = 5 * 60
80
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000081
maruel@chromium.org41601642013-09-18 19:40:46 +000082# The delay (in seconds) to wait between logging statements when retrieving
83# the required files. This is intended to let the user (or buildbot) know that
84# the program is still running.
85DELAY_BETWEEN_UPDATES_IN_SECS = 30
86
87
maruel@chromium.org385d73d2013-09-19 18:33:21 +000088# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
89# specify the names here.
90SUPPORTED_ALGOS = {
91 'md5': hashlib.md5,
92 'sha-1': hashlib.sha1,
93 'sha-512': hashlib.sha512,
94}
95
96
97# Used for serialization.
98SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
99
100
maruel@chromium.orgdedbf492013-09-12 20:42:11 +0000101class ConfigError(ValueError):
102 """Generic failure to load a .isolated file."""
103 pass
104
105
class MappingError(OSError):
  """Raised when the file tree cannot be recreated on disk."""
109
110
maruel@chromium.org7b844a62013-09-17 13:04:59 +0000111def is_valid_hash(value, algo):
112 """Returns if the value is a valid hash for the corresponding algorithm."""
113 size = 2 * algo().digest_size
114 return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))
115
116
def hash_file(filepath, algo):
  """Computes the hex digest of a file by streaming it through |algo|.

  |algo| should be one of hashlib hashing algorithm constructors. The file is
  read in fixed-size chunks so arbitrarily large files never have to fit in
  memory at once.
  """
  hasher = algo()
  with open(filepath, 'rb') as stream:
    chunk = stream.read(DISK_FILE_CHUNK)
    while chunk:
      hasher.update(chunk)
      chunk = stream.read(DISK_FILE_CHUNK)
  return hasher.hexdigest()
130
131
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000132def stream_read(stream, chunk_size):
133 """Reads chunks from |stream| and yields them."""
134 while True:
135 data = stream.read(chunk_size)
136 if not data:
137 break
138 yield data
139
140
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000141def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
142 """Yields file content in chunks of given |chunk_size|."""
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000143 with open(filepath, 'rb') as f:
144 while True:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000145 data = f.read(chunk_size)
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000146 if not data:
147 break
148 yield data
149
150
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000151def file_write(filepath, content_generator):
152 """Writes file content as generated by content_generator.
153
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000154 Creates the intermediary directory as needed.
155
156 Returns the number of bytes written.
157
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000158 Meant to be mocked out in unit tests.
159 """
160 filedir = os.path.dirname(filepath)
161 if not os.path.isdir(filedir):
162 os.makedirs(filedir)
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000163 total = 0
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000164 with open(filepath, 'wb') as f:
165 for d in content_generator:
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000166 total += len(d)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000167 f.write(d)
maruel@chromium.org8750e4b2013-09-18 02:37:57 +0000168 return total
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000169
170
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000171def zip_compress(content_generator, level=7):
172 """Reads chunks from |content_generator| and yields zip compressed chunks."""
173 compressor = zlib.compressobj(level)
174 for chunk in content_generator:
175 compressed = compressor.compress(chunk)
176 if compressed:
177 yield compressed
178 tail = compressor.flush(zlib.Z_FINISH)
179 if tail:
180 yield tail
181
182
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000183def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
184 """Reads zipped data from |content_generator| and yields decompressed data.
185
186 Decompresses data in small chunks (no larger than |chunk_size|) so that
187 zip bomb file doesn't cause zlib to preallocate huge amount of memory.
188
189 Raises IOError if data is corrupted or incomplete.
190 """
191 decompressor = zlib.decompressobj()
192 compressed_size = 0
193 try:
194 for chunk in content_generator:
195 compressed_size += len(chunk)
196 data = decompressor.decompress(chunk, chunk_size)
197 if data:
198 yield data
199 while decompressor.unconsumed_tail:
200 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
201 if data:
202 yield data
203 tail = decompressor.flush()
204 if tail:
205 yield tail
206 except zlib.error as e:
207 raise IOError(
208 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
209 # Ensure all data was read and decompressed.
210 if decompressor.unused_data or decompressor.unconsumed_tail:
211 raise IOError('Not all data was decompressed')
212
213
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000214def get_zip_compression_level(filename):
215 """Given a filename calculates the ideal zip compression level to use."""
216 file_ext = os.path.splitext(filename)[1].lower()
217 # TODO(csharp): Profile to find what compression level works best.
218 return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
219
220
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000221def create_directories(base_directory, files):
222 """Creates the directory structure needed by the given list of files."""
223 logging.debug('create_directories(%s, %d)', base_directory, len(files))
224 # Creates the tree of directories to create.
225 directories = set(os.path.dirname(f) for f in files)
226 for item in list(directories):
227 while item:
228 directories.add(item)
229 item = os.path.dirname(item)
230 for d in sorted(directories):
231 if d:
232 os.mkdir(os.path.join(base_directory, d))
233
234
def create_links(base_directory, files):
  """Creates any links needed by the given set of (path, properties) pairs."""
  for relpath, props in files:
    # Only entries carrying the 'l' (link target) property are symlinks.
    if 'l' not in props:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create junctions or empty text files similar to what
      # cygwin do?
      logging.warning('Ignoring symlink %s', relpath)
      continue
    target = os.path.join(base_directory, relpath)
    # symlink doesn't exist on Windows, so the 'l' property should never be
    # specified for a windows .isolated file.
    os.symlink(props['l'], target)  # pylint: disable=E1101
    if 'm' in props:
      # lchmod is not available on all platforms (e.g. absent on Linux).
      lchmod = getattr(os, 'lchmod', None)
      if lchmod:
        lchmod(target, props['m'])
253
254
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000255def is_valid_file(filepath, size):
maruel@chromium.orgdedbf492013-09-12 20:42:11 +0000256 """Determines if the given files appears valid.
257
258 Currently it just checks the file's size.
259 """
260 if size == UNKNOWN_FILE_SIZE:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000261 return os.path.isfile(filepath)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +0000262 actual_size = os.stat(filepath).st_size
263 if size != actual_size:
264 logging.warning(
265 'Found invalid item %s; %d != %d',
266 os.path.basename(filepath), actual_size, size)
267 return False
268 return True
269
270
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000271class WorkerPool(threading_utils.AutoRetryThreadPool):
272 """Thread pool that automatically retries on IOError and runs a preconfigured
273 function.
274 """
275 # Initial and maximum number of worker threads.
276 INITIAL_WORKERS = 2
277 MAX_WORKERS = 16
278 RETRIES = 5
279
280 def __init__(self):
281 super(WorkerPool, self).__init__(
282 [IOError],
283 self.RETRIES,
284 self.INITIAL_WORKERS,
285 self.MAX_WORKERS,
286 0,
287 'remote')
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000288
289
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000290class Item(object):
291 """An item to push to Storage.
292
293 It starts its life in a main thread, travels to 'contains' thread, then to
294 'push' thread and then finally back to the main thread.
295
296 It is never used concurrently from multiple threads.
297 """
298
299 def __init__(self, digest, size, is_isolated=False):
300 self.digest = digest
301 self.size = size
302 self.is_isolated = is_isolated
303 self.compression_level = 6
304 self.push_state = None
305
306 def content(self, chunk_size):
307 """Iterable with content of this item in chunks of given size."""
308 raise NotImplementedError()
309
310
class FileItem(Item):
  """An Item whose content is backed by a file on disk."""

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    # Already-compressed file types get level 0 (store only), others 7.
    self.compression_level = get_zip_compression_level(path)
    self.path = path

  def content(self, chunk_size):
    """Yields the file's bytes in chunks of |chunk_size|."""
    return file_read(self.path, chunk_size)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000321
322
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000323class Storage(object):
324 """Efficiently downloads or uploads large set of files via StorageApi."""
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000325
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000326 def __init__(self, storage_api, use_zip):
327 self.use_zip = use_zip
328 self._storage_api = storage_api
329 self._cpu_thread_pool = None
330 self._net_thread_pool = None
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000331
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000332 @property
333 def cpu_thread_pool(self):
334 """ThreadPool for CPU-bound tasks like zipping."""
335 if self._cpu_thread_pool is None:
336 self._cpu_thread_pool = threading_utils.ThreadPool(
337 2, max(threading_utils.num_processors(), 2), 0, 'zip')
338 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000339
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000340 @property
341 def net_thread_pool(self):
342 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
343 if self._net_thread_pool is None:
344 self._net_thread_pool = WorkerPool()
345 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000346
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000347 def close(self):
348 """Waits for all pending tasks to finish."""
349 if self._cpu_thread_pool:
350 self._cpu_thread_pool.join()
351 self._cpu_thread_pool.close()
352 self._cpu_thread_pool = None
353 if self._net_thread_pool:
354 self._net_thread_pool.join()
355 self._net_thread_pool.close()
356 self._net_thread_pool = None
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000357
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000358 def __enter__(self):
359 """Context manager interface."""
360 return self
361
362 def __exit__(self, _exc_type, _exc_value, _traceback):
363 """Context manager interface."""
364 self.close()
365 return False
366
367 def upload_tree(self, indir, infiles):
368 """Uploads the given tree to the isolate server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000369
370 Arguments:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000371 indir: root directory the infiles are based in.
372 infiles: dict of files to upload from |indir|.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000373 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000374 logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))
375
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000376 # TODO(vadimsh): Introduce Item as a part of the public interface?
377
378 # Convert |indir| + |infiles| into a list of FileItem objects.
379 # Filter out symlinks, since they are not represented by items on isolate
380 # server side.
381 items = [
382 FileItem(
383 path=os.path.join(indir, filepath),
384 digest=metadata['h'],
385 size=metadata['s'],
386 is_isolated=metadata.get('priority') == '0')
387 for filepath, metadata in infiles.iteritems()
388 if 'l' not in metadata
389 ]
390
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000391 # For each digest keep only first Item that matches it. All other items
392 # are just indistinguishable copies from the point of view of isolate
393 # server (it doesn't care about paths at all, only content and digests).
394 seen = {}
395 duplicates = 0
396 for item in items:
397 if seen.setdefault(item.digest, item) is not item:
398 duplicates += 1
399 items = seen.values()
400 if duplicates:
401 logging.info('Skipped %d duplicated files', duplicates)
402
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000403 # Enqueue all upload tasks.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000404 missing = set()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000405 channel = threading_utils.TaskChannel()
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000406 for missing_item in self.get_missing_items(items):
407 missing.add(missing_item)
408 self.async_push(
409 channel,
410 WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
411 missing_item)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000412
413 # No need to spawn deadlock detector thread if there's nothing to upload.
414 if missing:
415 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
416 # Wait for all started uploads to finish.
417 uploaded = 0
418 while uploaded != len(missing):
419 detector.ping()
420 item = channel.pull()
421 uploaded += 1
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000422 logging.debug(
423 'Uploaded %d / %d: %s', uploaded, len(missing), item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000424 logging.info('All files are uploaded')
425
426 # Print stats.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000427 total = len(items)
428 total_size = sum(f.size for f in items)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000429 logging.info(
430 'Total: %6d, %9.1fkb',
431 total,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000432 total_size / 1024.)
433 cache_hit = set(items) - missing
434 cache_hit_size = sum(f.size for f in cache_hit)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000435 logging.info(
436 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
437 len(cache_hit),
438 cache_hit_size / 1024.,
439 len(cache_hit) * 100. / total,
440 cache_hit_size * 100. / total_size if total_size else 0)
441 cache_miss = missing
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000442 cache_miss_size = sum(f.size for f in cache_miss)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000443 logging.info(
444 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
445 len(cache_miss),
446 cache_miss_size / 1024.,
447 len(cache_miss) * 100. / total,
448 cache_miss_size * 100. / total_size if total_size else 0)
449
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000450 def async_push(self, channel, priority, item):
451 """Starts asynchronous push to the server in a parallel thread.
452
453 Arguments:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000454 channel: TaskChannel that receives back |item| when upload ends.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000455 priority: thread pool task priority for the push.
456 item: item to upload as instance of Item class.
457 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000458 def push(content):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000459 """Pushes an item and returns its id, to pass as a result to |channel|."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000460 self._storage_api.push(item, content)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000461 return item
462
463 # If zipping is not required, just start a push task.
464 if not self.use_zip:
465 self.net_thread_pool.add_task_with_channel(channel, priority, push,
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000466 item.content(DISK_FILE_CHUNK))
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000467 return
468
469 # If zipping is enabled, zip in a separate thread.
470 def zip_and_push():
471 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
472 # content right here. It will block until all file is zipped.
473 try:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000474 stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
475 item.compression_level)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000476 data = ''.join(stream)
477 except Exception as exc:
478 logging.error('Failed to zip \'%s\': %s', item, exc)
479 channel.send_exception(exc)
480 return
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000481 self.net_thread_pool.add_task_with_channel(
482 channel, priority, push, [data])
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000483 self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000484
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000485 def async_fetch(self, channel, priority, digest, size, sink):
486 """Starts asynchronous fetch from the server in a parallel thread.
487
488 Arguments:
489 channel: TaskChannel that receives back |digest| when download ends.
490 priority: thread pool task priority for the fetch.
491 digest: hex digest of an item to download.
492 size: expected size of the item (after decompression).
493 sink: function that will be called as sink(generator).
494 """
495 def fetch():
496 try:
497 # Prepare reading pipeline.
498 stream = self._storage_api.fetch(digest)
499 if self.use_zip:
500 stream = zip_decompress(stream, DISK_FILE_CHUNK)
501 # Run |stream| through verifier that will assert its size.
502 verifier = FetchStreamVerifier(stream, size)
503 # Verified stream goes to |sink|.
504 sink(verifier.run())
505 except Exception as err:
506 logging.warning('Failed to fetch %s: %s', digest, err)
507 raise
508 return digest
509
510 # Don't bother with zip_thread_pool for decompression. Decompression is
511 # really fast and most probably IO bound anyway.
512 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
513
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000514 def get_missing_items(self, items):
515 """Yields items that are missing from the server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000516
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000517 Issues multiple parallel queries via StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000518
519 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000520 items: a list of Item objects to check.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000521
522 Yields:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000523 Item objects that are missing from the server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000524 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000525 channel = threading_utils.TaskChannel()
526 pending = 0
527 # Enqueue all requests.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000528 for batch in self.batch_items_for_check(items):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000529 self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
530 self._storage_api.contains, batch)
531 pending += 1
532 # Yield results as they come in.
533 for _ in xrange(pending):
534 for missing in channel.pull():
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000535 yield missing
536
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000537 @staticmethod
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000538 def batch_items_for_check(items):
539 """Splits list of items to check for existence on the server into batches.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000540
541 Each batch corresponds to a single 'exists?' query to the server via a call
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000542 to StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000543
544 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000545 items: a list of Item objects.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000546
547 Yields:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000548 Batches of items to query for existence in a single operation,
549 each batch is a list of Item objects.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000550 """
551 batch_count = 0
552 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
553 next_queries = []
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000554 for item in sorted(items, key=lambda x: x.size, reverse=True):
555 next_queries.append(item)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000556 if len(next_queries) == batch_size_limit:
557 yield next_queries
558 next_queries = []
559 batch_count += 1
560 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
561 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
562 if next_queries:
563 yield next_queries
564
565
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000566class FetchQueue(object):
567 """Fetches items from Storage and places them into LocalCache.
568
569 It manages multiple concurrent fetch operations. Acts as a bridge between
570 Storage and LocalCache so that Storage and LocalCache don't depend on each
571 other at all.
572 """
573
574 def __init__(self, storage, cache):
575 self.storage = storage
576 self.cache = cache
577 self._channel = threading_utils.TaskChannel()
578 self._pending = set()
579 self._accessed = set()
580 self._fetched = cache.cached_set()
581
582 def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
583 """Starts asynchronous fetch of item |digest|."""
584 # Fetching it now?
585 if digest in self._pending:
586 return
587
588 # Mark this file as in use, verify_all_cached will later ensure it is still
589 # in cache.
590 self._accessed.add(digest)
591
592 # Already fetched? Notify cache to update item's LRU position.
593 if digest in self._fetched:
594 # 'touch' returns True if item is in cache and not corrupted.
595 if self.cache.touch(digest, size):
596 return
597 # Item is corrupted, remove it from cache and fetch it again.
598 self._fetched.remove(digest)
599 self.cache.evict(digest)
600
601 # TODO(maruel): It should look at the free disk space, the current cache
602 # size and the size of the new item on every new item:
603 # - Trim the cache as more entries are listed when free disk space is low,
604 # otherwise if the amount of data downloaded during the run > free disk
605 # space, it'll crash.
606 # - Make sure there's enough free disk space to fit all dependencies of
607 # this run! If not, abort early.
608
609 # Start fetching.
610 self._pending.add(digest)
611 self.storage.async_fetch(
612 self._channel, priority, digest, size,
613 functools.partial(self.cache.write, digest))
614
615 def wait(self, digests):
616 """Starts a loop that waits for at least one of |digests| to be retrieved.
617
618 Returns the first digest retrieved.
619 """
620 # Flush any already fetched items.
621 for digest in digests:
622 if digest in self._fetched:
623 return digest
624
625 # Ensure all requested items are being fetched now.
626 assert all(digest in self._pending for digest in digests), (
627 digests, self._pending)
628
629 # Wait for some requested item to finish fetching.
630 while self._pending:
631 digest = self._channel.pull()
632 self._pending.remove(digest)
633 self._fetched.add(digest)
634 if digest in digests:
635 return digest
636
637 # Should never reach this point due to assert above.
638 raise RuntimeError('Impossible state')
639
640 def inject_local_file(self, path, algo):
641 """Adds local file to the cache as if it was fetched from storage."""
642 with open(path, 'rb') as f:
643 data = f.read()
644 digest = algo(data).hexdigest()
645 self.cache.write(digest, [data])
646 self._fetched.add(digest)
647 return digest
648
649 @property
650 def pending_count(self):
651 """Returns number of items to be fetched."""
652 return len(self._pending)
653
654 def verify_all_cached(self):
655 """True if all accessed items are in cache."""
656 return self._accessed.issubset(self.cache.cached_set())
657
658
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields the same chunks as |stream|, after validation.

    Keeps one chunk of lookahead so the whole stream can be validated before
    the final chunk is handed to the consumer.

    IOError raised by the consumer (thrown back into this generator) is
    converted into MappingError, since otherwise Storage would retry the
    fetch on unrelated local cache errors.
    """
    previous = None
    for current in self.stream:
      assert current is not None
      if previous is not None:
        self._inspect_chunk(previous, is_last=False)
        try:
          yield previous
        except IOError as exc:
          raise MappingError('Failed to store an item in cache: %s' % exc)
      previous = current
    if previous is not None:
      self._inspect_chunk(previous, is_last=True)
      try:
        yield previous
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Accumulates stream size; on the last chunk checks it matches."""
    self.current_size += len(chunk)
    if is_last:
      if (self.expected_size != UNKNOWN_FILE_SIZE and
          self.expected_size != self.current_size):
        raise IOError('Incorrect file size: expected %d, got %d' % (
            self.expected_size, self.current_size))
702
703
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000704class StorageApi(object):
705 """Interface for classes that implement low-level storage operations."""
706
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000707 def fetch(self, digest):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000708 """Fetches an object and yields its content.
709
710 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000711 digest: hash digest of item to download.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000712
713 Yields:
714 Chunks of downloaded item (as str objects).
715 """
716 raise NotImplementedError()
717
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000718 def push(self, item, content):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000719 """Uploads an |item| with content generated by |content| generator.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000720
721 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000722 item: Item object that holds information about an item being pushed.
723 content: a generator that yields chunks to push.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000724
725 Returns:
726 None.
727 """
728 raise NotImplementedError()
729
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000730 def contains(self, items):
731 """Checks for existence of given |items| on the server.
732
733 Mutates |items| by assigning opaque implement specific object to Item's
734 push_state attribute on missing entries in the datastore.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000735
736 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000737 items: list of Item objects.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000738
739 Returns:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000740 A list of items missing on server as a list of Item objects.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000741 """
742 raise NotImplementedError()
743
744
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000745class IsolateServer(StorageApi):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000746 """StorageApi implementation that downloads and uploads to Isolate Server.
747
748 It uploads and downloads directly from Google Storage whenever appropriate.
749 """
750
751 class _PushState(object):
752 """State needed to call .push(), to be stored in Item.push_state."""
753 def __init__(self, upload_url, finalize_url):
754 self.upload_url = upload_url
755 self.finalize_url = finalize_url
756 self.uploaded = False
757 self.finalized = False
758
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000759 def __init__(self, base_url, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000760 super(IsolateServer, self).__init__()
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000761 assert base_url.startswith('http'), base_url
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000762 self.base_url = base_url.rstrip('/')
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000763 self.namespace = namespace
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000764 self._lock = threading.Lock()
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000765 self._server_caps = None
766
767 @staticmethod
768 def _generate_handshake_request():
769 """Returns a dict to be sent as handshake request body."""
770 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
771 return {
772 'client_app_version': __version__,
773 'fetcher': True,
774 'protocol_version': ISOLATE_PROTOCOL_VERSION,
775 'pusher': True,
776 }
777
778 @staticmethod
779 def _validate_handshake_response(caps):
780 """Validates and normalizes handshake response."""
781 logging.info('Protocol version: %s', caps['protocol_version'])
782 logging.info('Server version: %s', caps['server_app_version'])
783 if caps.get('error'):
784 raise MappingError(caps['error'])
785 if not caps['access_token']:
786 raise ValueError('access_token is missing')
787 return caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000788
789 @property
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000790 def _server_capabilities(self):
791 """Performs handshake with the server if not yet done.
792
793 Returns:
794 Server capabilities dictionary as returned by /handshake endpoint.
795
796 Raises:
797 MappingError if server rejects the handshake.
798 """
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000799 # TODO(maruel): Make this request much earlier asynchronously while the
800 # files are being enumerated.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000801 with self._lock:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000802 if self._server_caps is None:
803 request_body = json.dumps(
804 self._generate_handshake_request(), separators=(',', ':'))
805 response = net.url_read(
806 url=self.base_url + '/content-gs/handshake',
807 data=request_body,
808 content_type='application/json',
809 method='POST')
810 if response is None:
811 raise MappingError('Failed to perform handshake.')
812 try:
813 caps = json.loads(response)
814 if not isinstance(caps, dict):
815 raise ValueError('Expecting JSON dict')
816 self._server_caps = self._validate_handshake_response(caps)
817 except (ValueError, KeyError, TypeError) as exc:
818 # KeyError exception has very confusing str conversion: it's just a
819 # missing key value and nothing else. So print exception class name
820 # as well.
821 raise MappingError('Invalid handshake response (%s): %s' % (
822 exc.__class__.__name__, exc))
823 return self._server_caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000824
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000825 def fetch(self, digest):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000826 assert isinstance(digest, basestring)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000827
828 source_url = '%s/content-gs/retrieve/%s/%s' % (
829 self.base_url, self.namespace, digest)
830 logging.debug('download_file(%s)', source_url)
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000831
832 # Because the app engine DB is only eventually consistent, retry 404 errors
833 # because the file might just not be visible yet (even though it has been
834 # uploaded).
835 connection = net.url_open(
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000836 source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000837 if not connection:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000838 raise IOError('Unable to open connection to %s' % source_url)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000839 return stream_read(connection, NET_IO_FILE_CHUNK)
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000840
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000841 def push(self, item, content):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000842 assert isinstance(item, Item)
843 assert isinstance(item.push_state, IsolateServer._PushState)
844 assert not item.push_state.finalized
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000845
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000846 # TODO(vadimsh): Do not read from |content| generator when retrying push.
847 # If |content| is indeed a generator, it can not be re-winded back
848 # to the beginning of the stream. A retry will find it exhausted. A possible
849 # solution is to wrap |content| generator with some sort of caching
850 # restartable generator. It should be done alongside streaming support
851 # implementation.
852
853 # This push operation may be a retry after failed finalization call below,
854 # no need to reupload contents in that case.
855 if not item.push_state.uploaded:
856 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
857 # upload support is implemented.
858 if isinstance(content, list) and len(content) == 1:
859 content = content[0]
860 else:
861 content = ''.join(content)
862 # PUT file to |upload_url|.
863 response = net.url_read(
864 url=item.push_state.upload_url,
865 data=content,
866 content_type='application/octet-stream',
867 method='PUT')
868 if response is None:
869 raise IOError('Failed to upload a file %s to %s' % (
870 item.digest, item.push_state.upload_url))
871 item.push_state.uploaded = True
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000872 else:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000873 logging.info(
874 'A file %s already uploaded, retrying finalization only', item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000875
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000876 # Optionally notify the server that it's done.
877 if item.push_state.finalize_url:
878 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
879 # send it to isolated server. That way isolate server can verify that
880 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
881 # stored files).
882 response = net.url_read(
883 url=item.push_state.finalize_url,
884 data='',
885 content_type='application/json',
886 method='POST')
887 if response is None:
888 raise IOError('Failed to finalize an upload of %s' % item.digest)
889 item.push_state.finalized = True
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +0000890
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000891 def contains(self, items):
892 logging.info('Checking existence of %d files...', len(items))
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +0000893
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000894 # Request body is a json encoded list of dicts.
895 body = [
896 {
897 'h': item.digest,
898 's': item.size,
899 'i': int(item.is_isolated),
900 } for item in items
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000901 ]
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000902
903 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
904 self.base_url,
905 self.namespace,
906 urllib.quote(self._server_capabilities['access_token']))
907 response_body = net.url_read(
908 url=query_url,
909 data=json.dumps(body, separators=(',', ':')),
910 content_type='application/json',
911 method='POST')
912 if response_body is None:
913 raise MappingError('Failed to execute /pre-upload query')
914
915 # Response body is a list of push_urls (or null if file is already present).
916 try:
917 response = json.loads(response_body)
918 if not isinstance(response, list):
919 raise ValueError('Expecting response with json-encoded list')
920 if len(response) != len(items):
921 raise ValueError(
922 'Incorrect number of items in the list, expected %d, '
923 'but got %d' % (len(items), len(response)))
924 except ValueError as err:
925 raise MappingError(
926 'Invalid response from server: %s, body is %s' % (err, response_body))
927
928 # Pick Items that are missing, attach _PushState to them.
929 missing_items = []
930 for i, push_urls in enumerate(response):
931 if push_urls:
932 assert len(push_urls) == 2, str(push_urls)
933 item = items[i]
934 assert item.push_state is None
935 item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
936 missing_items.append(item)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000937 logging.info('Queried %d files, %d cache hit',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000938 len(items), len(items) - len(missing_items))
939 return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000940
941
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000942class FileSystem(StorageApi):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000943 """StorageApi implementation that fetches data from the file system.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000944
945 The common use case is a NFS/CIFS file server that is mounted locally that is
946 used to fetch the file on a local partition.
947 """
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000948
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000949 def __init__(self, base_path):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000950 super(FileSystem, self).__init__()
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000951 self.base_path = base_path
952
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000953 def fetch(self, digest):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000954 assert isinstance(digest, basestring)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000955 return file_read(os.path.join(self.base_path, digest))
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000956
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000957 def push(self, item, content):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000958 assert isinstance(item, Item)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000959 file_write(os.path.join(self.base_path, item.digest), content)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000960
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000961 def contains(self, items):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000962 return [
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000963 item for item in items
964 if not os.path.exists(os.path.join(self.base_path, item.digest))
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000965 ]
966
967
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000968class LocalCache(object):
969 """Local cache that stores objects fetched via Storage.
970
971 It can be accessed concurrently from multiple threads, so it should protect
972 its internal state with some lock.
973 """
974
975 def __enter__(self):
976 """Context manager interface."""
977 return self
978
979 def __exit__(self, _exc_type, _exec_value, _traceback):
980 """Context manager interface."""
981 return False
982
983 def cached_set(self):
984 """Returns a set of all cached digests (always a new object)."""
985 raise NotImplementedError()
986
987 def touch(self, digest, size):
988 """Ensures item is not corrupted and updates its LRU position.
989
990 Arguments:
991 digest: hash digest of item to check.
992 size: expected size of this item.
993
994 Returns:
995 True if item is in cache and not corrupted.
996 """
997 raise NotImplementedError()
998
999 def evict(self, digest):
1000 """Removes item from cache if it's there."""
1001 raise NotImplementedError()
1002
1003 def read(self, digest):
1004 """Returns contents of the cached item as a single str."""
1005 raise NotImplementedError()
1006
1007 def write(self, digest, content):
1008 """Reads data from |content| generator and stores it in cache."""
1009 raise NotImplementedError()
1010
1011 def link(self, digest, dest, file_mode=None):
1012 """Ensures file at |dest| has same content as cached |digest|."""
1013 raise NotImplementedError()
1014
1015
class MemoryCache(LocalCache):
  """LocalCache implementation that keeps every object in process memory."""

  def __init__(self):
    super(MemoryCache, self).__init__()
    # Guard all accesses: dict is not assumed to be thread safe.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents.keys())

  def touch(self, digest, size):
    # Data held in memory cannot become corrupted, so |size| is not checked.
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      if digest in self._contents:
        del self._contents[digest]

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Materialize the whole stream before entering the critical section.
    blob = ''.join(content)
    with self._lock:
      self._contents[digest] = blob

  def link(self, digest, dest, file_mode=None):
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      os.chmod(dest, file_mode)
1051
1052
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001053def get_hash_algo(_namespace):
1054 """Return hash algorithm class to use when uploading to given |namespace|."""
1055 # TODO(vadimsh): Implement this at some point.
1056 return hashlib.sha1
1057
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001058
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001059def is_namespace_with_compression(namespace):
1060 """Returns True if given |namespace| stores compressed objects."""
1061 return namespace.endswith(('-gzip', '-deflate'))
1062
1063
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001064def get_storage_api(file_or_url, namespace):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001065 """Returns an object that implements StorageApi interface."""
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001066 if re.match(r'^https?://.+$', file_or_url):
1067 return IsolateServer(file_or_url, namespace)
1068 else:
1069 return FileSystem(file_or_url)
1070
1071
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001072def get_storage(file_or_url, namespace):
1073 """Returns Storage class configured with appropriate StorageApi instance."""
1074 return Storage(
1075 get_storage_api(file_or_url, namespace),
1076 is_namespace_with_compression(namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001077
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001078
maruel@chromium.org7b844a62013-09-17 13:04:59 +00001079def upload_tree(base_url, indir, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001080 """Uploads the given tree to the given url.
1081
1082 Arguments:
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001083 base_url: The base url, it is assume that |base_url|/has/ can be used to
1084 query if an element was already uploaded, and |base_url|/store/
1085 can be used to upload a new element.
1086 indir: Root directory the infiles are based in.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001087 infiles: dict of files to upload from |indir| to |base_url|.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001088 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001089 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001090 with get_storage(base_url, namespace) as storage:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001091 storage.upload_tree(indir, infiles)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001092 return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001093
1094
maruel@chromium.org41601642013-09-18 19:40:46 +00001095def load_isolated(content, os_flavor, algo):
1096 """Verifies the .isolated file is valid and loads this object with the json
1097 data.
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001098
1099 Arguments:
1100 - content: raw serialized content to load.
1101 - os_flavor: OS to load this file on. Optional.
1102 - algo: hashlib algorithm class. Used to confirm the algorithm matches the
1103 algorithm used on the Isolate Server.
maruel@chromium.org41601642013-09-18 19:40:46 +00001104 """
1105 try:
1106 data = json.loads(content)
1107 except ValueError:
1108 raise ConfigError('Failed to parse: %s...' % content[:100])
1109
1110 if not isinstance(data, dict):
1111 raise ConfigError('Expected dict, got %r' % data)
1112
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001113 # Check 'version' first, since it could modify the parsing after.
1114 value = data.get('version', '1.0')
1115 if not isinstance(value, basestring):
1116 raise ConfigError('Expected string, got %r' % value)
1117 if not re.match(r'^(\d+)\.(\d+)$', value):
1118 raise ConfigError('Expected a compatible version, got %r' % value)
1119 if value.split('.', 1)[0] != '1':
1120 raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)
1121
1122 if algo is None:
1123 # Default the algorithm used in the .isolated file itself, falls back to
1124 # 'sha-1' if unspecified.
1125 algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]
1126
maruel@chromium.org41601642013-09-18 19:40:46 +00001127 for key, value in data.iteritems():
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001128 if key == 'algo':
1129 if not isinstance(value, basestring):
1130 raise ConfigError('Expected string, got %r' % value)
1131 if value not in SUPPORTED_ALGOS:
1132 raise ConfigError(
1133 'Expected one of \'%s\', got %r' %
1134 (', '.join(sorted(SUPPORTED_ALGOS)), value))
1135 if value != SUPPORTED_ALGOS_REVERSE[algo]:
1136 raise ConfigError(
1137 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
1138
1139 elif key == 'command':
maruel@chromium.org41601642013-09-18 19:40:46 +00001140 if not isinstance(value, list):
1141 raise ConfigError('Expected list, got %r' % value)
1142 if not value:
1143 raise ConfigError('Expected non-empty command')
1144 for subvalue in value:
1145 if not isinstance(subvalue, basestring):
1146 raise ConfigError('Expected string, got %r' % subvalue)
1147
1148 elif key == 'files':
1149 if not isinstance(value, dict):
1150 raise ConfigError('Expected dict, got %r' % value)
1151 for subkey, subvalue in value.iteritems():
1152 if not isinstance(subkey, basestring):
1153 raise ConfigError('Expected string, got %r' % subkey)
1154 if not isinstance(subvalue, dict):
1155 raise ConfigError('Expected dict, got %r' % subvalue)
1156 for subsubkey, subsubvalue in subvalue.iteritems():
1157 if subsubkey == 'l':
1158 if not isinstance(subsubvalue, basestring):
1159 raise ConfigError('Expected string, got %r' % subsubvalue)
1160 elif subsubkey == 'm':
1161 if not isinstance(subsubvalue, int):
1162 raise ConfigError('Expected int, got %r' % subsubvalue)
1163 elif subsubkey == 'h':
1164 if not is_valid_hash(subsubvalue, algo):
1165 raise ConfigError('Expected sha-1, got %r' % subsubvalue)
1166 elif subsubkey == 's':
1167 if not isinstance(subsubvalue, int):
1168 raise ConfigError('Expected int, got %r' % subsubvalue)
1169 else:
1170 raise ConfigError('Unknown subsubkey %s' % subsubkey)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001171 if bool('h' in subvalue) == bool('l' in subvalue):
maruel@chromium.org41601642013-09-18 19:40:46 +00001172 raise ConfigError(
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001173 'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
1174 subvalue)
1175 if bool('h' in subvalue) != bool('s' in subvalue):
1176 raise ConfigError(
1177 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
1178 subvalue)
1179 if bool('s' in subvalue) == bool('l' in subvalue):
1180 raise ConfigError(
1181 'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
1182 subvalue)
1183 if bool('l' in subvalue) and bool('m' in subvalue):
1184 raise ConfigError(
1185 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
maruel@chromium.org41601642013-09-18 19:40:46 +00001186 subvalue)
1187
1188 elif key == 'includes':
1189 if not isinstance(value, list):
1190 raise ConfigError('Expected list, got %r' % value)
1191 if not value:
1192 raise ConfigError('Expected non-empty includes list')
1193 for subvalue in value:
1194 if not is_valid_hash(subvalue, algo):
1195 raise ConfigError('Expected sha-1, got %r' % subvalue)
1196
1197 elif key == 'read_only':
1198 if not isinstance(value, bool):
1199 raise ConfigError('Expected bool, got %r' % value)
1200
1201 elif key == 'relative_cwd':
1202 if not isinstance(value, basestring):
1203 raise ConfigError('Expected string, got %r' % value)
1204
1205 elif key == 'os':
1206 if os_flavor and value != os_flavor:
1207 raise ConfigError(
1208 'Expected \'os\' to be \'%s\' but got \'%s\'' %
1209 (os_flavor, value))
1210
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001211 elif key == 'version':
1212 # Already checked above.
1213 pass
1214
maruel@chromium.org41601642013-09-18 19:40:46 +00001215 else:
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001216 raise ConfigError('Unknown key %r' % key)
maruel@chromium.org41601642013-09-18 19:40:46 +00001217
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001218 # Automatically fix os.path.sep if necessary. While .isolated files are always
1219 # in the the native path format, someone could want to download an .isolated
1220 # tree from another OS.
1221 wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
1222 if 'files' in data:
1223 data['files'] = dict(
1224 (k.replace(wrong_path_sep, os.path.sep), v)
1225 for k, v in data['files'].iteritems())
1226 for v in data['files'].itervalues():
1227 if 'l' in v:
1228 v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
1229 if 'relative_cwd' in data:
1230 data['relative_cwd'] = data['relative_cwd'].replace(
1231 wrong_path_sep, os.path.sep)
maruel@chromium.org41601642013-09-18 19:40:46 +00001232 return data
1233
1234
class IsolatedFile(object):
  """In-memory representation of a single parsed .isolated file."""

  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)', obj_hash)
    self.obj_hash = obj_hash
    self.algo = algo

    # Becomes True once everything on the left-side of the tree is parsed.
    # 'Tree' here means the .isolate and all .isolated files recursively
    # included by it through the 'includes' key. The order of the hashes in
    # 'includes' matters: later ones are not processed until the earlier ones
    # are retrieved and read.
    self.can_fetch = False

    # Raw json data once loaded.
    self.data = {}
    # One IsolatedFile instance per entry in self.data['includes'].
    self.children = []

    # Becomes True once load() parsed the .isolated content.
    self._is_parsed = False
    # Becomes True once the files were requested for fetching.
    self.files_fetched = False

  def load(self, os_flavor, content):
    """Verifies the .isolated file is valid and loads this object with the json
    data.
    """
    logging.debug('IsolatedFile.load(%s)', self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    included = self.data.get('includes', [])
    self.children = [IsolatedFile(digest, self.algo) for digest in included]
    self._is_parsed = True

  def fetch_files(self, fetch_queue, files):
    """Adds files in this .isolated file not present in |files| dictionary.

    Preemptively requests missing files. Note that |files| is modified by
    this function.
    """
    assert self.can_fetch
    if not self._is_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)', self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overriden files must not be fetched.
      if filepath in files:
        continue
      files[filepath] = properties
      if 'h' in properties:
        # Preemptively request files.
        logging.debug('fetching %s', filepath)
        fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
    self.files_fetched = True
1292
1293
class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    # Command line to run, as a list of strings.
    self.command = []
    # Aggregated 'files' mapping (path -> properties) from the whole tree.
    self.files = {}
    # Tri-state until load() completes: None -> bool.
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      # Enqueues a high-priority fetch for |isolated_file| and tracks it.
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(WorkerPool.HIGH, h)

    retrieve(self.root)

    # Process .isolated files as they arrive, in whatever order fetches
    # complete; can_fetch gating below restores the strict left-to-right order.
    while pending:
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    # Normalize the tri-state values now that loading completed.
    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False

  def _traverse_tree(self, fetch_queue, node):
    """Fetches files of fetchable nodes, unlocking at most one new child."""
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    """Merges |node|'s files and properties into this Settings instance."""
    node.fetch_files(fetch_queue, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1389
1390
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001391def fetch_isolated(
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001392 isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001393 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001394
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001395 Arguments:
1396 isolated_hash: hash of the root *.isolated file.
1397 storage: Storage class that communicates with isolate storage.
1398 cache: LocalCache class that knows how to store and map files locally.
1399 algo: hash algorithm to use.
1400 outdir: Output directory to map file tree to.
1401 os_flavor: OS flavor to choose when reading sections of *.isolated file.
1402 require_command: Ensure *.isolated specifies a command to run.
1403
1404 Returns:
1405 Settings object that holds details about loaded *.isolated file.
1406 """
1407 with cache:
1408 fetch_queue = FetchQueue(storage, cache)
1409 settings = Settings()
1410
1411 with tools.Profiler('GetIsolateds'):
1412 # Optionally support local files by manually adding them to cache.
1413 if not is_valid_hash(isolated_hash, algo):
1414 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1415
1416 # Load all *.isolated and start loading rest of the files.
1417 settings.load(fetch_queue, isolated_hash, os_flavor, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001418 if require_command and not settings.command:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001419 # TODO(vadimsh): All fetch operations are already enqueue and there's no
1420 # easy way to cancel them.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001421 raise ConfigError('No command to run')
1422
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001423 with tools.Profiler('GetRest'):
1424 # Create file system hierarchy.
1425 if not os.path.isdir(outdir):
1426 os.makedirs(outdir)
1427 create_directories(outdir, settings.files)
1428 create_links(outdir, settings.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001429
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001430 # Ensure working directory exists.
1431 cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
1432 if not os.path.isdir(cwd):
1433 os.makedirs(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001434
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001435 # Multimap: digest -> list of pairs (path, props).
1436 remaining = {}
1437 for filepath, props in settings.files.iteritems():
1438 if 'h' in props:
1439 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001440
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001441 # Now block on the remaining files to be downloaded and mapped.
1442 logging.info('Retrieving remaining files (%d of them)...',
1443 fetch_queue.pending_count)
1444 last_update = time.time()
1445 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1446 while remaining:
1447 detector.ping()
1448
1449 # Wait for any item to finish fetching to cache.
1450 digest = fetch_queue.wait(remaining)
1451
1452 # Link corresponding files to a fetched item in cache.
1453 for filepath, props in remaining.pop(digest):
1454 cache.link(digest, os.path.join(outdir, filepath), props.get('m'))
1455
1456 # Report progress.
1457 duration = time.time() - last_update
1458 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1459 msg = '%d files remaining...' % len(remaining)
1460 print msg
1461 logging.info(msg)
1462 last_update = time.time()
1463
1464 # Cache could evict some items we just tried to fetch, it's a fatal error.
1465 if not fetch_queue.verify_all_cached():
1466 raise MappingError('Cache is too small to hold all requested files')
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001467 return settings
1468
1469
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  With '-' as the sole argument, reads the list of files to upload from stdin,
  one path per line. On success, prints '<hash> <path>' for each uploaded file.

  Returns:
    upload_tree()'s return value; falsy on success.
  """
  options, files = parser.parse_args(args)

  if files == ['-']:
    # Each line read from stdin keeps its trailing '\n'; strip it, otherwise
    # os.stat() below is called with 'filename\n' and fails. Skip blank lines
    # as well.
    files = [l for l in (line.rstrip('\n') for line in sys.stdin) if l]

  if not files:
    parser.error('Nothing to upload')

  # Load the necessary metadata.
  # TODO(maruel): Use a worker pool to upload as the hashing is being done.
  # The hash algorithm depends only on the namespace; resolve it once instead
  # of once per file.
  algo = get_hash_algo(options.namespace)
  infiles = dict(
      (
        f,
        {
          's': os.stat(f).st_size,
          'h': hash_file(f, algo),
        }
      )
      for f in files)

  with tools.Profiler('Archive'):
    ret = upload_tree(
        base_url=options.isolate_server,
        indir=os.getcwd(),
        infiles=infiles,
        namespace=options.namespace)
  if not ret:
    print('\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles)))
  return ret
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001502
1503
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.

  Returns:
    0 on success.
  """
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)
  storage = get_storage(options.isolate_server, options.namespace)
  cache = MemoryCache()
  algo = get_hash_algo(options.namespace)

  # Fetching individual files.
  if options.file:
    channel = threading_utils.TaskChannel()
    pending = {}
    for digest, dest in options.file:
      pending[digest] = dest
      storage.async_fetch(
          channel,
          WorkerPool.MED,
          digest,
          UNKNOWN_FILE_SIZE,
          functools.partial(file_write, os.path.join(options.target, dest)))
    # Wait for all the files, in whatever order they complete.
    while pending:
      fetched = channel.pull()
      dest = pending.pop(fetched)
      logging.info('%s: %s', fetched, dest)

  # Fetching whole isolated tree.
  if options.isolated:
    settings = fetch_isolated(
        isolated_hash=options.isolated,
        storage=storage,
        cache=cache,
        algo=algo,
        outdir=options.target,
        os_flavor=None,
        require_command=False)
    # |rel| is already absolute (|target| was made absolute above), so it must
    # not be joined with |target| a second time. The old double join only
    # happened to work because os.path.join() discards components preceding an
    # absolute one.
    rel = os.path.join(options.target, settings.relative_cwd)
    print('To run this test please run from the directory %s:' % rel)
    # The tree was fetched with require_command=False, so there may be no
    # command at all; only print one when present.
    if settings.command:
      print(' ' + ' '.join(settings.command))

  return 0
1564
1565
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser with the --isolate-server/--namespace flags shared by all
  subcommands of this tool."""

  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(self, **kwargs)
    self.add_option(
        '-I', '--isolate-server',
        metavar='URL', default='',
        help='Isolate server to use')
    self.add_option(
        '--namespace', default='default-gzip',
        help='The namespace to use on the server, default: %default')

  def parse_args(self, *args, **kwargs):
    """Parses arguments, then validates and normalizes --isolate-server."""
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Normalize away any trailing slash so later URL concatenation does not
    # produce a double '//'.
    server = options.isolate_server.rstrip('/')
    if not server:
      self.error('--isolate-server is required.')
    options.isolate_server = server
    return options, args
1584
1585
def main(args):
  """Dispatches |args| to the matching CMD* subcommand in this module.

  Returns the subcommand's exit code, or 1 on a known isolate error.
  """
  parser = OptionParserIsolateServer(version=__version__)
  try:
    return subcommand.CommandDispatcher(__name__).execute(parser, args)
  except (ConfigError, MappingError) as e:
    # Known failure modes are reported as a clean one-line error instead of a
    # traceback.
    sys.stderr.write('\nError: ')
    sys.stderr.write(str(e))
    sys.stderr.write('\n')
    return 1
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001596
1597
if __name__ == '__main__':
  # Set up the console before any output: fix stream encoding, disable stdout
  # buffering, and initialize colorama (enables ANSI color handling, notably
  # on Windows).
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))