blob: ec4a5255e53338aaedc3bf25fb7eac11e96775b4 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05006"""Archives a set of files or directories to a server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05008__version__ = '0.3'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import hashlib
maruel@chromium.org41601642013-09-18 19:40:46 +000012import json
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050016import shutil
17import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050019import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000020import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000022import urllib
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000023import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000024
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000025from third_party import colorama
26from third_party.depot_tools import fix_encoding
27from third_party.depot_tools import subcommand
28
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050029from utils import file_path
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000030from utils import net
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000031from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000032from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000033
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080034import auth
35
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000036
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'
# Version stored and expected in .isolated files.
ISOLATED_FILE_VERSION = '1.3'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. The numbers here are a trade-off; the more per request, the lower
# the effect of HTTP round trip latency and TCP-level chattiness. On the other
# hand, larger values cause longer lookups, increasing the initial latency to
# start uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of
# large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
    'md5': hashlib.md5,
    'sha-1': hashlib.sha1,
    'sha-512': hashlib.sha512,
}


# Used for serialization: maps hashlib constructors back to their wire names.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
    r'^.+\.(?:run_test_cases)$',
    r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
123
class Error(Exception):
  """Generic runtime error raised by this module."""
127
128
class ConfigError(ValueError):
  """Raised when a .isolated file cannot be loaded or is malformed."""
132
133
class MappingError(OSError):
  """Raised when the file tree cannot be recreated on disk."""
137
138
def is_valid_hash(value, algo):
  """Returns True if |value| is a syntactically valid hex digest for |algo|.

  Arguments:
    value: string to validate.
    algo: hashlib constructor (e.g. hashlib.sha1) whose digest length |value|
        must match.
  """
  # Two hex characters per digest byte.
  size = 2 * algo().digest_size
  # Use \Z instead of $: '$' also matches just before a trailing newline, so
  # the old pattern accepted strings like 'deadbeef...\n'.
  return bool(re.match(r'^[a-fA-F0-9]{%d}\Z' % size, value))
143
144
def hash_file(filepath, algo):
  """Calculates the hash of a file without reading it all in memory at once.

  |algo| should be one of hashlib hashing algorithm.
  """
  hasher = algo()
  with open(filepath, 'rb') as stream:
    while True:
      block = stream.read(DISK_FILE_CHUNK)
      if not block:
        return hasher.hexdigest()
      hasher.update(block)
158
159
def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  chunk = stream.read(chunk_size)
  while chunk:
    yield chunk
    chunk = stream.read(chunk_size)
167
168
def file_read(filepath, chunk_size=DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as stream:
    if offset:
      stream.seek(offset)
    chunk = stream.read(chunk_size)
    while chunk:
      yield chunk
      chunk = stream.read(chunk_size)
179
180
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent = os.path.dirname(filepath)
  if not os.path.isdir(parent):
    os.makedirs(parent)
  written = 0
  with open(filepath, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000199
200
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
211
212
def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  consumed = 0
  try:
    for piece in content_generator:
      consumed += len(piece)
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      # Drain whatever zlib held back because of the |chunk_size| cap.
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as exc:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (consumed, exc))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
242
243
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store, no compression) for already-compressed file types and 7
  for everything else.
  """
  # os.path.splitext() keeps the leading dot ('.zip'), but the entries in
  # ALREADY_COMPRESSED_TYPES are bare extensions ('zip'). Strip the dot,
  # otherwise the membership test below can never match and compressed files
  # get re-compressed at level 7.
  file_ext = os.path.splitext(filename)[1].lstrip('.').lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
249
250
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file.
  required = set()
  for filepath in files:
    subdir = os.path.dirname(filepath)
    while subdir and subdir not in required:
      required.add(subdir)
      subdir = os.path.dirname(subdir)
  # Sorting guarantees parents come before their children.
  for subdir in sorted(required):
    os.mkdir(os.path.join(base_directory, subdir))
263
264
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  # Only entries with an 'l' property are symlinks; everything else is
  # handled elsewhere.
  links = ((path, props['l']) for path, props in files if 'l' in props)
  for path, target in links:
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', path)
      continue
    # os.symlink() doesn't exist on Windows.
    outfile = os.path.join(base_directory, path)
    os.symlink(target, outfile)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000277
278
def is_valid_file(filepath, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    # No expected size to compare against; existence is the only check.
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if actual_size == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
293
294
class WorkerPool(threading_utils.AutoRetryThreadPool):
  """AutoRetryThreadPool preconfigured for network-bound work.

  Tasks that raise IOError are retried automatically up to RETRIES times.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # How many times a task raising IOError is retried before giving up.
  RETRIES = 5

  def __init__(self):
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000312
313
class Item(object):
  """A single entity that can be pushed to Storage.

  Created on the main thread, handed to the 'contains' thread, then to the
  'push' thread, and finally returned to the main thread.

  It is never used concurrently from multiple threads.
  """

  def __init__(self, digest, size, is_isolated=False):
    # Default zip compression level; subclasses may tune it per item.
    self.compression_level = 6
    # Opaque per-item state the StorageApi implementation may attach.
    self.push_state = None
    self.digest = digest
    self.size = size
    self.is_isolated = is_isolated

  def content(self, chunk_size):
    """Iterable with content of this item in chunks of given size.

    Arguments:
      chunk_size: preferred size of the chunk to produce, may be ignored.
    """
    raise NotImplementedError()
337
338
class FileItem(Item):
  """A file on disk to push to Storage."""

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    # Pick a compression level suited to the file's extension.
    self.compression_level = get_zip_compression_level(path)
    self.path = path

  def content(self, chunk_size):
    # Stream the file lazily in |chunk_size| pieces.
    return file_read(self.path, chunk_size)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000349
350
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, algo, is_isolated=False):
    digest = algo(buf).hexdigest()
    super(BufferItem, self).__init__(digest, len(buf), is_isolated)
    self.buffer = buf

  def content(self, _chunk_size):
    # The whole buffer is produced as one chunk; the chunk size is ignored.
    return [self.buffer]
361
362
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Owns two lazily-created thread pools: a CPU pool for zipping and a network
  pool (with automatic IOError retry) for uploads/downloads. Use as a context
  manager so both pools are joined and closed on exit.
  """

  def __init__(self, storage_api, use_zip):
    # If True, content is zlib-compressed before push and decompressed after
    # fetch.
    self.use_zip = use_zip
    # Concrete StorageApi implementation that talks to the isolate server.
    self._storage_api = storage_api
    # Created lazily by the properties below.
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface. Never suppresses exceptions."""
    self.close()
    return False

  def upload_tree(self, indir, infiles):
    """Uploads the given tree to the isolate server.

    Arguments:
      indir: root directory the infiles are based in.
      infiles: dict of files to upload from |indir|; values are metadata
          dicts with 'h' (digest), 's' (size), optional 'priority' and 'l'
          (symlink target) keys.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))

    # Convert |indir| + |infiles| into a list of FileItem objects.
    # Filter out symlinks, since they are not represented by items on isolate
    # server side.
    items = [
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            # priority '0' marks .isolated files; they get higher priority.
            is_isolated=metadata.get('priority') == '0')
        for filepath, metadata in infiles.iteritems()
        if 'l' not in metadata
    ]

    return self.upload_items(items)

  def upload_items(self, items):
    """Uploads bunch of items to the isolate server.

    Will upload only items that are missing.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      # setdefault returns the existing entry when the digest was seen before.
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    channel = threading_utils.TaskChannel()
    for missing_item in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(
          channel,
          # .isolated files unblock the client sooner, so push them first.
          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
          missing_item)

    uploaded = []
    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          # pull() blocks until some push completes (or re-raises its error).
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    # NOTE(review): if |items| is empty, |total| is 0 and the percentage math
    # below raises ZeroDivisionError — confirm callers never pass an empty
    # list.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    return self._storage_api.get_fetch_url(digest)

  def async_push(self, channel, priority, item):
    """Starts asynchronous push to the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      priority: thread pool task priority for the push.
      item: item to upload as instance of Item class.
    """
    def push(content):
      """Pushes an item and returns its id, to pass as a result to |channel|."""
      self._storage_api.push(item, content)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push,
          item.content(DISK_FILE_CHUNK))
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
                              item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Propagate the failure to whoever is waiting on |channel|.
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self.use_zip:
          stream = zip_decompress(stream, DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        # Re-raise so the retry logic in the thread pool can kick in.
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      Item objects that are missing from the server.
    """
    channel = threading_utils.TaskChannel()
    pending = 0
    # Enqueue all requests.
    for batch in self.batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1
    # Yield results as they come in; each pull() returns one batch's result.
    for _ in xrange(pending):
      for missing in channel.pull():
        yield missing

  @staticmethod
  def batch_items_for_check(items):
    """Splits list of items to check for existence on the server into batches.

    Each batch corresponds to a single 'exists?' query to the server via a call
    to StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects.

    Yields:
      Batches of items to query for existence in a single operation,
      each batch is a list of Item objects.
    """
    batch_count = 0
    batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
    next_queries = []
    # Largest files first: size is used as a proxy for "likely changed".
    for item in sorted(items, key=lambda x: x.size, reverse=True):
      next_queries.append(item)
      if len(next_queries) == batch_size_limit:
        yield next_queries
        next_queries = []
        batch_count += 1
        # Later batches grow per ITEMS_PER_CONTAINS_QUERIES, capped at the
        # last entry.
        batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    if next_queries:
      yield next_queries
636
637
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    # Receives a digest each time a fetch completes.
    self._channel = threading_utils.TaskChannel()
    # Digests currently being downloaded.
    self._pending = set()
    # Every digest ever requested through add(); used by verify_all_cached().
    self._accessed = set()
    # Digests already present in the local cache.
    self._fetched = cache.cached_set()

  def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching; completed downloads are written straight into the cache.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    # NOTE: reads the whole file into memory; intended for small files.
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
729
730
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    # |stream| yields str chunks; |expected_size| may be UNKNOWN_FILE_SIZE.
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Keep a one-chunk lookahead in |held| so that the final chunk can be
    # size-checked before the consumer ever sees it.
    held = None
    for incoming in self.stream:
      assert incoming is not None
      if held is not None:
        self._inspect_chunk(held, is_last=False)
        try:
          yield held
        except IOError as exc:
          raise MappingError('Failed to store an item in cache: %s' % exc)
      held = incoming
    if held is not None:
      self._inspect_chunk(held, is_last=True)
      try:
        yield held
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    expected = self.expected_size
    # Only the final chunk triggers the size verification; a known size that
    # disagrees with the byte count means a truncated or corrupted stream.
    if is_last and expected != UNKNOWN_FILE_SIZE and expected != self.current_size:
      raise IOError('Incorrect file size: expected %d, got %d' % (
          expected, self.current_size))
774
775
class StorageApi(object):
  """Interface for classes that implement low-level storage operations."""

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL, or None when the underlying protocol has no URL based access.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, content):
    """Uploads an |item| with content generated by |content| generator.

    Arguments:
      item: Item object that holds information about an item being pushed.
      content: a generator that yields chunks to push.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for existence of given |items| on the server.

    Mutates |items| by assigning opaque implement specific object to Item's
    push_state attribute on missing entries in the datastore.

    Arguments:
      items: list of Item objects.

    Returns:
      A list of items missing on server as a list of Item objects.
    """
    raise NotImplementedError()
827
828
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  """

  class _PushState(object):
    """State needed to call .push(), to be stored in Item.push_state."""
    def __init__(self, upload_url, finalize_url):
      # URL to PUT the raw item content to.
      self.upload_url = upload_url
      # URL to POST to once the upload completed; push() treats a falsy value
      # as "no finalization required".
      self.finalize_url = finalize_url
      # Set once the content PUT succeeded; a retried push() skips re-upload.
      self.uploaded = False
      # Set once the finalization POST succeeded.
      self.finalized = False

  def __init__(self, base_url, namespace):
    """
    Arguments:
      base_url: root URL of the isolate server; must start with 'http'.
          A trailing '/' is stripped.
      namespace: server side namespace items are stored under.
    """
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self.base_url = base_url.rstrip('/')
    self.namespace = namespace
    # Guards lazy initialization of |_server_caps| in _server_capabilities.
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if self._server_caps is None:
        request_body = json.dumps(
            self._generate_handshake_request(), separators=(',', ':'))
        response = net.url_read(
            url=self.base_url + '/content-gs/handshake',
            data=request_body,
            content_type='application/json',
            method='POST')
        if response is None:
          raise MappingError('Failed to perform handshake.')
        try:
          caps = json.loads(response)
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise MappingError('Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  def get_fetch_url(self, digest):
    """Returns the /content-gs/retrieve URL for |digest| in this namespace."""
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self.base_url, self.namespace, digest)

  def fetch(self, digest, offset=0):
    """Streams item content from the server, optionally resuming at |offset|.

    Raises:
      IOError if the request fails or the server mishandles the Range request.
    """
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        source_url,
        retry_404=True,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, content):
    """Uploads |content| for |item|, then optionally finalizes the upload.

    |item.push_state| must have been assigned by a prior contains() call.
    Safe to retry: a PUT that already succeeded is not repeated, only the
    finalization POST is.
    """
    assert isinstance(item, Item)
    assert isinstance(item.push_state, IsolateServer._PushState)
    assert not item.push_state.finalized

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not item.push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=item.push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, item.push_state.upload_url))
      item.push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if item.push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      response = net.url_read(
          url=item.push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      item.push_state.finalized = True

  def contains(self, items):
    """Asks the server which of |items| are missing; see StorageApi.contains.

    Attaches a fresh _PushState to each missing Item's push_state.
    """
    logging.info('Checking existence of %d files...', len(items))

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.is_isolated),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self.base_url,
        self.namespace,
        urllib.quote(self._server_capabilities['access_token']))
    response_body = net.url_read(
        url=query_url,
        data=json.dumps(body, separators=(',', ':')),
        content_type='application/json',
        method='POST')
    if response_body is None:
      raise MappingError('Failed to execute /pre-upload query')

    # Response body is a list of push_urls (or null if file is already present).
    try:
      response = json.loads(response_body)
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response_body))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = []
    for i, push_urls in enumerate(response):
      if push_urls:
        # Server replies with an (upload_url, finalize_url) pair per miss.
        assert len(push_urls) == 2, str(push_urls)
        item = items[i]
        assert item.push_state is None
        item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
        missing_items.append(item)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001059
1060
class FileSystem(StorageApi):
  """StorageApi implementation that fetches data from the file system.

  The common use case is a NFS/CIFS file server that is mounted locally that is
  used to fetch the file on a local partition.
  """

  def __init__(self, base_path):
    super(FileSystem, self).__init__()
    self.base_path = base_path

  def get_fetch_url(self, digest):
    # Plain file systems provide no URL based access.
    return None

  def fetch(self, digest, offset=0):
    assert isinstance(digest, basestring)
    path = os.path.join(self.base_path, digest)
    return file_read(path, offset=offset)

  def push(self, item, content):
    assert isinstance(item, Item)
    path = os.path.join(self.base_path, item.digest)
    file_write(path, content)

  def contains(self, items):
    # An item exists iff a file named after its digest exists under base_path.
    missing = []
    for item in items:
      if not os.path.exists(os.path.join(self.base_path, item.digest)):
        missing.append(item)
    return missing
1088
1089
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    # Never suppresses exceptions raised inside the 'with' block.
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()
1140
1141
1142class MemoryCache(LocalCache):
1143 """LocalCache implementation that stores everything in memory."""
1144
1145 def __init__(self):
1146 super(MemoryCache, self).__init__()
1147 # Let's not assume dict is thread safe.
1148 self._lock = threading.Lock()
1149 self._contents = {}
1150
1151 def cached_set(self):
1152 with self._lock:
1153 return set(self._contents)
1154
1155 def touch(self, digest, size):
1156 with self._lock:
1157 return digest in self._contents
1158
1159 def evict(self, digest):
1160 with self._lock:
1161 self._contents.pop(digest, None)
1162
1163 def read(self, digest):
1164 with self._lock:
1165 return self._contents[digest]
1166
1167 def write(self, digest, content):
1168 # Assemble whole stream before taking the lock.
1169 data = ''.join(content)
1170 with self._lock:
1171 self._contents[digest] = data
1172
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001173 def hardlink(self, digest, dest, file_mode):
1174 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001175 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001176 if file_mode is not None:
1177 # Ignores all other bits.
1178 os.chmod(dest, file_mode & 0500)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001179
1180
def get_hash_algo(_namespace):
  """Return hash algorithm class to use when uploading to given |namespace|.

  Currently every namespace hashes with SHA-1.
  """
  # TODO(vadimsh): Implement this at some point.
  return hashlib.sha1
1185
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001186
def is_namespace_with_compression(namespace):
  """Returns True if given |namespace| stores compressed objects."""
  # Compression scheme is encoded as a suffix of the namespace name.
  compressed_suffixes = ('-gzip', '-deflate')
  return namespace.endswith(compressed_suffixes)
1190
1191
def get_storage_api(file_or_url, namespace):
  """Returns an object that implements StorageApi interface."""
  # URLs are served by an Isolate Server; anything else is treated as a
  # locally mounted directory.
  if file_path.is_url(file_or_url):
    return IsolateServer(file_or_url, namespace)
  return FileSystem(file_or_url)
1198
1199
def get_storage(file_or_url, namespace):
  """Returns Storage class configured with appropriate StorageApi instance."""
  api = get_storage_api(file_or_url, namespace)
  use_zip = is_namespace_with_compression(namespace)
  return Storage(api, use_zip)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001205
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001206
def expand_symlinks(indir, relfile):
  """Follows symlinks in |relfile|, but treating symlinks that point outside the
  build tree as if they were ordinary directories/files. Returns the final
  symlink-free target and a list of paths to symlinks encountered in the
  process.

  The rule about symlinks outside the build tree is for the benefit of the
  Chromium OS ebuild, which symlinks the output directory to an unrelated path
  in the chroot.

  Fails when a directory loop is detected, although in theory we could support
  that case.
  """
  # Remember whether the input denoted a directory; the trailing separator is
  # re-applied at the end after path manipulation strips it.
  is_directory = relfile.endswith(os.path.sep)
  # |done| is the already-resolved prefix, |todo| the part still to scan.
  done = indir
  todo = relfile.strip(os.path.sep)
  symlinks = []

  while todo:
    pre_symlink, symlink, post_symlink = file_path.split_at_symlink(
        done, todo)
    if not symlink:
      # No symlink left anywhere in |todo|: normalize case and finish.
      todo = file_path.fix_native_path_case(done, todo)
      done = os.path.join(done, todo)
      break
    symlink_path = os.path.join(done, pre_symlink, symlink)
    post_symlink = post_symlink.lstrip(os.path.sep)
    # readlink doesn't exist on Windows.
    # pylint: disable=E1101
    target = os.path.normpath(os.path.join(done, pre_symlink))
    symlink_target = os.readlink(symlink_path)
    if os.path.isabs(symlink_target):
      # Absolute path are considered a normal directories. The use case is
      # generally someone who puts the output directory on a separate drive.
      target = symlink_target
    else:
      # The symlink itself could be using the wrong path case.
      target = file_path.fix_native_path_case(target, symlink_target)

    if not os.path.exists(target):
      raise MappingError(
          'Symlink target doesn\'t exist: %s -> %s' % (symlink_path, target))
    target = file_path.get_native_path_case(target)
    if not file_path.path_starts_with(indir, target):
      # Target is outside the build tree: treat the link itself as a regular
      # file/directory and keep scanning past it.
      done = symlink_path
      todo = post_symlink
      continue
    if file_path.path_starts_with(target, symlink_path):
      # The link resolves to a path inside itself: a directory loop.
      raise MappingError(
          'Can\'t map recursive symlink reference %s -> %s' %
          (symlink_path, target))
    logging.info('Found symlink: %s -> %s', symlink_path, target)
    symlinks.append(os.path.relpath(symlink_path, indir))
    # Treat the common prefix of the old and new paths as done, and start
    # scanning again.
    target = target.split(os.path.sep)
    symlink_path = symlink_path.split(os.path.sep)
    prefix_length = 0
    for target_piece, symlink_path_piece in zip(target, symlink_path):
      if target_piece == symlink_path_piece:
        prefix_length += 1
      else:
        break
    done = os.path.sep.join(target[:prefix_length])
    todo = os.path.join(
        os.path.sep.join(target[prefix_length:]), post_symlink)

  relfile = os.path.relpath(done, indir)
  # bool * str: re-appends exactly one trailing separator iff the input
  # denoted a directory.
  relfile = relfile.rstrip(os.path.sep) + is_directory * os.path.sep
  return relfile, symlinks
1277
1278
def expand_directory_and_symlink(indir, relfile, blacklist, follow_symlinks):
  """Expands a single input. It can result in multiple outputs.

  This function is recursive when relfile is a directory.

  Note: this code doesn't properly handle recursive symlink like one created
  with:
    ln -s .. foo

  Arguments:
    indir: root directory all returned paths are relative to.
    relfile: relative path to expand; a trailing os.path.sep marks a directory.
    blacklist: optional callable returning True for paths to skip (only applied
        to directory entries, not to |relfile| itself).
    follow_symlinks: when True, symlinks are resolved via expand_symlinks().

  Returns:
    List of relative paths (symlinks encountered first, then files).

  Raises:
    MappingError on absolute/escaping paths, path case mismatches (except on
    OSX), missing files or a directory given without a trailing separator.
  """
  if os.path.isabs(relfile):
    raise MappingError('Can\'t map absolute path %s' % relfile)

  infile = file_path.normpath(os.path.join(indir, relfile))
  if not infile.startswith(indir):
    raise MappingError('Can\'t map file %s outside %s' % (infile, indir))

  filepath = os.path.join(indir, relfile)
  native_filepath = file_path.get_native_path_case(filepath)
  if filepath != native_filepath:
    # Special case './'.
    if filepath != native_filepath + '.' + os.path.sep:
      # Give up enforcing strict path case on OSX. Really, it's that sad. The
      # case where it happens is very specific and hard to reproduce:
      # get_native_path_case(
      #    u'Foo.framework/Versions/A/Resources/Something.nib') will return
      # u'Foo.framework/Versions/A/resources/Something.nib', e.g. lowercase 'r'.
      #
      # Note that this is really something deep in OSX because running
      # ls Foo.framework/Versions/A
      # will print out 'Resources', while file_path.get_native_path_case()
      # returns a lower case 'r'.
      #
      # So *something* is happening under the hood resulting in the command 'ls'
      # and Carbon.File.FSPathMakeRef('path').FSRefMakePath() to disagree.  We
      # have no idea why.
      if sys.platform != 'darwin':
        raise MappingError(
            'File path doesn\'t equal native file path\n%s != %s' %
            (filepath, native_filepath))

  symlinks = []
  if follow_symlinks:
    relfile, symlinks = expand_symlinks(indir, relfile)

  if relfile.endswith(os.path.sep):
    if not os.path.isdir(infile):
      raise MappingError(
          '%s is not a directory but ends with "%s"' % (infile, os.path.sep))

    # Special case './'.
    if relfile.startswith('.' + os.path.sep):
      relfile = relfile[2:]
    # NOTE: |outfiles| aliases |symlinks|; the symlinks found above lead the
    # result list, followed by the recursively expanded directory entries.
    outfiles = symlinks
    try:
      for filename in os.listdir(infile):
        inner_relfile = os.path.join(relfile, filename)
        if blacklist and blacklist(inner_relfile):
          continue
        if os.path.isdir(os.path.join(indir, inner_relfile)):
          # Re-append the separator so the recursive call takes the
          # directory branch.
          inner_relfile += os.path.sep
        outfiles.extend(
            expand_directory_and_symlink(indir, inner_relfile, blacklist,
                                         follow_symlinks))
      return outfiles
    except OSError as e:
      raise MappingError(
          'Unable to iterate over directory %s.\n%s' % (infile, e))
  else:
    # Always add individual files even if they were blacklisted.
    if os.path.isdir(infile):
      raise MappingError(
          'Input directory %s must have a trailing slash' % infile)

    if not os.path.isfile(infile):
      raise MappingError('Input file %s doesn\'t exist' % infile)

    return symlinks + [relfile]
1356
1357
def process_input(filepath, prevdict, read_only, flavor, algo):
  """Processes an input file, a dependency, and return meta data about it.

  Behaviors:
  - Retrieves the file mode, file size, file timestamp, file link
    destination if it is a file link and calcultate the SHA-1 of the file's
    content if the path points to a file and not a symlink.

  Arguments:
    filepath: File to act on.
    prevdict: the previous dictionary. It is used to retrieve the cached sha-1
              to skip recalculating the hash. Optional.
    read_only: If 1 or 2, the file mode is manipulated. In practice, only save
               one of 4 modes: 0755 (rwx), 0644 (rw), 0555 (rx), 0444 (r). On
               windows, mode is not set since all files are 'executable' by
               default.
    flavor: One isolated flavor, like 'linux', 'mac' or 'win'.
    algo: Hashing algorithm used.

  Returns:
    The necessary data to create a entry in the 'files' section of an .isolated
    file. Keys used: 'm' (POSIX mode), 't' (mtime), 's' (size), 'h' (content
    hash) for regular files; 'l' (link destination) for symlinks.
  """
  out = {}
  # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
  # if prevdict.get('T') == True:
  #   # The file's content is ignored. Skip the time and hard code mode.
  #   if get_flavor() != 'win':
  #     out['m'] = stat.S_IRUSR | stat.S_IRGRP
  #   out['s'] = 0
  #   out['h'] = algo().hexdigest()
  #   out['T'] = True
  #   return out

  # Always check the file stat and check if it is a link. The timestamp is used
  # to know if the file's content/symlink destination should be looked into.
  # E.g. only reuse from prevdict if the timestamp hasn't changed.
  # There is the risk of the file's timestamp being reset to its last value
  # manually while its content changed. We don't protect against that use case.
  try:
    # lstat, not stat: a symlink must be described as a link, not its target.
    filestats = os.lstat(filepath)
  except OSError:
    # The file is not present.
    raise MappingError('%s is missing' % filepath)
  is_link = stat.S_ISLNK(filestats.st_mode)

  if flavor != 'win':
    # Ignore file mode on Windows since it's not really useful there.
    filemode = stat.S_IMODE(filestats.st_mode)
    # Remove write access for group and all access to 'others'.
    filemode &= ~(stat.S_IWGRP | stat.S_IRWXO)
    if read_only:
      filemode &= ~stat.S_IWUSR
    # Keep group execute in sync with owner execute.
    if filemode & stat.S_IXUSR:
      filemode |= stat.S_IXGRP
    else:
      filemode &= ~stat.S_IXGRP
    if not is_link:
      out['m'] = filemode

  # Used to skip recalculating the hash or link destination. Use the most recent
  # update time.
  # TODO(maruel): Save it in the .state file instead of .isolated so the
  # .isolated file is deterministic.
  out['t'] = int(round(filestats.st_mtime))

  if not is_link:
    out['s'] = filestats.st_size
    # If the timestamp wasn't updated and the file size is still the same, carry
    # on the sha-1.
    if (prevdict.get('t') == out['t'] and
        prevdict.get('s') == out['s']):
      # Reuse the previous hash if available.
      out['h'] = prevdict.get('h')
    if not out.get('h'):
      out['h'] = hash_file(filepath, algo)
  else:
    # If the timestamp wasn't updated, carry on the link destination.
    if prevdict.get('t') == out['t']:
      # Reuse the previous link destination if available.
      out['l'] = prevdict.get('l')
    if out.get('l') is None:
      # The link could be in an incorrect path case. In practice, this only
      # happen on OSX on case insensitive HFS.
      # TODO(maruel): It'd be better if it was only done once, in
      # expand_directory_and_symlink(), so it would not be necessary to do again
      # here.
      symlink_value = os.readlink(filepath)  # pylint: disable=E1101
      filedir = file_path.get_native_path_case(os.path.dirname(filepath))
      native_dest = file_path.fix_native_path_case(filedir, symlink_value)
      out['l'] = os.path.relpath(native_dest, filedir)
  return out
1450
1451
def save_isolated(isolated, data):
  """Writes one or multiple .isolated files.

  Note: this reference implementation does not create child .isolated file so it
  always returns an empty list.

  Returns the list of child isolated files that are included by |isolated|.
  """
  # Round-trip |data| through the loader to catch malformed content before
  # anything is written to disk.
  algo = SUPPORTED_ALGOS[data['algo']]
  serialized = json.dumps(data)
  load_isolated(serialized, data.get('flavor'), algo)
  tools.write_json(isolated, data, True)
  return []
1465
1466
1467
def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The base url, it is assumed that |base_url|/has/ can be used to
              query if an element was already uploaded, and |base_url|/store/
              can be used to upload a new element.
    indir: Root directory the infiles are based in.
    infiles: dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server.
  """
  # The storage object is a context manager; it tears down its network
  # resources when the upload completes or fails.
  remote = get_storage(base_url, namespace)
  with remote as storage:
    storage.upload_tree(indir, infiles)
  return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001482
1483
def load_isolated(content, os_flavor, algo):
  """Verifies the .isolated file is valid and loads this object with the json
  data.

  Arguments:
  - content: raw serialized content to load.
  - os_flavor: OS to load this file on. Optional.
  - algo: hashlib algorithm class. Used to confirm the algorithm matches the
    algorithm used on the Isolate Server.

  Returns:
    The decoded dict, with file path separators normalized to the current OS.

  Raises:
    ConfigError if |content| is not well-formed .isolated data.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  # Check 'version' first, since it could modify the parsing after.
  # TODO(maruel): Drop support for unversioned .isolated file around Jan 2014.
  value = data.get('version', ISOLATED_FILE_VERSION)
  if not isinstance(value, basestring):
    raise ConfigError('Expected string, got %r' % value)
  if not re.match(r'^(\d+)\.(\d+)$', value):
    raise ConfigError('Expected a compatible version, got %r' % value)
  # Only the major version needs to match; minor bumps are compatible.
  if value.split('.', 1)[0] != ISOLATED_FILE_VERSION.split('.', 1)[0]:
    raise ConfigError(
        'Expected compatible \'%s\' version, got %r' %
        (ISOLATED_FILE_VERSION, value))

  if algo is None:
    # TODO(maruel): Remove the default around Jan 2014.
    # Default the algorithm used in the .isolated file itself, falls back to
    # 'sha-1' if unspecified.
    algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]

  # Validate each top-level key against its expected schema.
  for key, value in data.iteritems():
    if key == 'algo':
      # Must be a known algorithm and agree with the one the caller expects.
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
      if value not in SUPPORTED_ALGOS:
        raise ConfigError(
            'Expected one of \'%s\', got %r' %
            (', '.join(sorted(SUPPORTED_ALGOS)), value))
      if value != SUPPORTED_ALGOS_REVERSE[algo]:
        raise ConfigError(
            'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))

    elif key == 'command':
      # Non-empty list of strings.
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      # Map of relative path -> properties dict. Each file entry is either a
      # real file ('h' hash + 's' size, optional 'm' mode) or a symlink
      # ('l' destination), but never a mix of both.
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            # Symlink destination.
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            # File mode bits.
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            # Content hash in the tree's algorithm.
            if not is_valid_hash(subsubvalue, algo):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            # File size in bytes.
            if not isinstance(subsubvalue, (int, long)):
              raise ConfigError('Expected int or long, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        # Cross-field constraints: exactly one of file ('h'+'s') or
        # symlink ('l'), and mode is meaningless on a symlink.
        if bool('h' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
              subvalue)
        if bool('h' in subvalue) != bool('s' in subvalue):
          raise ConfigError(
              'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
              subvalue)
        if bool('s' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
              subvalue)
        if bool('l' in subvalue) and bool('m' in subvalue):
          raise ConfigError(
              'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      # Non-empty list of child .isolated hashes.
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not is_valid_hash(subvalue, algo):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not value in (0, 1, 2):
        raise ConfigError('Expected 0, 1 or 2, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      # The file is OS-specific; refuse to load it for a different flavor.
      if os_flavor and value != os_flavor:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (os_flavor, value))

    elif key == 'version':
      # Already checked above.
      pass

    else:
      raise ConfigError('Unknown key %r' % key)

  # Automatically fix os.path.sep if necessary. While .isolated files are always
  # in the the native path format, someone could want to download an .isolated
  # tree from another OS.
  wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
  if 'files' in data:
    data['files'] = dict(
        (k.replace(wrong_path_sep, os.path.sep), v)
        for k, v in data['files'].iteritems())
    for v in data['files'].itervalues():
      if 'l' in v:
        v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
  if 'relative_cwd' in data:
    data['relative_cwd'] = data['relative_cwd'].replace(
        wrong_path_sep, os.path.sep)
  return data
1626
1627
class IsolatedFile(object):
  """Represents a single parsed .isolated file."""

  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)', obj_hash)
    # Digest of this .isolated file's content.
    self.obj_hash = obj_hash
    # hashlib algorithm class used for every digest in the tree.
    self.algo = algo

    # Set once all the left-side of the tree is parsed. 'Tree' here means the
    # .isolate and all the .isolated files recursively included by it with
    # 'includes' key. The order of each sha-1 in 'includes', each representing
    # a .isolated file in the hash table, is important, as the later ones are
    # not processed until the firsts are retrieved and read.
    self.can_fetch = False

    # Raw decoded json data.
    self.data = {}
    # One IsolatedFile instance per hash in self.data['includes'].
    self.children = []

    # Set once the .isolated file is loaded.
    self._is_parsed = False
    # Set once the files are fetched.
    self.files_fetched = False

  def load(self, os_flavor, content):
    """Verifies the .isolated file is valid and loads this object with the
    json data.
    """
    logging.debug('IsolatedFile.load(%s)', self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    included = self.data.get('includes', [])
    self.children = [IsolatedFile(digest, self.algo) for digest in included]
    self._is_parsed = True

  def fetch_files(self, fetch_queue, files):
    """Adds files in this .isolated file not present in |files| dictionary.

    Preemptively request files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if self.files_fetched or not self._is_parsed:
      return
    logging.debug('fetch_files(%s)', self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overriden files must not be fetched.
      if filepath in files:
        continue
      files[filepath] = properties
      if 'h' in properties:
        # Preemptively request files.
        logging.debug('fetching %s', filepath)
        fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
    self.files_fetched = True
1685
1686
class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    # Command to run, merged from the .isolated tree.
    self.command = []
    # Aggregated file map: relative path -> properties dict.
    self.files = {}
    # Tri-state read-only setting (0, 1 or 2), or None if unset.
    self.read_only = None
    # Directory, relative to the output dir, to run the command from.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      # Enqueues one .isolated file for fetching, guarding against cycles.
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(WorkerPool.HIGH, h)

    retrieve(self.root)

    while pending:
      # Block until any of the pending .isolated files is fetched, then parse
      # it and enqueue its own includes.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    # Sanity check: every node in the tree must have had its files fetched.
    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    self.relative_cwd = self.relative_cwd or ''

  def _traverse_tree(self, fetch_queue, node):
    """Fetches files of fetchable nodes and unlocks at most one new child.

    Per pass, the first not-yet-fetchable child of a fetchable node is marked
    fetchable, enforcing the strict left-to-right 'includes' ordering.
    """
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    """Merges |node|'s data into this Settings; first writer wins."""
    node.fetch_files(fetch_queue, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1781
1782
def fetch_isolated(
    isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    algo: hash algorithm to use.
    outdir: Output directory to map file tree to.
    os_flavor: OS flavor to choose when reading sections of *.isolated file.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about loaded *.isolated file.

  Raises:
    ConfigError: if the .isolated data is invalid or if |require_command| is
        set and no command was specified.
    MappingError: if the cache evicted items needed to map the tree.
  """
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not is_valid_hash(isolated_hash, algo):
        isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)

      # Load all *.isolated and start loading rest of the files.
      settings.load(fetch_queue, isolated_hash, os_flavor, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise ConfigError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_symlinks(outdir, settings.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props). Several paths may
      # share the same content hash.
      remaining = {}
      for filepath, props in settings.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise MappingError('Cache is too small to hold all requested files')
  return settings
1861
1862
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  # Normalize the path case first so all generated relative paths agree.
  root = file_path.get_native_path_case(root)
  relpaths = expand_directory_and_symlink(root, './', blacklist, True)
  metadata = {}
  for relpath in relpaths:
    metadata[relpath] = process_input(
        os.path.join(root, relpath), {}, False, sys.platform, algo)
  # Strip the 't' (timestamp) entry; it is not wanted in the .isolated data.
  for properties in metadata.itervalues():
    properties.pop('t')
  # Only entries with a content hash ('h') become uploadable items; symlinks
  # have no content to upload.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            is_isolated=relpath.endswith('.isolated')))
  return items, metadata
1883
1884
def archive(storage, algo, files, blacklist):
  """Stores every entries and returns the relevant data."""
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    pending_items = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isfile(filepath):
          # A simple file: hash it and upload it as-is.
          digest = hash_file(filepath, algo)
          pending_items.append(
              FileItem(
                  path=filepath,
                  digest=digest,
                  size=os.stat(filepath).st_size,
                  is_isolated=f.endswith('.isolated')))
          results.append((digest, f))
        elif os.path.isdir(filepath):
          # A whole directory: describe it with a generated .isolated file and
          # upload its content along with that file.
          items, metadata = directory_to_metadata(filepath, algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
            'algo': SUPPORTED_ALGOS_REVERSE[algo],
            'files': metadata,
            'version': ISOLATED_FILE_VERSION,
          }
          save_isolated(isolated, data)
          digest = hash_file(isolated, algo)
          pending_items.extend(items)
          pending_items.append(
              FileItem(
                  path=isolated,
                  digest=digest,
                  size=os.stat(isolated).st_size,
                  is_isolated=True))
          results.append((digest, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about the uploaded files but we don't much in
    # practice.
    _uploaded_files = storage.upload_items(pending_items)
    return results
  finally:
    if tempdir:
      shutil.rmtree(tempdir)
1945
1946
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created the whole directory
  is uploaded. Then this .isolated file can be included in another one to run
  commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)

  # '-' is the convention to read the file list from stdin.
  if files == ['-']:
    files = sys.stdin.readlines()
  if not files:
    parser.error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  algo = get_hash_algo(options.namespace)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(options.isolate_server, options.namespace) as storage:
      results = archive(storage, algo, files, blacklist)
  except Error as e:
    # parser.error() exits the process with a message.
    parser.error(e.args[0])
  print('\n'.join('%s %s' % entry for entry in results))
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001982
1983
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # The two modes are mutually exclusive and at least one is required.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)

  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      channel = threading_utils.TaskChannel()
      # digest -> destination path, drained as fetches complete.
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            WorkerPool.MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      settings = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=MemoryCache(),
          algo=get_hash_algo(options.namespace),
          outdir=options.target,
          os_flavor=None,
          require_command=False)
      # |options.target| is already absolute, so |rel| is the absolute
      # directory to run from. The previous code joined |options.target| a
      # second time, which was redundant.
      rel = os.path.join(options.target, settings.relative_cwd)
      print('To run this test please run from the directory %s:' % rel)
      print(' ' + ' '.join(settings.command))

  return 0
2042
2043
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser with the isolate server, namespace and auth options."""

  def __init__(self, **kwargs):
    prog = os.path.basename(sys.modules[__name__].__file__)
    tools.OptionParserWithLogging.__init__(
        self, version=__version__, prog=prog, **kwargs)
    self.add_option(
        '-I', '--isolate-server',
        metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
        help='Isolate server to use')
    self.add_option(
        '--namespace', default='default-gzip',
        help='The namespace to use on the server, default: %default')
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Strip any trailing slash so URLs can be built by concatenation.
    options.isolate_server = options.isolate_server.rstrip('/')
    if not options.isolate_server:
      self.error('--isolate-server is required.')
    auth.process_auth_options(options)
    return options, args
2068
2069
def main(args):
  """Dispatches |args| to the matching CMD* function; returns an exit code."""
  dispatcher = subcommand.CommandDispatcher(__name__)
  try:
    parser = OptionParserIsolateServer()
    return dispatcher.execute(parser, args)
  except Exception as e:
    # Last-chance handler: report the error instead of dumping a traceback.
    tools.report_error(e)
    return 1
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002077
2078
if __name__ == '__main__':
  # One-time process-wide setup before dispatching to the subcommand.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # Initializes colorama so ANSI color codes work on Windows consoles.
  colorama.init()
  sys.exit(main(sys.argv[1:]))